Adding Event Type information on get_task_result_stream()
In the following code, I am using `get_task_result_stream()` from `Session` in `core.py`. It is used to return event information from a llama-index workflow. However, the method only returns a dict and not the Event type itself, which would be useful for filtering later on.
I am currently passing event information to a frontend over a websocket like this:
```python
# Initialize workflow handler using existing pattern
async for event in session.get_task_result_stream(task_id):
    await websocket.send_json(event)
    print(type(event))  # returns dict without event type info

    # Handle human input requests
    if "prefix" in event:
        response = await websocket.receive_json()
```
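To illustrate the problem in isolation (using a hypothetical stand-in class, since the real events come from llama-index), once an event has been serialized to a dict its class identity is gone, and key-sniffing like `"prefix" in event` would also match any unrelated event that happens to carry a `prefix` field:

```python
import json


class InputRequiredEvent:
    """Hypothetical stand-in for a llama-index event class."""

    def __init__(self, prefix: str):
        self.prefix = prefix


event = InputRequiredEvent(prefix="Enter value: ")

# What the stream effectively yields: a JSON-serializable plain dict.
payload = json.loads(json.dumps(vars(event)))

print(isinstance(payload, InputRequiredEvent))  # False: class identity is lost
print("prefix" in payload)  # True, but brittle as a type check
```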
Especially for `InputRequiredEvent`, I would like a better identifier.
So is there a way to either:
- modify `get_task_result_stream()` to return the actual Event objects instead of dicts?
- add a param to the object returned by `get_task_result_stream()`?
- add a param to the Event type definition, maybe via decorators? This would probably affect llama-index code.

This would allow for better downstream event filtering.
-I have accidentally closed this issue, please reopen if possible-
I have thought about this a little bit. I am tempted to just modify the Event class on the workflow side, but this seems like a workaround rather than a solution.
I think that the `get_task_result_stream` functionality, either in the `Session` or in the Control Plane Server code, should wrap the event data together with additional metadata such as `event_type` and possibly even a tracing id before sending it as a payload.
I am interested in whether this seems important to you currently. I also wondered whether you intend to move the tracing capabilities from llama-index over to llama-deploy. A lot of these functionalities seem to have been initially integrated for llama-index, but they are also very relevant for llama-deploy.
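A minimal sketch of what such server-side wrapping could look like (the helper name `wrap_event_payload` and the envelope keys are my assumptions, not an existing llama-deploy API; the event class is a stand-in):

```python
import uuid
from typing import Optional


def wrap_event_payload(event_obj, trace_id: Optional[str] = None) -> dict:
    """Hypothetical helper: bundle the serialized event with its type
    name and a tracing id before it goes over the wire."""
    return {
        "event_type": type(event_obj).__name__,
        "trace_id": trace_id or str(uuid.uuid4()),
        "data": vars(event_obj),
    }


class InputRequiredEvent:
    """Stand-in for a llama-index event class."""

    def __init__(self, prefix: str):
        self.prefix = prefix


payload = wrap_event_payload(InputRequiredEvent("Enter value: "))
print(payload["event_type"])  # InputRequiredEvent
```

A consumer could then filter on `payload["event_type"] == "InputRequiredEvent"` instead of guessing from the payload's fields.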
@robbyfrt we're just resuming the work here, so yes, it's important. We have an internal doc where we're collecting what we want to add or change in light of an upcoming 1.0 release (you can see there are already many changes in main compared to 0.8), and observability is on the list. Good point about attaching the trace id to the event; let me have a better look at this issue and I'll come back with something. If you have any further pointers to give us, please go ahead, that helps a lot!
Thank you @masci for the heads up! I am currently still evaluating different components of the llama-index/llama-deploy framework, so I still have little experience to share regarding any actual loads on the system. From an initial assessment, though, quite a lot of requirements are already met by either llama-index or llama-deploy; it's just a matter of clear documentation. The frontend team has mentioned requirements around caching for performance improvements, but so far I have only found checkpointing for development in llama-index. Not sure if that is applicable in this context.
Regarding tracing, the mlflow integration for llama-index is really easy to set up, but I am not sure where to integrate it in a production scenario, since `workflow.run` is abstracted away here. Some OpenTelemetry-based tracing would still be useful, I think.
For the issue described above, I have opted for now, at least, for a typed base class to inherit from:
```python
from llama_index.core.workflow import Event
from pydantic import computed_field


class WebsocketEvent(Event):
    @computed_field
    @property
    def type(self) -> str:
        # Serialized payloads now carry the concrete class name.
        return self.__class__.__name__
```
This can be applied only to the events I intend to send, and it is easily extensible to metadata like timestamps, etc.
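As a plain-Python illustration of the pattern (stand-in classes, no llama-index or pydantic required, so the mechanics are visible in isolation), the class name ends up in every serialized payload and can drive frontend filtering:

```python
class Event:
    """Hypothetical stand-in for llama-index's Event base class."""

    def __init__(self, **data):
        self.__dict__.update(data)


class WebsocketEvent(Event):
    def to_payload(self) -> dict:
        # Plays the role of the pydantic computed_field: the concrete
        # class name is injected into the serialized payload as `type`.
        return {"type": self.__class__.__name__, **self.__dict__}


class InputRequiredEvent(WebsocketEvent):
    pass


payload = InputRequiredEvent(prefix="Enter value: ").to_payload()
print(payload["type"])  # InputRequiredEvent
```

On the frontend side, a handler can then dispatch on `payload["type"]` instead of probing for field names.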