rabbitmq-cli-consumer icon indicating copy to clipboard operation
rabbitmq-cli-consumer copied to clipboard

Process stops processing messages after certain amount of time

Open alsar opened this issue 9 years ago • 35 comments

I'm using your library in combination with Symfony console commands. I set it up, configured it added it to Supervisor and started 2 processes. Everything works fine, but after a few hours (around 3-5) it just stops processing messages from the queue. I then have to restart the processes and then it starts processing messages again. But again, just for few hours.

I looked into log files but its nothing there. Have you any idea why this could be happening?

My config file:

[rabbitmq]
host = localhost
username = user
password = pass
vhost = /media
queue = media-converter
compression = Off

[exchange]
name = media-converter
type = direct
durable = On

[logs]
error = /home/user/log/rabbitmq-cli-error.log
info = /home/user/log/rabbitmq-cli-info.log

I'm using the v1.1 (apt package) on Ubuntu 14.04.

alsar avatar Feb 04 '16 12:02 alsar

Could you also post your supervisord config? And both logs are completely empty? Not a single line?

ricbra avatar Feb 04 '16 15:02 ricbra

[program:project_dev_rabbitmq-cli-convert]
command=/usr/bin/rabbitmq-cli-consumer -e "/home/user/development/project/current/app/console convert-upload --rabbitmq" -c /home/user/config/rabbitmq_cli_project_dev.conf
process_name=%(program_name)s_%(process_num)02d
numprocs=2
autostart=true
autorestart=unexpected
user=user
stdout_logfile=/home/user/log/project_dev/convert-out.log
stderr_logfile=/home/user/log/project_dev/convert-error.log

I cleared all logs today before I started testing. After the process stopped processing messages I checked the logs from rabbitmq-cli-consumer and supervisord and nothing was in it.

alsar avatar Feb 04 '16 16:02 alsar

The problem seems to be in the Supervisor config. The minimal config seems to work now:

[inet_http_server]
port = 127.0.0.1:9001
username=user
password=pass

[unix_http_server]
file=/var/run/supervisor.sock
chmod=0700

[supervisord]
logfile=/tmp/supervisord.log
pidfile=/var/run/supervisord.pid

[rpcinterface:supervisor]
supervisor.rpcinterface_factory = supervisor.rpcinterface:make_main_rpcinterface

[supervisorctl]
serverurl=unix:///var/run/supervisor.sock

