
How to properly gather results of PartialState for inference on 4xGPUs

Open ZeusFSX opened this issue 1 year ago • 9 comments

System Info

torch==2.2.0
transformers==4.37.2
accelerate==0.27.0

Information

  • [X] The official example scripts
  • [X] My own modified scripts

Tasks

  • [ ] One of the scripts in the examples/ folder of Accelerate or an officially supported no_trainer script in the examples folder of the transformers repo (such as run_no_trainer_glue.py)
  • [X] My own task or dataset (give details below)

Reproduction

Hi, my question may seem stupid, but I want to ask for clarification because I didn't find it in the documentation.

I have 2 million documents to process with an NER model, and I have 4 GPUs. I don't want to write a script with multiprocessing and manually handle each GPU, so I decided to try Accelerate.

# Assume there are two processes
from accelerate import PartialState
from transformers import AutoTokenizer, AutoModelForTokenClassification, pipeline

model = AutoModelForTokenClassification.from_pretrained('ner')
tokenizer = AutoTokenizer.from_pretrained('ner')

# Create the PartialState first so the pipeline can be placed on the correct device
state = PartialState()
ner = pipeline('token-classification', model=model, tokenizer=tokenizer,
               aggregation_strategy="simple", device=state.device)

# a list of lists that I want to treat as a list of batches
data = [[{'text': 'text1', 'id': 1}, {'text': 'text2', 'id': 2}],
        [{'text': 'text3', 'id': 3}, {'text': 'text4', 'id': 4}]]

results = []
with state.split_between_processes(data) as inputs:
    outputs = ner([i['text'] for i in inputs], max_length=128)
    
    for i, o in zip(inputs, outputs):
        i['annotation'] = o
        results.append(i)

And my question is: am I gathering the results properly, or could there be problems because they are distributed between different processes?

How do I properly gather results when using split_between_processes?

Expected behavior

The documentation should have more examples of how to gather data.

ZeusFSX avatar Feb 13 '24 14:02 ZeusFSX

Use accelerate.utils.gather. We definitely could add this to the docs example.

muellerzr avatar Feb 13 '24 14:02 muellerzr

@muellerzr can you give me an example of how to use accelerate.utils.gather with the split_between_processes context manager?

ZeusFSX avatar Feb 13 '24 14:02 ZeusFSX

Something like so:

# Assume there are two processes
from accelerate import PartialState
from accelerate.utils import gather
from transformers import AutoTokenizer, AutoModelForTokenClassification, pipeline

model = AutoModelForTokenClassification.from_pretrained('ner')
tokenizer = AutoTokenizer.from_pretrained('ner')

# Create the PartialState first so the pipeline can be placed on the correct device
state = PartialState()
ner = pipeline('token-classification', model=model, tokenizer=tokenizer,
               aggregation_strategy="simple", device=state.device)

# a list of lists to treat as a list of batches
data = [[{'text': 'text1', 'id': 1}, {'text': 'text2', 'id': 2}],
        [{'text': 'text3', 'id': 3}, {'text': 'text4', 'id': 4}]]

results = []
with state.split_between_processes(data) as inputs:
    outputs = ner([i['text'] for i in inputs], max_length=128)
    
    for i, o in zip(inputs, outputs):
        i['annotation'] = o
        results.append(i)
results = gather(results)
print(results)

muellerzr avatar Feb 13 '24 14:02 muellerzr

@muellerzr Thank you! Also, I have an additional question: can I use tqdm with split_between_processes to see the progress?

ZeusFSX avatar Feb 13 '24 14:02 ZeusFSX

@muellerzr What should I pass into gather? Because I get this error:

TypeError: Unsupported types (<class 'int'>) passed to `_gpu_gather_one`. Only nested list/tuple/dicts of objects that are valid for `is_torch_tensor` should be passed.

ZeusFSX avatar Feb 14 '24 09:02 ZeusFSX

They should be tensors, ideally. Otherwise, use the gather_object function instead.

muellerzr avatar Feb 14 '24 10:02 muellerzr
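
A minimal sketch of the difference between the two helpers (the variable names and values below are illustrative, not from this thread), assuming the script is launched with accelerate launch on multiple GPUs:

import torch
from accelerate import PartialState
from accelerate.utils import gather, gather_object

state = PartialState()

# gather only accepts (nested structures of) torch tensors and
# concatenates them across processes along dim 0
local_scores = torch.tensor([float(state.process_index)], device=state.device)
all_scores = gather(local_scores)  # shape: (num_processes,) on every process

# gather_object collects arbitrary picklable Python objects instead and
# returns one flat list with the contributions of every process,
# but it is not supported on every distributed backend
local_results = [{'process': state.process_index, 'annotation': '...'}]
all_results = gather_object(local_results)  # list with num_processes entries

if state.is_main_process:
    print(all_scores, all_results)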

@muellerzr thanks for your support. Here is my complete script, which also includes a progress bar. It works on 4 GPUs.

I think you can use this to improve the documentation, or you could point me to where I can contribute it myself; I would be glad to help.

import json
import math

from accelerate import PartialState
from accelerate.utils import gather_object
from tqdm import tqdm
from transformers import AutoTokenizer, AutoModelForTokenClassification, pipeline
from transformers.pipelines.pt_utils import KeyDataset

model = AutoModelForTokenClassification.from_pretrained('models/ner')
tokenizer = AutoTokenizer.from_pretrained('models/ner')

batch_size = 1024

# Create a PartialState object to manage the distributed execution
state = PartialState()

# Create the instance of pipeline and move it to the correct device
ner = pipeline('token-classification',
               model=model,
               tokenizer=tokenizer,
               device=state.device,
               aggregation_strategy="simple",
               batch_size=batch_size)

# a list of dicts [{'id': 1, 'title': 'some title'}, ...] is loaded from a file
with open('data/data.json') as f:
    data = json.load(f)

results = []
num_steps = math.ceil(len(data)/state.num_processes)
progress = tqdm(total=num_steps, disable=not state.is_local_main_process)  # only show progress on the main process

with state.split_between_processes(data) as inputs:
    outputs = ner(KeyDataset(inputs, key='title'))
    for item, tags in zip(inputs, outputs):
        # convert numpy.float32 to float
        for word in tags:
            word['score'] = word['score'].item()

        item['annotation'] = tags
        results.append(item)
        progress.update(1)

# Wait for all processes to finish
state.wait_for_everyone()
# Gather the results
results = gather_object(results)

# Save the results
if state.is_main_process:
    with open('data/results.json', 'w') as f:
        json.dump(results, f)

ZeusFSX avatar Feb 14 '24 11:02 ZeusFSX
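
(For context: a script like the one above is assumed to be started with the accelerate launch CLI, e.g. accelerate launch --num_processes 4 <your_script>.py, so that one process is spawned per GPU; the file name here is just a placeholder.)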

You’re more than welcome to add it to the distributed inference tutorial; however, it is strongly encouraged to work with tensors up until after calling gather(), as gather_object does not work on all distributed compute types.

muellerzr avatar Feb 14 '24 11:02 muellerzr
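
A rough sketch of what a tensor-only path could look like (the shapes and names here are hypothetical; pad_across_processes from accelerate.utils pads a dimension so every process ends up with the same shape before gathering):

import torch
from accelerate import PartialState
from accelerate.utils import gather, pad_across_processes

state = PartialState()

# suppose each process produced a (num_items, num_tokens) score tensor,
# where num_tokens can differ between processes
local_scores = torch.rand(3, 5 + state.process_index, device=state.device)

# pad the token dimension to the maximum length across processes so the
# shapes match, then concatenate the per-process tensors along dim 0
padded = pad_across_processes(local_scores, dim=1, pad_index=0)
all_scores = gather(padded)  # shape: (num_processes * 3, max_num_tokens)

if state.is_main_process:
    print(all_scores.shape)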

This issue has been automatically marked as stale because it has not had recent activity. If you think this still needs to be addressed please comment on this thread.

Please note that issues that do not follow the contributing guidelines are likely to be ignored.

github-actions[bot] avatar Mar 14 '24 15:03 github-actions[bot]