librdkafka
rd_kafka_query_watermark_offsets API hangs forever
Read the FAQ first: https://github.com/edenhill/librdkafka/wiki/FAQ
Description
The rd_kafka_query_watermark_offsets API hangs forever when access to the Kafka cluster is restricted at the network level (network isolation).
How to reproduce
I was able to reproduce this problem with the latest librdkafka version:
- Launch 2 VM/Docker instances, A and B (my local OS is CentOS 6).
- Install confluent-oss on instance A and start Kafka with 3 brokers:
  - brokerId: 1, port: 9093
  - brokerId: 2, port: 9094
  - brokerId: 3, port: 9095
- Create a topic "test" with 3 partitions and a replication factor of 1, so that each broker holds a different partition. Assume the "test" topic has the following layout:
  - brokerId: 1, port: 9093, partitionId: 0
  - brokerId: 2, port: 9094, partitionId: 1
  - brokerId: 3, port: 9095, partitionId: 2
- On instance B, deploy the test program (main.go.zip).
- Enable the iptables service on instance A and reject instance B's access to port 9095 only.
- Run the test program on instance B (testing the QueryWatermarkOffsets API). It hangs, because the broker leading partition 2 is alive but not reachable from instance B:
  - ./kafkatest -broker=$instanceA_IP:9093 -newAPI=true -topic=test -partitionId=2 -timeout=2000
- If we use the OffsetsForTimes API instead, the program exits when the timeout expires:
  - ./kafkatest -broker=$instanceA_IP:9093 -newAPI=false -topic=test -partitionId=2 -timeout=5000
Conclusion: I think the issue can easily be reproduced whenever a partition's leader (broker) is isolated. The infinite loop is in this code:
IMPORTANT: Always try to reproduce the issue on the latest released version (see https://github.com/edenhill/librdkafka/releases), if it can't be reproduced on the latest version the issue has been fixed.
Checklist
IMPORTANT: We will close issues where the checklist has not been completed.
Please provide the following information:
- [x] librdkafka version (release number or git tag):
v0.11.6
- [x] Apache Kafka version:
confluent-oss-5.0.0-2.11
- [x] librdkafka client configuration:
"session.timeout.ms": 10000
- [x] Operating system:
CentOS 6
- [ ] Provide logs (with debug=.. as necessary) from librdkafka
- [ ] Provide broker log excerpts
- [x] Critical issue
Hi, I made a small code change on top of librdkafka tag v0.11.6, modeled on the implementation of the rd_kafka_offsets_for_times API:
diff --git a/src/rdkafka.c b/src/rdkafka.c
index 86d347f8..797ee937 100644
--- a/src/rdkafka.c
+++ b/src/rdkafka.c
@@ -2592,6 +2592,8 @@ rd_kafka_query_watermark_offsets (rd_kafka_t *rk, const char *topic,
struct rd_kafka_partition_leader *leader;
rd_list_t leaders;
rd_kafka_resp_err_t err;
+ int tmout;
+ int cnt;
partitions = rd_kafka_topic_partition_list_new(1);
rktpar = rd_kafka_topic_partition_list_add(partitions,
@@ -2641,10 +2643,12 @@ rd_kafka_query_watermark_offsets (rd_kafka_t *rk, const char *topic,
/* Wait for reply (or timeout) */
while (state.err == RD_KAFKA_RESP_ERR__IN_PROGRESS &&
- rd_kafka_q_serve(rkq, 100, 0, RD_KAFKA_Q_CB_CALLBACK,
- rd_kafka_poll_cb, NULL) !=
- RD_KAFKA_OP_RES_YIELD)
- ;
+ !rd_timeout_expired((tmout = rd_timeout_remains(ts_end)))){
+ cnt = rd_kafka_q_serve(rkq, tmout, 0, RD_KAFKA_Q_CB_CALLBACK,
+ rd_kafka_poll_cb, NULL);
+ if (cnt == RD_KAFKA_OP_RES_YIELD)
+ break;
+ }
rd_kafka_q_destroy_owner(rkq);
Does this make sense, or does it involve other risks? I look forward to your reply. Thanks!
@edenhill The code change is tiny; could you take a look and consider merging it into the master branch? Thanks!
@edenhill
The same problem still exists in the most recent release, v1.7.0: rd_kafka_query_watermark_offsets hangs forever if the brokers are down, completely ignoring its timeout_ms parameter. @hxiaodon's solution looks good; would you consider merging it?
@edenhill Hi Eden, our team has hit the same bug. I verified this change on version 1.7.0 and it works. This problem has remained unfixed for two years; could you share your concerns about the patch? Thanks!
I've encountered a similar issue: it appears that rd_kafka_query_watermark_offsets will block forever if the provided partition argument does not exist (i.e., some partition number n that is greater than or equal to the topic's partition count, since partitions are numbered from 0).