kuzu
[Draft PR] Parallel Utils
This is a draft PR for the Parallel Utils interface to be used by the Algorithm library. I am reusing some of the demo functions added by Xiyang. Since the front-end interface is not ready, I have defined an operator, AlgorithmRunner, that orchestrates execution of the function.
Main files added for parallel utils:
(1) parallel_utils.cpp (2) algorithm_runner.cpp (3) demo_algorithm.cpp
Currently, table functions have a "tableFunc", defined as `std::function<common::offset_t(TableFuncInput&, TableFuncOutput&)>`, that gets executed at runtime (like a function pointer). I am reusing the demo algorithm's tableFunc and passing it to ParallelUtils to execute in parallel.
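As a rough illustration of the tableFunc contract (return the size of the morsel just claimed, or zero when nothing is left), here is a minimal sketch. The `TableFuncInput` and `TableFuncOutput` structs, `demoTableFunc`, and `drain` are hypothetical stand-ins written for this example; the real Kuzu types carry more state.

```cpp
#include <algorithm>
#include <atomic>
#include <cstdint>
#include <functional>

// Hypothetical stand-ins; the real Kuzu types carry more state.
using offset_t = uint64_t;

struct TableFuncInput {
    std::atomic<offset_t>* nextOffset; // shared morsel cursor
    offset_t maxOffset;                // total number of tuples to produce
    offset_t morselSize;               // upper bound on tuples per call
};

struct TableFuncOutput {
    offset_t startOffset = 0; // first tuple of the morsel just claimed
};

using table_func_t = std::function<offset_t(TableFuncInput&, TableFuncOutput&)>;

// Claims the next morsel and returns its size; 0 means nothing is left.
inline offset_t demoTableFunc(TableFuncInput& in, TableFuncOutput& out) {
    offset_t start = in.nextOffset->fetch_add(in.morselSize);
    if (start >= in.maxOffset) {
        return 0;
    }
    out.startOffset = start;
    return std::min(in.morselSize, in.maxOffset - start);
}

// Calls the function on the current thread until it reports no more
// morsels, and returns the total number of tuples it handed out.
inline offset_t drain(const table_func_t& func, TableFuncInput& in) {
    TableFuncOutput out;
    offset_t total = 0;
    while (offset_t n = func(in, out)) {
        total += n;
    }
    return total;
}
```

Because the cursor is atomic, several threads could call the same `table_func_t` concurrently and each would receive a disjoint morsel, which is the property the parallel execution relies on.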
ParallelUtils is shared state defined inside the AlgorithmRunner operator and accessible by all threads executing the pipeline. The pipeline starts with the AlgorithmRunner operator:
ResultCollector
↑
Projection
↑
AlgorithmRunner <---- source operator
At a high level, the functionality of the master thread and worker threads is as follows:
(1) Master thread
- The master calls the main `compute()` function of the graph algorithm (inside the algorithm_runner operator's `getNextTuplesInternal`). It holds a reference to the parallelUtils shared state and to the function that needs to be executed.
- To execute the function in parallel, the master invokes `doParallel`, passing a reference to the function. A new task is created and pushed into the `pendingQueue` of tasks, and the master then enters an infinite loop to monitor progress. The master is interrupted either when the task is complete or when some thread has encountered an error.
- If there is an error, the task is removed from the queue and the `isActive` flag is set to false to interrupt all worker threads. The exception is then rethrown by the master.
- If the task is "complete" (meaning no more morsels are available), the master removes the task from the queue but keeps monitoring (it does not exit the loop), since some other worker may still throw an error. The master can exit only when the number of threads registered for the task drops to zero, indicating that no thread is attached to the task.
- After the function has executed successfully, the master can call the parallelUtils `terminate` function, which sets the `isActive` flag to false and notifies all worker threads to break out of their infinite loop.
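The master-side steps above (publish a task, wait for completion, wait for every worker to detach, then terminate) can be sketched roughly as follows. This is not the PR's code: `TaskSketch`, `ParallelUtilsSketch`, and `runDemo` are hypothetical names, the monitoring loop is condensed into two condition-variable waits, the error path is left out, and workers loop straight back for the next morsel instead of returning to the pipeline.

```cpp
#include <atomic>
#include <condition_variable>
#include <cstdint>
#include <deque>
#include <functional>
#include <memory>
#include <mutex>
#include <thread>
#include <utility>
#include <vector>

// Illustrative names only; these are not the PR's actual classes.
struct TaskSketch {
    std::function<uint64_t()> func; // returns morsel size, 0 when exhausted
    int registered = 0;             // workers currently attached to the task
    bool isComplete = false;        // set once a worker sees a 0-size morsel
};

class ParallelUtilsSketch {
public:
    // Master: publish the task, then wait for completion and full detach.
    void doParallel(std::function<uint64_t()> func) {
        auto task = std::make_shared<TaskSketch>();
        task->func = std::move(func);
        std::unique_lock<std::mutex> lck(mtx);
        pendingQueue.push_back(task);
        cv.notify_all();
        cv.wait(lck, [&] { return task->isComplete; });
        pendingQueue.pop_front(); // no new worker can attach from here on
        cv.wait(lck, [&] { return task->registered == 0; });
    }

    // Worker: sleep until a task is pending, run one morsel, report back.
    void runWorkerThread() {
        std::unique_lock<std::mutex> lck(mtx);
        while (true) {
            cv.wait(lck, [&] {
                return !isActive ||
                       (!pendingQueue.empty() && !pendingQueue.front()->isComplete);
            });
            if (!isActive) {
                return;
            }
            auto task = pendingQueue.front();
            task->registered++;
            lck.unlock();
            uint64_t morselSize = task->func(); // run outside the lock
            lck.lock();
            task->registered--;
            if (morselSize == 0) {
                task->isComplete = true;
            }
            cv.notify_all();
        }
    }

    // Master: release every worker from its wait loop.
    void terminate() {
        {
            std::lock_guard<std::mutex> guard(mtx);
            isActive = false;
        }
        cv.notify_all();
    }

private:
    std::mutex mtx;
    std::condition_variable cv;
    std::deque<std::shared_ptr<TaskSketch>> pendingQueue;
    bool isActive = true;
};

// Demo driver: counts how many morsels the workers processed.
inline uint64_t runDemo(unsigned numWorkers, uint64_t numMorsels) {
    ParallelUtilsSketch utils;
    std::atomic<uint64_t> next{0}, processed{0};
    std::vector<std::thread> workers;
    for (unsigned i = 0; i < numWorkers; ++i) {
        workers.emplace_back([&utils] { utils.runWorkerThread(); });
    }
    utils.doParallel([&]() -> uint64_t {
        if (next.fetch_add(1) >= numMorsels) {
            return 0;
        }
        processed.fetch_add(1);
        return 1;
    });
    utils.terminate();
    for (auto& w : workers) {
        w.join();
    }
    return processed.load();
}
```

Waiting for `registered == 0` after removing the task is what guarantees that a worker still processing its last morsel has finished before `doParallel` returns.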
(2) Worker thread
- Similar to the master, each thread assigned to be a worker enters the `runWorkerThread()` function and spins in a loop until it is notified that there is a pending task in the queue.
- Once notified, the worker fetches the task using `getTaskFromQueue`, which returns a reference to the task. The function is then executed by passing references to the TableFuncInput and TableFuncOutput (which hold references to the value vectors into which results need to be written).
- The worker thread returns `true` if the return value of the tableFunc is nonzero, indicating that it received a morsel of nonzero size. It deregisters from the task and exits from the loop.
- If the return value is zero, no more morsels are left to execute for this function; the worker tries to complete the function (setting `isComplete` to true for the task) and notifies the master about the completion.
- If any worker encounters an error, it immediately sets the `exception_ptr` to refer to the error and exits from the loop. No worker thread throws the error directly; it only communicates it to the master. It is the master thread's responsibility to remove the task from the queue safely, interrupt all threads safely and then rethrow the exception.
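The error handoff described in the last bullet can be illustrated with the standard `std::exception_ptr` facilities. The sketch below is a simplified assumption of how it could look: `ErrorSlot`, `workerBody`, and `runAll` are hypothetical names, and joining the threads stands in for waiting until no worker is registered on the task.

```cpp
#include <exception>
#include <functional>
#include <mutex>
#include <stdexcept>
#include <string>
#include <thread>
#include <vector>

// Hypothetical shared slot holding the first error any worker reports.
struct ErrorSlot {
    std::mutex mtx;
    std::exception_ptr error;

    void report(std::exception_ptr e) {
        std::lock_guard<std::mutex> guard(mtx);
        if (!error) {
            error = e; // keep only the first reported error
        }
    }
};

// Worker body: an exception never escapes the thread; it is captured
// and handed to the master through the shared slot instead.
inline void workerBody(ErrorSlot& slot, int id, int failingId) {
    try {
        if (id == failingId) {
            throw std::runtime_error("worker " + std::to_string(id) + " failed");
        }
    } catch (...) {
        slot.report(std::current_exception());
    }
}

// Master: starts the workers, waits until all of them have finished
// (the analogue of "no thread is attached"), then rethrows the error.
inline void runAll(int numWorkers, int failingId) {
    ErrorSlot slot;
    std::vector<std::thread> workers;
    for (int i = 0; i < numWorkers; ++i) {
        workers.emplace_back(workerBody, std::ref(slot), i, failingId);
    }
    for (auto& w : workers) {
        w.join();
    }
    if (slot.error) {
        std::rethrow_exception(slot.error);
    }
}
```

Rethrowing only on the master keeps a single place where the exception surfaces, and it avoids the `std::terminate` call that an exception escaping a `std::thread` body would trigger.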