ffcv icon indicating copy to clipboard operation
ffcv copied to clipboard

Support for RL workloads

Open GuillaumeLeclerc opened this issue 4 years ago • 7 comments

After receiving request from multiple I'm creating this issue to track the progress on the support for RL workloads within FFCV.

The main current need right now is:

  • Allow sharing of data between multiple samples (as a given training sample is usually characterized as a subset of a sequence shared among multiple other samples).

It seems that it should be possible to do that with a custom field but I'm currently coming up with an example and tests to make sure that this use case is fully supported.

I could use some volunteers to update the documentation though

GuillaumeLeclerc avatar Jan 22 '22 02:01 GuillaumeLeclerc

Here is an example I made using FFCV custom fields where there is a piece of data (episode) that is shared between multiple samples. I hope it helps some people out there.

Dataset Code to import
import numpy as np
from ffcv import Loader
from ffcv.writer import DatasetWriter
from ffcv.fields import NDArrayField, FloatField
from ffcv.fields.decoders import NDArrayDecoder, FloatDecoder
from ffcv.transforms import ToTensor
from ffcv.pipeline.operation import Operation


# Mock Dataset
class DemoDataset:
    
    def __init__(self, num_episodes, num_samples_per_episode,
                 frame_dim=(4, 4),
                 frames_before=2,
                 frames_after=1):
        self.data = np.random.randn(num_episodes, num_samples_per_episode, *frame_dim)
        self.frames_before = frames_before
        self.frames_after = frames_after
        self.num_episodes = num_episodes
        self.shape = frame_dim
        self.valid_samples_per_episode = self.data.shape[1] - self.frames_before - self.frames_after
        self.frames_per_sample = frames_before + frames_after + 1
        
    def __len__(self):
        return self.valid_samples_per_episode * self.num_episodes
    
    def __getitem__(self, ix):
        frame_ix = ix % self.valid_samples_per_episode
        episode_ix = ix // self.valid_samples_per_episode
        episode = self.data[episode_ix]
        frame_start = frame_ix
        frame_end = frame_ix + self.frames_before + self.frames_after + 1
        
        return ((frame_start, frame_end, episode_ix, episode),)
# We instantiate the dataset 
dataset = DemoDataset(10, 15, frames_after=2) 
The code for the custom Field
from typing import Callable, TYPE_CHECKING, Tuple, Type
import json
from dataclasses import replace

import numpy as np

from ffcv.fields.base import Field, ARG_TYPE
from ffcv.pipeline.operation import Operation
from ffcv.pipeline.state import State
from ffcv.pipeline.compiler import Compiler
from ffcv.pipeline.allocation_query import AllocationQuery
from ffcv.libffcv import memcpy

if TYPE_CHECKING:
    from ..memory_managers.base import MemoryManager


NDArrayArgsType = np.dtype([
    ('shape', '<u8', 32),  # 32 is the max number of dimensions for numpy
    ('type_length', '<u8'),  # length of the dtype description
    ('frames_per_sample', '<u8'),
])

RecordType = np.dtype([
    ('episode_ptr', '<u8'),  # We store the address of the content of the episode
    ('frame_start', '<u4'),  # This is where the sequence start in the episode
    ('frame_end', '<u4'),  # This is where the sequence ends in the episode
])

class CustomFieldDecoder(Operation):
    def declare_state_and_memory(self, previous_state: State) -> Tuple[State, AllocationQuery]:
        my_shape = (self.field.frames_per_sample, *self.field.shape)
        return (replace(previous_state, jit_mode=True, shape=my_shape, dtype=self.field.dtype),
                AllocationQuery(shape=my_shape, dtype=self.field.dtype))
        
    def generate_code(self) -> Callable:
        
        mem_read = self.memory_read
        my_range = Compiler.get_iterator()
        my_memcpy = Compiler.compile(memcpy)
        s = self.field.shape
        d = self.field.dtype
        
        def decode(batch_indices, dest, metadata, storage_state):
            for i in my_range(len(batch_indices)):
                bix = batch_indices[i]
                met = metadata[bix]
                data = mem_read(met['episode_ptr'], storage_state)
                frame_start, frame_end = met['frame_start'], met['frame_end']
                original_data = data.view(d).reshape(-1, *s)
                sl = original_data[frame_start:frame_end]
                d_ptr = dest[i]
                cp_src = sl.ravel()
                cp_dst = d_ptr.ravel()
                my_memcpy(cp_src.view('uint8'), cp_dst.view('uint8'))
            return dest

        return decode
            
        
    

