
PostgreSQL Queue Executor (Distributed executor without Amazon/k8s)

Open gabriel-v opened this issue 1 year ago • 1 comments

This project is great - but there's no simple way to distribute work on self-hosted hardware, only Amazon this and Amazon that. There are executors that run Docker and K8s, but I'm working in an environment where that is managed elsewhere.

Then, I remembered this article / ad about postgres work queues.

The article above is missing a small detail about waiting for new work: we can use Postgres's LISTEN and NOTIFY so that the workers block in select.select([conn], [], [], timeout) instead of continuously polling for new changes.
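A minimal sketch of that wait step, assuming a psycopg2-style connection (one that exposes fileno(), poll() and a notifies list) on which LISTEN has already been issued. The function name wait_for_notifications and the FakeConnection stand-in are hypothetical and exist only so the loop can be run without a database:

```python
import select
import socket


def wait_for_notifications(conn, timeout=5.0):
    """Block until `conn` is readable or `timeout` seconds have passed.

    `conn` must provide fileno(), poll() and a `notifies` list, which is
    the interface a psycopg2 connection offers after `LISTEN <channel>`.
    Returns the pending notifications, or an empty list on timeout.
    """
    readable, _, _ = select.select([conn], [], [], timeout)
    if not readable:
        return []  # timed out: the caller can fall back to polling the table
    conn.poll()  # let the driver read the socket and fill conn.notifies
    received = []
    while conn.notifies:
        received.append(conn.notifies.pop(0))
    return received


class FakeConnection:
    """Stand-in for a database connection (assumption, for demonstration)."""

    def __init__(self):
        self._reader, self.writer = socket.socketpair()
        self.notifies = []

    def fileno(self):
        return self._reader.fileno()

    def poll(self):
        # Treat each whitespace-separated token as one notification.
        data = self._reader.recv(4096)
        self.notifies.extend(data.decode().split())
```

With a real psycopg2 connection the entries in conn.notifies are Notify objects rather than strings, and the connection should be in autocommit mode so that notifications are delivered promptly. A timeout is still worth keeping, since a notification sent while no worker is listening is lost and the worker then has to check the queue table itself.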

Code:

But the scheduler doesn't just want to execute code - it also wants results back! That's why I used a second queue table to return the results.

Then I found that the redun.executor.* classes are very easy to work with - thanks for that! I did a Frankenstein between the Local Executor and the Docker Executor. The monitor thread logic is similar to the Docker executor, and everything else is cloned from the Local executor.

Some things of interest you might want me to change:

  • all tasks are run in sub-process to guarantee clean python context for all tasks: processify(func)(...)
  • the worker loop itself is in yet another subprocess, to really restart it forever, in case of OOM: processify(_run_worker_forever)
  • there are no long-running DB connections because I want this to survive database crashes and restarts.
    • I am worried about https://github.com/insitro/redun/issues/56 and the fact that the scheduler expects the single PG connection to survive the duration of the run
    • The workers would really benefit from a long-running cursor to monitor the LISTEN/NOTIFY - otherwise we miss messages and still have to poll every few seconds
  • The executor needs to know how to unpack the message format created by the worker
  • The executor will possibly run on a different database than the redun backend, so that needs to be configured separately. The code currently doesn't use your database schema and migration systems - let me know if you want me to use that instead of running CREATE TABLE IF NOT EXISTS
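To illustrate the first point in the list above, here is a rough sketch of running a single call in a fresh child process and getting its result back. It is not the processify helper itself: run_in_subprocess and _child are hypothetical names, and the sketch assumes a POSIX system where the "fork" start method is available and a return value that can be pickled.

```python
import multiprocessing
import traceback


def _child(send_end, func, args, kwargs):
    """Run func in the child and report (ok, result_or_traceback)."""
    try:
        send_end.send((True, func(*args, **kwargs)))
    except Exception:
        send_end.send((False, traceback.format_exc()))
    finally:
        send_end.close()


def run_in_subprocess(func, *args, **kwargs):
    """Call func(*args, **kwargs) in a separate process and return its result."""
    ctx = multiprocessing.get_context("fork")  # assumption: POSIX only
    recv_end, send_end = ctx.Pipe(duplex=False)
    proc = ctx.Process(target=_child, args=(send_end, func, args, kwargs))
    proc.start()
    # The parent keeps only the read end, so a dead child shows up as EOF.
    send_end.close()
    try:
        ok, payload = recv_end.recv()
    except EOFError:
        proc.join()
        raise RuntimeError(f"child exited with code {proc.exitcode}") from None
    proc.join()
    if not ok:
        raise RuntimeError(payload)
    return payload
```

If the child is killed before it can report (for example by the OOM killer), the parent sees EOF on the pipe and raises instead of hanging, which is the property a restart-forever worker loop would rely on.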

I want to clean up these experiments and submit a PR with the new executor. Before that happens, I think I need to fix a few things:

  • [x] queue tables: remove timestamps - copypasted from article but not used
  • [x] queue tables: use a Binary field instead of a Text field that we fill with base64 - the text field came from the article, and I was too lazy to change it
  • [x] queue table: make new column with redun run ID, so concurrent runs on the same queue don't interfere with each other
  • [x] fix SQL injection opportunities - all queries are using f-string instead of using c.execute("bla %s bla", (arg,))
  • [x] refactor all the queue management stuff as methods of the PgExecutor class
  • [x] add correct Type annotations and docstrings
  • [x] use more long-running DB connections in workers and monitor for LISTEN, while still refreshing the connection if it died
  • [x] put the executor code in redun/redun/executors/PgExecutor.py and write some extra tests
  • [x] expose all the options (db connection info, table names, timeouts, limits) through the Executor Options logic, so they can be configured from the ini file / config dict
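A few of the items above (binary payloads, a run ID column, and bound parameters instead of f-strings) can be sketched together. This example uses the standard-library sqlite3 module purely as a stand-in so it runs without a server: sqlite3 uses ? placeholders where psycopg2 uses %s, and with psycopg2 the table name would normally be composed with psycopg2.sql.Identifier rather than the regex check shown here. The table layout and all function names are assumptions, not the schema from the PR.

```python
import re
import sqlite3

_IDENTIFIER = re.compile(r"[A-Za-z_][A-Za-z0-9_]*")


def checked_table_name(name):
    """Table names cannot be bound as parameters, so validate them instead."""
    if not _IDENTIFIER.fullmatch(name):
        raise ValueError(f"unsafe table name: {name!r}")
    return name


def enqueue(conn, table, run_id, payload):
    """Add one binary payload to the queue for the given run."""
    table = checked_table_name(table)
    conn.execute(
        f"CREATE TABLE IF NOT EXISTS {table} ("
        "id INTEGER PRIMARY KEY, run_id TEXT NOT NULL, payload BLOB NOT NULL)"
    )
    # Values are passed separately from the SQL text, never interpolated.
    conn.execute(
        f"INSERT INTO {table} (run_id, payload) VALUES (?, ?)",
        (run_id, payload),
    )


def pending(conn, table, run_id):
    """Return payloads for one run only, so concurrent runs stay separate."""
    table = checked_table_name(table)
    rows = conn.execute(
        f"SELECT payload FROM {table} WHERE run_id = ? ORDER BY id",
        (run_id,),
    )
    return [row[0] for row in rows]
```

Because the run ID travels as a bound value, a hostile string is stored as plain data, and filtering on run_id keeps two runs that share a queue table from seeing each other's messages.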

After this is cleaned up and merged, I think there will be some more discussion points on this:

  • could the Promise class that the scheduler uses be implemented using PG queues too? That would let us put the redun job object on the payload, so we would not need to keep all the pending tasks in memory on the scheduler
  • If the promise info is stored in SQL, must the scheduler still be single-threaded? We could distribute it over all the workers, removing the _monitor thread singleton altogether - the workers could run the scheduler on a completed task immediately instead of queuing it back

Let me know what you think, and if you'd want to merge this work

gabriel-v, Aug 18 '23 10:08

got CI execution here: https://github.com/gabriel-v/redun/actions/runs/6113659282/job/16593587764?pr=3

no new tests so far

PR https://github.com/insitro/redun/pull/84

gabriel-v, Sep 07 '23 18:09