Implement new streaming ask endpoint (WIP)
Motivation and Context (Why the change? What's the scenario?)
Draft PR for issue https://github.com/microsoft/kernel-memory/issues/100 . This draft pr was very briefly discussed in discord. The changes still need work. currently missing:
- more examples (serverless, service, curl,...)
- ~~Get the streaming working in WebClient, it currently still buffers the response (i was not able to find a fix)~~
High level description (Approach, Design)
Used existing AskAsync method as reference point for most of the code. The new streaming endpoint does not return a MemoryAnswer, but returns the actual text result in an async enumerable. If sources are needed the SearchAsync method can still be used to fetch them with minimal extra performance overhead.
@microsoft-github-policy-service agree
Also I want to note, the way I have this implemented currently is just returning the whole answer as a stream of strings. This might not be optimal as the code that consumes these methods needs to call SearchAsync() if they want the sources, and this would result in an extra DB query.
Another implementation i was thinking of was returning a stream of PartialMemoryAnswer(MemoryAnswer with nullable properties), with the first object returning the metadata (such as facts/citations), and the parts after the first one only containing their pieces of text. This would also mean that the object can be extended upon in the future.
example:
var noAnswerFound = new PartialMemoryAnswer
{
Question = question,
NoResult = true,
Result = this._config.EmptyAnswer,
};
StringBuilder bufferedAnswer = new(); // Buffering result in memory to count chars and check if not empty answer
bool finishedRequiredBuffering = false;
var watch = Stopwatch.StartNew();
await foreach (var x in this.GenerateAnswerAsync(question, facts.ToString()).WithCancellation(cancellationToken).ConfigureAwait(false))
{
if (x is null || x.Length == 0)
{
continue;
}
bufferedAnswer.Append(x);
int currentLength = bufferedAnswer.Length;
if (!finishedRequiredBuffering)
{
if (currentLength <= this._config.EmptyAnswer.Length && ValueIsEquivalentTo(bufferedAnswer.ToString(), this._config.EmptyAnswer))
{
this._log.LogTrace("Answer generated in {0} msecs. No relevant memories found", watch.ElapsedMilliseconds);
noAnswerFound.NoResultReason = "No relevant memories found";
yield return noAnswerFound;
yield break;
}
else if (currentLength > this._config.EmptyAnswer.Length)
{
finishedRequiredBuffering = true;
yield return new PartialMemoryAnswer
{
Question = question,
NoResult = false,
Result = bufferedAnswer.ToString(),
RelevantSources = citationList,
};
}
}
if (finishedRequiredBuffering)
{
yield return new PartialMemoryAnswer
{
Result = x
};
}
if (this._log.IsEnabled(LogLevel.Trace) && currentLength >= 30)
{
this._log.LogTrace("{0} chars generated", currentLength);
}
}
some conflicts to address
some conflicts to address
if it helps - I rebased and fixed some issues => https://github.com/chaelli/kernel-memory/tree/100-streaming-askAsync-response not sure how to handle this. PR into @JonathanVelkeneers' branch? Or create a separat PR so that the validation tasks can run? Anyway... probably should do some more testing before that.
some conflicts to address
Resolved these
any reason this is waiting? do we need to add SSE format for the endpoint? (if so - maybe this works https://github.com/microsoft/kernel-memory/discussions/625#discussioncomment-9698298 - did for my quick test in the browser - but my experience with SSE are minimal ;))
@dluc just checking in, can this be merged soon please? We're about to build a UI using Kernel Memory and streaming is a must-have
@JonathanVelkeneers can you please check as well? Thanks heaps, would be so great to have streaming capabilities, and I believe we're not the only one :)
@dluc just checking in, can this be merged soon please? We're about to build a UI using Kernel Memory and streaming is a must-have
@JonathanVelkeneers can you please check as well? Thanks heaps, would be so great to have streaming capabilities, and I believe we're not the only one :)
sorry to keep this on hold but I cannot merge yet. I'd suggest to work with a fork not to be blocked. It's an important feature but there's some other critical work in progress.
@dluc thanks for the reply, understood. Do you by any chance have a rough ETA when you will be ready to merge it?
We've already forked the repo so we can auto deploy it to our infra, so the service side is no big deal.
However, in our apps we're using the nuget package Microsoft.KernelMemory.WebClient which we'd then have to build ourselves.
@JonathanVelkeneers I played around a bit with SSE and now have a working solution that does "clean" SSE (via get). Basically, I use two endpoints - one that starts the answer generation and returns an id and one that then streams the answer:
public static void AddAskStreamEndpoint(
this IEndpointRouteBuilder builder, string apiPrefix = "/", IEndpointFilter? authFilter = null)
{
RouteGroupBuilder group = builder.MapGroup(apiPrefix);
// Ask streaming endpoint
var route = group.MapPost(Constants.HttpAskStreamEndpoint, async Task<IResult> (
HttpContext context,
MemoryQuery query,
IKernelMemory service,
ILogger<KernelMemoryWebAPI> log,
CancellationToken cancellationToken) =>
{
log.LogTrace("New search request, index '{0}', minRelevance {1}", query.Index, query.MinRelevance);
var askId = Guid.NewGuid().ToString("N").Substring(0, 8);
s_askStreams.Add(askId, service.AskStreamingAsync(
question: query.Question,
index: query.Index,
filters: query.Filters,
minRelevance: query.MinRelevance,
cancellationToken: cancellationToken));
return Results.Ok(new AskStreamResponse { AskId = askId });
})
.Produces<AskStreamResponse>(StatusCodes.Status200OK)
.Produces<ProblemDetails>(StatusCodes.Status401Unauthorized)
.Produces<ProblemDetails>(StatusCodes.Status403Forbidden);
if (authFilter != null) { route.AddEndpointFilter(authFilter); }
// SSE endpoint
var sseRoute = group.MapGet($"{Constants.HttpAskStreamEndpoint}/{{askId}}", async Task (
HttpContext context,
ILogger<KernelMemoryWebAPI> log,
CancellationToken cancellationToken,
string askId) =>
{
log.LogTrace("Accessing SSE stream for askId '{0}'", askId);
context.Response.Headers.Add("Content-Type", "text/event-stream");
if (s_askStreams.TryGetValue(askId, out IAsyncEnumerable<MemoryAnswer>? stream))
{
s_askStreams.Remove(askId);
var jsonOptions = new JsonSerializerOptions { DefaultIgnoreCondition = JsonIgnoreCondition.WhenWritingDefault };
var response = context.Response;
if (stream != null)
{
await foreach (var ma in stream)
{
await response.WriteAsync($"data: {JsonSerializer.Serialize(new { message = ma }, jsonOptions)}\n\n", cancellationToken).ConfigureAwait(false);
await response.Body.FlushAsync(cancellationToken).ConfigureAwait(false);
}
// Send an end-of-stream event if needed
await response.WriteAsync("event: end\ndata: End of stream\n\n", cancellationToken).ConfigureAwait(false);
await response.Body.FlushAsync(cancellationToken).ConfigureAwait(false);
return;
}
}
context.Response.StatusCode = StatusCodes.Status404NotFound;
await context.Response.WriteAsync("Not found", cancellationToken).ConfigureAwait(false);
});
When requesting the answer from the browser, this is the basic functionality:
"headers": {
"accept": "*/*",
"authorization": "...",
"cache-control": "no-cache",
"Content-Type": "application/json"
},
"body": "{\"question\":\"was macht viu?\",\"index\":\"viuch-largeembedding\"}",
"method": "POST",
"mode": "cors",
"credentials": "include"
});
const responseData = await response.json();
const askId = responseData.askId;
const sseUrl = `http://localhost:9001/ask/stream/${askId}`;
const eventSource = new EventSource(sseUrl);
let responseText = '';
eventSource.onmessage = function(event) {
const responeObject = JSON.parse(event.data);
responseText += responeObject.message.text;
console.log(responseText);
};
eventSource.addEventListener('end', function(event) {
console.log('End of stream');
eventSource.close(); // Close the SSE connection
});
maybe you can add this to your branch?
@dluc does it make sense for us to rebase this? or will it take much longer to merge? And @JonathanVelkeneers if you do rebase (or merge & fix) - I'd suggest you extract the fact generation & token calculation into a separate method - otherwise we'll always need to fix this stuff twice if it changes. I can certainly help - but I'd rather only invest the time once we know theres a good chance it will be merged
@dluc does it make sense for us to rebase this? or will it take much longer to merge? And @JonathanVelkeneers if you do rebase (or merge & fix) - I'd suggest you extract the fact generation & token calculation into a separate method - otherwise we'll always need to fix this stuff twice if it changes. I can certainly help - but I'd rather only invest the time once we know theres a good chance it will be merged
no worries I'll take care of the rebase ;-)
Is this feature still gonna be implemented?
Is this feature still gonna be implemented?
Apologies for the delay, I had to put this topic on hold, and we currently have two PRs for the same feature. I’ll need to review and decide the best approach. Unfortunately, no ETA at the moment
Update: for this feature to be merged, there's a couple of things to do:
- Check this similar PR https://github.com/microsoft/kernel-memory/pull/726 and decide which approach to take
- Support content moderation. The stream of tokens needs to be validated while streamed, on a configurable frequence. If at any point the text moderation fails, the stream needs to be reset, e.g. sending a special token or similar.
@JonathanVelkeneers It would be great if you could complete this feature. Thanks
@JonathanVelkeneers thanks for helping! - There were two PRs addressing the same feature, and I had to decide between them. The Response Streaming implementation is now merged into main. For more details, please refer to PR #726.