distilabel
Questions about threads
After successfully running the pipeline once, I can no longer re-run my code: even after changing the pipeline name, the input data, and the related parameters, it reports the following error. What may be the cause?
EOFError
Exception in thread Thread-1 (_monitor):
Traceback (most recent call last):
File "/root/miniconda3/envs/datagen/lib/python3.10/threading.py", line 1016, in _bootstrap_inner
self.run()
File "/root/miniconda3/envs/datagen/lib/python3.10/threading.py", line 953, in run
self._target(*self._args, **self._kwargs)
File "/root/miniconda3/envs/datagen/lib/python3.10/logging/handlers.py", line 1556, in _monitor
record = self.dequeue(True)
File "/root/miniconda3/envs/datagen/lib/python3.10/logging/handlers.py", line 1505, in dequeue
return self.queue.get(block)
File "/root/miniconda3/envs/datagen/lib/python3.10/multiprocessing/queues.py", line 103, in get
res = self._recv_bytes()
File "/root/miniconda3/envs/datagen/lib/python3.10/multiprocessing/connection.py", line 216, in recv_bytes
buf = self._recv_bytes(maxlength)
File "/root/miniconda3/envs/datagen/lib/python3.10/multiprocessing/connection.py", line 414, in _recv_bytes
buf = self._recv(4)
File "/root/miniconda3/envs/datagen/lib/python3.10/multiprocessing/connection.py", line 383, in _recv
raise EOFError
EOFError
/root/miniconda3/envs/datagen/lib/python3.10/multiprocessing/resource_tracker.py:224: UserWarning: resource_tracker: There appear to be 3 leaked semaphore objects to clean up at shutdown
warnings.warn('resource_tracker: There appear to be %d '
Hi @YueWu0301, could you share the code of your pipeline?
Hi @gabrielmbmb, I face the same problem. The code is as follows:
import json
import os
import pdb
import openai
from distilabel.llms import AzureOpenAILLM, OpenAILLM, vLLM
from distilabel.llms.mistral import MistralLLM
from distilabel.pipeline import Pipeline
from distilabel.steps import (
    CombineColumns,
    KeepColumns,
    LoadDataFromDicts,
    LoadHubDataset,
    PreferenceToArgilla,
    TextGenerationToArgilla,
)
from distilabel.steps.tasks import TextGeneration, UltraFeedback
from distilabel.steps.tasks.text_generation import TextGeneration
from dotenv import load_dotenv
load_dotenv()
def read_jsonl_file(file_path):
    """
    Reads a .jsonl file where each line is a separate JSON object, and returns a list of dictionaries.

    :param file_path: str - The path to the .jsonl file.
    :return: list - A list containing dictionaries, each representing a JSON object from the file.
    """
    data = []
    try:
        with open(file_path, "r") as file:
            for line in file:
                json_object = json.loads(line.strip())
                json_object["instruction"] = json_object.pop("question")
                json_object["generations"] = json_object.pop("answer")
                data.append(json_object)
    except FileNotFoundError:
        print(f"Error: The file '{file_path}' does not exist.")
    except json.JSONDecodeError:
        print(f"Error: The file '{file_path}' contains invalid JSON.")
    except Exception as e:
        print(f"An unexpected error occurred: {e}")
    return data
llm = AzureOpenAILLM(
    model=os.getenv("api_engine_gpt4"),
    base_url=os.getenv("api_base_gpt4"),
    api_key=os.getenv("api_key_gpt4"),
    api_version=os.getenv("api_version"),
)
with Pipeline(name="ultrafeedback-pipeline") as pipeline:
    data = read_jsonl_file("data.json")
    load_hub_dataset = LoadDataFromDicts(
        name="load_data",
        data=data,
        batch_size=1,
    )
    ultrafeedback = UltraFeedback(
        name="ultrafeedback_overall_rating",
        llm=llm,
        aspect="overall-rating",
        output_mappings={"model_name": "ultrafeedback_model"},
    )
    load_hub_dataset.connect(ultrafeedback)

dataset = pipeline.run(
    parameters={
        "ultrafeedback_overall_rating": {
            "generation_kwargs": {
                "max_new_tokens": 1024,
                "temperature": 0.7,
            },
        },
    }
)
Hi, I think the issue might be caused by the run method not being called within an if __name__ == "__main__": block. Could you try to update your script and check if you still have the error?
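For reference, a minimal sketch of the suggested layout (the pipeline and step names here are placeholders, not taken from the snippets above): distilabel runs steps in worker subprocesses, and with a start method like spawn each child re-imports the main module, so an unguarded top-level run() call can break the logging/IPC queues and surface as an EOFError like the one above.

from distilabel.pipeline import Pipeline
from distilabel.steps import LoadDataFromDicts

with Pipeline(name="my-pipeline") as pipeline:  # placeholder name
    load_data = LoadDataFromDicts(
        name="load_data",
        data=[{"instruction": "..."}],  # toy record for illustration
    )
    # ... define and connect the rest of your steps here ...

# Everything that starts the pipeline lives behind the guard, so the worker
# processes that distilabel spawns do not re-execute it when they re-import
# this module.
if __name__ == "__main__":
    distiset = pipeline.run()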
Hi @gabrielmbmb, I'm currently trying to modify it as follows:
import json
import os
import pdb
import openai
from distilabel.llms import AzureOpenAILLM, OpenAILLM, vLLM
from distilabel.llms.mistral import MistralLLM
from distilabel.pipeline import Pipeline
from distilabel.steps import (
    CombineColumns,
    KeepColumns,
    LoadDataFromDicts,
    LoadHubDataset,
    PreferenceToArgilla,
    TextGenerationToArgilla,
)
from distilabel.steps.tasks import TextGeneration, UltraFeedback
from distilabel.steps.tasks.text_generation import TextGeneration
from dotenv import load_dotenv
import pandas as pd
load_dotenv()
def location_extraction(article):
    system_prompt = """
You are an advanced Named Entity Recognition (NER) system specializing in disease-related information.
Task: Identify geographical locations from a given list of entities.
Instruction:
- Focus on identifying specific and recognized geographical locations in each paragraph.
- LOCATION: Extract names of countries, cities, regions, and towns. Do not include vague or non-specific locations.
- Present your findings for each entity in a clear, line-separated format. If an entity value includes a list or multiple components, separate these so that each item appears on its own line.
Example Output Format:
- LOCATION: Mexico
- LOCATION: Vietnam
"""
    prompt = f"""
Article Content:
----------------
{article}

Analysis Task:
--------------
Please analyze the above article for the specified entities. If certain entities, like dates or locations, are not mentioned, indicate this by stating 'Not mentioned'. For example, 'DATE: Not mentioned'.
"""
    return system_prompt + prompt
llm = AzureOpenAILLM(
    model=os.getenv("api_engine_gpt4"),
    base_url=os.getenv("api_base_gpt4"),
    api_key=os.getenv("api_key_gpt4"),
    api_version=os.getenv("api_version"),
)
with Pipeline(name="ultrafeedback-pipeline") as pipeline:
    df = pd.read_csv('/g/data/ue03/duongd/ews-nlp-llm-inference/dataset/ner/collected/latest_ner.csv')
    df['instruction'] = [location_extraction(x) for x in df['summary']]
    df = df[['instruction', 'locations']]
    df = df.rename(columns={'locations': 'generations'})
    df = df.loc[:3, :]
    data = df.to_dict(orient='records')
    load_hub_dataset = LoadDataFromDicts(
        name="load_data",
        data=data,
        batch_size=1,
    )
    ultrafeedback = UltraFeedback(
        name="ultrafeedback_overall_rating",
        llm=llm,
        aspect="overall-rating",
        output_mappings={"model_name": "ultrafeedback_model"},
    )
    load_hub_dataset.connect(ultrafeedback)

if __name__ == "__main__":
    dataset = pipeline.run(
        parameters={
            "ultrafeedback_overall_rating": {
                "generation_kwargs": {
                    "max_new_tokens": 1024,
                    "temperature": 0.7,
                },
            },
        }
    )
The errors still occurred.
Hi @tridungduong-unsw, thanks for the details! I'll try to reproduce the error and get back to you. You are using conda, right?
Hi @gabrielmbmb, yes, I'm using a conda env. By the way, I got it running now after a small modification. Other people with the same problem can try:
import json
import os
import pdb
import openai
from distilabel.llms import AzureOpenAILLM, OpenAILLM, vLLM
from distilabel.llms.mistral import MistralLLM
from distilabel.pipeline import Pipeline
from distilabel.steps import (
    CombineColumns,
    KeepColumns,
    LoadDataFromDicts,
    LoadHubDataset,
    PreferenceToArgilla,
    TextGenerationToArgilla,
)
from distilabel.steps.tasks import TextGeneration, UltraFeedback
from distilabel.steps.tasks.text_generation import TextGeneration
from dotenv import load_dotenv
import pandas as pd
load_dotenv()
def location_extraction(article):
    # prompt construction elided by the poster; same body as in the previous snippet
    return system_prompt + prompt
llm = AzureOpenAILLM(
    model=os.getenv("api_engine_gpt4"),
    base_url=os.getenv("api_base_gpt4"),
    api_key=os.getenv("api_key_gpt4"),
    api_version=os.getenv("api_version"),
)
with Pipeline(name="ultrafeedback-pipeline") as pipeline:
    df = pd.read_csv('data.csv')
    df['instruction'] = [location_extraction(x) for x in df['summary']]
    df = df[['instruction', 'locations']]
    df = df.rename(columns={'locations': 'generations'})
    df = df.loc[:3, :]
    data = df.to_dict(orient='records')
    load_hub_dataset = LoadDataFromDicts(
        name="load_data",
        data=data,
        batch_size=1,
    )
    ultrafeedback = UltraFeedback(
        name="ultrafeedback_overall_rating",
        llm=llm,
        aspect="overall-rating",
        output_mappings={"model_name": "ultrafeedback_model"},
    )
    load_hub_dataset.connect(ultrafeedback)

if __name__ == "__main__":
    dataset = pipeline.run(
        parameters={
            "ultrafeedback_overall_rating": {
                "generation_kwargs": {
                    "max_new_tokens": 1024,
                    "temperature": 0.7,
                },
            },
        }
    )
Hi @YueWu0301, could you share the code of your pipeline?
Sure, here is my code:
with Pipeline("pipe-name", description="My first pipe") as pipeline:
load_dataset = LoadHubDataset(
repo_id="xxxx",
name="load_dataset2"
# output_mappings={"input": "instruction"},
)
push_to_hub = PushToHub(
name="push_to_hub1",
repo_id="xxxx",
token="xxxxxx"
)
llm1 = OpenAILLM(model="xxxx",
api_key = "xxxx",
base_url="xxxx")
task = TextGeneration(name=f"text_generation1", llm=llm1)
load_dataset.connect(task)
task.connect(push_to_hub)
re = pipeline.run(
parameters={
"load_dataset2":{
"repo_id":"xxxxxx",
},
"text_generation1": {
"llm": {
"generation_kwargs": {
"temperature": 0.9,
}
}
},
"push_to_hub1":{
"repo_id":"xxxxxxx",
}
}
)
Thanks a lot
Hi @YueWu0301, could you try running the following and see if it works for you too? (Mind the if __name__ == "__main__": block.)
with Pipeline("pipe-name", description="My first pipe") as pipeline:
load_dataset = LoadHubDataset(
repo_id="xxxx",
name="load_dataset2"
# output_mappings={"input": "instruction"},
)
push_to_hub = PushToHub(
name="push_to_hub1",
repo_id="xxxx",
token="xxxxxx"
)
llm1 = OpenAILLM(model="xxxx",
api_key = "xxxx",
base_url="xxxx")
task = TextGeneration(name=f"text_generation1", llm=llm1)
load_dataset.connect(task)
task.connect(push_to_hub)
if __name__ == "__main__":
re = pipeline.run(
parameters={
"load_dataset2":{
"repo_id":"xxxxxx",
},
"text_generation1": {
"llm": {
"generation_kwargs": {
"temperature": 0.9,
}
}
},
"push_to_hub1":{
"repo_id":"xxxxxxx",
}
}
)
I don't know whether it is proper to ask for advice here, but I also ran into the thread error with version 1.3.1.
RuntimeError: Failed to load all the steps. Could not run pipeline.
Exception in thread Thread-1 (_monitor):
Traceback (most recent call last):
File "/usr/lib/python3.10/threading.py", line 1016, in _bootstrap_inner
The following is my code to reproduce the error:
from distilabel.llms.vllm import ClientvLLM
from distilabel.pipeline import Pipeline
from distilabel.steps import LoadDataFromHub, CombineColumns
from distilabel.steps.tasks import TextGeneration, UltraFeedback

Qwen7B = ClientvLLM(
    base_url="http://localhost:8001/v1",
    model="/home/public_data/qwen/Qwen2-7B-Instruct/",
)
Qwen72B = ClientvLLM(
    base_url="http://localhost:8008/v1",
    model="/home/public_data/qwen/Qwen2-72B-Instruct/",
)

with Pipeline(name="synthetic-data-with-qwen") as pipeline:
    load_dataset = LoadDataFromHub(
        repo_id="argilla/10Kprompts-mini",
    )
    generate = [
        TextGeneration(llm=Qwen7B),
        TextGeneration(llm=Qwen72B),
    ]
    combine = CombineColumns(
        columns=["generation", "model_name"],
        output_columns=["generations", "model_names"],
    )
    rate = UltraFeedback(aspect="overall-rating", llm=Qwen72B)

    load_dataset >> generate >> combine >> rate

if __name__ == "__main__":
    distiset = pipeline.run()
The full log is as follows:
Downloading readme: 100%|████████████████████| 347/347 [00:00<00:00, 1.54kB/s]
[08/17/24 08:41:48] INFO ['distilabel.pipeline'] 📝 Pipeline data will be written to '/root/.cache/distilabel/pipelines/synthetic-data-with-qwen/afa142b13d9d3dc171d7a6159a1c21c3ecd41911/data' base.py:696
INFO ['distilabel.pipeline'] ⌛ The steps of the pipeline will be loaded in stages: base.py:705
* Stage 0: ['load_data_from_hub_0', 'text_generation_0', 'text_generation_1', 'combine_columns_0', 'ultra_feedback_0']
INFO ['distilabel.pipeline'] ⏳ Waiting for all the steps of stage 0 to load... base.py:918
[08/17/24 08:41:50] ERROR ['distilabel.pipeline'] ❌ Failed with an unhandled exception: Error sending result: '<multiprocessing.pool.ExceptionWithTraceback object at 0x7f67784bed70>'. Reason: 'TypeError("cannot pickle '_thread.RLock' object")' local.py:263
INFO ['distilabel.pipeline'] 🛑 Stopping pipeline. Waiting for steps to finish processing batches... local.py:363
╭─────────────────────────────── Traceback (most recent call last) ────────────────────────────────╮
│ /home/qiey/DataGeneration/test_preference.py:37 in <module> │
│ │
│ 34 │ load_dataset >> generate >> combine >> rate │
│ 35 │
│ 36 if __name__ == "__main__": │
│ ❱ 37 │ distiset = pipeline.run() │
│ 38 │
│ │
│ ╭────────────────────────────────────── locals ───────────────────────────────────────╮ │
│ │ ClientvLLM = <class 'distilabel.llms.vllm.ClientvLLM'> │ │
│ │ combine = CombineColumns( │ │
│ │ │ name='combine_columns_0', │ │
│ │ │ resources=StepResources( │ │
│ │ │ │ replicas=1, │ │
│ │ │ │ cpus=None, │ │
│ │ │ │ gpus=None, │ │
│ │ │ │ memory=None, │ │
│ │ │ │ resources=None │ │
│ │ │ ), │ │
│ │ │ input_mappings={}, │ │
│ │ │ output_mappings={}, │ │
│ │ │ input_batch_size=50, │ │
│ │ │ columns=['generation', 'model_name'], │ │
│ │ │ output_columns=['generations', 'model_names'] │ │
│ │ ) │ │
│ │ CombineColumns = <class 'distilabel.steps.columns.group.CombineColumns'> │ │
│ │ generate = [ │ │
│ │ │ TextGeneration( │ │
│ │ │ │ name='text_generation_0', │ │
│ │ │ │ resources=StepResources( │ │
│ │ │ │ │ replicas=1, │ │
│ │ │ │ │ cpus=None, │ │
│ │ │ │ │ gpus=None, │ │
│ │ │ │ │ memory=None, │ │
│ │ │ │ │ resources=None │ │
│ │ │ │ ), │ │
│ │ │ │ input_mappings={}, │ │
│ │ │ │ output_mappings={}, │ │
│ │ │ │ input_batch_size=50, │ │
│ │ │ │ llm=ClientvLLM( │ │
│ │ │ │ │ use_magpie_template=False, │ │
│ │ │ │ │ magpie_pre_query_template=None, │ │
│ │ │ │ │ generation_kwargs={}, │ │
│ │ │ │ │ model='/home/public_data/qwen/Qwen2-7B-Instruct/', │ │
│ │ │ │ │ base_url='http://localhost:8001/v1', │ │
│ │ │ │ │ api_key=None, │ │
│ │ │ │ │ max_retries=6, │ │
│ │ │ │ │ timeout=120, │ │
│ │ │ │ │ structured_output=None, │ │
│ │ │ │ │ tokenizer=None, │ │
│ │ │ │ │ tokenizer_revision=None │ │
│ │ │ │ ), │ │
│ │ │ │ group_generations=False, │ │
│ │ │ │ add_raw_output=True, │ │
│ │ │ │ num_generations=1, │ │
│ │ │ │ use_system_prompt=True │ │
│ │ │ ), │ │
│ │ │ TextGeneration( │ │
│ │ │ │ name='text_generation_1', │ │
│ │ │ │ resources=StepResources( │ │
│ │ │ │ │ replicas=1, │ │
│ │ │ │ │ cpus=None, │ │
│ │ │ │ │ gpus=None, │ │
│ │ │ │ │ memory=None, │ │
│ │ │ │ │ resources=None │ │
│ │ │ │ ), │ │
│ │ │ │ input_mappings={}, │ │
│ │ │ │ output_mappings={}, │ │
│ │ │ │ input_batch_size=50, │ │
│ │ │ │ llm=ClientvLLM( │ │
│ │ │ │ │ use_magpie_template=False, │ │
│ │ │ │ │ magpie_pre_query_template=None, │ │
│ │ │ │ │ generation_kwargs={}, │ │
│ │ │ │ │ model='/home/public_data/qwen/Qwen2-72B-Instruct/', │ │
│ │ │ │ │ base_url='http://localhost:8008/v1', │ │
│ │ │ │ │ api_key=None, │ │
│ │ │ │ │ max_retries=6, │ │
│ │ │ │ │ timeout=120, │ │
│ │ │ │ │ structured_output=None, │ │
│ │ │ │ │ tokenizer=None, │ │
│ │ │ │ │ tokenizer_revision=None │ │
│ │ │ │ ), │ │
│ │ │ │ group_generations=False, │ │
│ │ │ │ add_raw_output=True, │ │
│ │ │ │ num_generations=1, │ │
│ │ │ │ use_system_prompt=True │ │
│ │ │ ) │ │
│ │ ] │ │
│ │ Qwen72B = ClientvLLM( │ │
│ │ │ use_magpie_template=False, │ │
│ │ │ magpie_pre_query_template=None, │ │
│ │ │ generation_kwargs={}, │ │
│ │ │ model='/home/public_data/qwen/Qwen2-72B-Instruct/', │ │
│ │ │ base_url='http://localhost:8008/v1', │ │
│ │ │ api_key=None, │ │
│ │ │ max_retries=6, │ │
│ │ │ timeout=120, │ │
│ │ │ structured_output=None, │ │
│ │ │ tokenizer=None, │ │
│ │ │ tokenizer_revision=None │ │
│ │ ) │ │
│ │ Qwen7B = ClientvLLM( │ │
│ │ │ use_magpie_template=False, │ │
│ │ │ magpie_pre_query_template=None, │ │
│ │ │ generation_kwargs={}, │ │
│ │ │ model='/home/public_data/qwen/Qwen2-7B-Instruct/', │ │
│ │ │ base_url='http://localhost:8001/v1', │ │
│ │ │ api_key=None, │ │
│ │ │ max_retries=6, │ │
│ │ │ timeout=120, │ │
│ │ │ structured_output=None, │ │
│ │ │ tokenizer=None, │ │
│ │ │ tokenizer_revision=None │ │
│ │ ) │ │
│ │ load_dataset = LoadDataFromHub( │ │
│ │ │ name='load_data_from_hub_0', │ │
│ │ │ resources=StepResources( │ │
│ │ │ │ replicas=1, │ │
│ │ │ │ cpus=None, │ │
│ │ │ │ gpus=None, │ │
│ │ │ │ memory=None, │ │
│ │ │ │ resources=None │ │
│ │ │ ), │ │
│ │ │ input_mappings={}, │ │
│ │ │ output_mappings={}, │ │
│ │ │ batch_size=50, │ │
│ │ │ repo_id='argilla/10Kprompts-mini', │ │
│ │ │ split='train', │ │
│ │ │ config=None, │ │
│ │ │ streaming=False, │ │
│ │ │ num_examples=None, │ │
│ │ │ storage_options=None │ │
│ │ ) │ │
│ │ LoadDataFromHub = <class 'distilabel.steps.generators.huggingface.LoadDataFromHub'> │ │
│ │ Pipeline = <class 'distilabel.pipeline.local.Pipeline'> │ │
│ │ pipeline = <distilabel.pipeline.local.Pipeline object at 0x7f679049ffd0> │ │
│ │ rate = UltraFeedback( │ │
│ │ │ name='ultra_feedback_0', │ │
│ │ │ resources=StepResources( │ │
│ │ │ │ replicas=1, │ │
│ │ │ │ cpus=None, │ │
│ │ │ │ gpus=None, │ │
│ │ │ │ memory=None, │ │
│ │ │ │ resources=None │ │
│ │ │ ), │ │
│ │ │ input_mappings={}, │ │
│ │ │ output_mappings={}, │ │
│ │ │ input_batch_size=50, │ │
│ │ │ llm=ClientvLLM( │ │
│ │ │ │ use_magpie_template=False, │ │
│ │ │ │ magpie_pre_query_template=None, │ │
│ │ │ │ generation_kwargs={}, │ │
│ │ │ │ model='/home/public_data/qwen/Qwen2-72B-Instruct/', │ │
│ │ │ │ base_url='http://localhost:8008/v1', │ │
│ │ │ │ api_key=None, │ │
│ │ │ │ max_retries=6, │ │
│ │ │ │ timeout=120, │ │
│ │ │ │ structured_output=None, │ │
│ │ │ │ tokenizer=None, │ │
│ │ │ │ tokenizer_revision=None │ │
│ │ │ ), │ │
│ │ │ group_generations=False, │ │
│ │ │ add_raw_output=True, │ │
│ │ │ num_generations=1, │ │
│ │ │ aspect='overall-rating' │ │
│ │ ) │ │
│ │ TextGeneration = <class 'distilabel.steps.tasks.text_generation.TextGeneration'> │ │
│ │ UltraFeedback = <class 'distilabel.steps.tasks.ultrafeedback.UltraFeedback'> │ │
│ ╰─────────────────────────────────────────────────────────────────────────────────────╯ │
│ │
│ /usr/local/lib/python3.10/dist-packages/distilabel/pipeline/local.py:205 in run │
│ │
│ 202 │ │ │ self._teardown() │
│ 203 │ │ │ │
│ 204 │ │ │ if self._exception: │
│ ❱ 205 │ │ │ │ raise self._exception │
│ 206 │ │ │
│ 207 │ │ distiset = create_distiset( │
│ 208 │ │ │ self._cache_location["data"], │
│ │
│ ╭────────────────────────────────────────── locals ───────────────────────────────────────────╮ │
│ │ dataset = None │ │
│ │ distiset = None │ │
│ │ manager = <multiprocessing.managers.SyncManager object at 0x7f66381da500> │ │
│ │ num_processes = 5 │ │
│ │ parameters = None │ │
│ │ pool = <distilabel.pipeline.local._NoDaemonPool state=TERMINATE pool_size=5> │ │
│ │ self = <distilabel.pipeline.local.Pipeline object at 0x7f679049ffd0> │ │
│ │ storage_parameters = None │ │
│ │ use_cache = True │ │
│ │ use_fs_to_pass_data = False │ │
│ ╰─────────────────────────────────────────────────────────────────────────────────────────────╯ │
╰──────────────────────────────────────────────────────────────────────────────────────────────────╯
RuntimeError: Failed to load all the steps. Could not run pipeline.
Exception in thread Thread-1 (_monitor):
Traceback (most recent call last):
File "/usr/lib/python3.10/threading.py", line 1016, in _bootstrap_inner
@gabrielmbmb could you help me with this error?