How to properly gather results of PartialState for inference on 4xGPUs
System Info
torch==2.2.0
transformers==4.37.2
accelerate==0.27.0
Information
- [X] The official example scripts
- [X] My own modified scripts
Tasks
- [ ] One of the scripts in the examples/ folder of Accelerate or an officially supported `no_trainer` script in the `examples` folder of the `transformers` repo (such as `run_no_trainer_glue.py`)
- [X] My own task or dataset (give details below)
Reproduction
Hi, my question may sound stupid, but I want to ask for clarification because I didn't find it in the documentation.
I have 2 million documents to process with an NER model, and I have 4 GPUs. I don't want to write a script with multiprocessing and manually handle each GPU, so I decided to try accelerate.
```python
# Assume there are two processes
from accelerate import PartialState
from transformers import AutoTokenizer, AutoModelForTokenClassification, pipeline

model = AutoModelForTokenClassification.from_pretrained('ner')
tokenizer = AutoTokenizer.from_pretrained('ner')
state = PartialState()
# move the pipeline to the correct device for this process
ner = pipeline('token-classification', model=model, tokenizer=tokenizer,
               aggregation_strategy="simple", device=state.device)

# a list of lists that I want to treat as a list of batches
data = [[{'text': 'text1', 'id': 1}, {'text': 'text2', 'id': 2}],
        [{'text': 'text3', 'id': 3}, {'text': 'text4', 'id': 4}]]

results = []
with state.split_between_processes(data) as batches:
    for batch in batches:
        outputs = ner([i['text'] for i in batch], max_length=128)
        for i, o in zip(batch, outputs):
            i['annotation'] = o
            results.append(i)
```
And my question is: am I gathering the results properly, or could there be problems because the work is distributed between different processes?
How do I properly gather results when using `split_between_processes`?
Expected behavior
The documentation will have more examples of how to gather data.
Use `accelerate.utils.gather`. We could definitely add this to the docs example.
@muellerzr can you give me an example of how to use `accelerate.utils.gather` with the `split_between_processes` context manager?
Something like so:
```python
# Assume there are two processes
from accelerate import PartialState
from accelerate.utils import gather
from transformers import AutoTokenizer, AutoModelForTokenClassification, pipeline

model = AutoModelForTokenClassification.from_pretrained('ner')
tokenizer = AutoTokenizer.from_pretrained('ner')
state = PartialState()
# move the pipeline to the correct device for this process
ner = pipeline('token-classification', model=model, tokenizer=tokenizer,
               aggregation_strategy="simple", device=state.device)

# a list of lists that I want to treat as a list of batches
data = [[{'text': 'text1', 'id': 1}, {'text': 'text2', 'id': 2}],
        [{'text': 'text3', 'id': 3}, {'text': 'text4', 'id': 4}]]

results = []
with state.split_between_processes(data) as batches:
    for batch in batches:
        outputs = ner([i['text'] for i in batch], max_length=128)
        for i, o in zip(batch, outputs):
            i['annotation'] = o
            results.append(i)

results = gather(results)
print(results)
```
@muellerzr Thank you! Also, an additional question: can I use `tqdm` with `split_between_processes` to see the progress?
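(For reference, a minimal sketch of the pattern the complete script further down also uses, assuming a roughly even split of the work: size a single bar by the per-process share and disable it on every rank except the local main one.)

```python
import math
from accelerate import PartialState
from tqdm import tqdm

state = PartialState()
data = list(range(1000))  # placeholder workload

# One visible bar, sized by this process's share of the data
progress = tqdm(total=math.ceil(len(data) / state.num_processes),
                disable=not state.is_local_main_process)

with state.split_between_processes(data) as inputs:
    for item in inputs:
        ...  # run inference on `item`
        progress.update(1)
```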
@muellerzr What should I pass into `gather`? Because I get the error:

```
TypeError: Unsupported types (<class 'int'>) passed to `_gpu_gather_one`. Only nested list/tuple/dicts of objects that are valid for `is_torch_tensor` should be passed.
```
They should be tensors, ideally. Otherwise use the `gather_object` function instead.
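(For reference, a minimal sketch of the difference, assuming two processes: `gather` concatenates tensors that live on each rank's device, while `gather_object` pickles arbitrary Python objects.)

```python
import torch
from accelerate import PartialState
from accelerate.utils import gather, gather_object

state = PartialState()

# gather: each rank contributes a tensor on its own device
t = torch.tensor([state.process_index], device=state.device)
print(gather(t))  # tensor([0, 1]) on every process

# gather_object: plain Python objects (dicts, lists, ...) also work,
# at the cost of pickling and weaker backend support
print(gather_object([{'rank': state.process_index}]))  # [{'rank': 0}, {'rank': 1}]
```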
@muellerzr thanks for your support. Here is my complete script, which also includes a progress bar. It works on 4 GPUs.
I think you could use this to improve the documentation, or you could point me to where I can contribute it; I would be glad to help.
```python
import json
import math

from accelerate import PartialState
from accelerate.utils import gather_object
from tqdm import tqdm
from transformers import AutoTokenizer, AutoModelForTokenClassification, pipeline
from transformers.pipelines.pt_utils import KeyDataset

model = AutoModelForTokenClassification.from_pretrained('models/ner')
tokenizer = AutoTokenizer.from_pretrained('models/ner')
batch_size = 1024

# Create a PartialState object to manage the distributed execution
state = PartialState()

# Create the pipeline instance and move it to the correct device
ner = pipeline('token-classification',
               model=model,
               tokenizer=tokenizer,
               device=state.device,
               aggregation_strategy="simple",
               batch_size=batch_size)

# a list of dicts [{'id': 1, 'title': 'some title'}, ...] is loaded from the file
with open('data/data.json') as f:
    data = json.load(f)

results = []
num_steps = math.ceil(len(data) / state.num_processes)
progress = tqdm(total=num_steps, disable=not state.is_local_main_process)  # only show progress on the main process

with state.split_between_processes(data) as inputs:
    outputs = ner(KeyDataset(inputs, key='title'))
    for item, tags in zip(inputs, outputs):
        # convert numpy.float32 to float so the result is JSON-serializable
        for word in tags:
            word['score'] = word['score'].item()
        item['annotation'] = tags
        results.append(item)
        progress.update(1)

# Wait for all processes to finish
state.wait_for_everyone()

# Gather the results
results = gather_object(results)

# Save the results
if state.is_main_process:
    with open('data/results.json', 'w') as f:
        json.dump(results, f)
```
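For completeness: the script is launched with the `accelerate` CLI rather than plain `python`, e.g. `accelerate launch --num_processes 4 run_ner.py` (the filename here is just a placeholder for wherever you saved the script above).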
You're more than welcome to add it to the distributed inference tutorial. However, it is strongly encouraged to work with tensors up until after calling `gather()`, as `gather_object` does not work on all distributed compute types.
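(For reference, a minimal sketch of that tensor-first pattern, assuming each rank produces a variable-length tensor that is padded with accelerate's `pad_across_processes` utility so shapes match before `gather`.)

```python
import torch
from accelerate import PartialState
from accelerate.utils import gather, pad_across_processes

state = PartialState()
data = list(range(10))  # placeholder inputs

with state.split_between_processes(data) as inputs:
    # pretend each rank produced one score per input; lengths may differ per rank
    local = torch.tensor([float(i) for i in inputs], device=state.device)

# pad every rank's tensor to the longest length, then concatenate across ranks
local = pad_across_processes(local, dim=0, pad_index=0)
scores = gather(local)

if state.is_main_process:
    print(scores)  # all scores (plus any padding values) on the main process
```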