Update Fuzzy dedup params for long strings support. (NVIDIA#77)
* Expose more configurations to test long string support

Signed-off-by: Ayush Dattagupta <[email protected]>

* Export libcudf env for long string support

Signed-off-by: Ayush Dattagupta <[email protected]>

* Default to using larger batches

Signed-off-by: Ayush Dattagupta <[email protected]>

* Remove large strings env variable since it's enabled by default in 24.8 and above

Signed-off-by: Ayush Dattagupta <[email protected]>

* Remove debug print, filter nulls before bucketing

Signed-off-by: Ayush Dattagupta <[email protected]>

* Remove hardcoded id field

Signed-off-by: Ayush Dattagupta <[email protected]>

---------

Signed-off-by: Ayush Dattagupta <[email protected]>
ayushdg authored and yyu22 committed Oct 10, 2024
1 parent 3d55085 commit 71ba968
Showing 6 changed files with 13 additions and 5 deletions.
3 changes: 3 additions & 0 deletions nemo_curator/modules/config.py
@@ -77,6 +77,9 @@ class FuzzyDuplicatesConfig(BaseConfig):
     # Only required for fp check
     num_anchors: int = 2
     jaccard_threshold: float = 0.8
+    bucket_mapping_blocksize: int = 256
+    parts_per_worker: int = 1
+    bucket_parts_per_worker: int = 8
 
     def __post_init__(self):
         self.num_hashes = self.num_buckets * self.hashes_per_bucket
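For orientation, a minimal sketch of setting the three newly exposed fields when constructing a FuzzyDuplicatesConfig. Everything other than the three new fields (cache_dir, id_field, text_field, and the comment descriptions) is an assumption for illustration, not taken from this diff.

```python
from nemo_curator.modules.config import FuzzyDuplicatesConfig

# Illustrative values only; cache_dir / id_field / text_field are assumed
# required fields that are not shown in this hunk.
config = FuzzyDuplicatesConfig(
    cache_dir="./fuzzy_dedup_cache",
    id_field="id",
    text_field="text",
    # New knobs exposed by this commit (defaults shown):
    bucket_mapping_blocksize=256,   # blocksize (likely MB) for the bucket-mapping dataframe
    parts_per_worker=1,             # document parts shuffled per worker at a time
    bucket_parts_per_worker=8,      # bucket parts processed per worker at a time
)
```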
8 changes: 4 additions & 4 deletions nemo_curator/modules/fuzzy_dedup.py
@@ -521,9 +521,9 @@ def __call__(self, dataset: DocumentDataset):
             documents_df=dataset.df,
             bucket_w_anchors_path=mapped_buckets_w_anchors_path,
             output_shuffled_docs_path=shuffled_docs_path,
-            bucket_mapping_df_blocksize=256,
-            parts_per_worker=1,
-            bucket_parts_per_worker=8,
+            bucket_mapping_df_blocksize=self.config.bucket_mapping_blocksize,
+            parts_per_worker=self.config.parts_per_worker,
+            bucket_parts_per_worker=self.config.bucket_parts_per_worker,
         )
         print(f"Stage{stage_num} (False Postive Check): Shuffle docs complete!")
         stage_num += 1
@@ -755,7 +755,7 @@ def _get_output_map_from_text_bytes_per_bucket(
     ):
         # String bytes limit for cuDF
         # https://github.com/rapidsai/cudf/issues/13733
-        max_text_bytes_per_part = int(np.iinfo(np.int32).max // 1.2)
+        max_text_bytes_per_part = int(np.iinfo(np.int32).max * 3)
 
         self._logger.info(f"max_text_bytes_per_part = {max_text_bytes_per_part}")
         # Increasing in an attempt to prevent hitting
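The second hunk raises the per-partition text budget from just under the 2 GiB int32 limit to roughly three times it, which only makes sense now that large-string support is on by default in cuDF 24.08+ (see the commit message above). A quick worked comparison of the two caps:

```python
import numpy as np

int32_max = np.iinfo(np.int32).max      # 2_147_483_647

old_cap = int(int32_max // 1.2)         # 1_789_569_705 bytes, roughly 1.7 GiB
new_cap = int(int32_max * 3)            # 6_442_450_941 bytes, roughly 6.0 GiB

print(old_cap, new_cap)
```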
2 changes: 2 additions & 0 deletions nemo_curator/scripts/fuzzy_deduplication/compute_minhashes.py
@@ -77,6 +77,7 @@ def main(args):
         backend="cudf",
         files_per_partition=args.files_per_partition,
         add_filename=False,
+        input_meta=args.input_meta,
     )[[id_field, text_field]]
 
     if num_files is not None:
@@ -119,6 +120,7 @@ def attach_args(parser=None):
         help="Random seed used for intializing the hash "
         "functions used to compute the MinHashes"
     )
+    argumentHelper.add_arg_input_meta()
     parser.add_argument(
         "--char-ngram",
         type=int,
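The script now forwards args.input_meta into read_data so the JSONL schema can be declared up front. A hedged sketch of such a call follows; only the keywords visible in this diff are taken from the source, while the file list, schema, and the file_type/positional arguments are assumptions.

```python
from nemo_curator.utils.distributed_utils import read_data

# Hypothetical shard list and schema, for illustration only.
files = ["/data/shard_00.jsonl", "/data/shard_01.jsonl"]

df = read_data(
    files,
    file_type="jsonl",
    backend="cudf",
    files_per_partition=2,
    add_filename=False,
    input_meta={"id": "str", "text": "str"},  # schema hint forwarded to the JSON reader
)[["id", "text"]]
```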
1 change: 1 addition & 0 deletions nemo_curator/scripts/fuzzy_deduplication/minhash_lsh.py
@@ -54,6 +54,7 @@ def main(args):
             dask_cudf.read_parquet(data_path, blocksize="2GB", aggregate_files=True)
         )
     df = dask_cudf.concat(dfs, ignore_unknown_divisions=True)
+    df = df[~df[id_field].isna()]
     df = df.map_partitions(
         convert_str_id_to_int,
         id_column=id_field,
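The added line drops rows with a null id before convert_str_id_to_int runs, since a missing id cannot be parsed into an integer document id. A minimal pandas illustration of the same predicate (the script itself operates on a dask_cudf DataFrame, and the sample data is made up):

```python
import pandas as pd

id_field = "id"
df = pd.DataFrame({"id": ["doc-0", None, "doc-2"], "_bucket_id": [0, 0, 1]})

# Same predicate as the added line: keep only rows with a non-null id.
df = df[~df[id_field].isna()]
print(df)  # doc-0 and doc-2 remain
```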
2 changes: 2 additions & 0 deletions nemo_curator/utils/distributed_utils.py
@@ -237,6 +237,8 @@ def read_single_partition(
     read_kwargs = {"lines": filetype == "jsonl"}
     if backend == "cudf":
         read_f = cudf.read_json
+        if input_meta is not None:
+            read_kwargs["prune_columns"] = True
     else:
         read_kwargs["dtype"] = False
         read_f = pd.read_json
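With an input_meta provided, the cuDF JSON reader is asked to prune columns, that is, to materialize only the columns named in the dtype mapping it receives. A small standalone sketch of that behavior with made-up data:

```python
import io
import cudf

json_lines = io.StringIO(
    '{"id": "a", "text": "hello", "extra": 1}\n'
    '{"id": "b", "text": "world", "extra": 2}\n'
)

df = cudf.read_json(
    json_lines,
    lines=True,
    dtype={"id": "str", "text": "str"},
    prune_columns=True,   # "extra" is never materialized
)
print(df.columns.tolist())  # ['id', 'text']
```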
2 changes: 1 addition & 1 deletion nemo_curator/utils/fuzzy_dedup_utils/shuffle_utils.py
@@ -83,7 +83,7 @@ def get_shuffle_part_ids_df(
     num_workers=0,
 ):
     sizes = agg_df[size_col].values
-    max_text_bytes_per_part = int(np.iinfo(np.int32).max // 1.2)
+    max_text_bytes_per_part = int(np.iinfo(np.int32).max * 3)
 
     # Adjust max_text_bytes_per_part if the number of output
     # partitions is small compared to the number of workers.
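The same cap change lands here for shuffle-partition assignment. Purely as orientation (this is not the actual get_shuffle_part_ids_df logic), a larger cap means far fewer output partitions for the same total text volume:

```python
import numpy as np

sizes = np.array([2_500_000_000, 900_000_000, 4_100_000_000])  # bytes of text per group (made up)
new_cap = int(np.iinfo(np.int32).max * 3)
old_cap = int(np.iinfo(np.int32).max // 1.2)

print(int(np.ceil(sizes.sum() / new_cap)))  # 2 partitions under the ~6 GiB cap
print(int(np.ceil(sizes.sum() / old_cap)))  # 5 partitions under the old ~1.7 GiB cap
```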
