
Feature request: Generators in processes

Open · andresriancho opened this issue 6 years ago · 4 comments

User story

As a developer, I would like to run a generator in another process and receive its results one by one, so that I can process them in the main thread.

The function I'm running generates results that use a lot of RAM (a list with many objects), and each object in the list can be processed individually. If I weren't running the function in a separate process, I would certainly write it as a generator function.

At the moment, a function running in a sub-process can only return a single result: the list with all the objects. In an ideal scenario, if this gets implemented, future.result() would return a generator I could iterate over. Each time the sub-process function produces a result, it would be pickled, sent through the pipe, unpickled, and yielded by the generator.
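To illustrate, here is a hypothetical sketch of the requested behaviour. Nothing in it is existing pebble API: a future whose result() streams items is exactly the feature being requested, and generate_objects is a made-up placeholder.

from pebble import ProcessPool

def generate_objects(url):
    """Made-up worker: yields many objects instead of returning one big list."""
    for i in range(1000):
        yield {"id": i, "source": url}

with ProcessPool() as pool:
    future = pool.schedule(generate_objects, args=["www.example.com"])
    # Hypothetical behaviour: result() returns a generator; each object is
    # pickled in the worker, sent over the pipe, unpickled here and yielded.
    for obj in future.result():
        print(obj["id"])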

Notes

  • I know I could do this manually somehow: use files to store the objects, use memcached, use an external queue system, etc. (a rough sketch of the file-based approach follows this list).
  • If it is already implemented, please let me know where 👍
  • If there is an easier way to do this, please let me know how 👍
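For reference, a minimal sketch of the file-based workaround from the first bullet. Every name in it is illustrative; only ProcessPool and schedule() are real pebble API, and the temp-file protocol is just one possible approach.

import pickle
import tempfile

from pebble import ProcessPool

def produce_to_file(url):
    """Worker: pickle each object to a temp file instead of returning a list."""
    with tempfile.NamedTemporaryFile(suffix=".pkl", delete=False) as fh:
        for i in range(1000):
            pickle.dump({"id": i, "source": url}, fh)
        return fh.name

def read_objects(path):
    """Main process: lazily unpickle the objects one by one."""
    with open(path, "rb") as fh:
        while True:
            try:
                yield pickle.load(fh)
            except EOFError:
                return

if __name__ == '__main__':
    with ProcessPool() as pool:
        path = pool.schedule(produce_to_file, args=["www.example.com"]).result()
    for obj in read_objects(path):
        print(obj["id"])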

— andresriancho, Apr 13 '18 17:04

Are you aiming at any parallelism here, or is it just for isolation purposes? In other words: is this a sequential operation run in another process, or do you need to split it into parallel computations?

— noxdafox, Apr 16 '18 08:04

Both isolation and parallelism.

Run a browser with custom JS, load an arbitrary site / page, extract data from it.

The browser might crash (isolation).

Multiple browsers will run faster than just one (rendering HTML is CPU-bound).

— andresriancho, Apr 16 '18 21:04

I see what you mean, and I think it's a pretty interesting feature. I am currently deep into another thing, so I will start working on this a bit later (beginning of June?). Of course, pull requests are welcome.

In the meantime, you can use this code snippet as a reference for implementing such functionality in your application.

from multiprocessing import Queue
from queue import Empty

from pebble import concurrent

def process(url):
    """A stand-in for a function producing large data split in chunks."""
    for chunk in range(10):
        yield chunk

@concurrent.process(timeout=60)
def process_url(queue, url):
    """Processes a URL, splitting the report into chunks put on the given Queue."""
    for chunk in process(url):
        queue.put(chunk)

if __name__ == '__main__':
    queue = Queue()
    future = process_url(queue, "www.example.com")

    # Poll with a timeout: a plain queue.get() could block forever if the
    # worker finishes while the queue is empty.
    while not future.done():
        try:
            chunk = queue.get(timeout=1)
        except Empty:
            continue

    # Chunks may still be queued after the worker finishes; drain them.
    while not queue.empty():
        chunk = queue.get()

    if future.exception() is not None:
        print("Error while processing the URL")
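A sentinel-based variant avoids the polling altogether: the worker signals completion through the queue itself. This is standard multiprocessing practice, not a pebble API; process_url_sentinel and DONE are illustrative names, and process() is the stand-in defined above.

from multiprocessing import Queue

from pebble import concurrent

DONE = None  # illustrative end-of-stream marker

@concurrent.process(timeout=60)
def process_url_sentinel(queue, url):
    """Like process_url above, but signals completion through the queue."""
    try:
        for chunk in process(url):
            queue.put(chunk)
    finally:
        queue.put(DONE)

if __name__ == '__main__':
    queue = Queue()
    future = process_url_sentinel(queue, "www.example.com")

    # iter() with a sentinel stops exactly when DONE arrives: no polling and
    # no chunks left behind. Caveat: if the worker is killed (e.g. by the
    # timeout) the finally block never runs, so a production version should
    # still pass a timeout to queue.get() as a safety net.
    for chunk in iter(queue.get, DONE):
        print(chunk)

    if future.exception() is not None:
        print("Error while processing the URL")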

— noxdafox, Apr 20 '18 07:04

Ah! I like the trick of using a queue!

The example code looks good and is solid inspiration for writing the PR. Depending on my client's needs, I might write the PR before June.

What about the interface? Do you think we should have two new methods in ProcessPool?

  • schedule_generator()
  • map_generator()

Both of them would return a tuple of (future, generator). The generator would be a wrapper around the queue, which people could use like this (a rough sketch of such a wrapper follows the example):

future, generator = pool.schedule_generator(function)

for result in generator:
    print(result)

# Do some future error handling here
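For what it's worth, a minimal sketch of what that queue-backed generator could look like. Nothing here is existing pebble API: queue_generator and the end-of-stream marker are assumptions about how the wrapper might work.

def queue_generator(queue, future, end_marker=None):
    """Yield items from the queue until the end marker arrives, then
    surface any error stored on the future."""
    for item in iter(queue.get, end_marker):
        yield item
    exc = future.exception()
    if exc is not None:
        raise exc

schedule_generator() could then create the queue internally, schedule the function, and return (future, queue_generator(queue, future)).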

— andresriancho, Apr 20 '18 14:04