jobqueue icon indicating copy to clipboard operation
jobqueue copied to clipboard

Job Queue Bundle for Symfony

Scrutinizer Code QualityCode Climate

Provides async queues implementation for Symfony (using mongodb as main storage).

Supported drivers (storages):

  • MongoDB
  • Redis
  • Custom

Config:

Register the bundle config and all available "Jobs"

sfcod_queue:
    drivers:
        redis: 'SfCod\QueueBundle\Connector\RedisConnector'
    connections:
        default: { driver: 'redis', collection: 'queue_jobs', queue: 'default', expire: 360, limit: 2 }  

services:
#    _instanceof:
#        SfCod\QueueBundle\Base\JobInterface:
#            tags: ['sfcod.jobqueue.job']
    App\Job\:
        resource: '../src/Job/*'
        tags: ['sfcod.jobqueue.job']

Adding jobs to the queue:

Create your own "job" which implements SfCod\QueueBundle\Base\JobInterface and run it:

public function someFunc(JobQueue $jobQueue) {
    $data = [...];
    $jobQueue->push(YourJob::class, $data);
}

where $data is a payload for your job

Commands:

Run worker daemon with console command:

$ php bin/console job-queue:work
$ php bin/console job-queue:retry --id=<Job ID>
$ php bin/console job-queue:run-job <Job ID>

Where:

  • work - command to run daemon in loop;
  • retry - command to move all failed jobs back into queue, can be used with --id param to retry only single job
  • run-job - command to run single job by id

Available events:

'job_queue_worker.raise_before_job': SfCod\QueueBundle\Event\JobProcessingEvent;
'job_queue_worker.raise_after_job': SfCod\QueueBundle\Event\JobProcessedEvent;
'job_queue_worker.raise_exception_occurred_job': SfCod\QueueBundle\Event\JobExceptionOccurredEvent;
'job_queue_worker.raise_failed_job': SfCod\QueueBundle\Event\JobFailedEvent;
'job_queue_worker.stop': SfCod\QueueBundle\Event\WorkerStoppingEvent;

Configurable services list (with default parameters):

JobQueue:
SfCod\QueueBundle\Service\JobQueue:
    public: true
    arguments:
        - '@SfCod\QueueBundle\Service\QueueManager'

SfCod\QueueBundle\Service\JobQueue: main job queue service

Worker
SfCod\QueueBundle\Worker\Worker:
    arguments:
        - '@SfCod\QueueBundle\Service\QueueManager'
        - '@SfCod\QueueBundle\Service\JobProcess'
        - '@SfCod\QueueBundle\Failer\FailedJobProviderInterface'
        - '@SfCod\QueueBundle\Handler\ExceptionHandlerInterface'
        - '@Symfony\Component\EventDispatcher\EventDispatcherInterface'

SfCod\QueueBundle\Worker\Worker: async worker for "work" command

JobProcess
SfCod\QueueBundle\Service\JobProcess:
    arguments:
        - 'console'
        - '%kernel.project_dir%/bin'
        - 'php'
        - ''

JobProcess: default config for jobs command processor in async queues, where:

  • 'console' - name of console command
  • '%kernel.project_dir%/bin' - path for console command
  • 'php' - binary script
  • '' - binary script arguments
Connector
SfCod\QueueBundle\Connector\ConnectorInterface:
    class: SfCod\QueueBundle\Connector\RedisConnector
    arguments:
        - '@SfCod\QueueBundle\Base\JobResolverInterface'
        - '@SfCod\QueueBundle\Base\RedisDriver'

SfCod\QueueBundle\Connector\ConnectorInterface: connector for queues' database

Failer
SfCod\QueueBundle\Failer\FailedJobProviderInterface:
    class: SfCod\QueueBundle\Failer\RedisFailedJobProvider
    arguments:
        - '@SfCod\QueueBundle\Service\RedisDriver'
        - 'queue_jobs_failed'

SfCod\QueueBundle\Failer\FailedJobProviderInterface: storage for failed jobs

Job resolver
SfCod\QueueBundle\Base\JobResolverInterface:
    class: SfCod\QueueBundle\Service\JobResolver
    arguments:
        - '@Symfony\Component\DependencyInjection\ContainerInterface'

SfCod\QueueBundle\Base\JobResolverInterface: resolver for jobs, it builds job using job's display name, for default jobs fetches from container as a public services.

Exception handler
SfCod\QueueBundle\Handler\ExceptionHandlerInterface:
    class: SfCod\QueueBundle\Handler\ExceptionHandler
    arguments:
        - '@Psr\Log\LoggerInterface'

SfCod\QueueBundle\Handler\ExceptionHandlerInterface: main exception handler, used for logging issues

Testing:

You can run tests using prepared configuration xml file:

php bin/phpunit --configuration ./vendor/sfcod/jobqueue/phpunit.xml.dist --bootstrap ./vendor/autoload.php