Support for RL workloads
After receiving requests from multiple users, I'm creating this issue to track the progress on the support for RL workloads within FFCV.
The main current need right now is:
- Allow sharing of data between multiple samples (as a given training sample is usually characterized as a subset of a sequence shared among multiple other samples).
It seems that it should be possible to do that with a custom field but I'm currently coming up with an example and tests to make sure that this use case is fully supported.
I could use some volunteers to update the documentation though
Here is an example I made using FFCV custom fields where there is a piece of data (episode) that is shared between multiple samples. I hope it helps some people out there.
Dataset Code to import
import numpy as np
from ffcv import Loader
from ffcv.writer import DatasetWriter
from ffcv.fields import NDArrayField, FloatField
from ffcv.fields.decoders import NDArrayDecoder, FloatDecoder
from ffcv.transforms import ToTensor
from ffcv.pipeline.operation import Operation
# Mock Dataset
class DemoDataset:
    """Synthetic episodic dataset.

    Each sample is a window of consecutive frames taken from one episode,
    so neighbouring samples share most of their data — the situation the
    custom FFCV field below is designed to store efficiently.
    """

    def __init__(self, num_episodes, num_samples_per_episode,
                 frame_dim=(4, 4),
                 frames_before=2,
                 frames_after=1):
        # One random episode per row: (episodes, frames, *frame_dim).
        self.data = np.random.randn(num_episodes, num_samples_per_episode, *frame_dim)
        self.frames_before = frames_before
        self.frames_after = frames_after
        self.num_episodes = num_episodes
        self.shape = frame_dim
        # A window must fit entirely inside its episode, which removes
        # `frames_before + frames_after` candidate positions per episode.
        self.valid_samples_per_episode = (
            self.data.shape[1] - frames_before - frames_after
        )
        self.frames_per_sample = frames_before + frames_after + 1

    def __len__(self):
        return self.num_episodes * self.valid_samples_per_episode

    def __getitem__(self, ix):
        # Map the flat index to (episode, window-offset) coordinates.
        episode_ix, window_start = divmod(ix, self.valid_samples_per_episode)
        window_end = window_start + self.frames_per_sample
        # 1-tuple: the writer expects one entry per declared field.
        return ((window_start, window_end, episode_ix, self.data[episode_ix]),)
# We instantiate the dataset
dataset = DemoDataset(10, 15, frames_after=2)
The code for the custom Field
from typing import Callable, TYPE_CHECKING, Tuple, Type
import json
from dataclasses import replace
import numpy as np
from ffcv.fields.base import Field, ARG_TYPE
from ffcv.pipeline.operation import Operation
from ffcv.pipeline.state import State
from ffcv.pipeline.compiler import Compiler
from ffcv.pipeline.allocation_query import AllocationQuery
from ffcv.libffcv import memcpy
if TYPE_CHECKING:
from ..memory_managers.base import MemoryManager
# Per-dataset header stored in the field's argument region: frame geometry
# and the (JSON-encoded) dtype description length.
NDArrayArgsType = np.dtype([
    ('shape', '<u8', 32),  # 32 is the max number of dimensions for numpy
    ('type_length', '<u8'),  # length of the dtype description
    ('frames_per_sample', '<u8'),
])
# Per-sample metadata record: a pointer into the shared episode blob plus
# the half-open [frame_start, frame_end) window belonging to this sample.
RecordType = np.dtype([
    ('episode_ptr', '<u8'),  # We store the address of the content of the episode
    ('frame_start', '<u4'),  # This is where the sequence start in the episode
    ('frame_end', '<u4'),  # This is where the sequence ends in the episode
])
class CustomFieldDecoder(Operation):
    """Decoder for `MyCustomField`.

    Each metadata record points at the raw bytes of a whole episode and
    carries a half-open [frame_start, frame_end) window; the decoder
    reinterprets those bytes as frames and copies the window into `dest`.
    """

    def declare_state_and_memory(self, previous_state: State) -> Tuple[State, AllocationQuery]:
        # One output slot per sample: `frames_per_sample` stacked frames.
        my_shape = (self.field.frames_per_sample, *self.field.shape)
        return (replace(previous_state, jit_mode=True, shape=my_shape, dtype=self.field.dtype),
                AllocationQuery(shape=my_shape, dtype=self.field.dtype))

    def generate_code(self) -> Callable:
        # Bind everything the closure needs up front: `decode` is
        # JIT-compiled (jit_mode=True above), so it must not touch `self`.
        mem_read = self.memory_read
        my_range = Compiler.get_iterator()
        my_memcpy = Compiler.compile(memcpy)
        s = self.field.shape
        d = self.field.dtype

        def decode(batch_indices, dest, metadata, storage_state):
            for i in my_range(len(batch_indices)):
                bix = batch_indices[i]
                met = metadata[bix]
                # Follow the stored pointer to the episode's raw bytes.
                data = mem_read(met['episode_ptr'], storage_state)
                frame_start, frame_end = met['frame_start'], met['frame_end']
                # Reinterpret the bytes as (num_frames, *frame_shape).
                original_data = data.view(d).reshape(-1, *s)
                sl = original_data[frame_start:frame_end]
                d_ptr = dest[i]
                # Copy as flat byte views; both sides have the same length.
                cp_src = sl.ravel()
                cp_dst = d_ptr.ravel()
                my_memcpy(cp_src.view('uint8'), cp_dst.view('uint8'))
            return dest
        return decode
