GPU utilisation drop between epochs
- Environment: gcp a2-ultragpu-8g (8 A100), torch==1.13.1, mosaicml-streaming[databricks]>=0.6,<0.7
- Hardware (GPU, or instance type): 8 A100
Hi Team,
I am training a model for a retail use case and have three streaming data loaders that are interleaved (a multitask setup). After the first epoch, I see GPU utilisation drop. How can I keep utilisation close to 100%? I observe the same behaviour when loading from local disk, by providing remote=None and local=local_path for all three data loaders. When I train with just one data loader that loads from local disk, I do not observe this behaviour. Ideally, I want the data to be streamed from the remote location (GCS in this case) with utilisation close to 100%. The drop leads to a lot of idle time and high training cost.
I also tried adjusting the predownload parameter, but saw no improvement.
Cycling/interleaving/etc multiple StreamingDatasets/StreamingDataLoaders has the potential to result in complicated situations when it comes to coordination.
Instead, why not just use Streams? The sample space of a StreamingDataset is the concatenation of one or more Streams, which correspond to a serialized Streaming dataset directory, i.e. they are sub-datasets.
(Well, technically, if you use StreamingDataset you are already using Streams, SD just creates a single Stream implicitly behind the scenes and hands various arguments off to it -- if you don't provide Streams explicitly.)
from streaming import Stream, StreamingDataset
first = Stream(local=..., remote=...)
second = Stream(local=..., remote=...)
dataset = StreamingDataset(streams=[first, second], batch_size=...)
@rishabhm12 You should also make sure to set persistent_workers=True in the DataLoader so that workers are not shut down after each epoch, and the workers' dataset instances will stay alive. More info here
EDIT: using the Composer launcher instead of TorchDistributor and not setting persistent_workers=True seems to address the problem in my testing.
@knighton the same behaviour persists with even a single data loader when streaming from remote. @snarayan21 will keeping the workers alive completely solve for the util drop or will only slightly improve the downtime (from 30 mins to 25 mins), have you tried by changing the argument? I was of the opinion, this behaviour is when forward + back prop << time taken to download shards from remote to local.
@rishabhm12 This should solve the utilization drop if the issue was re-creating the worker StreamingDatasets. As I don't have your script, I don't know the exact improvement it will give you, but we've seen this in the past and it has addressed the problem. Also, since your job is continuing to run on the same set of GPUs/nodes, StreamingDataset is smart about partitioning shard downloads between epochs, so no shard files will have to be downloaded between epochs.
Let us know if setting persistent_workers=True helps, and if not, happy to discuss further. A look into your training script would be useful as well.
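For readers following along, here is a minimal sketch of the DataLoader settings being discussed. The helper name `loader_kwargs` and the example values are hypothetical; only the keyword names (`batch_size`, `num_workers`, `persistent_workers`, `prefetch_factor`) are real `torch.utils.data.DataLoader` arguments. Note that PyTorch only accepts `persistent_workers=True` when `num_workers` is greater than 0.

```python
def loader_kwargs(per_device_batch_size: int, num_workers: int,
                  prefetch_factor: int = 2) -> dict:
    """Build keyword arguments for torch.utils.data.DataLoader.

    Hypothetical helper for illustration, not part of Streaming or PyTorch.
    """
    if num_workers < 1:
        # PyTorch raises an error for persistent_workers=True with num_workers == 0,
        # so fail early with a clear message.
        raise ValueError("persistent_workers=True needs num_workers > 0")
    return {
        "batch_size": per_device_batch_size,  # per-device, not global
        "num_workers": num_workers,
        "persistent_workers": True,           # keep workers alive across epochs
        "prefetch_factor": prefetch_factor,   # batches prefetched per worker
    }


# Example values are assumptions, not taken from anyone's training script.
kwargs = loader_kwargs(per_device_batch_size=512, num_workers=8)

# Usage, assuming `dataset` is an existing StreamingDataset:
#   from torch.utils.data import DataLoader
#   loader = DataLoader(dataset, **kwargs)
```

With the default `num_workers=0` the DataLoader loads data in the main process, so there are no workers to keep alive in the first place.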
@snarayan21 we did try setting persistent_workers=True, but this did not help. Attaching the graph for your reference; there's always a 30 min drop after each epoch.
@rishabhm12 Ah that's not good, mind sending over a version of your training script we can repro? Would love to get to the bottom of this. Also, I don't think your screenshot uploaded fully 😅
Thanks!
Same utilisation issue here. I train on GCP using 4 nodes (n1-highmem-16), each with 2 V100 GPUs.
First 2 nodes are busy at 98-99% utilisation (non-stop) while the others fluctuate, so I assume the GPUs are waiting for data to be ready
GCP Vertex AI component uses GCSFuse treating the buckets as local file system, so I am not "streaming" the data
Batch size per device (GPU) is 64
What do you recommend? I was thinking of using prefetch_factor > 2 or persistent_workers=True
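One way to check the assumption that the GPUs are waiting for data is to time how long each `next()` call on the loader blocks. The sketch below is a hypothetical wrapper (the name `WaitTimer` is made up for illustration) that works on any iterable, including a DataLoader. It only measures time spent waiting for the next batch, not compute time.

```python
import time


class WaitTimer:
    """Wrap an iterable and accumulate the time spent waiting for each item."""

    def __init__(self, iterable):
        self._iterable = iterable
        self.wait_seconds = 0.0  # total time blocked in next()
        self.batches = 0         # number of items yielded so far

    def __iter__(self):
        it = iter(self._iterable)
        while True:
            start = time.perf_counter()
            try:
                batch = next(it)
            except StopIteration:
                return
            self.wait_seconds += time.perf_counter() - start
            self.batches += 1
            yield batch


def slow_source(n, delay):
    """Stand-in for a slow data loader (illustration only)."""
    for i in range(n):
        time.sleep(delay)
        yield i


timed = WaitTimer(slow_source(3, 0.01))
items = list(timed)
print(f"{timed.batches} batches, waited {timed.wait_seconds:.3f}s for data")
```

If the accumulated wait is a large share of the epoch's wall-clock time on the fluctuating nodes, the loader rather than the model is the bottleneck.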
@miguelalba96 @rishabhm12 Can you make sure that the correct batch_size is being passed to both StreamingDataset and the DataLoader? This batch size should be per-device. If that's correct, then can you try setting the StreamingDataset's predownload parameter higher? All this while making sure that persistent_workers=True in your DataLoader.
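As a small sanity check on the per-device batch size, the arithmetic can be sketched as below. The function name and the numbers are illustrative assumptions; the point is only that the value passed to both StreamingDataset and the DataLoader is the global batch size divided by the number of devices.

```python
def per_device_batch_size(global_batch_size: int, world_size: int) -> int:
    """Return the batch size each device should use (hypothetical helper)."""
    if world_size < 1:
        raise ValueError("world_size must be at least 1")
    if global_batch_size % world_size != 0:
        raise ValueError("global batch size must divide evenly across devices")
    return global_batch_size // world_size


# Illustrative numbers: a global batch of 4096 spread over 8 GPUs.
local_bs = per_device_batch_size(global_batch_size=4096, world_size=8)
print(local_bs)  # 512

# The same per-device value would then go to both places, e.g.:
#   StreamingDataset(..., batch_size=local_bs)
#   DataLoader(dataset, batch_size=local_bs, ...)
```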
Any minimal repro script either of you could provide so we can debug this would be greatly appreciated!
I'm having the same issue too. @snarayan21 setting persistent_workers=True didn't help
@snarayan21 I am passing the local batch size to StreamingDatasets. I have shared the scripts with databricks team, they will get in touch with you
@snarayan21 @knighton Here is my code. I have only one source of stream: After each epoch I have around 10 mins of idle state. Then it resumes to giving batches. After some point it crashes. It happens on multi gpu
def clean_previous_database(local):
    # try:
    #     shutil.rmtree(local)
    # except:
    #     pass
    clean_stale_shared_memory()

def make_get_batch(batch_size, remote, local):
    setup_google_cloud()
    clean_previous_database(local)
    ds = StreamingDataset(remote=remote, local=local, batch_size=batch_size)
    dl = DataLoader(ds, batch_size=batch_size, collate_fn=lambda x: x)
    it = iter(dl)
    def f():
        return next(it)
    def clean():
        clean_previous_database(local)
    return f, ds, clean, dl

class DataAcess():
    def __init__(self, device_id, batch_size, dataset_name, table_name, persistent_workers=False, num_workers=0):
        print(f'Data Access Loading Started on GPU {device_id}')
        self.get_batch, self.ds, self.clean, self.dl = mk_get_batch(dataset_name, table_name, batch_size=batch_size)
        # Define your dataloader:
        # self.dl = DataLoader(self.ds, batch_size=batch_size, collate_fn=lambda x: x, persistent_workers=persistent_workers, num_workers=num_workers)
        print(f'Data Access Loading Finished on GPU {device_id}')

def training_loop(dist, rank):
    start_time = time.time()
    device_id = rank % torch.cuda.device_count()
    master_process = rank == 0
    batch_size = 64
    NUM_EPOCH = 10
    tensor_max_len = 300
    num_workers = 1
    dataAccess = DataAcess(device_id, batch_size, dataset_name, table_name)
    print(f' Data Access Loading Time: {time.time() - start_time} rank {rank} ')
    start_time = time.time()
    query_count = 0
    for epoch in range(NUM_EPOCH):
        with tqdm(desc=f'EPOCH {epoch} QPS on master GPU ', disable=not master_process, unit=" item") as pbar:
            for batch in dataAccess.dl:
                tensor = batch_to_tensor(batch, tensor_max_len).cuda(device_id)
                query_count += tensor.size(0)
                len_batch = tensor.size(0)
                pbar.update(len_batch)
        dataAccess.clean()
        print(f'\nEPOCH {epoch} QPS on GPU {rank} : {query_count / (time.time() - start_time)}\n')
And I think wait between epochs is also batch_size dependent. The higher the batch_size, higher is the wait time
@rishabhm12 Yes, and it only happens on multi GPU. on single gpu it is ok
I'm using AMD ROCM rccl
@rishabhm12 @smilenaderi Are both of you using pre-processing functions for each sample/batch before training? Also, how big are your batch sizes and epoch sizes?
@miguelalba96 In the past we've seen that treating GCSFuse as "local" can be slow. Have you tried treating it as remote, or moving your data to local disk?
@snarayan21 yeah, there's light preprocessing happening: basically an O(1) lookup and converting NumPy arrays to torch tensors. local_batch_size in my case is 512 and global is 4096. I train for 14 epochs, each epoch having ~24300 steps
You could log your actual disk read/write speed and see if your DataLoaders are IO bound. I had this issue as well when loading big images from a local drive while also cropping them (throwing a lot of data away right away) and still wanting big batch sizes. Increasing the number of workers even decreased read MB/s because (I guess) the head of my hard drive now had to jump between locations.
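A rough way to log read speed with only the standard library is sketched below. The function name is hypothetical, and the demo reads a small temporary file purely so the snippet runs on its own; in practice you would point it at one of your shard files. Keep in mind the OS page cache can inflate the number if the file was read recently.

```python
import os
import tempfile
import time


def read_throughput_mb_s(path: str, chunk_size: int = 8 * 1024 * 1024) -> float:
    """Read a file sequentially and return the observed throughput in MB/s."""
    total_bytes = 0
    start = time.perf_counter()
    with open(path, "rb", buffering=0) as f:
        while True:
            chunk = f.read(chunk_size)
            if not chunk:
                break
            total_bytes += len(chunk)
    elapsed = time.perf_counter() - start
    # Guard against a zero elapsed time on very small files.
    return total_bytes / (1024 * 1024) / max(elapsed, 1e-9)


# Demo on a throwaway 4 MB file; replace with a real shard path to measure your disk.
with tempfile.NamedTemporaryFile(delete=False) as tmp:
    tmp.write(os.urandom(4 * 1024 * 1024))
    demo_path = tmp.name

speed = read_throughput_mb_s(demo_path)
print(f"sequential read: {speed:.1f} MB/s")
os.remove(demo_path)
```

Running this with different worker counts active in the background gives a quick indication of whether more workers help or hurt on a given drive.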
@miguelalba96 In the past we've seen that treating GCSFuse as "local" can be slow. Have you tried treating it as remote, or moving your data to local disk?
I transferred the data locally (1.1TB) to the instance and the problem of low GPU utilisation was solved. It's cheaper to pay the extra for TB of SSD storage than GPU hours
@miguelalba96 some things you could try, given that local disk works well instead of FUSE-mounted:
- Increase the prefetch factor on the DataLoader, and predownload on the dataset
- Set remote to be the FUSE-mounted path and local to be on disk
Let me know if that helps!
I am curious, what launchers are people using? I have reproduced the issue of low utilization between epochs when using TorchDistributor, but the issue goes away with the Composer launcher.
Hey @rishabhm12 @Matagi1996 @miguelalba96 @smilenaderi -- @XiaohanZhangCMU was able to root cause and fix the hangs between epochs. In internal testing, this has resolved inter-epoch hangs and has improved overall run throughput by 10-40%. We're going to cut a new Streaming release soon, but if you'd like to try it before then, just install Streaming from main!