Usage is always null when streaming with OpenAILLMClient
When streaming an LLM response (requestLLMStreaming) using the OpenAILLMClient against xAI (and likely OpenAI proper), the ResponseMetaInfo token counts are always null. There is an opt-in feature, stream_options.include_usage=true, documented as follows:
If set, an additional chunk will be streamed before the data: [DONE] message. The usage field on this chunk shows the token usage statistics for the entire request, and the choices field will always be an empty array.
The problem, however, is that this chunk arrives after the chunk carrying the finishReason, which currently causes a StreamFrame.End event to be emitted.
I was able to get it working with the following client change that captures the finishReason and usage. However, I don't see a way to make a PR to fix this without breaking the StreamFrameFlowBuilder.processStreamingChunk(chunk: TStreamResponse) interface. Adding the streamOptions llm param properly shouldn't be a problem.
Thoughts on how I should proceed?
/**
 * Workaround client that enables OpenAI's `stream_options.include_usage` so token
 * usage is reported for streaming requests.
 *
 * With `include_usage` enabled, the server sends one extra chunk (empty `choices`,
 * populated `usage`) AFTER the chunk carrying `finishReason`, just before `[DONE]`.
 * The base client emits [StreamFrame.End] as soon as it sees `finishReason`, so the
 * usage chunk is lost. This override captures both the finish reason and the usage
 * while streaming, and defers the End frame to flow completion so the trailing
 * usage chunk is included in the emitted metadata.
 */
class StreamingUsageTrackingOpenAILLMClient(
    apiKey: String,
    private val settings: OpenAIClientSettings,
) : OpenAILLMClient(apiKey, settings) {

    override fun executeStreaming(
        prompt: Prompt,
        model: LLModel,
        tools: List<ToolDescriptor>,
    ): Flow<StreamFrame> {
        logger.debug { "Executing streaming prompt: $prompt with model: $model" }
        model.requireCapability(LLMCapability.Completion)

        val messages = convertPromptToMessages(prompt, model)
        val request = serializeProviderChatRequest(
            messages = messages,
            model = model,
            tools = tools.map { it.toOpenAIChatTool() },
            toolChoice = prompt.params.toolChoice?.toOpenAIToolChoice(),
            params = prompt.params,
            stream = true,
        )

        // Inject stream_options.include_usage=true so the server appends a final
        // usage-bearing chunk before the "[DONE]" sentinel.
        val requestJson = json.parseToJsonElement(request).jsonObject
        val streamOptions = JsonObject(mapOf("include_usage" to JsonPrimitive(true)))
        val requestJsonWithStreamOptions = JsonObject(requestJson.plus("stream_options" to streamOptions))

        var finishReason: String? = null
        var usage: OpenAIUsage? = null

        return buildStreamFrameFlow {
            httpClient.sse(
                path = settings.chatCompletionsPath,
                request = json.encodeToString(requestJsonWithStreamOptions),
                requestBodyType = String::class,
                dataFilter = { it != "[DONE]" },
                decodeStreamingResponse = ::decodeStreamingResponse,
                processStreamingChunk = { it },
            ).onCompletion { cause ->
                // Only emit a terminal frame for a successful stream; on failure or
                // cancellation let the exception propagate without a spurious End.
                if (cause == null) {
                    emitEnd(finishReason, createMetaInfo(usage))
                }
            }.collect { chunk ->
                // The trailing usage chunk has an empty choices array, so record
                // usage independently of the choice handling below.
                chunk.usage?.let { usage = it }
                chunk.choices.firstOrNull()?.let { choice ->
                    choice.delta.content?.let { emitAppend(it) }
                    choice.delta.toolCalls?.forEach { openAIToolCall ->
                        upsertToolCall(
                            openAIToolCall.index,
                            openAIToolCall.id,
                            openAIToolCall.function?.name,
                            openAIToolCall.function?.arguments,
                        )
                    }
                    // Remember the finish reason; End is deferred to onCompletion
                    // because the usage chunk arrives after this one.
                    choice.finishReason?.let { finishReason = it }
                }
            }
        }
    }
}