Fine-tuning fails only with the "basic" multi-GPU distributed setting
System Info
- `Accelerate` version: 0.25.0
- Platform: Linux-6.5.0-18-generic-x86_64-with-glibc2.35
- Python version: 3.11.5
- Numpy version: 1.26.3
- PyTorch version (GPU?): 2.1.2 (True)
- PyTorch XPU available: False
- PyTorch NPU available: False
- System RAM: 31.25 GB
- GPU type: NVIDIA GeForce RTX 3090 (2 of them)
Information
- [ ] The official example scripts
- [X] My own modified scripts
Tasks
- [ ] One of the scripts in the examples/ folder of Accelerate or an officially supported no_trainer script in the examples folder of the transformers repo (such as run_no_trainer_glue.py)
- [X] My own task or dataset (give details below)
Reproduction
Accelerate works when I use non-distributed training, any DeepSpeed stage, or FSDP. It does not work when I simply select multi-GPU and leave every other setting at its default: the processes run out of VRAM even though there should be plenty of space. Below are the yaml configs that did and didn't work, followed by the code and the error output. I tried it with and without NCCL_P2P_DISABLE=1, but that made no difference. (Running solo is also noticeably faster.) I'd love to figure out what the issue is. I'm not exhausting CPU RAM or CPU compute, and a single-GPU run doesn't even use half of one card's VRAM according to nvidia-smi and accelerate estimate-memory with TinyLlama.
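For reference, here's a small helper I could drop into the training script to watch per-process VRAM at different points (the log_gpu_memory name and where it gets called are just illustrative, not part of the repro):

import torch

def log_gpu_memory(accelerator, tag=""):
    # Allocated / reserved / total VRAM for this process's GPU, printed with its rank
    device = accelerator.device
    allocated = torch.cuda.memory_allocated(device) / 1024**3
    reserved = torch.cuda.memory_reserved(device) / 1024**3
    total = torch.cuda.get_device_properties(device).total_memory / 1024**3
    print(f"[rank {accelerator.process_index}] {tag}: "
          f"allocated={allocated:.2f} GiB, reserved={reserved:.2f} GiB, total={total:.2f} GiB")

# e.g. log_gpu_memory(accelerator, "after prepare") right after accelerator.prepare(...)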
non-distributed (works)
compute_environment: LOCAL_MACHINE
debug: false
distributed_type: 'NO'
downcast_bf16: 'no'
gpu_ids: all
machine_rank: 0
main_training_function: main
mixed_precision: bf16
num_machines: 1
num_processes: 1
rdzv_backend: static
same_network: true
tpu_env: []
tpu_use_cluster: false
tpu_use_sudo: false
use_cpu: false
base distributed (doesn't work)
compute_environment: LOCAL_MACHINE
debug: false
distributed_type: MULTI_GPU
downcast_bf16: 'no'
gpu_ids: all
machine_rank: 0
main_training_function: main
mixed_precision: bf16
num_machines: 1
num_processes: 2
rdzv_backend: static
same_network: true
tpu_env: []
tpu_use_cluster: false
tpu_use_sudo: false
use_cpu: false
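As far as I understand, MULTI_GPU with everything left at the defaults is just plain PyTorch DDP: each process keeps a full replica of the model plus its own gradients and AdamW state, with nothing sharded. A rough sketch of what I think this config corresponds to (my own approximation, not Accelerate's actual code):

import os
import torch
import torch.distributed as dist
from torch.nn.parallel import DistributedDataParallel as DDP
from transformers import AutoModelForCausalLM

dist.init_process_group("nccl")  # accelerate launch provides RANK/LOCAL_RANK/WORLD_SIZE etc.
local_rank = int(os.environ["LOCAL_RANK"])
torch.cuda.set_device(local_rank)

model = AutoModelForCausalLM.from_pretrained("TinyLlama/TinyLlama-1.1B-Chat-v1.0")
ddp_model = DDP(model.cuda(), device_ids=[local_rank])  # full replica per GPU, gradients all-reduced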
DeepSpeed ZeRO stage 0 (works)
compute_environment: LOCAL_MACHINE
debug: false
deepspeed_config:
  gradient_accumulation_steps: 1
  zero3_init_flag: false
  zero_stage: 0
distributed_type: DEEPSPEED
downcast_bf16: 'no'
machine_rank: 0
main_training_function: main
mixed_precision: bf16
num_machines: 1
num_processes: 2
rdzv_backend: static
same_network: true
tpu_env: []
tpu_use_cluster: false
tpu_use_sudo: false
use_cpu: false
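If it helps, I believe the programmatic equivalent of this config is just a DeepSpeedPlugin with stage 0, i.e. DeepSpeed's plain data parallelism with no partitioning of optimizer state, gradients, or parameters (sketch under that assumption):

from accelerate import Accelerator
from accelerate.utils import DeepSpeedPlugin

# ZeRO stage 0 = no sharding; this is one of the setups that works for me
deepspeed_plugin = DeepSpeedPlugin(zero_stage=0, gradient_accumulation_steps=1)
accelerator = Accelerator(mixed_precision="bf16", deepspeed_plugin=deepspeed_plugin)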
FSDP (works)
compute_environment: LOCAL_MACHINE
debug: false
distributed_type: FSDP
downcast_bf16: 'no'
fsdp_config:
  fsdp_auto_wrap_policy: NO_WRAP
  fsdp_backward_prefetch_policy: BACKWARD_PRE
  fsdp_cpu_ram_efficient_loading: true
  fsdp_forward_prefetch: false
  fsdp_offload_params: false
  fsdp_sharding_strategy: 2
  fsdp_state_dict_type: SHARDED_STATE_DICT
  fsdp_sync_module_states: true
  fsdp_use_orig_params: true
machine_rank: 0
main_training_function: main
mixed_precision: bf16
num_machines: 1
num_processes: 2
rdzv_backend: static
same_network: true
tpu_env: []
tpu_use_cluster: false
tpu_use_sudo: false
use_cpu: false
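For completeness, fsdp_sharding_strategy: 2 should map to torch's ShardingStrategy.SHARD_GRAD_OP (ZeRO-2 style: gradients and optimizer state sharded, parameters replicated), and with NO_WRAP the whole model is a single FSDP unit. Roughly, in plain PyTorch (my reading of the config, not Accelerate's internals):

import os
import torch
import torch.distributed as dist
from torch.distributed.fsdp import FullyShardedDataParallel as FSDP, ShardingStrategy
from transformers import AutoModelForCausalLM

dist.init_process_group("nccl")
torch.cuda.set_device(int(os.environ["LOCAL_RANK"]))

model = AutoModelForCausalLM.from_pretrained("TinyLlama/TinyLlama-1.1B-Chat-v1.0")
# sharding_strategy: 2 -> SHARD_GRAD_OP: gradients + optimizer state sharded across the two GPUs
fsdp_model = FSDP(model.cuda(), sharding_strategy=ShardingStrategy.SHARD_GRAD_OP)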
Here's the code.
import argparse
from time import time
import torch
from accelerate import Accelerator
from datasets import Dataset
from torch.optim import AdamW
from torch.utils.data import DataLoader
from torch.utils.data.distributed import DistributedSampler
from tqdm import tqdm
from transformers import (
AutoModelForCausalLM,
AutoTokenizer,
get_linear_schedule_with_warmup,
set_seed,
)
# This allows adjusting training arguments without needing to change the code
def parse_args():
parser = argparse.ArgumentParser(description="Training script arguments.")
parser.add_argument("--batch_size", type=int, default=1,
help="Batch size for training.")
parser.add_argument("--mixed_precision", type=str,
default="bf16", help="Mixed precision type.")
parser.add_argument("--lr", type=float, default=5e-5,
help="Learning rate.")
parser.add_argument("--num_epochs", type=int, default=3,
help="Number of training epochs.")
parser.add_argument("--seed", type=int, default=None, help="Random seed.")
parser.add_argument("--num_warmup_steps", type=int,
default=100, help="Number of warm-up steps.")
parser.add_argument("--num_processes", type=int,
default=2, help="Number of gpus to use.")
parser.add_argument("--model_name", type=str,
default="TinyLlama/TinyLlama-1.1B-Chat-v1.0", help="Model to use.")
parser.add_argument("--data_location", type=str,
default="examples/preprocessed_data.json", help="File location for data.")
    parser.add_argument("--save_location", type=str,
                        default="saved_1000", help="Location to save the fine-tuned model and tokenizer.")
parser.add_argument("--gradient_accumulation_steps",
type=int, default=1, help="Gradient accumulation steps.")
return parser.parse_args()
def process_dataset(json_file, tokenizer):
ds = Dataset.from_json(json_file)
def transform_example(example):
# Construct system message
system_message = f"Consult ID: {example['CONSULTID']}. Patient's age: {example['AGE_AT_CONSULT']}. Gender: {example['GENDER']}. Diagnosis Code: {example['DIAGNOSIS_CODE']}."
# Construct messages in the required format
messages = [
{"role": "system", "content": system_message},
{"role": "user", "content": example["PCP_MESSAGE"]},
{"role": "assistant", "content": example["SR_MESSAGE"]}
]
return messages
ds = ds.map(lambda x: {"formatted_chat": tokenizer.apply_chat_template(transform_example(x), tokenize=False, add_generation_prompt=False)})
return ds
def get_dataloaders(accelerator: Accelerator, batch_size, model_name, data_location, save_location):
# 1. Initialize tokenizer
tokenizer = AutoTokenizer.from_pretrained(model_name)
tokenizer.pad_token = tokenizer.eos_token
# 2. Convert JSON to readable dataset
with accelerator.main_process_first():
dataset = process_dataset(data_location, tokenizer)
accelerator.print(dataset["formatted_chat"][0])
def tokenize_function(examples):
# Tokenize, pad and truncate the 'formatted_chat' content
return tokenizer(examples["formatted_chat"], padding="max_length", truncation=True, max_length=128)
with accelerator.main_process_first():
tokenized_dataset = dataset.map(tokenize_function, batched=True)
tokenized_dataset.set_format(
"torch", columns=["input_ids", "attention_mask"])
# 4
split_datasets = tokenized_dataset.train_test_split(test_size=0.2)
tokenized_train_dataset = split_datasets["train"]
tokenized_eval_dataset = split_datasets["test"]
if accelerator.is_main_process:
print("saving tokenizer")
# Saving the tokenizer
tokenizer.save_pretrained(save_location)
print("saved tokenizer")
# 5
train_sampler = DistributedSampler(
tokenized_train_dataset, num_replicas=accelerator.num_processes, rank=accelerator.process_index, shuffle=True
)
eval_sampler = DistributedSampler(
tokenized_eval_dataset, num_replicas=accelerator.num_processes, rank=accelerator.process_index, shuffle=False
)
# 6
train_dataloader = DataLoader(
tokenized_train_dataset,
batch_size=batch_size,
drop_last=True,
sampler=train_sampler
)
eval_dataloader = DataLoader(
tokenized_eval_dataset,
batch_size=batch_size*2,
drop_last=(accelerator.mixed_precision == "fp8"),
sampler=eval_sampler
)
accelerator.print("returning dataloaders")
return train_dataloader, eval_dataloader
# 1. Initialize the accelerator with mixed precision and define training parameters via command-line arguments
# 2. Set the seed (if given as a command-line argument) for reproducibility
# 3. Get the dataloaders
# 4. Initialize the remaining training parameters and "prepare"/optimize them via Accelerate
# 5. Train/fine-tune the model on the new data with the configured distributed setup
# 6. Evaluate the model after each training epoch
# 7. Have the main process save the newly fine-tuned model
def training_function(args):
# 1
accelerator = Accelerator(mixed_precision=args.mixed_precision,
gradient_accumulation_steps=args.gradient_accumulation_steps)
    accelerator.print("set accelerator")
lr = args.lr
num_epochs = args.num_epochs
batch_size = args.batch_size
num_warmup_steps = args.num_warmup_steps
# 2
if args.seed:
set_seed(args.seed)
# 3
train_dataloader, eval_dataloader = get_dataloaders(
accelerator, batch_size, args.model_name, args.data_location, args.save_location)
accelerator.print("set dataloaders")
# 4
    # Instantiate the model (we build the model here so that the seed also controls new weight initialization)
model = AutoModelForCausalLM.from_pretrained(args.model_name)
# model = accelerator.prepare(model)
accelerator.print("set model")
optimizer = AdamW(params=model.parameters(), lr=lr)
accelerator.print("set optimizer")
# Instantiate scheduler
lr_scheduler = get_linear_schedule_with_warmup(
optimizer=optimizer,
num_warmup_steps=num_warmup_steps,
num_training_steps=(len(train_dataloader) *
num_epochs) // args.gradient_accumulation_steps
)
accelerator.print("set lr_scheduler")
# Prepare everything
# There is no specific order to remember, we just need to unpack the objects in the same order we gave them to the
# prepare method.
accelerator.wait_for_everyone()
accelerator.print("preparing!")
model, optimizer, train_dataloader, eval_dataloader, lr_scheduler = accelerator.prepare(
model, optimizer, train_dataloader, eval_dataloader, lr_scheduler
)
    accelerator.print("prepared stuff")
# Initialize logging variables
total_train_loss = 0
total_eval_loss = 0
# 5
# Now we train the model
for epoch in range(num_epochs):
accelerator.print("training")
model.train()
total_train_loss = 0
for batch in tqdm(train_dataloader, desc="Training"):
with accelerator.accumulate(model):
# Process the batch
inputs = {k: v.to(accelerator.device)
for k, v in batch.items()}
if "labels" not in inputs:
inputs["labels"] = inputs["input_ids"]
outputs = model(**inputs)
loss = outputs.loss
total_train_loss += loss.item()
accelerator.backward(loss)
optimizer.step()
lr_scheduler.step()
optimizer.zero_grad()
accelerator.wait_for_everyone()
# 6
# Evaluation loop after each training epoch
model.eval()
total_eval_loss = 0
for batch in tqdm(eval_dataloader, "Evaluating"):
with torch.no_grad():
inputs = {k: v.to(accelerator.device)
for k, v in batch.items()}
if "labels" not in inputs:
inputs["labels"] = inputs["input_ids"]
outputs = model(**inputs)
loss = outputs.loss
total_eval_loss += loss.item()
accelerator.wait_for_everyone()
        # Log the average losses
avg_train_loss = total_train_loss / len(train_dataloader)
avg_eval_loss = total_eval_loss / len(eval_dataloader)
print(
f"Epoch: {epoch}, Average Training Loss: {avg_train_loss}, Average Evaluation Loss: {avg_eval_loss}")
accelerator.wait_for_everyone()
# 7
accelerator.wait_for_everyone()
accelerator.print("saving")
accelerator.unwrap_model(model).save_pretrained(
args.save_location,
is_main_process=accelerator.is_main_process,
save_function=accelerator.save,
state_dict=accelerator.get_state_dict(model),
)
def main():
args = parse_args()
training_function(args)
if __name__ == "__main__":
start = time()
main()
print(f"Total Execution Time: {time() - start} seconds")
I'd run it via
$ accelerate launch file.py --num_processes 1 # or 2 depending on situation
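A tiny script like this, launched the same way, is how I'd double-check that two processes actually start and that each one sees its own rank and GPU (the env var names are the standard torch.distributed ones the launcher sets; check_env.py is just an illustrative name):

import os
import torch

# accelerate launch check_env.py
print(
    f"rank={os.environ.get('RANK')} "
    f"local_rank={os.environ.get('LOCAL_RANK')} "
    f"world_size={os.environ.get('WORLD_SIZE')} "
    f"visible_gpus={torch.cuda.device_count()}"
)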
Here's an example of my examples/preprocessed_data.json (not real data):
[
{
"CONSULTID": "61110688",
"TAR_STATUS_NAME": "Closed",
"CODE_ID": "108",
"CODE_DESC": "Cancelled",
"STATUS": "02.Cancelled",
"YEAR_CREATED": "2023",
"SUBMIT_TO_RESPOND": "3.17",
"SUBMIT_TO_CLOSE": "30.06",
"SPECIALTY_NAME": "GASTROENTEROLOGY - ADULT",
"GENDER": "M",
"AGE_AT_CONSULT": "69",
"CREATED": "2023-01-03T12:15:16",
"DOB": "1953-05-01",
"PCP_NAME": "Armen Babaian",
"SR_NAME": "James Tabibian",
"ORG_NAME": "AAA - OVM Medi-Cal Ineligible Over 50",
"ORG_TYPE": "OTHER",
"DIAGNOSIS_CODE": "Z12.11",
"CATEGORY_NAME": "Medicine/Non-Surg",
"SUBCATEGORY_NAME": "GI",
"PCP_MESSAGE": "Hi James, I have a patient with chronic constipation who has failed medical management. What are your recommendations?",
"TQ_HEADER": "Clinical question",
"SR_MESSAGE": "Hi Armen, thanks for your message. I would recommend you referring your patient to a gastroenterologist for further evaluation and treatment. They may need additional tests, such as a colonoscopy or endoscopy, to determine the cause of their constipation. Additionally, I recommend you discuss with your patient about dietary and lifestyle changes that may help relieve their symptoms."
},
{
"CONSULTID": "61110688",
"TAR_STATUS_NAME": "Closed",
"CODE_ID": "108",
"CODE_DESC": "Cancelled",
"STATUS": "02.Cancelled",
"YEAR_CREATED": "2023",
"SUBMIT_TO_RESPOND": "3.17",
"SUBMIT_TO_CLOSE": "30.06",
"SPECIALTY_NAME": "GASTROENTEROLOGY - ADULT",
"GENDER": "M",
"AGE_AT_CONSULT": "69",
"CREATED": "2023-01-03T12:15:16",
"DOB": "1953-05-01",
"PCP_NAME": "Armen Babaian",
"SR_NAME": "James Tabibian",
"ORG_NAME": "AAA - OVM Medi-Cal Ineligible Over 50",
"ORG_TYPE": "OTHER",
"DIAGNOSIS_CODE": "Z12.11",
"CATEGORY_NAME": "Medicine/Non-Surg",
"SUBCATEGORY_NAME": "GI",
"PCP_MESSAGE": "Hi James, I have a patient with chronic constipation who has failed medical management. What are your recommendations?",
"TQ_HEADER": "Clinical question",
"SR_MESSAGE": "Hi Armen, thanks for your message. I would recommend you referring your patient to a gastroenterologist for further evaluation and treatment. They may need additional tests, such as a colonoscopy or endoscopy, to determine the cause of their constipation. Additionally, I recommend you discuss with your patient about dietary and lifestyle changes that may help relieve their symptoms."
},
...
]
Expected behavior
With the default multi-GPU config, training should run across both GPUs without exhausting VRAM, just as it does with the non-distributed, DeepSpeed, and FSDP configs.