NeMo-Curator
Fuzzy duplicate identification fails in _batched_merge_and_write when the DocumentDataset is read with blocksize
When we read a dataset with DocumentDataset.read_parquet(..., blocksize=???, files_per_partition=None) and run fuzzy deduplication with protocol=ucx and the false-positive check enabled, we hit an error during the shuffle_docs_on_buckets -> _batched_merge_and_write step:
Stage3 (False Postive Check): Shuffle docs
0%| | 0/1 [00:00<?, ?it/s]
Started processing bucket-map partitions 0 through 1 of 1
Using 4 text partitions.
2025-01-02 08:31:21,288 - distributed.worker - ERROR - Compute Failed
Key: ('read_parquet-fused-assign-7d4479cf1a375160a1452f529c7dfcef', 1)
State: executing
Task: <Task ('read_parquet-fused-assign-7d4479cf1a375160a1452f529c7dfcef', 1) _execute_subgraph(...)>
Exception: "ValueError('Cannot align indices with non-unique values')"
Traceback: ' File "/opt/conda/envs/rapids/lib/python3.10/site-packages/dask_expr/_expr.py", line 1849, in assign\n df[name] = val\n File "/opt/conda/envs/rapids/lib/python3.10/site-packages/cudf/utils/performance_tracking.py", line 51, in wrapper\n return func(*args, **kwargs)\n File "/opt/conda/envs/rapids/lib/python3.10/site-packages/cudf/core/dataframe.py", line 1445, in __setitem__\n self.insert(self._num_columns, arg, value)\n File "/opt/conda/envs/rapids/lib/python3.10/site-packages/cudf/utils/performance_tracking.py", line 51, in wrapper\n return func(*args, **kwargs)\n File "/opt/conda/envs/rapids/lib/python3.10/site-packages/cudf/core/dataframe.py", line 3329, in insert\n return self._insert(\n File "/opt/conda/envs/rapids/lib/python3.10/site-packages/cudf/utils/performance_tracking.py", line 51, in wrapper\n return func(*args, **kwargs)\n File "/opt/conda/envs/rapids/lib/python3.10/site-packages/cudf/core/dataframe.py", line 3403, in _insert\n value = value._align_to_index(\n File "/opt/conda/envs/rapids/lib/python3.10/site-packages/cudf/core/indexed_frame.py", line 3739, in _align_to_index\n raise ValueError("Cannot align indices with non-unique values")\n'
2025-01-02 08:31:21,351 - distributed.worker - ERROR - Compute Failed
Key: getitem-de6fe9f32b5dc94114977026f9696781
State: executing
Task: <Task 'getitem-de6fe9f32b5dc94114977026f9696781' getitem(...)>
Exception: 'KeyError(0)'
Traceback: ''
0%| | 0/1 [00:01<?, ?it/s]
Traceback (most recent call last):
File "/benchmark/nemo-curator/scripts/run_curator_with_logs.py", line 1127, in main
run_curation_pipeline(
File "/benchmark/nemo-curator/scripts/run_curator_with_logs.py", line 969, in run_curation_pipeline
curation_steps(dataset)
File "/opt/conda/envs/rapids/lib/python3.10/site-packages/nemo_curator/modules/meta.py", line 22, in __call__
dataset = module(dataset)
File "/benchmark/nemo-curator/scripts/pipeline_utils.py", line 115, in wrapped
return func(
File "/benchmark/nemo-curator/scripts/run_curator_with_logs.py", line 404, in fuzzy_dedupe
duplicates = fuzzy_dup(dataset=dataset)
File "/opt/conda/envs/rapids/lib/python3.10/site-packages/nemo_curator/modules/fuzzy_dedup.py", line 673, in __call__
self.jaccard_shuffle.shuffle_docs_on_buckets(
File "/opt/conda/envs/rapids/lib/python3.10/site-packages/nemo_curator/modules/fuzzy_dedup.py", line 1166, in shuffle_docs_on_buckets
self._batched_merge_and_write(
File "/opt/conda/envs/rapids/lib/python3.10/site-packages/nemo_curator/modules/fuzzy_dedup.py", line 1309, in _batched_merge_and_write
written_files = written_files.compute()
File "/opt/conda/envs/rapids/lib/python3.10/site-packages/dask_expr/_collection.py", line 480, in compute
return DaskMethodsMixin.compute(out, **kwargs)
File "/opt/conda/envs/rapids/lib/python3.10/site-packages/dask/base.py", line 372, in compute
(result,) = compute(self, traverse=False, **kwargs)
File "/opt/conda/envs/rapids/lib/python3.10/site-packages/dask/base.py", line 660, in compute
results = schedule(dsk, keys, **kwargs)
File "/opt/conda/envs/rapids/lib/python3.10/site-packages/distributed/client.py", line 2427, in _gather
raise exception.with_traceback(traceback)
KeyError: 0
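The first worker error comes from cuDF's _align_to_index, reached while dask_expr's assign runs df[name] = val. In other words, a column is being assigned from a Series whose index has to be aligned against an index with duplicate labels. The sketch below is a CPU analogue under stated assumptions: pandas stands in for cuDF (pandas words the error differently), and the two hand-built partitions are hypothetical, not what Curator actually reads. It shows how partitions that each start their index at 0 can combine into a non-unique index, and how a label-aligned assignment then fails until the index is made unique. Whether this is the exact path inside _batched_merge_and_write is not established by the logs above.

```python
import pandas as pd

# Hypothetical partitions, each carrying its own RangeIndex starting at 0
# (an assumption standing in for independently read chunks of a file).
part_a = pd.DataFrame({"text": ["a", "b"]})
part_b = pd.DataFrame({"text": ["c", "d"]})

# Concatenating without resetting the index leaves duplicate labels: 0, 1, 0, 1.
df = pd.concat([part_a, part_b])

# A derived column holding the same labels in a different order, so the
# assignment must align (reindex) it against df's non-unique index.
derived = df["text"].str.upper().iloc[::-1]

alignment_error = None
try:
    df["upper"] = derived
except ValueError as err:
    # pandas refuses to reindex a Series with duplicate labels; cuDF reports
    # the analogous case as "Cannot align indices with non-unique values".
    alignment_error = err

# Workaround sketch: give the frame a unique index before deriving columns.
fixed = pd.concat([part_a, part_b], ignore_index=True)  # labels 0..3
fixed["upper"] = fixed["text"].str.upper().iloc[::-1]   # aligns by label
print(fixed["upper"].tolist())  # ['A', 'B', 'C', 'D']
```

Because alignment is by label, the reversed Series in the fixed case lands back in its original rows; the same operation on the duplicated index has no unambiguous target row, which is why both libraries refuse it.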
Environment
crossfit 0.0.8
cudf-cu12 25.2.0a219
cugraph-cu12 25.2.0a55
cuml-cu12 25.2.0a38
dask 2024.12.1
dask-cuda 25.2.0a14
dask-cudf-cu12 25.2.0a219
dask-expr 1.1.21
dask_labextension 7.0.0
dask-mpi 2022.4.0
distributed 2024.12.1
distributed-ucxx-cu12 0.42.0a22
libcudf-cu12 25.2.0a219
libucx-cu12 1.17.0.post1
libucxx-cu12 0.42.0a22
nemo_curator 0.6.0rc0.dev1
pylibcudf-cu12 25.2.0a219
pylibcugraph-cu12 25.2.0a55
raft-dask-cu12 25.2.0a30
rapids-dask-dependency 25.2.0a9
torch 2.5.1
ucx-py-cu12 0.42.0a8
ucxx-cu12 0.42.0a22
curator d401333ec9d88c36494befc9ae7515574c4d89fb