laravel-queue-rabbitmq
clear queues after restarting
- Laravel version: 8.22.0
- RabbitMQ version: 3.8.9
- Package version: 11
Describe the bug
I want to consume the queue of another application. When I stop and restart php artisan rabbitmq:consume, it reloads the old messages and consumes them again.
Steps To Reproduce
.env
BROADCAST_DRIVER=redis
CACHE_DRIVER=file
QUEUE_CONNECTION=rabbitmq
SESSION_DRIVER=file
SESSION_LIFETIME=120
MEMCACHED_HOST=memcached
REDIS_HOST=localhost
REDIS_PASSWORD=null
REDIS_PORT=6379
queue.php
'rabbitmq' => [
    'driver' => 'rabbitmq',
    'queue' => env('RABBITMQ_QUEUE', 'test'),
    'connection' => PhpAmqpLib\Connection\AMQPLazyConnection::class,
    'hosts' => [
        [
            'host' => env('RABBITMQ_HOST', 'xxx.xxx.xxx.xxx'),
            'port' => env('RABBITMQ_PORT', 5672),
            'user' => env('RABBITMQ_USER', 'xxxx'),
            'password' => env('RABBITMQ_PASSWORD', 'xxxx'),
            'vhost' => env('RABBITMQ_VHOST', 'xxxxx'),
        ],
    ],
    'options' => [
        'ssl_options' => [
            'cafile' => env('RABBITMQ_SSL_CAFILE', null),
            'local_cert' => env('RABBITMQ_SSL_LOCALCERT', null),
            'local_key' => env('RABBITMQ_SSL_LOCALKEY', null),
            'verify_peer' => env('RABBITMQ_SSL_VERIFY_PEER', true),
            'passphrase' => env('RABBITMQ_SSL_PASSPHRASE', null),
        ],
        'queue' => [
            // 'job' => VladimirYuldashev\LaravelQueueRabbitMQ\Queue\Jobs\RabbitMQJob::class,
            'job' => \App\Queue\Job\RabbitMQJob::class
        ],
    ],
    /*
     * Set to "horizon" if you wish to use Laravel Horizon.
     */
    'worker' => env('RABBITMQ_WORKER', 'horizon'),
],
database.php
'redis' => [
    'client' => env('REDIS_CLIENT', 'predis'),
    'options' => [
        'cluster' => env('REDIS_CLUSTER', 'redis'),
        'prefix' => env('REDIS_PREFIX', Str::slug(env('APP_NAME', 'laravel'), '_').'_database_'),
    ],
    'default' => [
        'url' => env('REDIS_URL'),
        'host' => env('REDIS_HOST', '127.0.0.1'),
        'password' => env('REDIS_PASSWORD', null),
        'port' => env('REDIS_PORT', '6379'),
        'database' => env('REDIS_DB', '0'),
    ],
    'cache' => [
        'url' => env('REDIS_URL'),
        'host' => env('REDIS_HOST', '127.0.0.1'),
        'password' => env('REDIS_PASSWORD', null),
        'port' => env('REDIS_PORT', '6379'),
        'database' => env('REDIS_CACHE_DB', '1'),
    ],
],
app/Queue/Jobs/RabbitMQJob.php
namespace App\Queue\Job;

use App\Jobs\QueueJob;
use VladimirYuldashev\LaravelQueueRabbitMQ\Queue\Jobs\RabbitMQJob as BaseJob;

class RabbitMQJob extends BaseJob
{
    /**
     * Fire the job.
     *
     * @return void
     */
    public function fire()
    {
        $payload = $this->payload();
        $class = QueueJob::class;
        $method = 'handle';
        ($this->instance = $this->resolve($class))->{$method}($this, $payload);
    }

    /**
     * Get the decoded body of the job.
     *
     * @return array
     */
    public function payload()
    {
        return [
            'job' => 'App\Jobs\QueueJob@handle',
            'body' => json_decode($this->getRawBody(), true)
        ];
    }
}
app/Queue/Jobs.php
namespace App\Jobs;

use Illuminate\Bus\Queueable;
use Illuminate\Contracts\Queue\ShouldBeUnique;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Bus\Dispatchable;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Queue\SerializesModels;

class QueueJob implements ShouldQueue
{
    use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;

    /**
     * Create a new job instance.
     *
     * @return void
     */
    public function __construct()
    {
        //
    }

    /**
     * Execute the job.
     *
     * @param  mixed  $job      The queue job instance passed in by the custom fire() method.
     * @param  array  $payload  The decoded message ('job' and 'body' keys).
     * @return void
     */
    public function handle($job, $payload)
    {
        dump($payload);
    }
}
OK, this package is great, but when I suspend message consumption (CTRL + C) and restart php artisan rabbitmq:consume, it reloads every message that was already sent. I'm forced to call $job->delete() in the job's handle method. Is that correct?
Why are all the already consumed messages reloaded when I resume?
Thanks all...
Example: Launch php artisan rabbitmq:consume
[2021-01-14 08:14:14][] Processing: App\Jobs\QueueJob@handle
array:2 [
"job" => "App\Jobs\QueueJob@handle"
"body" => array:1 [
"message" => "hi"
]
]
[2021-01-14 08:14:14][] Processed: App\Jobs\QueueJob@handle
Stop the consumer (CTRL + C)
Relaunch php artisan rabbitmq:consume
[2021-01-14 08:14:14][] Processing: App\Jobs\QueueJob@handle
array:2 [
"job" => "App\Jobs\QueueJob@handle"
"body" => array:1 [
"message" => "hi"
]
]
[2021-01-14 08:14:14][] Processed: App\Jobs\QueueJob@handle
Same issue.
Could it be due to the fact that no ack or nack is done? How should the ack or nack be done?
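One possible way to get the ack done, as a minimal sketch rather than a confirmed fix: the custom fire() above calls the handler directly, so nothing ever deletes the job once it has run. The sketch assumes that the package's delete() is what acknowledges the message to RabbitMQ (the report above, where calling $job->delete() in handle stops the redelivery, points that way). handleThenDelete is a hypothetical helper name, not part of the package or of Laravel; it relies only on isDeletedOrReleased() and delete(), which Laravel's base queue job provides.

```php
<?php

/**
 * Run a handler for a queue job, then delete the job unless the handler
 * already deleted or released it. Hypothetical helper for illustration.
 *
 * $job must expose isDeletedOrReleased() and delete(), as
 * Illuminate\Queue\Jobs\Job does.
 */
function handleThenDelete(object $job, callable $handler, array $payload): void
{
    // If the handler throws, we never reach delete(), so the message
    // stays unacknowledged and the worker's failure handling applies.
    $handler($job, $payload);

    // Assumption: delete() on the RabbitMQ job acknowledges the message.
    if (! $job->isDeletedOrReleased()) {
        $job->delete();
    }
}
```

Inside the custom fire() this could be called as handleThenDelete($this, [$this->resolve(QueueJob::class), 'handle'], $this->payload()); so that QueueJob::handle no longer has to call $job->delete() itself.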
I think you are using Horizon as a queue manager and managing worker processes through Horizon. When you mix your artisan commands with Horizon, you end up in a trial-and-error pattern. Of course you can mix different types of workers, but you have to be careful and know what you are doing...
First try this without Horizon as the worker, i.e. with the default worker.
This is most likely because Horizon depends on a lot of metadata in Redis, and your job payload neither contains this metadata nor is stored in the Redis database.
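To try the suggestion above, the worker entry in the rabbitmq connection from the posted queue.php could be switched back from 'horizon' to the default worker. This is only a config sketch; if RABBITMQ_WORKER is set in .env, that value overrides the fallback shown here.

```php
// config/queue.php, inside the 'rabbitmq' connection array
'worker' => env('RABBITMQ_WORKER', 'default'),
```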
This issue is being closed due to its age. If you still are encountering the issue, please re-open so we can assist.