openai-python
How to stream responses from Assistants API? The quickstart example doesn't seem to be working
Confirm this is an issue with the Python library and not an underlying OpenAI API
- [X] This is an issue with the Python library
Describe the bug
I get `ERROR:api.routes:Error in event generator: 'StreamingEventHandler' object has no attribute '_AssistantEventHandler__stream'` when I use the example code to enable streaming from the Assistants API. Additionally, the method `create_and_stream` in the class `AsyncRuns` is deprecated.
To Reproduce
Use example code from https://platform.openai.com/docs/assistants/quickstart
```python
from typing_extensions import override
from openai import AssistantEventHandler

# First, we create an EventHandler class to define
# how we want to handle the events in the response stream.

class EventHandler(AssistantEventHandler):
    @override
    def on_text_created(self, text) -> None:
        print(f"\nassistant > ", end="", flush=True)

    @override
    def on_text_delta(self, delta, snapshot):
        print(delta.value, end="", flush=True)

    def on_tool_call_created(self, tool_call):
        print(f"\nassistant > {tool_call.type}\n", flush=True)

    def on_tool_call_delta(self, delta, snapshot):
        if delta.type == 'code_interpreter':
            if delta.code_interpreter.input:
                print(delta.code_interpreter.input, end="", flush=True)
            if delta.code_interpreter.outputs:
                print(f"\n\noutput >", flush=True)
                for output in delta.code_interpreter.outputs:
                    if output.type == "logs":
                        print(f"\n{output.logs}", flush=True)

# Then, we use the `stream` SDK helper
# with the `EventHandler` class to create the Run
# and stream the response.

with client.beta.threads.runs.stream(
    thread_id=thread.id,
    assistant_id=assistant.id,
    instructions="Please address the user as Jane Doe. The user has a premium account.",
    event_handler=EventHandler(),
) as stream:
    stream.until_done()
```
Code snippets
No response
OS
Docker container: FROM python:3.9
Python version
Python v3.9
Library version
v1.51.2
Same issue
Same issue. Is OpenAI letting ChatGPT write their API documentation and letting it hallucinate? wth lol
Did anyone find a solution to this? I am also getting an "AttributeError: 'EventHandler' object has no attribute '_AssistantEventHandler__stream'" error...
I cannot reproduce this; does anyone have a complete example snippet? Running this snippet works for me.
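For anyone trying to reproduce, here is a minimal self-contained sketch of the quickstart flow (assumes `OPENAI_API_KEY` is set in the environment, a real assistant ID in place of the placeholder, and the `EventHandler` class from the quickstart above in scope):

```python
# Minimal repro sketch: assumes OPENAI_API_KEY is set, "asst_..." is replaced
# with a real assistant ID, and EventHandler (from the quickstart) is defined.
from openai import OpenAI

client = OpenAI()
assistant_id = "asst_..."  # placeholder: use a real assistant ID

thread = client.beta.threads.create()
client.beta.threads.messages.create(
    thread_id=thread.id,
    role="user",
    content="I need to solve the equation `3x + 11 = 14`. Can you help me?",
)

with client.beta.threads.runs.stream(
    thread_id=thread.id,
    assistant_id=assistant_id,
    event_handler=EventHandler(),
) as stream:
    stream.until_done()
```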
@RobertCraigie Here is the code I am trying to run. Hoping to use a websocket to deliver the stream to my front end:
Back-end:
```python
class MessageRequest(BaseModel):
    message: str
    num_msg: int

class EventHandler(AssistantEventHandler):
    def __init__(self, websocket: WebSocket):
        self.websocket = websocket

    @override
    async def on_text_created(self, text) -> None:
        await self.websocket.send_text("\nassistant > ")

    @override
    async def on_text_delta(self, delta, snapshot):
        await self.websocket.send_text(delta.value)

    async def on_tool_call_created(self, tool_call):
        await self.websocket.send_text(f"\nassistant > {tool_call.type}\n")

    async def on_tool_call_delta(self, delta, snapshot):
        if delta.type == 'code_interpreter':
            if delta.code_interpreter.input:
                await self.websocket.send_text(delta.code_interpreter.input)
            if delta.code_interpreter.outputs:
                await self.websocket.send_text("\n\noutput >")
                for output in delta.code_interpreter.outputs:
                    if output.type == "logs":
                        await self.websocket.send_text(f"\n{output.logs}")

@app.get("/")
def read_root():
    return {"message": "Hello, FastAPI!"}

@app.websocket("/chat")
async def chat_with_assistant(websocket: WebSocket):
    await websocket.accept()
    try:
        while True:
            data = await websocket.receive_json()
            user_input = data['message']
            num_msg = data['num_msg']
            global t_id
            if num_msg == 0:
                thread = client.beta.threads.create()
                t_id = thread.id
            message = client.beta.threads.messages.create(
                thread_id=t_id,
                role="user",
                content=user_input
            )
            event_handler = EventHandler(websocket)
            with client.beta.threads.runs.stream(
                thread_id=t_id,
                assistant_id=assistant_id,
                instructions="Please address the user as Jane Doe. The user has a premium account.",
                event_handler=event_handler,
            ) as stream:
                await stream.until_done()
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
```
Front-end:
```tsx
interface Message {
  text: string;
  isUser: boolean;
}

export function Chat() {
  const [messages, setMessages] = useState<Message[]>([])
  const [input, setInput] = useState('')
  const [numMessages, setNumMessages] = useState(0)
  const [isConnected, setIsConnected] = useState(false)
  const textareaRef = useRef<HTMLTextAreaElement>(null)
  const scrollAreaRef = useRef<HTMLDivElement>(null)
  const socketRef = useRef<WebSocket | null>(null)

  useEffect(() => {
    socketRef.current = new WebSocket('ws://localhost:8000/chat')

    socketRef.current.onopen = () => {
      console.log('WebSocket connection established')
      setIsConnected(true)
    }

    socketRef.current.onmessage = (event) => {
      const newMessage = { text: event.data, isUser: false }
      setMessages((prev) => [...prev, newMessage])
    }

    socketRef.current.onclose = () => {
      console.log('WebSocket connection closed')
      setIsConnected(false)
    }

    socketRef.current.onerror = (error) => {
      console.error('WebSocket error:', error)
    }

    return () => {
      if (socketRef.current) {
        // socketRef.current.close()
      }
    }
  }, [])

  const handleSend = () => {
    if (input.trim() && isConnected && socketRef.current) {
      const userMessage = { text: input, isUser: true }
      setMessages((prev) => [...prev, userMessage])
      socketRef.current.send(JSON.stringify({
        message: input,
        num_msg: numMessages
      }))
      setInput('')
      setNumMessages((prev) => prev + 1)
    }
  }
```
lmk if you have any other questions or see any obvious flaws in my implementation.
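One possible flaw worth noting: the handler above defines `async` callbacks on the synchronous `AssistantEventHandler`, `await`s `stream.until_done()` on a synchronous stream, and its `__init__` never calls `super().__init__()`. A sketch of a fully async variant, assuming `AsyncOpenAI` is acceptable for this app (`WSHandler` and `run_stream` are hypothetical names introduced for illustration):

```python
# Sketch of a fully async variant; not tested against the app above.
from fastapi import WebSocket
from openai import AsyncOpenAI, AsyncAssistantEventHandler
from typing_extensions import override

client = AsyncOpenAI()

class WSHandler(AsyncAssistantEventHandler):
    def __init__(self, websocket: WebSocket):
        super().__init__()  # required: the base __init__ sets up internal stream state
        self.websocket = websocket

    @override
    async def on_text_delta(self, delta, snapshot) -> None:
        await self.websocket.send_text(delta.value)

async def run_stream(thread_id: str, assistant_id: str, websocket: WebSocket) -> None:
    # With the async client, the stream manager is entered with `async with`
    # and `until_done()` is awaited; the handler's coroutines are awaited by the SDK.
    async with client.beta.threads.runs.stream(
        thread_id=thread_id,
        assistant_id=assistant_id,
        event_handler=WSHandler(websocket),
    ) as stream:
        await stream.until_done()
```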
I’m currently using this code as a workaround. Please note that it will need modification, since it was copied directly from my project and references some names that aren't defined here.
```python
import logging
import time

import openai
from openai import AssistantEventHandler
from typing_extensions import override

logger = logging.getLogger(__name__)

try:
    client = openai.OpenAI(api_key=api_key)
    logger.info("OpenAI client initialized successfully")
except Exception as e:
    logger.error(f"Failed to initialize OpenAI client: {str(e)}")

# Create a custom event handler for streaming
class StreamHandler(AssistantEventHandler):
    def __init__(self, message_placeholder):
        super().__init__()
        self.message_placeholder = message_placeholder
        self.full_response = ""

    @override
    def on_text_created(self, text) -> None:
        logger.info("Starting new response")
        self.full_response = ""

    @override
    def on_text_delta(self, delta, snapshot) -> None:
        self.full_response += delta.value
        self.message_placeholder.markdown(self.full_response)
        logger.info(f"Response delta: {delta.value}")

    def get_full_response(self) -> str:
        return self.full_response

def process_message_stream(thread_id: str, message: str, message_placeholder, assistant_id: str) -> tuple[str, list]:
    """Process a message and stream the assistant's response. Returns (response, annotations)."""
    try:
        # Add message to thread
        client.beta.threads.messages.create(
            thread_id=thread_id,
            role="user",
            content=message
        )
        logger.info(f"Message added to thread {thread_id}")

        # Show thinking indicator
        message_placeholder.markdown("*Thinking...*")

        # Create run with retries
        max_retries = 3
        retry_delay = 5
        run = None

        for attempt in range(max_retries):
            try:
                run = client.beta.threads.runs.create(
                    thread_id=thread_id,
                    assistant_id=assistant_id,
                    stream=True
                )
                logger.info(f"Created streaming run with assistant {assistant_id}")
                break
            except openai.RateLimitError as e:
                if attempt == max_retries - 1:
                    error_msg = "Rate limit exceeded. Please try again in a few minutes."
                    logger.error(f"Rate limit error after {max_retries} attempts: {str(e)}")
                    message_placeholder.error(error_msg)
                    return error_msg, []
                logger.warning(f"Rate limit hit, attempt {attempt + 1}/{max_retries}. Waiting {retry_delay} seconds...")
                message_placeholder.markdown(f"*Rate limit reached. Retrying in {retry_delay} seconds... ({attempt + 1}/{max_retries})*")
                time.sleep(retry_delay)
                retry_delay *= 2  # Exponential backoff
            except Exception as e:
                logger.error(f"Error creating run: {str(e)}")
                message_placeholder.error("Failed to create run. Please try again.")
                return "Error creating run", []

        if run is None:
            error_msg = "Failed to create run after retries"
            logger.error(error_msg)
            message_placeholder.error(error_msg)
            return error_msg, []

        full_response = ""
        first_token = True

        # Process the stream with rate limit handling
        try:
            for chunk in run:
                if chunk.event == "thread.message.delta":
                    if hasattr(chunk.data.delta, 'content') and chunk.data.delta.content:
                        content_delta = chunk.data.delta.content[0].text.value
                        logger.info(f"Received content delta: {content_delta}")
                        if content_delta:
                            full_response += content_delta
                            if first_token:
                                first_token = False
                            message_placeholder.markdown(full_response)
                            logger.info(f"Updated response: {full_response}")
                elif chunk.event == "thread.run.completed":
                    logger.info("Stream completed")
                    break
                elif chunk.event == "thread.run.failed":
                    logger.info(f"Run failed. Full data: {chunk}")
                    # Extract error details from the failed run
                    if hasattr(chunk.data, 'last_error'):
                        error_code = getattr(chunk.data.last_error, 'code', 'unknown')
                        error_message = getattr(chunk.data.last_error, 'message', 'Unknown error')
                        if error_code == 'rate_limit_exceeded':
                            error_msg = f"Rate limit exceeded. {error_message}"
                        else:
                            error_msg = f"Stream failed: {error_code} - {error_message}"
                    else:
                        error_msg = "Stream failed with an unknown error"
                    logger.error(f"Run failed with error: {error_msg}")
                    message_placeholder.error(error_msg)
                    return error_msg, []

            # Get the complete message with annotations after streaming is done
            messages = client.beta.threads.messages.list(
                thread_id=thread_id,
                order="desc",
                limit=1
            )

            annotations = []
            if messages.data:
                message = messages.data[0]
                if hasattr(message.content[0].text, 'annotations'):
                    annotations = message.content[0].text.annotations
                    logger.info(f"Found annotations: {annotations}")

                    # Clean the full response by removing citation markers
                    for annotation in annotations:
                        if hasattr(annotation, 'text'):
                            citation_text = annotation.text
                            full_response = full_response.replace(citation_text, '')
                            logger.info(f"Removed citation text: {citation_text}")

            return full_response.strip(), annotations

        except openai.RateLimitError as e:
            error_msg = "Rate limit exceeded during streaming. Please try again in a few minutes."
            logger.error(f"Rate limit error during streaming: {str(e)}")
            message_placeholder.error(error_msg)
            return error_msg, []

    except Exception as e:
        logger.error(f"Error processing message: {str(e)}")
        return f"Error processing request: {str(e)}", []
```
I am getting this error:

```
ERROR: Exception in ASGI application
Traceback (most recent call last):
  File "/Users/nickhenderson/Projects/react-website/run.py", line 152, in chat_with_assistant
    with client.beta.threads.runs.stream(
  File "/Users/nickhenderson/Projects/react-website/venv/lib/python3.11/site-packages/openai/lib/streaming/_assistants.py", line 447, in __enter__
    self.__event_handler._init(self.__stream)
  File "/Users/nickhenderson/Projects/react-website/venv/lib/python3.11/site-packages/openai/lib/streaming/_assistants.py", line 59, in _init
    if self.__stream:
       ^^^^^^^^^^^^^
AttributeError: 'EventHandler' object has no attribute '_AssistantEventHandler__stream'
```
Are we having a similar issue? I am hoping to use a websocket to deliver the stream to my front end:
I have also tried:

```python
import openai
from openai import AssistantEventHandler
from typing_extensions import override

# Create a custom event handler for streaming
class StreamHandler(AssistantEventHandler):
    def __init__(self, message_placeholder):
        super().__init__()
        self.message_placeholder = message_placeholder
        self.full_response = ""

    @override
    def on_text_created(self, text) -> None:
        logger.info("Starting new response")
        self.full_response = ""

    @override
    def on_text_delta(self, delta, snapshot) -> None:
        self.full_response += delta.value
        self.message_placeholder.markdown(self.full_response)
        logger.info(f"Response delta: {delta.value}")

    def get_full_response(self) -> str:
        return self.full_response
```
I've updated my previous message.
If anyone could help, I would greatly appreciate it:
https://community.openai.com/t/error-trying-to-stream-assistant-api-with-websocket/1092353
Hi all, I appear to have fixed this issue after upgrading my OpenAI package (1.59.3) and my Python version (3.10.9). I'm not sure which one actually fixed it, so either upgrade both, or try the OpenAI package first.
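For anyone comparing environments, a quick way to check both versions from inside the runtime (standard library plus the `openai` package's `__version__` attribute):

```python
import sys
import openai

# Print the interpreter and SDK versions mentioned in the comment above.
print(sys.version)         # e.g. 3.10.9
print(openai.__version__)  # e.g. 1.59.3
```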
If you're defining `__init__`, you'll need to call `super().__init__()`:

```python
class EventHandler(AssistantEventHandler):
    def __init__(self, websocket: WebSocket):
        super().__init__()  # omitting this causes the AttributeError above
        self.websocket = websocket
```
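For anyone wondering why the error names `_AssistantEventHandler__stream` specifically: attributes written as `self.__stream` inside a class are name-mangled to `_ClassName__stream`, and they only come into existence when that class's `__init__` actually runs. A generic illustration (these classes are hypothetical, not the SDK's internals):

```python
# Name-mangling demo: Base stores self.__stream as _Base__stream, so a
# subclass that skips super().__init__() never creates that attribute.
class Base:
    def __init__(self):
        self.__stream = None  # stored on the instance as _Base__stream

    def check(self):
        return self.__stream  # looked up as _Base__stream


class BadHandler(Base):
    def __init__(self):
        pass  # no super().__init__() call


BadHandler().check()
# AttributeError: 'BadHandler' object has no attribute '_Base__stream'
```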
Sorry for the delayed response. It looks like some people here managed to get it working? Let us know if you're still running into issues.