Feature request: Generators in processes
User story
As a developer, I would like to run a generator in another process and get the results one by one, so that I can process them in the main thread.
The function I'm running generates results that use a lot of RAM (a list with many objects), and each object in the list can be processed individually. If I weren't using a different process to run the function that generates these results, I would certainly use a generator function.
At the moment, the functions run in the sub-processes can only return one result: the list with all the objects. In an ideal scenario, if this gets implemented, future.result() would return a generator I could iterate over. Each time a result is produced by the sub-process function, it would be pickled, sent through the pipe, unpickled, and yielded by the generator.
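To illustrate, here is a minimal sketch of the requested usage. This is hypothetical behaviour, not current pebble API: pool.schedule() is real, but result() returning a streaming generator is the feature being requested, and generate_objects is an invented worker.

from pebble import ProcessPool

def generate_objects(source):
    """Hypothetical worker yielding many large objects one by one."""
    for index in range(1000):
        yield (source, index)  # stand-in for a big object

with ProcessPool() as pool:
    future = pool.schedule(generate_objects, args=("data",))
    # Desired behaviour: future.result() would return a generator that
    # yields each object as soon as it is pickled, piped and unpickled.
    for obj in future.result():
        print(obj)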
Notes
- I know I could do this manually somehow: use files to store the objects, use memcached, use an external queue system, etc.
- If it is already implemented, please let me know where :+1:
- If there is an easier way to do this, please let me know how :+1:
Are you aiming to reach any parallelism here, or is it just for isolation purposes? In other words: is this a sequential operation run in another process, or do you need to split it into parallel computations?
Both isolation and parallelism.
Run a browser with custom JS, load an arbitrary site / page, extract data from it.
The browser might crash (isolation).
Multiple browsers will run faster than just one (rendering HTML is CPU bound).
I see what you mean and I think it's a pretty interesting feature. I am currently deep into another thing, so I will start working on this a bit later (beginning of June?). Of course, pull requests are welcome.
In the meantime, you can use this code snippet as a reference for implementing such functionality in your application.
from queue import Empty

from pebble import concurrent
from multiprocessing import Queue


@concurrent.process(timeout=60)
def process_url(queue, url):
    """Processes a URL, splitting the report in chunks and putting them in the given Queue."""
    for chunk in process(url):
        queue.put(chunk)


def process(url):
    """A function seemingly producing large data split in chunks."""
    while True:
        yield None


queue = Queue()
future = process_url(queue, "www.example.com")

# Drain the queue until the worker is done; poll with a timeout so the
# consumer does not block forever once no more chunks will arrive.
while not future.done():
    try:
        chunk = queue.get(timeout=1)
    except Empty:
        continue
    # ... handle the chunk in the main process ...

# Check for errors only after the loop: exception() with no timeout
# blocks until the future completes.
if future.exception() is not None:
    print("Error while processing the URL")
Ah! I like the trick of using a queue!
The example code looks good and is a good inspiration for writing the PR. Depending on my client's needs, I might be writing the PR before June.
What about the interface? Do you think we should have two new methods in ProcessPool?
- schedule_generator()
- map_generator()
Both of them returning a tuple with future and generator? The generator would be a wrapper around the queue that people could use like this:
future, generator = pool.schedule_generator(function)

for result in generator:
    print(result)

# Do some future error handling here
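For reference, a minimal sketch of what such a queue-wrapping generator could look like; the function name, the polling timeout, and the termination check are assumptions, not an agreed design:

from queue import Empty

def _queue_generator(queue, future):
    """Hypothetical wrapper: yield items from the queue until the
    backing future has completed and the queue has been drained."""
    while True:
        try:
            yield queue.get(timeout=1)
        except Empty:
            # No chunk arrived within the timeout; if the worker is
            # done, the queue is drained and iteration can stop.
            if future.done():
                return

The caller would still check future.exception() after the loop ends, mirroring the snippet above.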