
Proposal: crashed worker recovery improvements

benjie opened this issue · 9 comments

Background

Graphile Worker is designed to be fault tolerant by nature; there are many potential failures that can happen, and Graphile Worker tries to handle all of them as best it can. One of the recovery mechanisms that works, but could be improved, is recovery from a crashed job.

What is a crashed job?

A crashed job is a job (an entry in the jobs table) that has been "locked" (i.e. some worker claims they're working on it) but is never released.

How do we recover from crashed jobs currently?

We don't know that a job has crashed, only that it has been locked, so we expire locks after 4 hours. It is assumed that all jobs should complete in well under 4 hours, so as long as that assumption holds, a lock older than 4 hours is a strong indication that the job is no longer in progress and can be attempted again.
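
For reference, jobs currently sitting in this state can be listed with a query along these lines (a sketch against graphile_worker.jobs; not the exact query Worker itself runs):

SELECT id, task_identifier, locked_by, locked_at
FROM graphile_worker.jobs
WHERE locked_at IS NOT NULL
  AND locked_at < now() - interval '4 hours'; -- lock treated as expired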

What causes a crashed job?

Other than a job taking more than 4 hours to execute (don't let this happen!), in production the cause of a crashed job is almost always a premature worker exit.

(In development you might put your machine to sleep whilst it's processing a job, this may cause the job to execute for more than 4 hours in wall time.)

What is a premature worker exit?

A premature worker exit is where the Graphile Worker process exits unexpectedly and doesn't have a chance to clean up after itself. By default Graphile Worker installs signal handlers so that it can clean up after itself if it receives SIGINT/SIGTERM/etc, but there are some situations in which it cannot do this, and these are referred to as "premature worker exits". Typical causes of these premature exits are:

  • code containing process.exit()
  • memory exhaustion
  • unhandled exceptions (we have try/catch in a lot of places, so this is generally unlikely with Graphile Worker CLI assuming you're adhering to best practices)
  • SIGKILL and other "force kill" events
  • the node that the worker is running on is terminated (power failure, instance teardown, etc)

What's wrong with the current recovery?

4 hours is a long time. If you're sending an email, your job should only take a couple of seconds; waiting 4 hours for it to retry might be too long. Graphile Worker's design expects these premature exits to be rare; however, if you've designed your system to terminate Graphile Worker with SIGKILL (some Docker setups do this, I hear) or otherwise not allow Worker to do cleanup, then you're going to face this a lot more often than you would if the only cause was hardware failure.

Proposal

In the current design we cannot be sure that a job has crashed until the 4 hour window has expired - all we know is that a job is locked and which worker locked it. This latter information - who locked it - is not currently used in the system; the proposal is that we leverage it to detect crashed jobs earlier.

Worker "keepalive"

I propose that we add a new table to our schema to track the currently running workers. The table would track:

  • worker_id
  • first_active
  • last_active

Every minute or so (configurable), each worker would update its entry in this table to mark itself as freshly active.
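
A minimal sketch of what this could look like at the SQL level (the table name and exact DDL are illustrative, not a committed design):

CREATE TABLE graphile_worker.known_workers (
  worker_id text PRIMARY KEY,
  first_active timestamptz NOT NULL DEFAULT now(),
  last_active timestamptz NOT NULL DEFAULT now()
);

-- run by each worker every keepalive_interval
INSERT INTO graphile_worker.known_workers (worker_id)
VALUES ($1)
ON CONFLICT (worker_id) DO UPDATE SET last_active = now();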

Periodically a "cleanup process" can look for jobs that are locked by a worker that is no longer active and can "unlock" them, releasing them to be reattempted.

Unlocking a job will still result in the attempts count being increased - effectively it'll be treated as a job failure where the reason was that the worker timed out. The reason for this is that the job itself might be causing the worker to crash, so we want to honour the exponential back-off rules; otherwise we risk repeatedly killing our workers by reattempting the deadly job with no time to process other jobs.
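
To make the mechanics concrete, the sweep could be a single statement along these lines (illustrative only - it uses the hypothetical known_workers table sketched above, and the back-off expression mirrors Worker's exponential retry behaviour rather than being its exact implementation):

-- Release jobs locked by workers that haven't checked in within
-- keepalive_threshold (15 minutes here), counting the release as a failed
-- attempt and pushing run_at back with an exponential back-off.
UPDATE graphile_worker.jobs AS j
SET locked_by = NULL,
    locked_at = NULL,
    attempts = j.attempts + 1,
    last_error = 'Worker ' || j.locked_by || ' presumed crashed (keepalive expired)',
    run_at = greatest(now(), j.run_at) + exp(least(j.attempts, 10)) * interval '1 second'
WHERE j.locked_at IS NOT NULL
  AND NOT EXISTS (
    SELECT 1
    FROM graphile_worker.known_workers w
    WHERE w.worker_id = j.locked_by
      AND w.last_active > now() - interval '15 minutes'
  );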

How might this go wrong?

At first this seems like a brilliant solution; however networks are tricky beasts - enter CAP theorem.

CAP theorem [...] states that it is impossible for a distributed data store to simultaneously provide more than two out of the following three guarantees:

  • Consistency: Every read receives the most recent write or an error.
  • Availability: Every request receives a (non-error) response, without the guarantee that it contains the most recent write.
  • Partition tolerance: The system continues to operate despite an arbitrary number of messages being dropped (or delayed) by the network between nodes.

-- https://en.wikipedia.org/wiki/CAP_theorem

In a standard production setup for Graphile Worker, you'll have multiple (1 or more) workers, each worker may be running on the same or different hardware, and they'll all be talking to one primary PostgreSQL server which will most likely be running on different hardware again.

It's possible for a worker to check out a job from the database and then lose connection to the database. The worker can continue to process its work (let's say doing a speech-to-text analysis of a long video) in the hope that the database connection will be restored by the time it completes. If our "cleanup process" runs before this happens then we might end up accidentally releasing and re-attempting the job while the worker is still working on it.

Reasons you might lose the ability to keepalive your running worker

  • Network or firewall issues (e.g. network-level inability to communicate with PostgreSQL, could even be kernel-level on one machine or the other)
  • PostgreSQL restart (not currently accepting network requests)
  • PostgreSQL too many connections
  • Instance migration (e.g. where a running virtualized environment is frozen on one machine, transferred to another, and then unfrozen without shutting down/booting up)
  • Instance sleep (e.g. you suspend your computer by closing the laptop lid)

What does this all mean?

I think the tolerances around this should be configured on a per-environment basis by people familiar with the systems. Variables that we should care about:

  • keepalive_interval - how frequently should each worker tell the PostgreSQL server it's still alive? (e.g. 1 minute)
  • sweep_interval - how frequently do we check for crashed jobs (e.g. 5 minutes)
  • keepalive_threshold - how long after the last keepalive is the worker treated as if it exited prematurely (e.g. 15 minutes)

The keepalive_threshold must be long enough to account for the likely duration of all the issues in "How might this go wrong?" above - e.g. it should be long enough to allow for the PostgreSQL server to undergo regular maintenance. If the PostgreSQL server is expected to undergo longer maintenance (e.g. an upgrade from v13 to v14) then it's expected that you will cleanly shut down all workers before doing so (as would be standard practice).

This might also help a little with performance, because we'd get to remove an OR from the job search query - we would no longer need the 4 hour job lock maximum. It would also allow us to handle jobs that take longer than a couple of hours.
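
For context, the shape of the job search condition today is roughly as follows (illustrative, not Worker's exact query); with keepalive-based recovery the second half of the OR disappears:

SELECT id
FROM graphile_worker.jobs
WHERE run_at <= now()
  AND attempts < max_attempts
  AND (locked_at IS NULL OR locked_at < now() - interval '4 hours')
ORDER BY priority ASC, run_at ASC
LIMIT 1;
-- with worker keepalive tracking, the lock condition becomes simply:
--   AND locked_at IS NULL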

benjie avatar Sep 23 '21 09:09 benjie

This looks good to me! Just to clarify, what is the behavior of running await worker.stop()? We are currently intercepting kill commands (which for us is Kubernetes pod destruction) and awaiting worker.stop.

Will that either wait until jobs are completed or unlock the jobs?

We are still susceptible to abandoned jobs from unexpected exits, PostgreSQL failovers, etc. so this is still useful for us, but just wanted to clarify how that and this interact

ben-pr-p avatar Sep 23 '21 13:09 ben-pr-p

await worker.stop() has a very slow and clean exit - it basically tells the system to stop asking for/accepting new jobs, then waits for all existing (in progress) jobs on the node to complete before resolving. This is the preferred method of shutting down worker, but isn't currently possible in the CLI.

SIGTERM/etc is slightly less clean - we exit in a more hurried fashion so we fail all running jobs (releasing them) and then exit. I should probably make it so you can send worker a signal to exit in the slower fashion above.

benjie avatar Sep 24 '21 10:09 benjie

An API for the fast exit would be great too - we use https://github.com/gajus/lightship to manage startup and cleanup of our NodeJS pods, and having the fast exit as an option when controlling things ourselves would be useful.

ben-pr-p avatar Sep 24 '21 13:09 ben-pr-p

Feel free to raise an issue for that :+1:

benjie avatar Sep 24 '21 14:09 benjie

For this feature, will there be a core level job that gets scheduled via a core level crontab entry at sweep_interval minutes, and that checks for jobs/queues that have been locked for more than keepalive_threshold minutes? Or is there a different way of handling this instead of creating a core level job?

siftd106 avatar Mar 25 '22 15:03 siftd106

I don’t think a job necessarily needs to exist to do it, but we could piggyback the cron code perhaps

benjie avatar Mar 26 '22 08:03 benjie

Hi everyone!

My team is currently doing a proof of concept to switch over from SQS to graphile-worker, and after reviewing the code and some of the PRs this one was a little concerning to me. Let me give a tiny bit of context and then explain why.

Reasons for choosing graphile-worker

  1. Simplicity, our dev, staging, and prod environments do not need complex SQS setups (with dead letter queues, etc)
  2. Backups - our database backups by default include any pending jobs, a DB restore also restores the job state (hard if not impossible to do with SQS).
  3. SQL API - we run both node.js and Go "services" and need the ability to queue and receive jobs. I know there is not a "Go consumer", but it is relatively straightforward to write as long as there are LISTEN / NOTIFY channels to listen on, and functions defined to fetch jobs.
  4. SQL API - a huuuuge benefit of postgres-based job queues is being able to transactionally queue jobs, and to transactionally ack / reject them. For example, imagine you had 1000 customers and wanted to send them all an email. You queue a job that is "sendEmailToAllCustomers" on some kind of cron. Then you transactionally ack that job, AND transactionally queue 1000 "sendEmailToIndividualCustomer" jobs. However, the JS addJob API only supports pg directly; if you use sequelize (like us) you need to use the sequelize.Transaction object to send queries over a transaction, and I assume it would be the same for Prisma, deno-postgres, or any other well-established SQL ORM. So the SQL API allows us to queue jobs inside a sequelize transaction (vs the JS API for addJob) - see the SQL sketch after this list. This is admittedly a little clunky - it would be awesome if this particular use-case was better supported.
  5. Scheduled execution, SQS only allowed delayed execution up to 900 seconds
  6. Good-enough performance
  7. Typescript support - that alone put it over the top of pg-boss
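
To make the SQL API point (4) concrete, the transactional fan-out described there can be done purely in SQL, along these lines (a sketch - the customers table is illustrative, and in practice the statements would run on whatever connection/transaction your ORM provides):

BEGIN;

-- Enqueue one child job per customer inside the same transaction as the rest
-- of your work; if the transaction rolls back, no jobs are queued.
SELECT graphile_worker.add_job(
  'sendEmailToIndividualCustomer',
  json_build_object('customerId', c.id)
)
FROM customers c;

COMMIT;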

Non-reasons for choosing graphile-worker

  1. Extreme Performance
  2. Cron support (I believe using https://github.com/citusdata/pg_cron to call SELECT graphile_worker.add_job(...) and centralize the cron directly in the database is a lot simpler / easier to reason about)
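
For reference, the pg_cron approach mentioned in (2) looks roughly like this (the schedule and task name are illustrative, and it assumes a pg_cron version that supports named schedules):

-- enqueue a Graphile Worker job every day at 09:00
SELECT cron.schedule(
  'daily-report',
  '0 9 * * *',
  $$SELECT graphile_worker.add_job('generate_daily_report')$$
);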

We care a lot about simplicity, but additionally correctness is implicitly a requirement of a job queue. I believe this PR adds complexity, and also sacrifices correctness. I know this was already covered / pointed out in the proposal, I just was surprised to not see much other pushback on it.

The design of this system seems to be "lease" based, I acquire a lease on a job for 4 hours - I have 4 hours to specifically ack or specifically reject (or requeue) that job. If it takes me 3 hours and 59 minutes that is great, that is the contract between the job processor and the job queue. However, this change effectively turns the "lease time" down to 1 minute (or something unreasonably short), since it is impossible to tell the difference between a network hiccup and a failed worker process.

Not all background jobs do "database stuff", so a failure of the database connection for a few minutes might be completely unrelated to a job failure (video transcoding, talking to third party APIs, ML model training, etc).

This seems to specifically weaken the guarantees of this job queue to better support SIGKILL, OOM, or otherwise non-clean shutdowns - which in my opinion should be a non-goal of this project.

I think a significantly better solution might be to enable unlocking all jobs previously locked from a certain worker_id. That along with support for stable worker ids would allow the same exact behavior, without any sacrifice of correctness. And given it is opt-in, it does not add any complexity either.

# pseudo-code ("unlock-worker-jobs" is the proposed command, not something that exists today)
WORKER_ID=$(uuidgen)
while true; do
  # release any jobs a previous incarnation of this worker left locked
  graphile-worker unlock-worker-jobs --workerId "$WORKER_ID"
  run_my_code --graphile-worker-id "$WORKER_ID"
done

That said, I think the "4 hour" global lease time is not flexible enough. I think per-job max-attempts is genius, and I would also love a per-job lock interval - which I think solves this in another way.

Thanks for your great work here - I'm really bullish on db-based queues, and this was the best out there!

skabbes avatar Sep 24 '22 21:09 skabbes

Thanks for the careful reasoning @skabbes I agree with the vast majority of what you have said. That’s why the various values should be configurable and have long timeouts. Generally I don’t recommend Worker for anything that takes more than an hour (so 4 hours is 4x max runtime) but in my experience the vast majority (like 99+%) of worker jobs complete in under 5 minutes. There is value for people who understand their systems to be able to spot crashed workers sooner, but I’m always careful to weigh complexity when adding features.

Incidentally not everyone can use pg_cron which is why worker has native cron features.

Stable worker IDs comes with a massive list of issues/management complexity, I don’t think we’ll ever add that.

benjie avatar Sep 24 '22 22:09 benjie

re: cron - completely understand, just wanted to provide some end-user insight to the maintainers.

That’s why the various values should be configurable and have long timeouts

Ok, awesome. For what it's worth - I think I'd still consider lock_interval and keepalive_threshold to be the same value essentially. Or rather - I'd treat my "how long do I have to complete this job" as min(lock_interval, keepalive_threshold), since that is the only guarantee that my network + worker can give. (So, if this were a democracy - I'd vote to not build this feature - but instead to build per-job expiry time - which is simpler AND more powerful I believe, probably only slightly worse perf).

There is value for people who understand their systems to be able to spot crashed workers sooner

Agree - just disagree that that should be a concern of this project. Your call though :) As an end-user being able to effectively ignore these would satisfy me. I think given the reasons I said above, I certainly won't be using the CLI for worker, just using it completely in library mode - so maybe that's why I don't see this as too valuable.

Generally I don’t recommend Worker for anything that takes more than an hour

Hmm, it is interesting to hear you say that. I agree that 99% of jobs complete with less than 5 minutes. But for those cases where a job does take a long time, this seems to be an ideal system as well. It does not hold a database transaction open (or even require one to be open at all during job execution - as you pointed out), and other than the 4 hour limit - I don't see any reason why workers can't run arbitrarily long. Do you care to elaborate a bit? (we have maybe 1 or 2 monthly jobs that run in 45-90 minutes or so - mostly financial reporting related).

Stable worker IDs comes with a massive list of issues/management complexity

Perhaps I misspoke, I didn't mean "stable worker ids" I just meant user-defined worker-ids (which could be stable or random). But as I said, this is really a non-concern for me, I was just grasping at straws to try to see if there was a solution that did not require job garbage collection.

skabbes avatar Sep 24 '22 23:09 skabbes

For what it's worth - I think I'd still consider lock_interval and keepalive_threshold to be the same value essentially. Or rather - I'd treat my "how long do I have to complete this job" as min(lock_interval, keepalive_threshold), since that is the only guarantee that my network + worker can give. (So, if this were a democracy - I'd vote to not build this feature - but instead to build per-job expiry time - which is simpler AND more powerful I believe, probably only slightly worse perf).

I think after a worker has been disconnected from the database for a certain period of time (say 30 minutes), it may well want to exit and abort all running tasks in the hopes that rebooting the worker will fix its connectivity issues. Having these jobs be restarted within a short window based on the last successful checkin is desirable - especially if we allow the maximum lock time of jobs to increase to 24 hours or whatever (which, in my mind, is a completely separate concern). Having a worker crash and not reattempting its jobs for 24 hours just because that's the max lock time seems undesirable.

Agree - just disagree that that should be a concern of this project.

Helping handle failure modes definitely feels within the remit of what a job queue should do. I'm not certain that my currently proposed solution is the best way of doing this (which is in part why I've not built it yet), but it definitely feels like we should have something to improve the status quo.

But for those cases where a job does take a long time, this seems to be an ideal system as well. It does not hold a database transaction open (or even require one to be open at all during job execution - as you pointed out), and other than the 4 hour limit - I don't see any reason why workers can't run arbitrarily long. Do you care to elaborate a bit? (we have maybe 1 or 2 monthly jobs that run in 45-90 minutes or so - mostly financial reporting related).

Mostly this is due to concerns regarding failure modes:

  • A long running job is more likely to have spikes that make it an even more long running job
  • A long running job on a worker that runs many jobs concurrently is more likely to have the worker crash due to a bug in one of the other task handlers
  • In my experience long running jobs are also more likely to have their own failure modes because by their nature of being long running they're much harder (more time consuming) to test

I like to have a bit of lee-way, and if a job normally takes 2 hours then it doesn't seem unreasonable that sometimes it might take 5 or 6, e.g. if the network becomes slow, or database load is high, or you suddenly find yourself executing with a "noisy neighbour" on your host machine, or whatever. That, of course, could break the 4 hour "auto-unlock" that's currently hardcoded.

I do plan at some point to give more flexibility to the 4 hour thing (most other things are configurable, why not this?!) but currently I don't recommend Worker for jobs longer than about an hour for this reason. Of course, if you're happy with the risk/failure mode - go ahead. And you may want to put a timeout into your job itself, e.g. a Promise.race or similar, to ensure if you go over the limit then you have something in place to handle that situation.

I just meant user-defined worker-ids (which could be stable or random)

User-defined worker ids are problematic because it's very easy to (accidentally) boot two workers up using the same worker ID - this would then interfere with various of the strategies that we might use for catching/handling/etc crashed workers. For example you may restart your worker every night but the old container fails to exit so now you have two workers using the same ID.

benjie avatar Sep 26 '22 12:09 benjie

re: cron - completely understand, just wanted to provide some end-user insight to the maintainers.

By the way, your insights/reasoning were really valuable and I discussed them with @jemgillam - thanks for sharing!

benjie avatar Sep 26 '22 12:09 benjie

Thanks for the thoughtful answer!

Long jobs - sounds like those are just the sharp edges of doing long-running jobs in general (I'd have those problems in any job queue with a 4 hour visibility timeout, not just worker). Thanks for clarifying!

As for the other contentious topic... reasonable job queues (and people) will disagree here I think :) It seems like Redis and SQS use a basic visibility_timeout mechanism, and AMQP (Rabbit) will fail proactively on a closed channel/connection. I don't have experience with Celery or Sidekiq, but I imagine this is configurable on those 2 (assuming the underlying queue supports it).

This proposal is kind of a middle ground between the 2. I'm sure my preference here is 100% based on my usage of SQS in the past, and the fact that OOMs and process.exits are things I can control, and network issues I cannot. Unexpectedly failed processes are very rare on our team, the only time in recent memory was a run-away regex, and that needed to be dealt with manually anyway.

Just this back and forth gives me confidence you are thinking through it carefully, and gives me more confidence in using worker on my team. Thanks for the back-and-forth, and thanks for humoring me!

skabbes avatar Sep 26 '22 19:09 skabbes

Hey everyone, awesome discussion here, can't wait for this feature :)

I wanted to share what we're doing at seam.co in case it's useful and because it's so similar to what is being proposed here:

We give each worker its own process with concurrency set to 1 (a bit atypical). Each worker runs a health service exposing an HTTP endpoint you can use to look at its health (helps with blue-green deployments), and pings the database every 10 seconds to upsert an entry in a worker_heartbeat table. The worker_heartbeat table contains the following:

  • the worker id
  • Indicator for the version of the application the worker is running (we use version_build_time and version_commit_hash)
  • Boolean was_accepting_jobs, which indicates if the worker was accepting new jobs at the time of the heartbeat
  • last_job_accepted_at: the last time a worker picked up a job
  • last_heartbeat_at: last time the heartbeat was received (essentially last_active as proposed)

This is the SQL for our table:

CREATE TABLE worker_heartbeat (
    worker_heartbeat_id uuid DEFAULT (gen_random_uuid ()) NOT NULL,
    graphile_worker_id text NOT NULL,
    version_build_time pg_catalog.timestamptz NOT NULL,
    version_commit_hash text NOT NULL,
    was_accepting_jobs boolean NOT NULL,
    last_job_accepted_at pg_catalog.timestamptz,
    last_heartbeat_at pg_catalog.timestamptz DEFAULT (CURRENT_TIMESTAMP) NOT NULL,
    created_at pg_catalog.timestamptz DEFAULT (CURRENT_TIMESTAMP) NOT NULL
);
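
The 10-second heartbeat itself is then just an upsert along these lines (a sketch - it assumes a unique index on graphile_worker_id, which isn't shown in the table definition above):

INSERT INTO worker_heartbeat
  (graphile_worker_id, version_build_time, version_commit_hash,
   was_accepting_jobs, last_job_accepted_at)
VALUES ($1, $2, $3, $4, $5)
ON CONFLICT (graphile_worker_id) DO UPDATE SET
  was_accepting_jobs = EXCLUDED.was_accepting_jobs,
  last_job_accepted_at = coalesce(EXCLUDED.last_job_accepted_at, worker_heartbeat.last_job_accepted_at),
  last_heartbeat_at = CURRENT_TIMESTAMP;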

seveibar avatar Dec 12 '22 22:12 seveibar

My temporary solution is:

update graphile_worker.jobs set locked_by = null, locked_at = null where locked_at IS NOT NULL and locked_by IS NOT NULL;
update graphile_worker.job_queues set locked_by = null, locked_at = null where locked_at IS NOT NULL and locked_by IS NOT NULL;

FlowerWrong avatar Dec 13 '22 06:12 FlowerWrong

Very curious @seveibar ! That's very different from how we use it.

I'm curious why you care about the health of an individual worker in the first place - what do you do with this worker_heartbeat table, and what do you do if an individual worker is unhealthy for some reason?

Or, is the purpose of running a worker per process (essentially a job per process, I suppose) to isolate a potential crash from other ongoing jobs run in the same process?

ben-pr-p avatar Dec 15 '22 20:12 ben-pr-p

I'm curious why you care about the health of an individual worker in the first place - what do you do with this worker_heartbeat table, and what do you do if an individual worker is unhealthy for some reason?

If a worker doesn't report a heartbeat but is locking a job, it usually means that the job crashed and we should unlock it.

Or, is the purpose of running a worker per process (essentially a job per process, I suppose) to isolate a potential crash from other ongoing jobs run in the same process?

We run a worker per process because it helps separate our logs and ensures there's no resource contention (we use a lot of memory per job) - so not too important/related to this issue :)

seveibar avatar Dec 15 '22 23:12 seveibar

This has been implemented via the Crashed Worker Recovery feature in Worker Pro: https://worker.graphile.org/docs/pro/recovery - it was clear building it into core would have been controversial, and this makes for a nice clean separation: core doesn't track workers, pro does.

Read more about v0.16 and all the nice new features it introduces here: https://worker.graphile.org/news/2023-12-01-016-release

benjie avatar Dec 11 '23 12:12 benjie