Exact / Fuzzy Duplicate Removal Improvements at Scale
As part of https://github.com/NVIDIA/NeMo-Curator/issues/335 we investigated how we can improve our performance, and we came up with a simple broadcast merge to perform the left-anti join.
However, that approach doesn't scale when the right side is larger than system memory (it works fine when the right side is only larger than device memory).
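For context, the left-anti join here means "keep every row of the dataset whose id does not appear in the set of identified duplicates". Below is a minimal per-partition sketch of that operation, not the exact code from the PR; the `remove_duplicates` name, the `id` column and the `_dup` marker column are illustrative, and the same pattern works on both pandas and cuDF DataFrames.

```python
import pandas as pd  # the same pattern works on cudf DataFrames


def remove_duplicates(partition: pd.DataFrame, duplicate_ids: pd.DataFrame,
                      id_col: str = "id") -> pd.DataFrame:
    """Left-anti join: keep rows of `partition` whose id is NOT in `duplicate_ids`."""
    dups = duplicate_ids[[id_col]].drop_duplicates().assign(_dup=1)
    merged = partition.merge(dups, on=id_col, how="left")
    # rows that found no match in the duplicates frame are the ones we keep
    return merged[merged["_dup"].isna()].drop(columns="_dup")
```

In the broadcast version, `duplicate_ids` is the smaller (right) side that gets merged against every left partition, which is exactly what breaks down once it no longer fits in system memory.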
Followups
- Fuzzy Deduplication
  - We could explore reading the dataset with `add_filename=True`, even though we know that results in slower I/O.
    - However, the slower I/O will only be paid once at minhash time (which is ~20% of the whole pipeline runtime).
  - In subsequent LSH / Jaccard / Connected Components stages we use `filename` as a join key along with `id_col`.
  - In the end, once we've identified duplicates, we'll know which file in the original set they belong to, and we can perform a file-by-file removal (see the sketch after this list).
  - If this approach indeed works, we'll increase our identification runtime by a few percentage points, but removal should be faster and able to complete for any dataset size, since we're not shuffling anything, just performing a file-by-file merge.
- Exact Deduplication
  - When we perform the hashing we already do a shuffle followed by a `map_partitions(lambda x: x[x["_hashes"].duplicated(keep=False)])` to return the full set of duplicates.
  - Instead of `keep=False`, we can use `keep="first"`, which allows us to return only the first document in each "group", i.e. identification and removal in one step (see the sketch after this list).
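To make the fuzzy-dedup follow-up above concrete, here is a rough sketch of what file-by-file removal could look like once the identified duplicates carry a `filename` column. The function name, file format, and the assumption that the stored filenames match the input paths exactly are all illustrative.

```python
import cudf


def remove_duplicates_from_file(input_file: str, output_file: str,
                                duplicates: cudf.DataFrame,
                                id_col: str = "id",
                                filename_col: str = "filename") -> None:
    """Drop identified duplicate ids from a single original file.

    No global shuffle is needed: the duplicates frame tells us which file
    each duplicate came from (thanks to add_filename=True at read time).
    """
    # ids to remove that belong to this specific file
    # (assumes the stored filename matches the input path string)
    dup_ids = duplicates.loc[duplicates[filename_col] == input_file, id_col]

    df = cudf.read_json(input_file, lines=True)
    df[~df[id_col].isin(dup_ids)].to_json(output_file, orient="records", lines=True)
```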
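And for the exact-dedup follow-up, a minimal illustration of the difference between `keep=False` and `keep="first"` on a toy partition (assuming the `_hashes` column already exists after the shuffle; the semantics are the same in pandas and cuDF):

```python
import pandas as pd

df = pd.DataFrame({"id": ["d1", "d2", "d3", "d4"],
                   "_hashes": ["a", "a", "b", "c"]})

# current behaviour: return every member of a duplicate group (identification only)
all_duplicates = df[df["_hashes"].duplicated(keep=False)]        # d1, d2

# proposed behaviour: keep only the first document of each group, i.e. the
# deduplicated output itself, identification and removal in one step
deduplicated = df[~df["_hashes"].duplicated(keep="first")]       # d1, d3, d4
```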
Analysis from #335
- Broadcast merge in Dask has different implementations for left vs inner joins, where one is much more parallelizable than the other.
  - A left join shuffles the right side (partitions `A, B` become `P, Q`), splits each partition of the left side (`1, 2, 3` -> `1_1, 1_2` and `2_1, 2_2` and `3_1, 3_2`), then performs a concatenation of `merge(1_1, P)` and `merge(1_2, Q)`, and ends up OOMing.
  - We suspect an inner join could be faster, as it doesn't need to shuffle the left or right side: it can just run `merge(1, A)` and `merge(1, B)` in parallel and then, at the end, have a task to `concat` all the 1's together. (In fact, even this could be optimized if you don't need to concat and just write out `merge(1, A)`'s output.)
  - However, this still means a 64 TB dataset (~20 bn rows) read with ~1 GB partitions will have 64,000 partitions, and its set of duplicates (~30% of the dataset) will be 20 bn * 30% * 30 bytes (for the join key) ~= 180 GB, which, when read with ~1 GB partitions, will also be 180 partitions, resulting in a very large task graph of ~11.5 million tasks (see the back-of-the-envelope check after this list).
- More so, in practice we've observed that for both `left` and `inner` joins we're still spilling a lot during the `cudf.merge`, even though from my understanding each merge should only be between `1` and `P` (which in the example above are assumed to be ~1 GB each). It might be that `P` and `Q` live on the same worker, or that the graph isn't evaluated until the `concat` happens, in which case they're evaluated on the same worker.
  - In the left join it spills so much, so soon in the graph, that we reach ~80% of system memory, which causes Dask to pause the worker and get stuck in a loop.
  - In the inner join, while in theory it shouldn't have spilled since each time we're just merging `1, A` and `1, B`, we still observe a lot of profile time being spent in spilling.
  - This problem seems to appear only at "scale" and not for smaller datasets.
Conclusion
Our analysis above showed that large merges have a huge memory requirement, and to timebox this effort we didn't dive any deeper into merge performance at the Dask / cuDF level. Instead we pushed #509, which works for most dataset sizes and is an improvement over our current `compute`-based implementation. For larger datasets, where the set of duplicates is greater than system memory, we should try one of the approaches indicated above, or users can perform removal at the partition level (e.g. your dataset is partitioned by year and each `id_col` has a year prefix, so after the identify step you know which id needs to be removed from which partition).
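A sketch of the partition-level removal escape hatch described above, assuming the dataset is partitioned by year and every id carries a year prefix; the function name, paths, Parquet format, and `"{year}-"` prefix convention are illustrative assumptions:

```python
import cudf


def remove_duplicates_for_year(year: str, partition_path: str, output_path: str,
                               duplicate_ids: cudf.Series, id_col: str = "id") -> None:
    """Remove identified duplicates from one year's partition at a time."""
    # the year prefix on the id tells us which partition each duplicate belongs to
    dups_for_year = duplicate_ids[duplicate_ids.str.startswith(f"{year}-")]

    df = cudf.read_parquet(partition_path)
    df[~df[id_col].isin(dups_for_year)].to_parquet(output_path)
```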
Broadcast Inner Join
After 1 hour on 8 H100s, ~96% of the time is being spent in `merge`, where almost all of it is spilling.
Task Graphs
| Left Join | Inner Join |
|---|---|
Code Logic
Left Join
```python
# Schematic task-graph construction for the broadcast LEFT join. `split`,
# `_merge_chunk_wrapper` and `_concat_wrapper` are helpers from the implementation;
# `apply` is dask's task-graph apply helper and `operator` is the stdlib module.
for l in Lpartitions:
    # split left partition l on the join key into len(Rpartitions) pieces
    graph[f"split-left-{l}"] = (split, f"left-{l}", "id", Rpartitions)
    _concat_list = []
    for r in Rpartitions:
        inter_key = f"inter-left-{l}-{r}"
        graph[inter_key] = (
            apply,
            _merge_chunk_wrapper,
            [
                (operator.getitem, f"split-left-{l}", r),  # piece of left for right partition r
                f"right-{r}",                              # shuffled right partition r
            ],
            {"how": "left", "left_on": ["id"], "right_on": ["id"]},
        )
        _concat_list.append(inter_key)
    # concatenate the per-right-partition merge results into output partition l
    graph[f"result-{l}"] = (_concat_wrapper, _concat_list)
```
Inner Join
```python
# Schematic task-graph construction for the broadcast INNER join.
# No split of the left side is needed: every right partition is merged
# with the whole of each left partition, and the results are concatenated.
for l in Lpartitions:
    _concat_list = []
    for r in Rpartitions:
        inter_key = f"inter-left-{l}-{r}"
        graph[inter_key] = (
            apply,
            _merge_chunk_wrapper,
            [
                f"left-{l}",   # merge with the whole of left partition l
                f"right-{r}",  # against right partition r
            ],
            {"how": "inner", "left_on": ["id"], "right_on": ["id"]},
        )
        _concat_list.append(inter_key)
    graph[f"result-{l}"] = (_concat_wrapper, _concat_list)  # output partition l
```