S3 write seems to be affecting the next S3 read
Problem description
Be sure your description clearly answers the following questions:
- What are you trying to achieve? An S3-to-S3 copy, i.e. an object copy between two completely separate instances of S3.
- What is the expected result? The object is copied, with the same ETag.
- What are you seeing instead? The next S3 read's buffer is affected by the S3 write call. The copied object is the same length, but corrupted.
- First I ran the code, and found the source URI's ETag did not match the calculated one (from etag()).
- Commenting out the 's3_destination.write(piece)' line made the calculated read ETag match the source.
- Then I added printing of the MD5 of each buffer read. Without the 's3_destination.write(piece)' line, I get read buffer MD5s of:
  30963b3c242ab8a4c18bba7ef90886a0
  82e724fb9fbf2031c2a347ca355d20ef
  536edc577ada5da5cc077ef285e9de4a
  0e3a29f3939dd0d53b333056d6da323a
  and the object's ETag "182529afe71f46cc7438ccff13bd4993-4", which is the correct result: it matches the object's ETag in the source S3.
- With the 's3_destination.write(piece)' line, the read buffers' MD5 sums differ from the second read onwards:
  30963b3c242ab8a4c18bba7ef90886a0
  947a39703a2aff25f5667840aedb8a38  # bad from here on
  c2e5eabeb2160b97ccb72509abe56ded
  cd380c872bd9e2b6a8ae61ba1426c8ce
  and the object's ETag is "1760413afef893d8e16c4d47efb8b20a-4", which is wrong.

The s3_destination.write of the read buffer is somehow affecting the next S3 read.
Steps/code to reproduce the problem
Code
#!/usr/bin/python3
import os
import boto3
from smart_open import open
import hashlib

CHUNK_SIZE = 1073741824  # 1 GiB

S3_1_ENDPOINT = 'https://src.x'
S3_2_ENDPOINT = 'https://dest.x'
S3_1_BUCKET = 'x'
S3_2_BUCKET = 'y'

# AWS credentials
s3_1_session = boto3.Session(
    aws_access_key_id='x',
    aws_secret_access_key='x'
)
s3_2_session = boto3.Session(
    aws_access_key_id='y',
    aws_secret_access_key='y'
)

def read_in_chunks(file_object, chunk_size=1024):
    """Lazy function (generator) to read a file piece by piece.
    Default chunk size: 1k."""
    while True:
        data = file_object.read(chunk_size)
        if not data:
            break
        yield data

def etag(md5_array):
    """Generate the ETag, to check it is the same as the original"""
    if len(md5_array) < 1:
        return '"{}"'.format(hashlib.md5().hexdigest())
    if len(md5_array) == 1:
        return '"{}"'.format(md5_array[0].hexdigest())
    digests = b''.join(m.digest() for m in md5_array)
    digests_md5 = hashlib.md5(digests)
    return '"{}-{}"'.format(digests_md5.hexdigest(), len(md5_array))

def s3copys3(src_session, src_endpoint, src_bucket, dest_session, dest_endpoint, dest_bucket, url):
    """Read from the source S3 in chunks, and write these to the destination S3"""
    source_s3_url = 's3://{}/{}'.format(src_bucket, url)
    dest_s3_url = 's3://{}/{}/{}'.format(dest_bucket, src_bucket, url)
    md5s = []  # DEBUG: collect per-chunk MD5s to calculate the ETag of the source object
    with open(source_s3_url, 'rb', transport_params={'session': src_session, 'buffer_size': CHUNK_SIZE, 'resource_kwargs': {'endpoint_url': src_endpoint}}, ignore_ext=True) as s3_source:
        with open(dest_s3_url, 'wb', transport_params={'session': dest_session, 'buffer_size': CHUNK_SIZE, 'resource_kwargs': {'endpoint_url': dest_endpoint}}, ignore_ext=True) as s3_destination:
            for piece in read_in_chunks(s3_source, CHUNK_SIZE):
                md5 = hashlib.md5(piece)     # DEBUG: MD5 of the chunk just read
                print(md5.hexdigest())       # DEBUG: print it
                md5s.append(md5)             # DEBUG: append, so we can generate the ETag of the source
                s3_destination.write(piece)  # Correct without this line; corrupts the next read if this line runs
    print(etag(md5s))  # DEBUG: calculated ETag of the source object

s3copys3(src_session=s3_1_session, src_endpoint=S3_1_ENDPOINT, src_bucket=S3_1_BUCKET,
         dest_session=s3_2_session, dest_endpoint=S3_2_ENDPOINT, dest_bucket=S3_2_BUCKET,
         url="x/x/x/x/x/x.x.gz")
Versions
Linux-4.15.0-48-generic-x86_64-with-Ubuntu-18.04-bionic
Python 3.6.9 (default, Apr 18 2020, 01:56:04) [GCC 8.4.0]
smart_open 2.0.0
Going through a temporary file, so that only one S3 connection is active at a time, works fine:
def s3copys3(src_session, src_endpoint, src_bucket, dest_session, dest_endpoint, dest_bucket, url):
    source_s3_url = 's3://{}/{}'.format(src_bucket, url)
    dest_s3_url = 's3://{}/{}/{}'.format(dest_bucket, src_bucket, url)
    with open(source_s3_url, 'rb', transport_params={'session': src_session, 'buffer_size': CHUNK_SIZE, 'resource_kwargs': {'endpoint_url': src_endpoint}}, ignore_ext=True) as s3_source:
        with open('/tmp/s3_tmp', 'wb') as fout:
            for piece in read_in_chunks(s3_source, CHUNK_SIZE):
                fout.write(piece)
    with open('/tmp/s3_tmp', 'rb') as fin:
        with open(dest_s3_url, 'wb', transport_params={'session': dest_session, 'buffer_size': CHUNK_SIZE, 'resource_kwargs': {'endpoint_url': dest_endpoint}}, ignore_ext=True) as s3_destination:
            for piece in read_in_chunks(fin, CHUNK_SIZE):
                s3_destination.write(piece)
Whereas this version, which reads from one S3 and writes to the other directly, corrupts the read buffer after the first write:
def s3copys3(src_session, src_endpoint, src_bucket, dest_session, dest_endpoint, dest_bucket, url):
    source_s3_url = 's3://{}/{}'.format(src_bucket, url)
    dest_s3_url = 's3://{}/{}/{}'.format(dest_bucket, src_bucket, url)
    with open(source_s3_url, 'rb', transport_params={'session': src_session, 'buffer_size': CHUNK_SIZE, 'resource_kwargs': {'endpoint_url': src_endpoint}}, ignore_ext=True) as s3_source:
        with open(dest_s3_url, 'wb', transport_params={'session': dest_session, 'buffer_size': CHUNK_SIZE, 'resource_kwargs': {'endpoint_url': dest_endpoint}}, ignore_ext=True) as s3_destination:
            for piece in read_in_chunks(s3_source, CHUNK_SIZE):
                s3_destination.write(piece)
@mpenkov What is the status of this bug? I have a similar need: to read from one S3 account/profile and pipe the output to a separate S3 account/profile without writing to a temporary file. Or is there another way of accomplishing this besides the OP's example? The files I'm trying to copy are huge (tens of GB), so I would really like to avoid disk I/O.
I haven't had time to reproduce or resolve this. If you're able, please investigate.
I solved the researcher's problem by using the boto3 S3 implementation directly, so I haven't been tracking this issue.
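For anyone with the same need, here is a minimal sketch of that kind of direct boto3 copy: stream a GET from the source endpoint into a multipart upload on the destination, with no temp file. This is an illustration rather than the exact code referred to above; the function name, chunk size and error handling are assumptions.

import boto3

PART_SIZE = 64 * 1024 * 1024  # illustrative; each part except the last must be >= 5 MiB

def s3copys3_boto3(src_session, src_endpoint, src_bucket,
                   dest_session, dest_endpoint, dest_bucket, key):
    """Copy one object between two separate S3 endpoints via boto3 only."""
    src = src_session.client('s3', endpoint_url=src_endpoint)
    dest = dest_session.client('s3', endpoint_url=dest_endpoint)

    body = src.get_object(Bucket=src_bucket, Key=key)['Body']  # StreamingBody
    mpu = dest.create_multipart_upload(Bucket=dest_bucket, Key=key)
    parts = []
    try:
        part_number = 1
        while True:
            piece = body.read(PART_SIZE)
            if not piece:
                break
            resp = dest.upload_part(Bucket=dest_bucket, Key=key,
                                    UploadId=mpu['UploadId'],
                                    PartNumber=part_number, Body=piece)
            parts.append({'ETag': resp['ETag'], 'PartNumber': part_number})
            part_number += 1
        dest.complete_multipart_upload(Bucket=dest_bucket, Key=key,
                                       UploadId=mpu['UploadId'],
                                       MultipartUpload={'Parts': parts})
    except Exception:
        # don't leave a half-finished multipart upload lying around
        dest.abort_multipart_upload(Bucket=dest_bucket, Key=key,
                                    UploadId=mpu['UploadId'])
        raise

Note that the destination's multipart ETag will only match the source's if the part size matches the one used for the original upload.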
For multipart uploads, the checksum is the MD5 of the concatenated MD5 digests of each part, suffixed with the number of parts, so the checksum varies with the part size.
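As an illustration, here is a sketch of that calculation against a local copy of an object; the helper name is hypothetical, and the result only matches S3's ETag if part_size equals the part size used for the original upload.

import hashlib

def multipart_etag(path, part_size):
    """MD5 of the concatenated per-part MD5 digests, suffixed with the part
    count, as S3 reports for multipart uploads."""
    md5s = []
    with open(path, 'rb') as f:
        while True:
            chunk = f.read(part_size)
            if not chunk:
                break
            md5s.append(hashlib.md5(chunk))
    if len(md5s) == 1:
        # objects uploaded in a single part carry a plain MD5 ETag, no "-1" suffix
        return '"{}"'.format(md5s[0].hexdigest())
    combined = hashlib.md5(b''.join(m.digest() for m in md5s))
    return '"{}-{}"'.format(combined.hexdigest(), len(md5s))

# e.g. multipart_etag('/tmp/s3_tmp', 1073741824) should reproduce the source
# object's "...-4" ETag above, if that object was uploaded in 1 GiB parts.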