class MyCustomField(Field):
    """FFCV field whose samples are overlapping frame windows.

    The episode content is written to the dataset once; every sample of
    that episode stores only a pointer to it plus its window bounds.
    """

    def __init__(self, frames_per_sample, dtype: np.dtype, shape: Tuple[int, ...]):
        # Maps episode index -> address of its already-written content.
        self.episodes_stored = dict()
        self.dtype = dtype
        self.shape = shape
        self.frames_per_sample = frames_per_sample
        self.element_size = dtype.itemsize * np.prod(shape)

    @property
    def metadata_type(self) -> np.dtype:
        # One RecordType entry is written per sample.
        return RecordType

    @staticmethod
    def from_binary(binary: ARG_TYPE) -> Field:
        """Rebuild the field from the per-dataset argument bytes."""
        header_bytes = NDArrayArgsType.itemsize
        header = binary[:header_bytes].view(NDArrayArgsType)[0]
        # The dtype description is JSON-encoded right after the header.
        raw_descr = binary[header_bytes:][:header['type_length']]
        descr = [tuple(entry) for entry in json.loads(raw_descr.tobytes().decode('ascii'))]
        assert len(descr) == 1
        dtype = np.dtype(descr)['f0']
        # The shape array is zero-padded to 32 entries; strip the padding.
        shape = list(header['shape'])
        while shape[-1] == 0:
            shape.pop()
        return MyCustomField(header['frames_per_sample'], dtype, tuple(shape))

    def to_binary(self) -> ARG_TYPE:
        """Pack frame shape, dtype and window length into the args region."""
        result = np.zeros(1, dtype=ARG_TYPE)[0]
        header = np.zeros(1, dtype=NDArrayArgsType)
        dims = np.array(self.shape).astype('<u8')
        header['shape'][0][:len(dims)] = dims
        header['frames_per_sample'] = self.frames_per_sample
        descr_bytes = np.frombuffer(
            json.dumps(self.dtype.descr).encode('ascii'), dtype='<u1')
        header['type_length'][0] = len(descr_bytes)
        payload = np.concatenate([header.view('<u1'), descr_bytes])
        result[0][:payload.shape[0]] = payload
        return result

    def encode(self, destination, field, malloc):
        """Write one sample: the episode content (once) plus its record."""
        frame_start, frame_end, episode_ix, episode_content = field
        previously_stored = self.episodes_stored.get(episode_ix)
        if previously_stored is None:
            # First time we see this episode: store its bytes and remember
            # where they ended up.
            addr, region = malloc(episode_content.itemsize * episode_content.size)
            region[:] = episode_content.ravel().view('<u1')
            self.episodes_stored[episode_ix] = addr
        else:
            addr = previously_stored
            _, _ = malloc(0)  # Dirty hack required for now
        destination['episode_ptr'] = addr
        destination['frame_start'] = frame_start
        destination['frame_end'] = frame_end

    def get_decoder_class(self):
        return CustomFieldDecoder
