
[Draft PR] Parallel Utils

Open • anuchak opened this issue 9 months ago • 0 comments

This is a draft PR for the Parallel Utils interface to be used by the Algorithm library. I am reusing some of the demo functions added by Xiyang. Since the front-end interface is not ready, I have defined an operator, AlgorithmRunner, that orchestrates execution of the function.

Main files added for parallel utils:

(1) parallel_utils.cpp
(2) algorithm_runner.cpp
(3) demo_algorithm.cpp

Currently, table functions have a "tableFunc", defined as std::function<common::offset_t(TableFuncInput&, TableFuncOutput&)>, that is executed at runtime (like a function pointer). I am reusing the demo algorithm's tableFunc and passing it to ParallelUtils to be executed in parallel.
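To make the tableFunc contract concrete, here is a minimal sketch under stated assumptions: TableFuncInput and TableFuncOutput are replaced by simplified, hypothetical stand-ins (the real kuzu structs differ), demoTableFunc is a hypothetical body that claims one morsel from a shared atomic cursor and returns its size (zero once the input is exhausted), and runInParallel is a hypothetical helper that calls the same tableFunc from several threads.

```cpp
#include <algorithm>
#include <atomic>
#include <cassert>
#include <cstdint>
#include <functional>
#include <thread>
#include <vector>

namespace common {
using offset_t = std::uint64_t;
}

// Hypothetical, simplified stand-ins for kuzu's TableFuncInput/TableFuncOutput.
struct TableFuncInput {
    std::atomic<common::offset_t>* nextOffset;  // shared cursor from which morsels are claimed
    common::offset_t totalRows;
    common::offset_t morselSize;
};
struct TableFuncOutput {
    std::vector<common::offset_t> values;  // stands in for the output value vectors
};

// Same shape as the tableFunc type quoted above.
using table_func_t = std::function<common::offset_t(TableFuncInput&, TableFuncOutput&)>;

// Hypothetical demo body: claim one morsel, write its offsets, return its size (0 = no morsels left).
common::offset_t demoTableFunc(TableFuncInput& input, TableFuncOutput& output) {
    // A zero-sized morsel would be indistinguishable from "no morsels left".
    assert(input.morselSize > 0);
    common::offset_t start = input.nextOffset->fetch_add(input.morselSize);
    if (start >= input.totalRows) {
        return 0;
    }
    common::offset_t end = std::min(start + input.morselSize, input.totalRows);
    for (common::offset_t i = start; i < end; ++i) {
        output.values.push_back(i);
    }
    return end - start;
}

// Hypothetical helper: call the same tableFunc from several threads and count the rows produced.
common::offset_t runInParallel(const table_func_t& tableFunc, int numThreads,
                               common::offset_t totalRows, common::offset_t morselSize) {
    std::atomic<common::offset_t> cursor{0};
    std::atomic<common::offset_t> rowsProduced{0};
    std::vector<std::thread> threads;
    for (int t = 0; t < numThreads; ++t) {
        threads.emplace_back([&] {
            TableFuncInput input{&cursor, totalRows, morselSize};
            TableFuncOutput output;
            // Keep calling tableFunc until it reports that no morsel is left.
            while (common::offset_t n = tableFunc(input, output)) {
                rowsProduced += n;
                output.values.clear();
            }
        });
    }
    for (auto& th : threads) {
        th.join();
    }
    return rowsProduced.load();
}
```

For example, runInParallel(demoTableFunc, 4, 1000, 64) returns 1000 however the morsels are spread across threads, because fetch_add hands out each range of offsets exactly once.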

ParallelUtils is a shared state defined inside the AlgorithmRunner operator and accessible to all threads executing the pipeline. The pipeline starts with the AlgorithmRunner operator:

ResultCollector
      ↑
Projection
      ↑
AlgorithmRunner  <---- source operator

At a high level, the functionality of the master thread and worker threads is as follows:

(1) Master thread

  • The master calls the main compute() function of the graph algorithm (inside the AlgorithmRunner operator's getNextTuplesInternal). It holds a reference to the parallelUtils shared state and to the function that needs to be executed.
  • To execute the function in parallel, the master invokes doParallel, passing a reference to the function. A new task is created and pushed into the pendingQueue of tasks, and the master then enters an infinite loop to monitor progress. The master is interrupted either when the task is complete or when some thread has encountered an error.
  • If there is an error, the task is removed from the queue and the isActive flag is set to false to interrupt all worker threads. The master then rethrows the original exception.
  • If the task is "complete" (meaning no more morsels are available), the master removes the task from the queue but keeps monitoring (it does not exit the loop), since another worker may still throw an error. The master can exit only when the number of threads registered for the task drops to zero, indicating that no thread is still attached to the task.
  • After the function has executed successfully, the master can call the parallelUtils terminate function, which sets the isActive flag to false and notifies all worker threads to break out of their infinite loops.
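The subtle rule in the master's loop is that completion alone does not let it return. A minimal sketch of just that rule, assuming a hypothetical TaskState struct guarded by one mutex and condition variable (queue removal and the isActive flag are left out): monitor() rethrows as soon as an error is recorded and otherwise returns only when the task is complete and no worker is registered. finishWorker and stragglerErrorIsRethrown are likewise hypothetical names.

```cpp
#include <cassert>
#include <condition_variable>
#include <exception>
#include <mutex>
#include <stdexcept>
#include <thread>

// Hypothetical per-task state; names follow the description, not kuzu's actual classes.
struct TaskState {
    std::mutex mtx;
    std::condition_variable cv;
    int registeredThreads = 0;
    bool isComplete = false;
    std::exception_ptr error;
};

// Master side: rethrow as soon as an error is reported; otherwise return only once the task
// is complete AND no worker is still registered (a straggler could still fail).
void monitor(TaskState& task) {
    std::unique_lock<std::mutex> lock(task.mtx);
    task.cv.wait(lock, [&] {
        return task.error != nullptr || (task.isComplete && task.registeredThreads == 0);
    });
    if (task.error) std::rethrow_exception(task.error);
}

// Worker side of the hand-off: record the outcome and deregister in one critical section.
void finishWorker(TaskState& task, bool markComplete, std::exception_ptr err) {
    std::lock_guard<std::mutex> lock(task.mtx);
    if (markComplete) task.isComplete = true;
    if (err && !task.error) task.error = err;  // keep only the first error
    task.registeredThreads--;
    task.cv.notify_all();
}

// Hypothetical scenario: worker A completes the task, worker B may fail; either can finish first.
bool stragglerErrorIsRethrown(bool stragglerFails) {
    TaskState task;
    task.registeredThreads = 2;  // both workers are registered before either finishes
    std::thread a([&] { finishWorker(task, true, nullptr); });
    std::thread b([&] {
        std::exception_ptr err;
        try {
            if (stragglerFails) throw std::runtime_error("straggler failed");
        } catch (...) {
            err = std::current_exception();  // captured, never thrown from the worker
        }
        finishWorker(task, false, err);
    });
    bool rethrown = false;
    try {
        monitor(task);
    } catch (const std::runtime_error&) {
        rethrown = true;
    }
    a.join();
    b.join();
    assert(task.registeredThreads == 0);
    return rethrown;
}
```

stragglerErrorIsRethrown(true) returns true whichever worker finishes first: even if A marks the task complete, the master keeps waiting because B is still registered, and B records its error in the same critical section in which it deregisters.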

(2) Worker thread

  • Like the master, each thread assigned to be a worker enters the runWorkerThread() function and spins in a loop until it is notified that there is a pending task in the queue.
  • Once notified, the worker fetches the task with the getTaskFromQueue function, which returns a reference to the task. It executes the function by passing references to the TableFuncInput and TableFuncOutput (which hold references to the value vectors into which results are written).
  • If the tableFunc returns a nonzero value, the worker received a morsel of non-zero size; the worker thread returns true, deregisters from the task and exits the loop.
  • If the return value is zero, no morsels are left to execute for this function; the worker tries to complete the task (sets isComplete to true for the task) and notifies the master of the completion.
  • If any worker encounters an error, it immediately sets the exception_ptr to the error and exits the loop. No worker thread throws the error directly; it only communicates the error to the master. It is the master thread's responsibility to remove the task from the queue safely, interrupt all threads safely, and then rethrow the exception.
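Putting both roles together, the protocol can be modelled with a hypothetical ParallelUtils class. This is a sketch under assumptions, not the PR's code: the table function's input and output are assumed to be already bound into a std::function<offset_t()>, master and workers share one mutex and condition variable, a worker registers for one morsel at a time, and runnableTask stands in for getTaskFromQueue.

```cpp
#include <algorithm>
#include <atomic>
#include <cassert>
#include <condition_variable>
#include <cstdint>
#include <exception>
#include <functional>
#include <memory>
#include <mutex>
#include <stdexcept>
#include <thread>
#include <vector>

using offset_t = std::uint64_t;
using work_func_t = std::function<offset_t()>;  // assumption: input/output already bound into the callable
// Hypothetical task record; names follow the description, not kuzu's actual classes.
struct Task {
    work_func_t func;
    int registeredThreads = 0;
    bool isComplete = false;
    std::exception_ptr error;
};
class ParallelUtils {
public:
    // Master: publish the task, then monitor it until it fails, or completes with no worker attached.
    void doParallel(work_func_t func) {
        auto task = std::make_shared<Task>();
        task->func = std::move(func);
        std::unique_lock<std::mutex> lock(mtx);
        pendingQueue.push_back(task);
        cv.notify_all();
        while (true) {
            if (task->error) {
                removeFromQueue(task);
                isActive = false;  // interrupt every worker
                cv.notify_all();
                std::rethrow_exception(task->error);
            }
            if (task->isComplete) removeFromQueue(task);  // keep monitoring: a straggler may still fail
            if (task->isComplete && task->registeredThreads == 0) return;
            cv.wait(lock);  // conditions are re-checked after every worker state change
        }
    }
    // Worker: wait for a runnable task, run one morsel, report the outcome, repeat.
    void runWorkerThread() {
        std::unique_lock<std::mutex> lock(mtx);
        while (true) {
            cv.wait(lock, [&] { return !isActive || runnableTask() != nullptr; });
            if (!isActive) return;
            auto task = runnableTask();  // stands in for getTaskFromQueue
            task->registeredThreads++;
            lock.unlock();
            offset_t morsel = 0;
            std::exception_ptr error;
            try { morsel = task->func(); } catch (...) { error = std::current_exception(); }
            lock.lock();
            if (error && !task->error) task->error = error;  // report to the master, never rethrow here
            if (!error && morsel == 0) task->isComplete = true;  // no morsels left
            task->registeredThreads--;
            cv.notify_all();
        }
    }
    // Master, after a successful run: stop all workers.
    void terminate() { std::lock_guard<std::mutex> lock(mtx); isActive = false; cv.notify_all(); }
private:
    std::shared_ptr<Task> runnableTask() {
        for (auto& t : pendingQueue) if (!t->isComplete && !t->error) return t;
        return nullptr;
    }
    void removeFromQueue(const std::shared_ptr<Task>& task) {
        pendingQueue.erase(std::remove(pendingQueue.begin(), pendingQueue.end(), task), pendingQueue.end());
    }
    std::mutex mtx;
    std::condition_variable cv;
    std::vector<std::shared_ptr<Task>> pendingQueue;
    bool isActive = true;
};

// Hypothetical driver: numWorkers threads consume totalRows rows in morsels; `fail` makes the first morsel throw.
offset_t runDemo(int numWorkers, offset_t totalRows, offset_t morselSize, bool fail) {
    assert(numWorkers > 0);  // with no workers the master would wait forever
    ParallelUtils utils;
    std::vector<std::thread> workers;
    for (int i = 0; i < numWorkers; ++i) workers.emplace_back([&] { utils.runWorkerThread(); });
    std::atomic<offset_t> cursor{0}, processed{0};
    auto func = [&]() -> offset_t {
        offset_t start = cursor.fetch_add(morselSize);
        if (start >= totalRows) return 0;  // no morsels left
        if (fail && start == 0) throw std::runtime_error("morsel failed");
        offset_t n = std::min(morselSize, totalRows - start);
        processed += n;
        return n;
    };
    try { utils.doParallel(func); } catch (...) { for (auto& w : workers) w.join(); throw; }
    utils.terminate();
    for (auto& w : workers) w.join();
    return processed.load();
}
```

runDemo(4, 1000, 64, false) returns 1000. With fail set to true, the first morsel throws inside a worker, the worker only stores the std::exception_ptr, and doParallel rethrows the std::runtime_error on the master thread; using a single condition variable keeps the sketch short at the cost of waking every thread on each state change.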

anuchak • May 08 '24 18:05