Add download/upload progress callback support
I've started using cloudpathlib recently and its made what I'm working on so much easier. I would like to report download progress to the users if any of my cloud files need downloaded.
I'm using S3 and got it working for my needs in my personal PR https://github.com/blurstudio-forks/cloudpathlib/pull/1. It's fairly easy to configure by passing a callable to the new transfer_callable argument of S3Client. The callable must accept the arguments direction, state, cloud_path, bytes_sent). This uses the S3Client to enable callbacks for any CloudPath used if it happens to trigger download/uploads.
The first 3 arguments allow your callable to understand what is happening and update your UI as needed including keeping track of multiple transfers happening at the same time. It lets you know when a transfer starts and stops so you can show/hide UI.
A simple implementation example:
from itertools import islice
from cloudpathlib import CloudPath, S3Client
def progress_callback(direction, state, cloud_path, bytes_sent):
print("Progress", direction, state, cloud_path, bytes_sent)
client = S3Client(transfer_callable=progress_callback, no_sign_request=True)
ladi = CloudPath("s3://ladi/Images/FEMA_CAP/2020/70349", client=client)
a_file = next(islice(ladi.iterdir(), 1))
# If file needs downloaded, this will call `progress_callback` to report start, stop and download progress
print(a_file.fspath)
I have only worked with S3, but I hope the other services also provide a transfer progress reporting system. Is this something you would like to implement or should I develop my code enough to create a pull request?
A more complex example using the rich library to show download progress of several files downloaded at once.
import concurrent.futures
from functools import partial
from itertools import islice
from cloudpathlib import CloudPath, S3Client
from rich.progress import BarColumn, DownloadColumn, Progress, TransferSpeedColumn, TextColumn
callback_state: dict[CloudPath, dict] = {}
def progress_callback(progress, direction, state, cloud_path, bytes_sent):
if state == "start":
progress.start()
size = cloud_path.stat().st_size
task_id = progress.add_task("Downloading", total=size, filename=cloud_path.name)
callback_state[cloud_path] = dict(task_id=task_id, size=size)
elif state == "stop":
if cloud_path in callback_state:
del callback_state[cloud_path]
else:
info = callback_state[cloud_path]
progress.update(info["task_id"], advance=bytes_sent)
def download_url(cloud_path):
# Trigger download of non-cached files
cloud_path.fspath
def download(urls) -> None:
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
future_to_url = {}
for url in urls:
future_to_url[executor.submit(download_url, url)] = url
for future in concurrent.futures.as_completed(future_to_url):
try:
future.result()
except Exception as exc:
print('%r generated an exception: %s' % (future_to_url[future], exc))
progress.stop()
if __name__ == "__main__":
# Set up the progress bar and attach the callback to the S3Client
progress = Progress(
TextColumn("[blue]{task.fields[filename]}"),
BarColumn(),
DownloadColumn(),
TransferSpeedColumn(),
)
client = S3Client(
transfer_callable=partial(progress_callback, progress),
no_sign_request=True,
)
# Download the first first 5 files from this resource.
ladi = CloudPath("s3://ladi/Images/FEMA_CAP/2020/70349", client=client)
download(islice(ladi.iterdir(), 5))
I would very much like similar functionality with the Google Cloud Storage back-end. I do not have a proof of concept to offer, but if someone has ideas/pointers on how that could be done, I would happily take on implementing it. Love this library; it has proven indispensable. I would gladly contribute.