NeMo-Curator icon indicating copy to clipboard operation
NeMo-Curator copied to clipboard

nemo_curator.utils.distributed_utils.read_data doesn't work for my own parquet dataset unless cleaning text by myself

Open RickyShi46 opened this issue 10 months ago • 1 comments
trafficstars

Describe the bug

I encountered the following bug when using our own Parquet dataset with nemo_curator.utils.distributed_utils.read_data and nemo_curator.AddId operations, following the approach outlined in this tutorial. However, when I manually clean the data using ftfy before performing the read_data and AddId operations, the process completes successfully. I am aware that you also have data cleaning methods such as nemo_curator.Modify( UnicodeReformatter(), text_field=input_text_field ), but this similarly requires performing the read_data operation on the dataset first.

root@dask-nemo-pure-text-0:/lpai/volumes/lmp-guan/sy/11-26-dedup-test# python3 dedup-en-dataset.py
Num Workers = 2
Allowed failures: 100
Reading 8 files
Writing to disk complete for 8 partitions
Reading 8 files
Writing to disk complete for 8 partitions
Reading 8 files
Writing to disk complete for 8 partitions
Reading 8 files
Writing to disk complete for 8 partitions
Reading 8 files
Writing to disk complete for 8 partitions
Reading 8 files
Traceback (most
recent call last):
File "/mnt/volumes/lmp-guan/sy/11-26-dedup-test/dedup-en-dataset.py", line 96, in <module>
main()
File "/mnt/volumes/lmp-guan/sy/11-26-dedup-test/dedup-en-dataset.py", line 66, in main
id_dataset.to_json(id_data_dir, write_to_filename=True)
File_"/opt/NeMo-Curator/nemo_curator/datasets/doc_dataset.pyy", line 103, in to_json
write_to_disk(
File "/opt/NeMo-Curator/nemo_curator/utils/distributed_utils.py", line 514, in write_to_disk
output = output.compute()
File "/usr/local/lib/python3.10/dist-packages/dask/base.py", line 376, in compute
(result,) = compute(self, traverse=False, **kwargs)
File "/usr/local/lib/python3.10/dist-packages/dask/base.py", line 662, in compute
results = schedule(dsk, keys, **kwargs)
File "/usr/local/lib/python3.10/dist-packages/distributed/client.py", line 2423, in _gather
raise exception.with_traceback(traceback)
distributed.scheduler.KilledWorker: Attempted to runtask ('single_partition_write_with_filename-f6c1765edbf5a25f2b
0fe44f721022b9', 0) on 4 different workers, bbut all those workers died while running it.The last worker that attem
pt to run the task was tcp://127.0.0.1:37323. Inspecting worrker logs is often a good next step to diagnose what wen
t wrong. For more information see https://diistributed.dask.org/en/stable/killed.html.

The terminal that starts the dask worker will show the error :

root@dask-nemo-pure-text-0:/lpai# dask worker localhost:8786 --local-directory /lpai/volumes/lmp-guan/dask-tmp --nworkers=2
2024-12-12 07:33:01,905 - distributed.nanny - INFO -         Start Nanny at: 'tcp://127.0.0.1:44623'
2024-12-12 07:33:01,909 - distributed.nanny - INFO -         Start Nanny at: 'tcp://127.0.0.1:34143'
2024-12-12 07:33:02,704 - distributed.worker - INFO -       Start worker at:      tcp://127.0.0.1:44687
2024-12-12 07:33:02,704 - distributed.worker - INFO -       Start worker at:      tcp://127.0.0.1:40197
2024-12-12 07:33:02,704 - distributed.worker - INFO -          Listening to:      tcp://127.0.0.1:44687
2024-12-12 07:33:02,704 - distributed.worker - INFO -          Listening to:      tcp://127.0.0.1:40197
2024-12-12 07:33:02,704 - distributed.worker - INFO -          dashboard at:            127.0.0.1:41285
2024-12-12 07:33:02,704 - distributed.worker - INFO -          dashboard at:            127.0.0.1:35369
2024-12-12 07:33:02,704 - distributed.worker - INFO - Waiting to connect to:       tcp://localhost:8786
2024-12-12 07:33:02,704 - distributed.worker - INFO - Waiting to connect to:       tcp://localhost:8786
2024-12-12 07:33:02,704 - distributed.worker - INFO - -------------------------------------------------
2024-12-12 07:33:02,704 - distributed.worker - INFO - -------------------------------------------------
2024-12-12 07:33:02,704 - distributed.worker - INFO -               Threads:                         35
2024-12-12 07:33:02,704 - distributed.worker - INFO -               Threads:                         35
2024-12-12 07:33:02,704 - distributed.worker - INFO -                Memory:                 160.00 GiB
2024-12-12 07:33:02,704 - distributed.worker - INFO -       Local Directory: /lpai/volumes/lmp-guan/dask-tmp/dask-scratch-space/worker-mwqve110
2024-12-12 07:33:02,704 - distributed.worker - INFO -                Memory:                 160.00 GiB
2024-12-12 07:33:02,704 - distributed.worker - INFO -       Local Directory: /lpai/volumes/lmp-guan/dask-tmp/dask-scratch-space/worker-75nl0lei
2024-12-12 07:33:02,704 - distributed.worker - INFO - -------------------------------------------------
2024-12-12 07:33:02,704 - distributed.worker - INFO - -------------------------------------------------
2024-12-12 07:33:03,280 - distributed.worker - INFO - Starting Worker plugin shuffle
2024-12-12 07:33:03,281 - distributed.worker - INFO -         Registered to:       tcp://localhost:8786
2024-12-12 07:33:03,281 - distributed.worker - INFO - -------------------------------------------------
2024-12-12 07:33:03,281 - distributed.core - INFO - Starting established connection to tcp://localhost:8786
2024-12-12 07:33:03,290 - distributed.worker - INFO - Starting Worker plugin shuffle
2024-12-12 07:33:03,291 - distributed.worker - INFO -         Registered to:       tcp://localhost:8786
2024-12-12 07:33:03,291 - distributed.worker - INFO - -------------------------------------------------
2024-12-12 07:33:03,292 - distributed.core - INFO - Starting established connection to tcp://localhost:8786
2024-12-12 07:33:33,166 - distributed.core - INFO - Event loop was unresponsive in Worker for 5.63s.  This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
2024-12-12 07:33:47,127 - distributed.core - INFO - Event loop was unresponsive in Worker for 6.94s.  This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
2024-12-12 07:33:53,803 - distributed.core - INFO - Event loop was unresponsive in Worker for 6.12s.  This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
2024-12-12 07:34:18,915 - distributed.core - INFO - Event loop was unresponsive in Worker for 4.16s.  This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
2024-12-12 07:34:22,204 - distributed.core - INFO - Event loop was unresponsive in Worker for 3.29s.  This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
2024-12-12 07:34:25,273 - distributed.core - INFO - Event loop was unresponsive in Worker for 5.15s.  This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
2024-12-12 07:34:32,641 - distributed.core - INFO - Event loop was unresponsive in Worker for 10.44s.  This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
2024-12-12 07:34:40,474 - distributed.core - INFO - Event loop was unresponsive in Worker for 15.20s.  This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
2024-12-12 07:35:10,018 - distributed.core - INFO - Event loop was unresponsive in Worker for 3.67s.  This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
2024-12-12 07:35:12,616 - distributed.core - INFO - Event loop was unresponsive in Worker for 3.62s.  This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
2024-12-12 07:37:13,644 - distributed.core - INFO - Event loop was unresponsive in Worker for 123.63s.  This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
2024-12-12 07:38:22,558 - distributed.core - INFO - Event loop was unresponsive in Worker for 189.94s.  This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
2024-12-12 07:38:34,465 - distributed.core - INFO - Event loop was unresponsive in Worker for 80.82s.  This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
2024-12-12 07:38:47,169 - distributed.core - INFO - Event loop was unresponsive in Worker for 24.30s.  This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
2024-12-12 07:39:40,485 - distributed.core - INFO - Event loop was unresponsive in Worker for 3.57s.  This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
2024-12-12 07:39:47,336 - distributed.core - INFO - Event loop was unresponsive in Worker for 6.85s.  This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
2024-12-12 07:39:49,675 - distributed.core - INFO - Event loop was unresponsive in Worker for 10.50s.  This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
2024-12-12 07:39:50,510 - distributed.core - INFO - Event loop was unresponsive in Worker for 3.17s.  This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
2024-12-12 07:42:44,484 - distributed.core - INFO - Event loop was unresponsive in Worker for 144.75s.  This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
2024-12-12 07:42:55,764 - distributed.core - INFO - Event loop was unresponsive in Worker for 11.28s.  This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
2024-12-12 07:43:11,145 - distributed.core - INFO - Event loop was unresponsive in Worker for 171.46s.  This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
2024-12-12 07:43:13,438 - distributed.core - INFO - Event loop was unresponsive in Worker for 17.67s.  This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
2024-12-12 07:43:20,823 - distributed.core - INFO - Event loop was unresponsive in Worker for 9.68s.  This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
2024-12-12 07:43:37,805 - distributed.core - INFO - Event loop was unresponsive in Worker for 16.64s.  This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
2024-12-12 07:43:41,748 - distributed.core - INFO - Event loop was unresponsive in Worker for 3.94s.  This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
2024-12-12 07:43:45,435 - distributed.core - INFO - Event loop was unresponsive in Worker for 32.00s.  This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
2024-12-12 07:44:09,619 - distributed.core - INFO - Event loop was unresponsive in Worker for 3.03s.  This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
2024-12-12 07:44:39,284 - distributed.core - INFO - Event loop was unresponsive in Worker for 3.12s.  This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
2024-12-12 07:44:45,625 - distributed.core - INFO - Event loop was unresponsive in Worker for 7.96s.  This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
2024-12-12 07:44:49,009 - distributed.core - INFO - Event loop was unresponsive in Worker for 9.72s.  This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
2024-12-12 07:45:26,668 - distributed.core - INFO - Event loop was unresponsive in Worker for 11.00s.  This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
2024-12-12 07:45:28,544 - distributed.core - INFO - Event loop was unresponsive in Worker for 10.66s.  This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
2024-12-12 07:45:52,904 - distributed.core - INFO - Event loop was unresponsive in Worker for 24.36s.  This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
2024-12-12 07:48:43,041 - distributed.core - INFO - Event loop was unresponsive in Worker for 196.24s.  This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
2024-12-12 07:48:45,861 - distributed.core - INFO - Event loop was unresponsive in Worker for 172.96s.  This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
2024-12-12 07:48:46,152 - distributed.core - INFO - Event loop was unresponsive in Worker for 3.11s.  This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
2024-12-12 07:49:08,403 - distributed.core - INFO - Event loop was unresponsive in Worker for 3.12s.  This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
2024-12-12 07:49:36,473 - distributed.core - INFO - Event loop was unresponsive in Worker for 3.20s.  This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
2024-12-12 07:49:41,890 - distributed.core - INFO - Event loop was unresponsive in Worker for 7.83s.  This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
2024-12-12 07:49:46,064 - distributed.core - INFO - Event loop was unresponsive in Worker for 9.59s.  This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
2024-12-12 07:50:48,730 - distributed.core - INFO - Event loop was unresponsive in Worker for 34.86s.  This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
2024-12-12 07:52:18,330 - distributed.core - INFO - Event loop was unresponsive in Worker for 123.92s.  This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
2024-12-12 07:53:12,134 - distributed.core - INFO - Event loop was unresponsive in Worker for 143.40s.  This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
2024-12-12 07:53:18,679 - distributed.core - INFO - Event loop was unresponsive in Worker for 60.35s.  This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
2024-12-12 07:53:29,038 - distributed.core - INFO - Event loop was unresponsive in Worker for 16.81s.  This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
2024-12-12 07:53:32,511 - distributed.core - INFO - Event loop was unresponsive in Worker for 3.47s.  This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
2024-12-12 07:53:35,430 - distributed.core - INFO - Event loop was unresponsive in Worker for 16.47s.  This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
2024-12-12 07:53:41,302 - distributed.core - INFO - Event loop was unresponsive in Worker for 5.87s.  This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
2024-12-12 07:54:30,260 - distributed.core - INFO - Event loop was unresponsive in Worker for 3.14s.  This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
2024-12-12 07:54:39,575 - distributed.core - INFO - Event loop was unresponsive in Worker for 9.32s.  This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
2024-12-12 07:54:40,868 - distributed.core - INFO - Event loop was unresponsive in Worker for 8.15s.  This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
2024-12-12 07:56:27,921 - distributed.core - INFO - Event loop was unresponsive in Worker for 80.94s.  This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
2024-12-12 07:56:42,723 - distributed.core - INFO - Event loop was unresponsive in Worker for 94.91s.  This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
2024-12-12 07:58:18,566 - distributed.core - INFO - Event loop was unresponsive in Worker for 95.84s.  This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
2024-12-12 07:58:28,999 - distributed.core - INFO - Event loop was unresponsive in Worker for 121.08s.  This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
2024-12-12 07:58:50,769 - distributed.core - INFO - Event loop was unresponsive in Worker for 3.22s.  This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
2024-12-12 07:59:26,358 - distributed.core - INFO - Event loop was unresponsive in Worker for 5.94s.  This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
2024-12-12 07:59:26,472 - distributed.core - INFO - Event loop was unresponsive in Worker for 8.78s.  This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
2024-12-12 08:00:27,865 - distributed.core - INFO - Event loop was unresponsive in Worker for 34.24s.  This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
2024-12-12 08:02:14,697 - distributed.core - INFO - Event loop was unresponsive in Worker for 142.05s.  This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
2024-12-12 08:02:39,516 - distributed.nanny.memory - WARNING - Worker tcp://127.0.0.1:44687 (pid=37600) exceeded 95% memory budget. Restarting...
2024-12-12 08:02:47,441 - distributed.nanny - INFO - Worker process 37600 was killed by signal 15
2024-12-12 08:02:47,444 - distributed.nanny - WARNING - Restarting worker
2024-12-12 08:02:48,237 - distributed.worker - INFO -       Start worker at:      tcp://127.0.0.1:38627
2024-12-12 08:02:48,237 - distributed.worker - INFO -          Listening to:      tcp://127.0.0.1:38627
2024-12-12 08:02:48,237 - distributed.worker - INFO -          dashboard at:            127.0.0.1:36669
2024-12-12 08:02:48,237 - distributed.worker - INFO - Waiting to connect to:       tcp://localhost:8786
2024-12-12 08:02:48,237 - distributed.worker - INFO - -------------------------------------------------
2024-12-12 08:02:48,237 - distributed.worker - INFO -               Threads:                         35
2024-12-12 08:02:48,237 - distributed.worker - INFO -                Memory:                 160.00 GiB
2024-12-12 08:02:48,237 - distributed.worker - INFO -       Local Directory: /lpai/volumes/lmp-guan/dask-tmp/dask-scratch-space/worker-bge05ma9
2024-12-12 08:02:48,237 - distributed.worker - INFO - -------------------------------------------------
2024-12-12 08:02:48,764 - distributed.worker - INFO - Starting Worker plugin shuffle
2024-12-12 08:02:48,765 - distributed.worker - INFO -         Registered to:       tcp://localhost:8786
2024-12-12 08:02:48,765 - distributed.worker - INFO - -------------------------------------------------
2024-12-12 08:02:48,765 - distributed.core - INFO - Starting established connection to tcp://localhost:8786
2024-12-12 08:03:07,511 - distributed.core - INFO - Event loop was unresponsive in Worker for 52.81s.  This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
2024-12-12 08:03:37,503 - distributed.core - INFO - Event loop was unresponsive in Worker for 3.01s.  This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
2024-12-12 08:04:13,292 - distributed.core - INFO - Event loop was unresponsive in Worker for 35.79s.  This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
2024-12-12 08:06:27,215 - distributed.nanny.memory - WARNING - Worker tcp://127.0.0.1:40197 (pid=37596) exceeded 95% memory budget. Restarting...
2024-12-12 08:06:35,250 - distributed.nanny - INFO - Worker process 37596 was killed by signal 15
2024-12-12 08:06:35,253 - distributed.nanny - WARNING - Restarting worker
2024-12-12 08:06:36,041 - distributed.worker - INFO -       Start worker at:      tcp://127.0.0.1:37323
2024-12-12 08:06:36,042 - distributed.worker - INFO -          Listening to:      tcp://127.0.0.1:37323
2024-12-12 08:06:36,042 - distributed.worker - INFO -          dashboard at:            127.0.0.1:42437
2024-12-12 08:06:36,042 - distributed.worker - INFO - Waiting to connect to:       tcp://localhost:8786
2024-12-12 08:06:36,042 - distributed.worker - INFO - -------------------------------------------------
2024-12-12 08:06:36,042 - distributed.worker - INFO -               Threads:                         35
2024-12-12 08:06:36,042 - distributed.worker - INFO -                Memory:                 160.00 GiB
2024-12-12 08:06:36,042 - distributed.worker - INFO -       Local Directory: /lpai/volumes/lmp-guan/dask-tmp/dask-scratch-space/worker-csw22aqa
2024-12-12 08:06:36,042 - distributed.worker - INFO - -------------------------------------------------
2024-12-12 08:06:36,594 - distributed.worker - INFO - Starting Worker plugin shuffle
2024-12-12 08:06:36,594 - distributed.worker - INFO -         Registered to:       tcp://localhost:8786
2024-12-12 08:06:36,594 - distributed.worker - INFO - -------------------------------------------------
2024-12-12 08:06:36,595 - distributed.core - INFO - Starting established connection to tcp://localhost:8786
2024-12-12 08:06:40,979 - distributed.core - INFO - Event loop was unresponsive in Worker for 5.75s.  This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
2024-12-12 08:07:37,205 - distributed.core - INFO - Event loop was unresponsive in Worker for 4.76s.  This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
2024-12-12 08:12:36,214 - distributed.nanny.memory - WARNING - Worker tcp://127.0.0.1:38627 (pid=41421) exceeded 95% memory budget. Restarting...
2024-12-12 08:12:43,946 - distributed.nanny - INFO - Worker process 41421 was killed by signal 15
2024-12-12 08:12:43,949 - distributed.nanny - WARNING - Restarting worker
2024-12-12 08:12:44,764 - distributed.worker - INFO -       Start worker at:      tcp://127.0.0.1:39201
2024-12-12 08:12:44,764 - distributed.worker - INFO -          Listening to:      tcp://127.0.0.1:39201
2024-12-12 08:12:44,764 - distributed.worker - INFO -          dashboard at:            127.0.0.1:39927
2024-12-12 08:12:44,764 - distributed.worker - INFO - Waiting to connect to:       tcp://localhost:8786
2024-12-12 08:12:44,765 - distributed.worker - INFO - -------------------------------------------------
2024-12-12 08:12:44,765 - distributed.worker - INFO -               Threads:                         35
2024-12-12 08:12:44,765 - distributed.worker - INFO -                Memory:                 160.00 GiB
2024-12-12 08:12:44,765 - distributed.worker - INFO -       Local Directory: /lpai/volumes/lmp-guan/dask-tmp/dask-scratch-space/worker-ujfq8kpc
2024-12-12 08:12:44,765 - distributed.worker - INFO - -------------------------------------------------
2024-12-12 08:12:45,237 - distributed.worker - INFO - Starting Worker plugin shuffle
2024-12-12 08:12:45,237 - distributed.worker - INFO -         Registered to:       tcp://localhost:8786
2024-12-12 08:12:45,237 - distributed.worker - INFO - -------------------------------------------------
2024-12-12 08:12:45,238 - distributed.core - INFO - Starting established connection to tcp://localhost:8786
2024-12-12 08:12:49,594 - distributed.core - INFO - Event loop was unresponsive in Worker for 5.66s.  This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
2024-12-12 08:14:14,003 - distributed.core - INFO - Event loop was unresponsive in Worker for 33.91s.  This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
2024-12-12 08:19:52,014 - distributed.nanny.memory - WARNING - Worker tcp://127.0.0.1:37323 (pid=41866) exceeded 95% memory budget. Restarting...
2024-12-12 08:20:00,211 - distributed.nanny - INFO - Worker process 41866 was killed by signal 15
2024-12-12 08:20:00,213 - distributed.nanny - WARNING - Restarting worker
2024-12-12 08:20:01,044 - distributed.worker - INFO -       Start worker at:      tcp://127.0.0.1:41803
2024-12-12 08:20:01,044 - distributed.worker - INFO -          Listening to:      tcp://127.0.0.1:41803
2024-12-12 08:20:01,044 - distributed.worker - INFO -          dashboard at:            127.0.0.1:37033
2024-12-12 08:20:01,044 - distributed.worker - INFO - Waiting to connect to:       tcp://localhost:8786
2024-12-12 08:20:01,044 - distributed.worker - INFO - -------------------------------------------------
2024-12-12 08:20:01,044 - distributed.worker - INFO -               Threads:                         35
2024-12-12 08:20:01,044 - distributed.worker - INFO -                Memory:                 160.00 GiB
2024-12-12 08:20:01,044 - distributed.worker - INFO -       Local Directory: /lpai/volumes/lmp-guan/dask-tmp/dask-scratch-space/worker-_4xuy8xz
2024-12-12 08:20:01,044 - distributed.worker - INFO - -------------------------------------------------
2024-12-12 08:20:01,507 - distributed.worker - INFO - Starting Worker plugin shuffle
2024-12-12 08:20:01,508 - distributed.worker - INFO -         Registered to:       tcp://localhost:8786
2024-12-12 08:20:01,508 - distributed.worker - INFO - -------------------------------------------------
2024-12-12 08:20:01,508 - distributed.core - INFO - Starting established connection to tcp://localhost:8786
2024-12-12 08:20:05,951 - distributed.core - INFO - Event loop was unresponsive in Worker for 5.75s.  This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
2024-12-12 08:20:36,095 - distributed.core - INFO - Event loop was unresponsive in Worker for 4.20s.  This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
2024-12-12 08:22:03,164 - distributed.core - INFO - Event loop was unresponsive in Worker for 87.07s.  This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.

The terminal that starts the dask scheduler will show the following error:

root@dask-nemo-pure-text-0:/lpai# dask scheduler
2024-12-12 07:32:56,541 - distributed.scheduler - INFO - -----------------------------------------------
2024-12-12 07:32:56,859 - distributed.http.proxy - INFO - To route to workers diagnostics web server please install jupyter-server-proxy: python -m pip install jupyter-server-proxy
2024-12-12 07:32:56,911 - distributed.scheduler - INFO - State start
2024-12-12 07:32:56,926 - distributed.scheduler - INFO - -----------------------------------------------
2024-12-12 07:32:56,927 - distributed.scheduler - INFO -   Scheduler at: tcp://172.28.129.176:8786
2024-12-12 07:32:56,928 - distributed.scheduler - INFO -   dashboard at:  http://172.28.129.176:8787/status
2024-12-12 07:32:56,928 - distributed.scheduler - INFO - Registering Worker plugin shuffle
2024-12-12 07:33:03,276 - distributed.scheduler - INFO - Register worker <WorkerState 'tcp://127.0.0.1:44687', status: init, memory: 0, processing: 0>
2024-12-12 07:33:03,280 - distributed.scheduler - INFO - Starting worker compute stream, tcp://127.0.0.1:44687
2024-12-12 07:33:03,280 - distributed.core - INFO - Starting established connection to tcp://[::1]:53290
2024-12-12 07:33:03,290 - distributed.scheduler - INFO - Register worker <WorkerState 'tcp://127.0.0.1:40197', status: init, memory: 0, processing: 0>
2024-12-12 07:33:03,290 - distributed.scheduler - INFO - Starting worker compute stream, tcp://127.0.0.1:40197
2024-12-12 07:33:03,290 - distributed.core - INFO - Starting established connection to tcp://[::1]:53298
2024-12-12 07:33:12,007 - distributed.scheduler - INFO - Receive client connection: Client-576c0830-b85b-11ef-92fb-7e49b3567ca4
2024-12-12 07:33:12,007 - distributed.core - INFO - Starting established connection to tcp://127.0.0.1:51980
2024-12-12 07:33:27,532 - distributed.core - INFO - Event loop was unresponsive in Scheduler for 4.50s.  This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
2024-12-12 08:02:47,432 - distributed.core - INFO - Connection to tcp://[::1]:53290 has been closed.
2024-12-12 08:02:47,434 - distributed.scheduler - INFO - Remove worker <WorkerState 'tcp://127.0.0.1:44687', status: running, memory: 0, processing: 4> (stimulus_id='handle-worker-cleanup-1733990567.434068')
2024-12-12 08:02:48,762 - distributed.scheduler - INFO - Register worker <WorkerState 'tcp://127.0.0.1:38627', status: init, memory: 0, processing: 0>
2024-12-12 08:02:48,764 - distributed.scheduler - INFO - Starting worker compute stream, tcp://127.0.0.1:38627
2024-12-12 08:02:48,764 - distributed.core - INFO - Starting established connection to tcp://[::1]:55716
2024-12-12 08:06:35,239 - distributed.core - INFO - Connection to tcp://[::1]:53298 has been closed.
2024-12-12 08:06:35,240 - distributed.scheduler - INFO - Remove worker <WorkerState 'tcp://127.0.0.1:40197', status: running, memory: 4, processing: 4> (stimulus_id='handle-worker-cleanup-1733990795.240042')
2024-12-12 08:06:35,240 - distributed.scheduler - WARNING - Removing worker 'tcp://127.0.0.1:40197' caused the cluster to lose already computed task(s), which will be recomputed elsewhere: {('single_partition_write_with_filename-f6c1765edbf5a25f2b0fe44f721022b9', 7), ('single_partition_write_with_filename-f6c1765edbf5a25f2b0fe44f721022b9', 3), ('single_partition_write_with_filename-f6c1765edbf5a25f2b0fe44f721022b9', 5), ('single_partition_write_with_filename-f6c1765edbf5a25f2b0fe44f721022b9', 1)} (stimulus_id='handle-worker-cleanup-1733990795.240042')
2024-12-12 08:06:36,593 - distributed.scheduler - INFO - Register worker <WorkerState 'tcp://127.0.0.1:37323', status: init, memory: 0, processing: 0>
2024-12-12 08:06:36,593 - distributed.scheduler - INFO - Starting worker compute stream, tcp://127.0.0.1:37323
2024-12-12 08:06:36,593 - distributed.core - INFO - Starting established connection to tcp://[::1]:44604
2024-12-12 08:12:43,936 - distributed.core - INFO - Connection to tcp://[::1]:55716 has been closed.
2024-12-12 08:12:43,937 - distributed.scheduler - INFO - Remove worker <WorkerState 'tcp://127.0.0.1:38627', status: running, memory: 0, processing: 8> (stimulus_id='handle-worker-cleanup-1733991163.9372256')
2024-12-12 08:12:45,236 - distributed.scheduler - INFO - Register worker <WorkerState 'tcp://127.0.0.1:39201', status: init, memory: 0, processing: 0>
2024-12-12 08:12:45,236 - distributed.scheduler - INFO - Starting worker compute stream, tcp://127.0.0.1:39201
2024-12-12 08:12:45,236 - distributed.core - INFO - Starting established connection to tcp://[::1]:36226
2024-12-12 08:17:21,249 - distributed.utils_perf - WARNING - full garbage collections took 10% CPU time recently (threshold: 10%)
2024-12-12 08:17:23,682 - distributed.utils_perf - WARNING - full garbage collections took 13% CPU time recently (threshold: 10%)
2024-12-12 08:17:32,228 - distributed.utils_perf - WARNING - full garbage collections took 13% CPU time recently (threshold: 10%)
2024-12-12 08:17:33,664 - distributed.utils_perf - WARNING - full garbage collections took 13% CPU time recently (threshold: 10%)
2024-12-12 08:17:35,155 - distributed.utils_perf - WARNING - full garbage collections took 13% CPU time recently (threshold: 10%)
2024-12-12 08:18:42,729 - distributed.utils_perf - WARNING - full garbage collections took 13% CPU time recently (threshold: 10%)
2024-12-12 08:18:46,158 - distributed.utils_perf - WARNING - full garbage collections took 13% CPU time recently (threshold: 10%)
2024-12-12 08:18:49,170 - distributed.utils_perf - WARNING - full garbage collections took 13% CPU time recently (threshold: 10%)
2024-12-12 08:19:38,212 - distributed.utils_perf - WARNING - full garbage collections took 13% CPU time recently (threshold: 10%)
2024-12-12 08:20:00,204 - distributed.core - INFO - Connection to tcp://[::1]:44604 has been closed.
2024-12-12 08:20:00,205 - distributed.scheduler - INFO - Remove worker <WorkerState 'tcp://127.0.0.1:37323', status: running, memory: 0, processing: 8> (stimulus_id='handle-worker-cleanup-1733991600.2049491')
2024-12-12 08:20:00,205 - distributed.scheduler - ERROR - Task ('single_partition_write_with_filename-f6c1765edbf5a25f2b0fe44f721022b9', 0) marked as failed because 4 workers died while trying to run it
2024-12-12 08:20:00,205 - distributed.scheduler - ERROR - Task ('single_partition_write_with_filename-f6c1765edbf5a25f2b0fe44f721022b9', 6) marked as failed because 4 workers died while trying to run it
2024-12-12 08:20:00,205 - distributed.scheduler - ERROR - Task ('single_partition_write_with_filename-f6c1765edbf5a25f2b0fe44f721022b9', 2) marked as failed because 4 workers died while trying to run it
2024-12-12 08:20:00,205 - distributed.scheduler - ERROR - Task ('single_partition_write_with_filename-f6c1765edbf5a25f2b0fe44f721022b9', 4) marked as failed because 4 workers died while trying to run it
2024-12-12 08:20:00,221 - distributed.scheduler - INFO - Remove client Client-576c0830-b85b-11ef-92fb-7e49b3567ca4
2024-12-12 08:20:00,221 - distributed.core - INFO - Received 'close-stream' from tcp://127.0.0.1:51980; closing.
2024-12-12 08:20:00,221 - distributed.scheduler - INFO - Remove client Client-576c0830-b85b-11ef-92fb-7e49b3567ca4
2024-12-12 08:20:00,222 - distributed.scheduler - INFO - Close client connection: Client-576c0830-b85b-11ef-92fb-7e49b3567ca4
2024-12-12 08:20:01,506 - distributed.scheduler - INFO - Register worker <WorkerState 'tcp://127.0.0.1:41803', status: init, memory: 0, processing: 0>
2024-12-12 08:20:01,507 - distributed.scheduler - INFO - Starting worker compute stream, tcp://127.0.0.1:41803
2024-12-12 08:20:01,507 - distributed.core - INFO - Starting established connection to tcp://[::1]:57212
2024-12-12 08:22:27,221 - distributed.utils_perf - WARNING - full garbage collections took 13% CPU time recently (threshold: 10%)

Steps/Code to reproduce bug

import os
import time
from dask.distributed import Client, LocalCluster
import warnings
import dask.dataframe as dd
import dask_cudf
import cudf
import gzip
import json
import dask.bag as db
import glob
from dask.distributed import wait
import numpy as np

from nemo_curator import get_client
from nemo_curator.datasets import DocumentDataset
from nemo_curator.utils.distributed_utils import (
    get_num_workers,
    read_data,
    write_to_disk,
)
from nemo_curator.utils.file_utils import expand_outdir_and_mkdir
from nemo_curator.datasets import DocumentDataset
from nemo_curator.utils.file_utils import reshard_jsonl
from nemo_curator.utils.file_utils import expand_outdir_and_mkdir
from helper import convert_jsonl_gz_to_json
from nemo_curator.utils.file_utils import get_all_files_paths_under, get_batched_files
from dask_cuda import LocalCUDACluster
from nemo_curator import AddId
from dask import config


def pre_imports():
    import cudf
    
def main():
    warnings.filterwarnings('ignore')
    base_dir = "/lpai"
    cpu_client = get_client(cluster_type='cpu',scheduler_address='127.0.0.1:8786',)
    print(f"Num Workers = {get_num_workers(cpu_client)}", flush=True)
    config.set({'distributed.scheduler.allowed-failures': 100})
    allowed_failures = config.get('distributed.scheduler.allowed-failures')
    print(f"Allowed failures: {allowed_failures}")
    decompress_data_dir = os.path.join(base_dir,"volumes/lmp-guan/sy/cc-main-lan-en/24-11-11-1112-parquet")
    #decompress_data_dir = os.path.join(base_dir,"volumes/lmp-guan/sy/cc-main-lan-en/24-12-10-parquet-clean")
    id_data_dir = os.path.join(base_dir,"volumes/lmp-guan/sy/11-26-dedup-test/add-id-en-dataset")
    for files in get_batched_files(decompress_data_dir, id_data_dir, "parquet", batch_size=8):
        raw_data = read_data(files, file_type="parquet", backend="pandas", add_filename=True)
        input_dataset = DocumentDataset(raw_data)
        input_dataset.df.head()
        len(input_dataset.df)
        t0 = time.time()
        # specify add_id function
        add_id = AddId(
            id_field="id",
            id_prefix="add_id",
        )
        id_dataset = add_id(input_dataset)
        id_dataset.to_json(id_data_dir, write_to_filename=True)
    print(f"Adding ID took :{time.time()-t0}")
if __name__ == '__main__':
    main()

Here is the script I used to manually perform Unicode cleaning on the dataset using ftfy:

import os
import ftfy
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
import ray

def fix_text(text):
    return ftfy.fix_text(text)

@ray.remote
def process_file(file_path, output_dir, text_field='text'):
    filename = os.path.basename(file_path)
    output_file_path = os.path.join(output_dir, filename)

    parquet_writer = None

    try:
        # Open the parquet file
        reader = pq.ParquetFile(file_path)
        
        for batch_index, batch in enumerate(reader.iter_batches(batch_size=1000)):
            print(f"start Processing batch {batch_index} of file {filename}")

            df = batch.to_pandas()
            if text_field in df.columns:
                df[text_field] = df[text_field].apply(fix_text)

            table = pa.Table.from_pandas(df)
            if parquet_writer is None:
                parquet_writer = pq.ParquetWriter(output_file_path, table.schema)
            
            parquet_writer.write_table(table)
            print(f"finished Processing batch {batch_index} of file {filename}")
    finally:
        if parquet_writer is not None:
            parquet_writer.close()

def clean_data_files_parallel(input_dir, output_dir, text_field='text'):
    os.makedirs(output_dir, exist_ok=True)

    futures = [
        process_file.remote(os.path.join(input_dir, filename), output_dir, text_field)
        for filename in os.listdir(input_dir)
        if filename.endswith('.parquet')
    ]
    
    ray.get(futures)

if __name__ == "__main__":
    ray.init()

    input_dir = "/lpai/volumes/lmp-guan/sy/cc-main-lan-en/24-11-11-1112-parquet"
    output_dir = "/lpai/volumes/lmp-guan/sy/cc-main-lan-en/24-12-10-parquet-clean"
    clean_data_files_parallel(input_dir, output_dir, text_field='text')

    ray.shutdown()

Expected behavior

Avoid using ftfy to do the text cleaning by myself before read_data operation

Environment overview

two cpu workers

RickyShi46 avatar Jan 16 '25 03:01 RickyShi46