
Fuzzy Duplicates Identification fails on batched_merge_and_write when document dataset is read with blocksize

Open · praateekmahajan opened this issue 10 months ago · 1 comment

When reading a dataset with DocumentDataset.read_parquet(..., blocksize=???, files_per_partition=None) and running fuzzy dedup (protocol=ucx, false positive check enabled), we run into an error during the shuffle_docs_on_buckets -> _batched_merge_and_write step.
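
A minimal sketch of the triggering setup (the input path, blocksize value, and trimmed-down config below are illustrative, not the exact values used in the benchmark):

from nemo_curator import FuzzyDuplicates, FuzzyDuplicatesConfig
from nemo_curator.datasets import DocumentDataset

# Read with a blocksize instead of files_per_partition; this is what differs
# from the working configuration.
dataset = DocumentDataset.read_parquet(
    "/path/to/input",            # illustrative input path
    blocksize="1GiB",            # any blocksize, with files_per_partition=None
    files_per_partition=None,
)

# Fuzzy dedup with the false positive check on; other config values omitted.
config = FuzzyDuplicatesConfig(
    cache_dir="/path/to/cache",  # illustrative cache dir
    false_positive_check=True,
)
duplicates = FuzzyDuplicates(config=config)(dataset)

The failure then shows up as: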

Stage3 (False Postive Check): Shuffle docs

  0%|          | 0/1 [00:00<?, ?it/s]
Started processing bucket-map partitions 0 through 1 of 1
Using 4 text partitions.
2025-01-02 08:31:21,288 - distributed.worker - ERROR - Compute Failed
Key:       ('read_parquet-fused-assign-7d4479cf1a375160a1452f529c7dfcef', 1)
State:     executing
Task:  <Task ('read_parquet-fused-assign-7d4479cf1a375160a1452f529c7dfcef', 1) _execute_subgraph(...)>
Exception: "ValueError('Cannot align indices with non-unique values')"
Traceback:
  File "/opt/conda/envs/rapids/lib/python3.10/site-packages/dask_expr/_expr.py", line 1849, in assign
    df[name] = val
  File "/opt/conda/envs/rapids/lib/python3.10/site-packages/cudf/utils/performance_tracking.py", line 51, in wrapper
    return func(*args, **kwargs)
  File "/opt/conda/envs/rapids/lib/python3.10/site-packages/cudf/core/dataframe.py", line 1445, in __setitem__
    self.insert(self._num_columns, arg, value)
  File "/opt/conda/envs/rapids/lib/python3.10/site-packages/cudf/utils/performance_tracking.py", line 51, in wrapper
    return func(*args, **kwargs)
  File "/opt/conda/envs/rapids/lib/python3.10/site-packages/cudf/core/dataframe.py", line 3329, in insert
    return self._insert(
  File "/opt/conda/envs/rapids/lib/python3.10/site-packages/cudf/utils/performance_tracking.py", line 51, in wrapper
    return func(*args, **kwargs)
  File "/opt/conda/envs/rapids/lib/python3.10/site-packages/cudf/core/dataframe.py", line 3403, in _insert
    value = value._align_to_index(
  File "/opt/conda/envs/rapids/lib/python3.10/site-packages/cudf/core/indexed_frame.py", line 3739, in _align_to_index
    raise ValueError("Cannot align indices with non-unique values")

2025-01-02 08:31:21,351 - distributed.worker - ERROR - Compute Failed
Key:       getitem-de6fe9f32b5dc94114977026f9696781
State:     executing
Task:  <Task 'getitem-de6fe9f32b5dc94114977026f9696781' getitem(...)>
Exception: 'KeyError(0)'
Traceback: ''


  0%|          | 0/1 [00:01<?, ?it/s]
Traceback (most recent call last):
  File "/benchmark/nemo-curator/scripts/run_curator_with_logs.py", line 1127, in main
    run_curation_pipeline(
  File "/benchmark/nemo-curator/scripts/run_curator_with_logs.py", line 969, in run_curation_pipeline
    curation_steps(dataset)
  File "/opt/conda/envs/rapids/lib/python3.10/site-packages/nemo_curator/modules/meta.py", line 22, in __call__
    dataset = module(dataset)
  File "/benchmark/nemo-curator/scripts/pipeline_utils.py", line 115, in wrapped
    return func(
  File "/benchmark/nemo-curator/scripts/run_curator_with_logs.py", line 404, in fuzzy_dedupe
    duplicates = fuzzy_dup(dataset=dataset)
  File "/opt/conda/envs/rapids/lib/python3.10/site-packages/nemo_curator/modules/fuzzy_dedup.py", line 673, in __call__
    self.jaccard_shuffle.shuffle_docs_on_buckets(
  File "/opt/conda/envs/rapids/lib/python3.10/site-packages/nemo_curator/modules/fuzzy_dedup.py", line 1166, in shuffle_docs_on_buckets
    self._batched_merge_and_write(
  File "/opt/conda/envs/rapids/lib/python3.10/site-packages/nemo_curator/modules/fuzzy_dedup.py", line 1309, in _batched_merge_and_write
    written_files = written_files.compute()
  File "/opt/conda/envs/rapids/lib/python3.10/site-packages/dask_expr/_collection.py", line 480, in compute
    return DaskMethodsMixin.compute(out, **kwargs)
  File "/opt/conda/envs/rapids/lib/python3.10/site-packages/dask/base.py", line 372, in compute
    (result,) = compute(self, traverse=False, **kwargs)
  File "/opt/conda/envs/rapids/lib/python3.10/site-packages/dask/base.py", line 660, in compute
    results = schedule(dsk, keys, **kwargs)
  File "/opt/conda/envs/rapids/lib/python3.10/site-packages/distributed/client.py", line 2427, in _gather
    raise exception.with_traceback(traceback)
KeyError: 0
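
The underlying cudf error from the assign step can be reproduced in isolation; this standalone sketch (column names and data are hypothetical) raises the same ValueError when a column is assigned to a frame whose index contains duplicate values:

import cudf

# A frame whose index has repeated values, which the text partitions appear
# to end up with after reading with blocksize (hypothetical data).
df = cudf.DataFrame({"a": [1, 2, 3]}, index=[0, 0, 1])
s = cudf.Series([10, 20, 30], index=[0, 1, 2])

# Column assignment aligns the series to the frame's index, and cudf rejects
# alignment against a non-unique index:
df["b"] = s  # ValueError: Cannot align indices with non-unique values

This suggests that, with blocksize-based reading, the partitions reach _batched_merge_and_write with non-unique index values, whereas the files_per_partition path does not.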

Environment

crossfit                           0.0.8
cudf-cu12                          25.2.0a219
cugraph-cu12                       25.2.0a55
cuml-cu12                          25.2.0a38
dask                               2024.12.1
dask-cuda                          25.2.0a14
dask-cudf-cu12                     25.2.0a219
dask-expr                          1.1.21
dask_labextension                  7.0.0
dask-mpi                           2022.4.0
distributed                        2024.12.1
distributed-ucxx-cu12              0.42.0a22
libcudf-cu12                       25.2.0a219
libucx-cu12                        1.17.0.post1
libucxx-cu12                       0.42.0a22
nemo_curator                       0.6.0rc0.dev1
pylibcudf-cu12                     25.2.0a219
pylibcugraph-cu12                  25.2.0a55
raft-dask-cu12                     25.2.0a30
rapids-dask-dependency             25.2.0a9
torch                              2.5.1
ucx-py-cu12                        0.42.0a8
ucxx-cu12                          0.42.0a22
curator d401333ec9d88c36494befc9ae7515574c4d89fb

praateekmahajan · Jan 02 '25 19:01