camel icon indicating copy to clipboard operation
camel copied to clipboard

[Feature Request] Add streaming chunk events to WorkforceCallback for real-time agent output monitoring

Open Wendong-Fan opened this issue 1 month ago • 8 comments

Required prerequisites

  • [x] I have searched the Issue Tracker and Discussions that this hasn't already been reported. (+1 or comment there if it has.)
  • [ ] Consider asking first in a Discussion.

Motivation

Workforce internally calls agent.step() and waits for the final result, never iterating through streaming chunks . Users cannot monitor real-time streaming output from agents when using Workforce, even with custom callbacks.

Solution

  1. Add new event type
  @dataclass
  class TaskStreamingChunkEvent:
      task_id: str
      worker_id: str
      chunk: ChatAgentResponse
      chunk_index: int
  1. Add callback method
  class WorkforceCallback:
      def log_task_streaming_chunk(self, event: TaskStreamingChunkEvent) -> None:
          """Called when a task produces a streaming output chunk."""
          pass
  1. Auto-detect and conditionally enable streaming
  class Workforce:
      def __init__(self, name, callbacks=None):
          self.callbacks = callbacks or []
          # Auto-detect if any callback needs streaming
          self._streaming_enabled = any(
              callable(getattr(cb, 'log_task_streaming_chunk', None))
              for cb in self.callbacks
          )

      async def _execute_task(self, task, worker):
          self._emit(TaskStartedEvent(...))

          if self._streaming_enabled:
              # Iterate through streaming chunks
              chunk_index = 0
              async for chunk in worker.agent.step(task.content):
                  self._emit(TaskStreamingChunkEvent(
                      task_id=task.id,
                      worker_id=worker.id,
                      chunk=chunk,
                      chunk_index=chunk_index
                  ))
                  chunk_index += 1
          else:
              # Current behavior: wait for final result
              result = await worker.agent.step(task.content)

          self._emit(TaskCompletedEvent(...))

Alternatives

No response

Additional context

No response

Wendong-Fan avatar Nov 04 '25 23:11 Wendong-Fan

A summary of the changes CodeRabbit can apply:

  • Implement streaming chunk events and fix workforce_callback syntax: remove duplicate @abstractmethod and reinsert single decorator in camel/societies/workforce/workforce_callback.py; add TaskStreamingChunkEvent to camel/societies/workforce/events.py; update Workforce.init to set _streaming_enabled based on callbacks; add Workforce._emit_streaming_chunk helper; and modify SingleAgentWorker._process_task to emit TaskStreamingChunkEvent per chunk when streaming is enabled.

  • Add real-time task streaming support by introducing a new TaskStreamingChunkEvent type (with fields task_id, worker_id, chunk_content, chunk_index) to camel/societies/workforce/events.py and include it in the exported event list, and update camel/societies/workforce/workforce_callback.py to import the new event and declare an abstract log_task_streaming_chunk(event: TaskStreamingChunkEvent) method so callbacks can optionally handle streaming chunks.

  • [ ] ✅ Create PR with these edits
  • [ ] 📋 Get copyable edits

coderabbitai[bot] avatar Nov 04 '25 23:11 coderabbitai[bot]

yeah i can pick this up

JINO-ROHIT avatar Nov 05 '25 04:11 JINO-ROHIT

apologies for my lack of knowledge, but from a quick glance i notice the workforce.py has a process_task(and a async entry function), is the idea to implement streaming here?

JINO-ROHIT avatar Nov 05 '25 05:11 JINO-ROHIT

apologies for my lack of knowledge, but from a quick glance i notice the workforce.py has a process_task(and a async entry function), is the idea to implement streaming here?

hi,@JINO-ROHIT async process_task is designed to solve the problem of excessively long waiting times when splitting tasks, and is unrelated to this issue.

fengju0213 avatar Nov 10 '25 04:11 fengju0213

yeah i can pick this up

@JINO-ROHIT thanks!

fengju0213 avatar Nov 10 '25 04:11 fengju0213

hi @fengju0213 would you mind giving a bit of brief on the specifics on the feature? im a bit lost, if not ill let someone else implement this

JINO-ROHIT avatar Nov 10 '25 04:11 JINO-ROHIT

hi @fengju0213 would you mind giving a bit of brief on the specifics on the feature? im a bit lost, if not ill let someone else implement this

Thanks for checking in! Right now the workforce layer only emits lifecycle events (task created/started/completed, etc.) and the callback interface matches that set. There’s no TaskStreamingChunkEvent, and WorkforceCallback has no hook for streaming output, so even though SingleAgentWorker can iterate over AsyncStreamingChatAgentResponse, those chunks never leave the worker.

To implement the feature we need:

  1. Add a TaskStreamingChunkEvent dataclass (task_id, worker_id, chunk, chunk_index) to camel/societies/workforce/events.py and include it in the exported union.
  2. Extend WorkforceCallback (and at least the default WorkforceLogger) with an optional log_task_streaming_chunk method that takes that event.

Let me know if anything there is unclear, happy to dive deeper.

fengju0213 avatar Nov 10 '25 04:11 fengju0213

yeah thanks a lot, this makes sense, ill piece together my understanding in a PR and hopefully we can review this together

JINO-ROHIT avatar Nov 10 '25 05:11 JINO-ROHIT