
rd_kafka_query_watermark_offsets API hangs forever

Open hxiaodon opened this issue 5 years ago • 4 comments

Read the FAQ first: https://github.com/edenhill/librdkafka/wiki/FAQ

Description

The rd_kafka_query_watermark_offsets API hangs forever when network access to part of the Kafka cluster is restricted (network isolation).

How to reproduce

I can reproduce this problem with the latest librdkafka version:

  1. Launch two VM/Docker instances, A and B (my local OS is CentOS 6).

  2. Install confluent-oss on instance A and start Kafka with 3 brokers:

    • brokerId: 1, port:9093
    • brokerId: 2, port:9094
    • brokerId: 3, port:9095
  3. Create a topic "test" with 3 partitions and a replication factor of 1, so that each broker leads exactly one partition. Assume the "test" topic is laid out as follows:

    • brokerId: 1, port:9093 partitionId:0
    • brokerId: 2, port:9094 partitionId:1
    • brokerId: 3, port:9095 partitionId:2
  4. On instance B, deploy the test program main.go.zip.

  5. Enable the iptables service on instance A and reject only instance B's access to port 9095.

  6. Run the test program on instance B (it exercises the QueryWatermarkOffsets API). It hangs, because the broker leading partition 2 is alive but not reachable from instance B:

    • ./kafkatest -broker=$instanceA_IP:9093 -newAPI=true -topic=test -partitionId=2 -timeout=2000
  7. With the OffsetsForTimes API instead, the program exits once the timeout expires:

    • ./kafkatest -broker=$instanceA_IP:9093 -newAPI=false -topic=test -partitionId=2 -timeout=5000

Conclusion: I think the issue is easily reproduced whenever a partition's leader broker is isolated. The infinite loop is here,

IMPORTANT: Always try to reproduce the issue on the latest released version (see https://github.com/edenhill/librdkafka/releases), if it can't be reproduced on the latest version the issue has been fixed.

Checklist

IMPORTANT: We will close issues where the checklist has not been completed.

Please provide the following information:

  • [x] librdkafka version (release number or git tag): v0.11.6
  • [x] Apache Kafka version: confluent-oss-5.0.0-2.11
  • [x] librdkafka client configuration: "session.timeout.ms": 10000
  • [x] Operating system: centos 6
  • [ ] Provide logs (with debug=.. as necessary) from librdkafka
  • [ ] Provide broker log excerpts
  • [x] Critical issue

hxiaodon avatar Oct 25 '19 06:10 hxiaodon

Hi, I made a code modification on top of librdkafka tag v0.11.6. The change is based on the implementation of the rd_kafka_offsets_for_times API.

diff --git a/src/rdkafka.c b/src/rdkafka.c
index 86d347f8..797ee937 100644
--- a/src/rdkafka.c
+++ b/src/rdkafka.c
@@ -2592,6 +2592,8 @@ rd_kafka_query_watermark_offsets (rd_kafka_t *rk, const char *topic,
         struct rd_kafka_partition_leader *leader;
         rd_list_t leaders;
         rd_kafka_resp_err_t err;
+        int tmout;
+        int cnt;
 
         partitions = rd_kafka_topic_partition_list_new(1);
         rktpar = rd_kafka_topic_partition_list_add(partitions,
@@ -2641,10 +2643,12 @@ rd_kafka_query_watermark_offsets (rd_kafka_t *rk, const char *topic,
 
         /* Wait for reply (or timeout) */
         while (state.err == RD_KAFKA_RESP_ERR__IN_PROGRESS &&
-               rd_kafka_q_serve(rkq, 100, 0, RD_KAFKA_Q_CB_CALLBACK,
-                                rd_kafka_poll_cb, NULL) !=
-               RD_KAFKA_OP_RES_YIELD)
-                ;
+               !rd_timeout_expired((tmout = rd_timeout_remains(ts_end)))){
+                cnt = rd_kafka_q_serve(rkq, tmout, 0, RD_KAFKA_Q_CB_CALLBACK,
+                                rd_kafka_poll_cb, NULL);
+                if (cnt == RD_KAFKA_OP_RES_YIELD)
+                   break; 
+        }
 
         rd_kafka_q_destroy_owner(rkq);
 

Does this make sense, or does it introduce other risks? I look forward to your reply. Thanks!
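To make the intent of the patch easier to review, here is a minimal, self-contained sketch of the same idea outside librdkafka: recompute the remaining time before every wait and leave the loop once the deadline has passed. This is not librdkafka code. `wait_for_reply`, `clock_fn`, `serve_fn`, `MAX_SLICE_MS`, and the `sim` helpers are hypothetical names invented for illustration, and the fake clock only simulates a leader that never answers.

```c
#include <assert.h>
#include <stdint.h>

/* Hypothetical stand-ins for illustration; none of these are librdkafka APIs. */
typedef int64_t (*clock_fn)(void *opaque);              /* current time in ms */
typedef int (*serve_fn)(void *opaque, int max_wait_ms); /* nonzero once the reply arrived */

enum wait_result { WAIT_REPLIED, WAIT_TIMED_OUT };

#define MAX_SLICE_MS 100 /* upper bound for a single wait, as in the original loop */

/* Serve the queue until a reply arrives or timeout_ms has elapsed. */
static enum wait_result wait_for_reply(clock_fn now, serve_fn serve,
                                       void *opaque, int timeout_ms) {
        assert(now && serve);
        const int64_t deadline = now(opaque) + timeout_ms;

        for (;;) {
                /* Recompute the remaining time on every iteration. */
                int64_t remains = deadline - now(opaque);
                if (remains <= 0)
                        return WAIT_TIMED_OUT;

                /* Never wait longer than the time that is left. */
                int slice = remains < MAX_SLICE_MS ? (int)remains : MAX_SLICE_MS;
                if (serve(opaque, slice))
                        return WAIT_REPLIED;
        }
}

/* Simulated environment: a fake clock and a queue whose reply may never come. */
struct sim {
        int64_t now_ms;      /* fake clock */
        int64_t reply_at_ms; /* time the reply arrives, or -1 for "never" */
        int serve_calls;     /* number of serve attempts made */
};

static int64_t sim_now(void *opaque) {
        return ((struct sim *)opaque)->now_ms;
}

static int sim_serve(void *opaque, int max_wait_ms) {
        struct sim *s = opaque;
        s->serve_calls++;
        if (s->reply_at_ms >= 0 && s->reply_at_ms <= s->now_ms + max_wait_ms) {
                /* The reply arrives within this wait; advance the clock to it. */
                if (s->reply_at_ms > s->now_ms)
                        s->now_ms = s->reply_at_ms;
                return 1;
        }
        s->now_ms += max_wait_ms; /* the whole wait elapsed without a reply */
        return 0;
}
```

With `struct sim isolated = { 0, -1, 0 };` and a 2000 ms timeout, `wait_for_reply(sim_now, sim_serve, &isolated, 2000)` returns `WAIT_TIMED_OUT` after 20 simulated 100 ms waits instead of spinning forever, which is the behaviour the patch aims for.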

hxiaodon avatar Oct 25 '19 07:10 hxiaodon

@edenhill The code change is tiny. Could you take a look and consider merging it into the master branch? Thanks!

hxiaodon avatar Oct 29 '19 02:10 hxiaodon

@edenhill

The same problem is still present in the most recent release, v1.7.0. rd_kafka_query_watermark_offsets hangs forever if brokers are down, completely ignoring its timeout_ms parameter. Meanwhile, @hxiaodon's solution seems good. Would you consider merging it?

b1aafulei avatar Jun 22 '21 09:06 b1aafulei

@edenhill Hi Eden. Our team is hitting the same bug. I verified this change on version 1.7.0 and it is a working solution. This problem has gone unfixed for two years; could you share your concerns? Thanks!

heshengyuan1311 avatar Oct 14 '21 03:10 heshengyuan1311

I've encountered a similar issue: it appears rd_kafka_query_watermark_offsets will block forever if the provided partition argument does not exist (i.e., some value n which is greater than the partition count for that topic).

nickwb avatar Apr 04 '23 02:04 nickwb