pyamqp stops reading from TCP socket receive buffer
Environment info:
pyamqp version: 5.2.0
celery version: 5.4.0
kombu version: 5.3.7
RabbitMQ broker version: 3.10.10
Worker pool type: gevent
Issue:
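For the last couple of weeks, my team has been having an issue with our Celery RabbitMQ consumers: there is work waiting to be consumed, but the worker stops consuming and gets stuck sending heartbeats, apparently because it suddenly thinks the broker is gone. In the debug log below, the sent counter keeps advancing while the received counter stays at 36251: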
DEBUG 2024-06-14 22:04:15,712 connection 7 281472581992672 heartbeat_tick : for connection 3c9339672cd04997a9fa3abdb25de448
DEBUG 2024-06-14 22:04:15,712 connection 7 281472581992672 heartbeat_tick : Prev sent/recv: 12171/36251, now - 12171/36251, monotonic - 542532.68460477, last_heartbeat_sent - 542472.683557858, heartbeat int. - 60 for connection 3c9339672cd04997a9fa3abdb25de448
DEBUG 2024-06-14 22:04:15,712 connection 7 281472581992672 heartbeat_tick: sending heartbeat for connection 3c9339672cd04997a9fa3abdb25de448
DEBUG 2024-06-14 22:05:15,713 connection 7 281472581998272 heartbeat_tick : for connection 3c9339672cd04997a9fa3abdb25de448
DEBUG 2024-06-14 22:05:15,713 connection 7 281472581998272 heartbeat_tick : Prev sent/recv: 12171/36251, now - 12172/36251, monotonic - 542592.686267937, last_heartbeat_sent - 542592.686266149, heartbeat int. - 60 for connection 3c9339672cd04997a9fa3abdb25de448
DEBUG 2024-06-14 22:06:15,714 connection 7 281472581999072 heartbeat_tick : for connection 3c9339672cd04997a9fa3abdb25de448
DEBUG 2024-06-14 22:06:15,715 connection 7 281472581999072 heartbeat_tick : Prev sent/recv: 12172/36251, now - 12172/36251, monotonic - 542652.687340558, last_heartbeat_sent - 542592.686266149, heartbeat int. - 60 for connection 3c9339672cd04997a9fa3abdb25de448
DEBUG 2024-06-14 22:06:15,715 connection 7 281472581999072 heartbeat_tick: sending heartbeat for connection 3c9339672cd04997a9fa3abdb25de448
DEBUG 2024-06-14 22:07:15,715 connection 7 281472581998272 heartbeat_tick : for connection 3c9339672cd04997a9fa3abdb25de448
DEBUG 2024-06-14 22:07:15,716 connection 7 281472581998272 heartbeat_tick : Prev sent/recv: 12172/36251, now - 12173/36251, monotonic - 542712.688349488, last_heartbeat_sent - 542712.688347798, heartbeat int. - 60 for connection 3c9339672cd04997a9fa3abdb25de448
DEBUG 2024-06-14 22:08:15,717 connection 7 281472581999072 heartbeat_tick : for connection 3c9339672cd04997a9fa3abdb25de448
DEBUG 2024-06-14 22:08:15,717 connection 7 281472581999072 heartbeat_tick : Prev sent/recv: 12173/36251, now - 12173/36251, monotonic - 542772.690110966, last_heartbeat_sent - 542712.688347798, heartbeat int. - 60 for connection 3c9339672cd04997a9fa3abdb25de448
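To make the frozen counter easier to spot in a longer log, here is a minimal sketch that pulls the "now" sent/recv pair out of each heartbeat_tick line and checks whether recv ever changes. The regex is written against the log format shown above; the function names (`parse_ticks`, `recv_is_stalled`) are my own, not part of pyamqp, and the sample lines are trimmed copies of the log above.

```python
import re

# Matches the counters in a heartbeat_tick debug line, e.g.
# "Prev sent/recv: 12171/36251, now - 12172/36251"
TICK_RE = re.compile(
    r"Prev sent/recv: (?P<prev_sent>\d+)/(?P<prev_recv>\d+), "
    r"now - (?P<sent>\d+)/(?P<recv>\d+)"
)


def parse_ticks(lines):
    """Return a list of (sent, recv) 'now' counters, one per matching line."""
    ticks = []
    for line in lines:
        match = TICK_RE.search(line)
        if match:
            ticks.append((int(match["sent"]), int(match["recv"])))
    return ticks


def recv_is_stalled(ticks):
    """True if there are at least two ticks and recv never changed."""
    return len(ticks) > 1 and len({recv for _, recv in ticks}) == 1


# Trimmed copies of the log lines above (timestamps and connection id removed).
SAMPLE_LINES = [
    "heartbeat_tick : Prev sent/recv: 12171/36251, now - 12171/36251",
    "heartbeat_tick: sending heartbeat",
    "heartbeat_tick : Prev sent/recv: 12171/36251, now - 12172/36251",
    "heartbeat_tick : Prev sent/recv: 12172/36251, now - 12172/36251",
    "heartbeat_tick : Prev sent/recv: 12172/36251, now - 12173/36251",
    "heartbeat_tick : Prev sent/recv: 12173/36251, now - 12173/36251",
]

if __name__ == "__main__":
    ticks = parse_ticks(SAMPLE_LINES)
    print("sent advanced by", ticks[-1][0] - ticks[0][0])
    print("recv stalled:", recv_is_stalled(ticks))
```

On the excerpt above, sent moves from 12171 to 12173 while recv stays at 36251 on every tick.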
The worker sits in this heartbeat loop for days on end: it does not consume any work, terminate, or open a new connection. However, when I run netstat repeatedly to watch the AMQP TCP sockets on our client machine, the Recv-Q value (second column) for the connection keeps growing over time:
tcp 296 0 worker:36008 ip-10-12-43-16.us:amqps ESTABLISHED
tcp 333 0 worker:36008 ip-10-12-43-16.us:amqps ESTABLISHED
tcp 370 0 worker:36008 ip-10-12-43-16.us:amqps ESTABLISHED
tcp 555 0 worker:36008 ip-10-12-43-16.us:amqps ESTABLISHED
tcp 1517 0 worker:36008 ip-10-12-43-16.us:amqps ESTABLISHED
... etc
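For anyone who wants to track this over time, here is a small sketch that extracts the Recv-Q column from netstat lines like the ones above and computes how much the queue grew between samples. It assumes the usual Linux netstat column order (Proto, Recv-Q, Send-Q, Local Address, Foreign Address, State); the helper names are hypothetical.

```python
def recv_queue_sizes(netstat_lines):
    """Return the Recv-Q value (second column) of each tcp line."""
    sizes = []
    for line in netstat_lines:
        fields = line.split()
        # Expect: Proto Recv-Q Send-Q Local-Address Foreign-Address State
        if len(fields) >= 6 and fields[0].startswith("tcp") and fields[1].isdigit():
            sizes.append(int(fields[1]))
    return sizes


def growth(sizes):
    """Return the difference between each pair of consecutive samples."""
    return [later - earlier for earlier, later in zip(sizes, sizes[1:])]


# The samples shown above, copied verbatim.
SAMPLES = [
    "tcp 296 0 worker:36008 ip-10-12-43-16.us:amqps ESTABLISHED",
    "tcp 333 0 worker:36008 ip-10-12-43-16.us:amqps ESTABLISHED",
    "tcp 370 0 worker:36008 ip-10-12-43-16.us:amqps ESTABLISHED",
    "tcp 555 0 worker:36008 ip-10-12-43-16.us:amqps ESTABLISHED",
    "tcp 1517 0 worker:36008 ip-10-12-43-16.us:amqps ESTABLISHED",
]

if __name__ == "__main__":
    sizes = recv_queue_sizes(SAMPLES)
    print("Recv-Q samples:", sizes)
    print("growth between samples:", growth(sizes))
```

On these samples every step is a multiple of 37 bytes (37, 37, 185, 962), which would be consistent with fixed-size messages piling up unread, though I have not confirmed what those bytes are.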
If I'm understanding what I'm seeing here, it looks like this is what's happening:
- Kombu detects some socket timeout when trying to ack or consume, and starts the heartbeat check loop in the underlying pyamqp library
- pyamqp sends a heartbeat to the RabbitMQ broker on our TCP socket
- The broker sends a response to the heartbeat back on our TCP connection
- The response lands in the receive buffer of our consumer's TCP socket, but pyamqp never sees or reads it, so it keeps sending heartbeats to the broker and the consumer-side receive buffer grows with every unread response
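The mechanism I'm describing can be reproduced in isolation. This is only a sketch of the hypothesis, not pyamqp's code: a local `socket.socketpair()` stands in for the TLS connection to the broker, the "client" side sends heartbeats but never calls `recv`, and the FIONREAD ioctl reports how many bytes are sitting unread in the kernel buffer. The 8-byte payload is the AMQP 0-9-1 heartbeat frame; the function names are made up for illustration, and the ioctl call assumes a Unix-like system.

```python
import fcntl
import socket
import struct
import termios

# AMQP 0-9-1 heartbeat frame: type 8, channel 0, payload size 0, frame-end 0xCE.
HEARTBEAT_FRAME = b"\x08\x00\x00\x00\x00\x00\x00\xce"


def unread_bytes(sock):
    """Return how many bytes are queued in the kernel but not yet read."""
    raw = fcntl.ioctl(sock.fileno(), termios.FIONREAD, struct.pack("i", 0))
    return struct.unpack("i", raw)[0]


def simulate_stalled_reader(ticks):
    """Exchange heartbeats for `ticks` rounds while the client never reads.

    Returns the client's unread byte count after each round.
    """
    client, broker = socket.socketpair()
    backlog = []
    try:
        for _ in range(ticks):
            client.sendall(HEARTBEAT_FRAME)      # client heartbeat goes out
            broker.recv(len(HEARTBEAT_FRAME))    # broker reads it normally
            broker.sendall(HEARTBEAT_FRAME)      # broker sends its own frame
            # The client never calls recv(), so the frame stays queued.
            backlog.append(unread_bytes(client))
    finally:
        client.close()
        broker.close()
    return backlog


if __name__ == "__main__":
    print(simulate_stalled_reader(3))
```

The unread count grows by one frame per round, which is the same shape as the Recv-Q growth I see in netstat, just without the TLS overhead.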
We've tried a variety of celery settings (heartbeat checkrate interval, broker connection pool, etc.) and even tried upgrading our RabbitMQ to 3.12.x, but this issue is still plaguing us. Any help would be greatly appreciated.