[BUG] PFClient.run using flow with AsyncIterator raises `TypeError: cannot pickle '_thread.lock' object`
Describe the bug
Batch runs cannot complete successfully if the flow call produces an AsyncIterator that ends up wrapped by promptflow.tracing.TracedAsyncIterator. Each run from the data file fails with TypeError: cannot pickle '_thread.lock' object.
How To Reproduce the bug
The code below produces the error consistently:
import tempfile

from openai import AzureOpenAI
from promptflow.tracing import trace
from promptflow.core import AzureOpenAIModelConfiguration, Prompty
from promptflow.client import PFClient


class ChatFlow:
    def __init__(self, model_config: AzureOpenAIModelConfiguration):
        self.model_config = model_config

    @trace
    async def __call__(
        self,
        topic: str,
    ) -> str:
        """Flow entry function."""
        client = AzureOpenAI(
            azure_endpoint=self.model_config.azure_endpoint,
            api_key=self.model_config.api_key,
            api_version=self.model_config.api_version,
        )
        response = client.chat.completions.create(
            model=self.model_config.azure_deployment,
            messages=[
                {"role": "system", "content": "Create a story about the topic provided by the user"},
                {"role": "user", "content": f"Tell me a story about {topic}"},
            ],
            max_tokens=150,
        )
        for chunk in response:
            if len(chunk.choices) > 0 and (message := chunk.choices[0].message):
                content = message.content
                yield content + "\n"


def main():
    f = tempfile.NamedTemporaryFile(suffix=".csv", mode="w+t")
    try:
        f.write("topic\nlittle league\n")
        f.seek(0)
        config = AzureOpenAIModelConfiguration(
            connection="aoai_connection", azure_deployment="gpt-35-turbo"
        )
        chat_flow = ChatFlow(model_config=config)
        result = PFClient().run(chat_flow, data=f.name)
    finally:
        f.close()


if __name__ == "__main__":
    main()
The bug can be traced to the run_info submitted to the queue at
https://github.com/microsoft/promptflow/blob/745704a5b7f868c61c71f7a12eb13ef695ab4333/src/promptflow-core/promptflow/storage/_queue_run_storage.py#L24
For the example code, the results property within run_info holds an instance of promptflow.tracing.TracedAsyncIterator, which cannot be pickled when submitted to the multiprocessing queue and raises the mentioned error.
The error file from the batch run is attached: error.json
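The pickling failure can be reproduced outside promptflow. Below is a minimal sketch (IteratorWrapper is a hypothetical stand-in, not the real TracedAsyncIterator): any object that holds a live thread lock, as wrappers around iterators commonly do, fails pickle.dumps with exactly this error, so it can never cross a multiprocessing queue.

```python
import pickle
import threading

class IteratorWrapper:
    """Hypothetical stand-in for a traced-iterator wrapper holding a lock."""
    def __init__(self):
        self._lock = threading.Lock()  # unpicklable state

try:
    pickle.dumps(IteratorWrapper())
except TypeError as e:
    # TypeError: cannot pickle '_thread.lock' object
    print(e)
```

The same constraint applies to anything placed on a multiprocessing.Queue, since items are pickled before being sent to the worker process.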
Expected behavior
Successful execution of the batch run.
Running Information (please complete the following information):
- Promptflow Package Version using pf -v: 1.12.0
- Operating System: macOS Sonoma 14.5
- Python Version using python --version: 3.12.2
Hi @bwilliams2, it seems you forgot to set stream=True in the completions API, and the way you iterate over the result is not right. Please try changing your code to the following:
response = client.chat.completions.create(
    model="gpt-35-turbo",
    messages=[
        {"role": "system", "content": "Create a story about the topic provided by the user"},
        {"role": "user", "content": f"Tell me a story about {topic}"},
    ],
    max_tokens=150,
    stream=True,
)
for chunk in response:
    print(f"chunk: {chunk}")
    if len(chunk.choices) > 0 and (message := chunk.choices[0].delta.content):
        yield str(message)
@guming-learning
I updated with these changes and get the exact same error. I don't believe the function is ever executed, because of the pickling error described in the original issue. It seems that promptflow batch runs cannot be used with streaming flows.
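Until the underlying issue is fixed, one possible workaround (my suggestion, not from this thread, and assuming the batch run only needs the final text rather than live chunks) is to drain the stream inside the flow and return a plain string, so run_info carries a picklable value instead of a TracedAsyncIterator:

```python
import asyncio

async def collect(aiter) -> str:
    """Drain an async iterator of text chunks into a single string."""
    parts = []
    async for chunk in aiter:
        parts.append(chunk)
    return "".join(parts)

# Demo with a toy async generator standing in for the streamed chunks.
async def demo_stream():
    for piece in ("Once ", "upon ", "a time"):
        yield piece

print(asyncio.run(collect(demo_stream())))  # Once upon a time
```

In the flow above, __call__ would then be a plain coroutine that returns await collect(...) rather than an async generator, sidestepping the wrapper entirely at the cost of losing streaming output.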