
feature: Support IAsyncEnumerable as a return type

Open sveinungf opened this issue 2 years ago • 23 comments

Is your feature request related to a problem? Please describe.
It would be nice if Refit supported streaming deserialization of an API response. This could be useful for endpoints that return a large amount of data, since it could avoid buffering the whole API response in memory. It would also allow the consumer to start enumerating items before the whole API response has been received.

Describe the solution you'd like
Refit now uses System.Text.Json, and with .NET 6, System.Text.Json gained support for streaming deserialization. Using it could be as easy as having an IAsyncEnumerable as the return type in an interface:

public interface IApi
{
    [Get("/items")]
    IAsyncEnumerable<Item> GetItems();
}

Describe alternatives you've considered
Using Task<IEnumerable<Item>> as the return type, but this buffers the whole API response in memory.
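
For illustration, consuming the proposed interface might look like this (a sketch only: RestService.For is Refit's existing factory, while the streaming behavior of GetItems() is the feature being requested, not something Refit does today):

var api = RestService.For<IApi>("https://api.example.com");

// Items would become available as they are deserialized from the response
// stream, instead of only after the entire body has been buffered.
await foreach (var item in api.GetItems())
{
    Console.WriteLine(item);
}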

sveinungf avatar Jan 27 '22 09:01 sveinungf

Hello guys! What's the status on this? Any progress?

liven-maksim-infotecs avatar Sep 27 '23 13:09 liven-maksim-infotecs

I just got to a point where I needed this too. Say you have a big collection of items returned from the API call. In many cases there's no need to materialize the whole thing in memory. You just want to spin through the collection and do something for each item, async.

This I think could be a huge performance benefit.

Thinking a bit more about it, wouldn't you rather have an

IApiResponse<IAsyncEnumerable<T>>

signature (to inspect the response for faulty situations before trying to consume the enumerable)? Would that be a problem?
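
Hypothetically, consuming that shape could look like the following sketch (GetItemsWithResponse is a made-up method; whether Refit could complete the outer Task once headers arrive, while leaving the body unconsumed, is exactly the design question):

// Hypothetical: the await completes when headers arrive; the body streams afterwards.
IApiResponse<IAsyncEnumerable<Item>> response = await api.GetItemsWithResponse();

if (response.IsSuccessStatusCode && response.Content is not null)
{
    await foreach (var item in response.Content)
    {
        // handle each item as it arrives
    }
}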

brumlemann avatar Oct 16 '23 08:10 brumlemann

since it could avoid buffering the whole API response in memory

I'm not sure if I understand what this would actually represent internally - what would back the IAsyncEnumerable<T>? The result itself is not being streamed, and there is no standard way to incrementally parse JSON (without constraining it somehow like JSON Lines or sth), so this would effectively be a Task<Lazy<List<T>>> - a saved byte buffer that, on request, gets parsed.

anaisbetts avatar Oct 16 '23 12:10 anaisbetts

Streaming deserialization of root-level JSON arrays is already supported by System.Text.Json (the announcement is linked above in the OP) when using JsonSerializer.DeserializeAsyncEnumerable. But IIRC, when I last tried this in Blazor WASM, I had to enable response streaming for each request (using request.SetBrowserResponseStreamingEnabled(true); and HttpCompletionOption.ResponseHeadersRead) with a custom DelegatingHandler.
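
A minimal sketch of such a handler, assuming Blazor WASM (the handler name is made up; SetBrowserResponseStreamingEnabled is an extension method in Microsoft.AspNetCore.Components.WebAssembly.Http, and HttpCompletionOption.ResponseHeadersRead would still be passed wherever the request is sent):

using System.Net.Http;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Components.WebAssembly.Http;

public class BrowserStreamingHandler : DelegatingHandler
{
    protected override Task<HttpResponseMessage> SendAsync(
        HttpRequestMessage request, CancellationToken cancellationToken)
    {
        // Opt this request out of the browser's default full-response buffering.
        request.SetBrowserResponseStreamingEnabled(true);
        return base.SendAsync(request, cancellationToken);
    }
}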

Ghost4Man avatar Oct 16 '23 12:10 Ghost4Man

since it could avoid buffering the whole API response in memory

I'm not sure if I understand what this would actually represent internally - what would back the IAsyncEnumerable<T>? The result itself is not being streamed, and there is no standard way to incrementally parse JSON (without constraining it somehow like JSON Lines or sth), so this would effectively be a Task<Lazy<List<T>>> - a saved byte buffer that, on request, gets parsed.

Quick chat with GPT gave the following response. I have NOT checked it for hallucinations ;-)

using System;
using System.Collections.Generic;
using System.Net.Http;
using System.Text.Json;

public async IAsyncEnumerable<T> GetItemsAsync<T>(string apiUrl)
{
    using var httpClient = new HttpClient();
    // ResponseHeadersRead lets reading start before the whole body has arrived.
    using var response = await httpClient.GetAsync(apiUrl, HttpCompletionOption.ResponseHeadersRead);

    if (!response.IsSuccessStatusCode)
    {
        throw new Exception($"Failed to fetch data. Status code: {response.StatusCode}");
    }

    using var stream = await response.Content.ReadAsStreamAsync();

    // DeserializeAsyncEnumerable streams items out of a root-level JSON array
    // as they are parsed, instead of materializing the whole list first.
    await foreach (var item in JsonSerializer.DeserializeAsyncEnumerable<T>(stream))
    {
        if (item is not null)
        {
            yield return item;
        }
    }
}

Just for ideas ;-)

brumlemann avatar Oct 16 '23 12:10 brumlemann

@Ghost4Man Oops, I missed that - that is interesting though I can imagine that a Correct implementation might be Weird; it seems like that's a better fit for IObservable to me (since you can't really control the upstream stream of data via not requesting items, it's Showing Up whether you like it or not - not requesting items would simply result in buffering), though I know that IAsyncEnumerable is the New Hotness that people are excited about.

IObservable<T> (IO<T> for short) might also be tricky from a type-system perspective, because the type you're deserializing is fundamentally List<T>, but the type you'd return would not be IObservable<List<T>>, it'd be IObservable<T>.

anaisbetts avatar Oct 17 '23 11:10 anaisbetts

Not sure I understand the above comments. The following code with the native HttpClient works well to receive streamed content without buffering. Should be easy to implement in Refit too.

  using var client = new HttpClient();
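  // GetFromJsonAsAsyncEnumerable is part of System.Net.Http.Json (.NET 8+).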
  IAsyncEnumerable<Item?> result = client.GetFromJsonAsAsyncEnumerable<Item>(uri);
  await foreach (var itm in result)
  {
    if (itm != null)
      Console.WriteLine($"Returned item {itm.Id}.");
  }

SWarnberg avatar Dec 19 '23 13:12 SWarnberg

I don't really see the value of being able to asynchronously parse JSON items; it really seems like a hyper-small optimization. If someone has a practical example where their App is Extremely Difficult To Write without this feature I'd reconsider it, but otherwise I think this is WONTFIX

anaisbetts avatar Dec 19 '23 14:12 anaisbetts

If my application transforms response items into objects of a different type but the output is relatively large, it's better to convert each item separately than to load the whole payload and only then apply the transformation.

voroninp avatar Dec 19 '23 14:12 voroninp

This is a very strange comment and a hasty closing of something that in some use cases is extremely useful. We're talking about client-side support for streaming APIs. We're not talking about async loading of JSON into a collection stored in memory. Maybe you should update yourself on streaming APIs and IAsyncEnumerable @anaisbetts?

Of course we can cope without this support in Refit since there is good support in HttpClient; it's just unfortunate to mix different client frameworks.

SWarnberg avatar Dec 19 '23 20:12 SWarnberg

Maybe you should update yourself on streaming APIs and IAsyncEnumerable @anaisbetts?

Do you....think that I don't know what these things are? 🙃

Looking at the implementation of GetFromJsonAsAsyncEnumerable, there are two possible benefits here:

  1. You could save a small amount of memory - i.e. instead of using [Size of Response as Bytes] + [Size of JSON as C# objects], you could, in an extremely optimal scenario, allocate [Size of a single Buffer frame] + [Size of JSON as C# objects]. This would require you to be in absolute perfect lockstep (with absolutely no way of ensuring it is the case), since if you fell behind calling MoveNext on the IAsyncEnumerator even a little bit, you will start to Buffer the response. This is largely Not Worth It in my opinion. If your scenario is that performance-sensitive that response buffer sizes are a concern, you would do better hand-writing the method anyways.

  2. This could add conditional cancellation to the Task-based API - i.e. you partially deserialize the result, decide "Eh, don't care", then you kill the IAsyncEnumerator by DisposeAsync'ing it, and the underlying network request is cancelled. This is what GetFromJsonAsAsyncEnumerable does (see the sketch after this list). This is at least slightly more compelling, though the IObservable API already supports cancellation via unsubscribing before the request completes (though not conditionally - you have to blindly cancel early). To be honest, while this is likely a Real benefit for someone, I am extremely skeptical that anyone will actually take advantage of it in practice, because you would have to Know About It, Need this Feature, then be smart enough to Dispose the enumerator early.
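
For concreteness, the early-disposal pattern in point 2 might look like this sketch (Item, StillInteresting, and Process are made up):

await foreach (var item in client.GetFromJsonAsAsyncEnumerable<Item>(uri))
{
    if (item is null)
        continue;

    // Breaking out of await foreach disposes the enumerator, which tears down
    // the underlying response stream, so the rest of the body is never downloaded.
    if (!StillInteresting(item))
        break;

    Process(item);
}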

anaisbetts avatar Dec 20 '23 16:12 anaisbetts

Not Worth It in my opinion.

Why do you think it's such a rare thing? It looks like a perfect case for an ETL processing pipeline built from a chain of microservices.

voroninp avatar Dec 20 '23 16:12 voroninp

@voroninp I don't think that this scenario is rare, but I do think that it's rare that you are so short on memory that you cannot afford to buffer the response and parse it all at once. I think this optimization might be Intellectually Satisfying but ultimately not really meaningful in real-world apps.

anaisbetts avatar Dec 20 '23 16:12 anaisbetts

I do think that it's rare that you are so short on memory that you cannot afford to buffer the response and parse it all at once.

In theory an enumerable can be infinite. I mean, if we're talking about streaming, the stream can potentially contain a gazillion items without any chance of fitting them all in memory.

Also, I see Refit as a nice library to be consumed from Blazor. And saving on memory allocations there is very important for performance.

voroninp avatar Dec 20 '23 16:12 voroninp

I would expect the common use-case to be to use this as a simple way to display/process individual items in real-time (as soon as they arrive), not waiting for the whole response which might take very long to finish (results of a complicated search/query, results from a test runner, etc.).

Ghost4Man avatar Dec 20 '23 17:12 Ghost4Man

My use-case is that I might be receiving very large amounts of data, and it's very useful to be able to have subsequent requests pick up from where we leave off when the initial request fails due to poor internet.

Line- avatar Dec 21 '23 14:12 Line-

it's very useful to be able to have subsequent requests pick up from where we leave off when the initial request fails due to poor internet.

Now that's interesting, can you show an example of how that works in code with HttpClient?

anaisbetts avatar Dec 22 '23 07:12 anaisbetts

it's very useful to be able to have subsequent requests pick up from where we leave off when the initial request fails due to poor internet.

Now that's interesting, can you show an example of how that works in code with HttpClient?

There's nothing special to it; the example that @SWarnberg wrote above covers it. Because we're able to iterate on results as we receive them, if I'm retrieving 5 million objects and get disconnected after item 1,500,000, then when I regain connectivity I can make a new API request that only asks for the remaining items, so every request can make some progress.
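
As a sketch of that resume pattern (assuming a hypothetical after query parameter in the API contract, plus made-up Item and Process):

long lastSeenId = 0;
var completed = false;
while (!completed)
{
    try
    {
        await foreach (var item in client.GetFromJsonAsAsyncEnumerable<Item>(
            $"{uri}?after={lastSeenId}"))
        {
            if (item is null) continue;
            Process(item);
            lastSeenId = item.Id; // progress survives a dropped connection
        }
        completed = true; // enumeration finished without a network failure
    }
    catch (HttpRequestException)
    {
        // Connection dropped mid-stream; retry, resuming from lastSeenId.
    }
}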

Line- avatar Dec 22 '23 09:12 Line-

I'll reopen this since people have come up with some compelling use-cases

anaisbetts avatar Dec 22 '23 09:12 anaisbetts

The obvious case for this is OpenAI/Azure OpenAI. The MIME type will be application/x-ndjson.

When the MIME type is application/x-ndjson, Refit should make the call and stream the result, parsing each line as NDJSON.
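
For reference, an NDJSON body is just one complete JSON document per line; a made-up streamed-completion payload might look like:

{"id": 1, "delta": "Hel"}
{"id": 2, "delta": "lo, "}
{"id": 3, "delta": "world"}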

There's already an extension on HttpClient for a GET that does this. But here's the equivalent for a POST:

using System;
using System.Collections.Generic;
using System.IO;
using System.Net.Http;
using System.Net.Http.Json;
using System.Runtime.CompilerServices;
using System.Text.Json;
using System.Threading;
using System.Threading.Tasks;

// Extension methods must live in a static class; the class name here is arbitrary.
public static class HttpClientJsonStreamingExtensions {
    public static async IAsyncEnumerable<TResponse> PostFromJsonAsAsyncEnumerable<TRequest, TResponse>(this HttpClient client, string requestUri, TRequest request, [EnumeratorCancellation] CancellationToken cancellationToken = default) {
        using var requestMessage = new HttpRequestMessage(HttpMethod.Post, requestUri);

        using var ms = new MemoryStream();
        // JsonSerializerSettings.Options is the author's own static JsonSerializerOptions holder.
        await JsonSerializer.SerializeAsync(ms, request, JsonSerializerSettings.Options, cancellationToken: cancellationToken);
        ms.Position = 0;

        using var requestContent = new StreamContent(ms);
        requestMessage.Content = requestContent;

        requestContent.Headers.ContentType = new System.Net.Http.Headers.MediaTypeHeaderValue("application/json");
        requestMessage.Headers.Add("Accept", "application/json");

        // ResponseHeadersRead: start consuming as soon as headers arrive instead of buffering the body.
        using var response = await client.SendAsync(requestMessage, HttpCompletionOption.ResponseHeadersRead, cancellationToken);

        response.EnsureSuccessStatusCode();

        var items = response.Content.ReadFromJsonAsAsyncEnumerable<TResponse>(cancellationToken);

        await foreach (var item in items) {
            if (cancellationToken.IsCancellationRequested)
                throw new TaskCanceledException();
            if (item is null)
                continue;

            yield return item;
        }
    }
}
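
Hypothetical usage, with made-up ChatRequest/ChatChunk types standing in for an OpenAI-style request and streamed response chunk:

using var cts = new CancellationTokenSource();
var http = new HttpClient { BaseAddress = new Uri("https://api.example.com") };

await foreach (var chunk in http.PostFromJsonAsAsyncEnumerable<ChatRequest, ChatChunk>(
    "/v1/chat/completions", new ChatRequest("Hello"), cts.Token))
{
    Console.Write(chunk.Delta); // render tokens as they stream in
}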

I would expect that this specific use case is going to come up a lot as people start using AI and want to stream the response back.

Here's the Refitter equivalent request for automatic generation:

Refitter Issue

JohnGalt1717 avatar Jan 18 '24 19:01 JohnGalt1717

@JohnGalt1717 I would argue that the OpenAI streaming case would actually be better served by IObservable, and detecting application/x-ndjson and being able to stream results would definitely be a PR I'd be interested in

anaisbetts avatar Jan 20 '24 18:01 anaisbetts

Well, it really isn't an observable. It's an IAsyncEnumerable that runs until completion. It's identical to returning IAsyncEnumerable from Minimal APIs right now; they just don't set the content type to application/x-ndjson as they should, despite that being exactly what happens and exactly what the extension method on HttpClient assumes it's getting.

It isn't as if, much later down the road, OpenAI will pump something else into the IAsyncEnumerable, like you could with, say, gRPC server streams. It runs until done and then is closed. IObservable really is only for the latter case, like gRPC, not for a stream of messages that you know ends.

So I'd suggest that this just handles the IAsyncEnumerable case properly without really caring about the content type, assuming it's NDJSON even when the content type isn't set in some cases. GET should map to the default extension method, and POST/PUT should implement the suggested Post version I gave above; call it a day, because that's literally what you'd expect a client to have done if you were to write it by hand.

JohnGalt1717 avatar Jan 20 '24 18:01 JohnGalt1717