diff --git a/nemo_curator/modules/config.py b/nemo_curator/modules/config.py index eec5b42ed..f388b7619 100644 --- a/nemo_curator/modules/config.py +++ b/nemo_curator/modules/config.py @@ -77,6 +77,9 @@ class FuzzyDuplicatesConfig(BaseConfig): # Only required for fp check num_anchors: int = 2 jaccard_threshold: float = 0.8 + bucket_mapping_blocksize: int = 256 + parts_per_worker: int = 1 + bucket_parts_per_worker: int = 8 def __post_init__(self): self.num_hashes = self.num_buckets * self.hashes_per_bucket diff --git a/nemo_curator/modules/fuzzy_dedup.py b/nemo_curator/modules/fuzzy_dedup.py index 6694dd420..5ef465407 100644 --- a/nemo_curator/modules/fuzzy_dedup.py +++ b/nemo_curator/modules/fuzzy_dedup.py @@ -502,9 +502,9 @@ def __call__(self, dataset: DocumentDataset): documents_df=dataset.df, bucket_w_anchors_path=mapped_buckets_w_anchors_path, output_shuffled_docs_path=shuffled_docs_path, - bucket_mapping_df_blocksize=256, - parts_per_worker=1, - bucket_parts_per_worker=8, + bucket_mapping_df_blocksize=self.config.bucket_mapping_blocksize, + parts_per_worker=self.config.parts_per_worker, + bucket_parts_per_worker=self.config.bucket_parts_per_worker, ) print("Stage3 (False Postive Check): Shuffle docs complete!") @@ -602,7 +602,7 @@ def _get_output_map_from_text_bytes_per_bucket( ): # String bytes limit for cuDF # https://github.com/rapidsai/cudf/issues/13733 - max_text_bytes_per_part = int(np.iinfo(np.int32).max // 1.2) + max_text_bytes_per_part = int(np.iinfo(np.int32).max * 1.2) self._logger.info(f"max_text_bytes_per_part = {max_text_bytes_per_part}") # Increasing in an attempt to prevent hitting