boto3 icon indicating copy to clipboard operation
boto3 copied to clipboard

Threads calling S3 operations return RuntimeError (cannot schedule new futures after interpreter shutdown)

Open jpl-jengelke opened this issue 2 years ago • 17 comments

Describe the bug Basic S3 operations, like downloading or uploading files to buckets, when used in Python 3 threaded application methods, result in a RuntimeException. No bug reports are located here so this documents the error and requests a recommended workaround, if available.

Background Python 3.8 introduced some changes to how the concurrent futures module handled executor requests. Ostensibly, this prevents new tasks from being scheduled after the executor received a shutdown signal. The changes caused Boto3 versions (at least some) after 1.17.53 to yield the following exception:

cannot schedule new futures after interpreter shutdown
Traceback (most recent call last):
  File \"<some_file_calling_an_s3_operation>.py\", line 277, in <method_calling_an_s3_operation>
    s3_client.download_file(bucket_name, file_key, file_destination)
  File \"/usr/local/lib/python3.9/site-packages/boto3/s3/inject.py\", line 170, in download_file
    return transfer.download_file(
  File \"/usr/local/lib/python3.9/site-packages/boto3/s3/transfer.py\", line 304, in download_file
    future = self._manager.download(
  File \"/usr/local/lib/python3.9/site-packages/s3transfer/manager.py\", line 369, in download
    return self._submit_transfer(
  File \"/usr/local/lib/python3.9/site-packages/s3transfer/manager.py\", line 500, in _submit_transfer
    self._submission_executor.submit(
  File \"/usr/local/lib/python3.9/site-packages/s3transfer/futures.py\", line 467, in submit
    future = ExecutorFuture(self._executor.submit(task))
  File \"/usr/local/lib/python3.9/concurrent/futures/thread.py\", line 163, in submit
    raise RuntimeError('cannot schedule new futures after '
RuntimeError: cannot schedule new futures after interpreter shutdown

This impacted Apache Airflow to the extent that the solution was to disable threading in S3 operations. Similarly, there are other related bug reports. This has appeared sporadically in similar scenarios.

This ticket seeks guidance from the Boto3 team on how to best deal with this issue. (NOTE: Recommendations online suggest reverting to Boto3 1.17.53 [see above]. Another potential solution is disabling threading in S3 operations using TransferConfig. Another potential solution is using Thread.join() on the topmost thread, but that will result in waits and may not be readily possible, depending on architecture.

Steps to reproduce This was reproduced with the following application setup: Python 3.9.9 CentOS 7 botocore==1.20.112 boto3==1.17.112

Example Code:

#!/usr/bin/python3

import logging
from queue import Queue
import threading
import time

log = logging.getLogger(__main__)


def finalizer(some_queue):
    while True:  # loop to catch all items
        time.sleep(0.05)  # poor man's nice
        if not some_queue.empty():
            try:
                # application logic here
                method_that_performs_s3_operations()
                # application logic here
            except BaseException as be:
                log.exception(be)
    return


def processor(base_queue, some_queue):
    while True:  # loop to catch all items
        time.sleep(0.05)  # poor man's nice
        if not base_queue.empty():
            try:
                # application logic here
                method2_that_performs_s3_operations()
                add_to_some_queue()
                # application logic here
            except BaseException as be:
                log.exception(be)
    return


def collector(base_queue):
    while True:  # loop to catch all items
        time.sleep(0.05)  # poor man's nice
        if not base_queue.full():
            try:
                # application logic here
                add_to_base_queue()
                # application logic here
            except BaseException as be:
                log.exception(be)
    return


def main():
    base_queue = Queue(DEFAULT_QUEUE_SIZE)
    some_queue = Queue(DEFAULT_QUEUE_SIZE * 2)
    # define and run threads
    thread_collector = threading.Thread(target=collector, name='thread_collector',
                                         args=(base_queue))
    thread_processor = threading.Thread(target=processor, name='thread_processor',
                                      args=(base_queue, some_queue))
    thread_finalizer = threading.Thread(target=finalizer, name='thread_finalizer',
                                             args=(some_queue))
    # wait specific time to start processing threads
    time.sleep(30.0)
    thread_collector.start()
    thread_processor.start()
    thread_finalizer.start()
    return


if __name__ == '__main__':
    main()

Expected behavior S3 operations will proceed successfully to download/upload without any custom configuration. Exceptions relating to concurrency inside s3 code will not be thrown.

Debug logs Full stack trace by adding boto3.set_stream_logger('') to your code.

jpl-jengelke avatar Jan 17 '22 22:01 jpl-jengelke

Note that for various reasons, including the need to keep up with bug or security fixes, there's no desire to revert to previous versions of Boto3. Also, disabling threading in S3 or adding waits to individual thread processing is not desirable.

Also, note that Boto3 1.17.112 uses s3transfer 0.4.2 which appears functionally no different than s3transfer 0.5.0 (which appears to only deprecate Python 2). So upgrading s3transfer should not make a difference. Are there any changes in Boto3's inject.py or transfer.py that relieve this issue? Is it even a Boto3 issue?

Also note that downgrading the version and disabling threading in s3 operations allowed download/upload to work, but that feels nonperformant, especially for large, multipart files.

Thank you!

jpl-jengelke avatar Jan 17 '22 22:01 jpl-jengelke

Hi @jpl-jengelke, thanks for reaching out. I brought this up with the team and it is something that we’re looking into further. We will let you know when we have an update.

tim-finnigan avatar Jan 19 '22 23:01 tim-finnigan

@tim-finnigan Any update on this?

jpl-jengelke avatar Jun 10 '22 19:06 jpl-jengelke

Hi @jpl-jengelke thanks for following up and apologies that this fell off of our radar. I brought this up for discussion with the team again and they wanted to get some more information.

Regarding your original code snippet, could you tell us more about which S3 operations you were using?

Also you mentioned using Thread.join() as a possible solution. Could you elaborate a bit more on why you don’t think that would be an ideal approach here?

tim-finnigan avatar Jun 14 '22 17:06 tim-finnigan

Greetings! It looks like this issue hasn’t been active in longer than five days. We encourage you to check if this is still an issue in the latest release. In the absence of more information, we will be closing this issue soon. If you find that this is still a problem, please feel free to provide a comment or upvote with a reaction on the initial post to prevent automatic closure. If the issue is already closed, please feel free to open a new one.

github-actions[bot] avatar Nov 04 '22 07:11 github-actions[bot]

Hi @jpl-jengelke thanks for following up and apologies that this fell off of our radar. I brought this up for discussion with the team again and they wanted to get some more information.

Regarding your original code snippet, could you tell us more about which S3 operations you were using?

Also you mentioned using Thread.join() as a possible solution. Could you elaborate a bit more on why you don’t think that would be an ideal approach here?

Basically, I am running something like this:

def main(): 
   ...
    thread_a = threading.Thread(target=method_a, name='thread_a',
                                args=(u, v, x, y, z))
    thread_b = threading.Thread(target=method_b, name='thread_b',
                                args=(v, x, s, t))
    thread_c = threading.Thread(target=method_c, name='thread_c',
                                args=(s, q, r))

    # wait specific time to start processing threads
    time.sleep(30.0)
    thread_a.start()
    thread_b.start()
    thread_c.start()

    # bind main thread resources (attempt to address threading issues related to AWS tooling)
    thread_a.join()
    thread_b.join()
    thread_c.join()
    return

Within threads, specific other APIs are called that perform S3 bucket copies. ...

def method_a(u, v, x, y, z):
    ...
    do_s3_move(d, e, f, disable_threading=True)  # only works when threading disabled
    ...
def do_s3_move(d, e, f, disable_threading=False):
   ...
   if disable_threading:
      config = TransferConfig(use_threads=False)
   else:
      config = TransferConfig(use_threads=True)
   ...
   s3_client = boto3.client('s3')  # iam
   s3_client.upload_file(filepath, bucket_name, aws_key, Config=config)  # fails when threaded
   ...
   return success_flag

In our scenario, the Thread.join() op on the uppermost thread did not make a difference.

jpl-jengelke avatar Apr 04 '23 16:04 jpl-jengelke

@aBurmeseDev @tim-finnigan Your bot auto-closed the issue when it apparently still exists and was not fully addressed. It's a complicated issue but not necessarily an edge case. Please reopen, or should I create a new issue to track this?

jpl-jengelke avatar Apr 04 '23 16:04 jpl-jengelke

Hi @jpl-jengelke thanks for your patience. I brought this up again for discussion with the team and the consensus was that this likely requires a deeper dive investigation. Can you provide a minimal script to repro the issue? (I know you've already shared a few snippets here but I want to make sure we're using the best one for investigating this further.)

The only related GitHub issue I could find in our repos is https://github.com/boto/s3transfer/issues/197 which you commented on. I did see several Stack Overflow posts that reference the error and might be worth looking into. But given that you've already shared a workaround, I think we just need to isolate the conditions where this issue occurs to figure out what the options are for addressing it.

tim-finnigan avatar Apr 05 '23 20:04 tim-finnigan

Greetings! It looks like this issue hasn’t been active in longer than five days. We encourage you to check if this is still an issue in the latest release. In the absence of more information, we will be closing this issue soon. If you find that this is still a problem, please feel free to provide a comment or upvote with a reaction on the initial post to prevent automatic closure. If the issue is already closed, please feel free to open a new one.

github-actions[bot] avatar Apr 10 '23 21:04 github-actions[bot]

I am also encountering this issue using python 3.9 and the latest version of boto. My main thread is submitting upload jobs to an uploader that has a single background thread running to upload those jobs.

aidandonohue avatar Apr 13 '23 20:04 aidandonohue

There is exactly the same problem when we are download object via Pika in rabbitmq. More clearly when we create an consumer in rabbitmq, And if we try to download object in any way, this problem will happen. But if we are outside of consumer, there is no problem and everything works like a charm.

hougomartim avatar May 03 '23 03:05 hougomartim

Any updates on this thread?

asuglia-alana avatar Aug 03 '23 11:08 asuglia-alana

I've explained the underlying issue in https://github.com/boto/boto3/issues/3221#issuecomment-1169369661 which explains a bit more about this change in Python 3.9+.

Because of the bug fix, it's broken some of the ability for cleanup to work as intended. There isn't a robust fix to be done in S3transfer because this is an interprocess issue with how Python is doing cleanup.

The most straight forward fix is to place this import at the top level process/thread so a reference is kept. That will avoid the shutdown issues that are being encountered.

from concurrent.futures import ThreadPoolExecutor

nateprewitt avatar Aug 03 '23 19:08 nateprewitt

@nateprewitt

... There isn't a robust fix to be done in S3transfer because this is an interprocess issue with how Python is doing cleanup.

The most straight forward fix is to place this import at the top level process/thread so a reference is kept. That will avoid the shutdown issues that are being encountered.

from concurrent.futures import ThreadPoolExecutor

In earlier comments, I believe it was clear that the error was occurring within the actual S3 transfer codebase (specifically, in the upload_file() operation). I'm looking at the imports for the Boto3 code and I do not see anything about importing ThreadPoolExecutor. Is this a fix that the Boto3 team is planning to implement? Or is the solution to always put the import at the top of user code so that it is imported before any S3 Client or Transfer objects are imported?

Also, if the latter observation is true, should this be documented somewhere as it appears to be a requirement to run threaded S3 operations. More to the point, if user code is not using threading operations, maybe it should be added to S3 Transfer, anyway, to correct any potential issues within S3 Transfer threading. (Then an alternative solution, with the change, would be to import S3 Transfer modules before anything else -- since they would import ThreadPoolExecutor first.)

jpl-jengelke avatar Aug 03 '23 19:08 jpl-jengelke

fixed by disabling threading in S3 upload_file https://boto3.amazonaws.com/v1/documentation/api/1.17.53/guide/s3.html#threads

pRoy24 avatar Dec 21 '23 06:12 pRoy24

fixed by disabling threading in S3 upload_file https://boto3.amazonaws.com/v1/documentation/api/1.17.53/guide/s3.html#threads

That is a workaround documented here in this ticket. It's not really a fix, per se.

Obviously, others are still experiencing this issue in the wild.

jpl-jengelke avatar Dec 21 '23 14:12 jpl-jengelke

I think I just encountered this issue with the following setup while uploading a large file: Ubuntu 22.04 Python 3.10.12 boto3 1.28.66

The code calling the upload seems straight forward:

internal_client = boto3.client('s3', aws_access_key_id=access_key, aws_secret_access_key=secret_key)
botoresponse = internal_client.upload_file(file_path, bucket, s3_file_path)

StackTrace: File "/home/steve/.local/lib/python3.10/site-packages/boto3/s3/inject.py", line 143, in upload_file return transfer.upload_file( File "/home/steve/.local/lib/python3.10/site-packages/boto3/s3/transfer.py", line 288, in upload_file future = self._manager.upload( File "/home/steve/.local/lib/python3.10/site-packages/s3transfer/manager.py", line 333, in upload return self._submit_transfer( File "/home/steve/.local/lib/python3.10/site-packages/s3transfer/manager.py", line 528, in _submit_transfer self._submission_executor.submit( File "/home/steve/.local/lib/python3.10/site-packages/s3transfer/futures.py", line 474, in submit future = ExecutorFuture(self._executor.submit(task)) File "/usr/lib/python3.10/concurrent/futures/thread.py", line 169, in submit raise RuntimeError('cannot schedule new futures after ' RuntimeError: cannot schedule new futures after interpreter shutdown

GiantZOC avatar Dec 21 '23 20:12 GiantZOC