From 53a86caf279dd5157c4b10bb447865914aebdb98 Mon Sep 17 00:00:00 2001
From: Ayush Dattagupta
Date: Tue, 21 May 2024 10:27:01 -0700
Subject: [PATCH 1/6] Expose more configurations to test long string support

Signed-off-by: Ayush Dattagupta
---
 nemo_curator/modules/config.py      | 3 +++
 nemo_curator/modules/fuzzy_dedup.py | 8 ++++----
 2 files changed, 7 insertions(+), 4 deletions(-)

diff --git a/nemo_curator/modules/config.py b/nemo_curator/modules/config.py
index 1ef8a0fd5..f129e59d3 100644
--- a/nemo_curator/modules/config.py
+++ b/nemo_curator/modules/config.py
@@ -77,6 +77,9 @@ class FuzzyDuplicatesConfig(BaseConfig):
     # Only required for fp check
     num_anchors: int = 2
     jaccard_threshold: float = 0.8
+    bucket_mapping_blocksize: int = 256
+    parts_per_worker: int = 1
+    bucket_parts_per_worker: int = 8
 
     def __post_init__(self):
         self.num_hashes = self.num_buckets * self.hashes_per_bucket
diff --git a/nemo_curator/modules/fuzzy_dedup.py b/nemo_curator/modules/fuzzy_dedup.py
index 556911a96..deeae5c86 100644
--- a/nemo_curator/modules/fuzzy_dedup.py
+++ b/nemo_curator/modules/fuzzy_dedup.py
@@ -521,9 +521,9 @@ def __call__(self, dataset: DocumentDataset):
                 documents_df=dataset.df,
                 bucket_w_anchors_path=mapped_buckets_w_anchors_path,
                 output_shuffled_docs_path=shuffled_docs_path,
-                bucket_mapping_df_blocksize=256,
-                parts_per_worker=1,
-                bucket_parts_per_worker=8,
+                bucket_mapping_df_blocksize=self.config.bucket_mapping_blocksize,
+                parts_per_worker=self.config.parts_per_worker,
+                bucket_parts_per_worker=self.config.bucket_parts_per_worker,
             )
             print(f"Stage{stage_num} (False Postive Check): Shuffle docs complete!")
             stage_num += 1
@@ -755,7 +755,7 @@ def _get_output_map_from_text_bytes_per_bucket(
     ):
         # String bytes limit for cuDF
         # https://github.com/rapidsai/cudf/issues/13733
-        max_text_bytes_per_part = int(np.iinfo(np.int32).max // 1.2)
+        max_text_bytes_per_part = int(np.iinfo(np.int32).max * 1.2)
 
         self._logger.info(f"max_text_bytes_per_part = {max_text_bytes_per_part}")
         # Increasing in an attempt to prevent hitting
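The three new FuzzyDuplicatesConfig fields expose the shuffle parameters that were previously hardcoded in fuzzy_dedup.py. A minimal sketch of how a caller might set them; the cache directory and id/text field names below are illustrative placeholders, not values taken from this patch series:

    from nemo_curator.modules.config import FuzzyDuplicatesConfig

    config = FuzzyDuplicatesConfig(
        cache_dir="./fuzzy_dedup_cache",  # hypothetical path
        id_field="id",
        text_field="text",
        false_positive_check=True,
        # New knobs from this patch; the defaults match the old hardcoded values.
        bucket_mapping_blocksize=256,
        parts_per_worker=1,
        bucket_parts_per_worker=8,
    )

Larger per-worker batches reduce shuffle overhead at the cost of higher peak GPU memory per worker, which is presumably the trade-off these knobs are meant to let long-string tests probe.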
From d29ce8f41e80203f0438b855389413c3aaaecc24 Mon Sep 17 00:00:00 2001
From: Ayush Dattagupta
Date: Tue, 21 May 2024 10:43:55 -0700
Subject: [PATCH 2/6] Export libcudf env for long string support

Signed-off-by: Ayush Dattagupta
---
 nemo_curator/__init__.py | 4 ++++
 1 file changed, 4 insertions(+)

diff --git a/nemo_curator/__init__.py b/nemo_curator/__init__.py
index 80af4d698..650a8c511 100644
--- a/nemo_curator/__init__.py
+++ b/nemo_curator/__init__.py
@@ -12,6 +12,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+import os
 import sys
 
 import dask
@@ -48,3 +49,6 @@
 # See https://github.com/NVIDIA/NeMo-Curator/issues/33
 # This also happens when reading and writing to files
 dask.config.set({"dataframe.convert-string": False})
+
+# Enable libcudf large string support
+os.environ["LIBCUDF_LARGE_STRINGS_ENABLED"] = "1"
From deca1606c889c9e000e9338fc2b99999ad27b3a6 Mon Sep 17 00:00:00 2001
From: Ayush Dattagupta
Date: Wed, 10 Jul 2024 12:49:44 -0700
Subject: [PATCH 3/6] Default to using larger batches

Signed-off-by: Ayush Dattagupta
---
 nemo_curator/modules/fuzzy_dedup.py                           | 2 +-
 nemo_curator/scripts/fuzzy_deduplication/compute_minhashes.py | 3 +++
 nemo_curator/utils/distributed_utils.py                       | 2 ++
 nemo_curator/utils/fuzzy_dedup_utils/shuffle_utils.py         | 2 +-
 4 files changed, 7 insertions(+), 2 deletions(-)

diff --git a/nemo_curator/modules/fuzzy_dedup.py b/nemo_curator/modules/fuzzy_dedup.py
index deeae5c86..ff7691d68 100644
--- a/nemo_curator/modules/fuzzy_dedup.py
+++ b/nemo_curator/modules/fuzzy_dedup.py
@@ -755,7 +755,7 @@ def _get_output_map_from_text_bytes_per_bucket(
     ):
         # String bytes limit for cuDF
         # https://github.com/rapidsai/cudf/issues/13733
-        max_text_bytes_per_part = int(np.iinfo(np.int32).max * 1.2)
+        max_text_bytes_per_part = int(np.iinfo(np.int32).max * 3)
 
         self._logger.info(f"max_text_bytes_per_part = {max_text_bytes_per_part}")
         # Increasing in an attempt to prevent hitting
diff --git a/nemo_curator/scripts/fuzzy_deduplication/compute_minhashes.py b/nemo_curator/scripts/fuzzy_deduplication/compute_minhashes.py
index 2add7587f..8f36a0a6f 100644
--- a/nemo_curator/scripts/fuzzy_deduplication/compute_minhashes.py
+++ b/nemo_curator/scripts/fuzzy_deduplication/compute_minhashes.py
@@ -69,6 +69,7 @@ def main(args):
             print(f"Processed {args.num_files}... quitting")
             break
 
+        print(args.input_meta)
         files = get_all_files_paths_under(root=data_path, recurse_subdirectories=False)
         files = [f for f in files if f.endswith(".jsonl")]
         df = read_data(
@@ -77,6 +78,7 @@ def main(args):
             backend="cudf",
             files_per_partition=args.files_per_partition,
             add_filename=False,
+            input_meta=args.input_meta,
         )[[id_field, text_field]]
 
         if num_files is not None:
@@ -119,6 +121,7 @@ def attach_args(parser=None):
         help="Random seed used for intializing the hash "
         "functions used to compute the MinHashes"
     )
+    argumentHelper.add_arg_input_meta()
     parser.add_argument(
         "--char-ngram",
         type=int,
diff --git a/nemo_curator/utils/distributed_utils.py b/nemo_curator/utils/distributed_utils.py
index ea9a2920b..dabda543e 100644
--- a/nemo_curator/utils/distributed_utils.py
+++ b/nemo_curator/utils/distributed_utils.py
@@ -237,6 +237,8 @@ def read_single_partition(
         read_kwargs = {"lines": filetype == "jsonl"}
         if backend == "cudf":
             read_f = cudf.read_json
+            if input_meta is not None:
+                read_kwargs["prune_columns"] = True
         else:
             read_kwargs["dtype"] = False
             read_f = pd.read_json
diff --git a/nemo_curator/utils/fuzzy_dedup_utils/shuffle_utils.py b/nemo_curator/utils/fuzzy_dedup_utils/shuffle_utils.py
index 323fe7e81..20f15e787 100644
--- a/nemo_curator/utils/fuzzy_dedup_utils/shuffle_utils.py
+++ b/nemo_curator/utils/fuzzy_dedup_utils/shuffle_utils.py
@@ -83,7 +83,7 @@ def get_shuffle_part_ids_df(
     num_workers=0,
 ):
     sizes = agg_df[size_col].values
-    max_text_bytes_per_part = int(np.iinfo(np.int32).max // 1.2)
+    max_text_bytes_per_part = int(np.iinfo(np.int32).max * 3)
 
     # Adjust max_text_bytes_per_part if the number of output
     # partitions is small compared to the number of workers.
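PATCH 3 threads a user-supplied schema (input_meta) through read_data so that cuDF only parses the JSON columns the pipeline actually needs. A hedged sketch of the cuDF call that read_single_partition would end up making when input_meta is set; the file name and schema are illustrative, and it assumes read_data forwards input_meta as the dtype argument:

    import cudf

    input_meta = {"id": "str", "text": "str"}  # hypothetical JSONL schema
    df = cudf.read_json(
        "docs_00.jsonl",     # hypothetical input file
        lines=True,
        dtype=input_meta,    # assumption: input_meta is forwarded as dtype
        prune_columns=True,  # set by this patch whenever input_meta is given;
                             # columns absent from dtype are not parsed at all
    )

Pruning unneeded columns keeps very wide JSONL records from inflating string columns toward the cuDF size limits this series is working around.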
From 5559ce1be22c9b83bf0e017dce46ee9cdaa53c6d Mon Sep 17 00:00:00 2001
From: Ayush Dattagupta
Date: Fri, 6 Sep 2024 14:37:52 -0700
Subject: [PATCH 4/6] Remove large strings env variable since it's enabled by
 default in 24.8 and above

Signed-off-by: Ayush Dattagupta
---
 nemo_curator/__init__.py | 4 ----
 1 file changed, 4 deletions(-)

diff --git a/nemo_curator/__init__.py b/nemo_curator/__init__.py
index 650a8c511..80af4d698 100644
--- a/nemo_curator/__init__.py
+++ b/nemo_curator/__init__.py
@@ -12,7 +12,6 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-import os
 import sys
 
 import dask
@@ -48,6 +48,3 @@
 # See https://github.com/NVIDIA/NeMo-Curator/issues/33
 # This also happens when reading and writing to files
 dask.config.set({"dataframe.convert-string": False})
-
-# Enable libcudf large string support
-os.environ["LIBCUDF_LARGE_STRINGS_ENABLED"] = "1"
From 73e12b6f3059f1b26b4560b4856570915e85464d Mon Sep 17 00:00:00 2001
From: Ayush Dattagupta
Date: Fri, 6 Sep 2024 14:38:24 -0700
Subject: [PATCH 5/6] Remove debug print, filter nulls before bucketing

Signed-off-by: Ayush Dattagupta
---
 nemo_curator/scripts/fuzzy_deduplication/compute_minhashes.py | 1 -
 nemo_curator/scripts/fuzzy_deduplication/minhash_lsh.py       | 1 +
 2 files changed, 1 insertion(+), 1 deletion(-)

diff --git a/nemo_curator/scripts/fuzzy_deduplication/compute_minhashes.py b/nemo_curator/scripts/fuzzy_deduplication/compute_minhashes.py
index 8f36a0a6f..7ec17815f 100644
--- a/nemo_curator/scripts/fuzzy_deduplication/compute_minhashes.py
+++ b/nemo_curator/scripts/fuzzy_deduplication/compute_minhashes.py
@@ -69,7 +69,6 @@ def main(args):
             print(f"Processed {args.num_files}... quitting")
             break
 
-        print(args.input_meta)
         files = get_all_files_paths_under(root=data_path, recurse_subdirectories=False)
         files = [f for f in files if f.endswith(".jsonl")]
         df = read_data(
diff --git a/nemo_curator/scripts/fuzzy_deduplication/minhash_lsh.py b/nemo_curator/scripts/fuzzy_deduplication/minhash_lsh.py
index d9740f412..a7e2f0247 100644
--- a/nemo_curator/scripts/fuzzy_deduplication/minhash_lsh.py
+++ b/nemo_curator/scripts/fuzzy_deduplication/minhash_lsh.py
@@ -54,6 +54,7 @@ def main(args):
         dask_cudf.read_parquet(data_path, blocksize="2GB", aggregate_files=True)
     )
     df = dask_cudf.concat(dfs, ignore_unknown_divisions=True)
+    df = df[~df.adlr_id.isna()]
    df = df.map_partitions(
         convert_str_id_to_int,
         id_column=id_field,
From cbf1d6691f49fdcef0ec8843847a9c029e16eb2c Mon Sep 17 00:00:00 2001
From: Ayush Dattagupta
Date: Fri, 6 Sep 2024 14:42:36 -0700
Subject: [PATCH 6/6] Remove hardcoded id field

Signed-off-by: Ayush Dattagupta
---
 nemo_curator/scripts/fuzzy_deduplication/minhash_lsh.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/nemo_curator/scripts/fuzzy_deduplication/minhash_lsh.py b/nemo_curator/scripts/fuzzy_deduplication/minhash_lsh.py
index a7e2f0247..40c42ec92 100644
--- a/nemo_curator/scripts/fuzzy_deduplication/minhash_lsh.py
+++ b/nemo_curator/scripts/fuzzy_deduplication/minhash_lsh.py
@@ -54,7 +54,7 @@ def main(args):
         dask_cudf.read_parquet(data_path, blocksize="2GB", aggregate_files=True)
     )
     df = dask_cudf.concat(dfs, ignore_unknown_divisions=True)
-    df = df[~df.adlr_id.isna()]
+    df = df[~df[id_field].isna()]
     df = df.map_partitions(
         convert_str_id_to_int,
         id_column=id_field,
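PATCHES 5 and 6 together drop rows with null IDs before LSH bucketing, keyed on the configured id_field rather than the hardcoded adlr_id column. Note the bracket lookup df[id_field]: an attribute access like df.adlr_id only works for a fixed, literal column name. A small self-contained sketch of the same filter, assuming a cuDF-backed Dask dataframe shaped loosely like the minhash output; the column names and values are hypothetical:

    import cudf
    import dask_cudf

    id_field = "id"  # supplied via script arguments in the real pipeline
    df = dask_cudf.from_cudf(
        cudf.DataFrame({"id": ["doc-0", None, "doc-2"], "minhashes": [1, 2, 3]}),
        npartitions=1,
    )
    df = df[~df[id_field].isna()]  # drop null-ID rows before bucketing
    print(df.compute())            # only doc-0 and doc-2 survive

Filtering nulls up front avoids propagating unidentifiable rows into the bucket-to-document mapping, where they could not be traced back to a source document anyway.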