connexion icon indicating copy to clipboard operation
connexion copied to clipboard

How to stream an HTTP response with connexion 3 and AsyncApp?

Open mr-flannery opened this issue 1 year ago • 8 comments

Description

I want to stream HTTP responses with connexion 3 + AsyncApp.

Expected behaviour

I would expect something like this to work:

async def async_gen_numbers():
  for i in range(10):
    await asyncio.sleep(1)
    yield str(i)

async def streaming():
    # option 1
    return async_gen_numbers(), 200, {'Content-Type': 'text/plain'}
    # option 2
    return ConnexionResponse(status_code=200, content_type='text/plain', body=async_gen_numbers(), headers={'transfer-encoding': 'chunked'}, is_streamed=True)

Actual behaviour

Neither of the two approaches listed above works. In both cases there's an error like this:

ERROR:    Request ID e5e552a9-7835-406f-a7fe-89501b09f73e - 'async_generator' object has no attribute 'encode'

Also I couldn't find any docs on how to do this with AsyncApp, only with FlaskApp.

Steps to reproduce

  1. Create a minimum connexion 3 app with AsyncApp.
  2. Use above code

Additional info:

Output of the commands:

  • python --version: 3.11.9
  • pip show connexion | grep "^Version\:": 3.0.6

mr-flannery avatar May 16 '24 15:05 mr-flannery

Hi @mr-flannery , I'll have to look into the is_streamed parameter, it seems that it is not correctly used.

In the meantime, can you try by using the underlying Starlette StreamingResponse?

Ruwann avatar May 16 '24 15:05 Ruwann

Hi @Ruwann ,

thanks for the quick response! Your suggestion did solve the problem!

from starlette.responses import StreamingResponse

# ...

async def streaming():
    return StreamingResponse(async_gen_numbers(), status_code=200, media_type='text/plain')

mr-flannery avatar May 16 '24 19:05 mr-flannery

Hi @Ruwann ,

unfortunately, my response was a bit premature. It looks like it doesn't actually work as expected. While the code does run, it does not actually stream. From a caller's perspective it's still synchronous.

When calling the endpoint with curl, it still blocks for 10 seconds before anything happens, i.e.:

$ curl -N http://localhost:8080/streaming
# nothing happens for 10 seconds
0123456789
$

I wrote an Express app to make sure this was actually the server's fault, not curls:

const Express = require('express');
const app = new Express();

async function* generateNumbers() {
  for (const i of [1, 2, 3, 4, 5]) {
    await new Promise(resolve => setTimeout(resolve, 1000));
    yield await Promise.resolve(i);
  }
}

app.get('/streaming', async (req, res) => {
  for await (const i of generateNumbers()) {
    res.write(`data: ${i}\n\n`);
  }
  res.end();
});

app.listen(3456, () => {
  console.log('Server is running on port 3456');
})

When using curl here, this works as expected, i.e. it sends a chunk of data every second.

Therefore, I believe there might actually be a bug with sending StreamingResponses in connexion3 + AsyncApp.

mr-flannery avatar May 21 '24 08:05 mr-flannery

I cannot seem to reproduce the issue. When I use StreamingResponse from starlette, the response is streamed to my terminal as expected.

I used the following sample code: https://github.com/Ruwann/connexion-streaming

Ruwann avatar May 22 '24 21:05 Ruwann

Hi @Ruwann ,

thanks for the repo, I was able to track it down. This is what causes the behavior to change:

app.add_api("openapi.yaml", validate_responses=True)

Once I add validate_responses=True to your code, it exhibits the same problem. Vice versa, if I remove it from my code, streaming works as expected.

mr-flannery avatar May 23 '24 11:05 mr-flannery

Hi @Ruwann ,

just wanted to check in with you and see if there are there any plans to fix this?

Best regards!

mr-flannery avatar Jul 11 '24 08:07 mr-flannery

An easy way to handle that would be to have the possibility to disable response body validation on per route basis

nmoreaud avatar Sep 10 '24 09:09 nmoreaud

This seem to work, though it is probably not future proof.

validator_map = {
    "response": MediaTypeDict(
        {
            'application/jsonlines+json': StreamResponseBodyValidator
        }
    ),
}

class StreamResponseBodyValidator(AbstractResponseBodyValidator):

    def wrap_send(self, send):
        """Disable validation, leaving stream untouched"""
        return send

    def _parse(self, stream: t.Generator[bytes, None, None]) -> t.Any:  # type: ignore
        return stream

    def _validate(self, body: dict):
        pass

nmoreaud avatar Sep 10 '24 09:09 nmoreaud