
S3 write seems to be affecting the next S3 read

Open rbur004 opened this issue 5 years ago • 4 comments

Problem description


  • What are you trying to achieve? A copy from one S3 to a separate S3, i.e. an object copy between two completely separate S3 instances.

  • What is the expected result? The object is copied, with the same ETag.

  • What are you seeing instead? The next S3 read's buffer is affected by the S3 write call. The copied object has the same length, but is corrupted.

  1. I first ran the code and found that the source URI's ETag did not match the calculated one (see def etag() in the code below).
  2. I commented out s3_destination.write(piece), and the calculated ETag of the reads did match the source.

I then added printing of the MD5 of each buffer read. Without the s3_destination.write(piece) line, I get read buffer MD5s of:

30963b3c242ab8a4c18bba7ef90886a0
82e724fb9fbf2031c2a347ca355d20ef
536edc577ada5da5cc077ef285e9de4a
0e3a29f3939dd0d53b333056d6da323a

and the object's ETag is "182529afe71f46cc7438ccff13bd4993-4", which is the correct result: it matches the object's ETag in the source S3.

With the s3_destination.write(piece) line, the read buffers' MD5 sums differ from the second read onward:

30963b3c242ab8a4c18bba7ef90886a0
947a39703a2aff25f5667840aedb8a38  # bad from here on
c2e5eabeb2160b97ccb72509abe56ded
cd380c872bd9e2b6a8ae61ba1426c8ce

and the object's ETag is "1760413afef893d8e16c4d47efb8b20a-4", which is wrong.

The s3_destination.write of the read buffer is somehow affecting the next S3 read.

Steps/code to reproduce the problem

Code

#!/usr/bin/python3
import os
import boto3
from smart_open import open
import hashlib

CHUNK_SIZE = 1073741824 #1G

S3_1_ENDPOINT = 'https://src.x'
S3_2_ENDPOINT = 'https://dest.x'
S3_1_BUCKET = 'x'
S3_2_BUCKET = 'y'

# AWS Credentials
s3_1_session = boto3.Session(
     aws_access_key_id='x',
     aws_secret_access_key='x'
)

s3_2_session = boto3.Session(
     aws_access_key_id='y',
     aws_secret_access_key='y'
)

def read_in_chunks(file_object, chunk_size=1024):
  """Lazy function (generator) to read a file piece by piece.
  Default chunk size: 1k."""
  while True:
      data = file_object.read(chunk_size)
      if not data:
          break
      yield data

def etag(md5_array):
  """Generate ETag to check it is the same as the original"""
  if len(md5_array) < 1:
    return '"{}"'.format(hashlib.md5().hexdigest())

  if len(md5_array) == 1:
    return '"{}"'.format(md5_array[0].hexdigest())

  digests = b''.join(m.digest() for m in md5_array)
  digests_md5 = hashlib.md5(digests)
  return '"{}-{}"'.format(digests_md5.hexdigest(), len(md5_array))


def s3copys3(src_session, src_endpoint, src_bucket, dest_session, dest_endpoint, dest_bucket, url):
  """Read from source S3, in chunks, and write these to the destination S3"""
  source_s3_url = 's3://{}/{}'.format(src_bucket, url)
  dest_s3_url = 's3://{}/{}/{}'.format(dest_bucket, src_bucket, url)

  md5s = [] #DEBUG code to calculate the Etag of the source object

  with open(source_s3_url, 'rb', transport_params={'session': src_session,  'buffer_size': CHUNK_SIZE, 'resource_kwargs': { 'endpoint_url': src_endpoint}}, ignore_ext=True) as s3_source:
    with open(dest_s3_url, 'wb', transport_params={'session': dest_session,  'buffer_size': CHUNK_SIZE, 'resource_kwargs': { 'endpoint_url': dest_endpoint}}, ignore_ext=True) as s3_destination:
      for piece in read_in_chunks(s3_source, CHUNK_SIZE):
         md5 = hashlib.md5(piece) #DEBUG The last read buffers md5
         print(md5.hexdigest()) #DEBUG print the last read buffers md5
         md5s.append(md5) #DEBUG Append to md5s array, so we can generate the ETag of the source
         s3_destination.write(piece) #Correct without this line. Corrupts next read, if this line is run

  print(etag(md5s)) #DEBUG code to calculate the Etag of the source object

s3copys3(src_session=s3_1_session, src_endpoint=S3_1_ENDPOINT, src_bucket=S3_1_BUCKET, 
         dest_session=s3_2_session, dest_endpoint=S3_2_ENDPOINT, dest_bucket=S3_2_BUCKET, 
         url = "x/x/x/x/x/x.x.gz")

Versions

Linux-4.15.0-48-generic-x86_64-with-Ubuntu-18.04-bionic
Python 3.6.9 (default, Apr 18 2020, 01:56:04) [GCC 8.4.0]
smart_open 2.0.0

rbur004 commented Jun 26 '20 00:06

Going through a temporary file, so that only one S3 connection is active at a time, works fine:

def s3copys3(src_session, src_endpoint, src_bucket, dest_session, dest_endpoint, dest_bucket, url):
  source_s3_url = 's3://{}/{}'.format(src_bucket, url)
  dest_s3_url = 's3://{}/{}/{}'.format(dest_bucket, src_bucket, url)

  with open(source_s3_url, 'rb', transport_params={'session': src_session,  'buffer_size': CHUNK_SIZE, 'resource_kwargs': { 'endpoint_url': src_endpoint}}, ignore_ext=True) as s3_source:
    with open('/tmp/s3_tmp', 'wb') as fout:
      for piece in read_in_chunks(s3_source, CHUNK_SIZE):
         fout.write(piece)

  with open('/tmp/s3_tmp', 'rb') as fin:
    with open(dest_s3_url, 'wb', transport_params={'session': dest_session,  'buffer_size': CHUNK_SIZE, 'resource_kwargs': { 'endpoint_url': dest_endpoint}}, ignore_ext=True) as s3_destination:
      for piece in read_in_chunks(fin, CHUNK_SIZE):
         s3_destination.write(piece)

Whereas this version corrupts the read buffer after the first write:

def s3copys3(src_session, src_endpoint, src_bucket, dest_session, dest_endpoint, dest_bucket, url):
  source_s3_url = 's3://{}/{}'.format(src_bucket, url)
  dest_s3_url = 's3://{}/{}/{}'.format(dest_bucket, src_bucket, url)

  with open(source_s3_url, 'rb', transport_params={'session': src_session,  'buffer_size': CHUNK_SIZE, 'resource_kwargs': { 'endpoint_url': src_endpoint}}, ignore_ext=True) as s3_source:
    with open(dest_s3_url, 'wb', transport_params={'session': dest_session,  'buffer_size': CHUNK_SIZE, 'resource_kwargs': { 'endpoint_url': dest_endpoint}}, ignore_ext=True) as s3_destination:
      for piece in read_in_chunks(s3_source, CHUNK_SIZE):
         s3_destination.write(piece)

rbur004 commented Jun 26 '20 02:06

@mpenkov What is the status of this bug? I have a similar need: read from one S3 account/profile and pipe the output to a separate S3 account/profile without writing to a temp file. Or is there another way of accomplishing this besides the OP's example? The files I'm trying to copy are huge (tens of GB), so I would really like to avoid disk I/O.

kpdude7 commented Mar 12 '21 04:03

I haven't had time to reproduce or resolve this. If you're able, please investigate.

mpenkov commented Mar 12 '21 07:03

I solved the researcher's problem by using the boto3 S3 implementation directly, so I haven't been tracking this issue.
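
For reference, here is a minimal sketch of that direct boto3 approach, streaming between the two accounts without a temporary file. It reuses the sessions, endpoints, buckets and key from the script above; the s3copys3_boto3 name and the part size are my own placeholders, not anything from smart_open.

from boto3.s3.transfer import TransferConfig

def s3copys3_boto3(src_client, dest_client, src_bucket, dest_bucket, key, part_size=1073741824):
  """Stream an object from one S3 account to another without a temp file."""
  # get_object returns a StreamingBody, which is file-like (it has .read()),
  # so it can be handed straight to upload_fileobj on the destination client.
  body = src_client.get_object(Bucket=src_bucket, Key=key)['Body']
  # upload_fileobj performs a multipart upload under the hood; the chunk size
  # chosen here determines the "-N" multipart ETag of the destination object.
  dest_client.upload_fileobj(body, dest_bucket, '{}/{}'.format(src_bucket, key),
                             Config=TransferConfig(multipart_chunksize=part_size))

# Sessions, endpoints and buckets as defined in the original script.
src_client = s3_1_session.client('s3', endpoint_url=S3_1_ENDPOINT)
dest_client = s3_2_session.client('s3', endpoint_url=S3_2_ENDPOINT)
s3copys3_boto3(src_client, dest_client, S3_1_BUCKET, S3_2_BUCKET, 'x/x/x/x/x/x.x.gz')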

For multipart uploads, the ETag is the MD5 of the concatenated MD5 digests of each part, with a "-<number of parts>" suffix, so the ETag varies with the part size.
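
To make that concrete, here is a small standalone sketch of the rule, equivalent to the etag() helper in the script above; the multipart_etag name and the default part size are placeholders of mine.

import hashlib

def multipart_etag(path, part_size=1073741824):
  """Compute an S3 multipart-style ETag for a local file, given a part size."""
  md5s = []
  with open(path, 'rb') as f:
    while True:
      part = f.read(part_size)
      if not part:
        break
      md5s.append(hashlib.md5(part))
  if not md5s:
    # Empty object: plain MD5 of zero bytes.
    return '"{}"'.format(hashlib.md5().hexdigest())
  if len(md5s) == 1:
    # Single-part upload: the ETag is just the plain MD5 of the object.
    return '"{}"'.format(md5s[0].hexdigest())
  # Multipart upload: MD5 of the concatenated binary part digests, plus "-<part count>".
  combined = hashlib.md5(b''.join(m.digest() for m in md5s))
  return '"{}-{}"'.format(combined.hexdigest(), len(md5s))

The same bytes split with a different part_size produce a different ETag, which is why a copy made with a different multipart chunk size will not match the source object's ETag even when the data is identical.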

rbur004 commented Sep 13 '23 23:09