smart_open

Upload big io.Buffer to S3

Open ivanhigueram opened this issue 6 years ago • 10 comments

Problem description

I am requesting a set of files, zipping them, and then uploading the zipped data to S3 using smart_open and an io.BytesIO() object. The size of the compressed data exceeds the 5 GB S3 limit, and I know that in that case a multipart approach should be used (just like in boto3). I am using smart_open.s3.open() for this, but I do not completely understand how to configure the multipart upload to avoid the EntityTooLarge error. I keep getting the error when using my code. Should I divide my file beforehand or specify the number of parts? Checking the source code, I don't see a num_parts option.

 (EntityTooLarge) when calling the UploadPart operation: Your proposed upload exceeds the maximum allowed size

My function is the following:

def stream_time_range_s3(start_date,
                         end_date,
                         aws_key,
                         aws_secret,
                         aws_bucket_name,
                         key,
                         max_workers,
                         delta):
    """
    Download individual month directory of .grd files to local directory.

    This function will download using the ftplib all the .grd files between the
    start_date and the end_date. All dates in the NOAA NARR server are
    stored following this order:
        data
        ├── year/month
            ├── year/month/day01
            ├── year/month/day02

    Here we download the monthly directory with the user-defined dates in the
    start and end dates. 

    Params:
        - start_date str or datetime: date to start download ('%Y-%m-%d' if str).
        - end_date: date to stop download.
    """

    logger = logging.getLogger(__name__)

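    # Accept a 'YYYY-MM-DD' string or a datetime; reject anything else.
    if isinstance(start_date, str):
        start_date = datetime.strptime(start_date, '%Y-%m-%d')
    elif not isinstance(start_date, datetime):
        raise ValueError(f'{start_date} is not in the correct format or not a valid type')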


    session = boto3.Session(
        aws_access_key_id=aws_key,
        aws_secret_access_key=aws_secret
    )

    base_url = 'https://nomads.ncdc.noaa.gov/data/narr'
    times = ['0000', '0300', '0600', '0900', '1200', '1500', '1800', '2100']

    if delta is None:
        dates = datetime_range(start_date, end_date, {'days':1})
    else:
        dates = datetime_range(start_date, end_date, delta)

    # One URL per (day, 3-hourly timestamp) pair.
    urls_time_range = []
    for day, hour in product(dates, times):
        file_name = f'narr-a_221_{day.strftime("%Y%m%d")}_{hour}_000.grb'
        url = URL(base_url, day.strftime('%Y%m'), day.strftime('%Y%m%d'))
        urls_time_range.append(str(URL(url, file_name)))

    with multiprocessing.Pool(max_workers) as p:
        results = p.map(requests_to_s3, urls_time_range, chunksize=1)

        print('Finish download')
        buf = io.BytesIO()
        with zipfile.ZipFile(buf, mode='w', compression=zipfile.ZIP_DEFLATED, compresslevel=1) as zf:
            for content_file_name, content_file_result in results:
                try:
                    zf.writestr(content_file_name,
                                content_file_result)
                except Exception as exc:
                    print(exc)

        print('Finish zipping  - Upload Start')
        with smart_open.s3.open(aws_bucket_name, key, 'wb', session=session) as so:
            so.write(buf.getvalue())

    return None

You can test the function by running:

from datetime import datetime

a = stream_time_range_s3(start_date=datetime(2012, 1, 1),
                         end_date=datetime(2012, 2, 1),
                         aws_key=aws_key,
                         delta=None,
                         aws_secret=aws_secret,
                         aws_bucket_name=bucket_name,
                         key='wind_2012_test_parts.zip',
                         max_workers=10)

Versions

Darwin-18.7.0-x86_64-i386-64bit
Python 3.7.1 (default, Feb 27 2019, 18:57:54)
[Clang 10.0.0 (clang-1000.10.44.4)]
smart_open 1.8.4

ivanhigueram avatar Nov 07 '19 18:11 ivanhigueram
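
One way to avoid holding the whole archive in an io.BytesIO before uploading is to let zipfile write straight into the output stream, so data leaves in many small writes instead of one huge one. The sketch below assumes the destination only needs write() and flush(); zipfile can write to a stream it cannot seek. WriteOnlyStream and zip_to_stream are hypothetical names used for illustration, and whether the S3 writer in a given smart_open version behaves well as the target should be verified before relying on it.

```python
import io
import zipfile


class WriteOnlyStream(io.RawIOBase):
    """Hypothetical stand-in for a non-seekable upload stream."""

    def __init__(self):
        super().__init__()
        self.chunks = []

    def writable(self):
        return True

    def seekable(self):
        return False

    def write(self, data):
        # Record each write so the result can be inspected afterwards.
        self.chunks.append(bytes(data))
        return len(data)


def zip_to_stream(named_payloads, fout):
    """Write (name, bytes) pairs into a zip archive directly on fout."""
    with zipfile.ZipFile(fout, mode='w', compression=zipfile.ZIP_DEFLATED) as zf:
        for name, payload in named_payloads:
            zf.writestr(name, payload)


stream = WriteOnlyStream()
zip_to_stream([('a.txt', b'hello'), ('b.txt', b'world')], stream)
archive_bytes = b''.join(stream.chunks)
```

In the function from the report, fout would be the object returned by smart_open.s3.open(...) rather than the stand-in class, which is the untested assumption here.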

Thanks for reporting this.

Could you simplify the example a bit? It's too long and requires external data (wind_2012_test_parts.zip).

From your description of the problem, it sounds like the following should reproduce your problem:

import io
import smart_open
with open('some_large_file.bin', 'rb') as fin:
    with smart_open.open('s3://bucket/key.bin', 'wb') as fout:
        buf = fin.read(10 * 1024**3) # read 10GiB into memory, oof (read() needs an int, not 10e9)
        fout.write(buf)

Can you confirm whether the above reproduces your problem? If not, let's look into reducing your original example, it's a bit too much for me to look at.

mpenkov avatar Nov 09 '19 02:11 mpenkov

I can confirm that I'm encountering this error when trying to upload a file over 5GB via fout.write(buf) as you've stated in the simplified example.

This Stack Overflow question appears to explain the cause: https://stackoverflow.com/questions/26319815/entitytoolarge-error-when-uploading-a-5g-file-to-amazon-s3

davidparks21 avatar Mar 25 '21 05:03 davidparks21

@davidparks21 Thank you for confirming the problem.

I think we can resolve the issue by ensuring that a single write call never puts more than 5GB. If there is more data, then subsequent write calls should handle it.

Are you able to make a PR?

mpenkov avatar Mar 25 '21 12:03 mpenkov

Oh, so just raise an exception when one fout.write(buff) is called with a buff > 5GB?

That would be an easy solution to deal with for me. I think I could do a PR for that. There was one other small thing I wanted to do a PR for too, so this would probably get me off my butt to do both.

davidparks21 avatar Mar 25 '21 15:03 davidparks21

"so just raise an exception when one fout.write(buff) is called with a buff > 5GB?"

smart_open's promise is to handle large uploads (and downloads) transparently. So instead of raising an exception, isn't it better to split the chunk into multipart pieces, each smaller than 5GB?

IIRC smart_open is already handling multipart uploads transparently under the hood, so this should be no different.

piskvorky avatar Mar 25 '21 15:03 piskvorky

I have a similar issue with trying to stream/write large files to S3 via pickle.dump(obj, fout, protocol=4) as smart_open is trying to upload parts that are each greater than 5GB.

Is this still a 'needs-info' or is the problem understood?

JamalRahman avatar Oct 21 '21 02:10 JamalRahman

I think we understand the problem, now we "just" need to fix it.

mpenkov avatar Oct 21 '21 06:10 mpenkov

@JamalRahman @pythric @ivanhigueram could you help with a fix, prepare the PR?

piskvorky avatar Oct 21 '21 08:10 piskvorky

@mpenkov is it still an open issue? I found this when considering smart_open to upload a 5TB file to S3. @davidparks21 how did you resolve the problem in your case? Could you share a code snippet?

tweglinski avatar Feb 10 '22 09:02 tweglinski
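
For very large objects the part count matters as well as the part size. The arithmetic below is a sketch based on the S3 multipart limits as commonly documented (at most 10,000 parts, each between 5 MiB and 5 GiB); those constants and the helper name min_part_size_for are assumptions to check against current AWS documentation, not something stated in this thread.

```python
# Assumed S3 multipart limits; verify against current AWS documentation.
MAX_PARTS = 10_000
MIN_PART_BYTES = 5 * 1024**2
MAX_PART_BYTES = 5 * 1024**3


def min_part_size_for(total_bytes):
    """Smallest part size (in bytes) that fits total_bytes into MAX_PARTS parts."""
    needed = -(-total_bytes // MAX_PARTS)  # ceiling division
    part_size = max(needed, MIN_PART_BYTES)
    if part_size > MAX_PART_BYTES:
        raise ValueError('object too large for the assumed multipart limits')
    return part_size


five_tb = 5 * 1000**4
part_size = min_part_size_for(five_tb)
```

For a 5 TB object (taking 1 TB as 1000**4 bytes) this gives 500,000,000 bytes per part. The result could then be passed as the 'min_part_size' entry of transport_params, the parameter used later in this thread; a smaller value would run out of part numbers before the upload finishes.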

I started working on a solution, but it wasn't a very trivial change the way it's currently coded. I ran out of time and abandoned the effort back when I posted. I'm not sure about the current status, but my solution was to simply chunk the calls to file.write(n) which was pretty trivial in my use case.

davidparks21 avatar Feb 11 '22 21:02 davidparks21
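
The workaround described above, chunking the calls to write, might look roughly like the following. It is a minimal sketch, not davidparks21's actual code: write_in_chunks and the 256 MiB default are hypothetical, and it assumes the target's write() accepts memoryview slices (io.BytesIO does; for smart_open this should be checked against the installed version).

```python
import io


def write_in_chunks(fout, data, chunk_size=256 * 1024**2):
    """Write a bytes-like object to fout in slices of at most chunk_size bytes.

    memoryview slices do not copy the underlying data, so no single
    write() call is handed more than chunk_size bytes.
    """
    view = memoryview(data).cast('B')
    for start in range(0, len(view), chunk_size):
        fout.write(view[start:start + chunk_size])
    return len(view)


target = io.BytesIO()
written = write_in_chunks(target, b'abcdefghij', chunk_size=3)
```

With the io.BytesIO buffer from the original report, buf.getbuffer() could be passed as data instead of buf.getvalue(), which avoids making a second full copy of the archive in memory.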

I've been chugging away at this, and finally hit upon a solution that wouldn't need to make any extra copies of the data into the buffer, which would be a significant improvement given the size of files we're talking about. Unfortunately, https://github.com/boto/boto3/issues/3423 stopped me from reaching such a perfect solution.

I'll be opening a PR soon with a compromise solution. If my PR to botocore is accepted and released, it will remove the need to buffer the data at all before sending (unless writes smaller than the minimum upload size are involved).

jakkdl avatar Jan 26 '24 15:01 jakkdl

I think I'm running into a very similar problem:

src = 's3://commoncrawl/projects/hyperlinkgraph/cc-main-2017-feb-mar-apr-hostgraph/vertices.txt.gz'
# rstrip(".gz") strips any trailing '.', 'g' or 'z' characters rather than the suffix, so slice instead
dst_file = src[:-len(".gz")] + ".bz2"
tp = {'client': s3client, 'min_part_size': 2 * 1024**2}
with open(src, 'r', transport_params=tp) as f:
    with open(dst_file, 'w', transport_params=tp) as g:
        g.write(f.read())

I seem to be running out of memory on a small core machine with plenty of /tmp space. So, thinking I need to buffer the write/read? I thought this was handled with the tp?

leeprevost avatar Feb 14 '24 17:02 leeprevost

"I seem to be running out of memory on a small core machine with plenty of /tmp space. So, thinking I need to buffer the write/read? I thought this was handled with the tp?"

Solved this. I was missing the write iterator.

leeprevost avatar Feb 14 '24 19:02 leeprevost
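
The exact fix is not shown in the thread. Presumably it means reading and writing piece by piece instead of calling g.write(f.read()), which loads the whole file into memory. A minimal sketch of that pattern, using the hypothetical helper stream_copy and an arbitrary 16 MiB chunk size, could be:

```python
import io


def stream_copy(fin, fout, chunk_size=16 * 1024**2):
    """Copy fin to fout in bounded chunks; returns the amount copied."""
    copied = 0
    while True:
        chunk = fin.read(chunk_size)
        if not chunk:  # empty bytes/str signals end of input
            break
        fout.write(chunk)
        copied += len(chunk)
    return copied


source = io.BytesIO(b'x' * 10)
destination = io.BytesIO()
total = stream_copy(source, destination, chunk_size=4)
```

In the snippet above, f and g would take the place of the two io.BytesIO objects, so peak memory stays near chunk_size plus whatever the writer buffers for one part. The standard library's shutil.copyfileobj(f, g) does the same job.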