Removal logic for fuzzy / exact (no class abstraction) #509
Changes to the exact-duplicates module (class ExactDuplicates):
@@ -18,7 +18,6 @@
 import time
 import warnings
 from contextlib import nullcontext
-from datetime import datetime
 from hashlib import md5
 from typing import Optional, Union

@@ -31,6 +30,7 @@
 from nemo_curator.log import create_logger
 from nemo_curator.utils.distributed_utils import performance_report_if_with_ts_suffix
 from nemo_curator.utils.gpu_utils import is_cudf_type
+from nemo_curator.utils.removal import remove_duplicates


 class ExactDuplicates:
@@ -64,6 +64,7 @@ def __init__(
             raise ValueError(
                 f"{hash_method} not in supported hash_methods. Choose a hash_method from {self.SUPPORTED_HASHES}"
             )
+
         self.hash_method = hash_method
         self.id_field = id_field
         self.text_field = text_field
@@ -135,7 +136,7 @@ def hash_documents(
         # TODO: Generalize by using self.hash_method
         return df.apply(lambda x: md5(x.encode()).hexdigest())

-    def __call__(self, dataset: DocumentDataset) -> Union[DocumentDataset, str]:
+    def identify(self, dataset: DocumentDataset) -> DocumentDataset:
         """
         Find document IDs for exact duplicates in a given DocumentDataset
         Parameters

Review thread on renaming __call__ to identify:
- "Nit, but maybe call them …"
- "Let's keep them exposed, especially since remove won't work at scales where the size of the duplicates >> host memory, in which case the user will need to break down identify and remove."
- "Yes, that makes sense to me. What about calling it …?"
- "Sounds good. Initially I thought it's slightly verbose, but another argument in favor of … cc @ayushdg / @ryantwolf / @VibhuJawa"
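To make the reviewers' point concrete, here is a minimal sketch (not part of the PR) of splitting identification and removal into separate runs, which is what a user would do when the set of duplicates is too large to hold in host memory. The import paths, constructor arguments, and file paths are assumptions for illustration only:

    # Hypothetical usage; module paths and arguments are assumed, not taken from this PR
    from nemo_curator.datasets import DocumentDataset
    from nemo_curator.modules import ExactDuplicates

    dataset = DocumentDataset.read_parquet("/data/corpus/", backend="pandas")
    exact_dups = ExactDuplicates(id_field="id", text_field="text")

    # Run 1: only identify duplicate IDs and persist them for inspection
    duplicates = exact_dups.identify(dataset)
    duplicates.df.to_parquet("/data/exact_duplicate_ids/")

    # Run 2 (possibly later, with different partitioning): drop the duplicates
    duplicates = DocumentDataset.read_parquet("/data/exact_duplicate_ids/", backend="pandas")
    deduped = exact_dups.remove(dataset, duplicates)
    deduped.df.to_parquet("/data/corpus_deduped/")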
@@ -166,10 +167,42 @@ def __call__(self, dataset: DocumentDataset) -> Union[DocumentDataset, str]:
         self._logger.info(
             f"Time taken for Exact Dedup Computation = {time.time() - t0}s and output written at {write_path}"
         )
-        if is_cudf_type(result):
-            import dask_cudf
-
-            result_dataset = dask_cudf.read_parquet(write_path, split_row_groups=False)
-        else:
-            result_dataset = dd.read_parquet(write_path)
-        return DocumentDataset(result_dataset)
+        backend = "cudf" if is_cudf_type(result) else "pandas"
+        return DocumentDataset.read_parquet(
+            write_path,
+            backend=backend,
+            blocksize="1024MiB",
+            files_per_partition=None,
+            split_row_groups=False,
+        )
+
+    def remove(
+        self, dataset: DocumentDataset, duplicates_to_remove: Optional[DocumentDataset]
+    ) -> DocumentDataset:
+        """
+        Remove exact duplicates from a given DocumentDataset
+        Parameters
+        ----------
+        dataset: DocumentDataset
+            The input dataset to remove exact duplicates from
+        Returns
+        -------
+        DocumentDataset containing only non-duplicate documents
+        """
+        if not duplicates_to_remove:
+            return None
+        result = remove_duplicates(
+            left=dataset.df,
+            duplicates=duplicates_to_remove.df,
+            id_field=self.id_field,
+            group_field="_hashes",
+        )
+        return DocumentDataset(result)
+
+    def __call__(
+        self, dataset: DocumentDataset, perform_removal: bool = False
+    ) -> DocumentDataset:
+        duplicates = self.identify(dataset)
+        if duplicates and perform_removal:
+            return self.remove(dataset, duplicates)
+        return duplicates
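With these additions, __call__ becomes a thin wrapper around identify and remove. A minimal usage sketch under the same assumptions as above (variable names are illustrative, not from the PR):

    # One-shot: identify and drop exact duplicates in a single call
    deduped = exact_dups(dataset, perform_removal=True)

    # Default behaviour still returns only the identified duplicates,
    # preserving the previous __call__ semantics
    duplicates_only = exact_dups(dataset)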
Changes to the connected-components step (_run_connected_components):

@@ -125,8 +125,6 @@ def _run_connected_components(
             f"# rows in labels_df = {len(labels_df)}"
         )
         assert num_nodes == len(labels_df)
-        # Ensure all docs in the same group are in the same partition
-        labels_df = labels_df.shuffle(on=["group"], ignore_index=True)
         labels_df.to_parquet(output_path, write_index=False, overwrite=True)
         Comms.destroy()
         self._logger.info(

Review comment on the removed shuffle: "@ayushdg we're doing this here" — i.e., the group-wise shuffle now happens inside the new removal utility instead.

Review comment on the new utility: "Maybe call it 'duplicates_removal' or something similar?"
New file: the removal utility imported above as nemo_curator.utils.removal:

@@ -0,0 +1,42 @@
import dask.dataframe as dd


def remove_duplicates(
    left: dd.DataFrame,
    duplicates: dd.DataFrame,
    id_field: str,
    group_field: str,
) -> dd.DataFrame:
    if left.npartitions < duplicates.npartitions:
        msg = (
            "The number of partitions in `left` is less than the number of partitions in the duplicates dataset. "
            "This may lead to a shuffle join. Please re-read left and right with different partition sizes, or repartition left / right."
        )
        raise ValueError(msg)

    # Create a new column name for temporary ID storage during merge
    new_id_field = f"{id_field}_new"

    duplicates_to_remove = (
        duplicates
        # Redistribute data across partitions so that all duplicates are in same partition
        .shuffle(on=[group_field], ignore_index=True)
        # For each partition, keep only the duplicated rows (excluding first occurrence)
        .map_partitions(lambda x: x[x[group_field].duplicated(keep="first")]).drop(
            columns=group_field
        )
        # Rename the ID field to avoid conflicts in the upcoming merge
        .rename(columns={id_field: new_id_field})[[new_id_field]]
    )

    merge = left.merge(
        right=duplicates_to_remove,
        how="left",
        broadcast=True,  # Broadcast smaller DataFrame to all partitions
        left_on=id_field,
        right_on=new_id_field,
    )

    # Keep only rows with no match in duplicates_to_remove (a left anti-join)
    removed_result = merge[merge[new_id_field].isna()].drop(columns=[new_id_field])
    return removed_result
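To illustrate the join strategy used above, here is a small self-contained sketch (not part of the PR) of the same shuffle-then-anti-join pattern on a toy Dask DataFrame; the column names and data are made up:

    import dask.dataframe as dd
    import pandas as pd

    # Toy inputs: documents 1 and 2 hash to the same value, so one of them is a duplicate
    docs = dd.from_pandas(
        pd.DataFrame({"id": [1, 2, 3], "text": ["foo", "foo", "bar"]}), npartitions=2
    )
    duplicates = dd.from_pandas(
        pd.DataFrame({"id": [1, 2], "_hashes": ["h1", "h1"]}), npartitions=1
    )

    # Shuffle so every hash group lives in one partition, then mark all but the
    # first occurrence of each group for removal
    to_remove = (
        duplicates.shuffle(on=["_hashes"], ignore_index=True)
        .map_partitions(lambda part: part[part["_hashes"].duplicated(keep="first")])
        .rename(columns={"id": "id_new"})[["id_new"]]
    )

    # Left join, then keep only rows that found no match, i.e. a left anti-join
    merged = docs.merge(to_remove, how="left", left_on="id", right_on="id_new")
    deduped = merged[merged["id_new"].isna()].drop(columns=["id_new"])
    print(deduped.compute())  # keeps id 3 plus one of id 1 / id 2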
Review comment: "Should also include the perform_removal option above?"