And usage:
# Write the dataset to a .beton file using the custom field; the frame
# dtype/shape are recorded in the field header via to_binary().
test = DatasetWriter('/tmp/test.beton', {
    'sequence': MyCustomField(dataset.frames_per_sample, np.dtype('float64'), dataset.shape)
}, num_workers=1)  # NOTE(review): stated below to only work with num_workers=1
test.from_indexed_dataset(dataset)
# Custom fields must be handed back to the Loader so it can decode them.
loader = Loader('/tmp/test.beton', custom_fields={
    'sequence': MyCustomField
}, batch_size=5)
Warning: It only really works with num_workers=1, and when using os_cache=False it will only work if a given sample allocates memory either for all fields or for none. It will fail if one field is skipped but not another.
If this kind of thing is useful I could make the necessary changes to the writer to allow for multiple workers and support QUASI_RANDOM
Hi, this script seems to not quite work in its current form, it throws an error when I try to read from the loader:
>> for item in loader: print(item)
Exception in thread Thread-2:
Traceback (most recent call last):
File "/home/ubuntu/miniconda3/envs/ffcv/lib/python3.9/threading.py", line 973, in _bootstrap_inner
self.run()
File "/home/ubuntu/miniconda3/envs/ffcv/lib/python3.9/site-packages/ffcv/loader/epoch_iterator.py", line 80, in run
result = self.run_pipeline(b_ix, ixes, slot, events[slot])
File "/home/ubuntu/miniconda3/envs/ffcv/lib/python3.9/site-packages/ffcv/loader/epoch_iterator.py", line 134, in run_pipeline
result = code(*args)
ValueError: cannot assign slice from input of different size
It succeeds on 6 iterations before crashing, on the 3rd point out of 5 on the 7th minibatch. I've been trying to debug it but no luck so far. I've narrowed it down to an issue with data = mem_read(met['episode_ptr'], storage_state) sometimes yielding an array of size 0.
Also, do you think you could explain the high-level roles of the various function calls a bit further? What is the difference between the roles of encode/decode and to_binary/from_binary? Why is the decoder implemented as its own class, rather than as a function decode()?
Sorry for the delay. I'll try to debug that now but I'll answer your general questions first.
- to_binary and from_binary are there to encode per-file settings that apply to a given field. In this example we are storing the shape of the individual frames. When the dataset is loaded this is then read and used to reshape the data appropriately. Without it there would be no way to distinguish between datasets with a 4x4 or a 2x8 frame. Storing it on a per-sample basis would mean a lot of wasted space. In general each Field has 1024 bytes to store whatever it likes, and it can decode these bytes however it wants when the dataset is loaded.
- Decode is implemented in its own class because for a given Field there might be different strategies to decode it. For example, with images, when they all have different resolutions you could pick a RandomCrop or a CenterCrop, or Resize them. One could say that this choice could be encoded with the mechanism described above, but that would mean one has to regenerate a dataset to change the strategy. It is therefore more flexible for a Field to declare a base class that all decoders have to inherit from to declare that they are compatible with a given field and are able to decode it.
Everything should be fixed @jacobmanifold . I was simply writing the result in bix instead of i which was incorrect and out of bounds most of the time.
Thank you! I think you still have a small bug in decode where my_memcpy(data, dest[i].view(np.uint8)) should be my_memcpy(sl, dest[i].view(np.uint8)).
Running this code now, it doesn't crash, but seems to still be an issue where datapoints are improperly loaded. To see this, I modified the dataset to load all-1s instead of all-random, and some points seem to get loaded as very small numbers instead.
Here's the script I'm currently running:
Code
import numpy as np
from ffcv import Loader
from ffcv.writer import DatasetWriter
from ffcv.fields import NDArrayField, FloatField
from ffcv.fields.decoders import NDArrayDecoder, FloatDecoder
from ffcv.transforms import ToTensor
from ffcv.pipeline.operation import Operation
# Mock Dataset
class DemoDataset:
    """All-ones variant of the demo dataset.

    Samples are windows of consecutive frames and every frame value is
    exactly 1.0, so any other value coming out of the loader signals a
    corrupted read.
    """

    def __init__(self, num_episodes, num_samples_per_episode,
                 frame_dim=(4, 4),
                 frames_before=2,
                 frames_after=1):
        # Constant data: anything != 1.0 after decoding exposes a bug.
        self.data = np.ones([num_episodes, num_samples_per_episode, *frame_dim])
        self.frames_before = frames_before
        self.frames_after = frames_after
        self.num_episodes = num_episodes
        self.shape = frame_dim
        # Windows must fit inside an episode.
        self.valid_samples_per_episode = (
            self.data.shape[1] - frames_before - frames_after
        )
        self.frames_per_sample = frames_before + frames_after + 1

    def __len__(self):
        return self.num_episodes * self.valid_samples_per_episode

    def __getitem__(self, ix):
        # Flat index -> (episode, window-offset) coordinates.
        episode_ix, offset = divmod(ix, self.valid_samples_per_episode)
        return ((offset, offset + self.frames_per_sample, episode_ix,
                 self.data[episode_ix]),)
