
Usage is always null when streaming with OpenAILLMClient

Open croemmich opened this issue 2 months ago • 0 comments

When streaming an LLM response (requestLLMStreaming) using the OpenAILLMClient against xAI (and likely OpenAI proper), the ResponseMetaInfo token counts are always null. There is an opt-in feature, stream_options.include_usage=true, that:

If set, an additional chunk will be streamed before the data: [DONE] message. The usage field on this chunk shows the token usage statistics for the entire request, and the choices field will always be an empty array.

The problem, however, is that this chunk arrives after the chunk carrying a finishReason, which currently causes the StreamFrame.End event to be emitted.
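For illustration, the tail of a stream with include_usage enabled looks roughly like this (a sketch: the token values are hypothetical and most fields are trimmed):

```
data: {"choices":[{"index":0,"delta":{"content":"..."},"finish_reason":null}]}

data: {"choices":[{"index":0,"delta":{},"finish_reason":"stop"}]}

data: {"choices":[],"usage":{"prompt_tokens":12,"completion_tokens":34,"total_tokens":46}}

data: [DONE]
```

Note that the finish_reason chunk (the current End trigger) precedes the usage-only chunk, which is exactly why End fires before usage is available.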

I was able to get it working with the following client change, which captures the finishReason and usage. However, I don't see a way to make a PR to fix this without breaking the StreamFrameFlowBuilder.processStreamingChunk(chunk: TStreamResponse) interface. Adding the streamOptions LLM param properly shouldn't be a problem.

Thoughts on how I should proceed?

class StreamingUsageTrackingOpenAILLMClient(apiKey: String, private val settings: OpenAIClientSettings) : OpenAILLMClient(apiKey, settings) {
    override fun executeStreaming(
        prompt: Prompt,
        model: LLModel,
        tools: List<ToolDescriptor>,
    ): Flow<StreamFrame> {
        logger.debug { "Executing streaming prompt: $prompt with model: $model" }
        model.requireCapability(LLMCapability.Completion)

        val messages = convertPromptToMessages(prompt, model)
        val request = serializeProviderChatRequest(
            messages = messages,
            model = model,
            tools = tools.map { it.toOpenAIChatTool() },
            toolChoice = prompt.params.toolChoice?.toOpenAIToolChoice(),
            params = prompt.params,
            stream = true,
        )
        // Opt in to the trailing usage chunk by injecting stream_options.include_usage=true
        val requestJson = json.parseToJsonElement(request).jsonObject
        val streamOptions = JsonObject(mapOf("include_usage" to JsonPrimitive(true)))
        val requestJsonWithStreamOptions = JsonObject(requestJson.plus("stream_options" to streamOptions))

        var finishReason: String? = null
        var usage: OpenAIUsage? = null
        return buildStreamFrameFlow {
            httpClient.sse(
                path = settings.chatCompletionsPath,
                request = json.encodeToString(requestJsonWithStreamOptions),
                requestBodyType = String::class,
                dataFilter = { it != "[DONE]" },
                decodeStreamingResponse = ::decodeStreamingResponse,
                processStreamingChunk = { it },
            ).onCompletion {
                // Defer StreamFrame.End until the flow completes, so the trailing
                // usage chunk (which arrives after the finishReason chunk) is included
                emitEnd(finishReason, createMetaInfo(usage))
            }.collect { chunk ->
                if (chunk.usage != null) {
                    usage = chunk.usage
                }
                chunk.choices.firstOrNull()?.let { choice ->
                    choice.delta.content?.let { emitAppend(it) }
                    choice.delta.toolCalls?.forEach { openAIToolCall ->
                        val index = openAIToolCall.index
                        val id = openAIToolCall.id
                        val functionName = openAIToolCall.function?.name
                        val functionArgs = openAIToolCall.function?.arguments
                        upsertToolCall(index, id, functionName, functionArgs)
                    }
                    choice.finishReason?.let { finishReason = it }
                }
            }
        }
    }
}
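For context, here is how a caller might read the usage off the End frame once this change is in place. This is a hedged sketch: the property names `text`, `finishReason`, and `metaInfo` on `StreamFrame`, and `totalTokensCount` on `ResponseMetaInfo`, are assumptions about koog's types, not verified against the sources.

```kotlin
// Hypothetical consumer of the patched client; the StreamFrame and
// ResponseMetaInfo member names below are assumed, not confirmed.
suspend fun collectWithUsage(
    client: StreamingUsageTrackingOpenAILLMClient,
    prompt: Prompt,
    model: LLModel,
) {
    client.executeStreaming(prompt, model, tools = emptyList()).collect { frame ->
        when (frame) {
            // Streamed content deltas
            is StreamFrame.Append -> print(frame.text)
            // With include_usage set, metaInfo should now carry token counts
            is StreamFrame.End ->
                println("finish=${frame.finishReason} tokens=${frame.metaInfo?.totalTokensCount}")
            else -> { /* tool calls, etc. */ }
        }
    }
}
```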

croemmich — Nov 04 '25 19:11