RabbitMQ Peer-Connection

Open GKJoker98 opened this issue 3 years ago • 1 comments

Hello folks,

I am using AMQP (RabbitMQ) for our IntelMQ setup. During development I noticed that the bots lose their connection to RabbitMQ after a short time. As far as I understand, this is caused by the short heartbeat parameter set in pipeline.py.

The value specifies that the connection should time out after 10 seconds. I think this was a misunderstanding (the developer probably assumed that a heartbeat is sent every 10 seconds).

In pipeline.py:

        if pika_version < (0, 11): 
            self.kwargs['heartbeat_interval'] = 10 
        else:
            self.kwargs['heartbeat'] = 10

It would be great if there were a global parameter for this, for example in the global section of runtime.yaml.
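A minimal sketch of how such a parameter could feed into the snippet above. The setting name `amqp_heartbeat`, the helper `heartbeat_kwargs`, and the default of 60 seconds are all hypothetical; none of them exist in IntelMQ, and the default is an illustrative value rather than a tested recommendation:

```python
# Hypothetical default; in this sketch it stands in for a value that would
# be read from a global setting such as `amqp_heartbeat` in runtime.yaml.
DEFAULT_HEARTBEAT = 60


def heartbeat_kwargs(pika_version, heartbeat=DEFAULT_HEARTBEAT):
    """Return the pika connection keyword argument for the heartbeat timeout.

    pika_version is a version tuple such as (1, 2, 0). The keyword name
    depends on the pika version, as in the existing pipeline.py code.
    """
    if heartbeat < 0:
        raise ValueError("heartbeat must not be negative")
    # Older pika releases call the parameter 'heartbeat_interval'.
    key = 'heartbeat_interval' if pika_version < (0, 11) else 'heartbeat'
    return {key: heartbeat}


# The result would be merged into the existing connection kwargs, e.g.
# self.kwargs.update(heartbeat_kwargs(pika_version, configured_value))
```

With this shape, the hard-coded 10 would only be replaced in one place, and bots without the setting would fall back to the default.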

FYI: The source queue does not time out because the bot continually asks it for new messages (a pseudo heartbeat), but the destination queue does. After the timeout, the bot throws an exception when it tries to send a message to the destination queue (see the error log below) and reconnects after 15 seconds. So the bot keeps working, but in my opinion in an unpleasant way.
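One possible way to avoid the idle timeout, independent of the heartbeat value, would be to service the destination connection from time to time. This is only a sketch under assumptions: `service_if_idle` is a hypothetical helper, `connection` is assumed to be a pika `BlockingConnection` (whose `process_data_events` method runs the I/O loop), and half the heartbeat timeout is an arbitrary threshold chosen for this example:

```python
import time


def service_if_idle(connection, last_io, heartbeat, now=None):
    """Run the connection's I/O loop once if it has been idle too long.

    connection: object with a process_data_events(time_limit=...) method,
                assumed here to be a pika BlockingConnection.
    last_io:    monotonic timestamp of the last I/O on this connection.
    heartbeat:  heartbeat timeout in seconds.
    Returns the timestamp of the most recent I/O.
    """
    now = time.monotonic() if now is None else now
    # Threshold of half the timeout is an assumption of this sketch.
    if now - last_io >= heartbeat / 2:
        # time_limit=0 processes pending events and returns immediately.
        connection.process_data_events(time_limit=0)
        return now
    return last_io
```

A bot would have to call this between reads from the source queue, so it only helps if the bot does not block there for longer than the timeout.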

Error Log:

pika.exceptions.StreamLostError: Stream connection lost: ConnectionResetError(104, 'Connection reset by peer')

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/opt/intelmq_dev/intelmq/lib/bot.py", line 321, in start
    self.process()
  File "/opt/intelmq_dev/intelmq/bots/experts/filter/expert.py", line 142, in process
    self.send_message(event)
  File "/opt/intelmq_dev/intelmq/lib/bot.py", line 633, in send_message
    self.__destination_pipeline.send(raw_message, path=path,
  File "/opt/intelmq_dev/intelmq/lib/pipeline.py", line 558, in send
    self._send(destination_queue, message)
  File "/opt/intelmq_dev/intelmq/lib/pipeline.py", line 533, in _send
    raise exceptions.PipelineError(exc)
intelmq.lib.exceptions.PipelineError: pipeline failed - StreamLostError: ("Stream connection lost: ConnectionResetError(104, 'Connection reset by peer')",)
2022-06-29 13:33:23,893 - Filter-Expert - INFO - Bot will continue in 15 seconds.
2022-06-29 13:35:07,908 - Filter-Expert - ERROR - Pipeline failed.
Traceback (most recent call last):
  File "/opt/intelmq_dev/intelmq/lib/pipeline.py", line 519, in _send
    retval = self.channel.basic_publish(exchange=self.exchange,
  File "/usr/local/lib/python3.8/dist-packages/pika/adapters/blocking_connection.py", line 2211, in basic_publish
    self._flush_output(self._message_confirmation_result.is_ready)
  File "/usr/local/lib/python3.8/dist-packages/pika/adapters/blocking_connection.py", line 1335, in _flush_output
    self._connection._flush_output(lambda: self.is_closed, *waiters)
  File "/usr/local/lib/python3.8/dist-packages/pika/adapters/blocking_connection.py", line 523, in _flush_output
    raise self._closed_result.value.error
pika.exceptions.StreamLostError: Stream connection lost: ConnectionResetError(104, 'Connection reset by peer')

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/opt/intelmq_dev/intelmq/lib/bot.py", line 321, in start
    self.process()
  File "/opt/intelmq_dev/intelmq/bots/experts/filter/expert.py", line 142, in process
    self.send_message(event)
  File "/opt/intelmq_dev/intelmq/lib/bot.py", line 633, in send_message
    self.__destination_pipeline.send(raw_message, path=path,
  File "/opt/intelmq_dev/intelmq/lib/pipeline.py", line 558, in send
    self._send(destination_queue, message)
  File "/opt/intelmq_dev/intelmq/lib/pipeline.py", line 533, in _send
    raise exceptions.PipelineError(exc)
intelmq.lib.exceptions.PipelineError: pipeline failed - StreamLostError: ("Stream connection lost: ConnectionResetError(104, 'Connection reset by peer')",)

GKJoker98, Jul 05 '22 08:07

Hi @GKJoker98,

> I am using AMQP (RabbitMQ) for our IntelMQ setup. During development I noticed that the bots lose their connection to RabbitMQ after a short time. As far as I understand, this is caused by the short heartbeat parameter set in pipeline.py.
>
> The value specifies that the connection should time out after 10 seconds. I think this was a misunderstanding (the developer probably assumed that a heartbeat is sent every 10 seconds).

It has been four years since I wrote that code (https://github.com/certtools/intelmq/commit/fb3037f140#diff-3961e0dc13579648a826949be69464abb176e3323cd4b85ae10dc58c166b00e0R344), so I no longer remember my exact reasoning, but your assumption may well be correct :)

> It would be great if there were a global parameter for this, for example in the global section of runtime.yaml.

That makes sense. On the other hand, if there is a good default value that works, we don't need an extra parameter.

Did you test which timeout works well?

> FYI: The source queue does not time out because the bot continually asks it for new messages (a pseudo heartbeat), but the destination queue does. After the timeout, the bot throws an exception when it tries to send a message to the destination queue (see the error log below) and reconnects after 15 seconds. So the bot keeps working, but in my opinion in an unpleasant way.

I don't remember having experienced such errors in our setup, but maybe the load was constantly high enough that the timeout was never reached.

sebix, Jul 05 '22 19:07