aiobotocore
S3 - get_object - Read doesn't work
Describe the bug
I wanted to move to aiobotocore for my AWS handling, but unfortunately I am running into the following error.
Use case: I want to fetch an object from S3 with the following code:
```
session = get_session()
async with session.create_client('s3', region_name="eu-central-1") as client:
    try:
        obj = await client.get_object(Bucket=cls.bucket_name, Key=path)
    except ClientError as ex:
        if ex.response['Error']['Code'] != 'NoSuchKey':
            raise ex
        raise ObjectMissingError("Template", path)
    finally:
        await client.close()
```
From the given obj I want to read the data and return it from my webserver with the following code:
```
@user_router.get("/storage/{filename}", responses=response_get_file, summary="Get file")
async def get_file(filename: str = Path(..., description="Name of the file to be returned", example="test.jpg")):
    """Get file"""
    content = await AmazonStorageHandler.get_file(path=filename)
    return StreamingResponse(content=content["Body"].iter_chunks(), media_type=content["ContentType"])
```
The problem is that the webserver stops replying and seems to be stuck in an infinite loop. The following code yields the same behaviour, so it's not something the webserver is responsible for:

```
data = await obj["Body"].read()
```
Checklist

- [x] I have reproduced in an environment where `pip check` passes without errors
- [x] I have provided `pip freeze` results
- [x] I have provided sample code or a detailed way to reproduce
- [x] I have tried the same code in botocore to ensure this is an aiobotocore specific issue
- [x] I have tried similar code in aiohttp to ensure this is an aiobotocore specific issue
- [x] I have checked the latest and older versions of aiobotocore/aiohttp/python to see if this is a regression / injection
pip freeze results
aioboto3==10.3.0 aiobotocore==2.4.1 aiofiles==22.1.0 aiohttp==3.8.3 aioitertools==0.11.0 aiosignal==1.3.1 aiosmtplib==2.0.1 aiosqlite==0.18.0 alembic==1.9.2 amqp==5.1.1 anyio==3.6.2 async-timeout==4.0.2 asyncpg==0.27.0 attrs==22.2.0 bcrypt==4.0.1 beautifulsoup4==4.11.1 billiard==3.6.4.0 boto3==1.24.59 botocore==1.27.59 celery==5.2.7 certifi==2022.12.7 charset-normalizer==2.1.1 click==8.1.3 click-didyoumean==0.3.0 click-plugins==1.1.1 click-repl==0.2.0 dnspython==2.3.0 email-validator==1.3.1 eventlet==0.33.3 fastapi==0.89.1 ffmpeg-python==0.2.0 frozenlist==1.3.3 future==0.18.3 greenlet==2.0.1 gunicorn==20.1.0 h11==0.14.0 html-sanitizer==1.9.3 idna==3.4 itsdangerous==2.1.2 Jinja2==3.1.2 jmespath==1.0.1 kombu==5.2.4 lorem==0.1.1 lxml==4.9.2 Mako==1.2.4 Markdown==3.4.1 MarkupSafe==2.1.2 multidict==6.0.4 passlib==1.7.4 Pillow==9.4.0 prompt-toolkit==3.0.36 psycopg2==2.9.5 pycountry==22.3.5 pydantic==1.10.4 PyJWT==2.6.0 python-dateutil==2.8.2 python-dotenv==0.21.1 python-multipart==0.0.5 pytz==2022.7.1 PyYAML==6.0 qrcode==7.3.1 redis==4.4.2 requests==2.28.2 s3transfer==0.6.0 six==1.16.0 sniffio==1.3.0 soupsieve==2.3.2.post1 SQLAlchemy==1.4.46 starlette==0.22.0 typing_extensions==4.4.0 urllib3==1.26.14 uvicorn==0.20.0 vine==5.0.0 wcwidth==0.2.6 wrapt==1.14.1 yarl==1.8.2
Environment:
- Python Version: 3.10
- Docker: python:3.10-slim
- Tip: you should use `except client.exceptions.NoSuchKey:`
- You can't close the aiobotocore client until after the response has fully streamed to the client. The `client` should live on the server, not be created per response. Remember each client by default has 10 connectors that can be used in parallel; this can be configured via the `Config` object.
- I think you just want `StreamingResponse(content=content["Body"].content, ...)`, as `content` is a "payload" IIRC.
Thanks for your answer! You are absolutely right about the second one. But are you sure about the third one? Shouldn't the data be chunked in order to use a StreamingResponse? I've tried it with:
```
StreamingResponse(content=content["Body"], ...)                # loads only the first chunk, then seems stuck in a loop
StreamingResponse(content=content["Body"].content, ...)        # problem: no iterator
StreamingResponse(content=content["Body"].iter_chunks(), ...)  # loads only the first chunk, then seems stuck in a loop
```
as well as writing my own generator like this:

