NeMo-Curator icon indicating copy to clipboard operation
NeMo-Curator copied to clipboard

Consecutive execution of fuzzy deduplication on different columns fails with errors

Open sarahyurick opened this issue 9 months ago • 4 comments
trafficstars

Python script to reproduce:

from functools import partial

from nemo_curator import FuzzyDuplicates, FuzzyDuplicatesConfig, Sequential, get_client
from nemo_curator.datasets import DocumentDataset


def fuzzy_dedupe(dataset, cache_dir, id_field, text_field):
    """Run one fuzzy-deduplication pass and return the dataset without duplicates.

    Args:
        dataset: Dataset whose ``df`` attribute holds the documents.
        cache_dir: Cache directory handed to ``FuzzyDuplicatesConfig``.
        id_field: Name of the column that identifies a document.
        text_field: Name of the column the deduplication is computed on.

    Returns:
        A ``DocumentDataset`` wrapping the rows of ``dataset.df`` whose
        ``id_field`` value was not selected for removal.
    """
    # Left disabled in the original reproducer:
    # dataset.df.reset_index(drop=True)

    # The false-positive check is switched on here; the run above shows the
    # failure occurring in this configuration.
    config = FuzzyDuplicatesConfig(
        cache_dir=cache_dir,
        id_field=id_field,
        text_field=text_field,
        false_positive_check=True,
    )
    duplicates = FuzzyDuplicates(config=config)(dataset=dataset)

    def _repeats_within_group(partition):
        # Keep every row except the first one seen for each ``group`` value.
        # This runs per partition, so "first" is relative to the partition.
        return partition[partition.group.duplicated(keep="first")]

    docs_to_remove = duplicates.df.map_partitions(_repeats_within_group)
    # Materialize the IDs to drop so they can be used in the ``isin`` filter.
    ids_to_remove = docs_to_remove[id_field].compute()

    documents = dataset.df
    is_duplicate = documents[id_field].isin(ids_to_remove)
    result = documents[~is_duplicate]

    print("Quick look at the DataFrame here...")
    print(result.head())
    print(result.columns)
    print(len(result))
    print("---")

    return DocumentDataset(result)


def main():
    """Reproduce the issue: fuzzy-dedupe on "text", then again on "text2".

    Starts a GPU client, reads a JSONL dataset, rewrites the "text2" column
    with a seeded sample of itself, and runs two consecutive ``fuzzy_dedupe``
    passes (one per text column) through ``Sequential``. A summary of the
    DataFrame is printed before and after the pipeline.

    The client is closed in a ``finally`` block so that it is released even
    when one of the deduplication passes raises.
    """
    client = get_client(cluster_type="gpu")
    try:
        # JSONL dataset with "text" and "text2" fields
        # ID field is "adlr_id"
        dataset = DocumentDataset.read_json(
            "/path/to/jsonl/data",
            backend="cudf",
        )

        df = dataset.df
        # Replace "text2" with a full-fraction, seeded sample of itself.
        # NOTE(review): the assignment goes through the DataFrame index after
        # ``reset_index(drop=True)`` -- confirm "text2" contains no nulls
        # after this step.
        df["text2"] = (
            df["text2"].sample(frac=1, random_state=42).reset_index(drop=True)
        )
        dataset = DocumentDataset(dataset_df=df)

        print("Original DataFrame:")
        print(dataset.df.head())
        print(dataset.df.columns)
        print(len(dataset.df))
        print("---")

        # Use different cache directories to avoid existing cache_dir errors
        cache_input = "/path/to/cache_input"
        dedupe_input = partial(
            fuzzy_dedupe, id_field="adlr_id", text_field="text", cache_dir=cache_input
        )

        cache_output = "/path/to/cache_output"
        dedupe_output = partial(
            fuzzy_dedupe,
            id_field="adlr_id",
            text_field="text2",
            cache_dir=cache_output,
        )

        # First pass deduplicates on "text", second pass on "text2".
        curation_steps = Sequential(
            [
                dedupe_input,
                dedupe_output,
            ]
        )
        dataset = curation_steps(dataset)
        dataset = dataset.persist()

        print("Result:")
        print(dataset.df.head())
        print(dataset.df.columns)
        print(len(dataset.df))
        print("---")
    finally:
        # Always release the client, including when a pass fails.
        client.close()


# Run the reproducer only when this file is executed as a script.
if __name__ == "__main__":
    main()

Console output:

