Handle StreamingResponse and Server-Sent Events
Use Case
I want to display a progress bar for a process running in the backend. I am also writing the API, so I can make it anything: a streaming response, SSE, or even a websocket. I managed to make rtk-query work with all three of these solutions, but only for GET requests. In my case, the process is triggered by a POST request, and I seem unable to make it work.
Using onCacheEntryAdded
I would like to see rtk-query handle streaming responses and/or SSE. I made a simple version work with a StreamingResponse using the onCacheEntryAdded prop, similar to how it's used in the docs for websockets. It feels quite quirky because I have to set the original queryFn: () => ({data: null}) and then write the whole fetching logic inside the onCacheEntryAdded function.
async onCacheEntryAdded(arg, { cacheDataLoaded, updateCachedData }) {
  console.log('here', arg);
  // Wait for the initial queryFn result before starting to stream
  await cacheDataLoaded;
  // Would be nice if I had access to the baseQuery here
  const response = await fetch(
    `${process.env.NEXT_PUBLIC_API_URL}/risk_profiles/`
  );
  const reader = response.body
    .pipeThrough(new TextDecoderStream())
    .getReader();
  let done, value;
  while (!done) {
    ({ value, done } = await reader.read());
    // Overwrite the data value with the latest chunk
    updateCachedData(() => value);
  }
}
Without having looked at the code, I could imagine that a returned ReadableStream response could be handled differently. Unfortunately, I can imagine two ways consecutive responses could be handled:
a) by overwriting the previous value
b) by appending to a list
I'd say b) is the more general case and could be the default.
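As an illustration, the two strategies are just different cache updaters. A quick sketch with hypothetical names (these are not an rtk-query API):

```typescript
// Hypothetical cache updaters, one per strategy:
// a) overwrite: the cache holds only the latest chunk
const overwriteRecipe = (chunk: string) => (_prev: string | null): string =>
  chunk;

// b) append: the cache accumulates every chunk received so far
const appendRecipe = (chunk: string) => (prev: string[]): string[] => [
  ...prev,
  chunk,
];
```

Either recipe could be handed to updateCachedData; only b) preserves the full history of chunks.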
In any case, the above works for a GET request. Unfortunately, isLoading is already false after the queryFn has run, and I haven't found a way to switch it back to true.
The bigger issue is that this doesn't work for mutations like POST requests. While the mutation builder has an onCacheEntryAdded property, it has no access to updateCachedData. Is there a specific reason for this? I think adding that to the props would be enough for me to hack together a solution for my use case. Even now I might be able to use the dispatch function, but I would love to see an easier way to do this in rtk-query.
I'm also interested in this being officially supported. My current workaround involves passing stream handlers as an API arg:
const [apiStream] = useApiStreamMutation();
...
const onSubmit = () => {
  apiStream({
    data,
    // @ts-ignore - ideally I don't need this ignore
    onChunk: (chunk) => {
      // do something with each chunk
    },
  })
    .unwrap()
    .then((chunks) => {
      // do something with all chunks - requires type casting to an array
    });
};
which I use when I extend the endpoint definition:
apiStream: {
  query: (
    queryArg: ApiStreamApiArg & CustomStreamHandlers<ApiStreamResponse>
  ) => ({
    url: `/api/stream`,
    method: "POST",
    body: queryArg.data,
    // responseHandler must be async, since it awaits each chunk read
    responseHandler: async (response) => {
      const reader = response.body.getReader();
      let done, value;
      while (!done) {
        ({ value, done } = await reader.read());
        queryArg.onChunk(value);
      }
    },
  }),
}
This is not ideal, because I have to do a bunch of manual type casting, serialization ignoring, and type ignoring to get this to work. Ideally, rtk-query would provide a solution for this out of the box.
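The CustomStreamHandlers type isn't shown above; my guess (not the author's actual definition) is that it looks something like this:

```typescript
// Hypothetical definition of the CustomStreamHandlers helper type:
// extra, non-serializable callbacks smuggled in alongside the real query arg.
type CustomStreamHandlers<TChunk> = {
  // Called once per decoded chunk as it arrives from the stream
  onChunk: (chunk: TChunk) => void;
};

// Example: an arg type combining the POST payload with the handlers
type StreamArg<TBody, TChunk> = { data: TBody } & CustomStreamHandlers<TChunk>;
```

Smuggling a function through the query arg is exactly what forces the serialization and type ignores mentioned above, since RTK Query expects args to be serializable for cache keys.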
@dvargas92495 : what would a notional API change look like here?
There are four things that I care about:
- when the api started receiving chunks
- when the api has received a chunk
- when the api is done receiving chunks
- a way to cancel all future chunks from arriving
Here's one possible way I can think of specifying this, but I'm not at all married to it:
const [apiStream, {
  isStreaming,
  data, // or chunk
  cancelStreaming,
}] = useApiStreamMutation();

const onSubmit = () => {
  apiStream().unwrap().then((fullData: (typeof data)[]) => {
    notifyUserWeAreDoneStreaming();
    handleAllData(fullData);
  });
};

React.useEffect(() => {
  if (isStreaming) {
    notifyUserWeStartedStreaming();
  }
}, [isStreaming]);

React.useEffect(() => {
  if (data) {
    handleChunkFromApi(data);
  }
}, [data]);

const onUserClickCancel = () => {
  cancelStreaming();
};
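Independent of RTK Query, the lifecycle this proposal describes (start, per-chunk, done, cancel) can be modeled with a plain object. A hedged sketch with hypothetical names, just to pin down the semantics:

```typescript
// Hypothetical model of the proposed streaming lifecycle.
type StreamHandle<T> = {
  push: (chunk: T) => void; // the api received a chunk
  end: () => void;          // the api is done receiving chunks
  cancel: () => void;       // stop all future chunks from arriving
  done: Promise<T[]>;       // resolves with all chunks (like unwrap())
};

function createStreamHandle<T>(onChunk?: (chunk: T) => void): StreamHandle<T> {
  const chunks: T[] = [];
  let cancelled = false;
  let resolve!: (value: T[]) => void;
  const done = new Promise<T[]>((r) => (resolve = r));
  return {
    push(chunk) {
      if (cancelled) return; // ignore chunks after cancellation
      chunks.push(chunk);
      onChunk?.(chunk);
    },
    end: () => resolve(chunks),
    cancel: () => {
      cancelled = true;
      resolve(chunks); // settle with whatever arrived before cancel
    },
    done,
  };
}
```

In the proposed hook, `push` would drive the `data` field, `cancel` would back `cancelStreaming`, and `done` is what `unwrap()` would return.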
This is increasingly relevant as we are seeing a growing number of applications interacting with LLMs. In these instances, we expect to send POST requests/mutations to endpoints from which we receive only a streamed response. @dvargas92495's sample would work very well.
@markerikson would your team be open to proposals to help move this forward or is this something you guys want to handle internally as part of the Post 2.0 effort?
@dvargas92495 we're always open to proposals and potential PRs, yeah.
Okay, so let's just add the possibility to return a ReadableStream from RTK Query.
I'm using useLazyQuery for this one; inside, I'm handling it like the websocket/SSE examples in the RTK Query documentation.
The streamed data is available on the result:
const [callOpenai, callOpenaiResult] = useLazyCall();

return (
  <div>
    <button onClick={async () => await callOpenai()}></button>
    <div>{callOpenaiResult.data}</div>
  </div>
);
In conclusion: is it possible to read a stream from a POST request in chunks with StreamingResponse, without workarounds, and in a way that is directly supported by Redux Toolkit (not sockets)?
What was the solution to this?
@johannao76 : at the moment, there isn't one.
At the moment this isn't on our roadmap, largely because I don't have experience with anything like this and don't know what an appropriate API design would look like.
I see a suggestion upstream to "allow returning a ReadableStream from a query", but I'm not sure what that implies.
How do other libraries like React Query handle this, if at all? If folks could point to other similar examples in the ecosystem that I could look at and investigate, that would at least provide a starting point to understand the problem and potential solutions.
I used fetchEventSource from @microsoft/fetch-event-source and wrote a basic base query around this stream handler:
const baseQueryWithEventSource: BaseQueryFn<
  {
    url: string;
    method?: 'GET' | 'POST' | 'PUT' | 'DELETE';
    body?: any;
    headers?: Record<string, string>;
    onMessage?: (message: string) => void;
  },
  unknown,
  unknown
> = async ({ url, method, body, headers = {}, onMessage }, api, extraOptions) => {
  try {
    // fetchEventSource resolves with void once the connection closes
    await fetchEventSource(`${process.env.BASE_API}${url}`, {
      method,
      headers,
      body: body ? JSON.stringify(body) : undefined,
      async onopen(response) {
        if (!response.ok) {
          throw new Error(`HTTP error! status: ${response.status}`);
        }
      },
      onmessage(event) {
        if (onMessage) {
          onMessage(event.data);
        }
      },
      onclose() {
        console.log('Connection closed');
      },
      onerror(err) {
        console.error('Error occurred:', err);
        throw err;
      },
    });
    return { data: null }; // The stream completed successfully
  } catch (error) {
    console.error('Error in fetchEventSource:', error);
    return { error };
  }
};
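For context, fetchEventSource is parsing the Server-Sent Events wire format, where each event is one or more `data:` lines followed by a blank line. A minimal parser sketch, assuming single-line data fields only (the real format also supports `event:`, `id:`, and `retry:` fields and multi-line data):

```typescript
// Simplified SSE frame parser: extract the data payload of each event
// from a raw SSE response body.
function parseSseData(raw: string): string[] {
  return raw
    .split('\n\n') // events are separated by blank lines
    .flatMap((frame) => frame.split('\n'))
    .filter((line) => line.startsWith('data:'))
    .map((line) => line.slice('data:'.length).trim());
}
```

Each extracted payload corresponds to one `event.data` value delivered to the onmessage handler above.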
We connect this handler to the API
export const streamApi = createApi({
  reducerPath: 'streamApi',
  tagTypes: ['YourTagName'],
  baseQuery: baseQueryWithEventSource,
  endpoints: () => ({}),
});
In an endpoint, you can use the base query with this handler:
const response = await baseQuery({
  url: `yourURL`,
  method: 'POST',
  body: arg,
  onMessage: (message: string) => {
    // your logic
  },
});
When I use onMessage, I can process incoming messages, including saving them directly to the cache.
This is the most roundabout but working approach. The solution is not ideal, but in my case it helped me capture incoming messages.