class MyCustomField(Field):
    def __init__(self, frames_per_sample, dtype:np.dtype, shape:Tuple[int, ...]):
        self.episodes_stored = dict()  # We remember which episodes have been stored
        self.dtype = dtype
        self.shape = shape
        self.frames_per_sample = frames_per_sample
        self.element_size = dtype.itemsize * np.prod(shape)

    @property
    def metadata_type(self) -> np.dtype:
        return RecordType

    @staticmethod
    def from_binary(binary: ARG_TYPE) -> Field:
        header_size = NDArrayArgsType.itemsize
        header = binary[:header_size].view(NDArrayArgsType)[0]
        type_length = header['type_length']
        type_data = binary[header_size:][:type_length].tobytes().decode('ascii')
        type_desc = json.loads(type_data)
        type_desc = [tuple(x) for x in type_desc]
        assert len(type_desc) == 1
        dtype = np.dtype(type_desc)['f0']
        shape = list(header['shape'])
        while shape[-1] == 0:
            shape.pop()

        return MyCustomField(header['frames_per_sample'], dtype, tuple(shape))

    def to_binary(self) -> ARG_TYPE:
        result = np.zeros(1, dtype=ARG_TYPE)[0]
        header = np.zeros(1, dtype=NDArrayArgsType)
        s = np.array(self.shape).astype('<u8')
        header['shape'][0][:len(s)] = s
        header['frames_per_sample'] = self.frames_per_sample
        encoded_type = json.dumps(self.dtype.descr)
        encoded_type = np.frombuffer(encoded_type.encode('ascii'), dtype='<u1')
        header['type_length'][0] = len(encoded_type)
        to_write = np.concatenate([header.view('<u1'), encoded_type])
        result[0][:to_write.shape[0]] = to_write
        return result

    def encode(self, destination, field, malloc):
        frame_start, frame_end, episode_ix, episode_content = field
        
        # We check if we already stored the content of that episode in the dataset
        if episode_ix in self.episodes_stored:
            addr = self.episodes_stored[episode_ix]
            _, _ = malloc(0) # Dirty hack required for now 
        else:  # We need to allocate memory to store it and store it there
            addr, region = malloc(episode_content.itemsize * episode_content.size)
            region[:] = episode_content.ravel().view('<u1')
            self.episodes_stored[episode_ix] = addr
            
        destination['episode_ptr'] = addr
        destination['frame_start'] = frame_start
        destination['frame_end'] = frame_end

    def get_decoder_class(self):
        return CustomFieldDecoder

And usage:

test = DatasetWriter('/tmp/test.beton', {
    'sequence': MyCustomField(dataset.frames_per_sample, np.dtype('float64'), dataset.shape)
}, num_workers=1)
test.from_indexed_dataset(dataset)
loader = Loader('/tmp/test.beton', custom_fields={
    'sequence': MyCustomField
}, batch_size=5)

Warning: It only really works with num_workers=1 and when using os_cache=False it will only work if either a sample allocate memory for all fields or for None. It will fail if one field is skipped but not another.

GuillaumeLeclerc avatar Jan 23 '22 02:01 GuillaumeLeclerc

If this kind of thing is useful I could make the necessary changes to the writer to allow for multiple workers and support QUASI_RANDOM

GuillaumeLeclerc avatar Jan 23 '22 02:01 GuillaumeLeclerc

Hi, this script seems to not quite work in its current form, it throws an error when I try to read from the loader:

>> for item in loader: print(item)

Exception in thread Thread-2:
Traceback (most recent call last):
  File "/home/ubuntu/miniconda3/envs/ffcv/lib/python3.9/threading.py", line 973, in _bootstrap_inner
    self.run()
  File "/home/ubuntu/miniconda3/envs/ffcv/lib/python3.9/site-packages/ffcv/loader/epoch_iterator.py", line 80, in run
    result = self.run_pipeline(b_ix, ixes, slot, events[slot])
  File "/home/ubuntu/miniconda3/envs/ffcv/lib/python3.9/site-packages/ffcv/loader/epoch_iterator.py", line 134, in run_pipeline
    result = code(*args)
ValueError: cannot assign slice from input of different size

