diff --git a/nemo_curator/modules/fuzzy_dedup.py b/nemo_curator/modules/fuzzy_dedup.py
index 5ef465407..c487b3e97 100644
--- a/nemo_curator/modules/fuzzy_dedup.py
+++ b/nemo_curator/modules/fuzzy_dedup.py
@@ -602,7 +602,7 @@ def _get_output_map_from_text_bytes_per_bucket(
     ):
         # String bytes limit for cuDF
         # https://github.com/rapidsai/cudf/issues/13733
-        max_text_bytes_per_part = int(np.iinfo(np.int32).max * 1.2)
+        max_text_bytes_per_part = int(np.iinfo(np.int32).max * 3)
 
         self._logger.info(f"max_text_bytes_per_part = {max_text_bytes_per_part}")
         # Increasing in an attempt to prevent hitting
diff --git a/nemo_curator/scripts/fuzzy_deduplication/compute_minhashes.py b/nemo_curator/scripts/fuzzy_deduplication/compute_minhashes.py
index 01baee051..362d47c0a 100644
--- a/nemo_curator/scripts/fuzzy_deduplication/compute_minhashes.py
+++ b/nemo_curator/scripts/fuzzy_deduplication/compute_minhashes.py
@@ -70,6 +70,7 @@ def main(args):
             print(f"Processed {args.num_files}... quitting")
             break
 
+        print(args.input_meta)
         files = get_all_files_paths_under(root=data_path, recurse_subdirectories=False)
         files = [f for f in files if f.endswith(".jsonl")]
         df = read_data(
@@ -78,6 +79,7 @@ def main(args):
             backend="cudf",
             files_per_partition=args.files_per_partition,
             add_filename=False,
+            input_meta=args.input_meta,
         )[[id_field, text_field]]
 
         if num_files is not None:
@@ -120,6 +122,7 @@ def attach_args(parser=None):
         help="Random seed used for intializing the hash "
         "functions used to compute the MinHashes"
     )
+    argumentHelper.add_arg_input_meta()
     parser.add_argument(
         "--char-ngram",
         type=int,
diff --git a/nemo_curator/utils/distributed_utils.py b/nemo_curator/utils/distributed_utils.py
index ef69963cf..dff34b954 100644
--- a/nemo_curator/utils/distributed_utils.py
+++ b/nemo_curator/utils/distributed_utils.py
@@ -223,6 +223,8 @@ def read_single_partition(
         read_kwargs = {"lines": filetype == "jsonl"}
         if backend == "cudf":
             read_f = cudf.read_json
+            if input_meta is not None:
+                read_kwargs["prune_columns"] = True
         else:
             read_kwargs["dtype"] = False
             read_f = pd.read_json
diff --git a/nemo_curator/utils/fuzzy_dedup_utils/shuffle_utils.py b/nemo_curator/utils/fuzzy_dedup_utils/shuffle_utils.py
index 9d1915603..4567f6e26 100644
--- a/nemo_curator/utils/fuzzy_dedup_utils/shuffle_utils.py
+++ b/nemo_curator/utils/fuzzy_dedup_utils/shuffle_utils.py
@@ -83,7 +83,7 @@ def get_shuffle_part_ids_df(
     num_workers=0,
 ):
     sizes = agg_df[size_col].values
-    max_text_bytes_per_part = int(np.iinfo(np.int32).max // 1.2)
+    max_text_bytes_per_part = int(np.iinfo(np.int32).max * 3)
 
     # Adjust max_text_bytes_per_part if the number of output
     # partitions is small compared to the number of workers.
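
Note on the `prune_columns` change in `read_single_partition`: when an `input_meta`
schema is supplied, the cuDF read path tells `cudf.read_json` to skip any JSONL
fields not listed in that schema instead of parsing them. A minimal sketch of the
resulting call, assuming `input_meta` is passed through as the `dtype` mapping —
the file name and schema below are hypothetical, not taken from this patch:

    import cudf

    # Hypothetical schema; with prune_columns=True, only these two
    # columns are parsed and materialized, even if the JSONL records
    # carry additional fields.
    input_meta = {"id": "str", "text": "str"}

    df = cudf.read_json(
        "docs.jsonl",        # hypothetical input file
        lines=True,
        dtype=input_meta,    # column -> dtype mapping for the kept columns
        prune_columns=True,  # skip columns absent from dtype
    )

The `max_text_bytes_per_part` bumps from roughly `int32.max` to `int32.max * 3`
appear to rely on cuDF relaxing its per-column string size limit (the
rapidsai/cudf#13733 issue cited in the comment above the constant), allowing
larger text partitions before a shuffle.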