Add option to skip false positive checks during Fuzzy Deduplication (#199)

* Initial logic for buckets to cc input

Signed-off-by: Ayush Dattagupta <[email protected]>

* Update Fuzzy Duplicates to use no fp check codepath

Signed-off-by: Ayush Dattagupta <[email protected]>

* Allow no fp check in fuzzy dedup config

Signed-off-by: Ayush Dattagupta <[email protected]>

* update semdedup test class name

Signed-off-by: Ayush Dattagupta <[email protected]>

* Rename class and method names

Signed-off-by: Ayush Dattagupta <[email protected]>

* Update tests

Signed-off-by: Ayush Dattagupta <[email protected]>

* Rename deprecated shuffle arg to shuffle_method

Signed-off-by: Ayush Dattagupta <[email protected]>

* Use Pairwise instead of Combinations

Co-authored-by: yury-tokpanov <[email protected]>
Signed-off-by: Ayush Dattagupta <[email protected]>

* Fix bad rebase

Signed-off-by: Ayush Dattagupta <[email protected]>

* Fix type annotations and update __init__

Signed-off-by: Ayush Dattagupta <[email protected]>

* Add false positive % in warning

Signed-off-by: Ayush Dattagupta <[email protected]>

* Add CLI script for buckets to edges

Signed-off-by: Ayush Dattagupta <[email protected]>

* Apply suggestions from code review

Co-authored-by: Sarah Yurick <[email protected]>
Signed-off-by: Ayush Dattagupta <[email protected]>

* Update use of groupby collect with agg(list)

Signed-off-by: Ayush Dattagupta <[email protected]>

---------

Signed-off-by: Ayush Dattagupta <[email protected]>
Co-authored-by: yury-tokpanov <[email protected]>
Co-authored-by: Sarah Yurick <[email protected]>
3 people authored Sep 6, 2024
1 parent ad13c61 commit 982e7ec
Showing 6 changed files with 383 additions and 78 deletions.
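At a glance, the change lets FuzzyDuplicates skip the Jaccard-based false positive check and instead convert LSH buckets directly into a graph edgelist for connected components. A minimal sketch of driving the new path, assuming the FuzzyDuplicatesConfig and FuzzyDuplicates interfaces shown in the diff below and defaults for any fields not listed; paths and parameter values are hypothetical:

import dask_cudf

from nemo_curator import FuzzyDuplicates, FuzzyDuplicatesConfig
from nemo_curator.datasets import DocumentDataset

config = FuzzyDuplicatesConfig(
    cache_dir="/path/to/dedup_cache",  # required and must be visible to all workers
    id_field="id",
    text_field="text",
    false_positive_check=False,  # new: skip the expensive Jaccard-based check
    char_ngrams=24,  # >= 20 is recommended when the check is skipped
)

dataset = DocumentDataset(dask_cudf.read_parquet("/path/to/documents.parquet"))
duplicates = FuzzyDuplicates(config=config)(dataset)  # documents grouped into duplicate groups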
4 changes: 4 additions & 0 deletions nemo_curator/modules/__init__.py
@@ -36,6 +36,9 @@
FuzzyDuplicates = gpu_only_import_from(
"nemo_curator.modules.fuzzy_dedup", "FuzzyDuplicates"
)
BucketsToEdges = gpu_only_import_from(
"nemo_curator.modules.fuzzy_dedup", "BucketsToEdges"
)
# Pytorch related imports must come after all imports that require cugraph,
# because of context cleanup issues b/w pytorch and cugraph
# See this issue: https://github.com/rapidsai/cugraph/issues/2718
@@ -55,6 +58,7 @@
"Filter",
"FuzzyDuplicatesConfig",
"FuzzyDuplicates",
"BucketsToEdges",
"LSH",
"MinHash",
"Modify",
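With the new export, BucketsToEdges should be importable at the package level on a GPU-enabled install, mirroring the other fuzzy dedup modules (a sketch; the class itself is defined in the fuzzy_dedup.py changes below):

from nemo_curator import BucketsToEdges

buckets_to_edges = BucketsToEdges(cache_dir="./dedup_cache", id_fields="id")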
12 changes: 9 additions & 3 deletions nemo_curator/modules/config.py
@@ -84,9 +84,15 @@ def __post_init__(self):
raise ValueError(
"Finding fuzzy duplicates requires a cache directory accessible via all workers to store intermediates"
)
if not self.false_positive_check:
raise NotImplementedError(
"Skipping false positive checks is not supported at the moment"
if self.false_positive_check:
warnings.warn(
"Identifying false positives during the Minhash deduplication is computationally expensive."
" For improved performance consider setting this to False"
)
if not self.false_positive_check and self.char_ngrams < 20:
warnings.warn(
"Using a small char_ngrams value might lead to a large number (~5%) of false positives during deduplication."
" Using a value of at least 20 for char_ngrams is recommended."
)
if self.num_anchors <= 0:
raise ValueError("Number of anchors must be greater than 0")
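To make the new validation behaviour concrete, a small sketch of the two warnings (assuming the remaining config fields keep their defaults and a valid cache_dir is supplied):

from nemo_curator import FuzzyDuplicatesConfig

# Warns that the false positive check is computationally expensive.
FuzzyDuplicatesConfig(cache_dir="./dedup_cache", false_positive_check=True)

# Warns that a small char_ngrams may yield ~5% false positives when the check is skipped.
FuzzyDuplicatesConfig(cache_dir="./dedup_cache", false_positive_check=False, char_ngrams=5)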
287 changes: 220 additions & 67 deletions nemo_curator/modules/fuzzy_dedup.py
@@ -19,6 +19,7 @@
import time
import warnings
from datetime import datetime
from itertools import pairwise
from typing import List, Tuple, Union

import cudf
@@ -27,6 +28,8 @@
import cupy as cp
import dask_cudf
import numpy as np
import pandas as pd
import pyarrow as pa
from cugraph import MultiGraph
from dask import dataframe as dd
from dask.dataframe.shuffle import shuffle as dd_shuffle
@@ -431,32 +434,44 @@ def __init__(
id_fields=[self.config.id_field],
profile_dir=self.config.profile_dir,
)
self.map_buckets = _MapBuckets(
id_fields=[self.config.id_field],
text_field=self.config.text_field,
logger=self._logger,
num_anchors=self.config.num_anchors,
)
self.jaccard_shuffle = _Shuffle(
id_fields=[self.config.id_field],
text_field=self.config.text_field,
logger=self._logger,
profile_dir=self.config.profile_dir,
)
self.jaccard_compute = JaccardSimilarity(
id_field=self.config.id_field,
text_field=self.config.text_field,
ngram_width=self.config.char_ngrams,
anchor_id_fields=[
f"anchor_{i}_{self.config.id_field}"
for i in range(self.config.num_anchors)
],

if self.config.false_positive_check:
self.map_buckets = _MapBuckets(
id_fields=[self.config.id_field],
text_field=self.config.text_field,
logger=self._logger,
num_anchors=self.config.num_anchors,
)
self.jaccard_shuffle = _Shuffle(
id_fields=[self.config.id_field],
text_field=self.config.text_field,
logger=self._logger,
profile_dir=self.config.profile_dir,
)
self.jaccard_compute = JaccardSimilarity(
id_field=self.config.id_field,
text_field=self.config.text_field,
ngram_width=self.config.char_ngrams,
anchor_id_fields=[
f"anchor_{i}_{self.config.id_field}"
for i in range(self.config.num_anchors)
],
)
else:
self.buckets_to_edges = BucketsToEdges(
cache_dir=self.config.cache_dir,
id_fields=self.config.id_field,
logger=self._logger,
)

jaccard_pairs_fname = (
"jaccard_similarity_results.parquet"
if self.config.false_positive_check
else "_edges.parquet"
)
self.connected_components = ConnectedComponents(
cache_dir=self.config.cache_dir,
jaccard_pairs_path=os.path.join(
self.config.cache_dir, "jaccard_similarity_results.parquet"
),
jaccard_pairs_path=os.path.join(self.config.cache_dir, jaccard_pairs_fname),
id_column=self.config.id_field,
convert_str_ids=False,
jaccard_threshold=self.config.jaccard_threshold,
@@ -475,62 +490,200 @@ def __call__(self, dataset: DocumentDataset):
they belong to. Documents in the same group are near duplicates.
"""
# Minhash + LSH
print("Stage1: Starting Minhash + LSH computation")
stage_num = 1
print(f"Stage{stage_num}: Starting Minhash + LSH computation")
minhashLSH = Sequential([self.minhash, self.lsh])
buckets_df = minhashLSH(dataset)
print("Stage1: Minhash + LSH complete!")
print(f"Stage{stage_num}: Minhash + LSH complete!")
stage_num += 1

if self.config.false_positive_check:
# Map buckets to lower cardinality distribution
print(f"Stage{stage_num} (False Positive Check): Starting Map_Buckets")
ddf_mapped_buckets_w_anchors = self.map_buckets.map_buckets_with_anchors(
documents_df=dataset.df, buckets_df=buckets_df.df
)
mapped_buckets_w_anchors_path = os.path.join(
self.config.cache_dir, "anchor_docs_with_bk.parquet"
)
ddf_mapped_buckets_w_anchors.to_parquet(
mapped_buckets_w_anchors_path, write_index=False
)
print(f"Stage{stage_num} (False Postive Check): Map_Buckets Complete!")
stage_num += 1

# Map buckets to lower cardinality distribution
print("Stage2 (False Postive Check): Starting Map_Buckets")
ddf_mapped_buckets_w_anchors = self.map_buckets.map_buckets_with_anchors(
documents_df=dataset.df, buckets_df=buckets_df.df
)
mapped_buckets_w_anchors_path = os.path.join(
self.config.cache_dir, "anchor_docs_with_bk.parquet"
)
ddf_mapped_buckets_w_anchors.to_parquet(
mapped_buckets_w_anchors_path, write_index=False
)
print("Stage2 (False Postive Check): Map_Buckets Complete!")
# Shuffle documents based on mapped buckets
print(f"Stage{stage_num} (False Postive Check): Shuffle docs")
shuffled_docs_path = os.path.join(
self.config.cache_dir, "shuffled_docs.parquet"
)
self.jaccard_shuffle.shuffle_docs_on_buckets(
documents_df=dataset.df,
bucket_w_anchors_path=mapped_buckets_w_anchors_path,
output_shuffled_docs_path=shuffled_docs_path,
bucket_mapping_df_blocksize=256,
parts_per_worker=1,
bucket_parts_per_worker=8,
)
print(f"Stage{stage_num} (False Postive Check): Shuffle docs complete!")
stage_num += 1

# Shuffle documents based on mapped buckets
print("Stage3 (False Postive Check): Shuffle docs")
shuffled_docs_path = os.path.join(
self.config.cache_dir, "shuffled_docs.parquet"
)
self.jaccard_shuffle.shuffle_docs_on_buckets(
documents_df=dataset.df,
bucket_w_anchors_path=mapped_buckets_w_anchors_path,
output_shuffled_docs_path=shuffled_docs_path,
bucket_mapping_df_blocksize=256,
parts_per_worker=1,
bucket_parts_per_worker=8,
)
print("Stage3 (False Postive Check): Shuffle docs complete!")
# jaccard comparision within buckets
print(
f"Stage{stage_num} (False Postive Check): Jaccard Similarity in Buckets"
)
jaccard_pairs_path = os.path.join(
self.config.cache_dir, "jaccard_similarity_results.parquet"
)
jaccard_pairs_df = self.jaccard_compute.jaccard_compute(
shuffled_docs_path=shuffled_docs_path
)
jaccard_pairs_df.to_parquet(
jaccard_pairs_path,
write_index=False,
write_metadata_file=False,
)
print(
f"Stage{stage_num} (False Positive Check): Jaccard Similarity in Buckets Complete!"
)
stage_num += 1

# Jaccard comparison within buckets
print("Stage4 (False Positive Check): Jaccard Similarity in Buckets")
jaccard_pairs_path = os.path.join(
self.config.cache_dir, "jaccard_similarity_results.parquet"
)
jaccard_pairs_df = self.jaccard_compute.jaccard_compute(
shuffled_docs_path=shuffled_docs_path
)
jaccard_pairs_df.to_parquet(
jaccard_pairs_path,
write_index=False,
write_metadata_file=False,
)
print("Stage4 (False Postive Check): Jaccard Similarity in Buckets Complete!")
else:
# Convert LSH buckets directly to a graph edgelist (no false positive check)
print(f"Stage{stage_num}: Starting LSH Buckets to Graph edgelist")
self.buckets_to_edges(buckets_df)
print(f"Stage{stage_num}: LSH Buckets to Graph edgelist Complete!")
stage_num += 1

# Connected components across buckets
print("Stage5: Connected Components across buckets")
print(f"Stage{stage_num}: Connected Components across buckets")
cc_path = os.path.join(self.config.cache_dir, "connected_components.parquet")
self.connected_components.cc_workflow(cc_path)
print("Stage5: Connected Components across buckets complete!")
print(f"Stage{stage_num}: Connected Components across buckets complete!")
stage_num += 1

return DocumentDataset(dask_cudf.read_parquet(cc_path, split_row_groups=False))


class BucketsToEdges:
"""
Maps buckets generated from LSH into an edgelist that
can be processed further by Connected Components to find duplicate
documents
"""

def __init__(
self,
cache_dir: str = None,
id_fields: Union[list, str] = "id",
str_id_name: str = "id",
bucket_field: str = "_bucket_id",
logger: Union[logging.LoggerAdapter, str] = "./",
):
"""
Parameters
----------
cache_dir: str or None
If specified, will compute & write the edgelist to a file
id_fields: list or str
id fields of documents in buckets_df
str_id_name: str
Ignored if there is a single id field. Multiple id fields
will be combined into a single id field with the given name.
bucket_field: str
Column denoting bucket ID
logger: logging.LoggerAdapter or str
Existing logger to log to, or a path to a directory where a "Buckets_to_Edges.log" file will be created
"""
self.cache_dir = cache_dir
self.id_fields = [id_fields] if isinstance(id_fields, str) else id_fields
self.str_id_name = str_id_name if len(self.id_fields) > 1 else self.id_fields[0]
self.output_ids = [f"{self.str_id_name}_x", f"{self.str_id_name}_y"]
self.bucket_field = bucket_field
if isinstance(logger, str):
self._logger = create_logger(
rank=0,
log_file=os.path.join(logger, "Buckets_to_Edges.log"),
name="Buckets_to_Edges",
)
else:
self._logger = logger

@staticmethod
def _combine_multiple_ids(
input_df: cudf.DataFrame, input_id_fields: list, output_id_field: str
) -> cudf.DataFrame:
if output_id_field in input_df.columns:
raise ValueError(
f"Input df already contains column named: {output_id_field}"
)

output_df = input_df.copy()[input_df.columns.difference(input_id_fields)]

output_df[output_id_field] = input_df[input_id_fields[0]].astype(str)
for input_field in input_id_fields[1:]:
output_df[output_id_field] = (
output_df[output_id_field]
+ "-"
+ input_df[input_field].astype(str)
)

return output_df
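A small illustration of the id-combining helper with hypothetical two-part ids (the combined value follows the first_id-second_id format built above; cudf, and hence a GPU, is assumed):

import cudf

df = cudf.DataFrame(
    {"dataset_id": ["ds1", "ds1"], "doc_id": [0, 1], "_bucket_id": [7, 7]}
)
combined = BucketsToEdges._combine_multiple_ids(df, ["dataset_id", "doc_id"], "id")
print(combined["id"].to_arrow().to_pylist())  # ['ds1-0', 'ds1-1']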

def buckets_to_edges(
self,
buckets_df: cudf.DataFrame,
) -> cudf.DataFrame:

grouped_buckets = (
buckets_df.groupby(self.bucket_field)[self.str_id_name]
.agg(list)
.list.sort_values()
)
bucket_docs = grouped_buckets.to_arrow().to_pylist()
edges = []
# Pair up documents within a bucket since they are near duplicates
# Consecutive (pairwise) edges are enough to place every document of a bucket
# in the same connected component, so an exhaustive edge list is not needed
for bucket_doc in bucket_docs:
edges.extend(pairwise(bucket_doc))
edges = pd.DataFrame(edges, columns=self.output_ids)
edges = pa.Table.from_pandas(edges)
result_df = cudf.DataFrame.from_arrow(edges)
del edges
result_df = result_df.drop_duplicates(self.output_ids).reset_index(drop=True)
result_df["jaccard"] = np.float32(1.0)
return result_df
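Worth spelling out, per the "Use Pairwise instead of Combinations" commit above: itertools.pairwise links only consecutive ids within a bucket, producing n-1 edges instead of n*(n-1)/2, which is still enough to keep every document of the bucket in one connected component. A tiny sketch of the difference (pairwise requires Python 3.10+):

from itertools import combinations, pairwise

bucket = ["doc_a", "doc_b", "doc_c", "doc_d"]
print(list(pairwise(bucket)))  # 3 chained edges: ('doc_a', 'doc_b'), ('doc_b', 'doc_c'), ('doc_c', 'doc_d')
print(list(combinations(bucket, 2)))  # 6 edges: every pair, redundant for connectivity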

def __call__(self, dataset: DocumentDataset) -> DocumentDataset:
buckets_df = dataset.df
if len(self.id_fields) > 1:
buckets_df = buckets_df.map_partitions(
BucketsToEdges._combine_multiple_ids,
input_id_fields=self.id_fields,
output_id_field=self.str_id_name,
)

meta = [(output_id, str) for output_id in self.output_ids]
meta.append(("jaccard", np.float32))
edges_df = buckets_df.map_partitions(self.buckets_to_edges, meta=meta)

if self.cache_dir is None:
return DocumentDataset(edges_df)

write_path = os.path.join(self.cache_dir, "_edges.parquet")
if os.path.exists(write_path):
warnings.warn(
f"Output path {write_path} already exists and will be overwritten"
)
t0 = time.time()
edges_df.to_parquet(write_path, write_index=False, overwrite=True)
self._logger.info(f"Converted buckets to edgelist took {time.time() - t0} s")

return DocumentDataset(
dask_cudf.read_parquet(write_path, split_row_groups=False)
)
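A short sketch of running the class on its own against previously written LSH buckets (paths are hypothetical); when cache_dir is set, the edgelist is also persisted to <cache_dir>/_edges.parquet for the connected components stage:

import dask_cudf

from nemo_curator import BucketsToEdges
from nemo_curator.datasets import DocumentDataset

buckets = DocumentDataset(dask_cudf.read_parquet("./dedup_cache/_buckets.parquet"))
edges = BucketsToEdges(cache_dir="./dedup_cache", id_fields="id")(buckets)
print(edges.df.head())  # columns: id_x, id_y, jaccard (always 1.0 on this path)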


class _MapBuckets:
"""
buckets to a logical partition by using a modified bin packing algorithm.