It succeeds on 6 iterations before crashing, on the 3rd point out of 5 on the 7th minibatch. I've been trying to debug it but no luck so far. I've narrowed it down to an issue with data = mem_read(met['episode_ptr'], storage_state) sometimes yielding an array of size 0.

Also, do you think you could explain the high-level roles of the various function calls a bit further? What is the difference between the roles of encode/decode and to_binary/from_binary? Why is the decoder implemented as its own class, rather than as a function decode()?

jacobmanifold avatar Jan 31 '22 21:01 jacobmanifold

Sorry for the delay. I'll try to debug that now but I'll answer your general questions first.

  1. to_binary and from binary are there to encode per_file settings that apply to a given field. In this example we are storing the shape of the individual frames. When the dataset is loaded this is then read and used to reshape the data appropriately. Without it there would be no way to distinguish between datasets with a 4x4 or a 2x8 frame. Storing it on a sample basis would mean a lot of wasted space. In general each Field has 1024bytes to store whatever they feel like and they will be able to decode these bytes however they want when the dataset is loaded
  2. Decode is implemented in its own class because for a given Field there might be different strategies to decode it. For example, with images, when they all have different resolution you could do pick a RandomCrop or a CenterCrop or Resize them. One could say that this choice can be encoded with the mechanism described above but it would mean that one has to regenerate a dataset if they want to change the strategy. It is therefore more flexible for a Field to declare a base class that all decoders have to inherit to declare that they are compatible with a given field and are able to decode it.

GuillaumeLeclerc avatar Feb 02 '22 08:02 GuillaumeLeclerc

Everything should be fixed @jacobmanifold . I was simply writing the result in bix instead of i which was incorrect and out of bounds most of the time.

GuillaumeLeclerc avatar Feb 02 '22 09:02 GuillaumeLeclerc

Thank you! I think you still have a small bug in decode where my_memcpy(data, dest[i].view(np.uint8)) should be my_memcpy(sl, dest[i].view(np.uint8)).

Running this code now, it doesn't crash, but seems to still be an issue where datapoints are improperly loaded. To see this, I modified the dataset to load all-1s instead of all-random, and some points seem to get loaded as very small numbers instead.

Here's the script I'm currently running:

Code
import numpy as np
from ffcv import Loader
from ffcv.writer import DatasetWriter
from ffcv.fields import NDArrayField, FloatField
from ffcv.fields.decoders import NDArrayDecoder, FloatDecoder
from ffcv.transforms import ToTensor
from ffcv.pipeline.operation import Operation


# Mock Dataset
class DemoDataset:

    def __init__(self, num_episodes, num_samples_per_episode,
                 frame_dim=(4, 4),
                 frames_before=2,
                 frames_after=1):
        self.data = np.ones([num_episodes, num_samples_per_episode, *frame_dim])
        self.frames_before = frames_before
        self.frames_after = frames_after
        self.num_episodes = num_episodes
        self.shape = frame_dim
        self.valid_samples_per_episode = self.data.shape[1] - self.frames_before - self.frames_after
        self.frames_per_sample = frames_before + frames_after + 1

    def __len__(self):
        return self.valid_samples_per_episode * self.num_episodes

    def __getitem__(self, ix):
        episode_ix = ix // self.valid_samples_per_episode
        frame_ix = ix % self.valid_samples_per_episode
        episode = self.data[episode_ix]
        frame_start = frame_ix
        frame_end = frame_ix + self.frames_before + 1 + self.frames_after
        return ((frame_start, frame_end, episode_ix, episode),)


from typing import Callable, TYPE_CHECKING, Tuple, Type
import json
from dataclasses import replace

import numpy as np

from ffcv.fields.base import Field, ARG_TYPE
from ffcv.pipeline.operation import Operation
from ffcv.pipeline.state import State
from ffcv.pipeline.compiler import Compiler
from ffcv.pipeline.allocation_query import AllocationQuery
from ffcv.libffcv import memcpy
from ffcv.loader import OrderOption

if TYPE_CHECKING:
    from ..memory_managers.base import MemoryManager

NDArrayArgsType = np.dtype([
    ('shape', '<u8', 32),  # 32 is the max number of dimensions for numpy
    ('type_length', '<u8'),  # length of the dtype description
    ('frames_per_sample', '<u8'),
])

