petastorm
"Currently do not support resetting a reader while in the middle of iteration."
I have this piece of code:
batch_size = 8

with converter_train.make_torch_dataloader(transform_spec=transform, batch_size=batch_size, num_epochs=1) as dataloader_train, \
     converter_test.make_torch_dataloader(transform_spec=transform, batch_size=batch_size, num_epochs=1) as dataloader_test:
    trainer.fit(
        model,
        dataloader_train,
        dataloader_test,
    )
When it gets to the validation step, it throws the previously mentioned error:
Currently do not support resetting a reader while in the middle of iteration.
I am using PyTorch Lightning.
The stack trace looks like this:
---------------------------------------------------------------------------
NotImplementedError Traceback (most recent call last)
<command-128477> in <module>
65 model,
66 dataloader_train,
---> 67 dataloader_test,
68 )
/local_disk0/.ephemeral_nfs/envs/pythonEnv-af2dcbc2-ffc2-4df8-b9aa-f92890c53c30/lib/python3.7/site-packages/pytorch_lightning/trainer/states.py in wrapped_fn(self, *args, **kwargs)
46 if entering is not None:
47 self.state = entering
---> 48 result = fn(self, *args, **kwargs)
49
50 # The INTERRUPTED state can be set inside the run function. To indicate that run was interrupted
/local_disk0/.ephemeral_nfs/envs/pythonEnv-af2dcbc2-ffc2-4df8-b9aa-f92890c53c30/lib/python3.7/site-packages/pytorch_lightning/trainer/trainer.py in fit(self, model, train_dataloader, val_dataloaders, datamodule)
1071 self.accelerator_backend = GPUBackend(self)
1072 model = self.accelerator_backend.setup(model)
-> 1073 results = self.accelerator_backend.train(model)
1074
1075 elif self.use_tpu:
/local_disk0/.ephemeral_nfs/envs/pythonEnv-af2dcbc2-ffc2-4df8-b9aa-f92890c53c30/lib/python3.7/site-packages/pytorch_lightning/accelerators/gpu_backend.py in train(self, model)
49
50 def train(self, model):
---> 51 results = self.trainer.run_pretrain_routine(model)
52 return results
53
/local_disk0/.ephemeral_nfs/envs/pythonEnv-af2dcbc2-ffc2-4df8-b9aa-f92890c53c30/lib/python3.7/site-packages/pytorch_lightning/trainer/trainer.py in run_pretrain_routine(self, model)
1237
1238 # CORE TRAINING LOOP
-> 1239 self.train()
1240
1241 def _run_sanity_check(self, ref_model, model):
/local_disk0/.ephemeral_nfs/envs/pythonEnv-af2dcbc2-ffc2-4df8-b9aa-f92890c53c30/lib/python3.7/site-packages/pytorch_lightning/trainer/training_loop.py in train(self)
392 # RUN TNG EPOCH
393 # -----------------
--> 394 self.run_training_epoch()
395
396 if self.max_steps and self.max_steps <= self.global_step:
/local_disk0/.ephemeral_nfs/envs/pythonEnv-af2dcbc2-ffc2-4df8-b9aa-f92890c53c30/lib/python3.7/site-packages/pytorch_lightning/trainer/training_loop.py in run_training_epoch(self)
514 should_check_val = self.should_check_val(batch_idx, is_last_batch)
515 if should_check_val:
--> 516 self.run_evaluation(test_mode=False)
517
518 # -----------------------------------------
/local_disk0/.ephemeral_nfs/envs/pythonEnv-af2dcbc2-ffc2-4df8-b9aa-f92890c53c30/lib/python3.7/site-packages/pytorch_lightning/trainer/evaluation_loop.py in run_evaluation(self, test_mode)
580
581 # run evaluation (val_step + val_step_end + val_epoch_end)
--> 582 eval_results = self._evaluate(self.model, dataloaders, max_batches, test_mode)
583
584 # log the final eval loop metrics
/local_disk0/.ephemeral_nfs/envs/pythonEnv-af2dcbc2-ffc2-4df8-b9aa-f92890c53c30/lib/python3.7/site-packages/pytorch_lightning/trainer/evaluation_loop.py in _evaluate(self, model, dataloaders, max_batches, test_mode)
303 dl_max_batches = max_batches[dataloader_idx]
304
--> 305 for batch_idx, batch in enumerate(dataloader):
306 if batch is None:
307 continue
/local_disk0/.ephemeral_nfs/envs/pythonEnv-af2dcbc2-ffc2-4df8-b9aa-f92890c53c30/lib/python3.7/site-packages/petastorm/pytorch.py in __iter__(self)
113 raise RuntimeError(_PARALLEL_ITER_ERROR)
114 if self._in_iter is not None:
--> 115 self.reader.reset()
116 logger.warning('Start a new pass of Petastorm DataLoader, reset underlying Petastorm reader to position 0.')
117 self._in_iter = True
/local_disk0/.ephemeral_nfs/envs/pythonEnv-af2dcbc2-ffc2-4df8-b9aa-f92890c53c30/lib/python3.7/site-packages/petastorm/reader.py in reset(self)
456 # drop these in-flight samples? Or just ignore it? What would happen if we have two concurrent ventilators
457 # that are emitting load requests at the same time?
--> 458 raise NotImplementedError('Currently do not support resetting a reader while in the middle of iteration. '
459 'You can call reset only after all samples were consumed.')
460 self.last_row_consumed = False
NotImplementedError: Currently do not support resetting a reader while in the middle of iteration. You can call reset only after all samples were consumed.
Lightning trainer config:
trainer = Trainer(
    max_epochs=50,
    gpus=1,
    logger=logger,
    check_val_every_n_epoch=5,
    progress_bar_refresh_rate=250,
)
Is there something that I am doing wrong?
The error indicates that we were trying to reset() a reader before the number of epochs specified during its creation was consumed. It's hard to say what went wrong without seeing how you create the reader/dataloader. Can you please post that code?
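For illustration, here is a minimal sketch of the condition that error guards against, assuming a hypothetical dataset URL and a reader created directly with make_reader; reset() is only legal once every sample of the configured num_epochs has been consumed:

from petastorm import make_reader

# Placeholder dataset path, used only for illustration.
dataset_url = 'file:///tmp/some_petastorm_dataset'

with make_reader(dataset_url, num_epochs=1) as reader:
    for row in reader:   # consume the whole epoch
        pass
    reader.reset()       # OK: all samples were consumed

with make_reader(dataset_url, num_epochs=1) as reader:
    next(iter(reader))   # consume only a single sample
    reader.reset()       # raises NotImplementedError: reset in the middle of iteration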
Also, please post the version of pytorch_lightning that you use.
petastorm[torch] (0.9.5)
pytorch-lightning==0.9.0
Creation code (a bit simplified):

indexed_dataset = some_data \
    .zipWithUniqueId()

def is_validation(x):
    return (x[1] % 100) >= 80

def not_(f):
    def step(x):
        return not f(x)
    return step

validation_dataset = indexed_dataset \
    .filter(is_validation) \
    .map(lambda x: x[0])

training_dataset = indexed_dataset \
    .filter(not_(is_validation)) \
    .map(lambda x: x[0])

def rdd_to_df(rdd):
    rdd = rdd.map(lambda x: Row(**x))
    return spark.createDataFrame(rdd)

training_dataset_rdd = training_dataset.map(vectorize).map(create_pytorch_dict_with_numpy_bytes)
validation_dataset_rdd = validation_dataset.map(vectorize).map(create_pytorch_dict_with_numpy_bytes)

converter_train = make_spark_converter(rdd_to_df(training_dataset_rdd))
converter_test = make_spark_converter(rdd_to_df(validation_dataset_rdd))
For now, I've loaded the validation dataset into memory and used the regular DataLoader, and it seems to work fine.
But I've also noticed a strange behavior just recently: if I start training from scratch, the process trains as it is supposed to, going from epoch 1 to 50. However, if I reuse the same session and run the pipeline again without modifications, it runs just the last epoch and terminates.
@selitvin do you have any ideas why that could happen? :)
Thanks
Hmm, I am not sure. I am not familiar with pytorch-lightning and how it interacts with DataLoader.
I am facing the same issue using a custom petastorm DataLoader (actually I only provide a custom collate function, without changing the DataLoader at all) and feeding it to the pytorch-lightning Trainer. @tadas-subonis were you able to fix it, or should we rely on the default DataLoader when using pytorch-lightning? By chance, did you experiment with plain PyTorch to see if it works as expected in that case?
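For context, a minimal sketch of that kind of setup, assuming a hypothetical dataset URL and collate function; only the collate_fn is customized, the petastorm DataLoader itself is stock:

from petastorm import make_reader
from petastorm.pytorch import DataLoader

def my_collate(rows):
    # Hypothetical collate: just return the list of row namedtuples unchanged.
    return rows

# 'file:///tmp/some_petastorm_dataset' is a placeholder path.
loader = DataLoader(make_reader('file:///tmp/some_petastorm_dataset', num_epochs=1),
                    batch_size=8, collate_fn=my_collate)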
Is there any progress on this?
Is there any progress on this?
I had to switch to plain PyTorch; I was not able to get it working with PyTorch Lightning.
Also getting this issue. At first I was using "limit_train_batches" in my PyTorch Lightning trainer, but I removed it and still ended up with this error. PTL runs two validation steps at the beginning of training, then returns to the validation dataloader when it finishes a training epoch. I assume that, upon beginning the full validation run, it resets the val dataloader. If this is what's happening, it sounds like PyTorch Lightning is not supported. If so, that should be made clear somewhere.
Unfortunately, I cannot copy my stack trace due to some VPN restrictions, but the error occurs at line 116 in petastorm/pytorch.py and then at line 490 in petastorm/reader.py.
I solved this with num_sanity_val_steps=0 in my PyTorch Lightning Trainer! Seems like my intuition above was correct: after turning the sanity check off, the val set doesn't need to be reset, and no errors occur.
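A sketch of what that looks like, using the trainer configuration posted earlier in this thread as an assumed baseline:

trainer = Trainer(
    max_epochs=50,
    gpus=1,
    logger=logger,
    check_val_every_n_epoch=5,
    progress_bar_refresh_rate=250,
    num_sanity_val_steps=0,  # skip the pre-training validation sanity check
)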
Nice!
As an alternative fix to this, there's a relatively simple workaround that involves subclassing the Petastorm DataLoader so that, instead of being initialized with a Reader object, it is initialized with a zero-argument function which returns a Reader; the loader can then re-create its reader if need be. A possible implementation is given below:
import logging

from petastorm.pytorch import _PARALLEL_ITER_ERROR, DataLoader, decimal_friendly_collate

logger = logging.getLogger(__name__)


class SafeResetLoader(DataLoader):
    """
    A data loader adaptor for ``torch.utils.data.DataLoader``.

    This class iterates and returns items from the Reader in batches.

    Behaves the same as the Petastorm DataLoader, except that it takes a function which creates a reader,
    so that it can reset itself at any time.

    This loader can be used as an iterator and will terminate when the reader used in the construction of
    the class runs out of samples.
    """

    def __init__(self, reader_fn, batch_size=1, collate_fn=decimal_friendly_collate,
                 shuffling_queue_capacity=0):
        """
        Initializes a data loader object, with a default collate.

        Number of epochs is defined by the configuration of the reader returned by ``reader_fn``.

        An optional shuffling queue is created if shuffling_queue_capacity is greater than 0. No samples
        will be returned to a user by the ``DataLoader`` until the queue is full. After that, batches of
        ``batch_size`` will be created by uniformly sampling the shuffling queue. Once no more samples are
        available from the data reader, the shuffling queue is allowed to be consumed till no further
        samples are available.

        Note that the last returned batch could have fewer than ``batch_size`` samples.

        NOTE: ``make_batch_reader`` has its own ``shuffle_row_groups`` argument. It randomizes the order in
        which parquet row-groups are loaded and has no effect on the order of rows within each row-group.
        To achieve row-level shuffling you should set shuffling_queue_capacity to a non-zero value.

        :param reader_fn: a zero-argument callable that returns a new petastorm Reader instance
        :param batch_size: the number of items to return per batch; factored into the len() of this reader
        :param collate_fn: an optional callable to merge a list of samples to form a mini-batch.
        :param shuffling_queue_capacity: Queue capacity is passed to the underlying
          :class:`tf.RandomShuffleQueue` instance. If set to 0, no shuffling will be done.
        """
        super().__init__(reader_fn(), batch_size=batch_size, collate_fn=collate_fn,
                         shuffling_queue_capacity=shuffling_queue_capacity)
        self._reader_fn = reader_fn

    def __iter__(self):
        if self._error is not None:
            raise RuntimeError('Cannot start a new iteration because last time iteration failed with error {err}.'
                               .format(err=repr(self._error)))
        if self._in_iter is not None and self._in_iter == True:  # noqa: E712
            raise RuntimeError(_PARALLEL_ITER_ERROR)
        if self._in_iter is not None:
            try:
                self.reader.reset()
            except NotImplementedError:
                # reset() was called in the middle of an iteration: stop the old reader and build a fresh one.
                self.reader.stop()
                self.reader.join()
                self.reader = self._reader_fn()
            logger.warning('Start a new pass of Petastorm DataLoader, reset underlying Petastorm reader to position 0.')
        self._in_iter = True
        try:
            for batch in self._iter_impl():
                yield batch
        except Exception as e:
            self._error = e
            logger.error('Iteration on Petastorm DataLoader raise error: %s', repr(e))
            raise
        finally:
            self._in_iter = False

    def terminate(self):
        self.reader.stop()
        self.reader.join()
This loader can then be constructed in the following manner:

import functools

make_reader_fn = functools.partial(make_reader, ...)

loader = SafeResetLoader(make_reader_fn, collate_fn=..., batch_size=...)
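For instance, a hypothetical end-to-end sketch feeding such loaders to the Lightning trainer from earlier in the thread; the dataset URLs, batch size, and num_epochs values are assumptions:

import functools

from petastorm import make_reader

# Placeholder dataset URLs; substitute the actual train/validation Parquet locations.
train_reader_fn = functools.partial(make_reader, 'file:///tmp/train_dataset', num_epochs=1)
val_reader_fn = functools.partial(make_reader, 'file:///tmp/val_dataset', num_epochs=1)

train_loader = SafeResetLoader(train_reader_fn, batch_size=8)
val_loader = SafeResetLoader(val_reader_fn, batch_size=8)

trainer.fit(model, train_loader, val_loader)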