feat: add accumulate_streaming_content option for automatic TextBlock streaming

Open aetherwu opened this issue 2 months ago • 4 comments

Summary

Adds automatic accumulation of streaming text/thinking deltas into TextBlock and ThinkingBlock objects, making it much easier to build real-time UIs that display text as it streams from the LLM.

Changes

  • New option: accumulate_streaming_content in ClaudeAgentOptions (default: False)
  • New component: StreamAccumulator class that tracks and accumulates streaming deltas
  • Enhanced streaming: Emits partial AssistantMessage objects with growing content blocks alongside raw StreamEvent objects
  • Full backward compatibility: Existing code works unchanged

Before (Manual Accumulation)

from claude_agent_sdk.types import StreamEvent

accumulated = {}  # content block index -> text received so far
async for message in client.receive_messages():
    if isinstance(message, StreamEvent):
        event = message.event
        if event.get("type") == "content_block_delta":
            index = event.get("index", 0)
            delta = event.get("delta", {})
            if delta.get("type") == "text_delta":
                # Append this fragment to whatever the block already holds
                accumulated[index] = accumulated.get(index, "") + delta.get("text", "")

After (Automatic Accumulation)

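from claude_agent_sdk import AssistantMessage, ClaudeAgentOptions, TextBlock

options = ClaudeAgentOptions(
    include_partial_messages=True,
    accumulate_streaming_content=True,  # ✨ Enable automatic accumulation
)

# Assumes a single text block per message; last_text is what was already printed
last_text = ""
async for message in client.receive_messages():
    if isinstance(message, AssistantMessage):
        for block in message.content:
            if isinstance(block, TextBlock):
                # Text that does not extend last_text belongs to a new block
                if not block.text.startswith(last_text):
                    last_text = ""
                new_text = block.text[len(last_text):]
                print(new_text, end="", flush=True)
                last_text = block.text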
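
The loop above keeps a single last_text, which is enough for one text block. When a message carries several text blocks, one offset per block index is needed. A minimal sketch, assuming each partial message re-sends every block with its full accumulated text (which is what the slicing above relies on). DeltaTracker and new_suffixes are hypothetical names, and plain strings stand in for TextBlock.text:

```python
class DeltaTracker:
    """Hypothetical helper: remembers how much of each block was already shown."""

    def __init__(self) -> None:
        self._seen: dict[int, str] = {}  # block index -> text already emitted

    def new_suffixes(self, texts: list[str]) -> list[str]:
        """Return the not-yet-seen tail of each block's accumulated text."""
        out = []
        for index, text in enumerate(texts):
            seen = self._seen.get(index, "")
            if not text.startswith(seen):
                seen = ""  # a different block now sits at this index: start over
            out.append(text[len(seen):])
            self._seen[index] = text
        return out


tracker = DeltaTracker()
print(tracker.new_suffixes(["Hel"]))             # ['Hel']
print(tracker.new_suffixes(["Hello", "Wor"]))    # ['lo', 'Wor']
print(tracker.new_suffixes(["Hello", "World"]))  # ['', 'ld']
```

Comparing with startswith rather than by length means a new message whose first block happens to be longer than the previous one is still printed from its beginning.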

Key Features

  • ✅ Accumulates text, thinking, and tool use blocks
  • ✅ Supports multiple content blocks per message
  • ✅ Handles multiple concurrent sessions independently
  • ✅ Preserves parent_tool_use_id for subagent messages
  • ✅ Comprehensive test coverage (6 new tests, all 120 tests pass)
  • ✅ Type-safe (mypy compliant)
  • ✅ Fully backward compatible
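
The first bullet mentions tool use blocks. In the raw Anthropic stream, tool arguments arrive as input_json_delta events whose partial_json fragments only form valid JSON once the block is complete. The sketch below shows one way to buffer them. It is an illustration under that assumption, not the PR's code: ToolInputBuffer and feed are hypothetical names, and this description does not say whether the PR exposes partial tool input to callers.

```python
import json
from typing import Any, Optional


class ToolInputBuffer:
    """Hypothetical helper: collects streamed tool-call arguments per block."""

    def __init__(self) -> None:
        self._fragments: dict[int, list[str]] = {}  # block index -> JSON pieces

    def feed(self, event: dict[str, Any]) -> Optional[dict[str, Any]]:
        """Return the parsed tool input when its block finishes, else None."""
        index = event.get("index", 0)
        kind = event.get("type")
        if kind == "content_block_delta":
            delta = event.get("delta", {})
            if delta.get("type") == "input_json_delta":
                self._fragments.setdefault(index, []).append(delta.get("partial_json", ""))
        elif kind == "content_block_stop" and index in self._fragments:
            raw = "".join(self._fragments.pop(index))
            # An empty string means the tool was called with no arguments
            return json.loads(raw) if raw else {}
        return None


buf = ToolInputBuffer()
result = None
for ev in [
    {"type": "content_block_delta", "index": 1,
     "delta": {"type": "input_json_delta", "partial_json": '{"path": "a'}},
    {"type": "content_block_delta", "index": 1,
     "delta": {"type": "input_json_delta", "partial_json": '.txt"}'}},
    {"type": "content_block_stop", "index": 1},
]:
    result = buf.feed(ev)
print(result)  # {'path': 'a.txt'}
```

Parsing is deferred to content_block_stop because an intermediate fragment such as '{"path": "a' is not valid JSON on its own.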

Files Changed

Modified:

  • src/claude_agent_sdk/types.py - Added option
  • src/claude_agent_sdk/client.py - Pass option to Query
  • src/claude_agent_sdk/_internal/client.py - Pass option to Query
  • src/claude_agent_sdk/_internal/query.py - Integration logic

New:

  • src/claude_agent_sdk/_internal/stream_accumulator.py - Core accumulator logic
  • tests/test_stream_accumulator.py - Unit tests
  • examples/streaming_textblock_accumulation.py - Usage examples

Testing

# All tests pass
python -m pytest tests/ -k "not e2e" -q  # ✅ 120 passed

# Linting passes
python -m ruff check src/ tests/  # ✅

# Type checking passes
python -m mypy src/  # ✅

Resolves

Closes #164

🤖 Generated with Claude Code

aetherwu, Oct 21 '25 03:10

Tested this branch with Claude Code. Works as described - text streams incrementally instead of arriving in complete blocks.

Test setup

#!/usr/bin/env python3.11

import asyncio
from claude_agent_sdk import ClaudeSDKClient, ClaudeAgentOptions, AssistantMessage, TextBlock

async def main():
    options = ClaudeAgentOptions(
        model='claude-sonnet-4-5-20250929',
        include_partial_messages=True,
        accumulate_streaming_content=True,
    )

    async with ClaudeSDKClient(options) as client:
        await client.query("Write a two paragraph story about squirrels")

        last_text = ""
        async for message in client.receive_response():
            if not isinstance(message, AssistantMessage):
                continue

            for block in message.content:
                if not isinstance(block, TextBlock):
                    continue

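                current_text = block.text
                # Text that does not extend last_text belongs to a new block,
                # so print it from the start instead of silently skipping it
                if not current_text.startswith(last_text):
                    last_text = ""
                print(current_text[len(last_text):], end='', flush=True)
                last_text = current_text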

        print()

if __name__ == '__main__':
    asyncio.run(main())

Reproduction

git clone https://github.com/aetherwu/claude-agent-sdk-python.git
cd claude-agent-sdk-python
git checkout feat/streaming-textblock-accumulation
python3.11 -m venv venv
./venv/bin/pip install .
./venv/bin/python3.11 test-streaming.py

Characters appear as they're generated rather than waiting for complete blocks. Without accumulate_streaming_content=True, text arrives in chunks with noticeable delays.

The key difference: before this PR you had to manually extract and accumulate text from event.event.get("delta", {}).get("text", ""). Now TextBlock objects get updated automatically with accumulated content.

Would like to see this merged. The current behavior of waiting for complete blocks before updating TextBlock content creates poor UX for longer responses.

KJ7LNW, Oct 27 '25 06:10

Do you support parameter retrieval for streaming tools, not just text increments?

vassain, Oct 27 '25 09:10

> Do you support parameter retrieval for streaming tools, not just text increments?

Could you explain this scenario in more detail?

aetherwu, Oct 29 '25 08:10