(nemo_curator) syurick@exp02:~$ python /home/nfs/syurick/NaeMo-Curator-scratch/fuzzy_dedup_api/test1.py
cuDF Spilling is enabled
Reading 2 files with blocksize='1gb' / files_per_partition=None
/home/nfs/syurick/NeMo-Curator/nemo_curator/utils/distributed_utils.py:412: UserWarning: If underlying JSONL data does not have a consistent schema, reading with blocksize will fail. Please use files_per_partition approach instead.
  warnings.warn(
Original DataFrame:
                                  adlr_id  ...                                                url
0  ad21470b-7b97-4fb7-95db-1f51e654771e-0  ...  http://128922.homepagemodules.de/u1797_AutumnD...
1  6d91b7a3-2444-4edc-afe8-479fa95f83a3-1  ...  http://1actaday.blogspot.com/2016/11/before-fl...
2  cb27d499-7eee-4710-b68c-c01f6c874496-2  ...  http://22508.dynamicboard.de/t1892f2-pnigeriaj...
3  beeb1a42-030c-4942-8d46-0ce5e5baae96-3  ...  http://22508.dynamicboard.de/t2384f22-Were-fig...
4  650600a6-9a51-4900-8d9a-e2a09cc2db4b-4  ...  http://2ndgoorkhas.com/this-day-in-history/191...

[5 rows x 8 columns]
Index(['adlr_id', 'filename', 'language', 'quality_pred', 'source_id', 'text',
       'text2', 'url'],
      dtype='object')
23415
---
/home/nfs/syurick/NeMo-Curator/nemo_curator/modules/config.py:94: UserWarning: Identifying false positives during the Minhash deduplication is computationally expensive. For improved performance consider setting this to False
  warnings.warn(
Stage 1: Starting Minhash + LSH computation
Stage 1: Minhash + LSH complete!
Stage 2 (False Positive Check): Starting Map_Buckets
Stage 2 (False Postive Check): Map_Buckets Complete!
Stage 3 (False Postive Check): Shuffle docs
  0%|                                                                                              | 0/1 [00:00<?, ?it/s]
Started processing bucket-map partitions 0 through 1 of 1
Using 16 text partitions.
Text-df partition  1/1 completed in 1.2325401306152344
Bucket partition  1/1 completed in 1.2382714748382568
100%|██████████████████████████████████████████████████████████████████████████████████████| 1/1 [00:01<00:00,  1.24s/it]
Stage 3 (False Postive Check): Shuffle docs complete!
Stage 4 (False Postive Check): Jaccard Similarity in Buckets
Stage 4 (False Postive Check): Jaccard Similarity in Buckets Complete!
Stage 5: Connected Components across buckets
/home/nfs/syurick/miniforge3/envs/nemo_curator/lib/python3.10/site-packages/cudf/core/reshape.py:383: FutureWarning: The behavior of array concatenation with empty entries is deprecated. In a future version, this will no longer exclude empty items when determining the result dtype. To retain the old behavior, exclude the empty entries before the concat operation.
  warnings.warn(
Stage 5: Connected Components across buckets complete!
Quick look at the DataFrame here...
                                  adlr_id  ...                                                url
0  ad21470b-7b97-4fb7-95db-1f51e654771e-0  ...  http://128922.homepagemodules.de/u1797_AutumnD...
1  6d91b7a3-2444-4edc-afe8-479fa95f83a3-1  ...  http://1actaday.blogspot.com/2016/11/before-fl...
2  cb27d499-7eee-4710-b68c-c01f6c874496-2  ...  http://22508.dynamicboard.de/t1892f2-pnigeriaj...
3  beeb1a42-030c-4942-8d46-0ce5e5baae96-3  ...  http://22508.dynamicboard.de/t2384f22-Were-fig...
4  650600a6-9a51-4900-8d9a-e2a09cc2db4b-4  ...  http://2ndgoorkhas.com/this-day-in-history/191...

[5 rows x 8 columns]
Index(['adlr_id', 'filename', 'language', 'quality_pred', 'source_id', 'text',
       'text2', 'url'],
      dtype='object')
23168
---
/home/nfs/syurick/NeMo-Curator/nemo_curator/modules/config.py:94: UserWarning: Identifying false positives during the Minhash deduplication is computationally expensive. For improved performance consider setting this to False
  warnings.warn(
Stage 1: Starting Minhash + LSH computation
Stage 1: Minhash + LSH complete!
Stage 2 (False Positive Check): Starting Map_Buckets
Stage 2 (False Postive Check): Map_Buckets Complete!
Stage 3 (False Postive Check): Shuffle docs
  0%|                                                                                              | 0/1 [00:00<?, ?it/s]
Started processing bucket-map partitions 0 through 1 of 1
Using 16 text partitions.
2025-01-29 14:16:05,639 - distributed.worker - ERROR - Compute Failed
Key:       _run_coroutine_on_worker-d1c0dcf9-e5c3-4e0f-8c54-5774bce32a8a
State:     executing
Task:  <Task '_run_coroutine_on_worker-d1c0dcf9-e5c3-4e0f-8c54-5774bce32a8a' _run_coroutine_on_worker(...)>
Exception: "RuntimeError('CUDF failure at: /__w/cudf/cudf/cpp/src/partitioning/partitioning.cu:793: Unexpected null values in partition_map.')"
Traceback: '  File "/home/nfs/syurick/miniforge3/envs/nemo_curator/lib/python3.10/site-packages/dask_cuda/explicit_comms/comms.py", line 101, in _run_coroutine_on_worker\n    return executor.submit(_run).result()\n  File "/home/nfs/syurick/miniforge3/envs/nemo_curator/lib/python3.10/concurrent/futures/_base.py", line 458, in result\n    return self.__get_result()\n  File "/home/nfs/syurick/miniforge3/envs/nemo_curator/lib/python3.10/concurrent/futures/_base.py", line 403, in __get_result\n    raise self._exception\n  File "/home/nfs/syurick/miniforge3/envs/nemo_curator/lib/python3.10/concurrent/futures/thread.py", line 58, in run\n    result = self.fn(*self.args, **self.kwargs)\n  File "/home/nfs/syurick/miniforge3/envs/nemo_curator/lib/python3.10/site-packages/dask_cuda/explicit_comms/comms.py", line 98, in _run\n    return future.result()\n  File "/home/nfs/syurick/miniforge3/envs/nemo_curator/lib/python3.10/concurrent/futures/_base.py", line 458, in result\n    return self.__get_result()\n  File "/home/nfs/syurick/miniforge3/envs/nemo_curator/lib/python3.10/concurrent/futures/_base.py", line 403, in __get_result\n    raise self._exception\n  File "/home/nfs/syurick/miniforge3/envs/nemo_curator/lib/python3.10/site-packages/dask_cuda/explicit_comms/dataframe/shuffle.py", line 381, in shuffle_task\n    partitions = create_partitions(\n  File "/home/nfs/syurick/miniforge3/envs/nemo_curator/lib/python3.10/site-packages/dask_cuda/explicit_comms/dataframe/shuffle.py", line 245, in create_partitions\n    partition_dataframe(\n  File "/home/nfs/syurick/miniforge3/envs/nemo_curator/lib/python3.10/site-packages/dask_cuda/explicit_comms/dataframe/shuffle.py", line 203, in partition_dataframe\n    return group_split_dispatch(df, map_index, npartitions, ignore_index=ignore_index)\n  File "/home/nfs/syurick/miniforge3/envs/nemo_curator/lib/python3.10/site-packages/dask/utils.py", line 772, in __call__\n    return meth(arg, *args, **kwargs)\n  File 
"/home/nfs/syurick/miniforge3/envs/nemo_curator/lib/python3.10/site-packages/cudf/utils/performance_tracking.py", line 51, in wrapper\n    return func(*args, **kwargs)\n  File "/home/nfs/syurick/miniforge3/envs/nemo_curator/lib/python3.10/site-packages/dask_cudf/backends.py", line 443, in group_split_cudf\n    df.scatter_by_map(\n  File "/home/nfs/syurick/miniforge3/envs/nemo_curator/lib/python3.10/site-packages/cudf/utils/performance_tracking.py", line 51, in wrapper\n    return func(*args, **kwargs)\n  File "/home/nfs/syurick/miniforge3/envs/nemo_curator/lib/python3.10/site-packages/cudf/core/dataframe.py", line 2487, in scatter_by_map\n    partitioned_columns, output_offsets = libcudf.partitioning.partition(\n  File "/home/nfs/syurick/miniforge3/envs/nemo_curator/lib/python3.10/contextlib.py", line 79, in inner\n    return func(*args, **kwds)\n  File "partitioning.pyx", line 48, in cudf._lib.partitioning.partition\n  File "partitioning.pyx", line 57, in pylibcudf.partitioning.partition\n  File "partitioning.pyx", line 82, in pylibcudf.partitioning.partition\n'

Please note that this error only occurs when false_positive_check=True. When it is False (the default), the script works as expected.

sarahyurick avatar Jan 29 '25 22:01 sarahyurick