Dropping `Consumer` when using `enable.auto.commit` and Kafka is disconnected hangs for `session.timeout.ms`
As the title says, when `enable.auto.commit` is enabled and Kafka is disconnected (e.g. the broker crashed), `drop(consumer)` blocks for roughly `session.timeout.ms`. When `enable.auto.commit` is disabled and I commit offsets synchronously, the behavior does not reproduce.
This stalls my application's shutdown for that long.
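Until the root cause is addressed, one possible mitigation is to move the slow destructor off the shutdown path. The sketch below uses only `std`; `drop_with_timeout` is a hypothetical helper, not part of rdkafka. It assumes the consumer type is `Send + 'static` (worth checking for your consumer and context types). It does not shorten the wait itself: if the process exits before the background drop finishes, librdkafka's close, including any final offset commit it attempts, is cut short.

```rust
use std::sync::mpsc;
use std::thread;
use std::time::Duration;

/// Hypothetical helper: drops `value` on a background thread and waits at most
/// `timeout` for the drop to finish. Returns `true` if the drop completed in
/// time, `false` if the caller stopped waiting (the drop keeps running on the
/// detached thread until it finishes or the process exits).
pub fn drop_with_timeout<T: Send + 'static>(value: T, timeout: Duration) -> bool {
    let (done_tx, done_rx) = mpsc::channel::<()>();
    thread::spawn(move || {
        // The potentially slow destructor (e.g. a consumer waiting on the
        // group coordinator) runs here, off the caller's thread.
        drop(value);
        // The receiver may already be gone if the caller timed out; ignore that.
        let _ = done_tx.send(());
    });
    // If the drop panics, `done_tx` is dropped unsent and this returns `false`.
    done_rx.recv_timeout(timeout).is_ok()
}
```

At shutdown, something like `drop_with_timeout(consumer, Duration::from_secs(1))` would return after at most one second instead of blocking for the full `session.timeout.ms`.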
Reproducer
I have attached a reproducer based on the `simple_consumer` example: simple_consumer.zip. To reproduce the behavior:
- Start Kafka with a topic `my-topic`, e.g.:
  ```shell
  # Start Kafka container
  podman run -p 8082:8082 -p 9092:9092 -p 9101:9101 --rm confluentinc/confluent-local
  # Create topic
  http localhost:8082/v3/clusters/$(http http://localhost:8082/v3/clusters/ | jq '.data[0].cluster_id' -r)/topics topic_name=my-topic
  # Send some records
  echo '{"records":[{"key":"jsmith","value":"alarm clock"},{"key":"htanaka","value":"batteries"},{"key":"awalther","value":"bookshelves"}]}' | http POST localhost:8082/topics/my-topic 'Content-Type: application/vnd.kafka.json.v2+json'
  ```
- Run `simple_consumer`.
- Stop/kill Kafka.
- Wait for the consumer to detect that Kafka went away.
- Press `Ctrl+C` in the terminal running `simple_consumer`.
You'll observe that dropping the consumer takes roughly `session.timeout.ms`.
Sample logs:
15:00:39.524 (t: main) INFO - simple_consumer - rd_kafka_version: 0x020200ff, 2.2.0
15:00:39.535 (t: main) INFO - simple_consumer - Pre rebalance Assign(TPL {my-topic/0: offset=Invalid metadata="", error=Ok(())})
15:00:39.536 (t: main) INFO - simple_consumer - Post rebalance Assign(TPL {my-topic/0: offset=Invalid metadata="", error=Ok(())})
15:01:06.446 (t: main) INFO - simple_consumer - key: 'Some([34, 106, 115, 109, 105, 116, 104, 34])', payload: '"alarm clock"', topic: my-topic, partition: 0, offset: 3, timestamp: CreateTime(1695646866443)
15:01:06.448 (t: main) INFO - simple_consumer - key: 'Some([34, 104, 116, 97, 110, 97, 107, 97, 34])', payload: '"batteries"', topic: my-topic, partition: 0, offset: 4, timestamp: CreateTime(1695646866444)
15:01:06.448 (t: main) INFO - simple_consumer - key: 'Some([34, 97, 119, 97, 108, 116, 104, 101, 114, 34])', payload: '"bookshelves"', topic: my-topic, partition: 0, offset: 5, timestamp: CreateTime(1695646866444)
15:01:09.526 (t: main) INFO - simple_consumer - Committing offsets: Ok(())
15:01:27.688 (t: unknown) INFO - librdkafka - librdkafka: FAIL [thrd:GroupCoordinator]: GroupCoordinator: localhost:9092: Disconnected (after 48159ms in state UP)
15:01:27.688 (t: main) ERROR - rdkafka::client - librdkafka: Global error: BrokerTransportFailure (Local: Broker transport failure): GroupCoordinator: localhost:9092: Disconnected (after 48159ms in state UP)
15:01:27.688 (t: unknown) INFO - librdkafka - librdkafka: FAIL [thrd:localhost:9092/bootstrap]: localhost:9092/1: Disconnected (after 48161ms in state UP)
15:01:27.689 (t: main) ERROR - rdkafka::client - librdkafka: Global error: BrokerTransportFailure (Local: Broker transport failure): localhost:9092/1: Disconnected (after 48161ms in state UP)
15:01:27.689 (t: main) ERROR - rdkafka::client - librdkafka: Global error: AllBrokersDown (Local: All broker connections are down): 2/2 brokers are down
15:01:27.706 (t: unknown) INFO - librdkafka - librdkafka: FAIL [thrd:GroupCoordinator]: GroupCoordinator: localhost:9092: Disconnected (after 16ms in state APIVERSION_QUERY, 1 identical error(s) suppressed)
15:01:27.706 (t: unknown) INFO - librdkafka - librdkafka: FAIL [thrd:localhost:9092/bootstrap]: localhost:9092/1: Disconnected while requesting ApiVersion: might be caused by incorrect security.protocol configuration (connecting to a SSL listener?) or broker version is < 0.10 (see api.version.request) (after 16ms in state APIVERSION_QUERY)
15:01:27.706 (t: main) ERROR - rdkafka::client - librdkafka: Global error: BrokerTransportFailure (Local: Broker transport failure): GroupCoordinator: localhost:9092: Disconnected (after 16ms in state APIVERSION_QUERY, 1 identical error(s) suppressed)
15:01:27.706 (t: main) ERROR - rdkafka::client - librdkafka: Global error: BrokerTransportFailure (Local: Broker transport failure): localhost:9092/1: Disconnected while requesting ApiVersion: might be caused by incorrect security.protocol configuration (connecting to a SSL listener?) or broker version is < 0.10 (see api.version.request) (after 16ms in state APIVERSION_QUERY)
15:01:27.782 (t: unknown) ERROR - librdkafka - librdkafka: FAIL [thrd:GroupCoordinator]: GroupCoordinator: localhost:9092: Connect to ipv4#127.0.0.1:9092 failed: Connection refused (after 0ms in state CONNECT)
15:01:27.782 (t: main) ERROR - rdkafka::client - librdkafka: Global error: BrokerTransportFailure (Local: Broker transport failure): GroupCoordinator: localhost:9092: Connect to ipv4#127.0.0.1:9092 failed: Connection refused (after 0ms in state CONNECT)
15:01:27.828 (t: unknown) ERROR - librdkafka - librdkafka: FAIL [thrd:localhost:9092/bootstrap]: localhost:9092/1: Connect to ipv4#127.0.0.1:9092 failed: Connection refused (after 0ms in state CONNECT)
15:01:27.828 (t: main) ERROR - rdkafka::client - librdkafka: Global error: BrokerTransportFailure (Local: Broker transport failure): localhost:9092/1: Connect to ipv4#127.0.0.1:9092 failed: Connection refused (after 0ms in state CONNECT)
15:01:28.346 (t: unknown) ERROR - librdkafka - librdkafka: FAIL [thrd:localhost:9092/bootstrap]: localhost:9092/1: Connect to ipv6#[::1]:9092 failed: Connection refused (after 0ms in state CONNECT)
15:01:28.346 (t: main) ERROR - rdkafka::client - librdkafka: Global error: BrokerTransportFailure (Local: Broker transport failure): localhost:9092/1: Connect to ipv6#[::1]:9092 failed: Connection refused (after 0ms in state CONNECT)
15:01:28.362 (t: unknown) ERROR - librdkafka - librdkafka: FAIL [thrd:GroupCoordinator]: GroupCoordinator: localhost:9092: Connect to ipv6#[::1]:9092 failed: Connection refused (after 0ms in state CONNECT)
15:01:28.362 (t: main) ERROR - rdkafka::client - librdkafka: Global error: BrokerTransportFailure (Local: Broker transport failure): GroupCoordinator: localhost:9092: Connect to ipv6#[::1]:9092 failed: Connection refused (after 0ms in state CONNECT)
15:01:29.190 (t: unknown) ERROR - librdkafka - librdkafka: FAIL [thrd:localhost:9092/bootstrap]: localhost:9092/1: Connect to ipv4#127.0.0.1:9092 failed: Connection refused (after 0ms in state CONNECT)
15:01:29.190 (t: main) ERROR - rdkafka::client - librdkafka: Global error: BrokerTransportFailure (Local: Broker transport failure): localhost:9092/1: Connect to ipv4#127.0.0.1:9092 failed: Connection refused (after 0ms in state CONNECT)
15:01:29.445 (t: unknown) ERROR - librdkafka - librdkafka: FAIL [thrd:GroupCoordinator]: GroupCoordinator: localhost:9092: Connect to ipv4#127.0.0.1:9092 failed: Connection refused (after 0ms in state CONNECT)
15:01:29.445 (t: main) ERROR - rdkafka::client - librdkafka: Global error: BrokerTransportFailure (Local: Broker transport failure): GroupCoordinator: localhost:9092: Connect to ipv4#127.0.0.1:9092 failed: Connection refused (after 0ms in state CONNECT)
^C15:01:31.481 (t: main) INFO - simple_consumer - Going to drop consumer, time SystemTime { tv_sec: 1695646891, tv_nsec: 481807324 }
15:01:31.481 (t: main) INFO - simple_consumer - Pre rebalance Revoke(TPL {my-topic/0: offset=Invalid metadata="", error=Ok(())})
15:01:31.481 (t: main) INFO - simple_consumer - Post rebalance Revoke(TPL {my-topic/0: offset=Invalid metadata="", error=Ok(())})
15:01:33.226 (t: unknown) ERROR - librdkafka - librdkafka: FAIL [thrd:localhost:9092/bootstrap]: localhost:9092/1: Connect to ipv6#[::1]:9092 failed: Connection refused (after 0ms in state CONNECT)
15:01:33.226 (t: main) ERROR - rdkafka::client - librdkafka: Global error: BrokerTransportFailure (Local: Broker transport failure): localhost:9092/1: Connect to ipv6#[::1]:9092 failed: Connection refused (after 0ms in state CONNECT)
15:01:33.897 (t: unknown) ERROR - librdkafka - librdkafka: FAIL [thrd:GroupCoordinator]: GroupCoordinator: localhost:9092: Connect to ipv6#[::1]:9092 failed: Connection refused (after 0ms in state CONNECT)
15:01:33.897 (t: main) ERROR - rdkafka::client - librdkafka: Global error: BrokerTransportFailure (Local: Broker transport failure): GroupCoordinator: localhost:9092: Connect to ipv6#[::1]:9092 failed: Connection refused (after 0ms in state CONNECT)
15:01:37.523 (t: main) INFO - simple_consumer - Committing offsets: Err(KafkaError (Consumer commit error: WaitingForCoordinator (Local: Waiting for coordinator)))
15:01:37.524 (t: main) INFO - simple_consumer - Dropped consumer, time SystemTime { tv_sec: 1695646897, tv_nsec: 524233308 }
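The two `SystemTime` values logged around the drop give its exact duration. This `std`-only sketch rebuilds them from the printed `tv_sec`/`tv_nsec` fields (the helper names are hypothetical). The result, about 6.04 s, would match `session.timeout.ms` if the reproducer keeps the value of 6000 ms that the upstream `simple_consumer` example sets; that is an assumption, since the attached code is not shown here. Note also that the final `Committing offsets: Err(... WaitingForCoordinator ...)` line is logged about 1 ms before the drop returns.

```rust
use std::time::{Duration, SystemTime, UNIX_EPOCH};

/// Hypothetical helper: rebuilds a `SystemTime` from the `tv_sec`/`tv_nsec`
/// pair printed by `SystemTime`'s `Debug` output on Unix.
pub fn from_tv(tv_sec: u64, tv_nsec: u32) -> SystemTime {
    UNIX_EPOCH + Duration::new(tv_sec, tv_nsec)
}

/// Time spent inside `drop(consumer)`, taken from the two log lines above.
pub fn drop_elapsed() -> Duration {
    let before = from_tv(1_695_646_891, 481_807_324); // "Going to drop consumer"
    let after = from_tv(1_695_646_897, 524_233_308); // "Dropped consumer"
    after.duration_since(before).expect("clock went backwards")
}
```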
This seems somewhat related to https://github.com/fede1024/rust-rdkafka/issues/509 and, in particular, https://github.com/fede1024/rust-rdkafka/issues/453.
Apparently, even with `enable.auto.commit` disabled and offsets committed manually, `drop(consumer)` still blocks: https://github.com/fede1024/rust-rdkafka/issues/597#issuecomment-1733920186
I am new to rdkafka, and ran into this issue while trying a minimal example with `BaseConsumer`.
For testing purposes, I want to (1) read from the earliest offset and (2) disable auto commit and not commit any messages at all.
After pressing Ctrl-C to terminate the program, I must wait for `session.timeout.ms` before re-running it, or the consumer won't work. The code (really simple) is here:
https://pastebin.com/b9iN18Hw
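For reference, the two requirements above map onto two librdkafka configuration properties. Because the pastebin code is not reproduced here, this is an assumption about what it sets, written as plain key/value pairs so it compiles with `std` alone; `test_consumer_properties` is a hypothetical name, and `group.id` and `bootstrap.servers` are omitted. With rdkafka, each pair would be passed to `ClientConfig::set`.

```rust
use std::collections::BTreeMap;

/// Hypothetical helper: the librdkafka properties implied by requirements (1)
/// and (2). `group.id` and `bootstrap.servers` are intentionally left out.
pub fn test_consumer_properties() -> BTreeMap<&'static str, &'static str> {
    let mut props = BTreeMap::new();
    // (1) With no committed offset for the group, start from the earliest
    // message. Since nothing is ever committed, this applies on every run.
    props.insert("auto.offset.reset", "earliest");
    // (2) Never commit offsets automatically (the code never commits manually either).
    props.insert("enable.auto.commit", "false");
    props
}
```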