
Removal logic for Exact / Fuzzy Dedup #499

Conversation

praateekmahajan
Collaborator

@praateekmahajan praateekmahajan commented Jan 28, 2025

Description

Removal Description

  1. We implement an approximate left-anti join via a broadcast merge. This allows us to scale even when the right side is larger than the memory of a single worker.
  2. We observe that performance varies with the number of partitions (i.e., with partition sizes).
    • The fastest case is when the right side is exactly one partition: we broadcast all of right to every worker and perform the merge locally, resulting in the least data transfer.
  3. One downside of this approach: when the right side is larger than host memory per node, the merge operation needs to spill but cannot, so Dask pauses workers once they reach 80% memory capacity and the job gets stuck. In that case, the user has to implement different merge logic.
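The anti-join described above can be sketched in plain pandas (a minimal illustration only; the actual implementation runs on Dask/cuDF, and the `id` column name here is an assumption):

```python
import pandas as pd

def left_anti_join(left: pd.DataFrame, right: pd.DataFrame, on: str) -> pd.DataFrame:
    """Keep rows of `left` whose key does not appear in `right` (left-anti join)."""
    keys = right[[on]].drop_duplicates()
    merged = left.merge(keys, on=on, how="left", indicator=True)
    return merged[merged["_merge"] == "left_only"].drop(columns="_merge")

# Toy example: drop documents whose ids were flagged as duplicates.
docs = pd.DataFrame({"id": [1, 2, 3, 4], "text": ["a", "b", "c", "d"]})
dupes = pd.DataFrame({"id": [2, 4]})
kept = left_anti_join(docs, dupes, on="id")
```

In the distributed setting, broadcasting a single-partition right side to every worker turns this into purely local merges, which is why that case transfers the least data.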

Class Description

  1. Implements an abstract class called Deduplicator (name suggestions welcome)
    1. Lives in nemo_curator._deduplicator.py
    2. Implements identify / remove / __call__
    3. __call__ now accepts a boolean called perform_removal (alternative APIs also accepted)
  2. FuzzyDuplicates / ExactDuplicates extend this class; their former __call__ logic is renamed to identify, since __call__ is now implemented in the base class
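A minimal sketch of the shape this class description implies (class and method names come from the PR; the id-based removal logic, signatures, and the toy `ExactDuplicates` are my assumptions, not the actual implementation):

```python
from abc import ABC, abstractmethod
import pandas as pd

class Deduplicator(ABC):
    """Subclasses implement `identify`; `remove` and `__call__` live in the base class."""

    def __init__(self, id_field: str = "id"):
        self.id_field = id_field

    @abstractmethod
    def identify(self, dataset: pd.DataFrame) -> pd.DataFrame:
        """Return the rows of `dataset` considered duplicates."""

    def remove(self, dataset: pd.DataFrame, duplicates: pd.DataFrame) -> pd.DataFrame:
        # Left-anti join: keep rows whose id is not among the duplicates.
        dup_ids = set(duplicates[self.id_field])
        return dataset[~dataset[self.id_field].isin(dup_ids)]

    def __call__(self, dataset: pd.DataFrame, perform_removal: bool = False) -> pd.DataFrame:
        duplicates = self.identify(dataset)
        if perform_removal:
            return self.remove(dataset, duplicates)
        return duplicates

class ExactDuplicates(Deduplicator):
    """Toy exact dedup: flags rows whose text appeared earlier in the frame."""
    def identify(self, dataset: pd.DataFrame) -> pd.DataFrame:
        return dataset[dataset.duplicated(subset="text", keep="first")]

df = pd.DataFrame({"id": [1, 2, 3], "text": ["hello", "hello", "world"]})
dedup = ExactDuplicates()
duplicates = dedup(df)                     # identify only
deduped = dedup(df, perform_removal=True)  # identify, then remove
```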

Usage

dataset = DocumentDataset.read_parquet(...)
exact_dedup = ExactDeduplicator(...)
duplicates = exact_dedup(dataset)  # or exact_dedup.identify(dataset)

exact_dedup.remove(dataset)

Checklist

  • I am familiar with the Contributing Guide.
  • New or Existing tests cover these changes.
  • The documentation is up to date with these changes.

Signed-off-by: Praateek <[email protected]>
@praateekmahajan praateekmahajan changed the title [WIP] Removal logic for Exact / Fuzzy Dedup Removal logic for Exact / Fuzzy Dedup Jan 30, 2025
Signed-off-by: Praateek <[email protected]>
@praateekmahajan praateekmahajan added gpuci Run GPU CI/CD on PR and removed gpuci Run GPU CI/CD on PR labels Jan 30, 2025
Collaborator

@ryantwolf ryantwolf left a comment


A couple of comments about the organization. Can you make sure you update the API docs and the deduplication sections in the user guide to mention all this?

One downside of this approach: when the right side is larger than host memory per node, the merge operation needs to spill but cannot, so Dask pauses workers once they reach 80% memory capacity and the job gets stuck. In that case, the user has to implement different merge logic.

Make sure to mention this in the user guide and describe roughly when "right is bigger than system memory per node" in layman's terms. Is this something we have/will encounter? Wondering if globally deduping 240 TB would trigger this.

Copy link
Collaborator


Can this be moved under nemo_curator/modules/deduplicator.py? I think the function is something we want users to be able to access.

return removed_result


class Deduplicator(ABC):
Collaborator


Not sure how I feel about the abstraction. I have been wanting something like this, but I worry it is not as generalizable as I'd want it to be. For example, can semantic dedup use this? I don't believe it can, since its duplicates aren't all grouped like this. Imo, if the deduplication abstraction doesn't work for all our deduplication methods, I'd rather not have it, so we don't confuse our users. We can always refactor the logic out into a base class if we find a general solution.

Collaborator


Maybe call it

Suggested change
-class Deduplicator(ABC):
+class DuplicateRemover:

or DuplicatesRemover instead?

Collaborator


Possible example usage:

remover = DuplicatesRemover(...)

exact_dupes = ExactDuplicates(...).identify_duplicates(...)
deduped_data = remover.remove_duplicates(exact_dupes)

fuzzy_dupes = FuzzyDuplicates(...).identify_duplicates(...)
deduped_data = remover.remove_duplicates(fuzzy_dupes)

# Could it be possible to call both simultaneously?
# deduped_data = remover.remove_duplicates(exact_dupes, fuzzy_dupes)
# deduped_data = remover.remove_duplicates([exact_dupes, fuzzy_dupes])
# or similar...

?

Should implement the logic for identifying duplicates in the dataset."""
raise NotImplementedError

def remove(
Collaborator


I think this could just be a helper function, defined here, that exact and fuzzy dedup import and use, instead of the ABC.
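The helper-function alternative being suggested might look something like this (a sketch; the function name, `id` column, and pandas stand-in are all my assumptions):

```python
import pandas as pd

def remove_duplicates(dataset: pd.DataFrame, duplicates: pd.DataFrame,
                      id_field: str = "id") -> pd.DataFrame:
    """Free function both ExactDuplicates and FuzzyDuplicates could import,
    rather than inheriting it from an abstract base class."""
    return dataset[~dataset[id_field].isin(duplicates[id_field])]

docs = pd.DataFrame({"id": [1, 2, 3], "text": ["x", "y", "z"]})
flagged = pd.DataFrame({"id": [2]})
cleaned = remove_duplicates(docs, flagged)
```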


from nemo_curator.datasets import DocumentDataset
from nemo_curator.log import create_logger
from nemo_curator.utils.distributed_utils import performance_report_if_with_ts_suffix
from nemo_curator.utils.gpu_utils import is_cudf_type


-class ExactDuplicates:
+class ExactDuplicates(Deduplicator):
Collaborator


Suggested change
-class ExactDuplicates(Deduplicator):
+class ExactDuplicates:

?

@@ -35,7 +34,7 @@
from nemo_curator.utils.distributed_utils import performance_report_if_with_ts_suffix


-class FuzzyDuplicates:
+class FuzzyDuplicates(Deduplicator):
Collaborator


Suggested change
-class FuzzyDuplicates(Deduplicator):
+class FuzzyDuplicates:

?


@praateekmahajan praateekmahajan marked this pull request as draft January 31, 2025 23:01
@praateekmahajan
Collaborator Author

@sarahyurick / @ryantwolf, given that both of you had thoughts about it not being an abstract class, I took your suggestions and started #509
@ryantwolf I'll add the docs once we are clear on what the API is, so that I don't have to change them repeatedly.

Regarding your question about whether we've run into a case where the duplicates are larger than system (host) memory: yes.
In one of our experiments, a 64 TB dataset produced ~300 GB of duplicates, while our host memory was 250 GB per worker. That didn't work out for us: cuDF spilled to host memory during the merge and ran out of memory.
In that case, we'd likely recommend the user implement something custom at the partition level (where each partition is typically a year of CC or something similar).

It's still an open question why Dask / cuDF needed to hold the whole ~300 GB in memory, but for now this won't work at that scale. It works whenever the duplicates' size is much smaller than host memory, which varies across systems.
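The partition-level workaround mentioned above could be sketched like this (pandas stand-in for illustration; a real pipeline would iterate Dask/cuDF partitions, and the `id` column is an assumption):

```python
import pandas as pd
from typing import Iterable, Iterator

def remove_per_partition(partitions: Iterable[pd.DataFrame],
                         duplicate_ids: Iterable) -> Iterator[pd.DataFrame]:
    """Anti-join one partition at a time against the duplicate-id set,
    so only a single partition (plus the id set) must fit in memory."""
    dup_ids = set(duplicate_ids)
    for part in partitions:
        yield part[~part["id"].isin(dup_ids)]

# Toy example: two partitions, two flagged duplicate ids.
parts = [pd.DataFrame({"id": [1, 2]}), pd.DataFrame({"id": [3, 4]})]
cleaned = list(remove_per_partition(parts, duplicate_ids=[2, 3]))
```

This trades the single large merge for many small ones, which sidesteps the spill-and-pause failure mode at the cost of materializing the duplicate-id set on every worker.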

@sarahyurick sarahyurick closed this Feb 3, 2025