CacheHolder buffer size limited to 1000
🐛 Describe the bug
The .on_disk_cache() datapipe uses .demux() under the hood with a default buffer_size of 1000. Unfortunately, this appears to break when the source datapipe has more than 1000 elements. See https://github.com/pytorch/data/blob/v0.4.1/torchdata/datapipes/iter/util/cacheholder.py#L251-L260
For example, something like the following should reproduce the problem:
import tempfile

from torchdata import datapipes

temp_dir = tempfile.gettempdir()

caching_datapipe = datapipes.iter.IterableWrapper(range(1001))
caching_datapipe = caching_datapipe.on_disk_cache(
    filepath_fn=lambda i: f"{temp_dir}/{i}.tmp"
)
# caching_datapipe = caching_datapipe.map(fn=_get_from_http)  # (not needed for reproducing)
caching_datapipe = caching_datapipe.end_caching(
    filepath_fn=lambda i: f"{temp_dir}/{i}.tmp"
)
for element in caching_datapipe:
    print(element)
The stack trace ends with:
line 357, in _find_next
raise BufferError(
BufferError: DemultiplexerIterDataPipe buffer overflow, buffer size 1000 is insufficient.
This exception is thrown by __iter__ of MapperIterDataPipe(datapipe=_ChildDataPipe, fn=<lambda>, input_col=None, output_col=None)
(I haven't been able to find a suitable workaround for this, since the demux is hidden inside the implementation of OnDiskCacheHolder. Any ideas/advice for temporarily working around the issue would also be highly appreciated.)
Versions
Collecting environment information...
PyTorch version: 1.12.1+cu116
Is debug build: False
CUDA used to build PyTorch: 11.6
ROCM used to build PyTorch: N/A
OS: NixOS 22.05 (Quokka) (x86_64)
GCC version: (GCC) 11.3.0
Clang version: Could not collect
CMake version: Could not collect
Libc version: glibc-2.35
Python version: 3.10.6 (main, Aug 1 2022, 20:38:21) [GCC 11.3.0] (64-bit runtime)
Python platform: Linux-5.18.19-x86_64-with-glibc2.35
Is CUDA available: True
CUDA runtime version: Could not collect
GPU models and configuration: GPU 0: NVIDIA GeForce RTX 3060
Nvidia driver version: 515.48.07
cuDNN version: Could not collect
HIP runtime version: N/A
MIOpen runtime version: N/A
Is XNNPACK available: True
Versions of relevant libraries:
[pip3] mypy==0.971
[pip3] mypy-extensions==0.4.3
[pip3] numpy==1.23.1
[pip3] torch==1.12.1+cu116
[pip3] torchdata==0.4.1
[pip3] torchvision==0.13.1+cu116
[conda] Could not collect
Thank you for opening the issue. The reason that demux's buffer blows up is that we yield cached data first, then todo. See: https://github.com/pytorch/data/blob/983e87ada583b7a58d13a1a5f047dd9d256155dd/torchdata/datapipes/iter/util/cacheholder.py#L424
However, it seems weird to me to change the order to yield data from todo first and then cached. A proper solution might be adding a buffer_size argument to end_caching.
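The buffering behavior described above can be illustrated with a minimal plain-Python sketch (no torchdata required; demux_cached_first is a made-up stand-in for the real DemultiplexerIterDataPipe, modeling the specific case where the cached branch is consumed first): when nothing is cached yet, every element routed to the todo branch must sit in the buffer until the cached branch is exhausted, so any source longer than buffer_size overflows.

```python
from collections import deque

def demux_cached_first(source, is_cached, buffer_size):
    """Yield elements classified as cached first, buffering every
    'todo' element until the cached branch is exhausted. This mimics
    the cached-then-todo consumption order, not torchdata's API."""
    todo_buffer = deque()
    for item in source:
        if is_cached(item):
            yield ("cached", item)
        else:
            if len(todo_buffer) >= buffer_size:
                raise BufferError(
                    f"buffer size {buffer_size} is insufficient"
                )
            todo_buffer.append(item)
    while todo_buffer:
        yield ("todo", todo_buffer.popleft())

# With a cold cache, nothing is classified as cached, so every one of
# the 1001 elements heads for the todo buffer and the 1001st overflows:
try:
    list(demux_cached_first(range(1001), lambda i: False, buffer_size=1000))
except BufferError as e:
    print("overflow:", e)
```

With a warm cache the buffer stays small, which is why the problem only surfaces on the first run over a large source.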
Personally I think I would expect caching to be FIFO with respect to the source datapipe, in order to be as deterministic as possible.
In other words, if the on-disk cache can cause data in the pipe to be re-ordered as a side-effect, then surely this harms reproducibility. (I.e. a previous run can affect the outcome of the current run)
(Instead of demux, maybe a sum type is needed to properly retain the original ordering of the source datapipe? EDIT: I suppose this would require some kind of filtered "view" of the pipe to maintain the current API...)
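For illustration only, here is a hypothetical order-preserving sketch in plain Python (end_caching_in_order, load_cached, and download are all made-up names, not torchdata APIs): handling each element inline as it arrives keeps the output order identical to the source order and needs no buffer at all, though it does not map directly onto torchdata's two-branch datapipe graph.

```python
def end_caching_in_order(source, is_cached, load_cached, download):
    """Hypothetical order-preserving cache: instead of demuxing into
    a cached branch and a todo branch, decide per element as it
    arrives, so output order always matches source order."""
    for item in source:
        if is_cached(item):
            yield load_cached(item)  # cache hit: reuse stored result
        else:
            yield download(item)     # cache miss: fetch (and store)

# A partially warm cache: elements 0 and 2 are already cached.
cache = {0: "hit-0", 2: "hit-2"}
out = list(end_caching_in_order(
    range(4),
    is_cached=lambda i: i in cache,
    load_cached=lambda i: cache[i],
    download=lambda i: f"fetched-{i}",
))
print(out)  # order follows the source: 0, 1, 2, 3
```

Whatever was cached on a previous run no longer affects the element order of the current run, which is the determinism property argued for above.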
Personally I think I would expect caching to be FIFO with respect to the source datapipe, in order to be as deterministic as possible.
Agree. Will take a look later today
Thank you, I appreciate the support!
Did some quick research, and it looks doable: adding a DataPipe that joins two DataPipes with a key_fn should do the job. It does require some changes in CacheHolder, and I need to discuss the usage of such a DataPipe with other users to consolidate the API.
I just ran into this issue. Setting a larger buffer_size fixes it for me atm. Is there a better way to solve this currently?
Caching files from AWS using the S3 loader, I get the same buffer issue. I use the patch function from fastcore to fix the issue temporarily.
from functools import partial

import torchdata.datapipes as dp
from fastcore.basics import patch

@patch
def _end_caching(self: dp.iter.OnDiskCacheHolder):
    filepath_fn, hash_dict, hash_type, extra_check_fn = dp.iter.OnDiskCacheHolder._temp_dict.pop(self)
    todo_dp, cached_dp = self.source_datapipe.demux(
        2,
        partial(
            dp.iter.OnDiskCacheHolder._cache_check_fn,
            filepath_fn=filepath_fn,
            hash_dict=hash_dict,
            hash_type=hash_type,
            extra_check_fn=extra_check_fn,
        ),
        buffer_size=-1,  # unlimited buffer instead of the default 1000
    )
    # Cached: keep filepath(s)
    cached_dp = cached_dp.map(fn=filepath_fn)
    # Convert list back to single elements
    cached_dp = cached_dp.unbatch(-1)
    self.source_datapipe = todo_dp
    self._end_caching_flag = True
    return cached_dp
Alternatively, I noticed I can do:
pipe = dp.iter.S3FileLister(pipe).header(100)
pipe = dp.iter.OnDiskCacheHolder(pipe, filepath_fn=save_path)
pipe = dp.iter.S3FileLoader(pipe)
pipe = dp.iter.EndOnDiskCacheHolder(pipe, mode="wb", filepath_fn=save_path)
And it will progressively download more files every epoch, but the first way is probably way better.