save_state not working correctly with FSDP on distributed setup

Open sam-hieken opened this issue 2 years ago • 8 comments

System Info

- `Accelerate` version: 0.18.0
- Platform: Linux-3.10.0-1160.76.1.el7.x86_64-x86_64-with-glibc2.17
- Python version: 3.9.12
- Numpy version: 1.22.4
- PyTorch version (GPU?): 2.0.0+cu117 (True)
- `Accelerate` default config:
        - compute_environment: LOCAL_MACHINE
        - distributed_type: FSDP
        - mixed_precision: fp16
        - use_cpu: False
        - num_processes: 2
        - machine_rank: 0
        - num_machines: 1
        - rdzv_backend: static
        - same_network: True
        - main_training_function: main
        - fsdp_config: {'fsdp_auto_wrap_policy': 'TRANSFORMER_BASED_WRAP', 'fsdp_backward_prefetch_policy': 'BACKWARD_PRE', 'fsdp_offload_params': False, 'fsdp_sharding_strategy': 2, 'fsdp_state_dict_type': 'FULL_STATE_DICT', 'fsdp_transformer_layer_cls_to_wrap': 'XLNetLayer'}
        - downcast_bf16: no
        - tpu_use_cluster: False
        - tpu_use_sudo: False
        - tpu_env: []

Information

  • [X] The official example scripts
  • [X] My own modified scripts

Tasks

  • [ ] One of the scripts in the examples/ folder of Accelerate or an officially supported no_trainer script in the examples folder of the transformers repo (such as run_no_trainer_glue.py)
  • [X] My own task or dataset (give details below)

Reproduction

Hello,

save_state() appears to be trying to save the same checkpoint twice when using FSDP, which leads to an error (I did check to ensure no checkpoints existed before running the script). The following script should fully reproduce the issue:

from torch.utils.data import DataLoader
from transformers import XLNetForSequenceClassification, XLNetTokenizerFast, get_scheduler
from accelerate import Accelerator, DistributedDataParallelKwargs
from accelerate.utils import ProjectConfiguration
from datasets import load_dataset
from torch.optim import AdamW

dataset = load_dataset("text", data_files="test.txt", split='train')

proj_conf = ProjectConfiguration(
        project_dir="./Checkpoints",
        automatic_checkpoint_naming=True,
        total_limit=3
)

ddp_kwargs = DistributedDataParallelKwargs(find_unused_parameters=True)
accelerator = Accelerator(kwargs_handlers=[ddp_kwargs], project_config=proj_conf)

model = XLNetForSequenceClassification.from_pretrained("xlnet-base-cased")
model = accelerator.prepare(model)

optimizer = AdamW(params=model.parameters(), lr=5e-5)
train_dl = DataLoader(dataset, shuffle=True, batch_size=2)

training_steps = len(train_dl)
scheduler = get_scheduler(
        "linear",
        optimizer=optimizer,
        num_warmup_steps=0,
        num_training_steps=training_steps
)

optimizer, train_dl, scheduler = accelerator.prepare(
        optimizer, train_dl, scheduler
)

accelerator.register_for_checkpointing(scheduler)

print("Saving the state of accelerator...")
accelerator.save_state()
print("Saved state.")

Here, test.txt can be any text file.

The error is as follows:

Traceback (most recent call last):
  File "/home/hiekense/GPT2/accel-reprod.py", line 40, in <module>
    accelerator.save_state()
  File "/home/hiekense/.local/lib/python3.9/site-packages/accelerate/accelerator.py", line 2229, in save_state
    raise ValueError(
ValueError: Checkpoint directory ./Checkpoints/checkpoints/checkpoint_0 (0) already exists. Please manually override `self.save_iteration` with what iteration to start with.

Running the script with the same configuration above but without FSDP appeared successful.

Like I said above, I believe it's trying to save the same checkpoint twice, based on my error message, and the fact that the checkpoint is created after the script fails.
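The symptom described above (an "already exists" error, yet the directory is present afterwards) is what a check-then-create pattern produces when two processes target the same path and one arrives late. The following is a minimal standard-library sketch of that pattern. `save_like` and `simulate_two_ranks` are hypothetical names used for illustration; this is not Accelerate's actual implementation.

```python
import os
import tempfile


def save_like(checkpoint_dir):
    """Hypothetical stand-in for a save routine that refuses to overwrite."""
    if os.path.exists(checkpoint_dir):
        raise ValueError(f"Checkpoint directory {checkpoint_dir} already exists.")
    os.makedirs(checkpoint_dir)


def simulate_two_ranks(base_dir):
    """Run the save for rank 0, then for a late-arriving rank 1, on the same path."""
    target = os.path.join(base_dir, "checkpoints", "checkpoint_0")
    outcomes = []
    for rank in (0, 1):
        try:
            save_like(target)
            outcomes.append((rank, "saved"))
        except ValueError:
            outcomes.append((rank, "already exists"))
    return outcomes, os.path.isdir(target)


with tempfile.TemporaryDirectory() as tmp:
    outcomes, dir_exists = simulate_two_ranks(tmp)
    print(outcomes, dir_exists)
```

In this sketch the first caller creates the directory and the second one fails, even though no checkpoint existed before the run.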

Expected behavior

Working save_state() function.

On a somewhat related note, I did try using

if accelerator.is_main_process:
        accelerator.save_state()

instead of

accelerator.save_state()

But as stated in #1171, this shouldn't be necessary, and it just led to the program blocking indefinitely at save_state().
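One plausible explanation for the hang, not confirmed in this thread, is that saving involves a step every process must enter together, so a call guarded by is_main_process waits forever for ranks that never arrive. The sketch below is only an analogy using `threading.Barrier` from the standard library, with a timeout so that it terminates. `guarded_collective` is a hypothetical name.

```python
import threading


def guarded_collective(world_size=2, timeout=0.2):
    """Enter a barrier sized for `world_size` parties from a single caller.

    This mimics only the "main process" reaching a step that all ranks
    are expected to join. The timeout stands in for an indefinite hang.
    """
    barrier = threading.Barrier(world_size)
    try:
        barrier.wait(timeout=timeout)  # the other parties never call wait()
        return "completed"
    except threading.BrokenBarrierError:
        return "timed out waiting for other ranks"


result = guarded_collective()
print(result)
```

With `world_size=1` the same call returns "completed" immediately, which matches the report that the script works outside a distributed setup.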

Thank you.

sam-hieken avatar May 01 '23 01:05 sam-hieken

cc @pacman100

sgugger avatar May 01 '23 13:05 sgugger

Hello @sam-hieken, please refer to this: https://github.com/huggingface/accelerate/issues/1358#issuecomment-1523784839

As stated in that comment, the checkpointing tests are working. This is probably an issue with the ProjectConfiguration. @muellerzr, could you look into this?

Also @sam-hieken, before saving state, make sure to add a distributed barrier so that all processes reach that point before saving. Adding accelerator.wait_for_everyone() before accelerator.save_state() resolves the issue with FSDP. In a normal multi-GPU setup, both processes execute save_state() at the same time even without wait_for_everyone(), whereas with FSDP one of the processes reaches that point late (hence the need for wait_for_everyone()). Note, however, that this has nothing to do with the FSDP integration. Hope this helps.
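The suggested ordering can be sketched as a small helper. `save_state_after_barrier` is a hypothetical wrapper, and `_RecordingAccelerator` is a test double that only records call order so the sketch runs without a distributed launch. In a real script you would pass your own `Accelerator` instance instead.

```python
def save_state_after_barrier(accelerator):
    """Synchronize all processes, then save the state on every process."""
    accelerator.wait_for_everyone()  # barrier: no rank proceeds until all arrive
    return accelerator.save_state()


class _RecordingAccelerator:
    """Test double (not part of Accelerate) that records the call order."""

    def __init__(self):
        self.calls = []

    def wait_for_everyone(self):
        self.calls.append("wait_for_everyone")

    def save_state(self):
        self.calls.append("save_state")


fake = _RecordingAccelerator()
save_state_after_barrier(fake)
print(fake.calls)
```

Note that later replies in this thread report the error persisting even with the barrier in place, so this should be read as the suggested mitigation rather than a confirmed fix.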

pacman100 avatar May 02 '23 05:05 pacman100

Thanks so much @pacman100, adding wait_for_everyone() before save_state() fixed my problem! I take it I should leave this open since there may be an issue with ProjectConfiguration?

sam-hieken avatar May 02 '23 20:05 sam-hieken

Hello @pacman100, is there any update on this?

It seems that save_state() is also called multiple times when running with distributed_type: MULTI_GPU. With automatic_checkpoint_naming=True, I got the same error:

ValueError: Checkpoint directory ./output/checkpoints/checkpoint_0 (0) already exists. Please manually override `self.save_iteration` with what iteration to start with.

khokao avatar May 09 '23 15:05 khokao

Hello @khokao, does the suggestion in the comment above not resolve it? https://github.com/huggingface/accelerate/issues/1374#issuecomment-1530917319

As stated in that comment, the checkpointing tests are working. This is probably an issue with the ProjectConfiguration. @muellerzr, could you look into this?

As I said, this isn't an issue with FSDP integration

pacman100 avatar May 09 '23 15:05 pacman100

@pacman100 I added wait_for_everyone() before save_state(), but it still raises the same error.

khokao avatar May 09 '23 16:05 khokao

I've tried the experiment several times, but errors rarely occur, and in most cases, there seem to be no errors. It might be an issue with my server, so you don't need to address it!

Thank you very much for the swift reply! @pacman100

khokao avatar May 09 '23 16:05 khokao

This issue has been automatically marked as stale because it has not had recent activity. If you think this still needs to be addressed please comment on this thread.

Please note that issues that do not follow the contributing guidelines are likely to be ignored.

github-actions[bot] avatar Jun 03 '23 15:06 github-actions[bot]

@pacman100 @muellerzr I'm having this issue as well, even after trying the wait_for_everyone() fix. As mentioned before, this likely has nothing to do with the distributed_type and instead seems to be an issue with ProjectConfiguration. Below is my Accelerate config:

compute_environment: LOCAL_MACHINE
deepspeed_config:
  gradient_accumulation_steps: 16
  gradient_clipping: 1.0
  offload_optimizer_device: none
  offload_param_device: none
  zero3_init_flag: false
  zero_stage: 2
distributed_type: DEEPSPEED
downcast_bf16: 'no'
machine_rank: 0
main_training_function: main
mixed_precision: bf16
num_machines: 1
num_processes: 2
rdzv_backend: static
same_network: true
tpu_env: []
tpu_use_cluster: false
tpu_use_sudo: false
use_cpu: false

My error is the following:

ValueError: Checkpoint directory ./output/checkpoints/checkpoint_0 (0) already exists. Please manually override `self.save_iteration` with what iteration to start with.
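The error message suggests overriding the save iteration. When checkpoints are genuinely left over from an earlier run, the next free index can be found by scanning the checkpoint folder. The sketch below assumes the `<project_dir>/checkpoints/checkpoint_<N>` layout shown in the error message. `next_save_iteration` is a hypothetical helper, and how the result is applied to the accelerator is left as a commented assumption. It does not address the case reported here, where no checkpoint exists before the run.

```python
import os
import re
import tempfile

_CHECKPOINT_RE = re.compile(r"^checkpoint_(\d+)$")


def next_save_iteration(project_dir):
    """Return one more than the highest existing checkpoint index, or 0."""
    ckpt_root = os.path.join(project_dir, "checkpoints")
    if not os.path.isdir(ckpt_root):
        return 0
    indices = []
    for name in os.listdir(ckpt_root):
        match = _CHECKPOINT_RE.match(name)
        if match and os.path.isdir(os.path.join(ckpt_root, name)):
            indices.append(int(match.group(1)))
    return max(indices) + 1 if indices else 0


with tempfile.TemporaryDirectory() as tmp:
    empty = next_save_iteration(tmp)
    for name in ("checkpoint_0", "checkpoint_3", "notes"):
        os.makedirs(os.path.join(tmp, "checkpoints", name))
    found = next_save_iteration(tmp)
    print(empty, found)

# Assumption, not verified against a specific Accelerate version:
# the value would then be assigned to the iteration counter the error
# message refers to (`save_iteration`) before calling save_state().
```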

acnagle avatar Jul 06 '23 05:07 acnagle

Hello, a minimal reproducer would help @muellerzr dig into this with respect to ProjectConfiguration.

pacman100 avatar Jul 06 '23 06:07 pacman100

Thanks for the reply @pacman100 . Here is a script for @muellerzr that reproduces the error with my above Accelerate config:

import os
import random

import numpy as np
import torch
from torch.utils.data import DataLoader
from torch.optim.lr_scheduler import SequentialLR, LinearLR, CosineAnnealingLR
from accelerate import Accelerator
from accelerate.utils import ProjectConfiguration
from datasets import load_dataset
from transformers import GPT2Tokenizer, GPT2Model


def seed_everything(seed):
	random.seed(seed)
	os.environ['PYTHONHASHSEED'] = str(seed)
	np.random.seed(seed)
	torch.manual_seed(seed)
	torch.cuda.manual_seed_all(seed)
	torch.backends.cudnn.deterministic = True
	torch.backends.cudnn.benchmark = False


tokenizer = GPT2Tokenizer.from_pretrained('gpt2')
model = GPT2Model.from_pretrained('gpt2')

project_config = ProjectConfiguration(
    project_dir='./output',
    automatic_checkpoint_naming=True,
    total_limit=None,
)

accelerator = Accelerator(
    mixed_precision='bf16',
    gradient_accumulation_steps=16,
    log_with=None,
    project_config=project_config,
)

device = accelerator.device
seed_everything(1337)

dataset = load_dataset('wikitext', 'wikitext-103-v1', num_proc=12)
accelerator.print(dataset)

train_loader = DataLoader(
    dataset=dataset['train'],
    batch_size=16,
    shuffle=True,
    num_workers=8,
    pin_memory=True,
    drop_last=False,
)

val_loader = DataLoader(
    dataset=dataset['validation'],
    batch_size=16,
    shuffle=False,
    num_workers=8,
    pin_memory=True,
    drop_last=False,
)

optimizer = torch.optim.AdamW(model.parameters(), lr=1e-4, betas=(0.9, 0.999), eps=1e-8, weight_decay=0.1, amsgrad=False)

warmup_iters = 100
max_iters = 1000
scheduler = SequentialLR(
    optimizer=optimizer,
    schedulers=[
        LinearLR(optimizer, start_factor=1e-8, end_factor=1.0, total_iters=warmup_iters),
        CosineAnnealingLR(optimizer, T_max=max_iters - warmup_iters),
    ],
    milestones=[warmup_iters],
)

model, optimizer, train_loader, val_loader, scheduler = accelerator.prepare(model, optimizer, train_loader, val_loader, scheduler)

accelerator.save_state()    # ValueError: Checkpoint directory ./output/checkpoints/checkpoint_0 (0) already exists. Please manually override `self.save_iteration` with what iteration to start with

acnagle avatar Jul 06 '23 17:07 acnagle

I'm also having this issue, and it's not resolved by calling wait_for_everyone() before save_state().

noahtopper avatar Jul 28 '23 20:07 noahtopper