from typing import Callable, TYPE_CHECKING, Tuple, Type
import json
from dataclasses import replace
import numpy as np
from ffcv.fields.base import Field, ARG_TYPE
from ffcv.pipeline.operation import Operation
from ffcv.pipeline.state import State
from ffcv.pipeline.compiler import Compiler
from ffcv.pipeline.allocation_query import AllocationQuery
from ffcv.libffcv import memcpy
from ffcv.loader import OrderOption
if TYPE_CHECKING:
from ..memory_managers.base import MemoryManager
# NOTE(review): everything from here to RecordType below duplicates the
# imports and dtype definitions already given above — almost certainly a
# copy/paste artifact of assembling this script; it is harmless (the
# re-definitions are identical) but could be deleted.
NDArrayArgsType = np.dtype([
    ('shape', '<u8', 32),  # 32 is the max number of dimensions for numpy
    ('type_length', '<u8'),  # length of the dtype description
    ('frames_per_sample', '<u8'),
])
RecordType = np.dtype([
    ('episode_ptr', '<u8'),  # We store the address of the content of the episode
    ('frame_start', '<u4'),  # This is where the sequence start in the episode
    ('frame_end', '<u4'),  # This is where the sequence ends in the episode
])
from typing import Callable, TYPE_CHECKING, Tuple, Type
import json
from dataclasses import replace
import numpy as np
from ffcv.fields.base import Field, ARG_TYPE
from ffcv.pipeline.operation import Operation
from ffcv.pipeline.state import State
from ffcv.pipeline.compiler import Compiler
from ffcv.pipeline.allocation_query import AllocationQuery
from ffcv.libffcv import memcpy
if TYPE_CHECKING:
    from ..memory_managers.base import MemoryManager
