laravel-queue-rabbitmq

clear queues after restarting

Open housewi opened this issue 4 years ago • 2 comments

  • Laravel version: 8.22.0
  • RabbitMQ version: 3.8.9
  • Package version: 11

Describe the bug

I want to consume the queue of another application. When I stop and restart php artisan rabbitmq:consume, it reloads the old queue messages and consumes them again.

Steps To Reproduce

.env

BROADCAST_DRIVER=redis
CACHE_DRIVER=file
QUEUE_CONNECTION=rabbitmq
SESSION_DRIVER=file
SESSION_LIFETIME=120

MEMCACHED_HOST=memcached

REDIS_HOST=localhost
REDIS_PASSWORD=null
REDIS_PORT=6379

queue.php

'rabbitmq' => [

            'driver' => 'rabbitmq',
            'queue' => env('RABBITMQ_QUEUE', 'test'),
            'connection' => PhpAmqpLib\Connection\AMQPLazyConnection::class,

            'hosts' => [
                [
                    'host' => env('RABBITMQ_HOST', 'xxx.xxx.xxx.xxx'),
                    'port' => env('RABBITMQ_PORT', 5672),
                    'user' => env('RABBITMQ_USER', 'xxxx'),
                    'password' => env('RABBITMQ_PASSWORD', 'xxxx'),
                    'vhost' => env('RABBITMQ_VHOST', 'xxxxx'),
                ],
            ],

            'options' => [
                'ssl_options' => [
                    'cafile' => env('RABBITMQ_SSL_CAFILE', null),
                    'local_cert' => env('RABBITMQ_SSL_LOCALCERT', null),
                    'local_key' => env('RABBITMQ_SSL_LOCALKEY', null),
                    'verify_peer' => env('RABBITMQ_SSL_VERIFY_PEER', true),
                    'passphrase' => env('RABBITMQ_SSL_PASSPHRASE', null),
                ],
                'queue' => [
//                    'job' => VladimirYuldashev\LaravelQueueRabbitMQ\Queue\Jobs\RabbitMQJob::class,
                    'job' => \App\Queue\Job\RabbitMQJob::class
                ],
            ],

            /*
             * Set to "horizon" if you wish to use Laravel Horizon.
             */
            'worker' => env('RABBITMQ_WORKER', 'horizon'),

        ],

database.php

    'redis' => [

        'client' => env('REDIS_CLIENT', 'predis'),

        'options' => [
            'cluster' => env('REDIS_CLUSTER', 'redis'),
            'prefix' => env('REDIS_PREFIX', Str::slug(env('APP_NAME', 'laravel'), '_').'_database_'),
        ],

        'default' => [
            'url' => env('REDIS_URL'),
            'host' => env('REDIS_HOST', '127.0.0.1'),
            'password' => env('REDIS_PASSWORD', null),
            'port' => env('REDIS_PORT', '6379'),
            'database' => env('REDIS_DB', '0'),
        ],

        'cache' => [
            'url' => env('REDIS_URL'),
            'host' => env('REDIS_HOST', '127.0.0.1'),
            'password' => env('REDIS_PASSWORD', null),
            'port' => env('REDIS_PORT', '6379'),
            'database' => env('REDIS_CACHE_DB', '1'),
        ],

    ],

app/Queue/Jobs/RabbitMQJob.php


namespace App\Queue\Job;
use App\Jobs\QueueJob;
use VladimirYuldashev\LaravelQueueRabbitMQ\Queue\Jobs\RabbitMQJob as BaseJob;

class RabbitMQJob extends BaseJob
{

    /**
     * Fire the job.
     *
     * @return void
     */
    public function fire()
    {
        $payload = $this->payload();

        $class = QueueJob::class;
        $method = 'handle';

        ($this->instance = $this->resolve($class))->{$method}($this, $payload);

    }


    /**
     * Get the decoded body of the job.
     *
     * @return array
     */
    public function payload()
    {
        return [
            'job'  => 'App\Jobs\QueueJob@handle',
            'body' => json_decode($this->getRawBody(), true)
        ];
    }
}

app/Queue/Jobs.php

namespace App\Jobs;

use Illuminate\Bus\Queueable;
use Illuminate\Contracts\Queue\ShouldBeUnique;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Bus\Dispatchable;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Queue\SerializesModels;

class QueueJob implements ShouldQueue
{
    use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;

    /**
     * Create a new job instance.
     *
     * @return void
     */
    public function __construct()
    {
        //
    }

    /**
     * Execute the job.
     *
     * @return void
     */
    public function handle($job, $payload)
    {
        dump($payload);
    }
}

OK, this package is great, but when I suspend message consumption (CTRL + C) and restart php artisan rabbitmq:consume, it reloads all the queue messages that were already sent, and I'm forced to call $job->delete() in the job's handle method. Is that correct?

Why are all the already-consumed queue messages reloaded when I resume?

Thanks all...

Example: Launch php artisan rabbitmq:consume

[2021-01-14 08:14:14][] Processing: App\Jobs\QueueJob@handle
array:2 [
  "job" => "App\Jobs\QueueJob@handle"
  "body" => array:1 [
    "message" => "hi"
  ]
]
[2021-01-14 08:14:14][] Processed:  App\Jobs\QueueJob@handle

Stop the consumer (CTRL + C)

Relaunch php artisan rabbitmq:consume

[2021-01-14 08:14:14][] Processing: App\Jobs\QueueJob@handle
array:2 [
  "job" => "App\Jobs\QueueJob@handle"
  "body" => array:1 [
    "message" => "hi"
  ]
]
[2021-01-14 08:14:14][] Processed:  App\Jobs\QueueJob@handle

housewi avatar Jan 14 '21 09:01 housewi

Same issue.

Could it be due to the fact that no ack or nack is done? How should the ack or nack be done?
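To illustrate the ack question: one plausible reading of the code in this report (an assumption, not something confirmed in this thread) is that the custom fire() calls the handler directly and never deletes the job, so the message is never acknowledged and the broker hands it out again after a restart. The sketch below models only that rule with a hypothetical in-memory FakeBroker; none of its names come from RabbitMQ, php-amqplib, or laravel-queue-rabbitmq.

```php
<?php
declare(strict_types=1);

// Hypothetical in-memory stand-in for a broker queue with manual acknowledgements.
// It is NOT the RabbitMQ or laravel-queue-rabbitmq API; it only models the rule that
// a delivered message stays with the broker until the consumer acknowledges it.
final class FakeBroker
{
    /** @var array<int, string> messages ready for delivery, keyed by delivery tag */
    private array $ready = [];
    /** @var array<int, string> messages delivered but not yet acknowledged */
    private array $unacked = [];
    private int $nextTag = 1;

    public function publish(string $body): void
    {
        $this->ready[$this->nextTag++] = $body;
    }

    /** Deliver the next ready message, or null when nothing is ready. */
    public function deliver(): ?array
    {
        if ($this->ready === []) {
            return null;
        }
        $tag = array_key_first($this->ready);
        $body = $this->ready[$tag];
        unset($this->ready[$tag]);
        $this->unacked[$tag] = $body;

        return ['tag' => $tag, 'body' => $body];
    }

    public function ack(int $tag): void
    {
        unset($this->unacked[$tag]);
    }

    /** Consumer went away (e.g. CTRL + C): unacked messages become ready again. */
    public function consumerDisconnected(): void
    {
        $this->ready = $this->unacked + $this->ready;
        ksort($this->ready);
        $this->unacked = [];
    }
}

/** Run one consumer session and return the bodies it processed. */
function consumeAll(FakeBroker $broker, bool $ackAfterHandling): array
{
    $processed = [];
    while (($delivery = $broker->deliver()) !== null) {
        $processed[] = $delivery['body']; // stands in for the job's handle()
        if ($ackAfterHandling) {
            $broker->ack($delivery['tag']);
        }
    }
    $broker->consumerDisconnected();

    return $processed;
}

/** Publish one message, consume, "restart", consume again; return both counts. */
function demo(bool $ackAfterHandling): array
{
    $broker = new FakeBroker();
    $broker->publish('{"message":"hi"}');
    $first = consumeAll($broker, $ackAfterHandling);
    $second = consumeAll($broker, $ackAfterHandling); // simulated restart

    return [count($first), count($second)];
}
```

With demo(false) the single message is processed in both sessions, which matches the repeated "hi" in the logs above; with demo(true) the second session finds nothing. In the reporter's RabbitMQJob the analogous step would presumably be a $this->delete() call at the end of fire(), which would be consistent with the observation that calling $job->delete() inside handle stops the redelivery.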

viandanteoscuro avatar Jan 14 '21 16:01 viandanteoscuro

I think you are using Horizon as a queue manager and managing worker processes through Horizon. When you mix your artisan commands and Horizon, you end up in a trial-and-error pattern. Of course you can mix different types of workers, but you have to be careful and know what you are doing...

First try this without Horizon as the worker, i.e. with the default worker.

This is most likely because Horizon depends on a lot of metadata in Redis, and your job payload neither contains this metadata nor is stored in the Redis database.
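As a sketch of that suggestion, the 'worker' entry in the queue.php posted above could fall back to the non-Horizon worker. This assumes the package selects its standard worker with the value 'default', and that RABBITMQ_WORKER is not set to 'horizon' in .env:

```php
// config/queue.php, inside the 'rabbitmq' connection array shown in the report.
// Assumption: 'default' selects the package's standard (non-Horizon) worker.
'worker' => env('RABBITMQ_WORKER', 'default'),
```

After changing it, restarting php artisan rabbitmq:consume should show whether the behaviour depends on the Horizon worker at all.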

adm-bome avatar Feb 19 '21 23:02 adm-bome

This issue is being closed due to its age. If you still are encountering the issue, please re-open so we can assist.

khepin avatar Feb 04 '23 00:02 khepin