Removal logic for fuzzy / exact (no class abstraction) #509

Merged
Commits (33)
89b9005
fc
praateekmahajan Jan 28, 2025
37f6bee
add shuffle/ tests
praateekmahajan Jan 30, 2025
69c8955
more test for class
praateekmahajan Jan 30, 2025
de25476
pre-commit
praateekmahajan Jan 30, 2025
a698bf0
remove class abstractions
praateekmahajan Jan 31, 2025
a2e0c42
remove unused import
praateekmahajan Jan 31, 2025
845cae3
add __call__ methods back
praateekmahajan Jan 31, 2025
2a1da6b
change from modules / update docs
praateekmahajan Feb 3, 2025
48bef03
add tests
praateekmahajan Feb 4, 2025
958161d
update blocksize to 1024 in exact
praateekmahajan Feb 4, 2025
7275609
pr suggestions
praateekmahajan Feb 5, 2025
cba7fcd
warning
praateekmahajan Feb 5, 2025
bcb7cea
Update docs/user-guide/gpudeduplication.rst
praateekmahajan Feb 6, 2025
c929927
Update docs/user-guide/gpudeduplication.rst
praateekmahajan Feb 6, 2025
0afd1a1
Update docs/user-guide/gpudeduplication.rst
praateekmahajan Feb 6, 2025
6f1e4d9
Update examples/exact_deduplication.py
praateekmahajan Feb 6, 2025
1347e37
Update examples/exact_deduplication.py
praateekmahajan Feb 6, 2025
2e3c908
Update examples/fuzzy_deduplication.py
praateekmahajan Feb 6, 2025
bc20a5d
Update examples/fuzzy_deduplication.py
praateekmahajan Feb 6, 2025
6e26edb
Update examples/fuzzy_deduplication.py
praateekmahajan Feb 6, 2025
8ba196a
Update nemo_curator/modules/config.py
praateekmahajan Feb 6, 2025
8936ac9
Update nemo_curator/modules/config.py
praateekmahajan Feb 6, 2025
e41c5fa
Update nemo_curator/modules/exact_dedup.py
praateekmahajan Feb 6, 2025
9c7f4bf
add file back
praateekmahajan Feb 6, 2025
fe6f018
merge
praateekmahajan Feb 6, 2025
7f0da3e
pre-commit
praateekmahajan Feb 6, 2025
b438c80
forgot to rename back to identify_duplicates after merge
praateekmahajan Feb 6, 2025
f8040b5
renmaed func in call
praateekmahajan Feb 6, 2025
82f0c6c
split code / read fpp=1
praateekmahajan Feb 7, 2025
bf5498f
Update docs/user-guide/gpudeduplication.rst
praateekmahajan Feb 7, 2025
f172c72
Update nemo_curator/modules/fuzzy_dedup/fuzzyduplicates.py
praateekmahajan Feb 7, 2025
2beca67
Update nemo_curator/modules/exact_dedup.py
praateekmahajan Feb 7, 2025
f8d89da
Merge branch 'main' into praateek/removal-code-no-abstraction
praateekmahajan Feb 8, 2025
36 changes: 27 additions & 9 deletions docs/user-guide/gpudeduplication.rst
@@ -63,14 +63,19 @@ After ensuring your dataset has a unique ID field (or creating one with the code
from nemo_curator.datasets import DocumentDataset

# Initialize the deduplication object
ExactDups = ExactDuplicates(id_field="my_id", text_field="text")
exact_duplicates = ExactDuplicates(
id_field="my_id",
text_field="text",
perform_removal=True,
cache_dir="/path/to/dedup_outputs", # Recommended to specify a cache_dir if perform_removal=True
)

dataset = DocumentDataset.read_parquet(
input_files="/path/to/parquet/data",
backend="cudf", # or "pandas" for CPU
)

duplicate_docs = ExactDups(dataset)
# Users who have specified perform_removal=False can split the steps as follows
duplicate_docs = exact_duplicates.identify_duplicates(dataset)

"""
Sample output:
@@ -82,9 +87,14 @@ After ensuring your dataset has a unique ID field (or creating one with the code
107 doc_prefix-52271 0f763a2937d57b9d96bf9f220e55f2bd
"""

deduplicated_dataset = exact_duplicates.remove(dataset, duplicate_docs)
Collaborator comment: Should also include the perform_removal option above?


# Users who have specified perform_removal=True can get the output deduplicated dataset directly as follows
# deduplicated_dataset = exact_duplicates(dataset)
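
Before removing anything, it can help to check how many documents the duplicate list implies will be dropped: one document per hash group is kept and the rest are removed. A small optional sketch (illustration only, assuming the ``duplicate_docs`` object from above is backed by a Dask dataframe):

# Optional sanity check (illustration): documents removed = sum over groups of (group size - 1)
group_sizes = duplicate_docs.df.groupby("_hashes").size().compute()
print(f"{int((group_sizes - 1).sum())} documents will be removed")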


.. tip::
A more comprehensive example, including how to remove documents from a corpus using the list of
duplicate IDs generated from the exact deduplication step above, can be found in `examples/exact_deduplication.py <https://github.com/NVIDIA/NeMo-Curator/blob/main/examples/exact_deduplication.py>`_.
A more comprehensive example can be found in `examples/exact_deduplication.py <https://github.com/NVIDIA/NeMo-Curator/blob/main/examples/exact_deduplication.py>`_.

""""""""""""
CLI Utility
@@ -187,6 +197,7 @@ Python API
cache_dir="/path/to/dedup_outputs", # must be cleared between runs
id_field="my_id",
text_field="text",
perform_removal=False, # dictates whether the deduplicated dataset or the IDs of duplicates are returned
seed=42,
char_ngrams=24,
num_buckets=20,
@@ -203,6 +214,7 @@ Python API
cache_dir: /path/to/dedup_outputs
id_field: my_id
text_field: text
perform_removal: False
seed: 42
char_ngrams: 24
num_buckets: 20
@@ -226,14 +238,15 @@ Python API
from nemo_curator.datasets import DocumentDataset

# Initialize the deduplication object
FuzzyDups = FuzzyDuplicates(config=config, logger="./")
fuzzy_duplicates = FuzzyDuplicates(config=config, logger="./")

dataset = DocumentDataset.read_json(
input_files="/path/to/jsonl/data",
backend="cudf", # FuzzyDuplicates only supports datasets with the cuDF backend.
)

duplicate_docs = FuzzyDups(dataset)
# Users who have specified perform_removal=False can split the steps as follows
duplicate_docs = fuzzy_duplicates.identify_duplicates(dataset)
"""
Sample output:
my_id group
@@ -244,10 +257,15 @@ Python API
4 doc_prefix-42050 154
"""

deduplicated_dataset = fuzzy_duplicates.remove(dataset, duplicate_docs)

# Users who have specified perform_removal=True can get the output deduplicated dataset directly as follows
# deduplicated_dataset = fuzzy_duplicates(dataset)


.. tip::

- A more comprehensive example for the above, including how to remove documents from a corpus using the list of
duplicate IDs generated from fuzzy deduplication, can be found in `examples/fuzzy_deduplication.py <https://github.com/NVIDIA/NeMo-Curator/blob/main/examples/fuzzy_deduplication.py>`_.
- A comprehensive example can be found in `examples/fuzzy_deduplication.py <https://github.com/NVIDIA/NeMo-Curator/blob/main/examples/fuzzy_deduplication.py>`_.
- The default values of ``num_buckets`` and ``hashes_per_bucket`` are set to find documents with a Jaccard similarity of approximately 0.8 or above (see the sketch after this list).
- Higher ``buckets_per_shuffle`` values can improve performance but may cause out-of-memory errors.
- Setting the ``false_positive_check`` flag to ``False`` is ideal for optimal performance.
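
To see why these defaults correspond to a Jaccard similarity of roughly 0.8, the standard MinHash-LSH banding estimate can be computed directly. This is a minimal sketch under assumed values (the ``b`` and ``r`` numbers below are illustrative, not necessarily the library defaults): two documents with Jaccard similarity ``s`` become duplicate candidates with probability ``1 - (1 - s**r)**b``, and the similarity at which this crosses 0.5 is approximately ``(1/b)**(1/r)``.

b = 20   # assumed num_buckets
r = 13   # assumed hashes_per_bucket

def candidate_probability(s: float) -> float:
    """Probability that two docs with Jaccard similarity s share at least one LSH bucket."""
    return 1.0 - (1.0 - s**r) ** b

print(f"threshold ~ {(1.0 / b) ** (1.0 / r):.2f}")                 # ~0.79
print(f"p(candidate | s=0.9) = {candidate_probability(0.9):.2f}")  # ~1.00
print(f"p(candidate | s=0.7) = {candidate_probability(0.7):.2f}")  # ~0.18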
32 changes: 14 additions & 18 deletions examples/exact_deduplication.py
@@ -17,8 +17,7 @@

from nemo_curator.datasets import DocumentDataset
from nemo_curator.modules import ExactDuplicates
from nemo_curator.utils.distributed_utils import get_client, read_data, write_to_disk
from nemo_curator.utils.file_utils import get_all_files_paths_under
from nemo_curator.utils.distributed_utils import get_client, write_to_disk
from nemo_curator.utils.script_utils import ArgumentHelper


@@ -40,36 +39,33 @@ def main(args):
client.run(pre_imports)

t0 = time.time()
input_dataset = DocumentDataset.read_json(dataset_dir, backend=backend)
input_dataset = DocumentDataset.read_json(
dataset_dir, backend=backend, blocksize="1GiB", files_per_partition=None
)

exact_dup = ExactDuplicates(
logger=log_dir,
id_field=dataset_id_field,
text_field=dataset_text_field,
# Decides whether the output of the module is the deduplicated dataset or the duplicate IDs
# If True, you should set cache_dir for better performance
perform_removal=False,
# cache_dir=output_dir # Optionally write the output to disk
)

duplicates = exact_dup(dataset=input_dataset)
# When perform_removal=False, calling the module only runs .identify_duplicates() and returns the list of duplicate IDs.
# When perform_removal=True, exact_dup returns the dataset with the duplicates already removed
# (it calls .identify_duplicates() and .remove() in sequence).
duplicates = exact_dup(
dataset=input_dataset
) # or exact_dup.identify_duplicates(input_dataset)

# If caching, result is a path to the output dataset.
if isinstance(duplicates, str):
duplicates = DocumentDataset.read_parquet(duplicates, backend=backend)

# It's easy to apply dataframe operations to the dataset by using the underlying df.

# By default all duplicate id's are included in the result
# keep 1 document from each group of duplcates and mark the others to remove
# https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.duplicated.html
docs_to_remove = duplicates.df.map_partitions(
lambda x: x[x._hashes.duplicated(keep="first")]
)

# When there are few duplicates we can compute the results to a list and use `isin`.
result = input_dataset.df[
~input_dataset.df[dataset_id_field].isin(
docs_to_remove[dataset_id_field].compute()
)
]
result = exact_dup.remove(input_dataset, duplicates)
write_to_disk(result, output_dir, output_type="parquet")
print(time.time() - t0)

24 changes: 10 additions & 14 deletions examples/fuzzy_deduplication.py
@@ -68,6 +68,8 @@ def main(args):
cache_dir=cache_dir,
id_field=dataset_id_field,
text_field=dataset_text_field,
# Decides whether the output of the module is a deduplicated dataset or the IDs of the duplicates
perform_removal=False,
seed=42,
char_ngrams=24,
num_buckets=20,
@@ -77,26 +79,20 @@ def main(args):
false_positive_check=False,
)
fuzzy_dup = FuzzyDuplicates(logger=log_dir, config=fuzzy_dedup_config)
duplicates = fuzzy_dup(dataset=input_dataset)

# When perform_removal=False, calling the module only runs .identify_duplicates() and returns the list of duplicate IDs.
# When perform_removal=True, fuzzy_dup returns the dataset with the duplicates already removed
# (it calls .identify_duplicates() and .remove() in sequence).
duplicates = fuzzy_dup(
dataset=input_dataset
) # or fuzzy_dup.identify_duplicates(input_dataset)

if duplicates is None:
print("No duplicates found")
print(f"Time taken:{time.time() - t0}s")
return

# By default all duplicate id's and the group they belong to are included in the result
# keep 1 document from each group of duplcates and mark the others to remove
# https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.duplicated.html
docs_to_remove = duplicates.df.map_partitions(
lambda x: x[x.group.duplicated(keep="first")]
)

# When there are few duplicates we can compute the results to a list and use `isin`.
result = input_dataset.df[
~input_dataset.df[dataset_id_field].isin(
docs_to_remove[dataset_id_field].compute()
)
]
result = fuzzy_dup.remove(input_dataset, duplicates)
write_to_disk(result, output_dir, output_type=filetype)
print(f"Time taken:{time.time() - t0}s")

8 changes: 8 additions & 0 deletions nemo_curator/modules/config.py
@@ -44,6 +44,8 @@ class FuzzyDuplicatesConfig(BaseConfig):
but might lead to memory pressures and related errors.
id_field: Column in the Dataset denoting document ID.
text_field: Column in the Dataset denoting document content.
perform_removal: Boolean value to specify whether calling the module should remove the duplicates from
the original dataset, or return the list of IDs denoting duplicates.
profile_dir: str, Default None
If specified directory to write dask profile
cache_dir: str, Default None
@@ -64,6 +66,7 @@ class FuzzyDuplicatesConfig(BaseConfig):
profile_dir: Optional[str] = None
id_field: str = "id"
text_field: str = "text"
perform_removal: bool = False

# Minhash + LSH Config
seed: int = 42
@@ -131,6 +134,11 @@ def __post_init__(self):
if not 1 <= self.buckets_per_shuffle <= self.num_buckets:
raise ValueError("Buckets per shuffle must be between [1, num_buckets]")

if not self.perform_removal:
warnings.warn(
"In future releases (starting with 0.8.0) the default will be True."
)


@dataclass
class SemDedupConfig(BaseConfig):
53 changes: 45 additions & 8 deletions nemo_curator/modules/exact_dedup.py
@@ -18,7 +18,6 @@
import time
import warnings
from contextlib import nullcontext
from datetime import datetime
from hashlib import md5
from typing import Optional, Union

@@ -31,6 +30,7 @@
from nemo_curator.log import create_logger
from nemo_curator.modules.base import BaseModule
from nemo_curator.utils.distributed_utils import performance_report_if_with_ts_suffix
from nemo_curator.utils.duplicates_removal import remove_duplicates
from nemo_curator.utils.gpu_utils import is_cudf_type


@@ -45,6 +45,7 @@ def __init__(
id_field: str = "id",
text_field: str = "text",
hash_method: str = "md5",
perform_removal: bool = False,
profile_dir: Optional[str] = None,
cache_dir: Optional[str] = None,
):
@@ -66,9 +67,17 @@
raise ValueError(
f"{hash_method} not in supported hash_methods. Choose a hash_method from {self.SUPPORTED_HASHES}"
)

self.hash_method = hash_method
self.id_field = id_field
self.text_field = text_field
self.perform_removal = perform_removal
if not self.perform_removal:
warnings.warn(
"In future releases (starting with 0.8.0) the default will be True."
)
if self.perform_removal and cache_dir is None:
warnings.warn("cache_dir is recommended to remove duplicates.")
if cache_dir is None and profile_dir is not None:
warnings.warn(
"cache_dir for intermediate outputs is required to generate profiles"
@@ -137,7 +146,7 @@ def hash_documents(
# TODO: Generalize ty using self.hash_method
return df.apply(lambda x: md5(x.encode()).hexdigest())

def call(self, dataset: DocumentDataset) -> Union[DocumentDataset, str]:
def identify_duplicates(self, dataset: DocumentDataset) -> DocumentDataset:
"""
Find document IDs of exact duplicates in a given DocumentDataset
Parameters
@@ -168,10 +177,38 @@ def call(self, dataset: DocumentDataset) -> Union[DocumentDataset, str]:
self._logger.info(
f"Time taken for Exact Dedup Computation = {time.time() - t0}s and output written at {write_path}"
)
if is_cudf_type(result):
import dask_cudf
backend = "cudf" if is_cudf_type(result) else "pandas"
return DocumentDataset.read_parquet(
write_path,
backend=backend,
# Read with files_per_partition=1 so that each duplicate group is read whole (and does not span partitions)
files_per_partition=1,
blocksize=None,
)

result_dataset = dask_cudf.read_parquet(write_path, split_row_groups=False)
else:
result_dataset = dd.read_parquet(write_path)
return DocumentDataset(result_dataset)
def remove(
self, dataset: DocumentDataset, duplicates_to_remove: Optional[DocumentDataset]
) -> DocumentDataset:
"""
Remove exact duplicates from a given DocumentDataset
Parameters
----------
dataset: DocumentDataset
The input dataset from which to remove exact duplicates
duplicates_to_remove: DocumentDataset
The dataset of duplicate document IDs, as returned by identify_duplicates
Returns
-------
DocumentDataset containing only non-duplicate documents
"""
result = remove_duplicates(
left=dataset.df,
duplicates=duplicates_to_remove.df,
id_field=self.id_field,
group_field="_hashes",
)
return DocumentDataset(result)

def call(self, dataset: DocumentDataset) -> DocumentDataset:
duplicates = self.identify_duplicates(dataset)
if self.perform_removal:
return self.remove(dataset, duplicates)
return duplicates
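
The ``remove`` method above delegates to ``remove_duplicates`` from ``nemo_curator.utils.duplicates_removal``, whose implementation is not shown in this diff. As a hedged sketch of the intended semantics only (a pandas stand-in, not the actual Dask/cuDF helper): keep one document per duplicate group and anti-join the remaining duplicate IDs out of the original dataframe.

import pandas as pd

def remove_duplicates_sketch(
    left: pd.DataFrame, duplicates: pd.DataFrame, id_field: str, group_field: str
) -> pd.DataFrame:
    """Illustrative only: keep one document per duplicate group, drop the rest."""
    # IDs of every duplicate except the first member of each group.
    ids_to_drop = duplicates.loc[
        duplicates[group_field].duplicated(keep="first"), id_field
    ]
    # Anti-join: retain only documents whose ID is not marked for removal.
    return left[~left[id_field].isin(ids_to_drop)]

# Toy usage with the exact-dedup column names (id_field="my_id", group_field="_hashes").
corpus = pd.DataFrame({"my_id": ["a", "b", "c"], "text": ["x", "x", "y"]})
dups = pd.DataFrame({"my_id": ["a", "b"], "_hashes": ["h1", "h1"]})
print(remove_duplicates_sketch(corpus, dups, "my_id", "_hashes"))  # keeps "a" and "c"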