# Second duplicate of the same two structured dtypes (see NOTE above).
NDArrayArgsType = np.dtype([
    ('shape', '<u8', 32),  # 32 is the max number of dimensions for numpy
    ('type_length', '<u8'),  # length of the dtype description
    ('frames_per_sample', '<u8'),
])
RecordType = np.dtype([
    ('episode_ptr', '<u8'),  # We store the address of the content of the episode
    ('frame_start', '<u4'),  # This is where the sequence start in the episode
    ('frame_end', '<u4'),  # This is where the sequence ends in the episode
])
class CustomFieldDecoder(Operation):
    """Decoder for `MyCustomField`.

    Each metadata record stores a pointer to the raw bytes of a whole
    episode plus a half-open [frame_start, frame_end) window; the decoder
    reinterprets those bytes as frames and copies the window into `dest`.
    """

    def declare_state_and_memory(self, previous_state: State) -> Tuple[State, AllocationQuery]:
        # One output slot per sample: `frames_per_sample` stacked frames.
        my_shape = (self.field.frames_per_sample, *self.field.shape)
        return (replace(previous_state, jit_mode=True, shape=my_shape, dtype=self.field.dtype),
                AllocationQuery(shape=my_shape, dtype=self.field.dtype))

    def generate_code(self) -> Callable:
        # Bind everything up front: `decode` is JIT-compiled and must not
        # reference `self`.
        mem_read = self.memory_read
        my_range = Compiler.get_iterator()
        my_memcpy = Compiler.compile(memcpy)
        s = self.field.shape
        d = self.field.dtype

        def decode(batch_indices, dest, metadata, storage_state):
            for i in my_range(len(batch_indices)):
                bix = batch_indices[i]
                met = metadata[bix]
                # Follow the stored pointer to the episode's raw bytes.
                data = mem_read(met['episode_ptr'], storage_state)
                frame_start, frame_end = met['frame_start'], met['frame_end']
                # Reinterpret the bytes as (num_frames, *frame_shape).
                original_data = data.view(d).reshape(-1, *s)
                sl = original_data[frame_start:frame_end]
                # BUG FIX: memcpy must see flat byte views of BOTH sides.
                # Passing the typed float slice `sl` against a uint8 view of
                # the destination mismatched the element counts and copied
                # the wrong number of bytes, which produced the garbage
                # (denormal) values observed in the output above.
                my_memcpy(sl.ravel().view(np.uint8), dest[i].ravel().view(np.uint8))
            # (dead assignment `ix = batch_indices[0]` removed)
            return dest

        return decode
class MyCustomField(Field):
    """Field for overlapping frame windows sharing per-episode storage.

    An episode's content is written exactly once; each sample records only
    a pointer to that content and its own [frame_start, frame_end) bounds.
    """

    def __init__(self, frames_per_sample, dtype: np.dtype, shape: Tuple[int, ...]):
        # episode index -> address of the episode's stored content
        self.episodes_stored = dict()
        self.dtype = dtype
        self.shape = shape
        self.frames_per_sample = frames_per_sample
        self.element_size = dtype.itemsize * np.prod(shape)

    @property
    def metadata_type(self) -> np.dtype:
        # Written once per sample.
        return RecordType

    @staticmethod
    def from_binary(binary: ARG_TYPE) -> Field:
        """Reconstruct the field from its stored argument bytes."""
        header_bytes = NDArrayArgsType.itemsize
        header = binary[:header_bytes].view(NDArrayArgsType)[0]
        # JSON-encoded dtype description follows the fixed-size header.
        raw_descr = binary[header_bytes:][:header['type_length']]
        descr = [tuple(entry) for entry in json.loads(raw_descr.tobytes().decode('ascii'))]
        assert len(descr) == 1
        dtype = np.dtype(descr)['f0']
        # Drop the zero padding of the 32-slot shape array.
        shape = list(header['shape'])
        while shape[-1] == 0:
            shape.pop()
        return MyCustomField(header['frames_per_sample'], dtype, tuple(shape))

    def to_binary(self) -> ARG_TYPE:
        """Serialize frame shape, dtype and window length."""
        result = np.zeros(1, dtype=ARG_TYPE)[0]
        header = np.zeros(1, dtype=NDArrayArgsType)
        dims = np.array(self.shape).astype('<u8')
        header['shape'][0][:len(dims)] = dims
        header['frames_per_sample'] = self.frames_per_sample
        descr_bytes = np.frombuffer(
            json.dumps(self.dtype.descr).encode('ascii'), dtype='<u1')
        header['type_length'][0] = len(descr_bytes)
        payload = np.concatenate([header.view('<u1'), descr_bytes])
        result[0][:payload.shape[0]] = payload
        return result

    def encode(self, destination, field, malloc):
        """Store one sample's record, writing the episode only if new."""
        frame_start, frame_end, episode_ix, episode_content = field
        previously_stored = self.episodes_stored.get(episode_ix)
        if previously_stored is None:
            # New episode: allocate space, write the bytes, remember where.
            addr, region = malloc(episode_content.itemsize * episode_content.size)
            region[:] = episode_content.ravel().view('<u1')
            self.episodes_stored[episode_ix] = addr
        else:
            addr = previously_stored
            _, _ = malloc(0)  # Dirty hack required for now
        destination['episode_ptr'] = addr
        destination['frame_start'] = frame_start
        destination['frame_end'] = frame_end

    def get_decoder_class(self):
        return CustomFieldDecoder
