paco
paco copied to clipboard
Current limitations of map and each
Hi! First off, I think paco is a very nice library, and I would like to help improve it. That said, I have a particular problem: I need to download millions of images as fast as possible. I looked into these resources:
- Making 100 million requests with Python aiohttp: solves the problem, but its code is very specific and not very generic.
- paco.each: generic, but not memory-bounded
Using paco
my initial code was:
import asyncio
import os

import aiofiles
import aiohttp
import paco
# The three sample image URLs, repeated one million times
# (3,000,000 entries in total).
urls = 1_000_000 * [
    "https://static.pexels.com/photos/67843/splashing-splash-aqua-water-67843.jpeg",
    "https://cdn.pixabay.com/photo/2016/10/27/22/53/heart-1776746_960_720.jpg",
    "http://www.qygjxz.com/data/out/240/4321276-wallpaper-images-download.jpg",
]
def download_file(path):
    """Build a coroutine function that stores a URL's content under ``path``.

    Arguments:
        path (str): directory the downloaded files are written to.

    Returns:
        coroutine function: takes a single URL, fetches it with an HTTP GET
        and writes the response body to ``path``, using the basename of the
        URL as the file name.
    """
    async def do_download_file(url):
        filename = os.path.basename(url)
        target = os.path.join(path, filename)

        print(f"Downloading {url}")
        async with aiohttp.request("GET", url) as response:
            payload = await response.read()
        print(f"Completed {filename}")

        # The whole body is held in memory and then written in one call.
        async with aiofiles.open(target, "wb") as out_file:
            await out_file.write(payload)

    return do_download_file
# Destination directory for the downloaded images.
path = "/data/tmp/images"

# ``paco.each`` takes the coroutine function AND the iterable to apply it
# to; the original call left the ``urls`` iterable out.
coro = paco.each(download_file(path), urls)
loop = asyncio.get_event_loop()
loop.run_until_complete(coro)
I like the API of paco.each
but when testing it my computer froze as its memory blew up while trying to create 1 million coroutines. The main problem is in these lines of code:
# paco/each.py
for index, value in enumerate(iterable):
pool.add(collector(index, value))
I observe the following:
- It creates all the objects in memory before starting their tasks
- It also assumes that the collection fits in memory
- It also assumes that the collection is fast to iterate over
- Preserves order (nice to have)
Since my problem is speed and memory, points 1 to 3 are the most relevant. I recreated the map
and each
using asyncio.Queue
and limiting the number of tasks that exist at the same time. This involved creating a structure I called Stream
that just holds a coroutine and a Queue. My API enforces the limit
on each
so that no more than that number of objects is held in memory.
# Sample workload: three image URLs repeated one million times.
urls = 1_000_000 * [
    "https://static.pexels.com/photos/67843/splashing-splash-aqua-water-67843.jpeg",
    "https://cdn.pixabay.com/photo/2016/10/27/22/53/heart-1776746_960_720.jpg",
    "http://www.qygjxz.com/data/out/240/4321276-wallpaper-images-download.jpg",
]
path = "/data/tmp/images"

# Feed the URLs through a stream; ``limit=10`` caps the number of download
# tasks that are unfinished at any one time.
stream = from_iterable(urls)
coro = each(download_file(path), stream, limit=10)

loop = asyncio.get_event_loop()
loop.set_exception_handler(handle_async_exception)
loop.run_until_complete(coro)
Both the new from_iterable
and map
functions have a queue_maxsize
parameter that further limits how the data flows and enforces a back-pressure mechanism. The code is at the end. I wanted to share the experiment and also open the possibility of creating a paco.stream
module to continue the life of this code.
from collections import namedtuple
import asyncio
# Unique sentinel placed on a queue to signal that no more items will follow.
DONE = object()

