[BUG: Whisper model pipeline parallel training] logits and ground truth size mismatch during loss calculation
I am currently training Whisper (large-v2) with pipeline parallelism across 2 GPUs. I have my own custom dataset, which I pass to DeepSpeed for pipeline training. However, during the loss calculation there is a size mismatch between the logits and the ground truth.
Traceback (most recent call last):
File "model_parallel_trainer.py", line 133, in
I am fairly sure there is nothing wrong with my dataset, since the training runs fine with num_stages=1 (no pipeline parallelism). Does anyone know what could be causing this error? Thank you! Here is my fine-tune script:
import argparse
import os
from dataclasses import asdict

import torch
import torch.distributed as dist
import torch.nn.functional as F
from tqdm import tqdm

import deepspeed
from deepspeed.pipe import PipelineModule

import whisper
from whisper.tokenizer import get_tokenizer

from dataloader import get_dataloader, collate_fn_parallel
os.environ['NCCL_P2P_DISABLE'] = "1"
os.environ['NCCL_P2P_LEVEL'] = "PXB"

tokenizer = get_tokenizer(multilingual=True, task="transcribe")

def get_args():
    parser = argparse.ArgumentParser(
        description="Fine-tune a Whisper model in model parallel mode (pipeline parallelism)"
    )
    # Dataloader-related arguments
    parser.add_argument(
        "--local_rank",
        default=-1,
        help="rank of the current process; passed automatically by the deepspeed launcher",
    )
    parser.add_argument(
        "--checkpoint",
        type=str,
        help="model checkpoint",
    )
    parser.add_argument(
        "--train-json",
        type=str,
        help="training data (json file)",
    )
    parser.add_argument(
        "--train-batch-size",
        type=int,
        default=8,
        help="batch size for training",
    )
    parser.add_argument(
        "--dev-batch-size",
        type=int,
        default=32,
        help="batch size for evaluation",
    )
    parser.add_argument(
        "--train-steps",
        type=int,
        default=100,
        help="number of training steps",
    )
    parser.add_argument(
        "--accum-grad-steps",
        type=int,
        default=4,
        help="number of steps to accumulate gradients",
    )
    parser.add_argument(
        "--eval-steps",
        type=int,
        default=10,
        help="number of steps between evaluations",
    )
    parser.add_argument(
        "--save-dir",
        type=str,
        required=True,
        help="directory to save the model checkpoint",
    )
    parser.add_argument("--seed", type=int, default=1138, help="PRNG seed")
    parser = deepspeed.add_config_arguments(parser)
    args = parser.parse_args()
    return args
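# For reference, I launch this with something like (paths/names are placeholders,
# not my exact command):
#   deepspeed --num_gpus 2 model_parallel_trainer.py \
#       --checkpoint large-v2 \
#       --train-json data/train.json \
#       --save-dir exp/ckpts \
#       --deepspeed_config ds_config.json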
def cal_loss(logits, y_out):
    # Position 0 of the target is the language token (LID); the rest are ASR tokens.
    lang_logit = logits[:, :1, :]
    lang_gt = y_out[:, :1]
    _logits = logits[:, 1:]
    _gt = y_out[:, 1:]
    # F.cross_entropy expects (batch, vocab, seq_len) logits, hence the transpose.
    lid_loss = F.cross_entropy(lang_logit.transpose(1, 2), lang_gt)
    # lid_loss = 0
    asr_loss = F.cross_entropy(_logits.transpose(1, 2), _gt)
    total_loss = lid_loss + asr_loss
    # print(f"loss: {total_loss}")
    return total_loss
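# Quick shape-contract check for cal_loss (commented out so it does not run during
# training; 51865 is what I believe the multilingual vocab size of large-v2 is, and
# the batch/sequence sizes are made up):
#   dummy_logits = torch.randn(2, 12, 51865)     # (batch, seq_len, vocab)
#   dummy_y = torch.randint(0, 51865, (2, 12))   # (batch, seq_len)
#   cal_loss(dummy_logits, dummy_y)              # only works when dims 0 and 1 match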
args = get_args()
deepspeed.init_distributed(dist_backend="nccl")
args.local_rank = int(os.environ['LOCAL_RANK'])
torch.cuda.set_device(args.local_rank)

torch.manual_seed(args.seed)
deepspeed.runtime.utils.set_random_seed(args.seed)
tokenizer = get_tokenizer(multilingual=True, task="transcribe")
# Load the model and wrap it in PipelineModule
model = whisper.load_model(args.checkpoint, ds_parallel=True)  # placed on CPU
net = PipelineModule(model.to_layers(), loss_fn=cal_loss, num_stages=2)
# Dataloader preparation
trainset = get_dataloader(
    name="ds_dataset",
    json=args.train_json,
    tokenizer=tokenizer,
    fp16=False,
    ds_parallel=True,
    local_rank=args.local_rank,
)
model_engine, optimizer, train_loader, _ = deepspeed.initialize(
    args=args,
    model=net,
    model_parameters=[p for p in net.parameters() if p.requires_grad],
    training_data=trainset,
    collate_fn=collate_fn_parallel,
)
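# Note: as far as I understand, the pipeline engine consumes each batch produced by
# collate_fn_parallel as an (inputs, labels) tuple: `inputs` feeds the first stage and
# `labels` goes straight to loss_fn on the last stage, roughly
#   inputs = (mel, dec_input)   # e.g. mel (B, 80, 3000), dec_input (B, T)
#   labels = y_out              # (B, T), same T as dec_input
# (names and shapes here are illustrative, not my exact collate code)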
# Training loop
with torch.cuda.amp.autocast(cache_enabled=False):
    pbar = tqdm(range(1, args.train_steps + 1))
    for step in pbar:
        torch.cuda.empty_cache()
        optimizer.zero_grad()
        # train_batch() pulls micro-batches from the dataloader and runs
        # forward/backward/step across the pipeline stages.
        loss = model_engine.train_batch()

        if step % args.eval_steps == 0:
            model_to_save = model_engine
            ckpt_id = f"steps_{step}_loss_{loss}"
            data = {
                "model_state_dict": model_to_save.state_dict(),
                "dims": asdict(model.dims),
            }
            torch.save(data, f"{args.save_dir}/{ckpt_id}.pt")

print("success")
Config file (batch sizes kept small for testing):

{
    "train_batch_size": 2,
    "train_micro_batch_size_per_gpu": 1,
"fp16": {
"enabled": true
},
"optimizer": {
"type": "Adam",
"params": {
"torch_adam" : true,
"lr": 0.001,
"betas": [
0.9,
0.999
],
"eps": 1e-8
}
},
"zero_optimization": {
"stage": 1,
"reduce_bucket_size": 2e8
},
"steps_per_print" : 1,
"wall_clock_breakdown" : false
}
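For reference, my understanding of how DeepSpeed derives the schedule from this config (please correct me if this is wrong): with 2 GPUs and num_stages=2 there is only one data-parallel replica, so

    gradient_accumulation_steps = train_batch_size / (train_micro_batch_size_per_gpu * data_parallel_size)
                                = 2 / (1 * 1)
                                = 2

i.e. each model_engine.train_batch() call pushes 2 micro-batches of size 1 through the pipeline.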