How to properly shut down, mid-process, a multithreaded graph?
Greetings,
First, great library. We are making a PyQt app to handle data processing and flowpipe has been immensely useful.
I'd like to know if you have any advice on how to properly stop the app while a graph is running in threading mode.
Right now we are getting the following exception:
raise RuntimeError('cannot schedule new futures after '
RuntimeError: cannot schedule new futures after interpreter shutdown
My first idea was to add a flag inside the _evaluate_threaded function to quit when it is set to True. But doing it this way doesn't let my nodes close properly; one of them might be in the middle of writing a file, which would corrupt it.
I could wait for the graph to finish before quitting, but some of our processes can take a long time and that wouldn't be great for the user. Or maybe I could broadcast a message to raise an exception in all nodes so they can quit gracefully.
I wonder if you have any advice on how to handle that.
Thanks!
Hi Dave, I'm glad you like flowpipe!
My first instinct is to go for a threading.Event and have all nodes react to it in whatever manner they need to (e.g. closing files before quitting). This might require some re-engineering of your nodes, though.
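To illustrate the idea (this is a made-up sketch, not flowpipe internals - the shutdown_event name and the node body are just examples), a function-based node could check the event at safe points and close its file cleanly:

```python
import threading

from flowpipe import Node

# Process-wide event that the application sets when the user quits.
# (The name is made up - wire it into your app however fits.)
shutdown_event = threading.Event()


@Node(outputs=["written"])
def WriteFile(path, chunks):
    """Write chunks to disk, checking the shutdown event between chunks."""
    with open(path, "w") as f:
        for chunk in chunks:
            if shutdown_event.is_set():
                # Stop at a safe point; the context manager closes the
                # file cleanly, so nothing is left half-written.
                return {"written": False}
            f.write(chunk)
    return {"written": True}


# Somewhere in the Qt layer, e.g. in closeEvent or a "Stop" button handler:
# shutdown_event.set()
```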
An easier-to-implement solution would be to track the event in _evaluate_threaded and shut down the ThreadPoolExecutor using the executor.shutdown(wait=True, cancel_futures=True) method (https://docs.python.org/3.9/library/concurrent.futures.html#concurrent.futures.Executor.shutdown). This will block until the running nodes finish, though, which is what you didn't want.
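Roughly the pattern I have in mind - to be clear, this is a sketch and not the actual _evaluate_threaded, and I'm assuming you can hand it batches of nodes whose upstreams are already computed and call node.evaluate() on them:

```python
from concurrent.futures import ThreadPoolExecutor


def evaluate_threaded(ready_batches, stop_event, max_workers=4):
    """Sketch of an event-aware threaded evaluation loop.

    Not flowpipe's real _evaluate_threaded - ready_batches is assumed to
    yield lists of nodes whose upstream dependencies are already computed.
    """
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        for batch in ready_batches:
            if stop_event.is_set():
                # Python 3.9+: drop everything not yet started and wait
                # for the nodes currently running to finish cleanly.
                executor.shutdown(wait=True, cancel_futures=True)
                return
            futures = [executor.submit(node.evaluate) for node in batch]
            for future in futures:
                future.result()  # re-raise any exception from a node
```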
I don't know how brutal executor.shutdown() is in cancelling running threads. I believe all it does is continue the control flow in the main thread, leaving all currently running threads to their own devices. Thus, even in such a situation I don't think you'd end up with corrupted files. Do test this a bit in some toy samples before trusting it, though - I'm really not 100% sure of Python's behavior here.
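A toy sample like this (no flowpipe involved) could serve as that sanity check - the task is already running when shutdown is called, and the file still ends up complete:

```python
import time
from concurrent.futures import ThreadPoolExecutor


def slow_write(path):
    """Stand-in for a node that takes a while to write its output."""
    with open(path, "w") as f:
        for i in range(5):
            time.sleep(1)
            f.write(f"chunk {i}\n")
    return "done"


executor = ThreadPoolExecutor(max_workers=1)
future = executor.submit(slow_write, "toy_output.txt")
time.sleep(1)  # make sure the task is already running

# Returns control immediately; the already-running task keeps going in
# the background and the file is written to the end.
executor.shutdown(wait=False)
print(future.result())  # blocks until slow_write finishes -> "done"
```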
The pull request at https://github.com/PaulSchweizer/flowpipe/pull/154 might be interesting for you. I revamped the way the various evaluation modes are implemented, to make it easier to extend or manipulate how the Graph gets computed. The functionality is there, I just haven't gotten around to updating the documentation and code samples for it. Seeing that users consider modifying the evaluation behavior gives me more motivation to complete that soon, since that's exactly what the feature is supposed to enable.
Hi neuneck,
I'll have a look at threading.Event; re-engineering our nodes shouldn't be too bad at this point of our project. I like that each node can have a specific reaction to a shutdown event: some can quit at any time while others are more sensitive to an early shutdown.
#154 is something we've been pondering lately. For example, how to begin another run of the graph, to process another file, while the current one is still running, in order to maximize CPU usage.
Thanks for the reply!
Let me (us) know how/what worked for you in the end. I think an interruption feature and/or a timeout on Graph evaluation would be very useful.
About starting another run of the Graph - is it an option to set up a second graph?
What we're doing to run multiple graphs in parallel is to create the graphs from a factory function (e.g. make_graph(file_path)) and then run these graphs in parallel (implicitly in different greenlets, as they get created and executed within gunicorn workers in our case, but threads or processes should work just the same).
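Something along these lines - the node bodies are placeholders, and I'm assuming graph.evaluate(mode="threading") matches how you already run your graphs:

```python
from concurrent.futures import ThreadPoolExecutor

from flowpipe import Graph, Node


@Node(outputs=["data"])
def LoadFile(file_path):
    # Placeholder for the real file handling.
    with open(file_path) as f:
        return {"data": f.read()}


@Node(outputs=["result"])
def Process(data):
    # Placeholder for the real processing.
    return {"result": len(data)}


def make_graph(file_path):
    """Build an independent graph instance for one file."""
    graph = Graph(name="process_{0}".format(file_path))
    load = LoadFile(graph=graph, file_path=file_path)
    process = Process(graph=graph)
    load.outputs["data"] >> process.inputs["data"]
    return graph


# One graph per file, each evaluated in its own worker thread.
files = ["a.txt", "b.txt", "c.txt"]
with ThreadPoolExecutor() as pool:
    list(pool.map(lambda f: make_graph(f).evaluate(mode="threading"), files))
```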
In our case it's not that straightforward, because what handles the files is just another node inside the graph. We have multiple nodes to handle different types of files. Depending on the process we need to run, we use the appropriate "file node" at the beginning of the pipeline and we run the pipeline as long as the file node still has files to process.
That said, we could spawn a new graph as long as the file node isn't done and initialize it with an iteration number so it knows which file to process. Food for thought...
Hi @DaveMtl Were you able to get the behavior you wanted? If so, I'd be interested to hear about your solution.
Hi @neuneck, For the parallel graphs, I added the concept of a master node. Before starting a graph I check whether a master node is in the graph. Then, after the graph is finished, I check whether the master node is done; if it isn't, I start the graph again. Since the app also works in console mode, I can also run multiple instances of the app in parallel on a cluster, where each instance processes just one file.
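Simplified, the loop looks something like this (is_master and is_done stand in for the actual markers on our master node):

```python
def run_until_master_done(make_graph):
    """Keep launching fresh graphs until the master node reports done.

    Simplified sketch: is_master and is_done stand in for whatever
    attribute or output plug the real master node exposes.
    """
    while True:
        graph = make_graph()
        # Find the master node, if the graph has one.
        master = next(
            (node for node in graph.nodes if getattr(node, "is_master", False)),
            None,
        )
        graph.evaluate(mode="threading")
        if master is None or getattr(master, "is_done", True):
            break
```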
I haven't implemented the stop function yet; I'll let you know what I end up doing.