How to stream an HTTP response with connexion 3 and AsyncApp?
Description
I want to stream HTTP responses with connexion 3 + AsyncApp.
Expected behaviour
I would expect something like this to work:
import asyncio

from connexion.lifecycle import ConnexionResponse


async def async_gen_numbers():
    for i in range(10):
        await asyncio.sleep(1)
        yield str(i)


async def streaming():
    # option 1
    return async_gen_numbers(), 200, {'Content-Type': 'text/plain'}
    # option 2
    return ConnexionResponse(
        status_code=200,
        content_type='text/plain',
        body=async_gen_numbers(),
        headers={'transfer-encoding': 'chunked'},
        is_streamed=True,
    )
Actual behaviour
Neither of the two approaches listed above works. In both cases there's an error like this:
ERROR: Request ID e5e552a9-7835-406f-a7fe-89501b09f73e - 'async_generator' object has no attribute 'encode'
Also I couldn't find any docs on how to do this with AsyncApp, only with FlaskApp.
Steps to reproduce
- Create a minimum connexion 3 app with AsyncApp.
- Use the code above
Additional info:
Output of the commands:
- python --version: 3.11.9
- pip show connexion | grep "^Version\:": 3.0.6
Hi @mr-flannery ,
I'll have to look into the is_streamed parameter; it seems it is not used correctly.
In the meantime, can you try by using the underlying Starlette StreamingResponse?
Hi @Ruwann ,
thanks for the quick response! Your suggestion did solve the problem!
from starlette.responses import StreamingResponse

# ...

async def streaming():
    return StreamingResponse(async_gen_numbers(), status_code=200, media_type='text/plain')
Hi @Ruwann ,
unfortunately, my response was a bit premature. It looks like it doesn't actually work as expected. While the code does run, it does not actually stream: from a caller's perspective it is still synchronous.
When calling the endpoint with curl, it still blocks for 10 seconds before anything happens, i.e.:
$ curl -N http://localhost:8080/streaming
# nothing happens for 10 seconds
0123456789
$
I wrote an Express app to make sure this was actually the server's fault, not curl's:
const Express = require('express');
const app = new Express();

async function* generateNumbers() {
    for (const i of [1, 2, 3, 4, 5]) {
        await new Promise(resolve => setTimeout(resolve, 1000));
        yield await Promise.resolve(i);
    }
}

app.get('/streaming', async (req, res) => {
    for await (const i of generateNumbers()) {
        res.write(`data: ${i}\n\n`);
    }
    res.end();
});

app.listen(3456, () => {
    console.log('Server is running on port 3456');
});
When using curl here, this works as expected, i.e. it sends a chunk of data every second.
Therefore, I believe there might actually be a bug with sending StreamingResponses in connexion 3 + AsyncApp.
I cannot seem to reproduce the issue. When I use StreamingResponse from starlette, the response is streamed to my terminal as expected.
I used the following sample code: https://github.com/Ruwann/connexion-streaming
Hi @Ruwann ,
thanks for the repo, I was able to track it down. This is what causes the behavior to change:
app.add_api("openapi.yaml", validate_responses=True)
Once I add validate_responses=True to your code, it exhibits the same problem. Vice versa, if I remove it from my code, streaming works as expected.
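For what it's worth, the behavior would make sense if response validation drains the whole body before forwarding it. Here is a rough stdlib-only sketch of that difference (the function names are illustrative, not connexion's actual internals, and the sleeps are left out):

```python
import asyncio

events = []

async def gen():
    # stand-in for async_gen_numbers(), without the sleeps
    for i in range(3):
        events.append(f"produced {i}")
        yield str(i)

async def send_streamed():
    # streaming path: each chunk goes out as soon as it is produced
    async for chunk in gen():
        events.append(f"sent {chunk}")

async def send_buffered():
    # what a body validator effectively does: drain the generator
    # into one buffer, validate it, then send everything at once
    body = "".join([chunk async for chunk in gen()])
    events.append(f"sent {body}")

asyncio.run(send_streamed())
streamed = list(events)

events.clear()
asyncio.run(send_buffered())
buffered = list(events)
```

In the streamed variant each chunk is "sent" right after it is produced; in the buffered variant nothing is sent until the generator is exhausted, which matches the 10-second stall I see with validate_responses=True.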
Hi @Ruwann ,
just wanted to check in with you and see if there are any plans to fix this?
Best regards!
An easy way to handle that would be the possibility to disable response body validation on a per-route basis.
This seems to work, though it is probably not future-proof.
import typing as t

from connexion.datastructures import MediaTypeDict
from connexion.validators import AbstractResponseBodyValidator


class StreamResponseBodyValidator(AbstractResponseBodyValidator):
    def wrap_send(self, send):
        """Disable validation, leaving the stream untouched."""
        return send

    def _parse(self, stream: t.Generator[bytes, None, None]) -> t.Any:  # type: ignore
        return stream

    def _validate(self, body: dict):
        pass


validator_map = {
    "response": MediaTypeDict(
        {
            'application/jsonlines+json': StreamResponseBodyValidator,
        }
    ),
}
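Assuming add_api accepts a validator_map argument for custom validators (worth double-checking against your connexion version), the wiring would look roughly like this:

```python
from connexion import AsyncApp

app = AsyncApp(__name__)
# pass the custom map so the streaming media type skips body validation,
# while other media types keep the default validators
app.add_api("openapi.yaml", validate_responses=True, validator_map=validator_map)
```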