```
async def stream():
    async for chunk in content["Body"]:
        yield chunk

return StreamingResponse(content=stream(), ...)
```
But I got it to work like this (which is not a valid solution in the end, because it reads everything into memory first):
```
@user_router.get("/storage/{filename}", responses=response_get_file, summary="Get file")
async def get_file(filename: str = Path(..., description="Name of the file to be returned", example="test.png")):
    """Get file"""
    content = await AmazonStorageHandler.get_file(path=filename)
    data = await content["Body"].read()
    return Response(content=data, media_type=content["ContentType"])
```
Do you have any idea how I can make it work with a StreamingResponse, if using a generator doesn't seem to work on the objects I receive from aiobotocore?
Whoops, ya. Looking again, StreamingResponse wants you to write the data bit by bit so that it streams back to the client while you write. If you look at `aiohttp.web_response.Response`, the body setter checks whether the body is of a payload type. IIRC `.content` is a payload type, so you could do something like `response = Response(...)`, followed by `response.body = content["Body"].content`, and I think it should allow you to stream the payload back to the client.
Do report back, as I'd like to keep that trick in my back pocket :)
Thanks again for the fast response! I've tried the `aiohttp.web_response.Response` with `content["Body"].content`, but unfortunately it didn't work (the client only received ~630 bytes instead of the ~90 KB the file had, for example).
Do you have any idea how to make StreamingResponse work? Normally it should work with `.iter_chunks()`, as it already did with native botocore.
will try to check on this again asap
I believe I've hit this same issue attempting to stream the partial contents of a file with aiobotocore. My use case is that I am extracting a file from an uncompressed zip stored in an S3 bucket (testing against a local Minio container) via a range request. I get the full content of the file when downloading it all into memory, as in the following example snippet:
```
response = await client.get_object(
    Bucket=bucket, Key=key, Range=f"bytes={start}-{end}"
)
content = await response["Body"].read()
...
```
However, this is inefficient for large files, so I would like to be able to stream the contents of the response body via an async generator. If I change the above to:
```
response = await client.get_object(
    Bucket=bucket, Key=key, Range=f"bytes={start}-{end}"
)
stream = response["Body"]
return stream_generator(stream)

async def stream_generator(stream):
    async for line in stream.iter_lines():
        parsed_line = do_stuff_to_line(line)
        yield parsed_line
```
I get the first ~60-100 lines of the file (out of ~2800), and then the stream suddenly stops. The same thing happens if I switch to using `stream.iter_chunks()`: with the default `chunk_size` I see it receive about 10 chunks, then a partial chunk, then nothing.
In my case this async generator is then streamed via a `fastapi.responses.StreamingResponse`, but the problem seems to occur before that point, and I can reproduce it without FastAPI.
anyone have a sec to write a server/client demo in one python file? If not I'll take some time to write one to help debug soon
It seems that we've faced the same issue trying to stream the contents of a response body with aiohttp to another service. Under the hood our code does the following:
```
# a client instance is created on app startup
# ...
response = await client.get_object(Bucket=bucket, Key=key)
content = response['Body']

async with aiohttp.ClientSession() as session:
    async with session.post('http://downstream', data=content) as resp:
        ...  # process the response
```
Sometimes the client just stops working and hangs until the app is restarted.
The problem is that `StreamingBody` actually wraps `ClientResponse`, and the latter should be explicitly closed at the end to free the underlying connection. This is stated in the README.md, but it might be unclear in cases where the AsyncIterator protocol is applied to the response body.
In my case the code above may be modified as follows:

```
async with content:  # <- enter the underlying ClientResponse context manager
    async with aiohttp.ClientSession() as session:
        async with session.post('http://...', data=content) as resp:
            ...  # process the response
```
Hope this helps!
technically it's supposed to GC the connection when it's no longer referenced, so there may be a bug in aiohttp
UPD: I created a small test server which ignores the response body. For every request it:

- calls `get_object` for some huge file
- puts `StreamingBody.__wrapped__` into a WeakValueDictionary in global scope
- responds with 200 OK

After it gets stuck I check my weakref dict with objgraph. That's what it shows: [objgraph output image not included]. As far as I can see it cannot be GC-ed, right?
aiobotocore==2.6.0, aiohttp==3.8.5, python3.11
Btw, in the original code you're closing the client; you can't close the client if you're going to stream the object back out. Also, there's no need to call close on the client, as you're already using the context manager. Could you update your example please?