yii2-rabbitmq
Consumer freezes after a few minutes
Hi everyone,
First of all, thanks for this useful repository.
I have a problem with the consumer connection.
It stops working after a few minutes, with no error and no response in the console.
I am using Supervisor in Docker.
My supervisor config:
[supervisord]
logfile=/dev/stdout
logfile_maxbytes=0
logfile_backups=0
loglevel=info
nodaemon=true
[program:php-fpm]
command=php-fpm -F -y /usr/local/etc/php-fpm.conf -c /usr/local/etc/php/conf.d/local.ini -R
autorestart=true
stdout_events_enabled=true
stderr_events_enabled=true
stdout_logfile_maxbytes=0
stderr_logfile_maxbytes=0
stdout_logfile=/dev/stdout
stderr_logfile=/dev/stderr
[program:consume]
command=php /home/www/app/yii rabbitmq/consume shahkar
autorestart=true
stdout_events_enabled=true
stderr_events_enabled=true
stdout_logfile_maxbytes=0
stderr_logfile_maxbytes=0
stdout_logfile=/dev/stdout
stderr_logfile=/dev/stderr
My rabbit config:
[
//Config help:: https://github.com/mikemadisonweb/yii2-rabbitmq
'class' => \mikemadisonweb\rabbitmq\Configuration::class,
'auto_declare' => true,
'connections' =>
[
[
'name' => 'rabbit',
// You can pass these parameters as a single `url` option: https://www.rabbitmq.com/uri-spec.html
'host' => ENV::get('RABBIT_HOST'),
'port' => ENV::get('RABBIT_PORT'),
'user' => ENV::get('RABBIT_USER'),
'password' => ENV::get('RABBIT_PASSWORD'),
'vhost' => ENV::get('RABBIT_VHOST'),
'type' => AMQPLazyConnection::class,
'url' => null,
'connection_timeout' => 3,
'read_write_timeout' => 3,
'ssl_context' => null,
'keepalive' => false,
'heartbeat' => 0,
'channel_rpc_timeout' => 0.0
]
// When multiple connections are used, you need to specify a `name` option for each one and reference them in the producer and consumer configuration blocks
],
'exchanges' =>
[
[
'name' => 'd_exchange',
'type' => 'direct',
'passive' => false,
'durable' => true,
'auto_delete' => false,
'internal' => false,
'nowait' => false,
'arguments' => null,
'ticket' => null,
],
],
'queues' =>
[
[
'name' => 'sms',
'passive' => false,
'durable' => true,
'exclusive' => false,
'auto_delete' => false,
'nowait' => false,
'arguments' => null,
'ticket' => null,
// Queue can be configured here the way you want it:
//'durable' => true,
//'auto_delete' => false,
],
[
'name' => 'log',
'passive' => false,
'durable' => true,
'exclusive' => false,
'auto_delete' => false,
'nowait' => false,
'arguments' => null,
'ticket' => null,
],
[
'name' => 'nationalID',
'passive' => false,
'durable' => true,
'exclusive' => false,
'auto_delete' => false,
'nowait' => false,
'arguments' => null,
'ticket' => null,
]
],
'bindings' =>
[
[
'exchange' => 'd_exchange',
'queue' => 'sms',
'to_exchange' => null,
'routing_keys' => ['sms'],
],
[
'exchange' => 'd_exchange',
'queue' => 'log',
'to_exchange' => null,
'routing_keys' => ['log'],
],
],
'producers' =>
[
[
'name' => 'producer',
'connection' => 'rabbit',
'safe' => true,
'content_type' => 'text/plain',
'delivery_mode' => 2,
'serializer' => 'serialize',
],
[
'name' => 'log',
'connection' => 'rabbit',
'safe' => true,
'content_type' => 'text/plain',
'delivery_mode' => 2,
'serializer' => 'serialize',
],
],
'consumers' =>
[
[
'name' => 'shahkar',
// Every consumer should define one or more callbacks for corresponding queues
'connection' => 'rabbit',
'qos' => [
'prefetch_size' => 0,
'prefetch_count' => 0,
'global' => false,
],
'idle_timeout' => 0,
'idle_timeout_exit_code' => null,
'proceed_on_exception' => false,
'deserializer' => 'unserialize',
'callbacks' =>
[
// queue name => callback class name
'sms' => ShahkarConsumer::class,
],
],
],
'on before_consume' => function ($event) {
if (\Yii::$app->has('db') && \Yii::$app->db->isActive) {
try {
\Yii::$app->db->createCommand('SELECT 1')->query();
} catch (\yii\db\Exception $exception) {
\Yii::$app->db->close();
}
}
},
'logger' => [
'log' => true,
'category' => 'application',
'print_console' => true,
'system_memory' => true,
],
]
My consumer class:
namespace app\consumers;
use app\base\BaseConsumer;
use app\models\ModuleCorporates;
use app\models\ModuleTemp;
use Faker\Factory;
use Faker\Provider\fa_IR\Address;
use Faker\Provider\fa_IR\Person;
use Kavenegar\KavenegarApi;
use mikemadisonweb\rabbitmq\components\ConsumerInterface;
use PhpAmqpLib\Message\AMQPMessage;
class ShahkarConsumer extends BaseConsumer implements \mikemadisonweb\rabbitmq\components\ConsumerInterface
{
/**
* @inheritDoc
*/
public function execute(AMQPMessage $msg)
{
$data = json_decode($msg->body);
/**
* Data format for `request_corporate_details` is : Object
*[
* 'mode' => 'request_corporate_details',
* 'corporate_national_id' => '.........STRING..........',
* 'ceo_nation_code' => '.........STRING..........',
* 'ceo_uid' => '..........INT.............'
*]
*/
try {
$faker = Factory::create();
$person = new Person;
$model = new ModuleTemp();
$fields =
[
'name' => $person->name(),
'operating_license_number' => $faker->numberBetween(1111111, 9999999),
'year_of_operation' => $faker->numberBetween(1350, 1100),
'description' => $faker->text(255),
'address' => $faker->address,
'phone' => $faker->phoneNumber,
'economic_code' => $faker->numberBetween(1111111111, 21399999999),
'legal_mode' => $faker->numberBetween(1, 3),
'site' => $faker->url,
'field_id' => null,
'category_id' => null,
'ceo_uid' => $data->uid,
'region_id' => null,
'province_id' => null,
'city_id' => null,
'national_id' => $data->national_id,
'ceo_name' => $faker->name,
'ceo_phone' => $faker->phoneNumber,
'ceo_national_code' => $person::nationalCode(),
'postal_code' => Address::postcode(),
'production_plan' => null,
];
$model->key = 'corp_'.$data->ceo_uid;
$model->value = json_encode($fields);
$model->created_at = date('Y-m-d H:i:s');
$model->age = 10800;
$model->save();
} catch (\Exception $e) {
$this->logException($e);
return ConsumerInterface::MSG_REQUEUE;
}
return ConsumerInterface::MSG_ACK;
}
}
Output of top on Linux after a few minutes, when the connection is frozen:
Mem: 7993764K used, 173456K free, 83884K shrd, 1878812K buff, 3713880K cached
CPU: 3% usr 1% sys 0% nic 95% idle 0% io 0% irq 0% sirq
Load average: 0.16 0.15 0.12 3/706 31
PID PPID USER STAT VSZ %VSZ CPU %CPU COMMAND
21 17 www-data S 169m 2% 0 0% php-fpm: pool www
22 17 www-data S 169m 2% 2 0% php-fpm: pool www
17 7 root S 169m 2% 2 0% php-fpm: master process (/usr/local/etc/php-fpm.conf)
7 1 root S 138m 2% 1 0% supervisord -c /home/www/.deploy/config/supervisor.conf
16 7 root S 47100 1% 1 0% php /home/www/app/yii rabbitmq/consume sms
25 0 root S 1676 0% 1 0% /bin/sh
1 0 root S 1608 0% 3 0% sh /entrypoint.sh
31 25 root R 1604 0% 3 0% top
After a few minutes the consumer process is still running, but in fact it is frozen and does not process messages.
Any idea?
Hi @yiiman-dev. The same thing happened to me.
I solved it by setting the "heartbeat" and "read_write_timeout" fields in the connection array to a number greater than 0 (I used 60 for both).
Note that setting both heartbeat and read_write_timeout to the same value can cause a race condition where the connection is closed just before a heartbeat arrives.
It seems to be recommended to use 60 seconds as heartbeat and 130 seconds as read_write_timeout. This is actually the default of php-amqplib. See https://github.com/php-amqplib/php-amqplib/pull/648
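For reference, here is a minimal sketch of how the connection entry from the question might look with the values suggested in this thread (heartbeat 60, read_write_timeout 130). Only those two values differ from the original config. The $connections variable name and the use of getenv() in place of the ENV::get() helper are assumptions made to keep the snippet self-contained. I have not verified this against every version of the library.

```php
<?php
use PhpAmqpLib\Connection\AMQPLazyConnection;

// Value for the 'connections' key of the rabbitmq component config.
$connections = [
    [
        'name' => 'rabbit',
        // getenv() is a stand-in for the ENV::get() helper used in the question.
        'host' => getenv('RABBIT_HOST'),
        'port' => getenv('RABBIT_PORT'),
        'user' => getenv('RABBIT_USER'),
        'password' => getenv('RABBIT_PASSWORD'),
        'vhost' => getenv('RABBIT_VHOST'),
        'type' => AMQPLazyConnection::class,
        'url' => null,
        'connection_timeout' => 3,
        // Was 3 in the question; kept above twice the heartbeat interval
        // so a late heartbeat does not race with the read timeout.
        'read_write_timeout' => 130,
        'ssl_context' => null,
        'keepalive' => false,
        // Was 0 (heartbeats disabled) in the question.
        'heartbeat' => 60,
        'channel_rpc_timeout' => 0.0,
    ],
];
```

After changing the config, the consumer process has to be restarted (for example through Supervisor) for the new values to take effect.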