meteor-job-collection
meteor-job-collection copied to clipboard
Confusion about multiple workers and job types
Say I have a demanding job type that takes a while to process, and I decide to allocate multiple workers to handle it. I'm unclear as to how or even if multiple workers can listen for the same job type and not get the same job. Is this somehow possible?
I see the following in the docs:
Note: if you are running multiple Meteor instances that share access to a single job collection, you can set the time each instance waits to promote to N * milliseconds , where N is the number of Meteor instances. The instances will each take turns promoting jobs at 1/Nth of the desired rate.
Do you mean the number of instances, or the number of the instance? If, say, I have 2 instances, then it would seem that setting them all to promote at 2*1000 would make them promote at the same or similar time. Granted, they won't all start simultaneously, but if I have 15 instances, they'll all promote every 15 seconds since n would be 15. Very confused. :)Thanks.
Hi, in that quote from the docs, the instances refer to multiple job-collection servers, backed by the same (perhaps replicated) mongoDB. It's a scalability feature. This is almost certainly not what you are wondering about...
In job-collection, there are two kinds of agents:
- Server: Always a Meteor server, it acts upon any requests made using the job-collection API. Only the server talks directly to the MongoDB backing store for the job-collection. Somewhat confusingly it is possible for there to be more than one server "instance" managing the same job-collection. This is no different than having more than one Meteor server instance hosting the same application (backed by the same database). Multiple instances may be deployed for redundancy and/or scalability. A small fraction of the job-collection API is only available on the server(s) hosting that collection. The "promote" feature is one such call. All it does is determine how often the server should wake-up and check if any waiting jobs (those scheduled to run in the future) are now ready to run because their time has come.
- Worker: Workers can be a plain node.js program (using the meteor-job npm package), a Meteor client (running in a browser) or a Meteor server (which may or may not be hosting the job-collection itself in the server role.) There can be any number of workers, and each worker determines what type(s) of jobs it works on and how many it can handle at the same time (concurrency).
A given job can only ever be running on a single worker (if the job fails or the worker goes down, it can be reassigned to another worker). Of course if that worker has multiple cores, it can parallelize a large job to exploit multiple cores, although then that worker should only request one job at a time. Alternatively, jobs can be created to structure the work so that each piece can be processed independently on a single core. In that case, a machine with N cores can choose to concurrently process N jobs at a time. In a way this can be more efficient than the single multi-core job, because each piece of work can be completed independently.
job-collection supports this type of divide and conquer processing via two cool features:
- Jobs can create other jobs (with authorization). So you can create a job that splits your big job into 100 smaller jobs.
- Jobs can be made dependent on the successful completion of other jobs. So the above "splitter" job can also create a "merger" job that depends on the completion of all of the sub-part jobs, and then merges their work and performs any clean-up.
Hope that helps.
Ah, that is helpful. Thanks, wonder if it might be added to the README? So it looks like the job server doesn't perform work, but they can be running in the same process (say if I know I want a job queuing system for a given requirement but don't want to break it out until I need to.) Thanks for disambiguating the server/worker distinction.
How does this system ensure that jobs only run on a single worker? Say I have a "provision" job type that takes lots of time to run, so I decide to create several workers polling for provision jobs. If I have several Meteor job servers, and submit a provision job to one, how do I ensure that several workers polling for provision jobs don't get the same job and start performing the same work?
Thanks for the quick response.
I'm always open to PRs for documentation improvements... :smile:
So it looks like the job server doesn't perform work, but they can be running in the same process (say if I know I want a job queuing system for a given requirement but don't want to break it out until I need to.)
Right, while prototyping your worker can live in your server. Or maybe even better, it can be a separate node.js process that runs on the same machine as the server in a different process. That way when it's time to scale up you can just replicate that worker code on N cores/machines.
I guess this is a good place to mention that if you run a single node.js process on a box with 8 cores and set that worker to concurrently process 8 jobs, it doesn't magically become a multi-threaded worker. You can either run 8 node.js processes, each with concurrency 1, or you can run one node.js process that forks or swawns up to 8 child processes to actually do the work. Ditto if your worker runs inside of your Meteor server.
How does this system ensure that jobs only run on a single worker?
Atomic MongoDB updates. Workers request work when they are free. A job of the requested type that is "ready" will atomically be updated to be "running". A job that is already "running" is never sent to another worker.
Additionally, each running job is assigned a unique runId at the time it goes from ready to running. So if a job is assigned to a worker "A", and that worker, say, loses network connectivity but continues working on the job. Later, the server decides that worker "A" is dead and fails that run so the job can be retried by another worker "B". The job gets a new runId so that if worker "A" finishes its work and finds that its network connection is back up, it can no longer update or finish the job because it doesn't know the current runId.
BTW, you can see an example of spawning concurrent sub workers to exploit a multi-core machine here:
https://github.com/vsivsi/meteor-file-job-sample-app/blob/master/sample.coffee#L516-L568
This sample app uses graphicsMagick to create thumbnails for uploaded images. The last line sets the concurrency. The gm
npm library invokes graphicsMagic in a separate process for each call.
@vsivsi Hi, I also been wondering about this, What if many workers looking for jobs using the observe
method it means all workers will get called to work on the job, But only the first one available and the one who does jc.update({status: 'running'})
will run the job? Based if this was a successful update? or how does it do that? Thanks!
The code is here: https://github.com/vsivsi/meteor-job-collection/blob/master/src/shared.coffee#L458
Only one worker can successfully complete that update
. It works because single document updates (findAndModify) are atomic in MongoDB.
And even if, somehow, multiple workers thought they got the job, only one would get the final valid runId
, which would cause any subsequent calls from the other worker(s) on that job to fail.
@vsivsi that's really interesting thanks for explaining, findAndModify
looks really useful in this case. But the observe
method means it will notify all workers at once right? If i got 100 workers they all get notified? Anyway to stop this? It can do some heavy load on the database, each worker verifing the runId
Thanks
I'm not sure what observe
method you are referring to. There are no observes built into job-collection. Workers get work by asking for it, either by calling getWork
directly, or by running processJobs
to set up a local queue (which calls getWork
internally). Any observes that may drive either of those mechanisms need to be implemented by you, and so you are in full control of whatever overhead they generate.
So in general, workers do not see all of the jobs in the collection, and they do not need to "verify the runId" or perform any such checks. Workers don't need a local minimongo collection that is subscribed to a published server collection to function. So by default, workers are very lean, and can be implemented with a pure node.js script (without the Meteor environment) and just the DDP npm package (for authentication and method invocation) and the meteor-job package, to implement the Job object needed to interact with the Job server remotely. That's it.
@vsivsi i mean this kind of observer from the docs
// Simple observe based queue
q = jc.processJobs(
// Type of job to request
// Can also be an array of job types
'jobType',
{
pollInterval: 1000000000, // Don't poll
},
function (job, callback) {
// Only called when there is a valid job
job.done();
callback();
}
);
jc.find({ type: 'jobType', status: 'ready' })
.observe({
added: function () { q.trigger(); }
});
I think i get it right if i have many workers or meteor instances with the same code, The observe for added
will be called on all of them, and they all will trigger q.trigger();
and a race is going to happen on the first worker who gets hold of findAndModify
or I'm getting this wrong?
That's right, but you don't have to implement it that way if you don't want to. You can set your workers up to just periodically poll for work using the pollInterval
. They only poll getWork
when their local queue isn't full, and they automatically check for new work whenever a job finishes locally, so the polling overhead isn't too bad.
Basically you can do it any way you want. You can poll, you can observe, you can even do a hybrid of the two approaches. If you have a very busy queue and most workers are busy, then calling q.trigger()
in every worker won't result in most of those workers actually calling getWork
because only workers with free slots in their local queue will request more work.