[include]
files = /etc/supervisor/conf.d/*.conf

Before that i had a much bigger config, but I didn't investigate further, which line was causing the problem. For now it works - after 15 hours.

alsar avatar Feb 08 '16 08:02 alsar

It seems that the problem still exists. It worked for like 20 hours, but then stopped to processing messages. I restarted the Supervisor process and then again stopped processing after 5 hours.

I setup a test script that I run through Supervisor and it runs without a problem for 2 days now. So Supervisor should not be the problem. I think the problem lies somewhere in the rabbitmq-cli-consumer.

alsar avatar Feb 10 '16 10:02 alsar

What do you mean with "stopped processing messages"? Is the consumer still connected to rabbitmq? Or does it die out and supervisor stops restarting it after it fails too many times?

Also, I really need some logs from rabbitmq-cli-consumer before I can say anything useful about the problem you describe.

ricbra avatar Feb 10 '16 10:02 ricbra

The error log is empty and in the stdout log is just

Waiting for messages...
2016/02/09 11:22:04 Processing message...
2016/02/09 11:22:09 Processed!
2016/02/09 11:22:09 Processing message...
2016/02/09 11:22:13 Processed!
...

By "stopped processing messages" I mean that the process is still running, but no messages are processed anymore.

I just discovered that the consumer disconnects from RabbitMQ. So the process is running, but after certain amount of time (few hours) it disconnects from RabbitMQ.

I'll keep an eye on the RabbitMQ log to see what happens when the rabbitmq-cli-consumer disconnects.

alsar avatar Feb 10 '16 11:02 alsar

There should indeed be some info in the RabbitMQ log when rabbitmq-cli-consumer disconnects.

ricbra avatar Feb 10 '16 12:02 ricbra

After 21 hours it stoped working and I got this in the RabbitMQ log:

=WARNING REPORT==== 11-Feb-2016::09:08:34 ===
closing AMQP connection <0.342.0> (127.0.0.1:50993 -> 127.0.0.1:5672):
client unexpectedly closed TCP connection

Supervisor still shows that the rabbitmq-cli-consumer is running.

alsar avatar Feb 11 '16 08:02 alsar

What is your server setup? OS, version, multiple servers?

Is it possible to create a vagrant environment in which I can reproduce the errors? We have rabbitmq-cli-consumers processes which are running for months, so I think something in your environment is triggering this behaviour.

ricbra avatar Feb 11 '16 09:02 ricbra

It's a Ubuntu 14.04 machine. rabbitmq-cli-consumer v1.1.0 is installed via the APT repository. Supervisor 3.2.1 installed via pip. RabbitMQ is v3.6.0.

Locally on my Vagrant environment I didn't encounter this problem.

alsar avatar Feb 11 '16 19:02 alsar

I setup another server (Ubuntu 15.10) and there it currently works for two days. I hope that it will work from now on. Thanks for your help.

alsar avatar Feb 15 '16 18:02 alsar

@alsar I'm having the same problems. I have ~5 consumers and they all seem to die after 2-3 days. Did you find anything else that could lead to solving this problem?

raul-dan avatar Feb 15 '16 22:02 raul-dan

@lrauldan can you tell me about your situation? What OS etc?

We're on Debian Wheezy and RabbitMQ 3.5.1. Our consumers are running for months with no restart at all. Our queues are quiet only during the nights.

We need to find out how to reproduce this problem.

ricbra avatar Feb 16 '16 07:02 ricbra

@ricbra we're using ubuntu 14.04 on aws EC2, a t2.micro for staging purposes, RabbitMQ 3.5.6, Erlang R16B03, consumer version 1.1.0.

I have no experience with go but I was wondering if there is a way to check here if the TCP connection has gone away and to reconnect the consumer?

raul-dan avatar Feb 16 '16 08:02 raul-dan

I'll reopen the issue, because it seems that I'm not the only one that has/had this problem. On my new 15.10 server it now works for 4 days.

Maybe this problem is only present on Ubuntu 14.04, as both I and @lrauldan have this problem on the same Ubuntu version.

alsar avatar Feb 17 '16 13:02 alsar

But on the other side I didn't have any problems on Vagrant with Ubuntu 14.04.

alsar avatar Feb 17 '16 13:02 alsar

Neither did I but after a couple of days on staging we had over 100 messages stuck in our queues. Once I restarted supervisor all of them went thru and got processed without problems.

I will fork the consumer over the weekend and see if I can debug it and see what happens when the connection with rabbitmq gets lost.

raul-dan avatar Feb 17 '16 13:02 raul-dan

The connection itself is handled by streadway/amqp library (https://github.com/streadway/amqp). Maybe there is something usefull in the bug tracker on their Github repository?

Nevertheless it could also be caused by something in rabbitmq-cli-consumer, so looking forward to your findings.

ricbra avatar Feb 17 '16 14:02 ricbra

Any news on this @lrauldan ?

ricbra avatar Feb 27 '16 14:02 ricbra

I have the same problem with the release 1.1.0. Somehow, I don't have any issues on another project with only one consumer. My configuration is the following:

[rabbitmq]
host = rabbitmq
username = *******
password = *******
vhost=/
port=5672
queue=river_commands
compression=Off

[logs]
error =/dev/stderr
info = /dev/stdout

[prefetch]
count=1
global=Off

[exchange]
name=river_commands
autodelete=Off
type=direct
durable=On

sroze avatar Apr 13 '16 14:04 sroze

@ricbra I can confirm that we were not able to reproduce the issue again. The consumers have been running for a couple of months now and none hanged again.

raul-dan avatar May 19 '16 23:05 raul-dan

Hi Ric,

I encounter the same problem as others I also run the script under supervisord with minimal config there was no error on supervisord as well and the script status is running as per supervisord.

https://github.com/streadway/amqp as per your suggestion on the other post I visit that page and found this information on that github page, I don't know if this is related or not :)

mattwilliamson commented 17 days ago Just to make sure you don't chase your tail, the memory leak goes away when using QoS of 1, but when not using QoS, closing the channel and connection do not release the implicit buffer.

When i check your consumer.go is that right The QoS parameter was 0 ?

if err := ch.Qos(cfg.Prefetch.Count, 0, cfg.Prefetch.Global); err != nil { return nil, errors.New(fmt.Sprintf("Failed to set QoS: %s", err.Error())) }

Still chasing the ghost here the problem resurface every two days or so. Using Ubuntu 16.04 I will try to Set the QoS parameter to 1 land recompile your consumer app let see how this fly.

hellracer avatar Feb 27 '17 16:02 hellracer

Refer the official docs for the exact params of this method: https://godoc.org/github.com/streadway/amqp#Channel.Qos

Second param is prefetchSize. I don't understand exactly what mattwilliamson means with "QoS of 1".

Let me know if you solve the problem with this fix.

ricbra avatar Feb 27 '17 20:02 ricbra

This is a sample message that was stucked on queue as soon as I delete this message on queue the consumer script continue to run ric another suggestion I hand in mind how can you enclosed the argument on the invoke script with double quotes?

e.g php test.php "base64_encoded message" ?

The actual payload is a json encoded message

eNoUjjFPwzAQRv/K1bNx7uzETr2lEkJIgCq1A0MWIx+pReJEaQMD4r/jbk+np/ver5jmjzSy8AL35BpjTd02QoqJr9cw3O/PeQw5wvn98cGiheOBoBt5ve120MW4Fs/D27zeLvAaUobum/PGEp5CDnGWcEj9hvhJIUt4CcOWA5zTxF4jub5C3Ve6BSJvjCcr4bQwR49fU19dypPjyVOttEO0xSZNChvnalMKl7JZ8qjg8JOWO+61ItsqjagMib//AAAA//+muzzA

hellracer avatar Feb 28 '17 12:02 hellracer

That doesn't seem to decode correctly? are you sure what's being encoded is valid?

andrefigueira avatar Feb 28 '17 12:02 andrefigueira

//$str = "eNoUjjFPwzAQRv/K1bNx7uzETr2lEkJIgCq1A0MWIx+pReJEaQMD4r/jbk+np/ver5jmjzSy8AL35BpjTd02QoqJr9cw3O/PeQw5wvn98cGiheOBoBt5ve120MW4Fs/D27zeLvAaUobum/PGEp5CDnGWcEj9hvhJIUt4CcOWA5zTxF4jub5C3Ve6BSJvjCcr4bQwR49fU19dypPjyVOttEO0xSZNChvnalMKl7JZ8qjg8JOWO+61ItsqjagMib//AAAA//+muzzA";

//$str = base64_decode($str); //$str = gzuncompress($str);

//$a = json_decode($str); //print_r($a);

Yes that was I was thinking but try to run the script it will successfully decode the encoded message

hellracer avatar Feb 28 '17 22:02 hellracer

Hi guys,

Out of desperation move I slightly modify command_factory.go to send the base64 encoded via STDIN instead of sending the body as parameter. My issue I believed would be the encoded string still large enough even though it was compressed. Though to be honest it's just my hunch because I don't see the argument too long error from PHP resulting the PHP script to bailout and not processed the message.

Even though this will not fixed my issue the string limitation as argument is now removed and will be a major leap forward to triage my issue, thanks guys

hellracer avatar Mar 01 '17 03:03 hellracer

To other who might be interested of doing this I attach the code snippet on the other thread look for STDIN on the subject

hellracer avatar Mar 01 '17 03:03 hellracer

Hi Ric,

I was able to triage my issue and it's irrelevant on this post, what's relevant is basically we can't never tell what your developer will going to passed on the Queue, all I can say the STDIN enhancement should be in as another user option in the queue config section.

Please disregard the QoS issue i didn't touch the code obviously... for me I can say you can now close this issue 👍

hellracer avatar Mar 02 '17 00:03 hellracer

Hi I am using rabbitmq on heroku. It is working fine that is consumer keeps on processing messages. But whenever I make any change in any file and push the code the connection gets lost. In this case , I need to restart the connection manually by calling the consumer process. I have also checked the logs but I didnt get anything there also. Please help me out.

Consumer file class ConsumerController extends AppController { public function consume() {

	$this->writeLog("shopcontroller:: index::starting to make connection with rabbitmq ..");
		
	try {
		$connection = new AMQPStreamConnection("hostname", port, "username", "password", "vhost");
		
		$resultData = print_r($connection, 1);
		$this->writeLog("shopcontroller:: index::connection object value with rabbitmq :: $resultData");

	} catch (Exception $e) {
		$resultData = print_r($e, 1);
		$this->writeLog("shopcontroller:: index::exception while making connection with rabbitmq : $resultData");
	}

	if ($connection)
	{
		try {
			$channel = $connection->channel();
			
			$channel->queue_declare('testqueue', false, false, false, false);
			echo ' [*] Waiting for messages. To exit press CTRL+C', "\n";
			 
			$channel->basic_consume('testqueue', '', false, true, false, false,  array($this, 'processOrder') );
			while(count($channel->callbacks)) {
				$channel->wait();
			}
			$channel->close();
			$connection->close();
		} catch (Exception $e) {
			$resultData = print_r($e, 1);
			$this->writeLog("shopcontroller:: index::exception while making channel with rabbitmq connection : $resultData");
		}

	}
 
}   

Function processOrder() { $testCron = new testCron(); $testCron->test(); }

soodkritika avatar May 24 '17 12:05 soodkritika