
Calling enqueueWithData: when queue is not running can cause duplicate calls to processJob: delegate method

Open ruddct opened this issue 12 years ago • 7 comments

Repro steps:

// With [EDQueue sharedInstance] stopped
[[EDQueue sharedInstance] enqueueWithData:@{@"test" : @"data"} forTask:@"task"];
[[EDQueue sharedInstance] enqueueWithData:@{@"test" : @"data"} forTask:@"task2"];
[[EDQueue sharedInstance] start];

Expected: The queue should ask its delegate to process "task", wait for it to complete, then ask its delegate to process "task2".

Actual: A race condition occurs which can cause the queue to ask its delegate to process "task" and/or "task2" multiple times.

Bug description: Here's my understanding of why this is happening:

  • enqueueWithData: calls tick after inserting the job into the storage engine.
  • start also calls tick after setting isRunning to true.
  • tick dispatches a block to a background queue; isRunning is checked inside that async block to determine whether the delegate is actually asked to process the job.
  • Each enqueueWithData: call made before start therefore dispatches an async block of its own. Every one of those blocks that runs after isRunning has been set to true targets the first job in the queue, so the delegate receives duplicate calls for the same job.
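To make the race concrete, here is a minimal single-threaded Python model of the dispatch pattern described above. MiniQueue and its members are illustrative names, not EDQueue's API; the pending_ticks queue stands in for the GCD background queue, so the "async" blocks are simply recorded and drained at the end.

```python
# Hypothetical model of the EDQueue race; all names are illustrative.
import queue

class MiniQueue:
    def __init__(self):
        self.jobs = []                       # pending jobs, head at index 0
        self.is_running = False
        self.pending_ticks = queue.Queue()   # stands in for the GCD background queue

    def enqueue(self, job):
        self.jobs.append(job)
        self.tick()                          # enqueueWithData: unconditionally calls tick

    def start(self):
        self.is_running = True
        self.tick()

    def tick(self):
        # EDQueue dispatches a block asynchronously; we just record it.
        self.pending_ticks.put(self._process_head)

    def _process_head(self):
        # isRunning is checked inside the async block, after dispatch.
        if self.is_running and self.jobs:
            return self.jobs[0]              # delegate is asked to process the head job
        return None

q = MiniQueue()
q.enqueue("task")
q.enqueue("task2")
q.start()
# Drain the "background queue": three blocks were dispatched, and every
# block that runs after start() sees is_running == True and targets the
# same head job.
processed = [q.pending_ticks.get()() for _ in range(3)]
print(processed)  # ['task', 'task', 'task']
```

All three dispatched blocks ask the "delegate" for the same head job, which matches the duplicate-call behavior in the report (in the real queue, the job is only removed once the delegate reports completion).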

It seems like the solution is to wrap the tick call in enqueueWithData: in an isRunning check, but I would love feedback on whether that would break use cases I haven't thought of.

Here's a modified enqueueWithData:

- (void)enqueueWithData:(id)data forTask:(NSString *)task
{
    if (data == nil) data = @{};
    [self.engine createJob:data forTask:task];
    if (self.isRunning) {
        [self tick];
    }
}

ruddct avatar Oct 21 '13 19:10 ruddct

Thanks so much for the detailed issue. I'll dig into this asap.

thisandagain avatar Oct 25 '13 06:10 thisandagain

Hello. I suspect this bug still exists in the project. I have given this type of problem a lot of thought over the years through my experiences with queue_classic; so I will submit a few ideas for consideration.

One initial idea is to adjust the database model so that we eliminate the potential race conditions. In most cases I would curse an RDBMS for its lack of concurrency, but in our case SQLite's non-concurrent promise comes in handy: we are promised that data updates are serialized by SQLite. Therefore, we could add an additional column to the queue table that indicates whether a job is locked. We can use a transaction to select an unlocked job, update its locked column, then commit the transaction.
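The select-then-lock transaction described above can be sketched with Python's stdlib sqlite3 module for brevity. The table name, column names, and the lock_next_job helper are assumptions for illustration, not EDQueue's actual schema:

```python
# Sketch of a "locked" column on the queue table; names are illustrative.
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE queue (id INTEGER PRIMARY KEY, task TEXT, locked INTEGER DEFAULT 0)"
)
conn.execute("INSERT INTO queue (task) VALUES ('task'), ('task2')")

def lock_next_job(conn):
    """Select an unlocked job and mark it locked inside one transaction."""
    with conn:  # BEGIN ... COMMIT; SQLite serializes the writers
        row = conn.execute(
            "SELECT id, task FROM queue WHERE locked = 0 ORDER BY id LIMIT 1"
        ).fetchone()
        if row is None:
            return None
        conn.execute("UPDATE queue SET locked = 1 WHERE id = ?", (row[0],))
        return row

first = lock_next_job(conn)
second = lock_next_job(conn)
third = lock_next_job(conn)
print(first, second, third)  # (1, 'task') (2, 'task2') None
```

Because the select and the lock update commit as one unit, two contenders can never walk away holding the same job.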

Assuming we use a data model which handles locking at the database level, we can then start to simplify and potentially improve the method in which jobs are processed. For instance, consider the case in which several jobs have failed (possibly due to a bad network connection) and consequently our queue depth has increased. The current technique we use to process jobs is such that we would attempt to work off the jobs one by one. But if we have a data model that ensures that jobs can only be locked by a single, logical process, we could take a different approach to processing jobs. We could have a job scheduler, whose only responsibility is to lock a job, and then dispatch the job on a thread to be worked. In the aforementioned scenario, we could be working the backlog off in parallel.
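The scheduler idea above can be sketched as follows, again in Python for brevity. A single scheduler loop locks jobs one at a time (so database-level locking stays trivial), then hands each locked job to a small pool of worker threads so a backlog is worked off in parallel. All table, queue, and function names are illustrative:

```python
# Sketch: one scheduler locks jobs, a worker pool processes them in parallel.
import sqlite3
import threading
import queue

db = sqlite3.connect(":memory:")
db.execute(
    "CREATE TABLE jobs (id INTEGER PRIMARY KEY, task TEXT, locked INTEGER DEFAULT 0)"
)
db.executemany("INSERT INTO jobs (task) VALUES (?)", [("a",), ("b",), ("c",)])

work = queue.Queue()
done, done_lock = [], threading.Lock()
NUM_WORKERS = 2

def worker():
    # Workers never touch the database; they only receive jobs the scheduler
    # has already locked, so no two workers can be handed the same job.
    while (job := work.get()) is not None:
        with done_lock:
            done.append(job[1])  # stand-in for "ask the delegate to process"

threads = [threading.Thread(target=worker) for _ in range(NUM_WORKERS)]
for t in threads:
    t.start()

# Scheduler loop: lock a job, dispatch it, repeat until the table is drained.
while True:
    with db:  # one transaction per claim
        row = db.execute(
            "SELECT id, task FROM jobs WHERE locked = 0 LIMIT 1"
        ).fetchone()
        if row is None:
            break
        db.execute("UPDATE jobs SET locked = 1 WHERE id = ?", (row[0],))
    work.put(row)
for _ in range(NUM_WORKERS):
    work.put(None)  # tell the workers to exit
for t in threads:
    t.join()
print(sorted(done))  # ['a', 'b', 'c']
```

Only the scheduler thread touches SQLite, which sidesteps connection-sharing questions while still letting the actual job processing happen concurrently.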

I am sure there are some details that need to be addressed with my meager proposal, but I trust that I have demonstrated that by moving towards a more robust data model we can open the door to faster, safer job processing.

ryandotsmith avatar Apr 04 '14 04:04 ryandotsmith

I will also say that if @thisandagain is interested in this direction, I would be more than happy to start working on a pull request.

ryandotsmith avatar Apr 04 '14 04:04 ryandotsmith

@ryandotsmith Totally agree... "locking" rows in the data model is a nice / simple way to address this and guarantee some safety and add some flexibility. :+1:

thisandagain avatar Apr 04 '14 22:04 thisandagain

Therefore, we could add an additional column to the queue table that would indicate whether a job is locked.

We should definitely do that.

We can use a transaction to select an unlocked job, update its locked column, then commit the transaction.

But we can do something much simpler. This is actually what the popular delayed_job Ruby gem does: https://github.com/collectiveidea/delayed_job_active_record/blob/master/lib/delayed/backend/active_record.rb

We can just perform the lock update with a guarded WHERE clause.

UPDATE jobs SET locked_at = <time> WHERE id = <job id> AND locked_at IS NULL

If this statement is executed concurrently by two or more threads/processes, only one of them can succeed, because after the first update locked_at is no longer NULL.
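This guarded-UPDATE claim can be demonstrated with a short runnable sketch using Python's stdlib sqlite3 (the jobs table and locked_at column follow the naming above; try_claim is an illustrative helper). A claim succeeds exactly when the UPDATE touches one row:

```python
# Sketch of the delayed_job-style claim: an UPDATE with a guard clause is
# atomic, so only one contender can flip locked_at away from NULL.
import sqlite3
import time

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE jobs (id INTEGER PRIMARY KEY, locked_at REAL)")
conn.execute("INSERT INTO jobs (id) VALUES (1)")
conn.commit()

def try_claim(conn, job_id):
    """Return True iff this caller won the job."""
    cur = conn.execute(
        "UPDATE jobs SET locked_at = ? WHERE id = ? AND locked_at IS NULL",
        (time.time(), job_id),
    )
    conn.commit()
    return cur.rowcount == 1  # 0 rows touched: someone else got there first

first_claim = try_claim(conn, 1)
second_claim = try_claim(conn, 1)
print(first_claim, second_claim)  # True False
```

No explicit transaction is needed here because the single UPDATE statement is itself atomic; the WHERE clause does the arbitration.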

dre-hh avatar May 27 '14 12:05 dre-hh

Great feedback. Thanks @dre-hh

thisandagain avatar Jun 07 '14 17:06 thisandagain

A similar problem happened for me when I created tasks concurrently. In the end I simply patched the tick instance method in EDQueue.m so that tasks are activated in sequence.

  // EDQueue.m
  - dispatch_queue_t gcd = dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_BACKGROUND, 0);
  + dispatch_queue_t gcd = dispatch_get_main_queue();

I think multiple threads read the same isActive property value, which is still false, at the same instant.

tin4g avatar Aug 31 '16 11:08 tin4g