RecordType = np.dtype([
    ('episode_ptr', '<u8'),  # We store the address of the content of the episode
    ('frame_start', '<u4'),  # This is where the sequence start in the episode
    ('frame_end', '<u4'),  # This is where the sequence ends in the episode
])

from typing import Callable, TYPE_CHECKING, Tuple, Type
import json
from dataclasses import replace

import numpy as np

from ffcv.fields.base import Field, ARG_TYPE
from ffcv.pipeline.operation import Operation
from ffcv.pipeline.state import State
from ffcv.pipeline.compiler import Compiler
from ffcv.pipeline.allocation_query import AllocationQuery
from ffcv.libffcv import memcpy

if TYPE_CHECKING:
    from ..memory_managers.base import MemoryManager

NDArrayArgsType = np.dtype([
    ('shape', '<u8', 32),  # 32 is the max number of dimensions for numpy
    ('type_length', '<u8'),  # length of the dtype description
    ('frames_per_sample', '<u8'),
])

RecordType = np.dtype([
    ('episode_ptr', '<u8'),  # We store the address of the content of the episode
    ('frame_start', '<u4'),  # This is where the sequence start in the episode
    ('frame_end', '<u4'),  # This is where the sequence ends in the episode
])


class CustomFieldDecoder(Operation):
    def declare_state_and_memory(self, previous_state: State) -> Tuple[State, AllocationQuery]:
        my_shape = (self.field.frames_per_sample, *self.field.shape)
        return (replace(previous_state, jit_mode=True, shape=my_shape, dtype=self.field.dtype),
                AllocationQuery(shape=my_shape, dtype=self.field.dtype))

    def generate_code(self) -> Callable:
        mem_read = self.memory_read
        my_range = Compiler.get_iterator()
        my_memcpy = Compiler.compile(memcpy)
        s = self.field.shape
        d = self.field.dtype

        def decode(batch_indices, dest, metadata, storage_state):
            for i in my_range(len(batch_indices)):
                bix = batch_indices[i]
                met = metadata[bix]
                data = mem_read(met['episode_ptr'], storage_state)
                frame_start, frame_end = met['frame_start'], met['frame_end']
                original_data = data.view(d).reshape(-1, *s)
                sl = original_data[frame_start:frame_end]
                my_memcpy(sl, dest[i].view(np.uint8))

            ix = batch_indices[0]
            return dest

        return decode


class MyCustomField(Field):
    def __init__(self, frames_per_sample, dtype: np.dtype, shape: Tuple[int, ...]):
        self.episodes_stored = dict()  # We remember which episodes have been stored
        self.dtype = dtype
        self.shape = shape
        self.frames_per_sample = frames_per_sample
        self.element_size = dtype.itemsize * np.prod(shape)

    @property
    def metadata_type(self) -> np.dtype:
        return RecordType

    @staticmethod
    def from_binary(binary: ARG_TYPE) -> Field:
        header_size = NDArrayArgsType.itemsize
        header = binary[:header_size].view(NDArrayArgsType)[0]
        type_length = header['type_length']
        type_data = binary[header_size:][:type_length].tobytes().decode('ascii')
        type_desc = json.loads(type_data)
        type_desc = [tuple(x) for x in type_desc]
        assert len(type_desc) == 1
        dtype = np.dtype(type_desc)['f0']
        shape = list(header['shape'])
        while shape[-1] == 0:
            shape.pop()

        return MyCustomField(header['frames_per_sample'], dtype, tuple(shape))

    def to_binary(self) -> ARG_TYPE:
        result = np.zeros(1, dtype=ARG_TYPE)[0]
        header = np.zeros(1, dtype=NDArrayArgsType)
        s = np.array(self.shape).astype('<u8')
        header['shape'][0][:len(s)] = s
        header['frames_per_sample'] = self.frames_per_sample
        encoded_type = json.dumps(self.dtype.descr)
        encoded_type = np.frombuffer(encoded_type.encode('ascii'), dtype='<u1')
        header['type_length'][0] = len(encoded_type)
        to_write = np.concatenate([header.view('<u1'), encoded_type])
        result[0][:to_write.shape[0]] = to_write
        return result

    def encode(self, destination, field, malloc):
        frame_start, frame_end, episode_ix, episode_content = field

        # We check if we already stored the content of that episode in the dataset
        if episode_ix in self.episodes_stored:
            addr = self.episodes_stored[episode_ix]
            _, _ = malloc(0)  # Dirty hack required for now
        else:  # We need to allocate memory to store it and store it there
            addr, region = malloc(episode_content.itemsize * episode_content.size)
            region[:] = episode_content.ravel().view('<u1')
            self.episodes_stored[episode_ix] = addr

        destination['episode_ptr'] = addr
        destination['frame_start'] = frame_start
        destination['frame_end'] = frame_end

    def get_decoder_class(self):
        return CustomFieldDecoder


