From deca1606c889c9e000e9338fc2b99999ad27b3a6 Mon Sep 17 00:00:00 2001
From: Ayush Dattagupta
Date: Wed, 10 Jul 2024 12:49:44 -0700
Subject: [PATCH] Default to using larger batches

Signed-off-by: Ayush Dattagupta
---
 nemo_curator/modules/fuzzy_dedup.py                           | 2 +-
 nemo_curator/scripts/fuzzy_deduplication/compute_minhashes.py | 3 +++
 nemo_curator/utils/distributed_utils.py                       | 2 ++
 nemo_curator/utils/fuzzy_dedup_utils/shuffle_utils.py         | 2 +-
 4 files changed, 7 insertions(+), 2 deletions(-)

diff --git a/nemo_curator/modules/fuzzy_dedup.py b/nemo_curator/modules/fuzzy_dedup.py
index deeae5c86..ff7691d68 100644
--- a/nemo_curator/modules/fuzzy_dedup.py
+++ b/nemo_curator/modules/fuzzy_dedup.py
@@ -755,7 +755,7 @@ def _get_output_map_from_text_bytes_per_bucket(
     ):
         # String bytes limit for cuDF
         # https://github.com/rapidsai/cudf/issues/13733
-        max_text_bytes_per_part = int(np.iinfo(np.int32).max * 1.2)
+        max_text_bytes_per_part = int(np.iinfo(np.int32).max * 3)
         self._logger.info(f"max_text_bytes_per_part = {max_text_bytes_per_part}")
 
         # Increasing in an attempt to prevent hitting
diff --git a/nemo_curator/scripts/fuzzy_deduplication/compute_minhashes.py b/nemo_curator/scripts/fuzzy_deduplication/compute_minhashes.py
index 2add7587f..8f36a0a6f 100644
--- a/nemo_curator/scripts/fuzzy_deduplication/compute_minhashes.py
+++ b/nemo_curator/scripts/fuzzy_deduplication/compute_minhashes.py
@@ -69,6 +69,7 @@ def main(args):
             print(f"Processed {args.num_files}... quitting")
             break
 
+        print(args.input_meta)
         files = get_all_files_paths_under(root=data_path, recurse_subdirectories=False)
         files = [f for f in files if f.endswith(".jsonl")]
         df = read_data(
@@ -77,6 +78,7 @@ def main(args):
             backend="cudf",
             files_per_partition=args.files_per_partition,
             add_filename=False,
+            input_meta=args.input_meta,
         )[[id_field, text_field]]
 
         if num_files is not None:
@@ -119,6 +121,7 @@ def attach_args(parser=None):
         help="Random seed used for intializing the hash "
         "functions used to compute the MinHashes"
     )
+    argumentHelper.add_arg_input_meta()
     parser.add_argument(
         "--char-ngram",
         type=int,
diff --git a/nemo_curator/utils/distributed_utils.py b/nemo_curator/utils/distributed_utils.py
index ea9a2920b..dabda543e 100644
--- a/nemo_curator/utils/distributed_utils.py
+++ b/nemo_curator/utils/distributed_utils.py
@@ -237,6 +237,8 @@ def read_single_partition(
         read_kwargs = {"lines": filetype == "jsonl"}
         if backend == "cudf":
             read_f = cudf.read_json
+            if input_meta is not None:
+                read_kwargs["prune_columns"] = True
         else:
             read_kwargs["dtype"] = False
             read_f = pd.read_json
diff --git a/nemo_curator/utils/fuzzy_dedup_utils/shuffle_utils.py b/nemo_curator/utils/fuzzy_dedup_utils/shuffle_utils.py
index 323fe7e81..20f15e787 100644
--- a/nemo_curator/utils/fuzzy_dedup_utils/shuffle_utils.py
+++ b/nemo_curator/utils/fuzzy_dedup_utils/shuffle_utils.py
@@ -83,7 +83,7 @@ def get_shuffle_part_ids_df(
     num_workers=0,
 ):
     sizes = agg_df[size_col].values
-    max_text_bytes_per_part = int(np.iinfo(np.int32).max // 1.2)
+    max_text_bytes_per_part = int(np.iinfo(np.int32).max * 3)
 
     # Adjust max_text_bytes_per_part if the number of output
     # partitions is small compared to the number of workers.
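
Not part of the patch itself: the sketch below illustrates what the new `input_meta` / `prune_columns` plumbing is expected to do once `read_single_partition` hands its kwargs to `cudf.read_json`. The diff only forwards `--input-meta` from the CLI and enables `prune_columns` for the cuDF backend; the schema dict, file name, and column names below are hypothetical, and running the snippet requires a GPU plus a cuDF build whose `read_json` accepts `prune_columns`.

```python
# Illustrative sketch only -- not part of the diff above.
# Assumes cudf.read_json supports prune_columns (the kwarg the patch sets);
# the schema and file path are made-up examples.
import cudf

input_meta = {"id": "str", "text": "str"}  # hypothetical --input-meta value

df = cudf.read_json(
    "docs_shard_000.jsonl",  # hypothetical JSONL shard
    lines=True,              # one JSON document per line
    dtype=input_meta,        # only describe the columns the pipeline needs
    prune_columns=True,      # drop every column not listed in dtype
)
print(df.columns.tolist())   # expected: ['id', 'text']
```

The remaining hunks are the batching change named in the subject line: both `_get_output_map_from_text_bytes_per_bucket` and `get_shuffle_part_ids_df` now budget `int(np.iinfo(np.int32).max * 3)` text bytes per output partition instead of roughly the int32 limit, so the fuzzy-dedup shuffle produces fewer, larger partitions.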