`optimize()` with `num_workers > 1` leads to deletion issues
🐛 Bug
In the LitData tests, we only ever call `optimize()` with `num_workers=1`. In PR #237 I found that if `optimize()` is called with more workers, we seem to hit a race condition that causes some chunks to be deleted prematurely, after which streaming fails.
https://github.com/Lightning-AI/litdata/pull/237#discussion_r1684570860
This happens in this test: https://github.com/Lightning-AI/litdata/blob/c58b67346a3be22de26679fb6788f38894c47cd1/tests/streaming/test_dataset.py#L826 (see the TODO comments).
The test fails with:
__________________ test_dataset_resume_on_future_chunks[True] __________________
shuffle = True
tmpdir = local('/tmp/pytest-of-runner/pytest-0/test_dataset_resume_on_future_0')
monkeypatch = <_pytest.monkeypatch.MonkeyPatch object at 0x7f6a4124f460>
@pytest.mark.skipif(sys.platform == "win32", reason="Not tested on windows and MacOs")
@mock.patch.dict(os.environ, {}, clear=True)
@pytest.mark.timeout(60)
@pytest.mark.parametrize("shuffle", [True, False])
def test_dataset_resume_on_future_chunks(shuffle, tmpdir, monkeypatch):
"""This test is constructed to test resuming from a chunk past the first chunk, when subsequent chunks don't have
the same size."""
s3_cache_dir = str(tmpdir / "s3cache")
optimize_data_cache_dir = str(tmpdir / "optimize_data_cache")
optimize_cache_dir = str(tmpdir / "optimize_cache")
data_dir = str(tmpdir / "optimized")
monkeypatch.setenv("DATA_OPTIMIZER_DATA_CACHE_FOLDER", optimize_data_cache_dir)
monkeypatch.setenv("DATA_OPTIMIZER_CACHE_FOLDER", optimize_cache_dir)
> optimize(
fn=_simple_preprocess,
inputs=list(range(8)),
output_dir=data_dir,
chunk_size=190,
num_workers=4,
num_uploaders=1,
copying /tmp/pytest-of-runner/pytest-0/test_dataset_resume_on_future_0/optimize_cache/chunk-3-1.bin to /tmp/pytest-of-runner/pytest-0/test_dataset_resume_on_future_0/optimized/chunk-3-1.bin
putting /tmp/pytest-of-runner/pytest-0/test_dataset_resume_on_future_0/optimize_cache/chunk-3-1.bin on the remove queue
Worker 1 is done.
Worker 2 is done.
Worker 3 is done.
Worker 0 is done.
Workers are finished.
----------------------------- Captured stderr call -----------------------------
Progress: 0%| | 0/8 [00:00<?, ?it/s]Process Process-85:1:
Traceback (most recent call last):
File "/opt/hostedtoolcache/Python/3.9.19/x64/lib/python3.9/multiprocessing/process.py", line 315, in _bootstrap
self.run()
File "/opt/hostedtoolcache/Python/3.9.19/x64/lib/python3.9/multiprocessing/process.py", line 108, in run
self._target(*self._args, **self._kwargs)
File "/home/runner/work/litdata/litdata/src/litdata/processing/data_processor.py", line 259, in _upload_fn
shutil.copy(local_filepath, output_filepath)
File "/opt/hostedtoolcache/Python/3.9.19/x64/lib/python3.9/shutil.py", line 427, in copy
copyfile(src, dst, follow_symlinks=follow_symlinks)
File "/opt/hostedtoolcache/Python/3.9.19/x64/lib/python3.9/shutil.py", line 264, in copyfile
with open(src, 'rb') as fsrc:
FileNotFoundError: [Errno 2] No such file or directory: '/tmp/pytest-of-runner/pytest-0/test_dataset_resume_on_future_0/optimize_cache/chunk-0-0.bin'
Progress: 100%|██████████| 8/8 [00:00<00:00, 122.77it/s]
=========================== short test summary info ============================
FAILED tests/streaming/test_dataset.py::test_dataset_resume_on_future_chunks[True] - RuntimeError: All the chunks should have been deleted. Found ['chunk-0-1.bin']
====== 1 failed, 191 passed, 8 skipped, 11 warnings in 247.94s (0:04:07) =======
when setting optimize(num_workers=4) (full CI log: https://github.com/Lightning-AI/litdata/actions/runs/10010459328/job/27671682379?pr=237#step:10:248). This needs to be investigated. However, I have so far not been able to reproduce it locally (it is only observed in CI)!
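For reference, a condensed sketch of the call pattern that triggers it. The preprocessing function below is a trivial stand-in, not the test's `_simple_preprocess`, and the paths are just examples; this is a sketch of the setup, not a reliable standalone repro, since the failure has not reproduced locally:

import os

from litdata import optimize

# Stand-in preprocessing fn: yields several items per input so that multiple
# chunks are written, uploaded and deleted concurrently by the workers.
def preprocess(index):
    for _ in range(100):
        yield (index, os.urandom(16))

if __name__ == "__main__":
    os.environ["DATA_OPTIMIZER_DATA_CACHE_FOLDER"] = "/tmp/optimize_data_cache"
    os.environ["DATA_OPTIMIZER_CACHE_FOLDER"] = "/tmp/optimize_cache"

    optimize(
        fn=preprocess,
        inputs=list(range(8)),
        output_dir="/tmp/optimized",
        chunk_size=190,
        num_workers=4,   # fails intermittently in CI; num_workers=1 passes
        num_uploaders=1,
    )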
Some more evidence from another (more rarely flaky) test that uses num_workers=2:
https://github.com/Lightning-AI/litdata/actions/runs/10013150667/job/27680138130
_______ test_dataset_for_text_tokens_distributed_num_workers_end_to_end ________
tmpdir = local('/private/var/folders/80/7d6cd1d13n9by5z92zm5q2mr0000gn/T/pytest-of-runner/pytest-0/test_dataset_for_text_tokens_d1')
monkeypatch = <_pytest.monkeypatch.MonkeyPatch object at 0x121e83bb0>
def test_dataset_for_text_tokens_distributed_num_workers_end_to_end(tmpdir, monkeypatch):
monkeypatch.setattr(functions, "_get_input_dir", lambda x: str(tmpdir))
seed_everything(42)
with open(tmpdir / "a.txt", "w") as f:
f.write("hello")
inputs = [(v, str(tmpdir / "a.txt")) for v in range(0, 200, 20)]
cache_dir = os.path.join(tmpdir, "cache")
output_dir = os.path.join(tmpdir, "target_dir")
os.makedirs(output_dir, exist_ok=True)
monkeypatch.setenv("DATA_OPTIMIZER_CACHE_FOLDER", cache_dir)
monkeypatch.setenv("DATA_OPTIMIZER_DATA_CACHE_FOLDER", cache_dir)
> functions.optimize(
optimize_fn, inputs, output_dir=str(tmpdir), num_workers=2, chunk_size=2, reorder_files=False, num_downloaders=1
)
tests/streaming/test_dataset.py:596:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
src/litdata/processing/functions.py:432: in optimize
data_processor.run(
src/litdata/processing/data_processor.py:1055: in run
self._exit_on_error(error)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <litdata.processing.data_processor.DataProcessor object at 0x121edb6a0>
error = 'Traceback (most recent call last):\n File "/Users/runner/work/litdata/litdata/src/litdata/processing/data_processor....te/var/folders/80/7d6cd1d13n9by5z92zm5q2mr0000gn/T/pytest-of-runner/pytest-0/test_dataset_for_text_tokens_d1/cache\'\n'
def _exit_on_error(self, error: str) -> None:
for w in self.workers:
# w.join(0)
w.terminate() # already error has occurred. So, no benefit of processing further.
> raise RuntimeError(f"We found the following error {error}.")
E RuntimeError: We found the following error Traceback (most recent call last):
E File "/Users/runner/work/litdata/litdata/src/litdata/processing/data_processor.py", line 435, in run
E self._setup()
E File "/Users/runner/work/litdata/litdata/src/litdata/processing/data_processor.py", line 444, in _setup
E self._create_cache()
E File "/Users/runner/work/litdata/litdata/src/litdata/processing/data_processor.py", line 511, in _create_cache
E os.makedirs(self.cache_data_dir, exist_ok=True)
E File "/Users/runner/hostedtoolcache/Python/3.9.19/x64/lib/python3.9/os.py", line 225, in makedirs
E mkdir(name, mode)
E FileExistsError: [Errno 17] File exists: '/private/var/folders/80/7d6cd1d13n9by5z92zm5q2mr0000gn/T/pytest-of-runner/pytest-0/test_dataset_for_text_tokens_d1/cache'
E .
src/litdata/processing/data_processor.py:1119: RuntimeError
----------------------------- Captured stdout call -----------------------------
Create an account on https://lightning.ai/ to optimize your data faster using multiple nodes and large machines.
Storing the files under /private/var/folders/80/7d6cd1d13n9by5z92zm5q2mr0000gn/T/pytest-of-runner/pytest-0/test_dataset_for_text_tokens_d1
Setup started with fast_dev_run=False.
Setup finished in 0.001 seconds. Found 10 items to process.
Starting 2 workers with 10 items. The progress bar is only updated when a worker finishes.
Workers are ready ! Starting data processing...
Worker 1 is done.
----------------------------- Captured stderr call -----------------------------
Progress: 0%| | 0/10 [00:00<?, ?it/s]
=========================== short test summary info ============================
FAILED tests/streaming/test_dataset.py::test_dataset_for_text_tokens_distributed_num_workers_end_to_end - RuntimeError: We found the following error Traceback (most recent call last):
File "/Users/runner/work/litdata/litdata/src/litdata/processing/data_processor.py", line 435, in run
self._setup()
File "/Users/runner/work/litdata/litdata/src/litdata/processing/data_processor.py", line 444, in _setup
self._create_cache()
File "/Users/runner/work/litdata/litdata/src/litdata/processing/data_processor.py", line 511, in _create_cache
os.makedirs(self.cache_data_dir, exist_ok=True)
File "/Users/runner/hostedtoolcache/Python/3.9.19/x64/lib/python3.9/os.py", line 225, in makedirs
mkdir(name, mode)
FileExistsError: [Errno 17] File exists: '/private/var/folders/80/7d6cd1d13n9by5z92zm5q2mr0000gn/T/pytest-of-runner/pytest-0/test_dataset_for_text_tokens_d1/cache'
.
Hi, I also ran into a similar issue with num_workers > 1. How can we resolve it?
I also found that the issue happens when num_workers = 1. For now I can only run with num_workers = 0.
I'm also experiencing this issue
[Errno 2] No such file or directory: '/tmp/[...]/chunk-0-0.bin'
This error goes away when I set my num_workers = 1
It seems I can bypass this error with
os.environ["DATA_OPTIMIZER_CACHE_FOLDER"] = f"/tmp/{__name__}"
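In case it helps others, here is a fuller sketch of that workaround, giving each run its own cache folders. The paths, the run-id scheme, and the trivial fn are just examples, not something from the library:

import os
import uuid

import litdata as ld

# Point both optimizer caches at fresh, run-specific folders so files from a
# previous or concurrent run cannot be picked up or deleted by mistake.
run_id = uuid.uuid4().hex
os.environ["DATA_OPTIMIZER_CACHE_FOLDER"] = f"/tmp/litdata_cache_{run_id}"
os.environ["DATA_OPTIMIZER_DATA_CACHE_FOLDER"] = f"/tmp/litdata_data_cache_{run_id}"

def fn(index):
    return {"index": index, "value": index * 2}

if __name__ == "__main__":
    ld.optimize(
        fn=fn,
        inputs=list(range(1000)),
        output_dir="/tmp/litdata_optimized",
        chunk_bytes="64MB",
        num_workers=4,
    )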
For the second issue, related to test_dataset_for_text_tokens_distributed_num_workers_end_to_end:
I think this is just a weird bug.
We are using os.makedirs(SOME_PATH, exist_ok=True), so even if the directory exists it shouldn't raise an error, but sometimes it does anyway.
I don't think it has anything to do with num_workers.
I have faced this issue a couple of times, and from what I remember it only used to fail on macOS. So I added a couple of `if os.path.exists():` checks before calling os.makedirs; this pattern is present in a number of files.
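i.e. roughly this pattern (a sketch of the idea, not the exact change in the repo; the helper name is made up):

import os

def _makedirs_safe(path: str) -> None:
    # os.makedirs(..., exist_ok=True) should already be enough, but we have
    # seen spurious FileExistsError in CI, so guard and swallow it explicitly.
    if not os.path.exists(path):
        try:
            os.makedirs(path, exist_ok=True)
        except FileExistsError:
            # Another process created the directory between the check and the call.
            pass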
Same issue on my Ubuntu 16 server with num_workers=16. It doesn't always happen, and one way to work around it is to just rerun the code (see the retry sketch after the snippet below).
Pseudocode:
from PIL import Image
import os
import litdata as ld

# slide_ids, patch_dir, color2label, patch_size, stride_size and
# split_image_into_patches are defined elsewhere in my code.

def process_patch(input_data):
    img_patch, mask_patch, color2label = input_data
    img_patch = img_patch.convert("RGB")
    mask_patch = mask_patch.convert("RGB")
    w, h = mask_patch.size
    pixel = mask_patch.getpixel((w // 2, h // 2))
    label_text = color2label.get(pixel, "BG")
    if label_text == "BG":
        return None  # skip background patches
    label = list(color2label.keys()).index(pixel)
    return (img_patch, label)

for slide_id in slide_ids:
    img_path = slide_id + "_HE.jpg"
    mask_path = slide_id + "_mask.jpg"
    img = Image.open(img_path)
    mask = Image.open(mask_path)
    img_patches = split_image_into_patches(img, patch_size, stride_size)
    mask_patches = split_image_into_patches(mask, patch_size, stride_size)
    input_data = [
        (img_patch, mask_patch, color2label)
        for img_patch, mask_patch in zip(img_patches, mask_patches)
    ]
    ld.optimize(
        fn=process_patch,
        inputs=input_data,
        output_dir=os.path.join(patch_dir, slide_id),
        num_workers=min(os.cpu_count(), 16),
        mode="overwrite",
        compression="zstd",
        chunk_bytes="64MB",
    )
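Related to the "just rerun the code" workaround above: until the root cause is fixed, one option is to retry the call automatically. A minimal sketch, assuming the failure surfaces as a raised exception (as in the tracebacks above); the helper name is made up:

import time

import litdata as ld

def optimize_with_retry(max_attempts=3, **optimize_kwargs):
    # The failure is intermittent, so simply rerunning usually succeeds.
    for attempt in range(1, max_attempts + 1):
        try:
            ld.optimize(**optimize_kwargs)
            return
        except (RuntimeError, FileNotFoundError, FileExistsError) as exc:
            if attempt == max_attempts:
                raise
            print(f"optimize() failed on attempt {attempt}: {exc!r}, retrying...")
            time.sleep(5)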