aiobotocore

S3 - get_object - Read doesn't work

Open Sharleedah opened this issue 2 years ago • 12 comments

Describe the bug: I wanted to move to aiobotocore for my AWS handling, but unfortunately I am running into the following error:

Use case: I want to fetch an object from S3 with the following code:

session = get_session()
async with session.create_client('s3', region_name="eu-central-1") as client:
    try:
        obj = await client.get_object(Bucket=cls.bucket_name, Key=path)
    except ClientError as ex:
        if ex.response['Error']['Code'] != 'NoSuchKey': raise ex 
        raise ObjectMissingError("Template", path)
    finally:
        await client.close()

From the given obj I want to read the data and return it from my web server with the following code:

@user_router.get("/storage/{filename}", responses=response_get_file, summary="Get file")
async def get_file(filename:str = Path(..., description="Name of the file to be returned", example="test.jpg")):
    """Get file"""
    content = await AmazonStorageHandler.get_file(path=filename)
    return StreamingResponse(content= content["Body"].iter_chunks(), media_type=content["ContentType"])

The problem is that the web server stops replying and seems to be stuck in an infinite while loop. The following code yields the same behaviour, so the web server is not responsible for it:

data=await obj.read()

Checklist

  • [x] I have reproduced in environment where pip check passes without errors
  • [x] I have provided pip freeze results
  • [x] I have provided sample code or detailed way to reproduce
  • [x] I have tried the same code in botocore to ensure this is an aiobotocore specific issue
  • [x] I have tried similar code in aiohttp to ensure this is an aiobotocore specific issue
  • [x] I have checked the latest and older versions of aiobotocore/aiohttp/python to see if this is a regression / injection

pip freeze results

aioboto3==10.3.0 aiobotocore==2.4.1 aiofiles==22.1.0 aiohttp==3.8.3 aioitertools==0.11.0 aiosignal==1.3.1 aiosmtplib==2.0.1 aiosqlite==0.18.0 alembic==1.9.2 amqp==5.1.1 anyio==3.6.2 async-timeout==4.0.2 asyncpg==0.27.0 attrs==22.2.0 bcrypt==4.0.1 beautifulsoup4==4.11.1 billiard==3.6.4.0 boto3==1.24.59 botocore==1.27.59 celery==5.2.7 certifi==2022.12.7 charset-normalizer==2.1.1 click==8.1.3 click-didyoumean==0.3.0 click-plugins==1.1.1 click-repl==0.2.0 dnspython==2.3.0 email-validator==1.3.1 eventlet==0.33.3 fastapi==0.89.1 ffmpeg-python==0.2.0 frozenlist==1.3.3 future==0.18.3 greenlet==2.0.1 gunicorn==20.1.0 h11==0.14.0 html-sanitizer==1.9.3 idna==3.4 itsdangerous==2.1.2 Jinja2==3.1.2 jmespath==1.0.1 kombu==5.2.4 lorem==0.1.1 lxml==4.9.2 Mako==1.2.4 Markdown==3.4.1 MarkupSafe==2.1.2 multidict==6.0.4 passlib==1.7.4 Pillow==9.4.0 prompt-toolkit==3.0.36 psycopg2==2.9.5 pycountry==22.3.5 pydantic==1.10.4 PyJWT==2.6.0 python-dateutil==2.8.2 python-dotenv==0.21.1 python-multipart==0.0.5 pytz==2022.7.1 PyYAML==6.0 qrcode==7.3.1 redis==4.4.2 requests==2.28.2 s3transfer==0.6.0 six==1.16.0 sniffio==1.3.0 soupsieve==2.3.2.post1 SQLAlchemy==1.4.46 starlette==0.22.0 typing_extensions==4.4.0 urllib3==1.26.14 uvicorn==0.20.0 vine==5.0.0 wcwidth==0.2.6 wrapt==1.14.1 yarl==1.8.2

Environment:

  • Python Version: [e.g. 3.10]
  • Docker: python:3.10-slim

Sharleedah avatar Jan 24 '23 18:01 Sharleedah

  1. tip: should use except client.exceptions.NoSuchKey:
  2. you can't close the aiobotocore client until after the response has fully streamed to the client. The client should live at the server level, not be created per response. Remember each client has by default 10 connectors that can be used in parallel; this can be configured via the Config object (max_pool_connections).
  3. I think you just want StreamingResponse(content= content["Body"].content, .... as content is a "payload" IIRC.

thehesiod avatar Jan 25 '23 04:01 thehesiod
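
Points 1 and 2 above could look roughly like the sketch below. It assumes a single long-lived client that is created at server startup and passed in. `fetch_object` is a hypothetical helper name, and `ObjectMissingError` only mirrors the exception name used in the original post. The client is deliberately not closed here, because the caller still has to read `response["Body"]`.

```python
from typing import Any


class ObjectMissingError(Exception):
    """Raised when the requested key does not exist (name borrowed from the original post)."""


async def fetch_object(client: Any, bucket: str, key: str) -> dict:
    """Fetch an object using a long-lived client.

    The client is NOT closed here: the caller still needs it while
    reading (and releasing) response["Body"].
    """
    try:
        return await client.get_object(Bucket=bucket, Key=key)
    except client.exceptions.NoSuchKey as ex:
        # Modeled exception class exposed on the client, as suggested in tip 1.
        raise ObjectMissingError(key) from ex
```

Any other `ClientError` propagates unchanged, which matches the intent of the original `try`/`except`.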

Thanks for your answer! You are absolutely right about the second point. But are you sure about the third one? Shouldn't the data be chunked in order to use a StreamingResponse? I've tried it with:

StreamingResponse(content=content["Body"], ... # loads only first chunk and then seems to be stuck in while loop
StreamingResponse(content=content["Body"].content, ... # Problem: no iterator
StreamingResponse(content=content["Body"].iter_chunks(),... # loads only first chunk and then seems to be stuck in while loop

as well as writing my own generator like this:

async def stream():
   async for chunk in content["Body"]:
      yield chunk
return StreamingResponse(content=stream(),...

But I got it to work like this (which is not a valid solution in the end, because it reads everything first):

@user_router.get("/storage/{filename}", responses=response_get_file, summary="Get file")
async def get_file(filename:str = Path(..., description="Name of the file to be returned", example="test.png")):
    """Get file"""
    content = await AmazonStorageHandler.get_file(path=filename)
    data = await content["Body"].read()
    return Response(content=data, media_type=content["ContentType"])

Do you have any idea how I can make it work with a StreamingResponse, given that using a generator doesn't seem to work on the objects I receive from aiobotocore?

Sharleedah avatar Jan 25 '23 07:01 Sharleedah
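
One way to approach the question above is a generator that owns the body for the whole duration of the stream. This is a minimal sketch, not a confirmed fix for the hang reported here: it assumes `client` is a long-lived aiobotocore S3 client that stays open while the response is being sent, and `iter_object_chunks` is a hypothetical name. Note that the body is entered with a bare `async with body:` and the chunks are read from `body` itself.

```python
from typing import Any, AsyncIterator


async def iter_object_chunks(
    client: Any, bucket: str, key: str, chunk_size: int = 64 * 1024
) -> AsyncIterator[bytes]:
    """Yield the object's bytes chunk by chunk while keeping the body open."""
    response = await client.get_object(Bucket=bucket, Key=key)
    body = response["Body"]
    # The body is released when the generator finishes or is closed,
    # not when the calling function returns.
    async with body:
        async for chunk in body.iter_chunks(chunk_size):
            yield chunk
```

In a FastAPI handler the generator could then be passed as `StreamingResponse(iter_object_chunks(client, bucket, key))`. If the `ContentType` is needed for `media_type`, `get_object` would have to be awaited in the handler first and the body handed to a similar generator.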

whoops ya, looking again, StreamResponse wants you to write the data bit by bit so that it streams back to the client while you write. If you look at aiohttp.web_response.Response, on the body setter it checks to see if the body is of a payload type. IIRC .content is a payload type, so you could do something like response = Response(...), followed by response.body = content["Body"].content and I think it should allow you to stream the payload back to the client

thehesiod avatar Jan 26 '23 02:01 thehesiod

do report back as I'd like to keep that trick in my back pocket :)

thehesiod avatar Jan 26 '23 02:01 thehesiod

Thanks again for the fast response! I've tried aiohttp.web_response.Response with content["Body"].content, but unfortunately it didn't work (for example, the client only sent about 630 bytes instead of the 90 KB the file had).

Do you have any idea how to make StreamingResponse work? Normally it should work with .iter_chunks(), as it already did with native botocore.

Sharleedah avatar Jan 26 '23 09:01 Sharleedah

will try to check on this again asap

thehesiod avatar Mar 07 '23 04:03 thehesiod

I believe that I've hit this same issue attempting to stream the partial contents of a file with aiobotocore. My use case is that I am extracting a file from an uncompressed zip stored in an S3 bucket (testing using a local MinIO container) via a range request. I get the full content of the file when downloading it all into memory, as in the following example snippet:

response = await client.get_object(
        Bucket=bucket, Key=key, Range=f"bytes={start}-{end}"
    )
content = await response["Body"].read()
...

However this is inefficient for large files and so I would like to be able to stream the contents of the response body via an async generator.

If I change the above to:

response = await client.get_object(
        Bucket=bucket, Key=key, Range=f"bytes={start}-{end}"
    )
stream = response["Body"]
return stream_generator(stream)

async def stream_generator(stream):
    async for line in stream.iter_lines():
        parsed_line = do_stuff_to_line(line)
        yield parsed_line

I get the first ~60-100 lines of the file (out of ~2800) and then the stream suddenly stops. Same thing if I switch to using stream.iter_chunks() - with default chunk_size I see it receive about 10 chunks, then a partial chunk, then nothing.

In my case this async generator is then streamed via a fastapi.responses.StreamingResponse, but the problem seems to occur before that point and I can reproduce without fastapi.

tw4l avatar Mar 30 '23 18:03 tw4l
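
For a script-style use such as the range request above, one option is to let the generator own both the client and the body, so neither can be closed before the consumer is done. The sketch below assumes `session` is the result of aiobotocore's `get_session()` and that credentials, region and endpoint come from the environment; `iter_range_lines` is a hypothetical name. Whether an early close is actually the cause of the truncated stream described above is not established in this thread; the sketch only rules that cause out.

```python
from typing import Any, AsyncIterator


async def iter_range_lines(
    session: Any, bucket: str, key: str, start: int, end: int
) -> AsyncIterator[bytes]:
    """Yield the lines of a byte range; client and body stay open until exhausted."""
    async with session.create_client("s3") as client:
        response = await client.get_object(
            Bucket=bucket, Key=key, Range=f"bytes={start}-{end}"
        )
        body = response["Body"]
        async with body:
            async for line in body.iter_lines():
                yield line
```

Creating a client per call is more expensive than sharing one, so in a server the client would normally be created once and only the body handled inside the generator.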

anyone have a sec to write a server/client demo in one python file? If not I'll take some time to write one to help debug soon

thehesiod avatar Jul 07 '23 05:07 thehesiod

It seems that we've faced the same issue trying to stream the contents of a response body with aiohttp to another service. Under the hood our code does the following:

# create a client instance on app startup
# ...
response = await client.get_object(Bucket=bucket, Key=key)
content = response['Body']

async with aiohttp.ClientSession() as session:
    async with session.post('http://downstream', data=content) as resp:
        ...  # process response

Sometimes the client just stops working and hangs until the app is restarted.

The problem is that StreamingBody actually wraps ClientResponse, and the latter should be explicitly closed at the end to free the underlying connection. This is stated in README.md, but it may be unclear in cases where the AsyncIterator protocol is applied to the response body.

In my case the code above can be modified as follows:

async with content:    # <- enter underlying ClientResponse cm
    async with aiohttp.ClientSession() as session:
        async with session.post('http://...', data=content) as resp:
            ...  # process response

Hope this helps!

katichev avatar Sep 15 '23 10:09 katichev
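
Why an unreleased body can make a client hang "sometimes" can be illustrated with a toy model. The sketch below uses only `asyncio` and does not touch aiohttp or aiobotocore: `ToyPool` is a hypothetical stand-in for a connection pool with a fixed number of slots (10 by default, as mentioned earlier in the thread), and the timeouts exist only so the demo terminates.

```python
import asyncio
from typing import Tuple


class ToyPool:
    """Toy stand-in for a connection pool with a fixed number of slots."""

    def __init__(self, limit: int) -> None:
        self._slots = asyncio.Semaphore(limit)

    async def acquire(self, timeout: float) -> bool:
        """Try to take a slot; return False if none frees up in time."""
        try:
            await asyncio.wait_for(self._slots.acquire(), timeout)
            return True
        except asyncio.TimeoutError:
            return False

    def release(self) -> None:
        """Give a slot back, as closing a response body would."""
        self._slots.release()


async def demo(limit: int = 10) -> Tuple[bool, bool]:
    pool = ToyPool(limit)
    # Simulate `limit` responses whose bodies were never closed.
    for _ in range(limit):
        assert await pool.acquire(timeout=0.1)
    # The next request cannot get a slot; without a timeout it would wait forever.
    starved = not await pool.acquire(timeout=0.1)
    # Releasing one slot (what `async with content:` achieves) unblocks new requests.
    pool.release()
    recovered = await pool.acquire(timeout=0.1)
    return starved, recovered
```

Running `asyncio.run(demo())` exercises both phases: exhaustion once every slot is held, and recovery after a single release.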

technically it's supposed to GC the connection when it's no longer referenced, so there may be a bug in aiohttp

thehesiod avatar Sep 15 '23 16:09 thehesiod

UPD: I created a small test server that ignores the response body. For every request, it does the following:

  1. call get_object for some huge file
  2. put StreamingBody.__wrapped__ into WeakValueDictionary in global scope.
  3. respond with 200 ok

After it gets stuck, I check my weakref dict with objgraph. This is what it shows:

[image: objgraph reference graph, "refs"]

As far as I can see it cannot be GC-ed, right?

aiobotocore==2.6.0, aiohttp==3.8.5, python3.11

katichev avatar Sep 20 '23 10:09 katichev
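
The tracking step of the experiment above can be reproduced in miniature with the standard library alone. In this sketch `FakeResponse` is a placeholder for the wrapped response object, and `track` and `is_alive` are hypothetical helper names; objgraph is not used here.

```python
import gc
import weakref


class FakeResponse:
    """Placeholder for the wrapped response object (plain classes are weak-referenceable)."""


# Holds entries only while the objects are alive elsewhere.
tracked = weakref.WeakValueDictionary()


def track(obj: FakeResponse) -> int:
    """Register obj without keeping it alive; return the key it is stored under."""
    key = id(obj)
    tracked[key] = obj
    return key


def is_alive(key: int) -> bool:
    """Return True while something else still holds a strong reference."""
    gc.collect()  # also clears objects that are only kept alive by reference cycles
    return key in tracked
```

If `is_alive` keeps returning True after the handler has returned, something outside the handler still references the response, and a tool such as objgraph can then show which object that is.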

btw, in the original code you're closing the client; you can't close the client if you're going to stream the object back out. You also don't need to call close on the client, as you're already using the context manager. Could you update your example, please?

thehesiod avatar Oct 25 '23 16:10 thehesiod