
[BUG: Whisper model pipeline parallel training] logits and ground truth size mismatch during loss calculation

Open josephwong14wkh opened this issue 1 year ago • 0 comments

I am currently training Whisper (large-v2) with pipeline parallelism on 2 GPUs. I have my own custom dataset, which I pass to DeepSpeed for pipeline training. However, during the loss calculation there is a size mismatch between the logits and the ground truth.

Traceback (most recent call last):
  File "model_parallel_trainer.py", line 133, in <module>
    loss = model_engine.train_batch()
  File "/home/joseph/anaconda3/envs/whisper_venv/lib/python3.8/site-packages/deepspeed/runtime/pipe/engine.py", line 378, in train_batch
    self._exec_schedule(sched)
  File "/home/joseph/anaconda3/envs/whisper_venv/lib/python3.8/site-packages/deepspeed/runtime/pipe/engine.py", line 1434, in _exec_schedule
    self._exec_instr(**cmd.kwargs)
  File "/home/joseph/anaconda3/envs/whisper_venv/lib/python3.8/site-packages/deepspeed/runtime/pipe/engine.py", line 758, in _exec_forward_pass
    self.loss = self.module.loss_fn(outputs, labels)
  File "model_parallel_trainer.py", line 89, in cal_loss
    asr_loss = F.cross_entropy(_logits.transpose(1, 2), _gt)
  File "/home/joseph/anaconda3/envs/whisper_venv/lib/python3.8/site-packages/torch/nn/functional.py", line 3059, in cross_entropy
    return torch._C._nn.cross_entropy_loss(input, target, weight, _Reduction.get_enum(reduction), ignore_index, label_smoothing)
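If it helps, a minimal shape-logging variant of the loss function could look like the sketch below. This wrapper is hypothetical and not part of my script; as the traceback shows, the pipeline engine calls it as self.module.loss_fn(outputs, labels), so the prints show exactly what reaches the last stage:

import torch.nn.functional as F

def cal_loss_with_shape_check(logits, y_out):
    # Hypothetical debugging wrapper: log the shapes the pipeline engine
    # actually hands to loss_fn before computing anything.
    print(f"logits: {tuple(logits.shape)}, y_out: {tuple(y_out.shape)}")

    lang_logit = logits[:, :1, :]   # position 0: language-ID token
    lang_gt = y_out[:, :1]
    asr_logits = logits[:, 1:]      # remaining positions: ASR tokens
    asr_gt = y_out[:, 1:]

    # F.cross_entropy expects input (N, C, T) and integer target (N, T),
    # hence the transpose of the (N, T, C) logits; it only succeeds when
    # logits and labels agree on the sequence length T.
    lid_loss = F.cross_entropy(lang_logit.transpose(1, 2), lang_gt)
    asr_loss = F.cross_entropy(asr_logits.transpose(1, 2), asr_gt)
    return lid_loss + asr_loss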

I am quite sure there is nothing wrong with my dataset, as I can run the training with num_stages=1 (no pipeline parallelism). Does anyone know what could cause this error? Thank you! Here is my fine-tuning script:

import argparse
import whisper
from whisper.tokenizer import get_tokenizer
import deepspeed
from deepspeed.pipe import PipelineModule
from dataloader import get_dataloader
import torch
import torch.distributed as dist
from dataclasses import asdict
from tqdm import tqdm
from dataloader import collate_fn_parallel
import os
import torch.nn.functional as F

os.environ['NCCL_P2P_DISABLE'] = "1"
os.environ['NCCL_P2P_LEVEL'] = "PXB"

tokenizer = get_tokenizer(multilingual=True, task="transcribe")


def get_args():
    parser = argparse.ArgumentParser(
        description="Fine-tune a Whisper model in model parallel mode (pipeline parallelism)"
    )
    # Dataloader-related arguments
    parser.add_argument(
        "--local_rank",
        default=-1,
        help="rank of the current process. This argument is passed automatically by the deepspeed launcher",
    )
    parser.add_argument(
        "--checkpoint",
        type=str,
        help="model checkpoint",
    )
    parser.add_argument(
        "--train-json",
        type=str,
        help="training data (json file)",
    )
    parser.add_argument(
        "--train-batch-size",
        type=int,
        default=8,
        help="batch size for training",
    )
    parser.add_argument(
        "--dev-batch-size",
        type=int,
        default=32,
        help="batch size for evaluation",
    )
    parser.add_argument(
        "--train-steps",
        type=int,
        default=100,
        help="total number of training steps",
    )
    parser.add_argument(
        "--accum-grad-steps",
        type=int,
        default=4,
        help="number of steps to accumulate gradients",
    )
    parser.add_argument(
        "--eval-steps",
        type=int,
        default=10,
        help="evaluate (and save a checkpoint) every this many steps",
    )
    parser.add_argument(
        "--save-dir",
        type=str,
        required=True,
        help="directory to save the model checkpoint",
    )
    parser.add_argument('--seed', type=int, default=1138, help='PRNG seed')

    parser = deepspeed.add_config_arguments(parser)
    args = parser.parse_args()
    return args

def cal_loss(logits, y_out):
    total_loss = 0
    lid_loss = None
    asr_loss = None

    # Position 0 holds the language-ID token; the remaining positions are ASR targets
    lang_logit = logits[:, :1, :]
    lang_gt = y_out[:, :1]
    _logits = logits[:, 1:]
    _gt = y_out[:, 1:]

    lid_loss = F.cross_entropy(lang_logit.transpose(1, 2), lang_gt)
    #lid_loss = 0
    asr_loss = F.cross_entropy(_logits.transpose(1, 2), _gt)

    total_loss = lid_loss + asr_loss
    #print(f"loss: {total_loss}")
    return total_loss

args = get_args()
deepspeed.init_distributed(dist_backend="nccl")
args.local_rank = int(os.environ['LOCAL_RANK'])
torch.cuda.set_device(args.local_rank)

torch.manual_seed(args.seed)
deepspeed.runtime.utils.set_random_seed(args.seed)

tokenizer = get_tokenizer(multilingual=True, task="transcribe")

# Load the model and wrap it in the PipelineModule class

model = whisper.load_model(args.checkpoint, ds_parallel=True)  # placed on CPU
net = PipelineModule(model.to_layers(), loss_fn=cal_loss, num_stages=2)

# Dataloader preparation

trainset = get_dataloader(
    name="ds_dataset",
    json=args.train_json,
    tokenizer=tokenizer,
    fp16=False,
    ds_parallel=True,
    local_rank=args.local_rank,
)
model_engine, optimizer, train_loader, _ = deepspeed.initialize(
    args=args,
    model=net,
    model_parameters=[p for p in net.parameters() if p.requires_grad],
    training_data=trainset,
    collate_fn=collate_fn_parallel,
)

# Training loop

with torch.cuda.amp.autocast(cache_enabled=False):
    pbar = tqdm(range(1, args.train_steps+1))
    for step in pbar:
        torch.cuda.empty_cache()
        optimizer.zero_grad()
        loss = model_engine.train_batch()

        if step % args.eval_steps == 0:
            model_to_save = model_engine
            ckpt_id = f"steps_{step}_loss_{loss}"
            data = {"model_state_dict": model_to_save.state_dict(), "dims": asdict(model.dims)}
            torch.save(data, f"{args.save_dir}/{ckpt_id}.pt")

print(f"success")
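For reference, this is the tensor layout I expect at the loss, written as a standalone sanity check. B, T and V below are made-up sizes for illustration, not values from my run:

import torch
import torch.nn.functional as F

B, T, V = 1, 48, 51865                # assumed micro-batch size, target length, vocab size
logits = torch.randn(B, T, V)         # decoder output: one distribution per target position
y_out = torch.randint(0, V, (B, T))   # ground-truth token ids, same length as the logits

# Split off the language-ID position, then compute both losses exactly as
# cal_loss does; this only works when logits and y_out share the length T.
lid_loss = F.cross_entropy(logits[:, :1, :].transpose(1, 2), y_out[:, :1])
asr_loss = F.cross_entropy(logits[:, 1:].transpose(1, 2), y_out[:, 1:])
print(lid_loss + asr_loss)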

Config file (train_batch_size is kept small for testing):

{
  "train_batch_size": 2,
  "train_micro_batch_size_per_gpu": 1,

  "fp16": {
    "enabled": true
  },

  "optimizer": {
    "type": "Adam",
    "params": {
      "torch_adam": true,
      "lr": 0.001,
      "betas": [0.9, 0.999],
      "eps": 1e-8
    }
  },

  "zero_optimization": {
    "stage": 1,
    "reduce_bucket_size": 2e8
  },

  "steps_per_print": 1,
  "wall_clock_breakdown": false
}
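For completeness, my understanding of how DeepSpeed interprets these values: train_batch_size = train_micro_batch_size_per_gpu * gradient_accumulation_steps * data-parallel size, and since the 2 GPUs here act as 2 pipeline stages the data-parallel degree is 1, so each train_batch() call should consume 2 micro-batches of 1 sample. A quick check of that arithmetic (my understanding of the convention, not output from my run):

# Batch-size arithmetic implied by the config above
train_batch_size = 2
train_micro_batch_size_per_gpu = 1
num_gpus, num_stages = 2, 2
data_parallel_size = num_gpus // num_stages                 # 2 // 2 = 1
grad_accum_steps = train_batch_size // (
    train_micro_batch_size_per_gpu * data_parallel_size)    # = 2 micro-batches per step
print(grad_accum_steps)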

josephwong14wkh · Apr 18 '24