SQL-based Work Queue enhancements

Open macosforgebot opened this issue 11 years ago • 2 comments

@cyrusdaboo originally submitted this as ticket:839


txext.enterprise.queue implements a database-based distributed work queue. The basic behavior is this:

  • Each host registers itself in an SQL NODE_INFO table, and listens on a port for AMP connections.
  • An item of work is defined by a row in an SQL table, with multiple tables used for different work types.
  • An item of work is enqueued using txn.enqueue() which includes an argument for a notBefore time. A work item will not execute until notBefore.
  • The process that enqueues the work item sets up a reactor.callLater() to trigger at the notBefore time, with the callback executing the work item in that process. If the work fails, it is “orphaned” (see next item).
  • Each host has a master process that regularly polls (default 1 minute interval) all work queue tables to see if there are any “orphaned” work items (ones which are past their notBefore time by some amount - currently 10 minutes). Any items found in this sweep are dispatched to a local process based on the current work load of each process. If there is too much local load, the master polls the other hosts (using the AMP protocol) to find one willing to accept the work and hands it off to that host.
  • Work items have a “group” class property that is used to create a lock when the work item is dequeued, preventing concurrent execution of work with the same group id.
  • Work items can coalesce by scanning their work table for other matching items and deleting those during execution of the selected work item.
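
The orphan sweep described in the list above can be sketched in a few lines of Python. This is a minimal illustration, not the txext.enterprise.queue code: the WorkItem class, find_orphans() and pick_process() are hypothetical names, and "pick the least-loaded process" is an assumed reading of "based on the current work load of each process".

```python
from dataclasses import dataclass
from datetime import datetime, timedelta


@dataclass
class WorkItem:
    """Hypothetical stand-in for a row in a work table (not the real schema)."""
    work_id: int
    not_before: datetime


# The issue text gives 10 minutes as the current orphan threshold.
ORPHAN_AFTER = timedelta(minutes=10)


def find_orphans(items, now, orphan_after=ORPHAN_AFTER):
    """Return items whose notBefore time passed more than orphan_after ago."""
    return [item for item in items if now - item.not_before > orphan_after]


def pick_process(loads):
    """Return the index of the least-loaded local process (assumed policy)."""
    return min(range(len(loads)), key=lambda i: loads[i])
```

With a threshold this large, an item that is deliberately left for the sweep waits at least 10 minutes past its notBefore time before any host picks it up.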

There are several issues with the current implementation that we need to address:

  1. Dequeue locking: when an item of work is dequeued (see txext.enterprise.queue.ultimatelyPerform), its group id is first locked using a NamedLock, then its record (row) is deleted from the work table. Both of those actions will block if some other process is already processing a work item in the same group or, when there is no group, if the work item itself is being processed. The problem is that this blocks the “orphaned” work item polling loop, and it is possible for all hosts to end up blocked behind one long-lived item at the “top” of a work table.
  2. There needs to be a way to enqueue a work item without scheduling it via callLater in the current process. For work items known to require significant work, it would be better to have them scheduled on a node/process with low load - there is no guarantee the process creating the work item will have low load at the time the callLater triggers.
  3. Item (2) also suggests we need some way to indicate priority and estimated load for different types of work item. For example, we know push notification work is a light load and needs to occur close to real time (within 3 seconds of being scheduled), whereas other work (scheduling, group cacher, etc.) involves heavier loads and typically executes a minute or hours after being enqueued.
  4. The default “orphan” notBefore limit seems high - waiting for 10 minutes for a high priority item is not good, particularly if we fix item (2) where we would be deliberately “orphaning” work.
  5. Scalability: some types of work may involve processing very large data sets, e.g. scans over calendar homes, all attachments etc. Dumping 100K work items into the queue system is not ideal, particularly in the absence of any kind of prioritization mechanism. A better approach would be to create a limited set (pool) of work items that run in parallel and process the overall (large) work set. However, that needs to be done in a way that minimizes lock contention (i.e., addresses item (1) in some fashion).
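
One possible shape for the pool idea in item (5), sketched in Python under stated assumptions: threads stand in for the limited set of work items, and the function names (partition, process_slice, run_pool) are hypothetical. Each worker gets a disjoint slice of the ids, which is one assumed way to keep workers from contending for the same rows.

```python
from concurrent.futures import ThreadPoolExecutor


def partition(ids, pool_size):
    """Split ids into pool_size disjoint slices (round-robin)."""
    return [ids[i::pool_size] for i in range(pool_size)]


def process_slice(slice_ids):
    """Placeholder per-worker job: it only counts the items it was given."""
    return len(slice_ids)


def run_pool(ids, pool_size=4):
    """Process every id with pool_size workers instead of one job per id."""
    slices = partition(ids, pool_size)
    with ThreadPoolExecutor(max_workers=pool_size) as pool:
        return sum(pool.map(process_slice, slices))
```

Here 100K ids would produce only pool_size entries in the queue rather than 100K, which is the point of the proposal.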

macosforgebot • Feb 14 '14 17:02

@wsanchez originally submitted this as comment:1:ticket:839

  • Owner changed from @wsanchez to @cyrusdaboo
  • Milestone set to Sooner

macosforgebot • Feb 24 '14 19:02

@cyrusdaboo originally submitted this as comment:2:ticket:839


Experimented with the following.

Use “select … for update” and find a way to skip rows already locked and return the first unlocked row.
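
The skip-locked idea can be illustrated outside the database with a small Python analogy. This is only a sketch: the JobTable class and its methods are hypothetical, and an in-process threading.Lock stands in for a row lock held by a database session.

```python
import threading


class JobTable:
    """Toy in-memory stand-in for a table of job rows, one lock per row."""

    def __init__(self, job_ids):
        self._locks = {job_id: threading.Lock() for job_id in job_ids}

    def next_job(self):
        """Return the first job id whose lock could be taken, or None."""
        for job_id, lock in self._locks.items():
            # Non-blocking attempt: a row held by someone else is skipped
            # rather than waited on.
            if lock.acquire(blocking=False):
                return job_id
        return None

    def release(self, job_id):
        """Release a job's lock, as a commit or rollback would."""
        self._locks[job_id].release()
```

Successive next_job() calls hand out different ids without ever waiting, which is the behaviour wanted from the database.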

Postgres:

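create or replace function next_job() returns integer as $$
declare
  result integer;
begin
  -- pg_try_advisory_xact_lock() never waits: it returns false for an ID whose
  -- advisory lock is held by another transaction, so that row is skipped.
  -- The lock taken here is released automatically at commit or rollback.
  select ID into result from JOB where pg_try_advisory_xact_lock(ID) limit 1 for update;
  return result;
end;
$$ LANGUAGE plpgsql;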

To lock and return the next available ID:

select * from next_job();

Oracle:

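create or replace function next_job return integer is
  -- skip locked: rows locked by other sessions are passed over, not waited on
  cursor c1 is select ID from JOB for update skip locked;
  result integer;
begin
  open c1;
  fetch c1 into result;
  if c1%found then
    -- PL/SQL requires an INTO clause on a select; re-select to lock the row explicitly
    select ID into result from JOB where ID = result for update;
  end if;
  -- closing the cursor does not release the row lock; that happens at commit or rollback
  close c1;
  return result;
end;
/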

To lock and return the next available ID:

var r number;
call next_job() into :r;
select :r from dual;

With each of the above, multiple sessions each get their next available ID without being blocked by existing sessions. For our purposes we need a single JOB table that lists all outstanding jobs (work tables still exist to describe each type of work):

create table JOB (
  ID          integer primary key default nextval('JOB_SEQ') not null, -- implicit index
  CLS         varchar(255) not null,
  PRIORITY    integer default 0,
  WEIGHT      integer default 0,
  NOT_BEFORE  timestamp default null,
  NOT_AFTER   timestamp default null
);

When a job is enqueued, an entry is made in the JOB table and in the class/type-specific work queue table (the latter references the ID column in the JOB table, except in the case where a bunch of work is to be done by a smaller pool of “workers”). Each job includes priority/weight and timing details: notBefore holds the job until the specified time, and notAfter causes the job priority to be bumped up once that time has passed.
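
A rough sketch of that enqueue step, using Python's sqlite3 module so it runs anywhere. Several things here are assumptions for illustration only: the PUSH_WORK table and its columns are hypothetical, SQLite's autoincrement replaces the JOB_SEQ sequence, and enqueue() is not the existing txn.enqueue() API.

```python
import sqlite3


def make_db():
    """Create an in-memory database with a JOB table and one work table."""
    db = sqlite3.connect(":memory:")
    db.executescript("""
        create table JOB (
          ID          integer primary key autoincrement,
          CLS         varchar(255) not null,
          PRIORITY    integer default 0,
          WEIGHT      integer default 0,
          NOT_BEFORE  timestamp default null,
          NOT_AFTER   timestamp default null
        );
        -- Hypothetical work table for a single work type.
        create table PUSH_WORK (
          JOB_ID   integer not null references JOB(ID),
          PAYLOAD  text
        );
    """)
    return db


def enqueue(db, cls, payload, priority=0, weight=0,
            not_before=None, not_after=None):
    """Insert the JOB row and its work row in one transaction; return the ID."""
    with db:  # commits on success, rolls back if an exception is raised
        cur = db.execute(
            "insert into JOB (CLS, PRIORITY, WEIGHT, NOT_BEFORE, NOT_AFTER) "
            "values (?, ?, ?, ?, ?)",
            (cls, priority, weight, not_before, not_after),
        )
        job_id = cur.lastrowid
        db.execute(
            "insert into PUSH_WORK (JOB_ID, PAYLOAD) values (?, ?)",
            (job_id, payload),
        )
    return job_id
```

Writing both rows in the same transaction means a job never appears in JOB without its matching work row.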

To dequeue the next job, the next_job() stored procedure is used. It will need to define the logic for sorting the JOB rows so as to pick the next job, taking priority/weight/notBefore/notAfter into account.
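
The sorting logic is still to be defined; the following Python sketch shows one possible ordering, purely as a starting point. Everything in it is an assumption: the Job class, the OVERDUE_BUMP constant, the convention that a larger PRIORITY value runs first, and the tie-breaks. WEIGHT is carried but not used for ordering here, on the assumption that it would matter for choosing a process rather than a job.

```python
from dataclasses import dataclass
from datetime import datetime
from typing import Optional


@dataclass
class Job:
    """Hypothetical in-memory view of a JOB row."""
    job_id: int
    priority: int = 0
    weight: int = 0
    not_before: Optional[datetime] = None
    not_after: Optional[datetime] = None


OVERDUE_BUMP = 1  # assumed size of the priority bump once notAfter has passed


def effective_priority(job, now):
    """Priority plus a bump if the job is past its notAfter time."""
    overdue = job.not_after is not None and now > job.not_after
    return job.priority + (OVERDUE_BUMP if overdue else 0)


def sort_key(job, now):
    """Assumed order: higher priority, then earlier notBefore, then lower ID."""
    return (-effective_priority(job, now), job.not_before or datetime.min, job.job_id)


def pick_next(jobs, now):
    """Return the runnable job that sorts first, or None if none is runnable."""
    runnable = [j for j in jobs if j.not_before is None or j.not_before <= now]
    if not runnable:
        return None
    return min(runnable, key=lambda j: sort_key(j, now))
```

In SQL the same rule would become the where and order by clauses of the select inside next_job().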

macosforgebot • Feb 24 '14 20:02