From 762a670627e6da3164d1d664d03f3753c28464a6 Mon Sep 17 00:00:00 2001
From: Ayush Dattagupta
Date: Fri, 6 Sep 2024 16:29:19 -0700
Subject: [PATCH] Update Fuzzy dedup params for long strings support. (#77)

* Expose more configurations to test long string support

Signed-off-by: Ayush Dattagupta

* Export libcudf env for long string support

Signed-off-by: Ayush Dattagupta

* Default to using larger batches

Signed-off-by: Ayush Dattagupta

* Remove large strings env variable since it's enabled by default in 24.8 and above

Signed-off-by: Ayush Dattagupta

* Remove debug print, filter nulls before bucketing

Signed-off-by: Ayush Dattagupta

* Remove hardcoded id field

Signed-off-by: Ayush Dattagupta

---------

Signed-off-by: Ayush Dattagupta
---
 nemo_curator/modules/config.py                             | 3 +++
 nemo_curator/modules/fuzzy_dedup.py                        | 8 ++++----
 .../scripts/fuzzy_deduplication/compute_minhashes.py       | 2 ++
 nemo_curator/scripts/fuzzy_deduplication/minhash_lsh.py    | 1 +
 nemo_curator/utils/distributed_utils.py                    | 2 ++
 nemo_curator/utils/fuzzy_dedup_utils/shuffle_utils.py      | 2 +-
 6 files changed, 13 insertions(+), 5 deletions(-)

diff --git a/nemo_curator/modules/config.py b/nemo_curator/modules/config.py
index 1ef8a0fd5..f129e59d3 100644
--- a/nemo_curator/modules/config.py
+++ b/nemo_curator/modules/config.py
@@ -77,6 +77,9 @@ class FuzzyDuplicatesConfig(BaseConfig):
     # Only required for fp check
     num_anchors: int = 2
     jaccard_threshold: float = 0.8
+    bucket_mapping_blocksize: int = 256
+    parts_per_worker: int = 1
+    bucket_parts_per_worker: int = 8
 
     def __post_init__(self):
         self.num_hashes = self.num_buckets * self.hashes_per_bucket
diff --git a/nemo_curator/modules/fuzzy_dedup.py b/nemo_curator/modules/fuzzy_dedup.py
index 556911a96..ff7691d68 100644
--- a/nemo_curator/modules/fuzzy_dedup.py
+++ b/nemo_curator/modules/fuzzy_dedup.py
@@ -521,9 +521,9 @@ def __call__(self, dataset: DocumentDataset):
                 documents_df=dataset.df,
                 bucket_w_anchors_path=mapped_buckets_w_anchors_path,
                 output_shuffled_docs_path=shuffled_docs_path,
-                bucket_mapping_df_blocksize=256,
-                parts_per_worker=1,
-                bucket_parts_per_worker=8,
+                bucket_mapping_df_blocksize=self.config.bucket_mapping_blocksize,
+                parts_per_worker=self.config.parts_per_worker,
+                bucket_parts_per_worker=self.config.bucket_parts_per_worker,
             )
             print(f"Stage{stage_num} (False Postive Check): Shuffle docs complete!")
             stage_num += 1
@@ -755,7 +755,7 @@ def _get_output_map_from_text_bytes_per_bucket(
     ):
         # String bytes limit for cuDF
         # https://github.com/rapidsai/cudf/issues/13733
-        max_text_bytes_per_part = int(np.iinfo(np.int32).max // 1.2)
+        max_text_bytes_per_part = int(np.iinfo(np.int32).max * 3)
         self._logger.info(f"max_text_bytes_per_part = {max_text_bytes_per_part}")
 
         # Increasing in an attempt to prevent hitting
diff --git a/nemo_curator/scripts/fuzzy_deduplication/compute_minhashes.py b/nemo_curator/scripts/fuzzy_deduplication/compute_minhashes.py
index 2add7587f..7ec17815f 100644
--- a/nemo_curator/scripts/fuzzy_deduplication/compute_minhashes.py
+++ b/nemo_curator/scripts/fuzzy_deduplication/compute_minhashes.py
@@ -77,6 +77,7 @@ def main(args):
             backend="cudf",
             files_per_partition=args.files_per_partition,
             add_filename=False,
+            input_meta=args.input_meta,
         )[[id_field, text_field]]
 
         if num_files is not None:
@@ -119,6 +120,7 @@ def attach_args(parser=None):
         help="Random seed used for intializing the hash "
         "functions used to compute the MinHashes"
     )
+    argumentHelper.add_arg_input_meta()
    parser.add_argument(
        "--char-ngram",
        type=int,
diff --git a/nemo_curator/scripts/fuzzy_deduplication/minhash_lsh.py b/nemo_curator/scripts/fuzzy_deduplication/minhash_lsh.py
index d9740f412..40c42ec92 100644
--- a/nemo_curator/scripts/fuzzy_deduplication/minhash_lsh.py
+++ b/nemo_curator/scripts/fuzzy_deduplication/minhash_lsh.py
@@ -54,6 +54,7 @@ def main(args):
         dask_cudf.read_parquet(data_path, blocksize="2GB", aggregate_files=True)
     )
     df = dask_cudf.concat(dfs, ignore_unknown_divisions=True)
+    df = df[~df[id_field].isna()]
     df = df.map_partitions(
         convert_str_id_to_int,
         id_column=id_field,
diff --git a/nemo_curator/utils/distributed_utils.py b/nemo_curator/utils/distributed_utils.py
index ea9a2920b..dabda543e 100644
--- a/nemo_curator/utils/distributed_utils.py
+++ b/nemo_curator/utils/distributed_utils.py
@@ -237,6 +237,8 @@ def read_single_partition(
         read_kwargs = {"lines": filetype == "jsonl"}
         if backend == "cudf":
             read_f = cudf.read_json
+            if input_meta is not None:
+                read_kwargs["prune_columns"] = True
         else:
             read_kwargs["dtype"] = False
             read_f = pd.read_json
diff --git a/nemo_curator/utils/fuzzy_dedup_utils/shuffle_utils.py b/nemo_curator/utils/fuzzy_dedup_utils/shuffle_utils.py
index 323fe7e81..20f15e787 100644
--- a/nemo_curator/utils/fuzzy_dedup_utils/shuffle_utils.py
+++ b/nemo_curator/utils/fuzzy_dedup_utils/shuffle_utils.py
@@ -83,7 +83,7 @@ def get_shuffle_part_ids_df(
     num_workers=0,
 ):
     sizes = agg_df[size_col].values
-    max_text_bytes_per_part = int(np.iinfo(np.int32).max // 1.2)
+    max_text_bytes_per_part = int(np.iinfo(np.int32).max * 3)
 
     # Adjust max_text_bytes_per_part if the number of output
     # partitions is small compared to the number of workers.
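
For reference, a minimal usage sketch (not part of the patch) of the knobs this change exposes on FuzzyDuplicatesConfig. The three shuffle parameters are the ones added here; the remaining field names, paths, and values are placeholders assumed to match the existing config.

# Hypothetical example: tune the shuffle parameters that were previously
# hardcoded to 256 / 1 / 8 inside FuzzyDuplicates.__call__.
from nemo_curator import FuzzyDuplicates, FuzzyDuplicatesConfig

config = FuzzyDuplicatesConfig(
    cache_dir="/tmp/fuzzy_dedup_cache",  # placeholder path
    id_field="id",
    text_field="text",
    false_positive_check=True,
    # Fields added by this patch; these defaults keep the old behavior.
    bucket_mapping_blocksize=256,
    parts_per_worker=1,
    bucket_parts_per_worker=8,
)

fuzzy_dedup = FuzzyDuplicates(config=config)
# duplicates = fuzzy_dedup(dataset)  # dataset: a cudf-backed DocumentDataset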