
GPU utilisation drop between epochs

Open rishabhm12 opened this issue 10 months ago • 20 comments

  • Environment: gcp a2-ultragpu-8g (8 A100), torch==1.13.1, mosaicml-streaming[databricks]>=0.6,<0.7
  • Hardware (GPU, or instance type): 8 A100

Hi Team, I am training a model for a retail use case with three streaming data loaders that are interleaved (multitask setup). After the first epoch, I see GPU utilisation drop. How can I keep utilisation close to 100%? I observe the same behaviour when I load from local disk by setting remote=None and local = local_path for all three data loaders. When I train with just one data loader loading from local disk, I do not observe this behaviour. Ideally, I want the data to be streamed from the remote location (GCS in this case) with utilisation close to 100%; the drop leads to a lot of idle time and high training cost. (Screenshot attached: "Screenshot 2024-03-29 at 7 56 24 PM")

I also tried playing with the predownload parameter, but there was no real improvement.

rishabhm12 avatar Mar 29 '24 14:03 rishabhm12

Cycling/interleaving multiple StreamingDatasets/StreamingDataLoaders can lead to complicated coordination issues.

Instead, why not just use Streams? The sample space of a StreamingDataset is the concatenation of one or more Streams, each of which corresponds to a serialized Streaming dataset directory, i.e. a sub-dataset.

(Well, technically, if you use StreamingDataset you are already using Streams: if you don't provide Streams explicitly, StreamingDataset just creates a single Stream behind the scenes and hands the relevant arguments off to it.)

from streaming import Stream, StreamingDataset

first = Stream(local=..., remote=...)
second = Stream(local=..., remote=...)
dataset = StreamingDataset(streams=[first, second], batch_size=...)

knighton avatar Mar 29 '24 14:03 knighton

@rishabhm12 You should also make sure to set persistent_workers=True in the DataLoader so that workers are not shut down after each epoch, and the workers' dataset instances will stay alive. More info here
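For illustration, a minimal sketch of that setup, with placeholder paths and illustrative numbers (not taken from the reporter's script); note that persistent_workers=True requires num_workers > 0:

from torch.utils.data import DataLoader
from streaming import StreamingDataset

# Placeholder remote/local paths; batch_size here is per device and matches
# the DataLoader below.
dataset = StreamingDataset(remote='gs://my-bucket/dataset', local='/tmp/dataset_cache',
                           batch_size=64)

# persistent_workers=True keeps the worker processes (and their copies of the
# StreamingDataset) alive across epochs instead of re-creating them each epoch.
loader = DataLoader(dataset, batch_size=64, num_workers=8, persistent_workers=True)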

EDIT: in my testing, using the Composer launcher instead of TorchDistributor, without setting persistent_workers=True, seems to address the problem.

snarayan21 avatar Mar 29 '24 14:03 snarayan21

@knighton the same behaviour persists even with a single data loader when streaming from remote. @snarayan21 will keeping the workers alive completely solve the utilisation drop, or only slightly improve the downtime (say, from 30 mins to 25 mins)? Have you tried changing that argument? I was under the impression this behaviour appears when forward + back prop takes much less time than downloading shards from remote to local.

rishabhm12 avatar Mar 29 '24 15:03 rishabhm12

@rishabhm12 This should solve the utilization drop if the issue was re-creating the worker StreamingDatasets. Since I don't have your script, I can't say exactly how much improvement it will give you, but we've seen this in the past and it has addressed the problem. Also, since your job keeps running on the same set of GPUs/nodes, StreamingDataset is smart about partitioning shard downloads, so no shard files should need to be re-downloaded between epochs.

Let us know if setting persistent_workers=True helps, and if not, happy to discuss further. A look into your training script would be useful as well.

snarayan21 avatar Apr 01 '24 15:04 snarayan21

@snarayan21 we did try setting persistent_workers=True but it did not help. Attaching the graph FYI; there is always a ~30 min drop after each epoch. (Screenshot upload appears incomplete: "Uploading Screenshot 2024-04-03 at 10.25.47 AM.png…")

rishabhm12 avatar Apr 03 '24 04:04 rishabhm12

@rishabhm12 Ah that's not good, mind sending over a version of your training script we can repro? Would love to get to the bottom of this. Also, I don't think your screenshot uploaded fully 😅

Thanks!

snarayan21 avatar Apr 03 '24 17:04 snarayan21

Same utilisation issue here. I train on GCP using 4 nodes (n1-highmem-16), each with 2 V100 GPUs. (image attachment)

The first 2 nodes are busy at 98-99% utilisation (non-stop) while the others fluctuate, so I assume those GPUs are waiting for data to be ready.

The GCP Vertex AI component uses GCSFuse, treating the buckets as a local file system, so I am not "streaming" the data.

Batch size per device (GPU) is 64

What do you recommend? I was thinking of using prefetch_factor > 2 or persistent_workers=True.

miguelalba96 avatar Apr 04 '24 13:04 miguelalba96

@miguelalba96 @rishabhm12 Can you make sure that the correct batch_size is being passed to both StreamingDataset and the DataLoader? This batch size should be per-device. If that's correct, then can you try setting the StreamingDataset's predownload parameter higher? All this while making sure that persistent_workers=True in your DataLoader.
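To make the per-device relationship concrete, here is a hedged sketch (paths and numbers are illustrative, and it assumes the torch.distributed process group has already been initialized by the launcher):

import torch.distributed as dist
from torch.utils.data import DataLoader
from streaming import StreamingDataset

global_batch_size = 4096
per_device_batch_size = global_batch_size // dist.get_world_size()  # e.g. 4096 // 8 = 512

# The same per-device batch size is passed to both StreamingDataset and the
# DataLoader; predownload (in samples) is raised so shards are fetched well
# ahead of consumption.
dataset = StreamingDataset(remote='gs://my-bucket/dataset', local='/tmp/dataset_cache',
                           batch_size=per_device_batch_size,
                           predownload=8 * per_device_batch_size)
loader = DataLoader(dataset, batch_size=per_device_batch_size,
                    num_workers=8, persistent_workers=True)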

Any minimal repro script either of you could provide so we can debug this would be greatly appreciated!

snarayan21 avatar Apr 04 '24 21:04 snarayan21

I'm having the same issue too. @snarayan21 setting persistent_workers=True didn't help.

smilenaderi avatar Apr 05 '24 20:04 smilenaderi

@snarayan21 I am passing the local batch size to the StreamingDatasets. I have shared the scripts with the Databricks team; they will get in touch with you.

rishabhm12 avatar Apr 08 '24 05:04 rishabhm12

@snarayan21 @knighton Here is my code. I have only one Stream source. After each epoch there is around 10 mins of idle state, then it resumes producing batches. At some point it crashes. This happens on multi-GPU.

import time

import torch
from torch.utils.data import DataLoader
from tqdm import tqdm

from streaming import StreamingDataset
from streaming.base.util import clean_stale_shared_memory


def clean_previous_database(local):
    # Previously this also removed the local cache dir:
    #     shutil.rmtree(local)
    # Now it only clears stale shared memory left over from earlier runs.
    clean_stale_shared_memory()


def make_get_batch(batch_size, remote, local):
    setup_google_cloud()  # user-defined helper: sets up GCS credentials
    clean_previous_database(local)
    ds = StreamingDataset(remote=remote, local=local, batch_size=batch_size)
    dl = DataLoader(ds, batch_size=batch_size, collate_fn=lambda x: x)
    it = iter(dl)

    def f():
        return next(it)

    def clean():
        clean_previous_database(local)

    return f, ds, clean, dl


class DataAccess():

    def __init__(self, device_id, batch_size, dataset_name, table_name,
                 persistent_workers=False, num_workers=0):
        print(f'Data Access Loading Started on GPU {device_id}')

        # mk_get_batch (not shown) resolves the remote/local paths from
        # dataset_name/table_name and wraps make_get_batch above.
        self.get_batch, self.ds, self.clean, self.dl = mk_get_batch(
            dataset_name, table_name, batch_size=batch_size)
        # Alternative dataloader with persistent workers:
        # self.dl = DataLoader(self.ds, batch_size=batch_size, collate_fn=lambda x: x,
        #                      persistent_workers=persistent_workers, num_workers=num_workers)
        print(f'Data Access Loading Finished on GPU {device_id}')


def training_loop(dist, rank):

    start_time = time.time()
    device_id = rank % torch.cuda.device_count()
    master_process = rank == 0
    batch_size = 64
    NUM_EPOCH = 10
    tensor_max_len = 300
    num_workers = 1

    # dataset_name and table_name are defined elsewhere in the full script.
    dataAccess = DataAccess(device_id, batch_size, dataset_name, table_name)
    print(f'Data Access Loading Time: {time.time() - start_time} rank {rank}')

    start_time = time.time()
    query_count = 0

    for epoch in range(NUM_EPOCH):
        with tqdm(desc=f'EPOCH {epoch} QPS on master GPU ', disable=not master_process, unit=' item') as pbar:

            for batch in dataAccess.dl:
                # batch_to_tensor is a user-defined helper that stacks/pads the
                # raw samples into a single tensor.
                tensor = batch_to_tensor(batch, tensor_max_len).cuda(device_id)
                query_count += tensor.size(0)
                len_batch = tensor.size(0)
                pbar.update(len_batch)
            dataAccess.clean()
        print(f'\nEPOCH {epoch} QPS on GPU {rank}: {query_count / (time.time() - start_time)}\n')
smilenaderi avatar Apr 08 '24 15:04 smilenaderi

And I think the wait between epochs is also batch_size dependent: the higher the batch_size, the longer the wait time.

rishabhm12 avatar Apr 08 '24 15:04 rishabhm12

@rishabhm12 Yes, and it only happens on multi-GPU; on a single GPU it is fine.

I'm using AMD ROCm RCCL.

smilenaderi avatar Apr 08 '24 16:04 smilenaderi

@rishabhm12 @smilenaderi Are both of you using pre-processing functions for each sample/batch before training? Also, how big are your batch sizes and epoch sizes?

snarayan21 avatar Apr 08 '24 21:04 snarayan21

@miguelalba96 In the past we've seen that treating GCSFuse as "local" can be slow. Have you tried treating it as remote, or moving your data to local disk?

snarayan21 avatar Apr 09 '24 01:04 snarayan21

@snarayan21 yeah, there's light preprocessing happening, basically an O(1) lookup and converting NumPy arrays to torch tensors. local_batch_size in my case is 512 and global is 4096. I train for 14 epochs, each epoch having ~24300 steps.

rishabhm12 avatar Apr 09 '24 04:04 rishabhm12

You could log your actual disk read/write speed and see if your DataLoaders are IO bound. I had this issue as well when loading big images from a local drive while also cropping them (throwing a lot of the data away right away) while still wanting big batch sizes. Increasing the number of workers even decreased read MB/sec because (I guess) the head of my hard drive now had to jump between locations.

Matagi1996 avatar Apr 10 '24 06:04 Matagi1996

@miguelalba96 In the past we've seen that treating GCSFuse as "local" can be slow. Have you tried treating it as remote, or moving your data to local disk?

I transferred the data (1.1 TB) locally to the instance and the problem of low GPU utilisation was solved. It's cheaper to pay extra for TBs of SSD storage than for GPU hours.

miguelalba96 avatar Apr 15 '24 06:04 miguelalba96

@miguelalba96 some things you could try, given that local disk works well while the FUSE mount does not (a rough sketch follows below):

  • increase prefetch_factor on the DataLoader, and predownload on the StreamingDataset
  • set remote to the FUSE-mounted path and local to a path on local disk
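A minimal sketch of those two suggestions combined; the mount point, cache path, and numbers are placeholders rather than a tested configuration:

from torch.utils.data import DataLoader
from streaming import StreamingDataset

# Treat the FUSE mount as the "remote" source and keep the working copy on local
# disk, with predownload raised so shards are staged ahead of the training loop.
dataset = StreamingDataset(remote='/gcs/my-bucket/dataset',       # GCSFuse mount (placeholder)
                           local='/mnt/local_ssd/dataset_cache',  # local disk cache (placeholder)
                           batch_size=64,
                           predownload=512)
loader = DataLoader(dataset, batch_size=64, num_workers=8,
                    prefetch_factor=4, persistent_workers=True)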

let me know if that helps!

snarayan21 avatar Apr 17 '24 17:04 snarayan21

I am curious, what launchers are people using? I have reproduced the issue of low utilization between epochs when using TorchDistributor, but the issue goes away with the Composer launcher.

snarayan21 avatar Apr 17 '24 17:04 snarayan21

Hey @rishabhm12 @Matagi1996 @miguelalba96 @smilenaderi -- @XiaohanZhangCMU was able to root cause and fix the hangs between epochs. In internal testing, this has resolved inter-epoch hangs and has improved overall run throughput by 10-40%. We're going to cut a new Streaming release soon, but if you'd like to try it before then, just install Streaming from main!

snarayan21 avatar Aug 22 '24 18:08 snarayan21