From b3bcb5d9d7a74570f54eedb17209ea299d3e2b5b Mon Sep 17 00:00:00 2001 From: Praateek Date: Fri, 18 Oct 2024 15:19:34 -0700 Subject: [PATCH 01/16] add 32 bit impl Signed-off-by: Praateek --- nemo_curator/modules/fuzzy_dedup.py | 27 ++++++++++++++++++++++----- 1 file changed, 22 insertions(+), 5 deletions(-) diff --git a/nemo_curator/modules/fuzzy_dedup.py b/nemo_curator/modules/fuzzy_dedup.py index 63576516c..5ff4fc45b 100644 --- a/nemo_curator/modules/fuzzy_dedup.py +++ b/nemo_curator/modules/fuzzy_dedup.py @@ -99,7 +99,9 @@ def __init__( """ self.num_hashes = num_hashes self.char_ngram = char_ngrams - self.seeds = self.generate_seeds(n_seeds=self.num_hashes, seed=seed) + self.seeds = self.generate_hash_permutations( + n_permutations=self.num_hashes, seed=seed + ) self.minhash_method = self.minhash64 if use_64bit_hash else self.minhash32 self.id_field = id_field self.text_field = text_field @@ -120,12 +122,22 @@ def __init__( else: self._logger = logger - def generate_seeds(self, n_seeds: int = 260, seed: int = 0) -> np.ndarray: + def generate_hash_permutations(self, n_permutations=260, seed=0): """ Generate seeds for all minhash permutations based on the given seed. """ gen = np.random.RandomState(seed) - return gen.randint(0, 1e6, size=n_seeds) + MERSENNE_PRIME = np.uint32((1 << 31) - 1) + return np.array( + [ + ( + gen.randint(1, MERSENNE_PRIME, dtype=np.uint32), + gen.randint(0, MERSENNE_PRIME, dtype=np.uint32), + ) + for _ in range(n_permutations) + ], + dtype=np.uint32, + ) def minhash32( self, ser: cudf.Series, seeds: np.ndarray, char_ngram: int @@ -135,8 +147,12 @@ def minhash32( """ if not isinstance(ser, cudf.Series): raise TypeError("Expected data of type cudf.Series") - seeds = cudf.Series(seeds, dtype="uint32") - return ser.str.minhash(seeds=seeds, width=char_ngram) + seeds_a = cudf.Series(seeds[:, 0], dtype="uint32") + seeds_b = cudf.Series(seeds[:, 1], dtype="uint32") + + return ser.str.minhash_permuted( + a=seeds_a, b=seeds_b, seed=seeds[0][0], width=char_ngram + ) def minhash64( self, ser: cudf.Series, seeds: np.ndarray, char_ngram: int @@ -144,6 +160,7 @@ def minhash64( """ Compute 64bit minhashes based on the MurmurHash3 algorithm """ + raise NotImplementedError("minhash_permuted not implemented for 64 bit yet") if not isinstance(ser, cudf.Series): raise TypeError("Expected data of type cudf.Series") seeds = cudf.Series(seeds, dtype="uint64") From 221a9bdbb9d7783258606ed8cd4f34e8236064b5 Mon Sep 17 00:00:00 2001 From: Praateek Date: Fri, 18 Oct 2024 15:22:46 -0700 Subject: [PATCH 02/16] add typehint Signed-off-by: Praateek --- nemo_curator/modules/fuzzy_dedup.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/nemo_curator/modules/fuzzy_dedup.py b/nemo_curator/modules/fuzzy_dedup.py index 5ff4fc45b..95e47e29b 100644 --- a/nemo_curator/modules/fuzzy_dedup.py +++ b/nemo_curator/modules/fuzzy_dedup.py @@ -122,7 +122,7 @@ def __init__( else: self._logger = logger - def generate_hash_permutations(self, n_permutations=260, seed=0): + def generate_hash_permutations(self, n_permutations=260, seed=0) -> np.ndarray: """ Generate seeds for all minhash permutations based on the given seed. 
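For reference, the (a, b) seed pairs generated above drive the standard permuted-MinHash construction: each permutation maps a base hash h of a character n-gram to (a*h + b) mod p and keeps the minimum over all n-grams in the document. Below is a minimal CPU sketch of that idea, with zlib.crc32 standing in for the MurmurHash3 base hash and p = 2**31 - 1 as in the patch; it is an illustration only, not the cuDF kernel behind minhash_permuted.

import zlib
import numpy as np

MERSENNE_PRIME = np.uint64((1 << 31) - 1)

def generate_hash_permutations(n_permutations=260, seed=0):
    # Same scheme as the patch: one (a, b) pair per permutation, drawn below 2**31 - 1.
    gen = np.random.RandomState(seed)
    return np.array(
        [
            (gen.randint(1, (1 << 31) - 1, dtype=np.uint32),
             gen.randint(0, (1 << 31) - 1, dtype=np.uint32))
            for _ in range(n_permutations)
        ],
        dtype=np.uint32,
    )

def minhash32_cpu(text, seeds, width=5):
    # crc32 is only a stand-in for the MurmurHash3 hash used on the GPU.
    a = seeds[:, 0].astype(np.uint64)
    b = seeds[:, 1].astype(np.uint64)
    sig = np.full(len(seeds), MERSENNE_PRIME, dtype=np.uint64)
    for i in range(max(len(text) - width + 1, 1)):
        h = np.uint64(zlib.crc32(text[i : i + width].encode("utf-8")))
        sig = np.minimum(sig, (a * h + b) % MERSENNE_PRIME)
    return sig.astype(np.uint32)

seeds = generate_hash_permutations(n_permutations=8, seed=42)
print(minhash32_cpu("minhash deduplication example text", seeds, width=5))

Documents that share many n-grams agree in many of these signature slots, which is what the LSH stage later exploits.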
""" From 99dcb7dffc5150594e5c79800107ff804f8852ac Mon Sep 17 00:00:00 2001 From: Praateek Date: Mon, 21 Oct 2024 13:08:57 -0700 Subject: [PATCH 03/16] add 32/64 bit Signed-off-by: Praateek --- nemo_curator/modules/fuzzy_dedup.py | 110 ++++++++++++++++------------ 1 file changed, 65 insertions(+), 45 deletions(-) diff --git a/nemo_curator/modules/fuzzy_dedup.py b/nemo_curator/modules/fuzzy_dedup.py index 95e47e29b..f1984bc84 100644 --- a/nemo_curator/modules/fuzzy_dedup.py +++ b/nemo_curator/modules/fuzzy_dedup.py @@ -59,10 +59,7 @@ build_partition, get_agg_text_bytes_df, ) -from nemo_curator.utils.fuzzy_dedup_utils.shuffle_utils import ( - text_bytes_aware_shuffle, - write_partitioned_file, -) +from nemo_curator.utils.fuzzy_dedup_utils.shuffle_utils import write_partitioned_file class MinHash: @@ -100,7 +97,9 @@ def __init__( self.num_hashes = num_hashes self.char_ngram = char_ngrams self.seeds = self.generate_hash_permutations( - n_permutations=self.num_hashes, seed=seed + bit_width=64 if use_64bit_hash else 32, + n_permutations=self.num_hashes, + seed=seed, ) self.minhash_method = self.minhash64 if use_64bit_hash else self.minhash32 self.id_field = id_field @@ -122,21 +121,33 @@ def __init__( else: self._logger = logger - def generate_hash_permutations(self, n_permutations=260, seed=0) -> np.ndarray: + def generate_hash_permutations( + self, bit_width: int, n_permutations: int = 260, seed: int = 0 + ) -> np.ndarray: """ Generate seeds for all minhash permutations based on the given seed. """ gen = np.random.RandomState(seed) - MERSENNE_PRIME = np.uint32((1 << 31) - 1) + + if bit_width == 32: + MERSENNE_PRIME = np.uint32((1 << 31) - 1) + dtype = np.uint32 + elif bit_width == 64: + # For 64-bit, use a larger prime number suitable for 64-bit operations + MERSENNE_PRIME = np.uint64((1 << 61) - 1) + dtype = np.uint64 + else: + raise ValueError("Unsupported bit width. 
Use either 32 or 64.") + return np.array( [ ( - gen.randint(1, MERSENNE_PRIME, dtype=np.uint32), - gen.randint(0, MERSENNE_PRIME, dtype=np.uint32), + gen.randint(1, MERSENNE_PRIME, dtype=dtype), + gen.randint(0, MERSENNE_PRIME, dtype=dtype), ) for _ in range(n_permutations) ], - dtype=np.uint32, + dtype=dtype, ) def minhash32( @@ -145,8 +156,6 @@ def minhash32( """ Compute 32bit minhashes based on the MurmurHash3 algorithm """ - if not isinstance(ser, cudf.Series): - raise TypeError("Expected data of type cudf.Series") seeds_a = cudf.Series(seeds[:, 0], dtype="uint32") seeds_b = cudf.Series(seeds[:, 1], dtype="uint32") @@ -160,11 +169,14 @@ def minhash64( """ Compute 64bit minhashes based on the MurmurHash3 algorithm """ - raise NotImplementedError("minhash_permuted not implemented for 64 bit yet") if not isinstance(ser, cudf.Series): raise TypeError("Expected data of type cudf.Series") - seeds = cudf.Series(seeds, dtype="uint64") - return ser.str.minhash64(seeds=seeds, width=char_ngram) + seeds_a = cudf.Series(seeds[:, 0], dtype="uint64") + seeds_b = cudf.Series(seeds[:, 1], dtype="uint64") + + return ser.str.minhash64_permuted( + a=seeds_a, b=seeds_b, seed=seeds[0][0], width=char_ngram + ) def __call__(self, dataset: DocumentDataset) -> Union[str, DocumentDataset]: """ @@ -1011,10 +1023,24 @@ def shuffle_docs_on_buckets( bucket_parts_per_worker: int = 8, partition_on: str = "_output_partition_id", ): + + try: + print(documents_df.repartition(npartitions=1).head()) + except Exception as e: + print("Failed to show head due to {e}") + + print(f"{bucket_w_anchors_path=}") + print(f"{output_shuffled_docs_path=}") + print(f"{bucket_mapping_df_blocksize=}") + print(f"{parts_per_worker=}") + print(f"{bucket_parts_per_worker=}") + print(f"{partition_on=}") + ddf_anchor_docs_with_bk, bk_mapping = aggregated_anchor_docs_with_bk_read( path=bucket_w_anchors_path, blocksize=bucket_mapping_df_blocksize, ) + print(f"bk_mapping=") self._logger.info("Getting ddf_anchor_docs_with_bk completed") self._logger.debug( f"ddf_anchor_docs_with_bk.npartitions = {ddf_anchor_docs_with_bk.npartitions}" @@ -1026,6 +1052,7 @@ def shuffle_docs_on_buckets( parts_per_bucket_batch = num_workers * bucket_parts_per_worker self._logger.debug(f"parts_per_bucket_batch = {parts_per_bucket_batch}") + print(f"{num_workers=}\n{parts_per_batch=}\n{parts_per_bucket_batch=}") dask_profile_name = ( "suffle_docs" + f"-parts_per_batch-{parts_per_batch}" @@ -1061,9 +1088,12 @@ def _batched_merge_and_write( bk_mapping, num_workers: int = None, ): + print("\n\n=== batched_merge_and_write ===") + print(left_df.repartition(npartitions=1).head()) + print(right_df.repartition(npartitions=1).head()) total_text_partitions = left_df.npartitions total_bucket_partitions = right_df.npartitions - + print(f"\n{total_text_partitions=}\n{total_bucket_partitions=}") # Extract global partitioning index left_df, global_partitioning_index = extract_partitioning_index( left_df, @@ -1072,11 +1102,15 @@ def _batched_merge_and_write( parts_per_bucket_batch, total_bucket_partitions, ) - + print(f"left_df=\n{left_df.repartition(npartitions=1).head()}") + print( + f"global_partitioning_index=\n{global_partitioning_index.repartition(npartitions=1).head()}" + ) # Set start offsets bucket_part_start_offset, text_part_start_offset = get_restart_offsets( output_path ) + print(f"\n{bucket_part_start_offset=}\n{text_part_start_offset=}") # Set end offsets # NOTE: These end offsets are always set to the end @@ -1085,7 +1119,7 @@ def _batched_merge_and_write( # in the 
future. bucket_part_end_offset = total_bucket_partitions text_part_end_offset = total_text_partitions - + print(f"\n{bucket_part_end_offset=}\n{text_part_end_offset=}") # Check that offsets are valid assert bucket_part_start_offset % parts_per_bucket_batch == 0 assert bucket_part_end_offset > bucket_part_start_offset @@ -1128,7 +1162,9 @@ def _batched_merge_and_write( # Select our bucket-mapping batch subset_bucket_df = right_df.partitions[bucket_part_offset:end_bucket_offset] subset_bucket_df = subset_bucket_df.persist() - + print( + f"subset_bucket_df=\n{subset_bucket_df.repartition(npartitions=1).head()}" + ) # Filter out rows of left_df that we know cannot # align with any rows of subset_bucket_df left_df_use = filter_text_rows_by_bucket_batch( @@ -1138,6 +1174,7 @@ def _batched_merge_and_write( bucket_part_end_offset, total_bucket_partitions, ) + print(f"left_df_use=\n{left_df_use.repartition(npartitions=1).head()}") text_part_offset = text_part_start_offset while text_part_offset < text_part_end_offset: @@ -1157,33 +1194,16 @@ def _batched_merge_and_write( subset_text_df = left_df_use.partitions[ text_part_offset:end_text_offset ] + print( + f"subset_text_df=\n{subset_text_df.repartition(npartitions=1).head()}" + ) - try: - # NOTE: If we have more text-df partitions than bucket-map - # partitions, we are more likely to see an OverflowError - output_df = text_bytes_aware_shuffle( - df=merge_left_to_shuffled_right( - subset_text_df, - subset_bucket_df, - merge_on, - ), - partition_on=partition_on, - text_column=self.text_field, - num_workers=num_workers, - ) - except OverflowError as err: - # We encountered an overflow error! - # Let's try again with less text data - parts_per_text_batch_retry = int(parts_per_text_batch_use / 2) - if parts_per_text_batch_retry < 1: - raise err - print( - f"\nWe encountered an OverflowError and will retry " - f"the current batch with {parts_per_text_batch_retry} " - f"text partitions instead of {parts_per_text_batch_use}.", - flush=True, - ) - continue + output_df = merge_left_to_shuffled_right( + subset_text_df, + subset_bucket_df, + merge_on, + ).shuffle(on=partition_on) + print(f"output_df=\n{output_df.repartition(npartitions=1).head()}") if self.int_to_str_id is not None: output_df = output_df.map_partitions( From 9079368456556ab056d8c503db8be6debdd59902 Mon Sep 17 00:00:00 2001 From: Praateek Date: Mon, 21 Oct 2024 13:12:56 -0700 Subject: [PATCH 04/16] remove minhash changes Signed-off-by: Praateek --- nemo_curator/modules/fuzzy_dedup.py | 58 +++-------- .../utils/fuzzy_dedup_utils/merge_utils.py | 10 ++ .../utils/fuzzy_dedup_utils/shuffle_utils.py | 96 ------------------- tests/test_fuzzy_dedup.py | 80 ++++++++-------- 4 files changed, 63 insertions(+), 181 deletions(-) diff --git a/nemo_curator/modules/fuzzy_dedup.py b/nemo_curator/modules/fuzzy_dedup.py index f1984bc84..7f99e6c1e 100644 --- a/nemo_curator/modules/fuzzy_dedup.py +++ b/nemo_curator/modules/fuzzy_dedup.py @@ -59,7 +59,10 @@ build_partition, get_agg_text_bytes_df, ) -from nemo_curator.utils.fuzzy_dedup_utils.shuffle_utils import write_partitioned_file +from nemo_curator.utils.fuzzy_dedup_utils.shuffle_utils import ( + text_bytes_aware_shuffle, + write_partitioned_file, +) class MinHash: @@ -96,11 +99,7 @@ def __init__( """ self.num_hashes = num_hashes self.char_ngram = char_ngrams - self.seeds = self.generate_hash_permutations( - bit_width=64 if use_64bit_hash else 32, - n_permutations=self.num_hashes, - seed=seed, - ) + self.seeds = self.generate_seeds(n_seeds=self.num_hashes, 
seed=seed) self.minhash_method = self.minhash64 if use_64bit_hash else self.minhash32 self.id_field = id_field self.text_field = text_field @@ -121,34 +120,12 @@ def __init__( else: self._logger = logger - def generate_hash_permutations( - self, bit_width: int, n_permutations: int = 260, seed: int = 0 - ) -> np.ndarray: + def generate_seeds(self, n_seeds: int = 260, seed: int = 0) -> np.ndarray: """ Generate seeds for all minhash permutations based on the given seed. """ gen = np.random.RandomState(seed) - - if bit_width == 32: - MERSENNE_PRIME = np.uint32((1 << 31) - 1) - dtype = np.uint32 - elif bit_width == 64: - # For 64-bit, use a larger prime number suitable for 64-bit operations - MERSENNE_PRIME = np.uint64((1 << 61) - 1) - dtype = np.uint64 - else: - raise ValueError("Unsupported bit width. Use either 32 or 64.") - - return np.array( - [ - ( - gen.randint(1, MERSENNE_PRIME, dtype=dtype), - gen.randint(0, MERSENNE_PRIME, dtype=dtype), - ) - for _ in range(n_permutations) - ], - dtype=dtype, - ) + return gen.randint(0, 1e6, size=n_seeds) def minhash32( self, ser: cudf.Series, seeds: np.ndarray, char_ngram: int @@ -156,27 +133,18 @@ def minhash32( """ Compute 32bit minhashes based on the MurmurHash3 algorithm """ - seeds_a = cudf.Series(seeds[:, 0], dtype="uint32") - seeds_b = cudf.Series(seeds[:, 1], dtype="uint32") - - return ser.str.minhash_permuted( - a=seeds_a, b=seeds_b, seed=seeds[0][0], width=char_ngram - ) + if not isinstance(ser, cudf.Series): + raise TypeError("Expected data of type cudf.Series") + seeds = cudf.Series(seeds, dtype="uint32") + return ser.str.minhash(seeds=seeds, width=char_ngram) def minhash64( self, ser: cudf.Series, seeds: np.ndarray, char_ngram: int ) -> cudf.Series: - """ - Compute 64bit minhashes based on the MurmurHash3 algorithm - """ if not isinstance(ser, cudf.Series): raise TypeError("Expected data of type cudf.Series") - seeds_a = cudf.Series(seeds[:, 0], dtype="uint64") - seeds_b = cudf.Series(seeds[:, 1], dtype="uint64") - - return ser.str.minhash64_permuted( - a=seeds_a, b=seeds_b, seed=seeds[0][0], width=char_ngram - ) + seeds = cudf.Series(seeds, dtype="uint64") + return ser.str.minhash64(seeds=seeds, width=char_ngram) def __call__(self, dataset: DocumentDataset) -> Union[str, DocumentDataset]: """ diff --git a/nemo_curator/utils/fuzzy_dedup_utils/merge_utils.py b/nemo_curator/utils/fuzzy_dedup_utils/merge_utils.py index f5bff3164..85c8fe7f3 100644 --- a/nemo_curator/utils/fuzzy_dedup_utils/merge_utils.py +++ b/nemo_curator/utils/fuzzy_dedup_utils/merge_utils.py @@ -20,6 +20,7 @@ from dask.base import tokenize from dask.dataframe.shuffle import partitioning_index from dask.utils import M +from sympy import npartitions from nemo_curator._compat import DASK_SHUFFLE_CAST_DTYPE, query_planning_enabled @@ -153,7 +154,9 @@ def extract_partitioning_index( # `cast_dtype` argument doesn't exist yet cast_dtype = {} + print("\n ==== extract_partitioning_index START ====") num_bucket_files = bk_mapping.file_id.max() + 1 + print(f"{num_bucket_files=}") global_partitioning_index = left_df[merge_on].map_partitions( partitioning_index, npartitions=num_bucket_files, @@ -163,6 +166,9 @@ def extract_partitioning_index( align_dataframes=False, **cast_dtype, ) + print( + f"Before aggregation\n{global_partitioning_index.repartition(npartitions=1).head()}" + ) if total_bucket_partitions < num_bucket_files: # Our bucket-map files were aggregated. 
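The debug prints added in this patch bracket the two steps of extract_partitioning_index: every id is first hashed to one of the original bucket-map files (and remapped if those files were aggregated into fewer partitions), then the index is reduced modulo parts_per_bucket_batch so it only points inside the bucket batch currently being merged. A rough CPU illustration of those two mappings, assuming the same partitioning_index helper this module imports from dask, with made-up column names and counts:

import pandas as pd
from dask.dataframe.shuffle import partitioning_index

left = pd.DataFrame({"doc_id": [11, 42, 97, 256, 1024]})  # illustrative ids
num_bucket_files = 8        # hypothetical number of bucket-map files on disk
parts_per_bucket_batch = 4  # hypothetical bucket partitions per merge batch

global_idx = partitioning_index(left[["doc_id"]], num_bucket_files)
print(global_idx)                            # which bucket file each row hashes to
print(global_idx % parts_per_bucket_batch)   # the per-batch "_partitions" value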
@@ -177,6 +183,9 @@ def extract_partitioning_index( transform_divisions=False, align_dataframes=False, ) + print( + f"After aggregation\n{global_partitioning_index.repartition(npartitions=1).head()}" + ) # Since we are iterating over `right_df`, we do not really # want to send the rows of `left_df` to the partition @@ -184,6 +193,7 @@ def extract_partitioning_index( # need to take a modulus with `parts_per_bucket_batch` to # define a `"_partitions"` column. left_df["_partitions"] = global_partitioning_index % parts_per_bucket_batch + print("\n ==== extract_partitioning_index END ====\n") return left_df, global_partitioning_index diff --git a/nemo_curator/utils/fuzzy_dedup_utils/shuffle_utils.py b/nemo_curator/utils/fuzzy_dedup_utils/shuffle_utils.py index 7d05b399c..4368f717b 100644 --- a/nemo_curator/utils/fuzzy_dedup_utils/shuffle_utils.py +++ b/nemo_curator/utils/fuzzy_dedup_utils/shuffle_utils.py @@ -97,99 +97,3 @@ def rearange_by_column_direct( npartitions=npartitions, ignore_index=ignore_index, ) - - -def get_shuffle_part_ids_df( - agg_df, - partition_on, - output_col, - size_col, - num_workers=0, -): - sizes = agg_df[size_col].values - max_text_bytes_per_part = int(np.iinfo(np.int32).max * 3) - - # Adjust max_text_bytes_per_part if the number of output - # partitions is small compared to the number of workers. - # Sometimes we just have very few output partitions to - # deal with, and just need a larger batch - npartitions_min = max(1, int(num_workers * 0.8)) - while True: - output_ar = build_partition(sizes.get(), max_text_bytes_per_part) - if output_ar.max() > npartitions_min or max_text_bytes_per_part < 2**24: - break - max_text_bytes_per_part = int(max_text_bytes_per_part // 2.0) - - df = cudf.DataFrame() - df[partition_on] = agg_df[partition_on] - df[output_col] = output_ar - return df - - -def get_shuffle_partition_info( - df, - partition_on, - output_column, - text_column, - bytes_column="_text_bytes", - num_workers=None, -): - df[bytes_column] = df[text_column].map_partitions(lambda s: s.str.byte_count()) - agg_df, _ = get_agg_text_bytes_df( - df, agg_column=partition_on, bytes_column=bytes_column, n_partitions=1 - ) - del df - - agg_df = agg_df.reset_index(drop=True) - shuffle_part_ids = agg_df.map_partitions( - get_shuffle_part_ids_df, - partition_on, - size_col=bytes_column, - num_workers=num_workers, - output_col=output_column, - ).persist() - return shuffle_part_ids - - -def text_bytes_aware_shuffle(df, partition_on, text_column, num_workers=None): - """ - This shuffle takes into account the text bytes of each partition - and tries to make sure that the output partitions do not exceed - the char limit of cuDF - - Args: - df: dask_cudf dataframe - partition_on: column name to partition on - - - Returns: - dask_cudf dataframe with _partitions columns - """ - print("Starting text bytes aware shuffle", flush=True) - output_col = "_partitions" - - df = df.persist() - shuffle_part_ids = get_shuffle_partition_info( - df=df, - partition_on=partition_on, - num_workers=num_workers, - output_column=output_col, - text_column=text_column, - ) - n_output_partitions = shuffle_part_ids[output_col].max().compute() + 1 - n_output_partitions = int(n_output_partitions) - df = df.merge(shuffle_part_ids, on=partition_on, how="inner").persist() - - df = ( - rearange_by_column_direct( - df, - col=output_col, - npartitions=n_output_partitions, - ignore_index=True, - excomms_default=True, - ) - .drop(columns=[output_col]) - .persist() - ) - print(f"Will write {len(df)} rows to disk", flush=True) 
- return df diff --git a/tests/test_fuzzy_dedup.py b/tests/test_fuzzy_dedup.py index fb831cb16..95828b7ac 100644 --- a/tests/test_fuzzy_dedup.py +++ b/tests/test_fuzzy_dedup.py @@ -372,46 +372,46 @@ def test_num_anchors(self, large_fuzzy_dedup_data, num_anchors, tmpdir): ).columns assert all(f"anchor_{i}_id" in anchor_docs_df_cols for i in range(num_anchors)) - @pytest.mark.parametrize("use_64_bit_hash", [False, True]) - @pytest.mark.parametrize( - "num_buckets,duplicate_docs", - # Duplcated docs estimated from true_jaccard values - [ - (10, [[4, -1], [1, 2, 300]]), - (3, [[4, -1], [1, 2, 300]]), - ], - ) - def test_no_fp_check( - self, fuzzy_dedup_data, use_64_bit_hash, num_buckets, duplicate_docs, tmpdir - ): - config = FuzzyDuplicatesConfig( - cache_dir=tmpdir, - id_field="id", - text_field="text", - seed=42, - char_ngrams=5, - num_buckets=num_buckets, - hashes_per_bucket=1, - use_64_bit_hash=use_64_bit_hash, - buckets_per_shuffle=5, - false_positive_check=False, - num_anchors=2, - jaccard_threshold=0.39, - ) - fuzzy_duplicates = FuzzyDuplicates(config=config) - result = fuzzy_duplicates(fuzzy_dedup_data) - result_df = result.df.compute() - # Drop non duplicated docs - result_df = result_df[result_df.group.duplicated(keep=False)] - result_df = result_df.groupby("group").id.agg(list) - # Sort to maintain uniform ordering - - result_df = result_df.list.sort_values() - result_df = result_df.sort_values() - expected_df = cudf.Series(duplicate_docs, name="id") - expected_df = expected_df.list.sort_values() - expected_df = expected_df.sort_values() - assert_eq(expected_df, result_df, check_index=False) + # @pytest.mark.parametrize("use_64_bit_hash", [False, True]) + # @pytest.mark.parametrize( + # "num_buckets,duplicate_docs", + # # Duplcated docs estimated from true_jaccard values + # [ + # (10, [[4, -1], [1, 2, 300]]), + # (3, [[4, -1], [1, 2, 300]]), + # ], + # ) + # def test_no_fp_check( + # self, fuzzy_dedup_data, use_64_bit_hash, num_buckets, duplicate_docs, tmpdir + # ): + # config = FuzzyDuplicatesConfig( + # cache_dir=tmpdir, + # id_field="id", + # text_field="text", + # seed=42, + # char_ngrams=5, + # num_buckets=num_buckets, + # hashes_per_bucket=1, + # use_64_bit_hash=use_64_bit_hash, + # buckets_per_shuffle=5, + # false_positive_check=False, + # num_anchors=2, + # jaccard_threshold=0.39, + # ) + # fuzzy_duplicates = FuzzyDuplicates(config=config) + # result = fuzzy_duplicates(fuzzy_dedup_data) + # result_df = result.df.compute() + # # Drop non duplicated docs + # result_df = result_df[result_df.group.duplicated(keep=False)] + # result_df = result_df.groupby("group").id.agg(list) + # # Sort to maintain uniform ordering + + # result_df = result_df.list.sort_values() + # result_df = result_df.sort_values() + # expected_df = cudf.Series(duplicate_docs, name="id") + # expected_df = expected_df.list.sort_values() + # expected_df = expected_df.sort_values() + # assert_eq(expected_df, result_df, check_index=False) class TestFuzzyDuplicatesConfig: From 467e628da201ffe7bd65946576b22a47cd0b5078 Mon Sep 17 00:00:00 2001 From: Praateek Date: Mon, 21 Oct 2024 13:14:20 -0700 Subject: [PATCH 05/16] remove debug code Signed-off-by: Praateek --- .../utils/fuzzy_dedup_utils/merge_utils.py | 1 - tests/test_fuzzy_dedup.py | 80 +++++++++---------- 2 files changed, 40 insertions(+), 41 deletions(-) diff --git a/nemo_curator/utils/fuzzy_dedup_utils/merge_utils.py b/nemo_curator/utils/fuzzy_dedup_utils/merge_utils.py index 85c8fe7f3..d01357724 100644 --- 
a/nemo_curator/utils/fuzzy_dedup_utils/merge_utils.py +++ b/nemo_curator/utils/fuzzy_dedup_utils/merge_utils.py @@ -20,7 +20,6 @@ from dask.base import tokenize from dask.dataframe.shuffle import partitioning_index from dask.utils import M -from sympy import npartitions from nemo_curator._compat import DASK_SHUFFLE_CAST_DTYPE, query_planning_enabled diff --git a/tests/test_fuzzy_dedup.py b/tests/test_fuzzy_dedup.py index 95828b7ac..fb831cb16 100644 --- a/tests/test_fuzzy_dedup.py +++ b/tests/test_fuzzy_dedup.py @@ -372,46 +372,46 @@ def test_num_anchors(self, large_fuzzy_dedup_data, num_anchors, tmpdir): ).columns assert all(f"anchor_{i}_id" in anchor_docs_df_cols for i in range(num_anchors)) - # @pytest.mark.parametrize("use_64_bit_hash", [False, True]) - # @pytest.mark.parametrize( - # "num_buckets,duplicate_docs", - # # Duplcated docs estimated from true_jaccard values - # [ - # (10, [[4, -1], [1, 2, 300]]), - # (3, [[4, -1], [1, 2, 300]]), - # ], - # ) - # def test_no_fp_check( - # self, fuzzy_dedup_data, use_64_bit_hash, num_buckets, duplicate_docs, tmpdir - # ): - # config = FuzzyDuplicatesConfig( - # cache_dir=tmpdir, - # id_field="id", - # text_field="text", - # seed=42, - # char_ngrams=5, - # num_buckets=num_buckets, - # hashes_per_bucket=1, - # use_64_bit_hash=use_64_bit_hash, - # buckets_per_shuffle=5, - # false_positive_check=False, - # num_anchors=2, - # jaccard_threshold=0.39, - # ) - # fuzzy_duplicates = FuzzyDuplicates(config=config) - # result = fuzzy_duplicates(fuzzy_dedup_data) - # result_df = result.df.compute() - # # Drop non duplicated docs - # result_df = result_df[result_df.group.duplicated(keep=False)] - # result_df = result_df.groupby("group").id.agg(list) - # # Sort to maintain uniform ordering - - # result_df = result_df.list.sort_values() - # result_df = result_df.sort_values() - # expected_df = cudf.Series(duplicate_docs, name="id") - # expected_df = expected_df.list.sort_values() - # expected_df = expected_df.sort_values() - # assert_eq(expected_df, result_df, check_index=False) + @pytest.mark.parametrize("use_64_bit_hash", [False, True]) + @pytest.mark.parametrize( + "num_buckets,duplicate_docs", + # Duplcated docs estimated from true_jaccard values + [ + (10, [[4, -1], [1, 2, 300]]), + (3, [[4, -1], [1, 2, 300]]), + ], + ) + def test_no_fp_check( + self, fuzzy_dedup_data, use_64_bit_hash, num_buckets, duplicate_docs, tmpdir + ): + config = FuzzyDuplicatesConfig( + cache_dir=tmpdir, + id_field="id", + text_field="text", + seed=42, + char_ngrams=5, + num_buckets=num_buckets, + hashes_per_bucket=1, + use_64_bit_hash=use_64_bit_hash, + buckets_per_shuffle=5, + false_positive_check=False, + num_anchors=2, + jaccard_threshold=0.39, + ) + fuzzy_duplicates = FuzzyDuplicates(config=config) + result = fuzzy_duplicates(fuzzy_dedup_data) + result_df = result.df.compute() + # Drop non duplicated docs + result_df = result_df[result_df.group.duplicated(keep=False)] + result_df = result_df.groupby("group").id.agg(list) + # Sort to maintain uniform ordering + + result_df = result_df.list.sort_values() + result_df = result_df.sort_values() + expected_df = cudf.Series(duplicate_docs, name="id") + expected_df = expected_df.list.sort_values() + expected_df = expected_df.sort_values() + assert_eq(expected_df, result_df, check_index=False) class TestFuzzyDuplicatesConfig: From f2dadc9b4038a7447172d45f9dce1123cc93d646 Mon Sep 17 00:00:00 2001 From: Praateek Date: Mon, 21 Oct 2024 13:23:20 -0700 Subject: [PATCH 06/16] removed print statements Signed-off-by: Praateek --- 
nemo_curator/modules/fuzzy_dedup.py | 37 ++----------------- .../utils/fuzzy_dedup_utils/io_utils.py | 1 + .../utils/fuzzy_dedup_utils/merge_utils.py | 9 ----- 3 files changed, 4 insertions(+), 43 deletions(-) diff --git a/nemo_curator/modules/fuzzy_dedup.py b/nemo_curator/modules/fuzzy_dedup.py index 7f99e6c1e..ee13c8396 100644 --- a/nemo_curator/modules/fuzzy_dedup.py +++ b/nemo_curator/modules/fuzzy_dedup.py @@ -141,6 +141,9 @@ def minhash32( def minhash64( self, ser: cudf.Series, seeds: np.ndarray, char_ngram: int ) -> cudf.Series: + """ + Compute 64bit minhashes based on the MurmurHash3 algorithm + """ if not isinstance(ser, cudf.Series): raise TypeError("Expected data of type cudf.Series") seeds = cudf.Series(seeds, dtype="uint64") @@ -992,23 +995,10 @@ def shuffle_docs_on_buckets( partition_on: str = "_output_partition_id", ): - try: - print(documents_df.repartition(npartitions=1).head()) - except Exception as e: - print("Failed to show head due to {e}") - - print(f"{bucket_w_anchors_path=}") - print(f"{output_shuffled_docs_path=}") - print(f"{bucket_mapping_df_blocksize=}") - print(f"{parts_per_worker=}") - print(f"{bucket_parts_per_worker=}") - print(f"{partition_on=}") - ddf_anchor_docs_with_bk, bk_mapping = aggregated_anchor_docs_with_bk_read( path=bucket_w_anchors_path, blocksize=bucket_mapping_df_blocksize, ) - print(f"bk_mapping=") self._logger.info("Getting ddf_anchor_docs_with_bk completed") self._logger.debug( f"ddf_anchor_docs_with_bk.npartitions = {ddf_anchor_docs_with_bk.npartitions}" @@ -1020,7 +1010,6 @@ def shuffle_docs_on_buckets( parts_per_bucket_batch = num_workers * bucket_parts_per_worker self._logger.debug(f"parts_per_bucket_batch = {parts_per_bucket_batch}") - print(f"{num_workers=}\n{parts_per_batch=}\n{parts_per_bucket_batch=}") dask_profile_name = ( "suffle_docs" + f"-parts_per_batch-{parts_per_batch}" @@ -1056,12 +1045,8 @@ def _batched_merge_and_write( bk_mapping, num_workers: int = None, ): - print("\n\n=== batched_merge_and_write ===") - print(left_df.repartition(npartitions=1).head()) - print(right_df.repartition(npartitions=1).head()) total_text_partitions = left_df.npartitions total_bucket_partitions = right_df.npartitions - print(f"\n{total_text_partitions=}\n{total_bucket_partitions=}") # Extract global partitioning index left_df, global_partitioning_index = extract_partitioning_index( left_df, @@ -1070,16 +1055,10 @@ def _batched_merge_and_write( parts_per_bucket_batch, total_bucket_partitions, ) - print(f"left_df=\n{left_df.repartition(npartitions=1).head()}") - print( - f"global_partitioning_index=\n{global_partitioning_index.repartition(npartitions=1).head()}" - ) # Set start offsets bucket_part_start_offset, text_part_start_offset = get_restart_offsets( output_path ) - print(f"\n{bucket_part_start_offset=}\n{text_part_start_offset=}") - # Set end offsets # NOTE: These end offsets are always set to the end # of the data. However, we may want to be able to set @@ -1087,7 +1066,6 @@ def _batched_merge_and_write( # in the future. 
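The restart offsets read a few lines above are what make the batched merge resumable: bucket-map partitions are processed in batches, text partitions in sub-batches, and the current position is persisted after every written batch so an interrupted run restarts from the last completed pair. A stripped-down sketch of that control flow follows; the file layout and helper names are illustrative, not the actual get_restart_offsets utilities used here.

import json
import os
import tempfile

def read_offsets(path):
    f = os.path.join(path, "_restart_offsets.json")
    if not os.path.exists(f):
        return 0, 0
    with open(f) as fh:
        d = json.load(fh)
    return d["bucket"], d["text"]

def write_offsets(path, bucket, text):
    with open(os.path.join(path, "_restart_offsets.json"), "w") as fh:
        json.dump({"bucket": bucket, "text": text}, fh)

def batched_merge(path, n_bucket_parts, n_text_parts, bucket_batch, text_batch):
    bucket_off, text_off = read_offsets(path)
    while bucket_off < n_bucket_parts:
        while text_off < n_text_parts:
            print(f"merge buckets[{bucket_off}:{bucket_off + bucket_batch}] "
                  f"with text[{text_off}:{text_off + text_batch}]")
            text_off += text_batch
            write_offsets(path, bucket_off, text_off)  # checkpoint after each batch
        bucket_off, text_off = bucket_off + bucket_batch, 0
        write_offsets(path, bucket_off, 0)

batched_merge(tempfile.mkdtemp(), n_bucket_parts=4, n_text_parts=6,
              bucket_batch=2, text_batch=3)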
bucket_part_end_offset = total_bucket_partitions text_part_end_offset = total_text_partitions - print(f"\n{bucket_part_end_offset=}\n{text_part_end_offset=}") # Check that offsets are valid assert bucket_part_start_offset % parts_per_bucket_batch == 0 assert bucket_part_end_offset > bucket_part_start_offset @@ -1130,9 +1108,6 @@ def _batched_merge_and_write( # Select our bucket-mapping batch subset_bucket_df = right_df.partitions[bucket_part_offset:end_bucket_offset] subset_bucket_df = subset_bucket_df.persist() - print( - f"subset_bucket_df=\n{subset_bucket_df.repartition(npartitions=1).head()}" - ) # Filter out rows of left_df that we know cannot # align with any rows of subset_bucket_df left_df_use = filter_text_rows_by_bucket_batch( @@ -1142,7 +1117,6 @@ def _batched_merge_and_write( bucket_part_end_offset, total_bucket_partitions, ) - print(f"left_df_use=\n{left_df_use.repartition(npartitions=1).head()}") text_part_offset = text_part_start_offset while text_part_offset < text_part_end_offset: @@ -1162,16 +1136,11 @@ def _batched_merge_and_write( subset_text_df = left_df_use.partitions[ text_part_offset:end_text_offset ] - print( - f"subset_text_df=\n{subset_text_df.repartition(npartitions=1).head()}" - ) - output_df = merge_left_to_shuffled_right( subset_text_df, subset_bucket_df, merge_on, ).shuffle(on=partition_on) - print(f"output_df=\n{output_df.repartition(npartitions=1).head()}") if self.int_to_str_id is not None: output_df = output_df.map_partitions( diff --git a/nemo_curator/utils/fuzzy_dedup_utils/io_utils.py b/nemo_curator/utils/fuzzy_dedup_utils/io_utils.py index e6f126209..4722beaa8 100644 --- a/nemo_curator/utils/fuzzy_dedup_utils/io_utils.py +++ b/nemo_curator/utils/fuzzy_dedup_utils/io_utils.py @@ -119,6 +119,7 @@ def aggregated_anchor_docs_with_bk_read(path, blocksize): sorted(glob(f"{path}/*.parquet"), key=natural_sort_key), format="parquet", ) + # create chunks of files to which are less than blocksize chunks = chunk_files(ds.get_fragments(), blocksize) # Record mapping between file indices and partition indices. diff --git a/nemo_curator/utils/fuzzy_dedup_utils/merge_utils.py b/nemo_curator/utils/fuzzy_dedup_utils/merge_utils.py index d01357724..f5bff3164 100644 --- a/nemo_curator/utils/fuzzy_dedup_utils/merge_utils.py +++ b/nemo_curator/utils/fuzzy_dedup_utils/merge_utils.py @@ -153,9 +153,7 @@ def extract_partitioning_index( # `cast_dtype` argument doesn't exist yet cast_dtype = {} - print("\n ==== extract_partitioning_index START ====") num_bucket_files = bk_mapping.file_id.max() + 1 - print(f"{num_bucket_files=}") global_partitioning_index = left_df[merge_on].map_partitions( partitioning_index, npartitions=num_bucket_files, @@ -165,9 +163,6 @@ def extract_partitioning_index( align_dataframes=False, **cast_dtype, ) - print( - f"Before aggregation\n{global_partitioning_index.repartition(npartitions=1).head()}" - ) if total_bucket_partitions < num_bucket_files: # Our bucket-map files were aggregated. @@ -182,9 +177,6 @@ def extract_partitioning_index( transform_divisions=False, align_dataframes=False, ) - print( - f"After aggregation\n{global_partitioning_index.repartition(npartitions=1).head()}" - ) # Since we are iterating over `right_df`, we do not really # want to send the rows of `left_df` to the partition @@ -192,7 +184,6 @@ def extract_partitioning_index( # need to take a modulus with `parts_per_bucket_batch` to # define a `"_partitions"` column. 
left_df["_partitions"] = global_partitioning_index % parts_per_bucket_batch - print("\n ==== extract_partitioning_index END ====\n") return left_df, global_partitioning_index From 53f477aa8d78732dc9f5d85ba6adec94e92f6f32 Mon Sep 17 00:00:00 2001 From: Praateek Date: Mon, 21 Oct 2024 13:27:43 -0700 Subject: [PATCH 07/16] blanklines Signed-off-by: Praateek --- nemo_curator/modules/fuzzy_dedup.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/nemo_curator/modules/fuzzy_dedup.py b/nemo_curator/modules/fuzzy_dedup.py index ee13c8396..722a24ad1 100644 --- a/nemo_curator/modules/fuzzy_dedup.py +++ b/nemo_curator/modules/fuzzy_dedup.py @@ -1047,6 +1047,7 @@ def _batched_merge_and_write( ): total_text_partitions = left_df.npartitions total_bucket_partitions = right_df.npartitions + # Extract global partitioning index left_df, global_partitioning_index = extract_partitioning_index( left_df, @@ -1055,10 +1056,12 @@ def _batched_merge_and_write( parts_per_bucket_batch, total_bucket_partitions, ) + # Set start offsets bucket_part_start_offset, text_part_start_offset = get_restart_offsets( output_path ) + # Set end offsets # NOTE: These end offsets are always set to the end # of the data. However, we may want to be able to set @@ -1066,6 +1069,7 @@ def _batched_merge_and_write( # in the future. bucket_part_end_offset = total_bucket_partitions text_part_end_offset = total_text_partitions + # Check that offsets are valid assert bucket_part_start_offset % parts_per_bucket_batch == 0 assert bucket_part_end_offset > bucket_part_start_offset @@ -1108,6 +1112,7 @@ def _batched_merge_and_write( # Select our bucket-mapping batch subset_bucket_df = right_df.partitions[bucket_part_offset:end_bucket_offset] subset_bucket_df = subset_bucket_df.persist() + # Filter out rows of left_df that we know cannot # align with any rows of subset_bucket_df left_df_use = filter_text_rows_by_bucket_batch( From 05538490bf16130407ac3dded02c013f213d1ed2 Mon Sep 17 00:00:00 2001 From: Praateek Date: Mon, 21 Oct 2024 17:36:12 -0700 Subject: [PATCH 08/16] remove unused imports Signed-off-by: Praateek --- nemo_curator/modules/fuzzy_dedup.py | 9 +++------ nemo_curator/utils/fuzzy_dedup_utils/shuffle_utils.py | 5 ----- 2 files changed, 3 insertions(+), 11 deletions(-) diff --git a/nemo_curator/modules/fuzzy_dedup.py b/nemo_curator/modules/fuzzy_dedup.py index 722a24ad1..9a476be2c 100644 --- a/nemo_curator/modules/fuzzy_dedup.py +++ b/nemo_curator/modules/fuzzy_dedup.py @@ -59,10 +59,7 @@ build_partition, get_agg_text_bytes_df, ) -from nemo_curator.utils.fuzzy_dedup_utils.shuffle_utils import ( - text_bytes_aware_shuffle, - write_partitioned_file, -) +from nemo_curator.utils.fuzzy_dedup_utils.shuffle_utils import write_partitioned_file class MinHash: @@ -371,7 +368,7 @@ def __call__(self, dataset: DocumentDataset) -> DocumentDataset: write_path = os.path.join(self.cache_dir, "_buckets.parquet") t0 = time.time() - with performance_report_if_with_ts_suffix(self.profile_dir, f"lsh-profile"): + with performance_report_if_with_ts_suffix(self.profile_dir, "lsh-profile"): self.lsh(write_path=write_path, df=df) self._logger.info( f"Time taken for LSH = {time.time() - t0}s and output written at {write_path}" @@ -505,7 +502,7 @@ def __call__(self, dataset: DocumentDataset): ) with performance_report_if_with_ts_suffix( self.config.profile_dir, - f"map_buckets", + "map_buckets", ): ddf_mapped_buckets_w_anchors = ( self.map_buckets.map_buckets_with_anchors( diff --git a/nemo_curator/utils/fuzzy_dedup_utils/shuffle_utils.py 
b/nemo_curator/utils/fuzzy_dedup_utils/shuffle_utils.py index 4368f717b..f5d3ac26a 100644 --- a/nemo_curator/utils/fuzzy_dedup_utils/shuffle_utils.py +++ b/nemo_curator/utils/fuzzy_dedup_utils/shuffle_utils.py @@ -14,15 +14,10 @@ import cudf import dask_cuda -import numpy as np from dask import config from packaging.version import Version from nemo_curator._compat import query_planning_enabled -from nemo_curator.utils.fuzzy_dedup_utils.output_map_utils import ( - build_partition, - get_agg_text_bytes_df, -) dask_cuda_version = Version(dask_cuda.__version__) USE_EXCOMMS = ( From 9f1ceec5798bede6e13021f3d9cdf16089711296 Mon Sep 17 00:00:00 2001 From: Praateek Date: Mon, 4 Nov 2024 03:20:53 -0800 Subject: [PATCH 09/16] try different approaches Signed-off-by: Praateek --- nemo_curator/modules/fuzzy_dedup.py | 40 +++++-- .../utils/fuzzy_dedup_utils/shuffle_utils.py | 113 ++++++++++++++++++ 2 files changed, 145 insertions(+), 8 deletions(-) diff --git a/nemo_curator/modules/fuzzy_dedup.py b/nemo_curator/modules/fuzzy_dedup.py index 9a476be2c..d6b1fdbfa 100644 --- a/nemo_curator/modules/fuzzy_dedup.py +++ b/nemo_curator/modules/fuzzy_dedup.py @@ -59,8 +59,11 @@ build_partition, get_agg_text_bytes_df, ) -from nemo_curator.utils.fuzzy_dedup_utils.shuffle_utils import write_partitioned_file - +from nemo_curator.utils.fuzzy_dedup_utils.shuffle_utils import ( + text_bytes_aware_shuffle, + write_partitioned_file, + rearange_by_column_direct, +) class MinHash: """ @@ -1138,12 +1141,33 @@ def _batched_merge_and_write( subset_text_df = left_df_use.partitions[ text_part_offset:end_text_offset ] - output_df = merge_left_to_shuffled_right( - subset_text_df, - subset_bucket_df, - merge_on, - ).shuffle(on=partition_on) - + merged_subset_df = merge_left_to_shuffled_right( + subset_text_df, + subset_bucket_df, + merge_on, + ) + if os.environ["SHUFFLE_APPROACH"] == "text_bytes_aware": + self._logger.info("Using text_bytes_aware_shuffle") + output_df = text_bytes_aware_shuffle( + df=merged_subset_df, + partition_on=partition_on, + text_column=self.text_field, + num_workers=num_workers, + ) + elif os.environ["SHUFFLE_APPROACH"] == "dask_vanilla": + self._logger.info("Using dask's vanilla shuffle") + output_df = merged_subset_df.shuffle(on=partition_on) + elif os.environ["SHUFFLE_APPROACH"] == "rearrange_by_column_direct": + self._logger.info("Using rearrange_by_column_direct") + output_df = rearange_by_column_direct( + df=merged_subset_df, + col=partition_on, + npartitions=num_workers, + ignore_index=True, + excomms_default=True + ) + else: + raise ValueError("Invalid shuffle approach") if self.int_to_str_id is not None: output_df = output_df.map_partitions( int_ids_to_str, id_column=self.int_to_str_id diff --git a/nemo_curator/utils/fuzzy_dedup_utils/shuffle_utils.py b/nemo_curator/utils/fuzzy_dedup_utils/shuffle_utils.py index f5d3ac26a..02377642e 100644 --- a/nemo_curator/utils/fuzzy_dedup_utils/shuffle_utils.py +++ b/nemo_curator/utils/fuzzy_dedup_utils/shuffle_utils.py @@ -12,12 +12,19 @@ # See the License for the specific language governing permissions and # limitations under the License. 
+from typing import Optional + import cudf import dask_cuda +import numpy as np from dask import config from packaging.version import Version from nemo_curator._compat import query_planning_enabled +from nemo_curator.utils.fuzzy_dedup_utils.output_map_utils import ( + build_partition, + get_agg_text_bytes_df, +) dask_cuda_version = Version(dask_cuda.__version__) USE_EXCOMMS = ( @@ -47,6 +54,7 @@ def rearange_by_column_direct( ): # Execute a "direct" shuffle operation without staging if config.get("explicit-comms", excomms_default): + print("Using excomms", flush=True) from dask_cuda.explicit_comms.dataframe.shuffle import ( shuffle as explicit_comms_shuffle, ) @@ -66,6 +74,7 @@ def rearange_by_column_direct( from dask_expr._shuffle import RearrangeByColumn # Use the internal dask-expr API + print("Using queryplanning", flush=True) return new_collection( RearrangeByColumn( frame=df.expr, @@ -81,6 +90,7 @@ def rearange_by_column_direct( else: from dask.dataframe.shuffle import rearrange_by_column + print("Using oldschool", flush=True) return rearrange_by_column( df, @@ -92,3 +102,106 @@ def rearange_by_column_direct( npartitions=npartitions, ignore_index=ignore_index, ) + + +def get_shuffle_part_ids_df( + agg_df, + partition_on, + output_col, + size_col, + num_workers=0, +): + sizes = agg_df[size_col].values + max_text_bytes_per_part = int(np.iinfo(np.int32).max * 3) + + # Adjust max_text_bytes_per_part if the number of output + # partitions is small compared to the number of workers. + # Sometimes we just have very few output partitions to + # deal with, and just need a larger batch + npartitions_min = max(1, int(num_workers * 0.8)) + while True: + output_ar = build_partition(sizes.get(), max_text_bytes_per_part) + if output_ar.max() > npartitions_min or max_text_bytes_per_part < 2**24: + break + max_text_bytes_per_part = int(max_text_bytes_per_part // 2.0) + + df = cudf.DataFrame() + df[partition_on] = agg_df[partition_on] + df[output_col] = output_ar + return df + + +def get_shuffle_partition_info( + df, + partition_on, + output_column, + text_column, + bytes_column="_text_bytes", + num_workers=None, +): + df[bytes_column] = df[text_column].map_partitions(lambda s: s.str.byte_count()) + agg_df, _ = get_agg_text_bytes_df( + df, agg_column=partition_on, bytes_column=bytes_column, n_partitions=1 + ) + del df + + agg_df = agg_df.reset_index(drop=True) + shuffle_part_ids = agg_df.map_partitions( + get_shuffle_part_ids_df, + partition_on, + size_col=bytes_column, + num_workers=num_workers, + output_col=output_column, + ).persist() + return shuffle_part_ids + + +def text_bytes_aware_shuffle( + df, + partition_on: str, + text_column: str, + num_workers: Optional[int] = None, +): + """ + This shuffle takes into account the text bytes of each partition + and tries to make sure that the output partitions do not exceed + the char limit of cuDF + + Args: + df: dask_cudf dataframe + partition_on: column name to partition on + text_column: column name for the text data + + Returns: + dask_cudf dataframe with _partitions columns or None if `df` is empty after the merge + """ + print("Starting text bytes aware shuffle", flush=True) + output_col = "_partitions" + + df = df.persist() + if len(df) == 0: + return None + shuffle_part_ids = get_shuffle_partition_info( + df=df, + partition_on=partition_on, + num_workers=num_workers, + output_column=output_col, + text_column=text_column, + ) + n_output_partitions = shuffle_part_ids[output_col].max().compute() + 1 + n_output_partitions = int(n_output_partitions) + 
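get_shuffle_part_ids_df above converts per-group text byte counts into output-partition ids so that no output partition exceeds cuDF's string-column byte limit. A toy, CPU-only analogue of that packing step is sketched below; the real build_partition helper lives in fuzzy_dedup_utils.output_map_utils, and the sizes and budget here are made up.

import numpy as np

def build_partition_sketch(sizes, max_bytes):
    # Greedily pack consecutive groups into one partition until the byte budget is hit.
    part_ids = np.zeros(len(sizes), dtype=np.int32)
    part, used = 0, 0
    for i, s in enumerate(sizes):
        if used + s > max_bytes and used > 0:
            part += 1
            used = 0
        part_ids[i] = part
        used += s
    return part_ids

sizes = np.array([400, 300, 900, 200, 800, 100])      # text bytes per group
print(build_partition_sketch(sizes, max_bytes=1000))  # -> [0 0 1 2 2 3]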
df = df.merge(shuffle_part_ids, on=partition_on, how="inner").persist() + + df = ( + rearange_by_column_direct( + df, + col=output_col, + npartitions=n_output_partitions, + ignore_index=True, + excomms_default=True, + ) + .drop(columns=[output_col]) + .persist() + ) + print(f"Will write {len(df)} rows to disk", flush=True) + return df \ No newline at end of file From dd984b8ba3b1dd6a8cc214fb4b3cfe8aade35f67 Mon Sep 17 00:00:00 2001 From: Praateek Date: Mon, 4 Nov 2024 11:16:34 -0800 Subject: [PATCH 10/16] use npartitions Signed-off-by: Praateek --- nemo_curator/modules/fuzzy_dedup.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/nemo_curator/modules/fuzzy_dedup.py b/nemo_curator/modules/fuzzy_dedup.py index d6b1fdbfa..c6b5450c4 100644 --- a/nemo_curator/modules/fuzzy_dedup.py +++ b/nemo_curator/modules/fuzzy_dedup.py @@ -1162,7 +1162,7 @@ def _batched_merge_and_write( output_df = rearange_by_column_direct( df=merged_subset_df, col=partition_on, - npartitions=num_workers, + npartitions=merged_subset_df.npartitions, ignore_index=True, excomms_default=True ) From 2776c1ecca5d26dcc0a322c4958213698b8d1ad3 Mon Sep 17 00:00:00 2001 From: Praateek Date: Mon, 4 Nov 2024 11:21:12 -0800 Subject: [PATCH 11/16] add 2 more approaches Signed-off-by: Praateek --- nemo_curator/modules/fuzzy_dedup.py | 32 +++++++++++++++++++++++++++++ 1 file changed, 32 insertions(+) diff --git a/nemo_curator/modules/fuzzy_dedup.py b/nemo_curator/modules/fuzzy_dedup.py index c6b5450c4..5a13b7185 100644 --- a/nemo_curator/modules/fuzzy_dedup.py +++ b/nemo_curator/modules/fuzzy_dedup.py @@ -1166,8 +1166,40 @@ def _batched_merge_and_write( ignore_index=True, excomms_default=True ) + elif os.environ["SHUFFLE_APPROACH"] == "rearrange_second_branch": + self._logger.info("Using rearrange_second_branch") + from dask_expr._collection import new_collection + from dask_expr._shuffle import RearrangeByColumn + + # Use the internal dask-expr API + output_df = new_collection( + RearrangeByColumn( + frame=merged_subset_df.expr, + partitioning_index=partition_on, + npartitions_out=merged_subset_df.npartitions, + ignore_index=True, + method="tasks", + # Prevent staged shuffling by setting max_branch + # to the number of input partitions + 1 + options={"max_branch": merged_subset_df.npartitions + 1}, + ) + ) + elif os.enviorn["SHUFFLE_APPROACH"] == "rearrange_third_branch": + self._logger.info("Using rearrange_third_branch") + from dask.dataframe.shuffle import rearrange_by_column + output_df = rearrange_by_column( + merged_subset_df, + col=partition_on, + shuffle_method="tasks", + # Prevent staged shuffling by setting max_branch + # to the number of input partitions + 1 + max_branch=merged_subset_df.npartitions + 1, + npartitions=merged_subset_df.npartitions, + ignore_index=True, + ) else: raise ValueError("Invalid shuffle approach") + if self.int_to_str_id is not None: output_df = output_df.map_partitions( int_ids_to_str, id_column=self.int_to_str_id From 9186975c4afa328e442d3bccc0c956f07b77e2a6 Mon Sep 17 00:00:00 2001 From: Praateek Date: Tue, 5 Nov 2024 03:32:32 -0800 Subject: [PATCH 12/16] typo Signed-off-by: Praateek --- nemo_curator/modules/fuzzy_dedup.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/nemo_curator/modules/fuzzy_dedup.py b/nemo_curator/modules/fuzzy_dedup.py index 5a13b7185..41e888881 100644 --- a/nemo_curator/modules/fuzzy_dedup.py +++ b/nemo_curator/modules/fuzzy_dedup.py @@ -1184,7 +1184,7 @@ def _batched_merge_and_write( options={"max_branch": merged_subset_df.npartitions + 
1}, ) ) - elif os.enviorn["SHUFFLE_APPROACH"] == "rearrange_third_branch": + elif os.environ["SHUFFLE_APPROACH"] == "rearrange_third_branch": self._logger.info("Using rearrange_third_branch") from dask.dataframe.shuffle import rearrange_by_column output_df = rearrange_by_column( From 4914974d2e6ab6a7a18b97db4f6f2d101614c1d4 Mon Sep 17 00:00:00 2001 From: Praateek Date: Thu, 7 Nov 2024 03:22:47 -0800 Subject: [PATCH 13/16] rename var + left change from merge Signed-off-by: Praateek --- nemo_curator/modules/fuzzy_dedup.py | 24 ++++++++++++------------ 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/nemo_curator/modules/fuzzy_dedup.py b/nemo_curator/modules/fuzzy_dedup.py index 44e0977e1..03621bf35 100644 --- a/nemo_curator/modules/fuzzy_dedup.py +++ b/nemo_curator/modules/fuzzy_dedup.py @@ -1142,7 +1142,7 @@ def _batched_merge_and_write( subset_text_df = left_df_use.partitions[ text_part_offset:end_text_offset ] - merged_subset_df = merge_left_to_shuffled_right( + subset_merged_df = merge_left_to_shuffled_right( subset_text_df, subset_bucket_df, merge_on, @@ -1150,20 +1150,20 @@ def _batched_merge_and_write( if os.environ["SHUFFLE_APPROACH"] == "text_bytes_aware": self._logger.info("Using text_bytes_aware_shuffle") output_df = text_bytes_aware_shuffle( - df=merged_subset_df, + df=subset_merged_df, partition_on=partition_on, text_column=self.text_field, num_workers=num_workers, ) elif os.environ["SHUFFLE_APPROACH"] == "dask_vanilla": self._logger.info("Using dask's vanilla shuffle") - output_df = merged_subset_df.shuffle(on=partition_on) + output_df = subset_merged_df.shuffle(on=partition_on) elif os.environ["SHUFFLE_APPROACH"] == "rearrange_by_column_direct": self._logger.info("Using rearrange_by_column_direct") output_df = rearange_by_column_direct( - df=merged_subset_df, + df=subset_merged_df, col=partition_on, - npartitions=merged_subset_df.npartitions, + npartitions=subset_merged_df.npartitions, ignore_index=True, excomms_default=True, ) @@ -1175,14 +1175,14 @@ def _batched_merge_and_write( # Use the internal dask-expr API output_df = new_collection( RearrangeByColumn( - frame=merged_subset_df.expr, + frame=subset_merged_df.expr, partitioning_index=partition_on, - npartitions_out=merged_subset_df.npartitions, + npartitions_out=subset_merged_df.npartitions, ignore_index=True, method="tasks", # Prevent staged shuffling by setting max_branch # to the number of input partitions + 1 - options={"max_branch": merged_subset_df.npartitions + 1}, + options={"max_branch": subset_merged_df.npartitions + 1}, ) ) elif os.environ["SHUFFLE_APPROACH"] == "rearrange_third_branch": @@ -1190,19 +1190,19 @@ def _batched_merge_and_write( from dask.dataframe.shuffle import rearrange_by_column output_df = rearrange_by_column( - merged_subset_df, + subset_merged_df, col=partition_on, shuffle_method="tasks", # Prevent staged shuffling by setting max_branch # to the number of input partitions + 1 - max_branch=merged_subset_df.npartitions + 1, - npartitions=merged_subset_df.npartitions, + max_branch=subset_merged_df.npartitions + 1, + npartitions=subset_merged_df.npartitions, ignore_index=True, ) else: raise ValueError("Invalid shuffle approach") - if self.int_to_str_id is not None: + if self.int_to_str_id is not None and output_df is not None: output_df = output_df.map_partitions( int_ids_to_str, id_column=self.int_to_str_id ) From d80960a17bc08b241d8c54f9e974c99417af351e Mon Sep 17 00:00:00 2001 From: Praateek Date: Tue, 19 Nov 2024 08:46:39 -0800 Subject: [PATCH 14/16] remove other implementations 
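This patch (14/16) drops the experimental SHUFFLE_APPROACH branches and keeps only Dask's built-in shuffle on the output-partition id. A minimal CPU illustration of what that retained call does, using a pandas-backed dask.dataframe and made-up column values (the real code runs on dask-cudf dataframes):

import pandas as pd
import dask.dataframe as dd

pdf = pd.DataFrame({"_output_partition_id": [0, 1, 0, 2, 1, 2],
                    "text": ["a", "b", "c", "d", "e", "f"]})
ddf = dd.from_pandas(pdf, npartitions=3)

# After the shuffle, all rows sharing an _output_partition_id live in the same
# partition, so each partition can be written out as one independent file.
shuffled = ddf.shuffle(on="_output_partition_id")
print(shuffled.map_partitions(len).compute())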
Signed-off-by: Praateek --- nemo_curator/modules/fuzzy_dedup.py | 55 +---------------------------- 1 file changed, 1 insertion(+), 54 deletions(-) diff --git a/nemo_curator/modules/fuzzy_dedup.py b/nemo_curator/modules/fuzzy_dedup.py index 03621bf35..c444ffd68 100644 --- a/nemo_curator/modules/fuzzy_dedup.py +++ b/nemo_curator/modules/fuzzy_dedup.py @@ -1147,60 +1147,7 @@ def _batched_merge_and_write( subset_bucket_df, merge_on, ) - if os.environ["SHUFFLE_APPROACH"] == "text_bytes_aware": - self._logger.info("Using text_bytes_aware_shuffle") - output_df = text_bytes_aware_shuffle( - df=subset_merged_df, - partition_on=partition_on, - text_column=self.text_field, - num_workers=num_workers, - ) - elif os.environ["SHUFFLE_APPROACH"] == "dask_vanilla": - self._logger.info("Using dask's vanilla shuffle") - output_df = subset_merged_df.shuffle(on=partition_on) - elif os.environ["SHUFFLE_APPROACH"] == "rearrange_by_column_direct": - self._logger.info("Using rearrange_by_column_direct") - output_df = rearange_by_column_direct( - df=subset_merged_df, - col=partition_on, - npartitions=subset_merged_df.npartitions, - ignore_index=True, - excomms_default=True, - ) - elif os.environ["SHUFFLE_APPROACH"] == "rearrange_second_branch": - self._logger.info("Using rearrange_second_branch") - from dask_expr._collection import new_collection - from dask_expr._shuffle import RearrangeByColumn - - # Use the internal dask-expr API - output_df = new_collection( - RearrangeByColumn( - frame=subset_merged_df.expr, - partitioning_index=partition_on, - npartitions_out=subset_merged_df.npartitions, - ignore_index=True, - method="tasks", - # Prevent staged shuffling by setting max_branch - # to the number of input partitions + 1 - options={"max_branch": subset_merged_df.npartitions + 1}, - ) - ) - elif os.environ["SHUFFLE_APPROACH"] == "rearrange_third_branch": - self._logger.info("Using rearrange_third_branch") - from dask.dataframe.shuffle import rearrange_by_column - - output_df = rearrange_by_column( - subset_merged_df, - col=partition_on, - shuffle_method="tasks", - # Prevent staged shuffling by setting max_branch - # to the number of input partitions + 1 - max_branch=subset_merged_df.npartitions + 1, - npartitions=subset_merged_df.npartitions, - ignore_index=True, - ) - else: - raise ValueError("Invalid shuffle approach") + output_df = subset_merged_df.shuffle(on=partition_on) if self.int_to_str_id is not None and output_df is not None: output_df = output_df.map_partitions( From b49e6604eadeb2549abc82263bfc421c8b571866 Mon Sep 17 00:00:00 2001 From: Praateek Date: Tue, 19 Nov 2024 09:03:57 -0800 Subject: [PATCH 15/16] remove text_bytes_aware Signed-off-by: Praateek --- nemo_curator/modules/fuzzy_dedup.py | 6 +- .../utils/fuzzy_dedup_utils/shuffle_utils.py | 83 +------------------ 2 files changed, 3 insertions(+), 86 deletions(-) diff --git a/nemo_curator/modules/fuzzy_dedup.py b/nemo_curator/modules/fuzzy_dedup.py index c444ffd68..c5f9ba352 100644 --- a/nemo_curator/modules/fuzzy_dedup.py +++ b/nemo_curator/modules/fuzzy_dedup.py @@ -59,11 +59,7 @@ build_partition, get_agg_text_bytes_df, ) -from nemo_curator.utils.fuzzy_dedup_utils.shuffle_utils import ( - rearange_by_column_direct, - text_bytes_aware_shuffle, - write_partitioned_file, -) +from nemo_curator.utils.fuzzy_dedup_utils.shuffle_utils import write_partitioned_file class MinHash: diff --git a/nemo_curator/utils/fuzzy_dedup_utils/shuffle_utils.py b/nemo_curator/utils/fuzzy_dedup_utils/shuffle_utils.py index 02377642e..2c93709b1 100644 --- 
a/nemo_curator/utils/fuzzy_dedup_utils/shuffle_utils.py +++ b/nemo_curator/utils/fuzzy_dedup_utils/shuffle_utils.py @@ -12,7 +12,6 @@ # See the License for the specific language governing permissions and # limitations under the License. -from typing import Optional import cudf import dask_cuda @@ -21,10 +20,7 @@ from packaging.version import Version from nemo_curator._compat import query_planning_enabled -from nemo_curator.utils.fuzzy_dedup_utils.output_map_utils import ( - build_partition, - get_agg_text_bytes_df, -) +from nemo_curator.utils.fuzzy_dedup_utils.output_map_utils import build_partition dask_cuda_version = Version(dask_cuda.__version__) USE_EXCOMMS = ( @@ -90,6 +86,7 @@ def rearange_by_column_direct( else: from dask.dataframe.shuffle import rearrange_by_column + print("Using oldschool", flush=True) return rearrange_by_column( @@ -129,79 +126,3 @@ def get_shuffle_part_ids_df( df[partition_on] = agg_df[partition_on] df[output_col] = output_ar return df - - -def get_shuffle_partition_info( - df, - partition_on, - output_column, - text_column, - bytes_column="_text_bytes", - num_workers=None, -): - df[bytes_column] = df[text_column].map_partitions(lambda s: s.str.byte_count()) - agg_df, _ = get_agg_text_bytes_df( - df, agg_column=partition_on, bytes_column=bytes_column, n_partitions=1 - ) - del df - - agg_df = agg_df.reset_index(drop=True) - shuffle_part_ids = agg_df.map_partitions( - get_shuffle_part_ids_df, - partition_on, - size_col=bytes_column, - num_workers=num_workers, - output_col=output_column, - ).persist() - return shuffle_part_ids - - -def text_bytes_aware_shuffle( - df, - partition_on: str, - text_column: str, - num_workers: Optional[int] = None, -): - """ - This shuffle takes into account the text bytes of each partition - and tries to make sure that the output partitions do not exceed - the char limit of cuDF - - Args: - df: dask_cudf dataframe - partition_on: column name to partition on - text_column: column name for the text data - - Returns: - dask_cudf dataframe with _partitions columns or None if `df` is empty after the merge - """ - print("Starting text bytes aware shuffle", flush=True) - output_col = "_partitions" - - df = df.persist() - if len(df) == 0: - return None - shuffle_part_ids = get_shuffle_partition_info( - df=df, - partition_on=partition_on, - num_workers=num_workers, - output_column=output_col, - text_column=text_column, - ) - n_output_partitions = shuffle_part_ids[output_col].max().compute() + 1 - n_output_partitions = int(n_output_partitions) - df = df.merge(shuffle_part_ids, on=partition_on, how="inner").persist() - - df = ( - rearange_by_column_direct( - df, - col=output_col, - npartitions=n_output_partitions, - ignore_index=True, - excomms_default=True, - ) - .drop(columns=[output_col]) - .persist() - ) - print(f"Will write {len(df)} rows to disk", flush=True) - return df \ No newline at end of file From abc3dcb775934a8d633283776567fd35ccd5d291 Mon Sep 17 00:00:00 2001 From: Praateek Date: Tue, 19 Nov 2024 09:05:05 -0800 Subject: [PATCH 16/16] remove prints Signed-off-by: Praateek --- nemo_curator/utils/fuzzy_dedup_utils/shuffle_utils.py | 4 ---- 1 file changed, 4 deletions(-) diff --git a/nemo_curator/utils/fuzzy_dedup_utils/shuffle_utils.py b/nemo_curator/utils/fuzzy_dedup_utils/shuffle_utils.py index 2c93709b1..937518ec9 100644 --- a/nemo_curator/utils/fuzzy_dedup_utils/shuffle_utils.py +++ b/nemo_curator/utils/fuzzy_dedup_utils/shuffle_utils.py @@ -50,7 +50,6 @@ def rearange_by_column_direct( ): # Execute a "direct" shuffle operation 
without staging if config.get("explicit-comms", excomms_default): - print("Using excomms", flush=True) from dask_cuda.explicit_comms.dataframe.shuffle import ( shuffle as explicit_comms_shuffle, ) @@ -70,7 +69,6 @@ def rearange_by_column_direct( from dask_expr._shuffle import RearrangeByColumn # Use the internal dask-expr API - print("Using queryplanning", flush=True) return new_collection( RearrangeByColumn( frame=df.expr, @@ -87,8 +85,6 @@ def rearange_by_column_direct( else: from dask.dataframe.shuffle import rearrange_by_column - print("Using oldschool", flush=True) - return rearrange_by_column( df, col=col,
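After this final cleanup, rearange_by_column_direct keeps its three dispatch branches: dask_cuda's explicit-comms shuffle, the dask-expr RearrangeByColumn expression when query planning is enabled, and the classic task-based rearrange_by_column otherwise. A small sketch of how the explicit-comms branch is toggled from user code; the values shown are illustrative.

from dask import config

# The function checks config.get("explicit-comms", excomms_default), so either
# dask.config.set or the DASK_EXPLICIT_COMMS environment variable flips the branch.
with config.set({"explicit-comms": True}):
    print(config.get("explicit-comms", False))  # True -> explicit_comms_shuffle path
print(config.get("explicit-comms", False))      # back to the default outside the block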