
Augment existing dataset

Open LWprogramming opened this issue 1 year ago • 3 comments

🚀 Feature Request

Suppose we create a dataset

import random
import string

from streaming import MDSWriter

compression = 'zstd'
container_name = "foo"
folder = "bar"
remote = f'azure://{container_name}/{folder}'
columns = {
    "id": "int",
    "value": "str",
}
with MDSWriter(out=remote, columns=columns, compression=compression, size_limit=1024*1024*64) as out:
    for i in range(100):
        # make each sample take 1 MB of space, value should be a string of 1M randomly generated alphanumeric characters
        value = ''.join(random.choice(string.ascii_uppercase + string.digits) for _ in range(1024*1024))
        sample = {
            "id": i,
            "value": value,
        }
        out.write(sample)
# expect 2 shards

If we later create a second MDSWriter to write samples 100-199 in the same way, it overwrites the existing shards. It would be preferable for it to continue writing new shards, as though we had looped over all 200 samples originally.

Motivation

Cleaned data comes in piecemeal and it would be nice to be able to just continue augmenting the existing cleaned dataset that's already been turned into a StreamingDataset format. Not sure if this would be particularly tricky or easy to do, or if it already exists and I'm missing a flag somewhere.

LWprogramming • Apr 03 '24 19:04

One possibility, which might work if a single data item never gets split across multiple shards, is to search the existing folder or cloud storage directory for shard names, pull the existing shard down to a temporary folder, and pick up where we left off using the index.json in that directory.

An alternative (less efficient, but maybe easier to work with) is to just start a new shard: e.g. if there are shards 0.mds.zstd through 17.mds.zstd, create 18.mds.zstd when opening the second MDSWriter. These approaches seem plausible for Azure at least; I'm not super familiar with all the different types of uploaders in streaming/base/storage/upload.py, or with the details of how the sample dict actually gets converted into what goes into the mds file.
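As a rough sketch of the "start a new shard" idea, something like the following could work out the next shard number from a directory listing. The helper name next_shard_index and the file name pattern are hypothetical and only mirror the 0.mds.zstd example above; real shard file names may differ, and a writer would still need to update index.json for the new shards to be picked up.

```python
import re

# Hypothetical pattern matching the shard names used in the example above
# ("0.mds.zstd" ... "17.mds.zstd"); real shard file names may differ.
SHARD_NAME = re.compile(r"^(\d+)\.mds(?:\.\w+)?$")


def next_shard_index(filenames):
    """Return one past the highest shard number found, or 0 if none match."""
    indices = [
        int(match.group(1))
        for name in filenames
        if (match := SHARD_NAME.match(name))
    ]
    return max(indices) + 1 if indices else 0


# Listing of an existing dataset directory: 18 shards plus the index file.
existing = [f"{i}.mds.zstd" for i in range(18)] + ["index.json"]
print(next_shard_index(existing))  # 18
```

The listing itself would come from whatever storage client is in use (for Azure, a blob listing under the container and folder), which is left out here.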

LWprogramming • Apr 03 '24 19:04

@LWprogramming You can also start writing shard files to a different directory and use the merge_index function to combine the index files from multiple directories! But to your point, starting from where you left off for shard writing would be nice. We can look into it in upcoming planning.
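To illustrate the general idea of combining per-directory index files, here is a library-free sketch. It is not the actual merge_index implementation: the helper name combine_indexes is hypothetical, and the index.json layout it assumes (a top-level "version" and "shards" list, with file basenames under "raw_data" and "zip_data") is an assumption made for illustration.

```python
import json
import tempfile
from pathlib import Path


def combine_indexes(root, subdirs):
    """Merge the "shards" lists of several index.json files into one dict."""
    merged = []
    for sub in subdirs:
        index = json.loads((Path(root) / sub / "index.json").read_text())
        for shard in index["shards"]:
            # Prefix basenames so they resolve relative to the parent directory.
            for key in ("raw_data", "zip_data"):
                if shard.get(key):
                    shard[key]["basename"] = f"{sub}/{shard[key]['basename']}"
            merged.append(shard)
    # Assumed index layout; the "version" value is illustrative.
    return {"version": 2, "shards": merged}


# Build two tiny fake dataset directories, each with a one-shard index.
with tempfile.TemporaryDirectory() as root:
    for sub in ("bar1", "bar2"):
        directory = Path(root) / sub
        directory.mkdir()
        shard = {
            "raw_data": {"basename": "shard.00000.mds"},
            "zip_data": None,
            "samples": 100,
        }
        index = {"version": 2, "shards": [shard]}
        (directory / "index.json").write_text(json.dumps(index))
    combined = combine_indexes(root, ["bar1", "bar2"])

print([s["raw_data"]["basename"] for s in combined["shards"]])
# ['bar1/shard.00000.mds', 'bar2/shard.00000.mds']
```

With a combined index in the parent directory, the parent can be read as a single dataset, while each batch of new data can still be written to its own subdirectory without touching earlier shards.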

snarayan21 • Apr 09 '24 01:04

Cool! Out of curiosity, when might we use something like merge_index vs. just splitting the dataset into a bunch of pieces, uploading each piece to a separate folder, and then creating a stream for each folder? For example:

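from streaming import Stream, StreamingDataset

# Named local_dirs rather than locals to avoid shadowing the locals() builtin
local_dirs = [
    "/foo/bar1",
    "/foo/bar2",
]
remotes = [
    "azure://foo/bar1",
    "azure://foo/bar2",
]
# One stream per (local cache directory, remote folder) pair
streams = [
    Stream(local=local, remote=remote) for local, remote in zip(local_dirs, remotes)
]
ds = StreamingDataset(streams=streams, shuffle=False)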

Is the main difference in what shuffling algorithms we can use? It looks like even with multiple streams, it's possible to do dataset shuffling.

LWprogramming • Apr 11 '24 18:04