NeMo-Curator
NeMo-Curator copied to clipboard
Consecutive execution of fuzzy deduplication on different columns fails with errors
Python script to reproduce:
from functools import partial
from nemo_curator import FuzzyDuplicates, FuzzyDuplicatesConfig, Sequential, get_client
from nemo_curator.datasets import DocumentDataset
def fuzzy_dedupe(dataset, cache_dir, id_field, text_field):
    """Drop fuzzy duplicates of ``text_field`` from ``dataset``.

    Runs ``FuzzyDuplicates`` with ``false_positive_check=True``. Within each
    partition of the returned duplicates frame, every row after the first
    occurrence of its ``group`` value is marked for removal; rows of
    ``dataset`` whose ``id_field`` matches a marked ID are filtered out.

    Args:
        dataset: The ``DocumentDataset`` to deduplicate.
        cache_dir: Passed to ``FuzzyDuplicatesConfig`` as ``cache_dir``.
        id_field: Name of the column holding the document IDs.
        text_field: Name of the column the deduplication runs on.

    Returns:
        A ``DocumentDataset`` wrapping the filtered frame, or ``dataset``
        itself when the deduplication step returns ``None``.
    """
    # dataset.df.reset_index(drop=True)
    fuzzy_dedup_config = FuzzyDuplicatesConfig(
        cache_dir=cache_dir,
        id_field=id_field,
        text_field=text_field,
        false_positive_check=True,
    )
    fuzzy_dup = FuzzyDuplicates(config=fuzzy_dedup_config)
    duplicates = fuzzy_dup(dataset=dataset)

    # NOTE(review): FuzzyDuplicates is expected to return None when it finds
    # no duplicates (confirm against the installed version). Without this
    # guard, the ``duplicates.df`` access below raises AttributeError.
    if duplicates is None:
        return dataset

    # ``duplicated(keep="first")`` is evaluated per partition, so the first
    # row of each group in every partition is retained.
    docs_to_remove = duplicates.df.map_partitions(
        lambda x: x[x.group.duplicated(keep="first")]
    )
    ids_to_remove = docs_to_remove[id_field].compute()
    result = dataset.df[~dataset.df[id_field].isin(ids_to_remove)]

    print("Quick look at the DataFrame here...")
    print(result.head())
    print(result.columns)
    print(len(result))
    print("---")

    return DocumentDataset(result)
def main():
    """Run fuzzy deduplication on "text" and then on "text2".

    Starts a GPU client, reads the JSONL dataset with the cudf backend,
    shuffles the "text2" column, and applies ``fuzzy_dedupe`` twice through
    ``Sequential`` (first on "text", then on "text2"), printing a summary of
    the frame before and after. The client is closed even when a step fails.
    """
    client = get_client(cluster_type="gpu")
    try:
        # JSONL dataset with "text" and "text2" fields
        # ID field is "adlr_id"
        dataset = DocumentDataset.read_json(
            "/path/to/jsonl/data",
            backend="cudf",
        )

        df = dataset.df
        df["text2"] = (
            df["text2"].sample(frac=1, random_state=42).reset_index(drop=True)
        )
        dataset = DocumentDataset(dataset_df=df)

        print("Original DataFrame:")
        print(dataset.df.head())
        print(dataset.df.columns)
        print(len(dataset.df))
        print("---")

        # Use different cache directories to avoid existing cache_dir errors
        cache_input = "/path/to/cache_input"
        dedupe_input = partial(
            fuzzy_dedupe,
            id_field="adlr_id",
            text_field="text",
            cache_dir=cache_input,
        )

        cache_output = "/path/to/cache_output"
        dedupe_output = partial(
            fuzzy_dedupe,
            id_field="adlr_id",
            text_field="text2",
            cache_dir=cache_output,
        )

        curation_steps = Sequential(
            [
                dedupe_input,
                dedupe_output,
            ]
        )

        dataset = curation_steps(dataset)
        dataset = dataset.persist()

        print("Result:")
        print(dataset.df.head())
        print(dataset.df.columns)
        print(len(dataset.df))
        print("---")
    finally:
        # Release the client even if one of the deduplication passes raises;
        # otherwise a failure leaves the client open.
        client.close()
if __name__ == "__main__":
    # Only run the reproduction when this file is executed as a script,
    # not when it is imported.
    main()
