librdkafka icon indicating copy to clipboard operation
librdkafka copied to clipboard

rd_kafka_commit_transaction failed, but messages were delivered to topics

Open tkolomiets opened this issue 1 year ago • 1 comments

Description

Pseudo c++ code

rd_kafka_conf_set(conf, "bootstrap.servers", "xxx.xxx.xxx.01:9092,xxx.xxx.xxx.02:9092,xxx.xxx.xxx.03:9092,xxx.xxx.xxx.04:9092,xxx.xxx.xxx.05:9092");
rd_kafka_conf_set(conf, "transactional.id", "librdkafka_transactions_example");
rd_kafka_conf_set_dr_msg_cb(conf, dr_msg_cb);
producer = rd_kafka_new(RD_KAFKA_PRODUCER, conf, errstr, sizeof(errstr));
rd_kafka_init_transactions(producer, -1);

// q1: 3 partitions + 5 replicas
auto t1 = rd_kafka_topic_new(producer, "q1", nullptr);
// q2: 3 partitions + 5 replicas
auto t2 = rd_kafka_topic_new(producer, "q2", nullptr);
// q3: 3 partitions + 1 replicas
auto t3 = rd_kafka_topic_new(producer, "q3", nullptr);

rd_kafka_begin_transaction(producer);

std::string message = "hello";
auto res1 = rd_kafka_produce(t1, 0, RD_KAFKA_MSG_F_COPY, message.data(), message.length(), nullptr, 0, nullptr);
auto res2 = rd_kafka_produce(t2, 0, RD_KAFKA_MSG_F_COPY, message.data(), message.length(), nullptr, 0, nullptr);
auto res3 = rd_kafka_produce(t3, 0, RD_KAFKA_MSG_F_COPY, message.data(), message.length(), nullptr, 0, nullptr);
// res1 = 0, res2 = 0, res3 = 0

error = rd_kafka_commit_transaction(producer, 1000);
// error is not null
rd_kafka_error_txn_requires_abort(error) == false
// error log is: 
%3|1702461737.738|TXNERR|rdkafka#producer-1| [thrd:xxx.xxx.xxx.03:9092/bootstrap]: Current transaction failed in state BeginCommit: 1 message(s) timed out on q3 [0] (_TIMED_OUT, requires epoch bump)
FATAL ERROR: Failed to commit transaction: _TIMED_OUT: Failed to flush all outstanding messages within the API timeout: 1 message(s) remaining

After failed call of rd_kafka_commit_transaction q1 and q2 contain "hello" message, but q3 - not. I think it is mistake.. Failed commit, even retriable, should not to lead to message delivering to topics.

How to reproduce

Create 3 topics. First and second with 3 partitions and 5 replicas. Thirth - with 3 partitions and 1 replica. Try to run above code.

Checklist

  • [x] librdkafka version v2.3.0 (static libs librdkafka.a, librdkafka++.a)
  • [x] Apache Kafka version: kafka_2.11-0.11.0.1
  • [x] librdkafka client configuration: see pseudocode
  • [x] Operating system: Debian GNU/Linux 10 (buster)
  • [x] Provide logs: see pseudocode
  • [x] Provide broker log excerpts: see pseudocode

tkolomiets avatar Dec 13 '23 10:12 tkolomiets

Failed commits should be retried manually if rd_kafka_error_is_retriable(error) is true. Otherwise the commit won't be retried automatically.

emasab avatar Mar 05 '24 13:03 emasab