langchain icon indicating copy to clipboard operation
langchain copied to clipboard

Design challenges with streaming content part deltas and message persistence.

Open vlymar opened this issue 5 months ago • 4 comments

👋 Hello, first off I wanted to thank you for the awesome work you've been doing on this library.

I'm building an agent with a chat interface and have run into some design challenges. I'm hoping to address these either by working together to extend this library, or to get guidance on a workaround.

For context, we're building a chat assistant embedded in our application. For now I'm only testing with the Anthropic provider, and I'm building with stream: true set.

We've found that rendered messages in our chat UI map more to "content blocks" (anthropic api term) than they do to LangChain.Message objects. For example, here is a Message returned by on_message_processed:

%LangChain.Message{
  content: [
    %LangChain.Message.ContentPart{
      type: :thinking,
      content: "Here is my full thinking...",
      options: [ ... ]
    },
    %LangChain.Message.ContentPart{
      type: :text,
      content: "Here is my full response...",
      options: []
    }
  ],
  processed_content: nil,
  index: 1,
  status: :complete,
  role: :assistant,
  name: nil,
  tool_calls: [],
  tool_results: nil,
  metadata: %{
    usage: %LangChain.TokenUsage{ ... },
      cumulative: false
    }
  }
}

This Message has two content blocks. As the deltas are emitted, they first fire with index: 0 (for :thinking) then index: 1 (for :text).

Here are the design challenges:

  1. I need to represent content block boundaries in my agent's API response to our frontend. In Anthropic's API, these boundary events are "content_block_start" and "content_block_stop". OpenAI's new response API has the same notion. Vercel's AI SDK does as well. On the other hand, langchain's delta callback provides no indication of a content block start or finish. I've found that I can infer a content block boundary by sending delta events from a callback to a genserver, and checking for a change from one index to another, but this has issues:
    • Requires stateful aggregation of data in my langchain wrapper to reconstruct data already returned by the provider APIs (I've only verified that anthropic and openAI's Responses API return boundary events).
    • The final content block is the last index, so there's no index increment to detect after it. Can workaround by looking for a status: :completed delta or a on_message_processed callback fire, but that's additional complexity.
    • Depends on each content block being sent consecutively. Maybe this is a safe assumption but I'm not sure.

What do you think about introducing a new field that could be used to detect the end of a content block? I think if langchain announces the end of a content block, my wrapper can infer the start of a new block as well. Perhaps this could be ContentPart.status which is marked as :complete when the provider emits its form of content_block_stop?

  1. I need to persist agent messages so that a) historical chats can be viewed and b) chat history can be loaded as context into multi-turn conversations. I haven't quite figured out how best to do this. I'm conflicted between storing content parts as individual rows (which maps well to my agent's API and our chat interface), vs storing an entire representation of a Langchain.Message together. Regardless, I believe this ticket is related as either way it'd be helpful to have a stable identifier for messages that I could use to address DB rows. But here I'm less confident and more looking for any guidance you have to offer.

I apologize for the length of this ticket. Happy to clarify anything, or split it up into two tickets if that'd help!

vlymar avatar Sep 23 '25 19:09 vlymar

Hi @vlymar!

Thanks for your thoughtful write-up. Here's how I'm handling a similar situation in my own application.

  • I have a set tables to store conversation history in my application. And I'm storing ContentParts as a separate table.
  • I have conversion functions for going from a LangChain.Message to my app's message type and another function for going back the other way.
  • As deltas are streamed back through the callback in a separate process, I send a message to the process showing the UI. It merges the partial delta fragments it into the LLMChain.

Here's a sample of putting that together.

Process that submits the LLMChain to the server:

defp assign_llm_chain(socket) do
  conversation = socket.assigns.conversation

  live_view_pid = self()

  handlers = %{
    on_llm_new_delta: fn _chain, deltas ->
      send(live_view_pid, {:chat_response_deltas, deltas})
    end,
    on_llm_new_message: fn
      _chain, %LCMessage{} = message ->
        send(live_view_pid, {:chat_response, message})
    end
  }

  # convert the DB stored message to LLMChain messages
  chain_messages =
    conversation.id
    |> Messages.list_messages()
    |> Messages.db_messages_to_langchain_messages()

  llm_chain =
    LLMChain.new!(%{llm: setup_model(conversation)})
    |> LLMChain.add_callback(handlers)
    |> LLMChain.add_messages(chain_messages)

  assign(socket, :llm_chain, llm_chain)
end

The LiveView/process responsible for rendering:

def handle_info({:chat_response_deltas, deltas}, socket) do
  # Merge in the deltas. The message processed callback is used to save it and clear the delta.
  updated_chain =
    try do
      LLMChain.merge_deltas(socket.assigns.llm_chain, deltas)
    rescue
      error ->
        Logger.error("Error applying delta: #{inspect(error)}")
        socket.assigns.llm_chain
    end

  socket =
    socket
    |> assign(:llm_chain, updated_chain)
    |> flash_error_if_stopped_for_limit()
    |> scroll_to_bottom()

  {:noreply, socket}
end
  • For the partial delta (not yet complete), I have a separate rendering component that updates as it's streamed back. It renders the current merged delta as a whole
<.delta_display
  :if={@llm_chain}
  id="delta-display"
  show_thinking={@is_debug}
  delta={@llm_chain.delta}  # <- my merged but still incomplete delta
  class="mb-1"
/>
  • When the final delta is received and merged, the llm_chain.delta will be nil and the completed message will be appended to the LLMChain.messages

As the deltas are received, it forwards it to the LiveView (or GenServer) process that's managing things.

The other process merges the chunked deltas into a complete view of the delta received thus far. I render that using llm_chain.delta.

Does this provide any insights and guidance for how it might work in your situation?

brainlid avatar Sep 30 '25 23:09 brainlid

Thanks for the response, and sorry for the delayed response on my end.

That all makes sense, but the part I'm most curious about is this:

I have a set tables to store conversation history in my application. And I'm storing ContentParts as a separate table.

It sounds like we're both storing content parts as individual rows. How do you determine when to start writing a new row? That's the part that's messiest in my application. Or do you wait for on_llm_new_message to fire and then write the whole message set of content parts at once?

vlymar avatar Oct 08 '25 17:10 vlymar

Good question. I don't write the partial message to the database as it's received. It would cause high DB churn/updates for no real benefit. I don't write it the DB until it's fully received or failed or cancelled. But once the message stream is complete.

brainlid avatar Oct 08 '25 20:10 brainlid

That makes sense. Zooming out, it'd still be very useful for us to have a way to detect that we've received the end of a content part. As signal for this being useful to clients, these "content stop" messages are built into anthropic's message API, openAI's response API, and vercel's AI SDK (although they don't exist in openAI's chat completions API).

These could be modeled either as fields on ContentParts that get set when we detect the part is finished, or perhaps a callback that fires when the part is finished. I'm still onboarding to LangChain and provider APIs so I could definitely be missing something.

If you think this fits into LangChain, I'd be happy to take a stab at implementation (with some guidance from you). Otherwise, I'm not blocked - I have this logic in my genserver...it's just a lot of complexity to wrangle something that anthropic already has built into the response stream.

vlymar avatar Oct 08 '25 21:10 vlymar