Console output:
(nemo_curator) syurick@exp02:~$ python /home/nfs/syurick/NaeMo-Curator-scratch/fuzzy_dedup_api/test1.py
cuDF Spilling is enabled
Reading 2 files with blocksize='1gb' / files_per_partition=None
/home/nfs/syurick/NeMo-Curator/nemo_curator/utils/distributed_utils.py:412: UserWarning: If underlying JSONL data does not have a consistent schema, reading with blocksize will fail. Please use files_per_partition approach instead.
warnings.warn(
Original DataFrame:
adlr_id ... url
0 ad21470b-7b97-4fb7-95db-1f51e654771e-0 ... http://128922.homepagemodules.de/u1797_AutumnD...
1 6d91b7a3-2444-4edc-afe8-479fa95f83a3-1 ... http://1actaday.blogspot.com/2016/11/before-fl...
2 cb27d499-7eee-4710-b68c-c01f6c874496-2 ... http://22508.dynamicboard.de/t1892f2-pnigeriaj...
3 beeb1a42-030c-4942-8d46-0ce5e5baae96-3 ... http://22508.dynamicboard.de/t2384f22-Were-fig...
4 650600a6-9a51-4900-8d9a-e2a09cc2db4b-4 ... http://2ndgoorkhas.com/this-day-in-history/191...
[5 rows x 8 columns]
Index(['adlr_id', 'filename', 'language', 'quality_pred', 'source_id', 'text',
'text2', 'url'],
dtype='object')
23415
---
/home/nfs/syurick/NeMo-Curator/nemo_curator/modules/config.py:94: UserWarning: Identifying false positives during the Minhash deduplication is computationally expensive. For improved performance consider setting this to False
warnings.warn(
Stage 1: Starting Minhash + LSH computation
Stage 1: Minhash + LSH complete!
Stage 2 (False Positive Check): Starting Map_Buckets
Stage 2 (False Postive Check): Map_Buckets Complete!
Stage 3 (False Postive Check): Shuffle docs
0%| | 0/1 [00:00<?, ?it/s]
Started processing bucket-map partitions 0 through 1 of 1
Using 16 text partitions.
Text-df partition 1/1 completed in 1.2325401306152344
Bucket partition 1/1 completed in 1.2382714748382568
100%|██████████████████████████████████████████████████████████████████████████████████████| 1/1 [00:01<00:00, 1.24s/it]
Stage 3 (False Postive Check): Shuffle docs complete!
Stage 4 (False Postive Check): Jaccard Similarity in Buckets
Stage 4 (False Postive Check): Jaccard Similarity in Buckets Complete!
Stage 5: Connected Components across buckets
/home/nfs/syurick/miniforge3/envs/nemo_curator/lib/python3.10/site-packages/cudf/core/reshape.py:383: FutureWarning: The behavior of array concatenation with empty entries is deprecated. In a future version, this will no longer exclude empty items when determining the result dtype. To retain the old behavior, exclude the empty entries before the concat operation.
warnings.warn(
Stage 5: Connected Components across buckets complete!
Quick look at the DataFrame here...
adlr_id ... url
0 ad21470b-7b97-4fb7-95db-1f51e654771e-0 ... http://128922.homepagemodules.de/u1797_AutumnD...
1 6d91b7a3-2444-4edc-afe8-479fa95f83a3-1 ... http://1actaday.blogspot.com/2016/11/before-fl...
2 cb27d499-7eee-4710-b68c-c01f6c874496-2 ... http://22508.dynamicboard.de/t1892f2-pnigeriaj...
3 beeb1a42-030c-4942-8d46-0ce5e5baae96-3 ... http://22508.dynamicboard.de/t2384f22-Were-fig...
4 650600a6-9a51-4900-8d9a-e2a09cc2db4b-4 ... http://2ndgoorkhas.com/this-day-in-history/191...
[5 rows x 8 columns]
Index(['adlr_id', 'filename', 'language', 'quality_pred', 'source_id', 'text',
'text2', 'url'],
dtype='object')
23168
---
/home/nfs/syurick/NeMo-Curator/nemo_curator/modules/config.py:94: UserWarning: Identifying false positives during the Minhash deduplication is computationally expensive. For improved performance consider setting this to False
warnings.warn(
Stage 1: Starting Minhash + LSH computation
Stage 1: Minhash + LSH complete!
Stage 2 (False Positive Check): Starting Map_Buckets
Stage 2 (False Postive Check): Map_Buckets Complete!
Stage 3 (False Postive Check): Shuffle docs
0%| | 0/1 [00:00<?, ?it/s]
Started processing bucket-map partitions 0 through 1 of 1
Using 16 text partitions.
2025-01-29 14:16:05,639 - distributed.worker - ERROR - Compute Failed
Key: _run_coroutine_on_worker-d1c0dcf9-e5c3-4e0f-8c54-5774bce32a8a
State: executing
Task: <Task '_run_coroutine_on_worker-d1c0dcf9-e5c3-4e0f-8c54-5774bce32a8a' _run_coroutine_on_worker(...)>
Exception: "RuntimeError('CUDF failure at: /__w/cudf/cudf/cpp/src/partitioning/partitioning.cu:793: Unexpected null values in partition_map.')"
Traceback: ' File "/home/nfs/syurick/miniforge3/envs/nemo_curator/lib/python3.10/site-packages/dask_cuda/explicit_comms/comms.py", line 101, in _run_coroutine_on_worker\n return executor.submit(_run).result()\n File "/home/nfs/syurick/miniforge3/envs/nemo_curator/lib/python3.10/concurrent/futures/_base.py", line 458, in result\n return self.__get_result()\n File "/home/nfs/syurick/miniforge3/envs/nemo_curator/lib/python3.10/concurrent/futures/_base.py", line 403, in __get_result\n raise self._exception\n File "/home/nfs/syurick/miniforge3/envs/nemo_curator/lib/python3.10/concurrent/futures/thread.py", line 58, in run\n result = self.fn(*self.args, **self.kwargs)\n File "/home/nfs/syurick/miniforge3/envs/nemo_curator/lib/python3.10/site-packages/dask_cuda/explicit_comms/comms.py", line 98, in _run\n return future.result()\n File "/home/nfs/syurick/miniforge3/envs/nemo_curator/lib/python3.10/concurrent/futures/_base.py", line 458, in result\n return self.__get_result()\n File "/home/nfs/syurick/miniforge3/envs/nemo_curator/lib/python3.10/concurrent/futures/_base.py", line 403, in __get_result\n raise self._exception\n File "/home/nfs/syurick/miniforge3/envs/nemo_curator/lib/python3.10/site-packages/dask_cuda/explicit_comms/dataframe/shuffle.py", line 381, in shuffle_task\n partitions = create_partitions(\n File "/home/nfs/syurick/miniforge3/envs/nemo_curator/lib/python3.10/site-packages/dask_cuda/explicit_comms/dataframe/shuffle.py", line 245, in create_partitions\n partition_dataframe(\n File "/home/nfs/syurick/miniforge3/envs/nemo_curator/lib/python3.10/site-packages/dask_cuda/explicit_comms/dataframe/shuffle.py", line 203, in partition_dataframe\n return group_split_dispatch(df, map_index, npartitions, ignore_index=ignore_index)\n File "/home/nfs/syurick/miniforge3/envs/nemo_curator/lib/python3.10/site-packages/dask/utils.py", line 772, in __call__\n return meth(arg, *args, **kwargs)\n File 
"/home/nfs/syurick/miniforge3/envs/nemo_curator/lib/python3.10/site-packages/cudf/utils/performance_tracking.py", line 51, in wrapper\n return func(*args, **kwargs)\n File "/home/nfs/syurick/miniforge3/envs/nemo_curator/lib/python3.10/site-packages/dask_cudf/backends.py", line 443, in group_split_cudf\n df.scatter_by_map(\n File "/home/nfs/syurick/miniforge3/envs/nemo_curator/lib/python3.10/site-packages/cudf/utils/performance_tracking.py", line 51, in wrapper\n return func(*args, **kwargs)\n File "/home/nfs/syurick/miniforge3/envs/nemo_curator/lib/python3.10/site-packages/cudf/core/dataframe.py", line 2487, in scatter_by_map\n partitioned_columns, output_offsets = libcudf.partitioning.partition(\n File "/home/nfs/syurick/miniforge3/envs/nemo_curator/lib/python3.10/contextlib.py", line 79, in inner\n return func(*args, **kwds)\n File "partitioning.pyx", line 48, in cudf._lib.partitioning.partition\n File "partitioning.pyx", line 57, in pylibcudf.partitioning.partition\n File "partitioning.pyx", line 82, in pylibcudf.partitioning.partition\n'
Note that this error occurs only when false_positive_check=True. With false_positive_check=False (the default), the script works as expected.