Exact / Fuzzy Duplicate Removal Improvements at Scale #529

Open
praateekmahajan opened this issue Feb 7, 2025 · 0 comments

As part of #335 we investigated how to improve our performance, and we came up with a simple broadcast merge to perform the left-anti join (sketched below).

However, that approach doesn't scale when the right side is larger than system memory (it works fine when the right side is larger than device memory).
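
For reference, a minimal sketch of the left-anti join idea (pandas shown for illustration; the function and argument names are placeholders, not the actual implementation):

import pandas as pd

def left_anti_join(left: pd.DataFrame, right: pd.DataFrame, on: str = "id") -> pd.DataFrame:
    # keep rows of `left` whose join key never appears in `right`
    merged = left.merge(right[[on]].drop_duplicates(), on=on, how="left", indicator=True)
    return merged.loc[merged["_merge"] == "left_only"].drop(columns="_merge")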

Followups

  1. Fuzzy Deduplication
    1. We could explore reading the dataset with add_filename=True, even though we know that results in slower I/O.
    2. However, the slower I/O will only be paid once, at minhash time (which is ~20% of the whole pipeline runtime).
    3. In the subsequent LSH / Jaccard / Connected Components stages, we use filename as a join key along with id_col.
    4. In the end, once we've identified duplicates, we'll know which file in the original set each one belongs to, and we can perform a file-by-file removal.
    5. If this approach works, identification runtime will increase by a few percentage points; however, removal should be faster and able to complete for any dataset size, since we're not shuffling anything, just performing a file-by-file merge (see the first sketch after this list).
  2. Exact Deduplication
    1. When we perform the hashing, we already perform a shuffle followed by a map_partitions(lambda x: x[x["_hashes"].duplicated(keep=False)]) to return the set of all duplicates.
    2. Instead of keep=False, we can use keep="first" and invert the mask, which keeps the first document in each "group" and drops the rest, i.e. identification and removal in a single step (see the second sketch after this list).
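
A hedged sketch of the file-by-file removal in followup 1 (pandas for illustration; dups_by_file and the output path are hypothetical):

import pandas as pd

def remove_file_by_file(dups_by_file: dict[str, set], id_col: str = "id") -> None:
    # dups_by_file maps each original filename to the ids to drop from it,
    # derived from the filename column carried through the pipeline
    for path, dup_ids in dups_by_file.items():
        part = pd.read_parquet(path)
        # no shuffle: each file is filtered against only its own duplicates
        part[~part[id_col].isin(dup_ids)].to_parquet(path.replace(".parquet", ".dedup.parquet"))

And a sketch of the keep="first" idea in followup 2: inverting the duplicated(keep="first") mask keeps exactly one representative per hash group, so identification and removal collapse into a single map_partitions:

import pandas as pd

def dedupe_partition(part: pd.DataFrame) -> pd.DataFrame:
    # keep=False flags every member of a duplicate group (identification only);
    # keep="first" flags all but the first, so the inverted mask keeps one
    # document per "_hashes" group -- identification + removal in one pass
    return part[~part["_hashes"].duplicated(keep="first")]

# after the shuffle on "_hashes", so each group lands in one partition:
# deduped = shuffled_ddf.map_partitions(dedupe_partition)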

Analysis from #335

  1. Broadcast merge in Dask has different implementations for left vs. inner joins, where one is much more parallelizable than the other.
  2. A left join shuffles the right side (partitions A, B become P, Q), splits each partition of the left side (1, 2, 3 -> 1_1, 1_2; 2_1, 2_2; 3_1, 3_2), then performs a concatenation of merge(1_1, P) and merge(1_2, Q), and ends up OOMing.
  3. We suspect an inner join could be faster, as it doesn't need to shuffle left or right: it can just run merge(1, A) and merge(1, B) in parallel and then have a final task to concat all the 1's together. (In fact, even this could be optimized: if you don't need the concat, you can just write out merge(1, A)'s output directly; a sketch appears after the Code Logic section.)
  4. However, this still means a 64 TB dataset (~20 bn rows) read with 1 GB partitions will have 64,000 partitions, and its set of duplicates (30% of the dataset) will be 20 bn * 30% * 30 bytes (for the join key) ≈ 180 GB, which, read with ~1 GB partitions, is another 180 partitions, resulting in a very large task graph of ~11.5 million tasks (see the arithmetic sketch after this list).
  5. Moreover, in practice we observe that for both left and inner joins we're still spilling a lot during the cudf.merge, even though, to my understanding, each merge should only be between 1 and P (which in the example above are assumed to be ~1 GB each). It might be that P and Q live on the same worker, or that the graph isn't evaluated until the concat happens, in which case they're evaluated on the same worker.
    1. In the left join, it spills so much, so early in the graph, that we reach ~80% of system memory, causing Dask to pause the worker and get stuck in a loop.
    2. In the inner join, while in theory it shouldn't spill (each time we're just merging 1 with A, and 1 with B), the profile still shows a lot of time spent spilling.
    3. This problem seems to appear only at "scale" and not for smaller datasets.
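
The back-of-the-envelope numbers in point 4, spelled out (all inputs are the assumed figures above):

dataset_bytes = 64 * 10**12             # 64 TB dataset
partition_bytes = 10**9                 # ~1 GB partitions
left_partitions = dataset_bytes // partition_bytes     # 64,000 partitions

rows = 20 * 10**9                       # ~20 bn rows
dup_fraction = 0.30                     # 30% of the dataset is duplicated
key_bytes = 30                          # join-key bytes per row
dup_bytes = rows * dup_fraction * key_bytes            # ~180 GB of duplicates
right_partitions = round(dup_bytes / partition_bytes)  # ~180 partitions

merge_tasks = left_partitions * right_partitions       # 11,520,000 (~11.5M tasks)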

Conclusion
Our analysis above showed that large merges have a huge memory requirement, and to timebox this effort we didn't dive any deeper into merge performance at the Dask / cuDF level. Instead we pushed #509, which works for most dataset sizes and is an improvement over our current implementation of compute. For larger datasets, where the set of duplicates is greater than system memory, we should try an approach indicated above, or users can perform removal at a partition level (e.g. your dataset is partitioned by year, and each id_col has a year prefix, so you know which id needs to be removed from which partition after the identify step; a sketch follows below).
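
A hedged sketch of that partition-level removal, assuming year-prefixed ids like "2021-000123" (the names and prefix format are hypothetical):

import pandas as pd

def dups_by_year(dup_ids: pd.Series) -> dict[str, set]:
    # ids carry a year prefix, so duplicates can be bucketed per partition
    years = dup_ids.str.split("-").str[0]
    return {year: set(ids) for year, ids in dup_ids.groupby(years)}

def remove_from_year_partition(part: pd.DataFrame, year_dups: set, id_col: str = "id") -> pd.DataFrame:
    # plain per-partition filter: no global shuffle or merge required
    return part[~part[id_col].isin(year_dups)]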

Broadcast Inner Join

[Image]
After 1 hour on 8 H100s

[Image]

~96% of the time is spent in merge, where almost all of it is spilling

[Image]


Task Graphs

Left Join: [Image]
Inner Join: [Image]

Code Logic

Left Join

for l in Lpartitions:
    # split left partition l on the join key into len(Rpartitions) shards
    graph[f"split-left-{l}"] = split(f"left-{l}", "id", Rpartitions)
    _concat_list = []
    for r in Rpartitions:
        inter_key = f"inter-left-{l}-{r}"
        graph[inter_key] = (
            apply,
            _merge_chunk_wrapper,
            (
                operator.getitem,
                f"split-left-{l}",
                r,
            ),
            # (the matching right-side shard, e.g. f"right-{r}", is elided in this sketch)
            {"how": "left", "left_on": [], "right_on": []},
        )
        _concat_list.append(inter_key)
    # stitch the per-shard merge results back into one output partition
    graph[f"left-{l}"] = (_concat_wrapper, _concat_list)

Inner Join

for l in Lpartitions:
    # no split here
    _concat_list = []
    for r in Rpartitions:
        inter_key = f"inter-left-{l}-{r}"
        graph[inter_key] = (
            apply,
            _merge_chunk_wrapper,
            f"left-{l}",  # merge with the whole of left partition l
            # (the matching right-side partition is elided in this sketch)
            {"how": "inner", "left_on": [], "right_on": []},
        )
        _concat_list.append(inter_key)
    # concat all of partition l's merge outputs into the final result
    graph[f"left-{l}"] = (_concat_wrapper, _concat_list)