Update Fuzzy dedup params for long strings support. #77

Merged
merged 6 commits into from
Sep 6, 2024
3 changes: 3 additions & 0 deletions nemo_curator/modules/config.py
@@ -77,6 +77,9 @@ class FuzzyDuplicatesConfig(BaseConfig):
# Only required for fp check
num_anchors: int = 2
jaccard_threshold: float = 0.8
bucket_mapping_blocksize: int = 256
parts_per_worker: int = 1
bucket_parts_per_worker: int = 8

def __post_init__(self):
self.num_hashes = self.num_buckets * self.hashes_per_bucket
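As a quick illustration of the new configuration surface, here is a minimal sketch of constructing a FuzzyDuplicatesConfig with the three added fields; the other constructor arguments shown (cache_dir, id_field, text_field) are assumptions about typical usage and are not touched by this PR.

from nemo_curator.modules.config import FuzzyDuplicatesConfig

# Values for the three new fields are the defaults added in config.py above.
# cache_dir / id_field / text_field are assumed fields, shown for illustration only.
config = FuzzyDuplicatesConfig(
    cache_dir="./fuzzy_dedup_cache",
    id_field="id",
    text_field="text",
    num_anchors=2,
    jaccard_threshold=0.8,
    bucket_mapping_blocksize=256,
    parts_per_worker=1,
    bucket_parts_per_worker=8,
)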
8 changes: 4 additions & 4 deletions nemo_curator/modules/fuzzy_dedup.py
@@ -521,9 +521,9 @@ def __call__(self, dataset: DocumentDataset):
documents_df=dataset.df,
bucket_w_anchors_path=mapped_buckets_w_anchors_path,
output_shuffled_docs_path=shuffled_docs_path,
bucket_mapping_df_blocksize=256,
parts_per_worker=1,
bucket_parts_per_worker=8,
bucket_mapping_df_blocksize=self.config.bucket_mapping_blocksize,
parts_per_worker=self.config.parts_per_worker,
bucket_parts_per_worker=self.config.bucket_parts_per_worker,
)
print(f"Stage{stage_num} (False Postive Check): Shuffle docs complete!")
stage_num += 1
@@ -755,7 +755,7 @@ def _get_output_map_from_text_bytes_per_bucket(
):
# String bytes limit for cuDF
# https://github.com/rapidsai/cudf/issues/13733
max_text_bytes_per_part = int(np.iinfo(np.int32).max // 1.2)
max_text_bytes_per_part = int(np.iinfo(np.int32).max * 3)

self._logger.info(f"max_text_bytes_per_part = {max_text_bytes_per_part}")
# Increasing in an attempt to prevent hitting
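For context on the limit change above, a small arithmetic sketch (not part of the PR) comparing the two values of max_text_bytes_per_part; treating the larger value as safe because recent cuDF releases add large-strings support is an assumption based on the PR title.

import numpy as np

int32_max = int(np.iinfo(np.int32).max)   # 2,147,483,647 bytes, the old cuDF string-column ceiling

old_limit = int(int32_max // 1.2)         # ~1.79 GB of text per partition
new_limit = int32_max * 3                 # ~6.44 GB of text per partition

print(f"old max_text_bytes_per_part = {old_limit:,}")
print(f"new max_text_bytes_per_part = {new_limit:,}")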
2 changes: 2 additions & 0 deletions nemo_curator/scripts/fuzzy_deduplication/compute_minhashes.py
@@ -77,6 +77,7 @@ def main(args):
backend="cudf",
files_per_partition=args.files_per_partition,
add_filename=False,
input_meta=args.input_meta,
)[[id_field, text_field]]

if num_files is not None:
@@ -119,6 +120,7 @@ def attach_args(parser=None):
help="Random seed used for intializing the hash "
"functions used to compute the MinHashes"
)
argumentHelper.add_arg_input_meta()
parser.add_argument(
"--char-ngram",
type=int,
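A sketch of what the new input_meta plumbing does downstream. It assumes the reader helper called in main() is nemo_curator.utils.distributed_utils.read_data and that input_meta is a column-to-dtype mapping; both are assumptions, since only the keyword arguments are visible in this diff, and the path and column names are placeholders.

from nemo_curator.utils.distributed_utils import read_data

# Assumed shape of the metadata supplied via the flag registered by
# add_arg_input_meta(): only these columns are read and typed.
input_meta = {"id": "str", "text": "str"}

df = read_data(
    ["/data/shard_000.jsonl"],      # placeholder input file
    file_type="jsonl",
    backend="cudf",
    files_per_partition=2,
    add_filename=False,
    input_meta=input_meta,          # forwarded to cudf.read_json, enabling prune_columns
)[["id", "text"]]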
1 change: 1 addition & 0 deletions nemo_curator/scripts/fuzzy_deduplication/minhash_lsh.py
@@ -54,6 +54,7 @@ def main(args):
dask_cudf.read_parquet(data_path, blocksize="2GB", aggregate_files=True)
)
df = dask_cudf.concat(dfs, ignore_unknown_divisions=True)
df = df[~df.id_field.isna()]
df = df.map_partitions(
convert_str_id_to_int,
id_column=id_field,
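A toy sketch (column names and values invented) of what the new null-ID filter accomplishes: rows whose ID is missing are dropped before convert_str_id_to_int parses the ID string, which presumably cannot handle nulls.

import cudf

# Toy frame: the second document is missing its ID.
df = cudf.DataFrame(
    {"id": ["doc-0-001", None, "doc-0-003"], "bucket": [4, 7, 9]}
)

# Keep only rows with a non-null ID, mirroring the intent of the new filter
# in minhash_lsh.py before the string-to-int ID conversion.
df = df[~df["id"].isna()]
print(df.to_pandas())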
2 changes: 2 additions & 0 deletions nemo_curator/utils/distributed_utils.py
@@ -237,6 +237,8 @@ def read_single_partition(
read_kwargs = {"lines": filetype == "jsonl"}
if backend == "cudf":
read_f = cudf.read_json
if input_meta is not None:
read_kwargs["prune_columns"] = True
Collaborator commented on lines +240 to +241:
Do you think we should move away from input_meta in favor of a keyword like dtype (as in Pandas' and cuDF's read_json) and have the user configure prune_columns themselves?

I think this would align with #50 too.

Collaborator (author) replied:
I'm generally in favor of overhauling the IO helpers in the current setup for something better when we tackle #50. I'll share more thoughts there, but moving toward encouraging users to use the read_xyz APIs is easier. We can then have a common helper that, based on the filetype, dispatches to the relevant read_xyz API, rather than the other way around where read_json goes through a common read method that handles different formats.

Regarding prune_columns specifically: this change is important in newer versions of RAPIDS because many public datasets, like rpv1, do not have consistent metadata across all their files. If we do not prune the columns down to just ID and text, cuDF will now fail with inconsistent-metadata errors.

else:
read_kwargs["dtype"] = False
read_f = pd.read_json
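To illustrate the prune_columns behavior discussed above, a small standalone sketch; it assumes cudf.read_json accepts the dtype and prune_columns keywords as shown (available in recent RAPIDS releases), and the sample records are invented.

import io
import cudf

# Two JSONL records whose extra metadata keys disagree, as in datasets
# like rpv1 where metadata is not consistent across files.
data = io.StringIO(
    '{"id": "a", "text": "hello", "source": "web"}\n'
    '{"id": "b", "text": "world", "quality": 0.9}\n'
)

# With prune_columns=True, only the columns named in dtype are materialized,
# so the mismatched extra keys cannot cause inconsistent-metadata failures.
df = cudf.read_json(
    data,
    lines=True,
    dtype={"id": "str", "text": "str"},
    prune_columns=True,
)
print(df.columns.tolist())  # ['id', 'text']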
2 changes: 1 addition & 1 deletion nemo_curator/utils/fuzzy_dedup_utils/shuffle_utils.py
@@ -83,7 +83,7 @@ def get_shuffle_part_ids_df(
num_workers=0,
):
sizes = agg_df[size_col].values
max_text_bytes_per_part = int(np.iinfo(np.int32).max // 1.2)
max_text_bytes_per_part = int(np.iinfo(np.int32).max * 3)

# Adjust max_text_bytes_per_part if the number of output
# partitions is small compared to the number of workers.