# A stream pairs the coroutine that produces items with the queue the items
# are delivered on.
Stream = namedtuple("Stream", ["coroutine", "queue"])
def _active_tasks(tasks):
return [ task for task in tasks if not task.done() ]
def _f_wrapper(f, queue = None):
async def __f_wrapper(x):
y = f(x)
if hasattr(y, "__await__"):
y = await y
if queue is not None:
await queue.put(y)
return __f_wrapper
async def _task_limit(tasks, limit):
    """Wait until fewer than ``limit`` of ``tasks`` are still unfinished.

    Arguments:
        tasks (iterable): tasks/futures scheduled so far.
        limit (int): number of unfinished tasks at which the caller must
            wait before scheduling more work.

    Returns:
        list: the tasks that are still unfinished when the wait is over.
    """
    tasks = _active_tasks(tasks)
    # Suspend until a task actually completes instead of polling with
    # ``sleep(0)``, which keeps the event loop spinning while downloads are
    # in flight. An empty list also ends the loop: there is nothing to wait
    # for, and ``asyncio.wait`` rejects an empty collection.
    while tasks and len(tasks) >= limit:
        await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
        tasks = _active_tasks(tasks)
    return tasks
def map(f, stream, limit=0, queue_maxsize=0):
    """Apply ``f`` to every item of ``stream`` and expose the results as a stream.

    Note: this name shadows the builtin ``map`` inside this module.

    Arguments:
        f (callable): plain function or coroutine function of one argument.
        stream (Stream): upstream stream whose queue is read until ``DONE``.
        limit (int): maximum number of unfinished calls of ``f`` at any one
            time. Use ``0`` for no limit.
        queue_maxsize (int): ``maxsize`` of the output queue; a bounded queue
            makes calls of ``f`` wait until the consumer catches up.

    Returns:
        Stream: the coroutine that drives the mapping plus the queue on which
        results arrive (as each call finishes), terminated by ``DONE``.
    """
    coroin = stream.coroutine
    qin = stream.queue
    qout = asyncio.Queue(maxsize=queue_maxsize)

    async def _map(f):
        # Start the upstream producer so items begin to arrive on ``qin``.
        coroin_task = asyncio.ensure_future(coroin)
        tasks = []
        f = _f_wrapper(f, queue=qout)

        x = await qin.get()
        while x is not DONE:
            if limit:
                tasks = await _task_limit(tasks, limit)
            tasks.append(asyncio.ensure_future(f(x)))
            x = await qin.get()

        # Wait for the calls that are still running. ``asyncio.wait``
        # suspends until they finish instead of polling with ``sleep(0)``.
        # Exceptions raised by ``f`` stay on their tasks; they are not
        # re-raised here.
        tasks = _active_tasks(tasks)
        if tasks:
            await asyncio.wait(tasks)

        await qout.put(DONE)
        await coroin_task

    return Stream(_map(f), qout)
def from_iterable(iterable, queue_maxsize=0):
    """Turn a regular iterable into a ``Stream``.

    Arguments:
        iterable (iterable): source of the items.
        queue_maxsize (int): ``maxsize`` of the queue the items are put on;
            with a bounded queue the producer waits whenever it is full.

    Returns:
        Stream: the producer coroutine and its queue. The producer puts
        every item on the queue, followed by the ``DONE`` sentinel.
    """
    feed = asyncio.Queue(maxsize=queue_maxsize)

    async def _from_iterable():
        emit = feed.put
        for item in iterable:
            await emit(item)
        await emit(DONE)

    return Stream(_from_iterable(), feed)