dataset = DemoDataset(10, 15, frames_after=2)

test = DatasetWriter('/tmp/test.beton', {
    'sequence': MyCustomField(dataset.frames_per_sample, np.dtype('float64'), dataset.shape)
}, num_workers=1)
test.from_indexed_dataset(dataset)

loader = Loader('/tmp/test.beton', custom_fields={
    'sequence': MyCustomField
}, batch_size=12, os_cache=False, order=OrderOption.SEQUENTIAL)

for item in loader: print(item[0][:,0,0,0].tolist())

And the output:

[1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0]
[1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.8504618152133806e-307, 2.2022581597627517e-308]
[2.1935664976514946e-308, 2.1935664976514946e-308, 2.1935664976514946e-308, 2.1935664976514946e-308, 2.209863455298968e-308, 2.204431116099213e-308, 2.208776997146854e-308, 2.195739388728061e-308, 2.19573938872806e-308, 2.195739388728061e-308, 2.19573938872806e-308, 2.195739388728061e-308]
[2.204431116101237e-308, 1.4528842266508651e-303, 1.965409477558363e-308, 2.195739451314053e-308, 2.195739451314052e-308, 2.195739451314053e-308, 2.195739451314052e-308, 2.195739451314053e-308, 1.0, 1.0, 1.0, 1.0]
[1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 2.195739455001471e-308, 2.195739455001471e-308, 2.195739455001471e-308, 2.1924800657793804e-308, 2.204431116101237e-308]
[1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0]
[1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0]
[1.0, 1.0, 1.0, 1.0, 2.209863455298968e-308, 2.204431116099213e-308, 2.208776997146854e-308, 2.195739388728061e-308, 2.19573938872806e-308, 2.195739388728061e-308, 2.19573938872806e-308, 2.195739388728061e-308]
[2.204431116101237e-308, 1.4528842266508651e-303, 1.965409477558363e-308, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0]

jacobmanifold avatar Feb 02 '22 16:02 jacobmanifold

Hi there, I have been giving this a bit of thought, and wanted to propose an alternative way of achieving this. Please let me know if this sounds like a sound approach:

  1. Dump the dataset "as usual" (ie no sequence involved) where each datapoint may contain several fields for observation, reward, action, etc.
  2. For each field type, create a decoder variant that accepts an offset. By this I mean replace https://github.com/libffcv/ffcv/blob/bfd9b3d85e31360fada2ecf63bea5602e4774ba3/ffcv/fields/ndarray.py#L37 with sample_id = indices[ix] + offset where offset is a parameter of the class.
  3. Finally, use the new branch new_codegen to create several pipelines for the same field, something like:
pipelines={
     "obs0": PipelineSpec(source="obs", transforms=[NDArrayDecoderWithOffset(0), ToTensor()]),
     "obs1": PipelineSpec(source="obs", transforms=[NDArrayDecoderWithOffset(1), ToTensor()]),
     "rew0": PipelineSpec(source="reward", transforms=[FloatDecoderWithOffset(0), ToTensor()]),
     "rew1": PipelineSpec(source="reward", transforms=[FloatDecoderWithOffset(1), ToTensor()]),
}
  1. One caveat is that the user must provide a list of ids to the Loader to avoid out-of-bounds access. This can be done by providing a SequenceLoader that takes care of this.

How does that sound? Is it safe to modify the sample_id like that (provided that we avoid out of bounds access)? I have tried some initial prototypes that seemed to be giving the correct result.

One thing I haven't really figured out yet is related to data augmentation. If we apply random augmentations, for example random cropping, then most likely we want the exact same transform applied to all the observations. Is there a way to achieve that? One way I can think of is to stack the observations that need to be together, then apply augmentations directly to the stack. Something like "stacked_obs": PipelineSpec(source=["obs0", "obs1"], transforms=[torch.stack()]) However, AFAICT currently PipelineSpec doesn't accept several sources.

Let me know what you think Best

alcinos avatar Feb 14 '22 15:02 alcinos