
Using ZipArchiveLoader from torchdata

Open fabrizio-ottati opened this issue 1 year ago • 7 comments

More than a PR, this is to show how to use torchdata archive utilities and to discuss next steps. I would like to add some other things before merging it.

I used ZipArchiveLoader, which allows cycling through the files inside the ZIP archive. Does it make sense to you, @biphasic? See this line from torchvision and this dataset class.

Edit: I tried to implement an STMNIST class inspired by torchvision's new datapipe implementation. We can talk about it if you want; it's pretty complicated.

This is a usage example.

from torchdata.datapipes.iter import (
    ZipArchiveLoader,
    UnZipper,
    FileLister, 
    FileOpener,
    Filter
)
from scipy.io import loadmat

filepath = "./STMNIST.zip"

def is_mat(x):
    return x[0].endswith(".mat") and "LUT" not in x[0]

dp = FileLister(filepath)
dp = FileOpener(dp, mode="b")
dp = ZipArchiveLoader(dp).filter(is_mat)
dp_fpath, dp_data = UnZipper(dp, sequence_length=2)
iter_dp = iter(dp)
fpath, stream = next(iter_dp)
fpath, stream

('STMNIST.zip/data_submission/9/9_ch0_622_spiketrain.mat',
 StreamWrapper<<zipfile.ZipExtFile name='data_submission/9/9_ch0_622_spiketrain.mat' mode='r' compress_type=deflate>>)
mat = loadmat(stream)
a = mat['spiketrain']
a
array([[0.        , 0.        , 0.        , ..., 0.        , 0.        ,
        0.        ],
       [0.        , 0.        , 0.        , ..., 0.        , 0.        ,
        0.        ],
       [0.        , 0.        , 0.        , ..., 0.        , 0.        ,
        0.        ],
       ...,
       [0.        , 0.        , 0.        , ..., 0.        , 0.        ,
        0.        ],
       [0.        , 0.        , 0.        , ..., 0.        , 0.        ,
        0.        ],
       [0.07939189, 0.0793919 , 0.07939192, ..., 1.99187338, 1.99187339,
        1.99187341]])
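
For readers who do not have torchdata at hand, the (path, stream) pairs shown above can be approximated with the standard library alone. This is only a sketch of the idea, not torchdata's code: `iter_zip_members` and the in-memory `demo.zip` archive are hypothetical names introduced for illustration, and the predicate mirrors `is_mat` from the example.

```python
import io
import zipfile


def is_mat(item):
    # Same predicate as in the example: keep .mat files, skip the look-up table.
    path, _ = item
    return path.endswith(".mat") and "LUT" not in path


def iter_zip_members(archive_name, fileobj):
    """Yield (path, binary stream) pairs for every file inside a ZIP archive."""
    with zipfile.ZipFile(fileobj) as zf:
        for info in zf.infolist():
            if info.is_dir():
                continue
            with zf.open(info) as stream:
                # The stream is only valid while the consumer handles this item.
                yield f"{archive_name}/{info.filename}", stream


# Build a tiny archive in memory so the sketch runs without STMNIST.zip.
buffer = io.BytesIO()
with zipfile.ZipFile(buffer, "w") as zf:
    zf.writestr("data/9/9_spiketrain.mat", b"fake-mat-bytes")
    zf.writestr("data/LUT.mat", b"lookup-table")
    zf.writestr("data/readme.txt", b"notes")
buffer.seek(0)

members = [
    (path, stream.read())
    for path, stream in filter(is_mat, iter_zip_members("demo.zip", buffer))
]
```

Each stream has to be consumed before moving on to the next member, which is also why the real pipeline hands the stream straight to `loadmat`.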

fabrizio-ottati avatar Aug 28 '22 17:08 fabrizio-ottati

Codecov Report

Base: 78.21% // Head: 78.23% // Increases project coverage by +0.01% :tada:

Coverage data is based on head (2bc3441) compared to base (4ee0da1). Patch has no changes to coverable lines.

Additional details and impacted files
@@             Coverage Diff             @@
##           develop     #216      +/-   ##
===========================================
+ Coverage    78.21%   78.23%   +0.01%     
===========================================
  Files           42       42              
  Lines         2355     2357       +2     
===========================================
+ Hits          1842     1844       +2     
  Misses         513      513              
Impacted Files Coverage Δ
tonic/cached_dataset.py 88.78% <0.00%> (+0.21%) :arrow_up:


codecov-commenter avatar Aug 28 '22 17:08 codecov-commenter

Well, actually there seems to be no slowdown if the decoding function is included in the pipe... If you remove the decoding step, the ratio of zipped to unzipped load times is ~6.

from torchdata.datapipes.iter import (
    ZipArchiveLoader,
    UnZipper,
    FileLister, 
    FileOpener,
    Filter
)
from scipy.io import loadmat
import numpy as np
import timeit

def is_mat_tuple(x):
    return x[0].endswith(".mat") and "LUT" not in x[0]

def is_mat(x):
    return x.endswith(".mat") and "LUT" not in x

def spiketrain_to_array(matfile):
    # Transposing since the order is (address, event),
    # but we like (event, address).
    mat = loadmat(matfile)
    spiketrain = mat["spiketrain"].T
    # Separating coordinates and timestamps.
    spikes, timestamps = spiketrain[:, :-1], spiketrain[:, -1]
    # Getting events addresses.
    # First entry -> Event number.
    # Second entry -> Event address in [0,100).
    events_nums, events_addrs = spikes.nonzero()
    # Mapping addresses to 2D coordinates.
    # The mapping is (x%address, y//address), from the paper.
    events = np.zeros((len(events_nums)), dtype=dtype)
    events["x"] = events_addrs % sensor_size[0]
    events["y"] = events_addrs // sensor_size[1]
    # Converting floating point seconds to integer microseconds.
    events["t"] = (timestamps[events_nums] * 1e6).astype(int)
    # Converting -1 polarities to 0.
    # np.maximum clamps element-wise; np.max(..., 0) would reduce to one scalar.
    events["p"] = np.maximum(spikes[(events_nums, events_addrs)], 0).astype(int)
    return events

###########################################################################

zip_path = "./STMNIST.zip"
datapath = "./data_submission"
sensor_size = (10, 10, 2)
dtype = np.dtype([("x", int), ("y", int), ("t", int), ("p", int)])

# Zipped.
zip_dp = FileLister(zip_path)
zip_dp = FileOpener(zip_dp, mode="b")
zip_dp = ZipArchiveLoader(zip_dp).filter(is_mat_tuple)
_, zip_dp = UnZipper(zip_dp, sequence_length=2)
zip_dp = zip_dp.map(spiketrain_to_array)
iter_zip_dp = iter(zip_dp)

# Unzipped.
dp = FileLister(datapath, recursive=True).filter(is_mat)
dp = dp.map(spiketrain_to_array)
iter_dp = iter(dp)

fn = lambda: next(iter_dp)
zip_fn = lambda: next(iter_zip_dp)

avg_time = timeit.Timer(fn).timeit(number=1000)
print(f"Average load time for simple datapipe: {avg_time:.3f} ms")
zip_avg_time = timeit.Timer(zip_fn).timeit(number=1000)
print(f"Average load time for zipped datapipe: {zip_avg_time:.3f} ms")
print(f"Ratio between load times (ZIP v.s. unzipped): {zip_avg_time/avg_time:.2f}x.")
Average load time for simple datapipe: 3.624 ms
Average load time for zipped datapipe: 3.428 ms
Ratio between load times (ZIP v.s. unzipped): 0.95x.
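
The address and polarity decoding in `spiketrain_to_array` can be checked in isolation. The sketch below assumes the same `sensor_size = (10, 10, 2)` as the benchmark and polarities stored as -1/+1, as the comments in the function suggest; `decode_address` and `clamp_polarity` are hypothetical helper names, not part of Tonic.

```python
def decode_address(address, sensor_size=(10, 10, 2)):
    """Map a flat address in [0, 100) to (x, y), as in spiketrain_to_array."""
    x = address % sensor_size[0]
    y = address // sensor_size[1]
    return x, y


def clamp_polarity(value):
    # -1 (OFF) becomes 0, +1 (ON) stays 1.
    return int(max(value, 0))


decoded = [decode_address(a) for a in (0, 9, 10, 37, 99)]
polarities = [clamp_polarity(v) for v in (-1.0, 1.0)]
```

For a square 10x10 sensor it does not matter whether the width or the height is used as the divisor; for a non-square sensor both operations would need the same row length.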

fabrizio-ottati avatar Aug 28 '22 18:08 fabrizio-ottati

should I merge?

biphasic avatar Aug 30 '22 12:08 biphasic

I just have one doubt. I benchmarked the read speed over 1000 samples.

image

Right now, we use the archive loaders from torchdata to iterate through the dataset, so, unlike the original Tonic datasets, we never decompress the archive to disk. In your opinion, should we provide the option to decompress the archive and use the decompressed files? For NMNIST there is a 5x speedup.

At the same time, once the tensors are cached to the GPU or somewhere else, who cares? :laughing: I wanted to know your opinion about this, to see if it is worth spending time adding a decompress argument to the dataset class.

fabrizio-ottati avatar Aug 30 '22 12:08 fabrizio-ottati

oh I see, so for some datasets we still need to decompress the data. I think that makes sense and a 5x speedup is definitely worth a little bit of extra disk space. You're right that once tensors are cached it doesn't matter so much, but it still matters for the first epoch (during caching) and for people who work directly with the events instead of tensors (and do not use caching at all). I think that the default setting for decompression should be True for datasets like NMNIST, CIFAR10DVS, DVS Gesture and the like because they're not that huge at the end of the day and fast loading is what we want to achieve!

Recently I re-worked the DSEC dataset a bit and it's quite a large one. So to save some disk space, I just delete the zip file after extraction by default, see here: https://github.com/neuromorphs/tonic/blob/develop/tonic/datasets/dsec.py#L344 So my answer is: if it's (quite a bit) faster to load the decompressed files, we should do that. If the dataset is huge, we can think of deleting zips after extraction or providing a decompression=False flag.
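
The "extract, then optionally delete the zip" idea can be sketched with the standard library. This is not the DSEC code linked above; `extract_archive` and its `delete_archive` flag are hypothetical names used only to illustrate the trade-off between disk space and load speed.

```python
import tempfile
import zipfile
from pathlib import Path


def extract_archive(zip_path, dest_dir, delete_archive=False):
    """Extract zip_path into dest_dir and optionally remove the archive."""
    zip_path, dest_dir = Path(zip_path), Path(dest_dir)
    dest_dir.mkdir(parents=True, exist_ok=True)
    with zipfile.ZipFile(zip_path) as zf:
        zf.extractall(dest_dir)
    if delete_archive:
        # Frees the disk space taken by the compressed copy.
        zip_path.unlink()
    return sorted(p for p in dest_dir.rglob("*") if p.is_file())


# Demo on a throwaway archive.
with tempfile.TemporaryDirectory() as tmp:
    archive = Path(tmp) / "demo.zip"
    with zipfile.ZipFile(archive, "w") as zf:
        zf.writestr("train/0/sample.bin", b"\x00\x01")
    files = extract_archive(archive, Path(tmp) / "extracted", delete_archive=True)
    extracted_names = [f.name for f in files]
    archive_still_exists = archive.exists()
```

Deleting the archive means a later integrity check or re-extraction needs a fresh download, so for small datasets keeping it may be the simpler default.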

biphasic avatar Aug 30 '22 13:08 biphasic

I think you are absolutely right. Then, let's decompress the data by default. I want to understand if this can be accomplished through torchdata utilities, so that we only have to maintain Tonic-related code and can rely on torchdata for the most basic tasks. There must be a way, since all the datasets in torchvision.prototype do not decompress the data in advance but use the archive loaders from torchdata. I'll investigate a little bit :laughing:

fabrizio-ottati avatar Aug 30 '22 13:08 fabrizio-ottati

thanks a lot @fabhertz95 !

biphasic avatar Aug 30 '22 13:08 biphasic

I managed to use datapipes for the decompression (no code to maintain for us, yeeee :laughing:) but between the old and new dataset, on NMNIST, there is a 30% slowdown in the read performance. For STMNIST, for instance, it makes no difference to keep the directory compressed or not in terms of speed.

image

fabrizio-ottati avatar Sep 02 '22 17:09 fabrizio-ottati

@fabhertz95 this starts to look excellent 👌 the readme is great and I'm pleased to read that we can make use of lots of torchdata code. Can you explain to me why the new version of nmnist is still slower? Is it that torchdata's decompression dp is releasing files iteratively and therefore slowing things down? Apologies as I'm only reading all this on my phone these days

biphasic avatar Sep 03 '22 12:09 biphasic

I honestly do not know :disappointed: If keep_compressed=False, the dataset is uncompressed into a dedicated folder at object creation, in the following way:

def _uncompress(self, dp):
    dp = FileLister(path_to_zip)
    dp = FileOpener(dp, mode="rb")
    # Iterating through the ZIP archive.
    dp = ZipArchiveLoader(dp)
    # Saving the files to disk. This code is not completely correct, but it lets you understand what happens.
    dp = Saver(dp, filepath_fn=fn_that_provides_path_to_destination_folder)
    # Then we iterate through the whole datapipe, which performs the decompression and the saving.
    for x in dp:
        pass
    # Now we return to the class the datapipe that iterates through the uncompressed files.
    dp = FileLister(path_to_uncompressed_folder)
    dp = FileOpener(dp, mode="rb")
    return dp

Hence, the dataset receives a datapipe that iterates through normal files in a folder. Using the profiler, it turns out that the problem is the overhead introduced by the datapipes themselves. FileLister, for example, is essentially a for loop over os.walk, which the old dataset implements directly. The advantage of datapipes, at this point, seems to be that we have to write much less code (no unzipping routine, no file listing and so on) and we do not have to maintain code that is not related to Tonic itself.
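
For comparison, a direct os.walk listing of the kind mentioned above can be written in a few lines. This is a minimal sketch and not the old dataset's actual code; `list_files` and the `.bin` suffix are assumptions made for the demo.

```python
import os
import tempfile


def list_files(root, suffix=".bin"):
    """Recursively collect the files under root that end with suffix."""
    found = []
    for dirpath, _dirnames, filenames in os.walk(root):
        for name in filenames:
            if name.endswith(suffix):
                found.append(os.path.join(dirpath, name))
    return sorted(found)


# Demo on a throwaway folder tree.
with tempfile.TemporaryDirectory() as tmp:
    for rel in ("Train/0/a.bin", "Train/1/b.bin", "Train/notes.txt"):
        path = os.path.join(tmp, *rel.split("/"))
        os.makedirs(os.path.dirname(path), exist_ok=True)
        with open(path, "wb") as f:
            f.write(b"\x00")
    listed = [os.path.relpath(p, tmp).replace(os.sep, "/") for p in list_files(tmp)]
```

A plain loop like this avoids the per-item bookkeeping of a datapipe graph, which is one plausible source of the overhead seen in the profiler.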

Edit: I ran some more extensive benchmarking and these are the results. I iterate 10 times over the train portion of the dataset for each dataset version and I get the average sample load time. The overhead seems to be strongly reduced but maybe I am messing something up.

from tonic.prototype.datasets.nmnist import NMNIST as NMNIST_new
from tonic.datasets import NMNIST
from tonic.transforms import ToFrame

t = ToFrame(NMNIST.sensor_size, time_window=10000)

import timeit
print("="*50+"\nSimple read:")

ds = NMNIST(save_to="./data")
ds_new = NMNIST_new(root="./data", keep_compressed=True)
ds_new_uncmp = NMNIST_new(root="./data")

def lambda_ds():
    for x in ds: 
        sample = x

def lambda_ds_new() : 
    for x in ds_new:
        sample = x

def lambda_ds_new_uncmp():
    for x in ds_new_uncmp:
        sample = x

avg = timeit.Timer(lambda_ds).timeit(10) / 600
print(f"\told dataset: {avg:.3f} ms.")

avg_new = timeit.Timer(lambda_ds_new).timeit(10) / 600
print(f"\tnew dataset (compressed): {avg_new:.3f} ms.")

avg_new_uncmp = timeit.Timer(lambda_ds_new_uncmp).timeit(10) / 600
print(f"\tnew dataset (uncompressed): {avg_new_uncmp:.3f} ms.")

print(f"Slowdown:\n\tnew compressed v.s. old: {avg_new/avg:.2f}x.\n\tnew compressed v.s. new uncompressed: {avg_new/avg_new_uncmp:.2f}x.\n\tnew uncompressed v.s. old: {avg_new_uncmp/avg:.2f}x.")

==================================================
Simple read:
	old dataset: 0.117 ms.
	new dataset (compressed): 0.276 ms.
	new dataset (uncompressed): 0.129 ms.
Slowdown:
	new compressed v.s. old: 2.37x.
	new compressed v.s. new uncompressed: 2.14x.
	new uncompressed v.s. old: 1.11x.
print("="*50+"\nAdding the transform ToFrame:")

ds = NMNIST(save_to="./data", transform=t)
ds_new = NMNIST_new(root="./data", keep_compressed=True, transform=t)
ds_new_uncmp = NMNIST_new(root="./data", transform=t)

avg = timeit.Timer(lambda_ds).timeit(10) / 600
print(f"\told dataset: {avg:.3f} ms.")

avg_new = timeit.Timer(lambda_ds_new).timeit(10) / 600
print(f"\tnew dataset (compressed): {avg_new:.3f} ms.")

avg_new_uncmp = timeit.Timer(lambda_ds_new_uncmp).timeit(10) / 600
print(f"\tnew dataset (uncompressed): {avg_new_uncmp:.3f} ms.")

print(f"Slowdown:\n\tnew compressed v.s. old: {avg_new/avg:.2f}x.\n\tnew compressed v.s. new uncompressed: {avg_new/avg_new_uncmp:.2f}x.\n\tnew uncompressed v.s. old: {avg_new_uncmp/avg:.2f}x.")
==================================================
Adding the transform ToFrame:
	old dataset: 0.593 ms.
	new dataset (compressed): 0.766 ms.
	new dataset (uncompressed): 0.619 ms.
Slowdown:
	new compressed v.s. old: 1.29x.
	new compressed v.s. new uncompressed: 1.24x.
	new uncompressed v.s. old: 1.04x.
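
The division by 600 in the script above turns total seconds into milliseconds per sample, assuming the NMNIST training split holds 60,000 samples and each timing runs 10 full passes. The helper below spells that conversion out; `avg_ms_per_sample` and the fake dataset are hypothetical and only stand in for the real datasets.

```python
import timeit


def avg_ms_per_sample(fn, n_samples, repeats=10):
    """Average time per sample, in milliseconds, over `repeats` full passes."""
    total_seconds = timeit.Timer(fn).timeit(repeats)
    return total_seconds / (repeats * n_samples) * 1e3


# With 60000 samples and 10 passes: seconds / 600000 * 1e3 == seconds / 600.
fake_dataset = range(1000)


def one_pass():
    for sample in fake_dataset:
        pass


avg = avg_ms_per_sample(one_pass, n_samples=len(fake_dataset), repeats=3)
```

Writing the sample count explicitly keeps the unit correct if the benchmark is later pointed at the test split or at another dataset.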

fabrizio-ottati avatar Sep 03 '22 16:09 fabrizio-ottati

A performance hit of 11% is worth it. This is great stuff @fabhertz95 . Should I merge?

biphasic avatar Sep 03 '22 17:09 biphasic

I think the code is decent now. Let's merge like there is no tomorrow :laughing:

fabrizio-ottati avatar Sep 03 '22 17:09 fabrizio-ottati

One more thing, is there a reason we're not using this anymore https://github.com/neuromorphs/tonic/blob/beef9808639693762893b9f17106a6d9ea61bc74/tonic/io.py#L157

biphasic avatar Sep 03 '22 17:09 biphasic

Like in the old version

biphasic avatar Sep 03 '22 17:09 biphasic

One more thing, is there a reason we're not using this anymore

https://github.com/neuromorphs/tonic/blob/beef9808639693762893b9f17106a6d9ea61bc74/tonic/io.py#L157

Yes, the reason is that read_mnist_file expects a file path as input, while the datapipe provides a binary stream. In fact, if you look here:

https://github.com/fabhertz95/tonic/blob/2bc344152088a5153a3db54167aa35a403bc1f6e/tonic/prototype/datasets/nmnist.py#L47

you can see that I read the already opened binary file using np.frombuffer(). If you prefer, when not working in compressed mode, I can add an if to use read_mnist_file instead.

In the latest commit, with uncompressed archives I use read_mnist_file, by removing the FileOpener datapipe and propagating keep_compressed to the NMNISTFileReader class. Does it sound good?
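
The path-versus-stream distinction can be illustrated with a small helper that accepts either input. This is a sketch under stated assumptions, not the NMNISTFileReader implementation: `read_raw` is a hypothetical name, and the bytes it returns are what one would then pass to np.frombuffer().

```python
import io
import os
import tempfile


def read_raw(source):
    """Return the raw bytes from a file path or an already opened binary stream."""
    if isinstance(source, (str, os.PathLike)):
        # Uncompressed mode: the datapipe yields plain file paths.
        with open(source, "rb") as f:
            return f.read()
    # Compressed mode: the archive loader yields an open binary stream.
    return source.read()


payload = bytes([1, 2, 3, 4, 5])

with tempfile.TemporaryDirectory() as tmp:
    path = os.path.join(tmp, "sample.bin")
    with open(path, "wb") as f:
        f.write(payload)
    from_path = read_raw(path)

from_stream = read_raw(io.BytesIO(payload))
```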

fabrizio-ottati avatar Sep 03 '22 18:09 fabrizio-ottati

Sounds good 👍 merging now like there is no tomorrow

biphasic avatar Sep 03 '22 18:09 biphasic