dataset = DemoDataset(10, 15, frames_after=2)
# Write the dataset; the field header records the frame dtype and shape.
test = DatasetWriter('/tmp/test.beton', {
    'sequence': MyCustomField(dataset.frames_per_sample, np.dtype('float64'), dataset.shape)
}, num_workers=1)  # NOTE(review): stated above to only work with num_workers=1
test.from_indexed_dataset(dataset)
loader = Loader('/tmp/test.beton', custom_fields={
    'sequence': MyCustomField
}, batch_size=12, os_cache=False, order=OrderOption.SEQUENTIAL)
# Print one scalar per sample; with the all-ones dataset every printed
# value should be exactly 1.0.
for item in loader: print(item[0][:,0,0,0].tolist())
And the output:
[1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0]
[1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.8504618152133806e-307, 2.2022581597627517e-308]
[2.1935664976514946e-308, 2.1935664976514946e-308, 2.1935664976514946e-308, 2.1935664976514946e-308, 2.209863455298968e-308, 2.204431116099213e-308, 2.208776997146854e-308, 2.195739388728061e-308, 2.19573938872806e-308, 2.195739388728061e-308, 2.19573938872806e-308, 2.195739388728061e-308]
[2.204431116101237e-308, 1.4528842266508651e-303, 1.965409477558363e-308, 2.195739451314053e-308, 2.195739451314052e-308, 2.195739451314053e-308, 2.195739451314052e-308, 2.195739451314053e-308, 1.0, 1.0, 1.0, 1.0]
[1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 2.195739455001471e-308, 2.195739455001471e-308, 2.195739455001471e-308, 2.1924800657793804e-308, 2.204431116101237e-308]
[1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0]
[1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0]
[1.0, 1.0, 1.0, 1.0, 2.209863455298968e-308, 2.204431116099213e-308, 2.208776997146854e-308, 2.195739388728061e-308, 2.19573938872806e-308, 2.195739388728061e-308, 2.19573938872806e-308, 2.195739388728061e-308]
[2.204431116101237e-308, 1.4528842266508651e-303, 1.965409477558363e-308, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0]
Hi there, I have been giving this a bit of thought, and wanted to propose an alternative way of achieving this. Please let me know if this sounds like a sound approach:
- Dump the dataset "as usual" (ie no sequence involved) where each datapoint may contain several fields for observation, reward, action, etc.
- For each field type, create a decoder variant that accepts an offset. By this I mean replace https://github.com/libffcv/ffcv/blob/bfd9b3d85e31360fada2ecf63bea5602e4774ba3/ffcv/fields/ndarray.py#L37 with
`sample_id = indices[ix] + offset`, where `offset` is a parameter of the class.
- Finally, use the new branch `new_codegen` to create several pipelines for the same field, something like:
pipelines={
"obs0": PipelineSpec(source="obs", transforms=[NDArrayDecoderWithOffset(0), ToTensor()]),
"obs1": PipelineSpec(source="obs", transforms=[NDArrayDecoderWithOffset(1), ToTensor()]),
"rew0": PipelineSpec(source="reward", transforms=[FloatDecoderWithOffset(0), ToTensor()]),
"rew1": PipelineSpec(source="reward", transforms=[FloatDecoderWithOffset(1), ToTensor()]),
}
- One caveat is that the user must provide a list of ids to the Loader to avoid out-of-bounds access. This can be done by providing a SequenceLoader that takes care of this.
How does that sound? Is it safe to modify the sample_id like that (provided that we avoid out of bounds access)? I have tried some initial prototypes that seemed to be giving the correct result.
One thing I haven't really figured out yet is related to data augmentation. If we apply random augmentations, for example random cropping, then most likely we want the exact same transform applied to all the observations. Is there a way to achieve that? One way I can think of is to stack the observations that need to be together, then apply augmentations directly to the stack. Something like "stacked_obs": PipelineSpec(source=["obs0", "obs1"], transforms=[torch.stack()]) However, AFAICT currently PipelineSpec doesn't accept several sources.
Let me know what you think. Best