[Feature Request] Add streaming chunk events to WorkforceCallback for real-time agent output monitoring
Required prerequisites
- [x] I have searched the Issue Tracker and Discussions that this hasn't already been reported. (+1 or comment there if it has.)
- [ ] Consider asking first in a Discussion.
Motivation
Workforce internally calls agent.step() and waits for the final result without ever iterating through the streaming chunks. As a result, users cannot monitor real-time streaming output from agents when using Workforce, even with custom callbacks.
Solution
- Add new event type
@dataclass
class TaskStreamingChunkEvent:
    task_id: str
    worker_id: str
    chunk: ChatAgentResponse
    chunk_index: int
- Add callback method
class WorkforceCallback:
    def log_task_streaming_chunk(self, event: TaskStreamingChunkEvent) -> None:
        """Called when a task produces a streaming output chunk."""
        pass
- Auto-detect and conditionally enable streaming
class Workforce:
    def __init__(self, name, callbacks=None):
        self.callbacks = callbacks or []
        # Auto-detect if any callback needs streaming
        self._streaming_enabled = any(
            callable(getattr(cb, 'log_task_streaming_chunk', None))
            for cb in self.callbacks
        )

    async def _execute_task(self, task, worker):
        self._emit(TaskStartedEvent(...))
        if self._streaming_enabled:
            # Iterate through streaming chunks
            chunk_index = 0
            async for chunk in worker.agent.step(task.content):
                self._emit(TaskStreamingChunkEvent(
                    task_id=task.id,
                    worker_id=worker.id,
                    chunk=chunk,
                    chunk_index=chunk_index
                ))
                chunk_index += 1
        else:
            # Current behavior: wait for final result
            result = await worker.agent.step(task.content)
        self._emit(TaskCompletedEvent(...))
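One gap in the pseudocode above is that the streaming branch never assigns result. The following is a minimal, self-contained sketch of the same loop that also accumulates the chunks into a final value. It does not use the real CAMEL classes: fake_agent_stream, RecordingCallback, and execute_task are hypothetical stand-ins, chunk is simplified to a plain str instead of ChatAgentResponse, and it assumes each chunk is an incremental delta rather than the accumulated text.

```python
import asyncio
from dataclasses import dataclass
from typing import AsyncIterator, List


@dataclass
class TaskStreamingChunkEvent:
    task_id: str
    worker_id: str
    chunk: str  # simplified: the proposal uses ChatAgentResponse here
    chunk_index: int


class RecordingCallback:
    """Hypothetical callback that stores every chunk event it receives."""

    def __init__(self) -> None:
        self.received: List[TaskStreamingChunkEvent] = []

    def log_task_streaming_chunk(self, event: TaskStreamingChunkEvent) -> None:
        self.received.append(event)


async def fake_agent_stream(prompt: str) -> AsyncIterator[str]:
    """Stand-in for a streaming agent: yields the prompt word by word."""
    for word in prompt.split():
        await asyncio.sleep(0)  # hand control back to the event loop
        yield word


async def execute_task(task_id: str, worker_id: str, prompt: str, callbacks: list) -> str:
    """Emit one event per chunk and return the joined chunks as the result."""
    parts: List[str] = []
    chunk_index = 0
    async for chunk in fake_agent_stream(prompt):
        event = TaskStreamingChunkEvent(task_id, worker_id, chunk, chunk_index)
        for cb in callbacks:
            hook = getattr(cb, "log_task_streaming_chunk", None)
            if callable(hook):  # callbacks without the hook are skipped
                hook(event)
        parts.append(chunk)  # assumption: chunks are deltas, so join them
        chunk_index += 1
    return " ".join(parts)
```

Running asyncio.run(execute_task("t1", "w1", "hello streaming world", [RecordingCallback()])) delivers three events to the callback, indexed from 0, and returns the joined text. If the real chunks carry accumulated content instead of deltas, keeping only the last chunk would be the appropriate change.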
Alternatives
No response
Additional context
No response
A summary of the changes CodeRabbit can apply:
Implement streaming chunk events and fix workforce_callback syntax: remove duplicate @abstractmethod and reinsert single decorator in camel/societies/workforce/workforce_callback.py; add TaskStreamingChunkEvent to camel/societies/workforce/events.py; update Workforce.__init__ to set _streaming_enabled based on callbacks; add Workforce._emit_streaming_chunk helper; and modify SingleAgentWorker._process_task to emit TaskStreamingChunkEvent per chunk when streaming is enabled.
Add real-time task streaming support by introducing a new TaskStreamingChunkEvent type (with fields task_id, worker_id, chunk_content, chunk_index) to camel/societies/workforce/events.py and include it in the exported event list, and update camel/societies/workforce/workforce_callback.py to import the new event and declare an abstract log_task_streaming_chunk(event: TaskStreamingChunkEvent) method so callbacks can optionally handle streaming chunks.
yeah i can pick this up
apologies for my lack of knowledge, but from a quick glance i notice that workforce.py has a process_task (and an async entry function). is the idea to implement streaming here?
hi @JINO-ROHIT, the async process_task is designed to solve the problem of excessively long waiting times when splitting tasks, and is unrelated to this issue.
@JINO-ROHIT thanks!
hi @fengju0213 would you mind giving a brief on the specifics of the feature? im a bit lost. if not, ill let someone else implement this
Thanks for checking in! Right now the workforce layer only emits lifecycle events (task created/started/completed, etc.) and the
callback interface matches that set. There’s no TaskStreamingChunkEvent, and WorkforceCallback has no hook for streaming output, so
even though SingleAgentWorker can iterate over AsyncStreamingChatAgentResponse, those chunks never leave the worker.
To implement the feature we need:
- Add a TaskStreamingChunkEvent dataclass (task_id, worker_id, chunk, chunk_index) to camel/societies/workforce/events.py and include it in the exported union.
- Extend WorkforceCallback (and at least the default WorkforceLogger) with an optional log_task_streaming_chunk method that takes that event.
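As a rough illustration of what an optional hook could look like, here is a sketch in which the base class ships a no-op default, so existing callbacks keep working without changes. Note that once the base class defines the method, a callable(getattr(...)) check is true for every callback, so this sketch detects interest by checking whether a subclass overrides the default. All names here (CallbackBase, ChunkEvent, QuietLogger, StreamingLogger, wants_streaming) are hypothetical stand-ins, not the actual CAMEL classes, and the lifecycle method signature is simplified.

```python
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import List


@dataclass
class ChunkEvent:
    """Hypothetical stand-in for TaskStreamingChunkEvent (chunk simplified to str)."""
    task_id: str
    worker_id: str
    chunk: str
    chunk_index: int


class CallbackBase(ABC):
    """Hypothetical stand-in for WorkforceCallback."""

    @abstractmethod
    def log_task_started(self, task_id: str) -> None:
        """Lifecycle hook every callback must implement (simplified signature)."""

    def log_task_streaming_chunk(self, event: ChunkEvent) -> None:
        """Optional hook: the default does nothing, so subclasses may ignore it."""


class QuietLogger(CallbackBase):
    """Implements only the required lifecycle hook."""

    def log_task_started(self, task_id: str) -> None:
        pass


class StreamingLogger(CallbackBase):
    """Overrides the optional hook to collect chunk text."""

    def __init__(self) -> None:
        self.chunks: List[str] = []

    def log_task_started(self, task_id: str) -> None:
        pass

    def log_task_streaming_chunk(self, event: ChunkEvent) -> None:
        self.chunks.append(event.chunk)


def wants_streaming(cb: object) -> bool:
    """True only if the callback's class overrides the no-op default."""
    hook = getattr(type(cb), "log_task_streaming_chunk", None)
    return callable(hook) and hook is not CallbackBase.log_task_streaming_chunk
```

With this shape, a workforce could compute any(wants_streaming(cb) for cb in callbacks) once at construction time and skip chunk iteration entirely when no callback overrides the hook.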
Let me know if anything there is unclear, happy to dive deeper.
yeah thanks a lot, this makes sense, ill piece together my understanding in a PR and hopefully we can review this together