Using the Python buffer protocol in `pipe`
Is it possible to avoid memory copies when writing to S3 by taking advantage of the Python buffer protocol?
In particular, it would be great if we could use a memoryview object in calls to pipe().
Using bytes and bytearray works fine:
>>> import numpy as np
>>> import fsspec
>>> fs = fsspec.filesystem('s3')
>>> fs.pipe("s3://cubed-unittest/mem-array/s3test", b"hello") # works
[{'ResponseMetadata': {'RequestId': 'E54C5V2H9M1ZE1V1', 'HostId': 'pcEW+k8KZHBbYuohqfgGNZOac8Y7QzRAHaAxOJmMWQdEll75umxh+On2DXxWi8qKMcG2+aZmJ7I=', 'HTTPStatusCode': 200, 'HTTPHeaders': {'x-amz-id-2': 'pcEW+k8KZHBbYuohqfgGNZOac8Y7QzRAHaAxOJmMWQdEll75umxh+On2DXxWi8qKMcG2+aZmJ7I=', 'x-amz-request-id': 'E54C5V2H9M1ZE1V1', 'date': 'Thu, 10 Apr 2025 13:19:42 GMT', 'x-amz-expiration': 'expiry-date="Sat, 12 Apr 2025 00:00:00 GMT", rule-id="gc"', 'x-amz-server-side-encryption': 'AES256', 'etag': '"5d41402abc4b2a76b9719d911017c592"', 'x-amz-checksum-crc32': 'NhCmhg==', 'x-amz-checksum-type': 'FULL_OBJECT', 'content-length': '0', 'server': 'AmazonS3'}, 'RetryAttempts': 0}, 'Expiration': 'expiry-date="Sat, 12 Apr 2025 00:00:00 GMT", rule-id="gc"', 'ETag': '"5d41402abc4b2a76b9719d911017c592"', 'ChecksumCRC32': 'NhCmhg==', 'ChecksumType': 'FULL_OBJECT', 'ServerSideEncryption': 'AES256'}]
>>> fs.pipe("s3://cubed-unittest/mem-array/s3test", bytearray(b"hello")) # works
[{'ResponseMetadata': {'RequestId': 'C58QYEAVJTBW96Y6', 'HostId': 'lwi6otZoaiTsPhLYApOdMmhFXphWEfhoYh/sRO4txe88uUnj8wLNHTgaleq7agiipYlyd8GHlzrxrZ8sAF1qQIuyXQUwm5Nq', 'HTTPStatusCode': 200, 'HTTPHeaders': {'x-amz-id-2': 'lwi6otZoaiTsPhLYApOdMmhFXphWEfhoYh/sRO4txe88uUnj8wLNHTgaleq7agiipYlyd8GHlzrxrZ8sAF1qQIuyXQUwm5Nq', 'x-amz-request-id': 'C58QYEAVJTBW96Y6', 'date': 'Thu, 10 Apr 2025 13:20:04 GMT', 'x-amz-expiration': 'expiry-date="Sat, 12 Apr 2025 00:00:00 GMT", rule-id="gc"', 'x-amz-server-side-encryption': 'AES256', 'etag': '"5d41402abc4b2a76b9719d911017c592"', 'x-amz-checksum-crc32': 'NhCmhg==', 'x-amz-checksum-type': 'FULL_OBJECT', 'content-length': '0', 'server': 'AmazonS3'}, 'RetryAttempts': 0}, 'Expiration': 'expiry-date="Sat, 12 Apr 2025 00:00:00 GMT", rule-id="gc"', 'ETag': '"5d41402abc4b2a76b9719d911017c592"', 'ChecksumCRC32': 'NhCmhg==', 'ChecksumType': 'FULL_OBJECT', 'ServerSideEncryption': 'AES256'}]
But using a memoryview, or just a numpy array (which supports the buffer protocol too), fails:
>>> fs.pipe("s3://cubed-unittest/mem-array/s3test", memoryview(np.frombuffer(bytearray(b"hello"), dtype="uint8")))
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "/Users/tom/miniforge3/envs/memray-array/lib/python3.11/site-packages/fsspec/asyn.py", line 118, in wrapper
return sync(self.loop, func, *args, **kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/tom/miniforge3/envs/memray-array/lib/python3.11/site-packages/fsspec/asyn.py", line 103, in sync
raise return_result
File "/Users/tom/miniforge3/envs/memray-array/lib/python3.11/site-packages/fsspec/asyn.py", line 56, in _runner
result[0] = await coro
^^^^^^^^^^
File "/Users/tom/miniforge3/envs/memray-array/lib/python3.11/site-packages/fsspec/asyn.py", line 418, in _pipe
return await _run_coros_in_chunks(
^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/tom/miniforge3/envs/memray-array/lib/python3.11/site-packages/fsspec/asyn.py", line 268, in _run_coros_in_chunks
result, k = await done.pop()
^^^^^^^^^^^^^^^^
File "/Users/tom/miniforge3/envs/memray-array/lib/python3.11/site-packages/fsspec/asyn.py", line 245, in _run_coro
return await asyncio.wait_for(coro, timeout=timeout), i
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/tom/miniforge3/envs/memray-array/lib/python3.11/asyncio/tasks.py", line 452, in wait_for
return await fut
^^^^^^^^^
File "/Users/tom/miniforge3/envs/memray-array/lib/python3.11/site-packages/s3fs/core.py", line 1185, in _pipe_file
out = await self._call_s3(
^^^^^^^^^^^^^^^^^^^^
File "/Users/tom/miniforge3/envs/memray-array/lib/python3.11/site-packages/s3fs/core.py", line 371, in _call_s3
return await _error_wrapper(
^^^^^^^^^^^^^^^^^^^^^
File "/Users/tom/miniforge3/envs/memray-array/lib/python3.11/site-packages/s3fs/core.py", line 146, in _error_wrapper
raise err
File "/Users/tom/miniforge3/envs/memray-array/lib/python3.11/site-packages/s3fs/core.py", line 114, in _error_wrapper
return await func(*args, **kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/tom/miniforge3/envs/memray-array/lib/python3.11/site-packages/aiobotocore/client.py", line 369, in _make_api_call
request_dict = await self._convert_to_request_dict(
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/tom/miniforge3/envs/memray-array/lib/python3.11/site-packages/aiobotocore/client.py", line 440, in _convert_to_request_dict
request_dict = self._serializer.serialize_to_request(
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/tom/miniforge3/envs/memray-array/lib/python3.11/site-packages/botocore/validate.py", line 381, in serialize_to_request
raise ParamValidationError(report=report.generate_report())
botocore.exceptions.ParamValidationError: Parameter validation failed:
Invalid type for parameter Body, value: <memory at 0x102f99a80>, type: <class 'memoryview'>, valid types: <class 'bytes'>, <class 'bytearray'>, file-like object
This may be a limitation in the underlying boto library, but perhaps there is a way to avoid copying the source data (in this case a numpy array) that I'm missing.
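For reference, here is a minimal demonstration (plain numpy and stdlib, nothing fsspec-specific) of the copy we are trying to avoid:
>>> import numpy as np
>>> arr = np.frombuffer(bytearray(b"hello"), dtype="uint8")
>>> copied = arr.tobytes()  # materializes a second, independent buffer: a full copy
>>> view = memoryview(arr)  # zero-copy view over the same memory
>>> np.shares_memory(arr, np.frombuffer(view, dtype="uint8"))
True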
cc @jakirkham who commented about this in https://github.com/zarr-developers/zarr-python/pull/2972#discussion_r2034110402
Thanks Tom! 🙏
It seems like the error is coming from botocore
The traceback stops short of indicating the specific function, but reading the tea leaves leads me to conclude that this botocore input validation function is involved.
Now, fixing that could resolve the issue. OTOH there may be other, less obvious assumptions made in the code. That said, there is usually a path forward with memoryviews (often with some tweaks).
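For concreteness, the check that memoryview fails is of roughly this shape (an illustrative sketch only, not botocore's actual validation code):
def validate_body(body):
    # roughly the rule stated in the ParamValidationError above
    if isinstance(body, (bytes, bytearray)):
        return
    if hasattr(body, "read"):  # accepted as a "file-like object"
        return
    raise TypeError("valid types: bytes, bytearray, file-like object")

validate_body(b"hello")              # passes
validate_body(memoryview(b"hello"))  # raises: not bytes/bytearray, and no .read()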
Looking around found a similar upstream issue: https://github.com/boto/boto3/issues/3423
There is some work to fix that in PR: https://github.com/boto/botocore/pull/3107
Unfortunately there has not been a lot of engagement as of yet. Tried to do a rough review of the PR to see if that might help move things along
Thanks for digging into this @jakirkham!
I tried rebasing the botocore PR on main then running the fsspec test in this issue again. I got:
Traceback (most recent call last):
File "/Users/tom/workspace/botocore/botocore/awsrequest.py", line 537, in reset_stream
self.body.seek(0)
File "/Users/tom/workspace/botocore/botocore/httpchecksum.py", line 183, in seek
self._raw.seek(0)
^^^^^^^^^^^^^^
AttributeError: 'memoryview' object has no attribute 'seek'
This is because the body is an aiobotocore.httpchecksum.AioAwsChunkedWrapper, not just bytes. So I worked around that, then got:
Traceback (most recent call last):
File "/Users/tom/miniforge3/envs/memray-array/lib/python3.11/site-packages/aiohttp/client_reqrep.py", line 658, in write_bytes
await self.body.write(writer)
File "/Users/tom/miniforge3/envs/memray-array/lib/python3.11/site-packages/aiohttp/payload.py", line 494, in write
chunk = await self._iter.__anext__()
^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/tom/miniforge3/envs/memray-array/lib/python3.11/site-packages/aiobotocore/httpchecksum.py", line 68, in __anext__
return await self._make_chunk()
^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/tom/miniforge3/envs/memray-array/lib/python3.11/site-packages/aiobotocore/httpchecksum.py", line 49, in _make_chunk
raw_chunk = await resolve_awaitable(self._raw.read(self._chunk_size))
^^^^^^^^^^^^^^
AttributeError: 'memoryview' object has no attribute 'read'
So it looks like memoryview is not sufficient.
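Given the two AttributeErrors, what the code path apparently wants is a file-like shim over the view. A minimal sketch of such a wrapper (the names are ours, and it is untested against s3fs; each read() still copies its chunk, since building bytes from a slice copies, but the source buffer is never duplicated up front):
class BufferReader:
    """File-like read/seek over a memoryview, without copying the source."""

    def __init__(self, buf):
        self._view = memoryview(buf)
        self._pos = 0

    def seek(self, pos, whence=0):
        if whence == 0:      # from start
            self._pos = pos
        elif whence == 1:    # relative to current position
            self._pos += pos
        elif whence == 2:    # from end
            self._pos = len(self._view) + pos
        return self._pos

    def tell(self):
        return self._pos

    def read(self, size=-1):
        if size is None or size < 0:
            end = len(self._view)
        else:
            end = min(self._pos + size, len(self._view))
        chunk = bytes(self._view[self._pos:end])  # copies only this chunk
        self._pos = end
        return chunk

    def __len__(self):
        return len(self._view)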
BTW I also stumbled across this article: https://pythonspeed.com/articles/bytesio-reduce-memory-usage/, with the intriguing suggestion that using BytesIO can avoid memory copies (if you start with a BytesIO object, at least). But afaict there's no way to go from a numpy array to BytesIO without a copy, which is what we need in the Zarr case that started this discussion...
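A quick check bears that out (getbuffer() exposes BytesIO's internal storage, so we can test whether the memory is shared):
>>> import io
>>> import numpy as np
>>> arr = np.frombuffer(bytearray(b"hello"), dtype="uint8")
>>> bio = io.BytesIO(arr)  # accepts buffer-protocol input, but copies it on construction
>>> np.shares_memory(arr, np.frombuffer(bio.getbuffer(), dtype="uint8"))
False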
I have a couple of suggestions to try:
- subclassing like
class ErsatzBytes(memoryview, bytes):
which might just "do the right thing", depending on what botocore actually does with it. This is enough to pass the isinstance check.
- using one of the classes like RustyBuffer in cramjam ( https://docs.rs/cramjam/latest/cramjam/io/struct.RustyBuffer.html ), which are designed to reflect numpy/memoryviews without copy, be buffer protocol compatible, and also have seek/read methods.
Thank you for the suggestions @martindurant!
I tried using cramjam's Buffer to wrap a NumPy array, but it seems to copy the bytes on construction. This also happens when using a NumPy array wrapped in a memoryview.
So that leaves ErsatzBytes which may be possible, but needs more investigation.
@milesgranger - is there a correct invocation to use one of the cramjam buffer types to pass numpy/memoryview data to a third party with zero copy? That third party officially accepts bytes, bytearray and BytesIO.
Cramjam's (only) Buffer type owns its data (https://github.com/milesgranger/cramjam/blob/ebc9ce798c8f86871a32cb8bf9e515e45468f77c/src/io.rs#L342), but of course it implements the buffer protocol should one want to pass its data to a third party with zero copy.
Anyhow, that's an interesting and very plausible implementation detail to support this behavior, but it's not directly applicable to cramjam: all the compression bits (which are cramjam's... jam) already implement zero-copy input, and optionally zero-copy output via their _into functions, so a zero-copy overlay onto numpy arrays (or anything else, for that matter) hasn't been directly relevant for cramjam.Buffer.
Yeah, it seems like the pyo3 ecosystem is now at the point where it can provide some nice behind-the-scenes benefits like zero-copy where otherwise it wouldn't work. Not sure, then, how to actually make that happen!
I believe the rationale for having bytes (and str) own their memory and not be mutable is that they can then be hashed in dicts/sets. Many applications don't care about that, though.
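Concretely, this is just standard Python behavior:
>>> d = {b"hello": 1}          # bytes is immutable, hence hashable and usable as a dict key
>>> hash(bytearray(b"hello"))  # mutable, hence unhashable
Traceback (most recent call last):
  ...
TypeError: unhashable type: 'bytearray'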
I agree, and partly why I'm quite unmotivated to remove cramjam's current ability to overwrite bytes as an output destination... I should clarify, I'm definitely open to adding this functionality! Seems most of the pieces are already there and if even a few people would find it helpful, I think it's relatively low overhead to add it.
Well that's great! It still might not solve the original problem, though, depending on exactly what isinstance or similar checks are done within botocore. Still, once such a thing is possible, people ought to start using it.
FWIW the obstore.Bytes type allows externally-allocated memory via the buffer protocol (and via pyo3-bytes) and is slowly progressing to support all the methods on bytes.
We could probably mix and match that and the RustyBuffer methods to get you what you want.
Good to hear, @kylebarron. Does it inherit from Bytes too?
Does it inherit from Bytes too?
You mean subclass? That's not possible because it doesn't use builtins.bytes internally. But it can match the bytes methods as a protocol as closely as possible.
For the use here, it might be useful to have a pretend subclass (obstore.Bytes, builtins.bytes) for the benefit of third-party libraries that use issubclass to make decisions.
I think that's a valid choice for downstream users but not something that obstore.Bytes should implement itself.
@tomwhite, although I'm sure some in the community may frown upon it, I have no such scruples and found it interesting enough to take a stab at it: pip install cramjam==2.11.0rc1, then cramjam.Buffer(..., copy=False).
Release: v2.11.0-rc1. Open any related issues over there if you please.
Thank you for working on this @milesgranger!
I used copy=False and it now avoids a copy (verified using memray). But using it in s3fs still has problems, and unfortunately the stacktrace is not very helpful:
>>> import cramjam
>>> import numpy as np
>>> import fsspec
>>> fs = fsspec.filesystem('s3')
>>> buf = cramjam.Buffer(np.frombuffer(bytearray(b"hello"), dtype="uint8"), copy=False)
>>> fs.pipe("s3://cubed-unittest/mem-array/s3test", buf)
Future exception was never retrieved
future: <Future finished exception=ConnectionError('Connection lost')>
aiohttp.client_exceptions.ClientOSError: [Errno 32] Broken pipe
The above exception was the direct cause of the following exception:
ConnectionError: Connection lost
Future exception was never retrieved
future: <Future finished exception=ConnectionError('Connection lost')>
aiohttp.client_exceptions.ClientOSError: [Errno 32] Broken pipe
The above exception was the direct cause of the following exception:
ConnectionError: Connection lost
Future exception was never retrieved
future: <Future finished exception=ConnectionError('Connection lost')>
aiohttp.client_exceptions.ClientOSError: [Errno 32] Broken pipe
The above exception was the direct cause of the following exception:
ConnectionError: Connection lost
Traceback (most recent call last):
File "/Users/tom/miniforge3/envs/memray-array/lib/python3.11/site-packages/s3fs/core.py", line 114, in _error_wrapper
return await func(*args, **kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/tom/miniforge3/envs/memray-array/lib/python3.11/site-packages/aiobotocore/client.py", line 412, in _make_api_call
raise error_class(parsed_response, operation_name)
botocore.exceptions.ClientError: An error occurred (InternalError) when calling the PutObject operation (reached max retries: 4): We encountered an internal error. Please try again.
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "/Users/tom/miniforge3/envs/memray-array/lib/python3.11/site-packages/fsspec/asyn.py", line 118, in wrapper
return sync(self.loop, func, *args, **kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/tom/miniforge3/envs/memray-array/lib/python3.11/site-packages/fsspec/asyn.py", line 103, in sync
raise return_result
File "/Users/tom/miniforge3/envs/memray-array/lib/python3.11/site-packages/fsspec/asyn.py", line 56, in _runner
result[0] = await coro
^^^^^^^^^^
File "/Users/tom/miniforge3/envs/memray-array/lib/python3.11/site-packages/fsspec/asyn.py", line 418, in _pipe
return await _run_coros_in_chunks(
^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/tom/miniforge3/envs/memray-array/lib/python3.11/site-packages/fsspec/asyn.py", line 268, in _run_coros_in_chunks
result, k = await done.pop()
^^^^^^^^^^^^^^^^
File "/Users/tom/miniforge3/envs/memray-array/lib/python3.11/site-packages/fsspec/asyn.py", line 245, in _run_coro
return await asyncio.wait_for(coro, timeout=timeout), i
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/tom/miniforge3/envs/memray-array/lib/python3.11/asyncio/tasks.py", line 452, in wait_for
return await fut
^^^^^^^^^
File "/Users/tom/miniforge3/envs/memray-array/lib/python3.11/site-packages/s3fs/core.py", line 1185, in _pipe_file
out = await self._call_s3(
^^^^^^^^^^^^^^^^^^^^
File "/Users/tom/miniforge3/envs/memray-array/lib/python3.11/site-packages/s3fs/core.py", line 371, in _call_s3
return await _error_wrapper(
^^^^^^^^^^^^^^^^^^^^^
File "/Users/tom/miniforge3/envs/memray-array/lib/python3.11/site-packages/s3fs/core.py", line 146, in _error_wrapper
raise err
OSError: [Errno 5] We encountered an internal error. Please try again.
@tomwhite: it makes it look like a network connection fault; just checking that writing a bytes does still work?
If yes, it would be helpful to somehow log what calls botocore is making on the Bytes object.
@tomwhite: it makes it look like a network connection fault; just checking that writing a bytes does still work?
Yes, that still works.
>>> fs.pipe("s3://cubed-unittest/mem-array/s3test", b"hello") # works
[{'ResponseMetadata': {'RequestId': 'BVAGQZ7GA2R4KFRR', 'HostId': 'aL6mLMIU9xQ01MW0TYr29rN3RjsYLuT5063S9X+y2InjdhYNJRW6C92byjjYncQyFiH/2FgFPL0m2u2TlTp4zg==', 'HTTPStatusCode': 200, 'HTTPHeaders': {'x-amz-id-2': 'aL6mLMIU9xQ01MW0TYr29rN3RjsYLuT5063S9X+y2InjdhYNJRW6C92byjjYncQyFiH/2FgFPL0m2u2TlTp4zg==', 'x-amz-request-id': 'BVAGQZ7GA2R4KFRR', 'date': 'Thu, 24 Apr 2025 13:20:09 GMT', 'x-amz-expiration': 'expiry-date="Sat, 26 Apr 2025 00:00:00 GMT", rule-id="gc"', 'x-amz-server-side-encryption': 'AES256', 'etag': '"5d41402abc4b2a76b9719d911017c592"', 'x-amz-checksum-crc32': 'NhCmhg==', 'x-amz-checksum-type': 'FULL_OBJECT', 'content-length': '0', 'server': 'AmazonS3'}, 'RetryAttempts': 0}, 'Expiration': 'expiry-date="Sat, 26 Apr 2025 00:00:00 GMT", rule-id="gc"', 'ETag': '"5d41402abc4b2a76b9719d911017c592"', 'ChecksumCRC32': 'NhCmhg==', 'ChecksumType': 'FULL_OBJECT', 'ServerSideEncryption': 'AES256'}]
If yes, it would be helpful to somehow log what calls botocore is making on the Bytes object.
+1
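One throwaway way to get that logging (a sketch; LoggingBuffer is our own name, and whether it passes botocore's type validation depends on the isinstance checks discussed above) is a proxy that prints each attribute access before delegating to the real buffer:
class LoggingBuffer:
    def __init__(self, inner):
        self._inner = inner

    def __getattr__(self, name):
        # catches read, seek, tell, etc.
        print(f"botocore touched: {name}")
        return getattr(self._inner, name)

    def __len__(self):
        # dunder lookups bypass __getattr__, so forward len() explicitly
        print("botocore touched: __len__")
        return len(self._inner)

# reusing fs and buf from the snippet above
fs.pipe("s3://cubed-unittest/mem-array/s3test", LoggingBuffer(buf))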
I tried a quick test on a Minio deployment I had handy: slightly more informative, but not by much, with an "ExcessData" error I'd never seen before. But the zero-copy buffer works fine with filesystem('file'). Not sure what needs to be added/changed to make boto happy; I don't have time at the moment to poke any deeper.
Note that this also fails with io.BytesIO and with the full-copy version of cramjam.Buffer
Traceback: `OSError: [Errno 5] An error occurred (ExcessData) when calling the PutObject operation: More data provided than indicated content length`
In [31]: fs.pipe("s3://test/mgra/test.txt", buf)
Future exception was never retrieved
future: <Future finished exception=ConnectionError('Connection lost')>
aiohttp.client_exceptions.ClientOSError: [Errno 32] Broken pipe
The above exception was the direct cause of the following exception:
ConnectionError: Connection lost
---------------------------------------------------------------------------
ClientError Traceback (most recent call last)
File ~/Projects/cramjam/.venv/lib/python3.11/site-packages/s3fs/core.py:114, in _error_wrapper(func, args, kwargs, retries)
113 try:
--> 114 return await func(*args, **kwargs)
115 except S3_RETRYABLE_ERRORS as e:
File ~/Projects/cramjam/.venv/lib/python3.11/site-packages/aiobotocore/client.py:412, in AioBaseClient._make_api_call(self, operation_name, api_params)
411 error_class = self.exceptions.from_code(error_code)
--> 412 raise error_class(parsed_response, operation_name)
413 else:
ClientError: An error occurred (ExcessData) when calling the PutObject operation: More data provided than indicated content length
The above exception was the direct cause of the following exception:
OSError Traceback (most recent call last)
Cell In[31], line 1
----> 1 fs.pipe("s3://test/mgra/test.txt", buf)
File ~/Projects/cramjam/.venv/lib/python3.11/site-packages/fsspec/asyn.py:118, in sync_wrapper.<locals>.wrapper(*args, **kwargs)
115 @functools.wraps(func)
116 def wrapper(*args, **kwargs):
117 self = obj or args[0]
--> 118 return sync(self.loop, func, *args, **kwargs)
File ~/Projects/cramjam/.venv/lib/python3.11/site-packages/fsspec/asyn.py:103, in sync(loop, func, timeout, *args, **kwargs)
101 raise FSTimeoutError from return_result
102 elif isinstance(return_result, BaseException):
--> 103 raise return_result
104 else:
105 return return_result
File ~/Projects/cramjam/.venv/lib/python3.11/site-packages/fsspec/asyn.py:56, in _runner(event, coro, result, timeout)
54 coro = asyncio.wait_for(coro, timeout=timeout)
55 try:
---> 56 result[0] = await coro
57 except Exception as ex:
58 result[0] = ex
File ~/Projects/cramjam/.venv/lib/python3.11/site-packages/fsspec/asyn.py:430, in AsyncFileSystem._pipe(self, path, value, batch_size, **kwargs)
428 path = {path: value}
429 batch_size = batch_size or self.batch_size
--> 430 return await _run_coros_in_chunks(
431 [self._pipe_file(k, v, **kwargs) for k, v in path.items()],
432 batch_size=batch_size,
433 nofiles=True,
434 )
File ~/Projects/cramjam/.venv/lib/python3.11/site-packages/fsspec/asyn.py:280, in _run_coros_in_chunks(coros, batch_size, callback, timeout, return_exceptions, nofiles)
278 done, pending = await asyncio.wait(pending, return_when=asyncio.FIRST_COMPLETED)
279 while done:
--> 280 result, k = await done.pop()
281 results[k] = result
283 return results
File ~/Projects/cramjam/.venv/lib/python3.11/site-packages/fsspec/asyn.py:257, in _run_coros_in_chunks.<locals>._run_coro(coro, i)
255 async def _run_coro(coro, i):
256 try:
--> 257 return await asyncio.wait_for(coro, timeout=timeout), i
258 except Exception as e:
259 if not return_exceptions:
File ~/.local/share/uv/python/cpython-3.11.10-linux-x86_64-gnu/lib/python3.11/asyncio/tasks.py:452, in wait_for(fut, timeout)
449 loop = events.get_running_loop()
451 if timeout is None:
--> 452 return await fut
454 if timeout <= 0:
455 fut = ensure_future(fut, loop=loop)
File ~/Projects/cramjam/.venv/lib/python3.11/site-packages/s3fs/core.py:1185, in S3FileSystem._pipe_file(self, path, data, chunksize, max_concurrency, mode, **kwargs)
1183 # 5 GB is the limit for an S3 PUT
1184 if size < min(5 * 2**30, 2 * chunksize):
-> 1185 out = await self._call_s3(
1186 "put_object", Bucket=bucket, Key=key, Body=data, **kwargs, **match
1187 )
1188 self.invalidate_cache(path)
1189 return out
File ~/Projects/cramjam/.venv/lib/python3.11/site-packages/s3fs/core.py:371, in S3FileSystem._call_s3(self, method, *akwarglist, **kwargs)
369 logger.debug("CALL: %s - %s - %s", method.__name__, akwarglist, kw2)
370 additional_kwargs = self._get_s3_method_kwargs(method, *akwarglist, **kwargs)
--> 371 return await _error_wrapper(
372 method, kwargs=additional_kwargs, retries=self.retries
373 )
File ~/Projects/cramjam/.venv/lib/python3.11/site-packages/s3fs/core.py:146, in _error_wrapper(func, args, kwargs, retries)
144 err = e
145 err = translate_boto_error(err)
--> 146 raise err
OSError: [Errno 5] An error occurred (ExcessData) when calling the PutObject operation: More data provided than indicated content length
Not sure if this is related:
>>> import cramjam
>>> import numpy as np
>>> data = cramjam.Buffer(np.frombuffer(bytearray(b"hello"), dtype="uint8"), copy=False)
>>> len(data)
5
>>> data.read(10)
b'hello\x00\x00\x00\x00\x00'
Shouldn't it read to EOF and not have the trailing null bytes?
Oof, ya. But since fs.pipe fails with copy=True (default) I sorta doubt that's part of this problem.
Is that bytes given by read() actually a view, then?
No, it returns bytes from its view.
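For reference, the stdlib file-like contract is that read(n) returns at most n bytes and truncates at EOF, rather than padding:
>>> import io
>>> f = io.BytesIO(b"hello")
>>> f.read(10)  # at most 10 bytes; stops at EOF
b'hello'
>>> f.read(10)  # empty bytes signals EOF
b''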
Also, fwiw, cramjam.Buffer provides a writable file-like (BytesIO-style) object in Rust-allocated memory, but I just realized that if you only need a readable file-like object, you can use BytesIO directly, since it accepts buffer-protocol input:
import obstore
from io import BytesIO
rust_bytes = obstore.Bytes(b"helloworld")
f = BytesIO(rust_bytes)
f.read() # b'helloworld'
But you shouldn't mutate the underlying Rust buffer, because obstore.Bytes has invariants that assume the buffer won't be changing.
Figured it out: I have fsspec.filesystem('s3').pipe(...) working with a zero-copy cramjam Buffer. You can install version 2.11.0rc2 to try it.
As for read() -> bytes not being a subset view of the data: that was intentional, as it's obviously a bit safer; and notably, aiobotocore reads in chunks when processing larger data, so I assumed the small incremental reads would be fine.
I might be convinced to add a read(..., as_view=True) flag, though; but I don't think that was the original intention of this issue - only to pass a "file-like, buffer-protocol-compatible, zero-copy object" to pipe.
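In terms of the file-like sketch earlier in this thread, such a (purely hypothetical) as_view flag would change roughly one line of read():
def read(self, size=-1, as_view=False):
    if size is None or size < 0:
        end = len(self._view)
    else:
        end = min(self._pos + size, len(self._view))
    chunk = self._view[self._pos:end]  # zero-copy slice of the view
    self._pos = end
    return chunk if as_view else bytes(chunk)  # copy only when bytes is requested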
Figured it out: I have fsspec.filesystem('s3').pipe(...) working with a zero-copy cramjam Buffer. You can install version 2.11.0rc2 to try it.
Can confirm that it works now. Thanks @milesgranger!
>>> import cramjam
>>> import numpy as np
>>> import fsspec
>>> fs = fsspec.filesystem('s3')
>>> buf = cramjam.Buffer(np.frombuffer(bytearray(b"hello"), dtype="uint8"), copy=False)
>>> fs.pipe("s3://cubed-unittest/mem-array/s3test", buf)
[{'ResponseMetadata': {'RequestId': 'TTEVA2NQFPWG0WDW', 'HostId': 'dmxy778Io7a0Z2rEulQ6/ZDsnQx0q/xhxf8bBtbWOfFZ+erWexYMr0KMVzgdZ32WmMc1qkAsNLk=', 'HTTPStatusCode': 200, 'HTTPHeaders': {'x-amz-id-2': 'dmxy778Io7a0Z2rEulQ6/ZDsnQx0q/xhxf8bBtbWOfFZ+erWexYMr0KMVzgdZ32WmMc1qkAsNLk=', 'x-amz-request-id': 'TTEVA2NQFPWG0WDW', 'date': 'Fri, 25 Apr 2025 09:32:59 GMT', 'x-amz-expiration': 'expiry-date="Sun, 27 Apr 2025 00:00:00 GMT", rule-id="gc"', 'x-amz-server-side-encryption': 'AES256', 'etag': '"5d41402abc4b2a76b9719d911017c592"', 'x-amz-checksum-crc32': 'NhCmhg==', 'x-amz-checksum-type': 'FULL_OBJECT', 'content-length': '0', 'server': 'AmazonS3'}, 'RetryAttempts': 0}, 'Expiration': 'expiry-date="Sun, 27 Apr 2025 00:00:00 GMT", rule-id="gc"', 'ETag': '"5d41402abc4b2a76b9719d911017c592"', 'ChecksumCRC32': 'NhCmhg==', 'ChecksumType': 'FULL_OBJECT', 'ServerSideEncryption': 'AES256'}]
aiobotocore reads in chunks when processing larger data so I assumed the small incremental reads would be fine
This is probably true. If it immediately does read() to get the whole thing as bytes, that would be annoying. It is supposed to support streaming in the sense you mean.
It is true:
In [17]: class Foo(io.BytesIO):
...: def read(self, *args, **kwargs):
...: print(f"{args} {kwargs}")
...: return super().read(*args, **kwargs)
...: def __len__(self):
...: return 10_000_000
...:
In [18]: buf = Foo(b"0" * 10_000_000)
In [19]: fs.pipe("s3://test/mgra/test.txt", buf)
(1048576,) {}
(1048576,) {}
(1048576,) {}
(1048576,) {}
(1048576,) {}
(1048576,) {}
(1048576,) {}
(1048576,) {}
(1048576,) {}
(1048576,) {}
(1048576,) {}
Out[19]:
[{'ResponseMetadata': {'RequestId': '18399DDCEC3FC5CB',
'HostId': '87231c9b37ead59585f11d59faffe7cad47b9dd2263df7b7fa91c6dbb4c40254',
'HTTPStatusCode': 200,
'HTTPHeaders': {'accept-ranges': 'bytes',
'content-length': '0',
'date': 'Fri, 25 Apr 2025 16:51:43 GMT',
'etag': '"5b77099fb249bf5b8f10a16999ccd51f"',
'server': 'MinIO',
'strict-transport-security': 'max-age=31536000; includeSubDomains',
'vary': 'Accept-Encoding',
'x-amz-checksum-crc32': 'JIgOEA==',
'x-amz-id-2': '87231c9b37ead59585f11d59faffe7cad47b9dd2263df7b7fa91c6dbb4c40254',
'x-amz-request-id': '18399DDCEC3FC5CB',
'x-content-type-options': 'nosniff',
'x-ratelimit-limit': '3535',
'x-ratelimit-remaining': '3535',
'x-xss-protection': '1; mode=block'},
'RetryAttempts': 0},
'ETag': '"5b77099fb249bf5b8f10a16999ccd51f"',
'ChecksumCRC32': 'JIgOEA=='}]