RabbitMQ Peer-Connection
Hello folks,
I am using AMQP (RabbitMQ) for our IntelMQ solution. During development I noticed that the bots lose their connection to RabbitMQ after a short time. As far as I understand, this is caused by the short heartbeat parameter set in pipeline.py.
The parameter means that the connection should time out after 10 seconds. I think there was a misunderstanding about that (the developer probably thought that a heartbeat is sent every 10 seconds).
In pipeline.py:
    if pika_version < (0, 11):
        self.kwargs['heartbeat_interval'] = 10
    else:
        self.kwargs['heartbeat'] = 10
It would be great if there were a global parameter for this, maybe in the Global section of runtime.yaml.
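To illustrate the idea, here is a minimal sketch of how the hard-coded value could be replaced by an optional setting. The helper name `heartbeat_kwargs` and the constant `DEFAULT_HEARTBEAT` are made up for this example and are not part of IntelMQ; only the two keyword names and the version check come from the snippet above.

```python
DEFAULT_HEARTBEAT = 10  # seconds; the value currently hard-coded in pipeline.py


def heartbeat_kwargs(pika_version, heartbeat=None):
    """Return the keyword argument that carries the heartbeat timeout.

    pika_version: version tuple such as (1, 2)
    heartbeat: timeout in seconds, or None to fall back to the default
    """
    value = DEFAULT_HEARTBEAT if heartbeat is None else int(heartbeat)
    if value < 0:
        raise ValueError("heartbeat must not be negative")
    # Same version switch as in pipeline.py: older pika releases use
    # 'heartbeat_interval', newer ones use 'heartbeat'.
    key = 'heartbeat_interval' if pika_version < (0, 11) else 'heartbeat'
    return {key: value}
```

In pipeline.py the result could then be merged with something like `self.kwargs.update(heartbeat_kwargs(pika_version, configured_value))`, where `configured_value` stands for whatever a (so far hypothetical) runtime.yaml parameter would provide.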
FYI: The source queue does not time out because the bot continually asks the queue whether there is a message (a pseudo heartbeat), but the destination queue does. After the timeout, the bot throws an exception when it tries to send a message to the destination queue (see the following error log) and reconnects after 15 seconds. So the bot still works after that, but in my opinion in an unpleasant way.
Error Log:
pika.exceptions.StreamLostError: Stream connection lost: ConnectionResetError(104, 'Connection reset by peer')
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
  File "/opt/intelmq_dev/intelmq/lib/bot.py", line 321, in start
    self.process()
  File "/opt/intelmq_dev/intelmq/bots/experts/filter/expert.py", line 142, in process
    self.send_message(event)
  File "/opt/intelmq_dev/intelmq/lib/bot.py", line 633, in send_message
    self.__destination_pipeline.send(raw_message, path=path,
  File "/opt/intelmq_dev/intelmq/lib/pipeline.py", line 558, in send
    self._send(destination_queue, message)
  File "/opt/intelmq_dev/intelmq/lib/pipeline.py", line 533, in _send
    raise exceptions.PipelineError(exc)
intelmq.lib.exceptions.PipelineError: pipeline failed - StreamLostError: ("Stream connection lost: ConnectionResetError(104, 'Connection reset by peer')",)
2022-06-29 13:33:23,893 - Filter-Expert - INFO - Bot will continue in 15 seconds.
2022-06-29 13:35:07,908 - Filter-Expert - ERROR - Pipeline failed.
Traceback (most recent call last):
  File "/opt/intelmq_dev/intelmq/lib/pipeline.py", line 519, in _send
    retval = self.channel.basic_publish(exchange=self.exchange,
  File "/usr/local/lib/python3.8/dist-packages/pika/adapters/blocking_connection.py", line 2211, in basic_publish
    self._flush_output(self._message_confirmation_result.is_ready)
  File "/usr/local/lib/python3.8/dist-packages/pika/adapters/blocking_connection.py", line 1335, in _flush_output
    self._connection._flush_output(lambda: self.is_closed, *waiters)
  File "/usr/local/lib/python3.8/dist-packages/pika/adapters/blocking_connection.py", line 523, in _flush_output
    raise self._closed_result.value.error
pika.exceptions.StreamLostError: Stream connection lost: ConnectionResetError(104, 'Connection reset by peer')
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
  File "/opt/intelmq_dev/intelmq/lib/bot.py", line 321, in start
    self.process()
  File "/opt/intelmq_dev/intelmq/bots/experts/filter/expert.py", line 142, in process
    self.send_message(event)
  File "/opt/intelmq_dev/intelmq/lib/bot.py", line 633, in send_message
    self.__destination_pipeline.send(raw_message, path=path,
  File "/opt/intelmq_dev/intelmq/lib/pipeline.py", line 558, in send
    self._send(destination_queue, message)
  File "/opt/intelmq_dev/intelmq/lib/pipeline.py", line 533, in _send
    raise exceptions.PipelineError(exc)
intelmq.lib.exceptions.PipelineError: pipeline failed - StreamLostError: ("Stream connection lost: ConnectionResetError(104, 'Connection reset by peer')",)
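The traceback shows that the destination connection is a pika BlockingConnection. As far as I know, such a connection only handles heartbeat frames while one of its blocking methods is running, so an idle publisher never answers the broker. A possible workaround, independent of the timeout value, would be to let the connection do its I/O while the bot has nothing to send. The following is only a sketch: `wait_with_heartbeats` is a made-up helper, not an IntelMQ function, and it assumes the connection object offers `process_data_events(time_limit=...)` as pika's BlockingConnection does.

```python
import time


def wait_with_heartbeats(connection, seconds, step=1.0):
    """Stay idle for about `seconds` while letting the connection do I/O.

    Returns the number of process_data_events() calls that were made.
    """
    deadline = time.monotonic() + seconds
    calls = 0
    while True:
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            return calls
        # Blocks for up to time_limit seconds and handles pending frames,
        # which gives the connection a chance to exchange heartbeats.
        connection.process_data_events(time_limit=min(step, remaining))
        calls += 1
```

Whether this fits into the bot's main loop is a separate question; a larger heartbeat value is probably the smaller change.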
Hi @GKJoker98,
> I am using AMQP (RabbitMQ) for our IntelMQ solution. During development I noticed that the bots lose their connection to RabbitMQ after a short time. As far as I understand, this is caused by the short heartbeat parameter set in pipeline.py.
> The parameter means that the connection should time out after 10 seconds. I think there was a misunderstanding about that (the developer probably thought that a heartbeat is sent every 10 seconds).
It's been four years since I wrote that code (https://github.com/certtools/intelmq/commit/fb3037f140#diff-3961e0dc13579648a826949be69464abb176e3323cd4b85ae10dc58c166b00e0R344), so I don't remember my exact thoughts any more, but your assumption may be correct :)
> It would be great if there were a global parameter for this, maybe in the Global section of runtime.yaml.
That makes sense. OTOH, if there's a good default value which works, we don't need an extra parameter.
Did you test which timeout works well?
> FYI: The source queue does not time out because the bot continually asks the queue whether there is a message (a pseudo heartbeat), but the destination queue does. After the timeout, the bot throws an exception when it tries to send a message to the destination queue (see the following error log) and reconnects after 15 seconds. So the bot still works after that, but in my opinion in an unpleasant way.
I don't remember having experienced such errors in our setup, but maybe the load was constantly high enough that the timeout was never reached.
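To check that guess against real data, one could take the timestamps at which a bot sent messages and look for gaps longer than the heartbeat timeout. The sketch below uses a simplified model: it assumes the broker drops a connection once it has seen no traffic for roughly the full timeout, and that the bot produces no traffic on the destination connection between sends. `risky_gaps` is a made-up helper for this illustration.

```python
def risky_gaps(send_times, heartbeat_timeout):
    """Return (start, end) pairs of consecutive sends that lie further
    apart than the heartbeat timeout (all values in seconds)."""
    ordered = sorted(send_times)
    return [(a, b) for a, b in zip(ordered, ordered[1:])
            if b - a > heartbeat_timeout]


# Sends at 0 s, 3 s, 8 s, 30 s and 34 s with the current 10 s timeout:
print(risky_gaps([0, 3, 8, 30, 34], 10))  # [(8, 30)]
```

With constant high load the list stays empty, which would match a setup where the error never showed up.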