
Using the Python buffer protocol in `pipe`

tomwhite opened this issue 9 months ago • 28 comments

Is it possible to avoid memory copies when writing to S3 by taking advantage of the Python buffer protocol?

In particular, it would be great if we could use a memoryview object in calls to pipe().
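
As a quick illustration of the zero-copy behaviour in question (a minimal sketch; the numpy array here is just an example):

>>> import numpy as np
>>> arr = np.frombuffer(bytearray(b"hello"), dtype="uint8")
>>> view = memoryview(arr)   # shares the array's memory, no copy
>>> view.obj is arr
True
>>> bytes(view)              # converting to bytes is what forces a copy
b'hello'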

Using bytes and bytearray works fine:

>>> import numpy as np
>>> import fsspec
>>> fs = fsspec.filesystem('s3')
>>> fs.pipe("s3://cubed-unittest/mem-array/s3test", b"hello") # works
[{'ResponseMetadata': {'RequestId': 'E54C5V2H9M1ZE1V1', 'HostId': 'pcEW+k8KZHBbYuohqfgGNZOac8Y7QzRAHaAxOJmMWQdEll75umxh+On2DXxWi8qKMcG2+aZmJ7I=', 'HTTPStatusCode': 200, 'HTTPHeaders': {'x-amz-id-2': 'pcEW+k8KZHBbYuohqfgGNZOac8Y7QzRAHaAxOJmMWQdEll75umxh+On2DXxWi8qKMcG2+aZmJ7I=', 'x-amz-request-id': 'E54C5V2H9M1ZE1V1', 'date': 'Thu, 10 Apr 2025 13:19:42 GMT', 'x-amz-expiration': 'expiry-date="Sat, 12 Apr 2025 00:00:00 GMT", rule-id="gc"', 'x-amz-server-side-encryption': 'AES256', 'etag': '"5d41402abc4b2a76b9719d911017c592"', 'x-amz-checksum-crc32': 'NhCmhg==', 'x-amz-checksum-type': 'FULL_OBJECT', 'content-length': '0', 'server': 'AmazonS3'}, 'RetryAttempts': 0}, 'Expiration': 'expiry-date="Sat, 12 Apr 2025 00:00:00 GMT", rule-id="gc"', 'ETag': '"5d41402abc4b2a76b9719d911017c592"', 'ChecksumCRC32': 'NhCmhg==', 'ChecksumType': 'FULL_OBJECT', 'ServerSideEncryption': 'AES256'}]
>>> fs.pipe("s3://cubed-unittest/mem-array/s3test", bytearray(b"hello")) # works
[{'ResponseMetadata': {'RequestId': 'C58QYEAVJTBW96Y6', 'HostId': 'lwi6otZoaiTsPhLYApOdMmhFXphWEfhoYh/sRO4txe88uUnj8wLNHTgaleq7agiipYlyd8GHlzrxrZ8sAF1qQIuyXQUwm5Nq', 'HTTPStatusCode': 200, 'HTTPHeaders': {'x-amz-id-2': 'lwi6otZoaiTsPhLYApOdMmhFXphWEfhoYh/sRO4txe88uUnj8wLNHTgaleq7agiipYlyd8GHlzrxrZ8sAF1qQIuyXQUwm5Nq', 'x-amz-request-id': 'C58QYEAVJTBW96Y6', 'date': 'Thu, 10 Apr 2025 13:20:04 GMT', 'x-amz-expiration': 'expiry-date="Sat, 12 Apr 2025 00:00:00 GMT", rule-id="gc"', 'x-amz-server-side-encryption': 'AES256', 'etag': '"5d41402abc4b2a76b9719d911017c592"', 'x-amz-checksum-crc32': 'NhCmhg==', 'x-amz-checksum-type': 'FULL_OBJECT', 'content-length': '0', 'server': 'AmazonS3'}, 'RetryAttempts': 0}, 'Expiration': 'expiry-date="Sat, 12 Apr 2025 00:00:00 GMT", rule-id="gc"', 'ETag': '"5d41402abc4b2a76b9719d911017c592"', 'ChecksumCRC32': 'NhCmhg==', 'ChecksumType': 'FULL_OBJECT', 'ServerSideEncryption': 'AES256'}]

But using memoryview or just a numpy array (which supports the buffer protocol too) fails:

>>> fs.pipe("s3://cubed-unittest/mem-array/s3test", memoryview(np.frombuffer(bytearray(b"hello"), dtype="uint8")))
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/Users/tom/miniforge3/envs/memray-array/lib/python3.11/site-packages/fsspec/asyn.py", line 118, in wrapper
    return sync(self.loop, func, *args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/tom/miniforge3/envs/memray-array/lib/python3.11/site-packages/fsspec/asyn.py", line 103, in sync
    raise return_result
  File "/Users/tom/miniforge3/envs/memray-array/lib/python3.11/site-packages/fsspec/asyn.py", line 56, in _runner
    result[0] = await coro
                ^^^^^^^^^^
  File "/Users/tom/miniforge3/envs/memray-array/lib/python3.11/site-packages/fsspec/asyn.py", line 418, in _pipe
    return await _run_coros_in_chunks(
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/tom/miniforge3/envs/memray-array/lib/python3.11/site-packages/fsspec/asyn.py", line 268, in _run_coros_in_chunks
    result, k = await done.pop()
                ^^^^^^^^^^^^^^^^
  File "/Users/tom/miniforge3/envs/memray-array/lib/python3.11/site-packages/fsspec/asyn.py", line 245, in _run_coro
    return await asyncio.wait_for(coro, timeout=timeout), i
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/tom/miniforge3/envs/memray-array/lib/python3.11/asyncio/tasks.py", line 452, in wait_for
    return await fut
           ^^^^^^^^^
  File "/Users/tom/miniforge3/envs/memray-array/lib/python3.11/site-packages/s3fs/core.py", line 1185, in _pipe_file
    out = await self._call_s3(
          ^^^^^^^^^^^^^^^^^^^^
  File "/Users/tom/miniforge3/envs/memray-array/lib/python3.11/site-packages/s3fs/core.py", line 371, in _call_s3
    return await _error_wrapper(
           ^^^^^^^^^^^^^^^^^^^^^
  File "/Users/tom/miniforge3/envs/memray-array/lib/python3.11/site-packages/s3fs/core.py", line 146, in _error_wrapper
    raise err
  File "/Users/tom/miniforge3/envs/memray-array/lib/python3.11/site-packages/s3fs/core.py", line 114, in _error_wrapper
    return await func(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/tom/miniforge3/envs/memray-array/lib/python3.11/site-packages/aiobotocore/client.py", line 369, in _make_api_call
    request_dict = await self._convert_to_request_dict(
                   ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/tom/miniforge3/envs/memray-array/lib/python3.11/site-packages/aiobotocore/client.py", line 440, in _convert_to_request_dict
    request_dict = self._serializer.serialize_to_request(
                   ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/tom/miniforge3/envs/memray-array/lib/python3.11/site-packages/botocore/validate.py", line 381, in serialize_to_request
    raise ParamValidationError(report=report.generate_report())
botocore.exceptions.ParamValidationError: Parameter validation failed:
Invalid type for parameter Body, value: <memory at 0x102f99a80>, type: <class 'memoryview'>, valid types: <class 'bytes'>, <class 'bytearray'>, file-like object

This may be a limitation in the underlying boto library, but perhaps there is a way to avoid copying the source data (in this case a numpy array) that I'm missing.

cc @jakirkham who commented about this in https://github.com/zarr-developers/zarr-python/pull/2972#discussion_r2034110402

tomwhite avatar Apr 10 '25 13:04 tomwhite

Thanks Tom! 🙏

It seems like the error is coming from botocore

The traceback stops short of indicating the specific function, but reading the tea leaves leads me to conclude that this botocore input validation function is involved

Now fixing that could resolve the issue. OTOH there may be other assumptions made in the code that are less clear. That said, there usually is a path forward with memoryviews (usually with some tweaks)
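
Presumably that validation boils down to a type check roughly like this (a sketch of the shape of the check, not the actual botocore code):

def looks_like_valid_body(body):
    # bytes and bytearray pass directly; anything else must be file-like,
    # and a plain memoryview has no read() method
    return isinstance(body, (bytes, bytearray)) or hasattr(body, "read")

looks_like_valid_body(b"hello")               # True
looks_like_valid_body(bytearray(b"hello"))    # True
looks_like_valid_body(memoryview(b"hello"))   # False -> ParamValidationError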

Looking around, I found a similar upstream issue: https://github.com/boto/boto3/issues/3423

There is some work to fix that in this PR: https://github.com/boto/botocore/pull/3107

Unfortunately there has not been a lot of engagement as of yet. I tried to do a rough review of the PR to see if that might help move things along.

jakirkham avatar Apr 12 '25 04:04 jakirkham

Thanks for digging into this @jakirkham!

I tried rebasing the botocore PR on main and then running the fsspec test from this issue again. I got:

Traceback (most recent call last):
  File "/Users/tom/workspace/botocore/botocore/awsrequest.py", line 537, in reset_stream
    self.body.seek(0)
  File "/Users/tom/workspace/botocore/botocore/httpchecksum.py", line 183, in seek
    self._raw.seek(0)
    ^^^^^^^^^^^^^^
AttributeError: 'memoryview' object has no attribute 'seek'

This is because the body is an aiobotocore.httpchecksum.AioAwsChunkedWrapper, not just bytes. So I worked around that, then got:

Traceback (most recent call last):
  File "/Users/tom/miniforge3/envs/memray-array/lib/python3.11/site-packages/aiohttp/client_reqrep.py", line 658, in write_bytes
    await self.body.write(writer)
  File "/Users/tom/miniforge3/envs/memray-array/lib/python3.11/site-packages/aiohttp/payload.py", line 494, in write
    chunk = await self._iter.__anext__()
            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/tom/miniforge3/envs/memray-array/lib/python3.11/site-packages/aiobotocore/httpchecksum.py", line 68, in __anext__
    return await self._make_chunk()
           ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/tom/miniforge3/envs/memray-array/lib/python3.11/site-packages/aiobotocore/httpchecksum.py", line 49, in _make_chunk
    raw_chunk = await resolve_awaitable(self._raw.read(self._chunk_size))
                                        ^^^^^^^^^^^^^^
AttributeError: 'memoryview' object has no attribute 'read'

So it looks like memoryview is not sufficient.
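
For reference, a minimal pure-Python sketch of what a "sufficient" object would look like: file-like read/seek on top of a buffer-protocol view, with no up-front copy (a hypothetical helper, not part of s3fs/botocore):

class BufferReader:
    """Read-only, file-like wrapper over any buffer-protocol object; no up-front copy."""

    def __init__(self, buf):
        self._view = memoryview(buf).cast("B")  # flat byte view, shares memory
        self._pos = 0

    def read(self, size=-1):
        end = len(self._view) if size is None or size < 0 else min(self._pos + size, len(self._view))
        chunk = bytes(self._view[self._pos:end])  # only the bytes actually read are copied
        self._pos = end
        return chunk

    def seek(self, pos, whence=0):
        if whence == 0:
            self._pos = pos
        elif whence == 1:
            self._pos += pos
        else:  # io.SEEK_END
            self._pos = len(self._view) + pos
        return self._pos

    def tell(self):
        return self._pos

    def __len__(self):
        return len(self._view)

Something along these lines is roughly what the Rust-backed buffer types discussed below end up providing.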

BTW I also stumbled across this article: https://pythonspeed.com/articles/bytesio-reduce-memory-usage/, with the intriguing suggestion that using BytesIO can avoid memory copies (if you start with a BytesIO object, at least). But afaict there's no way to go from a numpy array to BytesIO without a copy, which is what we need in the Zarr case that started this discussion...
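
Concretely, the article's point is about the read direction (plain stdlib io.BytesIO): getbuffer() exposes the internal buffer as a memoryview without copying, while getvalue() returns a bytes copy; going the other way, constructing a BytesIO from a numpy array, still copies. A minimal sketch:

>>> import io
>>> import numpy as np
>>> bio = io.BytesIO()
>>> bio.write(b"hello")
5
>>> bio.getvalue()                          # returns a fresh bytes copy
b'hello'
>>> view = bio.getbuffer()                  # zero-copy memoryview over the internal buffer
>>> np.frombuffer(view, dtype="uint8")      # zero-copy numpy view on top of that
array([104, 101, 108, 108, 111], dtype=uint8)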

tomwhite avatar Apr 14 '25 11:04 tomwhite

I have a couple of suggestions to try:

  • subclassing, like class ErsatzBytes(memoryview, bytes), which might just "do the right thing", depending on what botocore actually does with it. This is enough to pass the isinstance check (see the sketch just below this list).

  • using one of the classes like RustyBuffer in cramjam (https://docs.rs/cramjam/latest/cramjam/io/struct.RustyBuffer.html), which are designed to reflect numpy arrays/memoryviews without a copy, be buffer-protocol compatible, and also have seek/read methods.
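
On the first bullet: CPython's memoryview can't be used as a base class, so the nearest runnable sketch is a plain bytes subclass; it passes an isinstance(body, (bytes, bytearray)) check, though constructing it from a buffer still copies (illustrative only, not tested against botocore):

class ErsatzBytes(bytes):
    # hypothetical: enough to satisfy an isinstance check for bytes
    pass

body = ErsatzBytes(memoryview(b"hello"))   # note: bytes construction copies the view's data
print(isinstance(body, bytes))             # True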

martindurant avatar Apr 14 '25 16:04 martindurant

Thank you for the suggestions @martindurant!

I tried using cramjam's Buffer to wrap a NumPy array, but it seems to copy the bytes on construction. This also happens when using a NumPy array wrapped in a memoryview.

So that leaves ErsatzBytes which may be possible, but needs more investigation.

tomwhite avatar Apr 15 '25 08:04 tomwhite

@milesgranger - is there a correct invocation to use one of the cramjam buffer types to pass numpy/memoryview data to a third party with zero copy? That third party officially accepts bytes, bytearray and BytesIO.

martindurant avatar Apr 15 '25 11:04 martindurant

Cramjam's (only) Buffer type owns its data (https://github.com/milesgranger/cramjam/blob/ebc9ce798c8f86871a32cb8bf9e515e45468f77c/src/io.rs#L342), but of course it implements the buffer protocol, should one want to pass its data to a third party with zero copy.

Anyhow, it's an interesting and very plausible implementation detail to support this behavior, but it's not directly applicable to cramjam: all the compression bits (which are cramjam's...jam.) implement zero-copy input and optionally zero-copy output with their _into functions, so it hasn't been directly relevant for cramjam.Buffer to implement a zero-copy overlay over numpy arrays or anything else, for that matter.

milesgranger avatar Apr 15 '25 17:04 milesgranger

Yeah, it seems like the pyo3 ecosystem is now at the point where it can provide some nice behind-the-scenes benefits like zero-copy where otherwise it wouldn't work. Not sure, then, how to actually make that happen!

I believe the rationale for having bytes (and str) own their memory and not be mutable is that they can then be hashed in dicts/sets. Many applications don't care about that, though.
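
A quick check of that point (plain stdlib behaviour):

>>> d = {b"hello": 1}                      # bytes is immutable and hashable
>>> d[bytearray(b"hello")] = 2
Traceback (most recent call last):
  ...
TypeError: unhashable type: 'bytearray'
>>> d[memoryview(bytearray(b"hello"))] = 3
Traceback (most recent call last):
  ...
ValueError: cannot hash writable memoryview object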

martindurant avatar Apr 15 '25 17:04 martindurant

I agree, and that's partly why I'm quite unmotivated to remove cramjam's current ability to overwrite bytes as an output destination... I should clarify: I'm definitely open to adding this functionality! It seems most of the pieces are already there, and if even a few people would find it helpful, I think it's relatively low overhead to add it.

milesgranger avatar Apr 15 '25 17:04 milesgranger

Well that's great! It still might not solve the original problem, though, depending on exactly what isinstance or similar checks are done within botocore. Still, once such a thing is possible, people ought to start using it.

martindurant avatar Apr 15 '25 17:04 martindurant

FWIW the obstore.Bytes type allows externally-allocated memory via the buffer protocol (and via pyo3-bytes) and is slowly progressing to support all the methods on bytes.

We could probably mix and match that and the RustyBuffer methods to get you what you want.

kylebarron avatar Apr 21 '25 14:04 kylebarron

Good to hear, @kylebarron. Does it inherit from bytes too?

martindurant avatar Apr 21 '25 18:04 martindurant

Does it inherit from bytes too?

You mean subclass? That's not possible because it doesn't use builtins.bytes internally. But it can match the bytes methods as a protocol as closely as possible.

kylebarron avatar Apr 21 '25 18:04 kylebarron

For the use case here, it might be useful to have a pretend subclass (obstore.Bytes, builtins.bytes) for third-party libraries that use issubclass to make decisions.

martindurant avatar Apr 21 '25 18:04 martindurant

I think that's a valid choice for downstream users but not something that obstore.Bytes should implement itself.

kylebarron avatar Apr 21 '25 18:04 kylebarron

@tomwhite, although I'm sure some in the community may frown upon it, I have no such scruples and found it interesting enough to take a stab at it: pip install cramjam==2.11.0rc1, then use cramjam.Buffer(..., copy=False).

v2.11.0-rc1: open any related issues over there if you please.

milesgranger avatar Apr 24 '25 08:04 milesgranger

Thank you for working on this @milesgranger!

I used copy=False and it now avoids a copy (verified using memray). But using it in s3fs still has problems, and unfortunately the stacktrace is not very helpful:

>>> import cramjam
>>> import numpy as np
>>> import fsspec
>>> fs = fsspec.filesystem('s3')
>>> buf = cramjam.Buffer(np.frombuffer(bytearray(b"hello"), dtype="uint8"), copy=False)
>>> fs.pipe("s3://cubed-unittest/mem-array/s3test", buf)
Future exception was never retrieved
future: <Future finished exception=ConnectionError('Connection lost')>
aiohttp.client_exceptions.ClientOSError: [Errno 32] Broken pipe

The above exception was the direct cause of the following exception:

ConnectionError: Connection lost
Future exception was never retrieved
future: <Future finished exception=ConnectionError('Connection lost')>
aiohttp.client_exceptions.ClientOSError: [Errno 32] Broken pipe

The above exception was the direct cause of the following exception:

ConnectionError: Connection lost
Future exception was never retrieved
future: <Future finished exception=ConnectionError('Connection lost')>
aiohttp.client_exceptions.ClientOSError: [Errno 32] Broken pipe

The above exception was the direct cause of the following exception:

ConnectionError: Connection lost
Traceback (most recent call last):
  File "/Users/tom/miniforge3/envs/memray-array/lib/python3.11/site-packages/s3fs/core.py", line 114, in _error_wrapper
    return await func(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/tom/miniforge3/envs/memray-array/lib/python3.11/site-packages/aiobotocore/client.py", line 412, in _make_api_call
    raise error_class(parsed_response, operation_name)
botocore.exceptions.ClientError: An error occurred (InternalError) when calling the PutObject operation (reached max retries: 4): We encountered an internal error. Please try again.

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/Users/tom/miniforge3/envs/memray-array/lib/python3.11/site-packages/fsspec/asyn.py", line 118, in wrapper
    return sync(self.loop, func, *args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/tom/miniforge3/envs/memray-array/lib/python3.11/site-packages/fsspec/asyn.py", line 103, in sync
    raise return_result
  File "/Users/tom/miniforge3/envs/memray-array/lib/python3.11/site-packages/fsspec/asyn.py", line 56, in _runner
    result[0] = await coro
                ^^^^^^^^^^
  File "/Users/tom/miniforge3/envs/memray-array/lib/python3.11/site-packages/fsspec/asyn.py", line 418, in _pipe
    return await _run_coros_in_chunks(
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/tom/miniforge3/envs/memray-array/lib/python3.11/site-packages/fsspec/asyn.py", line 268, in _run_coros_in_chunks
    result, k = await done.pop()
                ^^^^^^^^^^^^^^^^
  File "/Users/tom/miniforge3/envs/memray-array/lib/python3.11/site-packages/fsspec/asyn.py", line 245, in _run_coro
    return await asyncio.wait_for(coro, timeout=timeout), i
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/tom/miniforge3/envs/memray-array/lib/python3.11/asyncio/tasks.py", line 452, in wait_for
    return await fut
           ^^^^^^^^^
  File "/Users/tom/miniforge3/envs/memray-array/lib/python3.11/site-packages/s3fs/core.py", line 1185, in _pipe_file
    out = await self._call_s3(
          ^^^^^^^^^^^^^^^^^^^^
  File "/Users/tom/miniforge3/envs/memray-array/lib/python3.11/site-packages/s3fs/core.py", line 371, in _call_s3
    return await _error_wrapper(
           ^^^^^^^^^^^^^^^^^^^^^
  File "/Users/tom/miniforge3/envs/memray-array/lib/python3.11/site-packages/s3fs/core.py", line 146, in _error_wrapper
    raise err
OSError: [Errno 5] We encountered an internal error. Please try again.

tomwhite avatar Apr 24 '25 12:04 tomwhite

@tomwhite: it makes it look like a network connection fault; just checking that writing a bytes object does still work?

If yes, it would be helpful to somehow log what calls botocore is making on the Bytes object.

martindurant avatar Apr 24 '25 13:04 martindurant

@tomwhite: it makes it look like a network connection fault; just checking that writing a bytes object does still work?

Yes, that still works.

>>> fs.pipe("s3://cubed-unittest/mem-array/s3test", b"hello") # works
[{'ResponseMetadata': {'RequestId': 'BVAGQZ7GA2R4KFRR', 'HostId': 'aL6mLMIU9xQ01MW0TYr29rN3RjsYLuT5063S9X+y2InjdhYNJRW6C92byjjYncQyFiH/2FgFPL0m2u2TlTp4zg==', 'HTTPStatusCode': 200, 'HTTPHeaders': {'x-amz-id-2': 'aL6mLMIU9xQ01MW0TYr29rN3RjsYLuT5063S9X+y2InjdhYNJRW6C92byjjYncQyFiH/2FgFPL0m2u2TlTp4zg==', 'x-amz-request-id': 'BVAGQZ7GA2R4KFRR', 'date': 'Thu, 24 Apr 2025 13:20:09 GMT', 'x-amz-expiration': 'expiry-date="Sat, 26 Apr 2025 00:00:00 GMT", rule-id="gc"', 'x-amz-server-side-encryption': 'AES256', 'etag': '"5d41402abc4b2a76b9719d911017c592"', 'x-amz-checksum-crc32': 'NhCmhg==', 'x-amz-checksum-type': 'FULL_OBJECT', 'content-length': '0', 'server': 'AmazonS3'}, 'RetryAttempts': 0}, 'Expiration': 'expiry-date="Sat, 26 Apr 2025 00:00:00 GMT", rule-id="gc"', 'ETag': '"5d41402abc4b2a76b9719d911017c592"', 'ChecksumCRC32': 'NhCmhg==', 'ChecksumType': 'FULL_OBJECT', 'ServerSideEncryption': 'AES256'}]

If yes, it would be helpful to somehow log what calls botocore is making on the Bytes object.

+1

tomwhite avatar Apr 24 '25 13:04 tomwhite

I tried it quickly on a MinIO deployment I had handy; slightly more informative, but not by much: an "ExcessData" error I'd never seen before. But the zero-copy buffer works fine with filesystem('file'); not sure what needs to be added/changed to make boto happy, and I don't have time at the moment to poke any deeper.

Note that this also fails with io.BytesIO and the full-copy version of cramjam.Buffer.

Traceback: `OSError: [Errno 5] An error occurred (ExcessData) when calling the PutObject operation: More data provided than indicated content length`
In [31]: fs.pipe("s3://test/mgra/test.txt", buf)
Future exception was never retrieved
future: <Future finished exception=ConnectionError('Connection lost')>
aiohttp.client_exceptions.ClientOSError: [Errno 32] Broken pipe

The above exception was the direct cause of the following exception:

ConnectionError: Connection lost
---------------------------------------------------------------------------
ClientError                               Traceback (most recent call last)
File ~/Projects/cramjam/.venv/lib/python3.11/site-packages/s3fs/core.py:114, in _error_wrapper(func, args, kwargs, retries)
    113 try:
--> 114     return await func(*args, **kwargs)
    115 except S3_RETRYABLE_ERRORS as e:

File ~/Projects/cramjam/.venv/lib/python3.11/site-packages/aiobotocore/client.py:412, in AioBaseClient._make_api_call(self, operation_name, api_params)
    411     error_class = self.exceptions.from_code(error_code)
--> 412     raise error_class(parsed_response, operation_name)
    413 else:

ClientError: An error occurred (ExcessData) when calling the PutObject operation: More data provided than indicated content length

The above exception was the direct cause of the following exception:

OSError                                   Traceback (most recent call last)
Cell In[31], line 1
----> 1 fs.pipe("s3://test/mgra/test.txt", buf)

File ~/Projects/cramjam/.venv/lib/python3.11/site-packages/fsspec/asyn.py:118, in sync_wrapper.<locals>.wrapper(*args, **kwargs)
    115 @functools.wraps(func)
    116 def wrapper(*args, **kwargs):
    117     self = obj or args[0]
--> 118     return sync(self.loop, func, *args, **kwargs)

File ~/Projects/cramjam/.venv/lib/python3.11/site-packages/fsspec/asyn.py:103, in sync(loop, func, timeout, *args, **kwargs)
    101     raise FSTimeoutError from return_result
    102 elif isinstance(return_result, BaseException):
--> 103     raise return_result
    104 else:
    105     return return_result

File ~/Projects/cramjam/.venv/lib/python3.11/site-packages/fsspec/asyn.py:56, in _runner(event, coro, result, timeout)
     54     coro = asyncio.wait_for(coro, timeout=timeout)
     55 try:
---> 56     result[0] = await coro
     57 except Exception as ex:
     58     result[0] = ex

File ~/Projects/cramjam/.venv/lib/python3.11/site-packages/fsspec/asyn.py:430, in AsyncFileSystem._pipe(self, path, value, batch_size, **kwargs)
    428     path = {path: value}
    429 batch_size = batch_size or self.batch_size
--> 430 return await _run_coros_in_chunks(
    431     [self._pipe_file(k, v, **kwargs) for k, v in path.items()],
    432     batch_size=batch_size,
    433     nofiles=True,
    434 )

File ~/Projects/cramjam/.venv/lib/python3.11/site-packages/fsspec/asyn.py:280, in _run_coros_in_chunks(coros, batch_size, callback, timeout, return_exceptions, nofiles)
    278     done, pending = await asyncio.wait(pending, return_when=asyncio.FIRST_COMPLETED)
    279     while done:
--> 280         result, k = await done.pop()
    281         results[k] = result
    283 return results

File ~/Projects/cramjam/.venv/lib/python3.11/site-packages/fsspec/asyn.py:257, in _run_coros_in_chunks.<locals>._run_coro(coro, i)
    255 async def _run_coro(coro, i):
    256     try:
--> 257         return await asyncio.wait_for(coro, timeout=timeout), i
    258     except Exception as e:
    259         if not return_exceptions:

File ~/.local/share/uv/python/cpython-3.11.10-linux-x86_64-gnu/lib/python3.11/asyncio/tasks.py:452, in wait_for(fut, timeout)
    449 loop = events.get_running_loop()
    451 if timeout is None:
--> 452     return await fut
    454 if timeout <= 0:
    455     fut = ensure_future(fut, loop=loop)

File ~/Projects/cramjam/.venv/lib/python3.11/site-packages/s3fs/core.py:1185, in S3FileSystem._pipe_file(self, path, data, chunksize, max_concurrency, mode, **kwargs)
   1183 # 5 GB is the limit for an S3 PUT
   1184 if size < min(5 * 2**30, 2 * chunksize):
-> 1185     out = await self._call_s3(
   1186         "put_object", Bucket=bucket, Key=key, Body=data, **kwargs, **match
   1187     )
   1188     self.invalidate_cache(path)
   1189     return out

File ~/Projects/cramjam/.venv/lib/python3.11/site-packages/s3fs/core.py:371, in S3FileSystem._call_s3(self, method, *akwarglist, **kwargs)
    369 logger.debug("CALL: %s - %s - %s", method.__name__, akwarglist, kw2)
    370 additional_kwargs = self._get_s3_method_kwargs(method, *akwarglist, **kwargs)
--> 371 return await _error_wrapper(
    372     method, kwargs=additional_kwargs, retries=self.retries
    373 )

File ~/Projects/cramjam/.venv/lib/python3.11/site-packages/s3fs/core.py:146, in _error_wrapper(func, args, kwargs, retries)
    144         err = e
    145 err = translate_boto_error(err)
--> 146 raise err

OSError: [Errno 5] An error occurred (ExcessData) when calling the PutObject operation: More data provided than indicated content length

milesgranger avatar Apr 24 '25 14:04 milesgranger

Not sure if this is related:

>>> import cramjam
>>> import numpy as np
>>> data = cramjam.Buffer(np.frombuffer(bytearray(b"hello"), dtype="uint8"), copy=False)
>>> len(data)
5
>>> data.read(10)
b'hello\x00\x00\x00\x00\x00'

Shouldn't it read to EOF and not have the trailing null bytes?
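
For comparison, the usual file-like contract (io.BytesIO here) is that read(n) returns at most n bytes and simply stops at EOF, with no padding:

>>> import io
>>> io.BytesIO(b"hello").read(10)
b'hello'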

tomwhite avatar Apr 24 '25 16:04 tomwhite

Oof, ya. But since fs.pipe fails with copy=True (default) I sorta doubt that's part of this problem.

milesgranger avatar Apr 24 '25 16:04 milesgranger

Is that bytes given by read() actually a view, then?

martindurant avatar Apr 24 '25 16:04 martindurant

No, it returns bytes from its view.

milesgranger avatar Apr 24 '25 16:04 milesgranger

Also FWIW cramjam.Buffer adds support for a writable, BytesIO-like file object in Rust-allocated memory, but I just realized that if you only need a readable file-like object, you can use BytesIO directly, since it accepts buffer-protocol input:

import obstore
from io import BytesIO

rust_bytes = obstore.Bytes(b"helloworld")

f = BytesIO(rust_bytes)
f.read() # b'helloworld'

but you shouldn't mutate the underlying rust buffer, because obstore.Bytes has invariants that assume the buffer won't be changing.

kylebarron avatar Apr 24 '25 21:04 kylebarron

Figured it out: I have fsspec.filesystem('s3').pipe(...) with a zero-copy cramjam Buffer working; you can install version 2.11.0rc2.

As for read() -> bytes not being a subset view of the data, that was intentional, as it's obviously a bit safer; besides, aiobotocore reads in chunks when processing larger data, so I assumed the small incremental reads would be fine.

I might be convinced to add a read(..., as_view=True) flag, though; but I don't think that's what the original intention of this issue was - only to pass a "file-like, buffer-protocol-compatible, zero-copy object" to pipe.

milesgranger avatar Apr 25 '25 07:04 milesgranger

Figured it out: I have fsspec.filesystem('s3').pipe(...) with a zero-copy cramjam Buffer working; you can install version 2.11.0rc2.

Can confirm that it works now. Thanks @milesgranger!

>>> import cramjam
>>> import numpy as np
>>> import fsspec
>>> fs = fsspec.filesystem('s3')
>>> buf = cramjam.Buffer(np.frombuffer(bytearray(b"hello"), dtype="uint8"), copy=False)
>>> fs.pipe("s3://cubed-unittest/mem-array/s3test", buf)
[{'ResponseMetadata': {'RequestId': 'TTEVA2NQFPWG0WDW', 'HostId': 'dmxy778Io7a0Z2rEulQ6/ZDsnQx0q/xhxf8bBtbWOfFZ+erWexYMr0KMVzgdZ32WmMc1qkAsNLk=', 'HTTPStatusCode': 200, 'HTTPHeaders': {'x-amz-id-2': 'dmxy778Io7a0Z2rEulQ6/ZDsnQx0q/xhxf8bBtbWOfFZ+erWexYMr0KMVzgdZ32WmMc1qkAsNLk=', 'x-amz-request-id': 'TTEVA2NQFPWG0WDW', 'date': 'Fri, 25 Apr 2025 09:32:59 GMT', 'x-amz-expiration': 'expiry-date="Sun, 27 Apr 2025 00:00:00 GMT", rule-id="gc"', 'x-amz-server-side-encryption': 'AES256', 'etag': '"5d41402abc4b2a76b9719d911017c592"', 'x-amz-checksum-crc32': 'NhCmhg==', 'x-amz-checksum-type': 'FULL_OBJECT', 'content-length': '0', 'server': 'AmazonS3'}, 'RetryAttempts': 0}, 'Expiration': 'expiry-date="Sun, 27 Apr 2025 00:00:00 GMT", rule-id="gc"', 'ETag': '"5d41402abc4b2a76b9719d911017c592"', 'ChecksumCRC32': 'NhCmhg==', 'ChecksumType': 'FULL_OBJECT', 'ServerSideEncryption': 'AES256'}]

tomwhite avatar Apr 25 '25 09:04 tomwhite

aiobotocore reads in chunks when processing larger data so I assumed the small incremental reads would be fine

This is probably true. If it immediately does read() to get the whole thing as bytes, that would be annoying. It is supposed to support streaming in the sense you mean.

martindurant avatar Apr 25 '25 14:04 martindurant

It is true:

In [17]: class Foo(io.BytesIO):
    ...:     def read(self, *args, **kwargs):
    ...:         print(f"{args} {kwargs}")
    ...:         return super().read(*args, **kwargs)
    ...:     def __len__(self):
    ...:         return 10_000_000
    ...:

In [18]: buf = Foo(b"0" * 10_000_000)

In [19]: fs.pipe("s3://test/mgra/test.txt", buf)
(1048576,) {}
(1048576,) {}
(1048576,) {}
(1048576,) {}
(1048576,) {}
(1048576,) {}
(1048576,) {}
(1048576,) {}
(1048576,) {}
(1048576,) {}
(1048576,) {}
Out[19]:
[{'ResponseMetadata': {'RequestId': '18399DDCEC3FC5CB',
   'HostId': '87231c9b37ead59585f11d59faffe7cad47b9dd2263df7b7fa91c6dbb4c40254',
   'HTTPStatusCode': 200,
   'HTTPHeaders': {'accept-ranges': 'bytes',
    'content-length': '0',
    'date': 'Fri, 25 Apr 2025 16:51:43 GMT',
    'etag': '"5b77099fb249bf5b8f10a16999ccd51f"',
    'server': 'MinIO',
    'strict-transport-security': 'max-age=31536000; includeSubDomains',
    'vary': 'Accept-Encoding',
    'x-amz-checksum-crc32': 'JIgOEA==',
    'x-amz-id-2': '87231c9b37ead59585f11d59faffe7cad47b9dd2263df7b7fa91c6dbb4c40254',
    'x-amz-request-id': '18399DDCEC3FC5CB',
    'x-content-type-options': 'nosniff',
    'x-ratelimit-limit': '3535',
    'x-ratelimit-remaining': '3535',
    'x-xss-protection': '1; mode=block'},
   'RetryAttempts': 0},
  'ETag': '"5b77099fb249bf5b8f10a16999ccd51f"',
  'ChecksumCRC32': 'JIgOEA=='}]

milesgranger avatar Apr 25 '25 16:04 milesgranger