def each(f, stream, limit=0):
    """Call ``f`` on every item of ``stream``, discarding the results.

    Arguments:
        f (callable): plain function or coroutine function of one argument.
        stream (Stream): upstream stream whose queue is read until ``DONE``.
        limit (int): maximum number of unfinished calls of ``f`` at any one
            time. Use ``0`` for no limit.

    Returns:
        coroutine: awaiting it starts the upstream coroutine and completes
        once every call of ``f`` has finished.
    """
    coroin = stream.coroutine
    qin = stream.queue

    async def _each(f):
        # Start the upstream producer so items begin to arrive on ``qin``.
        coroin_task = asyncio.ensure_future(coroin)
        tasks = []
        f = _f_wrapper(f)

        x = await qin.get()
        while x is not DONE:
            if limit:
                tasks = await _task_limit(tasks, limit)
            tasks.append(asyncio.ensure_future(f(x)))
            x = await qin.get()

        # Wait for the calls that are still running. ``asyncio.wait``
        # suspends until they finish instead of polling with ``sleep(0)``.
        # Exceptions raised by ``f`` stay on their tasks; they are not
        # re-raised here.
        tasks = _active_tasks(tasks)
        if tasks:
            await asyncio.wait(tasks)

        await coroin_task

    return _each(f)
def run(stream):
    """Return the coroutine that drives ``stream`` so the caller can run it."""
    driver = stream.coroutine
    return driver
@cgarciae Thanks for sharing your implementation!
What do you think of an alternative interface for the same task? It's basically a further generalization of paco.gather:
def igather(coros_or_futures, limit=0, loop=None, timeout=None,
            return_exceptions=False):
    """
    Proposed interface only: no implementation was given for this variant.

    Arguments:
        coros_or_futures (iterable|asynchronousiterable): iterator yielding
            coroutine functions.
        limit (int): max concurrency limit. Use ``0`` for no limit.
        loop (asyncio.BaseEventLoop): optional event loop to use.
        timeout (int|float): timeout can be used to control the maximum number
            of seconds to wait before returning. timeout can be an int or
            float. If timeout is not specified or None, there is no limit to
            the wait time.
        return_exceptions (bool): returns exceptions as valid results.

    Returns:
        asynchronousiterable: sequence of values yielded by coroutines,
            as completed.
    """
Making the result ordered should be also possible, albeit a bit harder to implement and memory-hungry in the worst case.
An implementation sketch inspired by https://bugs.python.org/issue30782#msg336237:
async def igather(tasks, limit=None):
    """Run coroutines concurrently and yield their results in submission order.

    Arguments:
        tasks (iterable): iterable of coroutine objects. It is consumed
            lazily, so a generator can be passed to avoid creating every
            coroutine up front.
        limit (int): maximum number of scheduled coroutines whose result has
            not been consumed yet. ``None``, ``0`` or a negative value means
            no limit.

    Yields:
        The result of each coroutine, in the order the coroutines were
        submitted.

    Raises:
        Exception: whatever a coroutine, or iterating ``tasks``, raises.
            Cancellation of the remaining scheduled work is requested before
            the exception propagates.
    """
    bounded = bool(limit) and limit > 0
    # A slot is taken before a coroutine is scheduled and handed back only
    # after its result has been consumed, so at most ``limit`` coroutines are
    # ever in flight. (Bounding the queue alone allowed ``limit + 2``.)
    slots = asyncio.Semaphore(limit) if bounded else None
    buf = asyncio.Queue()
    end = object()  # unique end-of-input marker

    async def submit():
        # TODO: additionally support async iterators
        try:
            for task in tasks:
                if slots is not None:
                    await slots.acquire()
                buf.put_nowait(asyncio.create_task(task))
        finally:
            # Always unblock the consumer, even if iterating ``tasks`` failed.
            buf.put_nowait(end)

    # Keep a reference: the event loop only holds weak references to tasks.
    submitter = asyncio.create_task(submit())
    try:
        while True:
            task = await buf.get()
            if task is end:
                break
            result = await task
            if slots is not None:
                slots.release()
            yield result
        # Surface an error raised while iterating ``tasks``.
        await submitter
    finally:
        # Reached on success, on failure and when the consumer stops early:
        # do not leave the submitter or already scheduled work behind.
        submitter.cancel()
        while not buf.empty():
            leftover = buf.get_nowait()
            if leftover is not end:
                leftover.cancel()
It preserves task submission order in an efficient way, but lacks proper exception handling.