
Transactional producer silently fails to commit a transaction.

vncorpacc opened this issue 1 year ago · 2 comments

Description

Versions are listed in the checklist below. We are using the Confluent Kafka Python bindings. This is largely a copy-paste from https://github.com/redpanda-data/redpanda/issues/17921. It is not clear to me whether the issue lies with a misbehaving broker or with the client library; I would appreciate any insights.

Transactional producer silently fails to commit a batch: commit_transaction() returns without raising, but inspection of the topic shows the messages were never committed.

The issue was encountered at least three times. The first two times, our producer detected an error in an unexpected place: a call to produce() (rd_kafka_produce), made some time after begin_transaction(), returned -172 (erroneous state). From the state of the topic it was obvious that the commit_transaction() call preceding that begin_transaction() had succeeded, since we crash on any error there. But the messages were not actually committed in Redpanda: they did not show up in the Redpanda console, and attempts to read those supposedly committed messages programmatically timed out. The producer also received offsets for those uncommitted messages in the on_delivery callback, meaning they reached Redpanda but were never committed. The third time, the producer did throw on commit_transaction(), but I include it here as it seems relevant.

confluent_kafka did not output any logs while this was happening. In the two cases where the producer only noticed the error on a produce() call, confluent_kafka logged nothing, so all we have is the error code: cimpl.KafkaException: KafkaError{code=_STATE,val=-172,str="Unable to produce message: Local: Erroneous state"}.
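For future occurrences, librdkafka's internal debug logging can be turned on through the `debug` configuration property, which confluent-kafka passes straight through. A sketch of the extra config (the `eos` context traces the idempotent/transactional producer state machine; `broker` and `protocol` trace the wire traffic):

```python
# Sketch: enable librdkafka debug logging for transaction diagnostics.
# These are standard librdkafka debug contexts, merged into the
# producer configuration shown in the checklist below.
cfg = {
    'transactional.id': 'REDACTED',
    'enable.idempotence': True,
    'debug': 'eos,broker,protocol',
}
```

This is verbose, but it would have recorded the transaction-state transitions around the silent commit.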

The third time, when the producer did fail at commit, the error was: _PURGE_QUEUE. Retriable: False. Fatal: False.
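For reference, confluent-kafka exposes the transactional error contract on the `KafkaError` object carried inside the `KafkaException`: `retriable()`, `txn_requires_abort()`, and `fatal()`. A minimal sketch of the classification logic; the `classify` helper and the stub error object are ours (the stub lets the sketch run without a broker), not part of the library:

```python
def classify(err) -> str:
    """Map a confluent_kafka.KafkaError-like object to the recommended reaction.

    Per the transactional-producer error contract:
      - retriable: retry the same call,
      - abortable: call abort_transaction() and start the batch over,
      - fatal: the producer instance is unusable and must be recreated.
    """
    if err.retriable():
        return "retry"
    if err.txn_requires_abort():
        return "abort"
    if err.fatal():
        return "fatal"
    # _PURGE_QUEUE above was reported as neither retriable nor fatal.
    return "unknown"


class StubError:
    """Stand-in for confluent_kafka.KafkaError; the real object is
    obtained via KafkaException.args[0]."""

    def __init__(self, retriable=False, abortable=False, fatal=False):
        self._r, self._a, self._f = retriable, abortable, fatal

    def retriable(self):
        return self._r

    def txn_requires_abort(self):
        return self._a

    def fatal(self):
        return self._f


print(classify(StubError()))                # → unknown (like our _PURGE_QUEUE case)
print(classify(StubError(abortable=True)))  # → abort
```

In our case neither flag was set, which is part of why the error was hard to act on.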

How to reproduce

We couldn't reproduce the issue in any other scenario: the exact same producer code does not fail in our testing environment. It does, however, manifest reliably in the production environment.

Checklist

IMPORTANT: We will close issues where the checklist has not been completed.

Please provide the following information:

  • [x] librdkafka: v2.2.0, python-confluent-kafka: v2.2.0
  • [x] Broker: Redpanda v23.1.17
  • [x] Producer Configuration:
{
    'transactional.id': 'REDACTED',
    'enable.idempotence': True,
    'acks': 'all',
    'max.in.flight.requests.per.connection': 5,
    'retries': 5,
    'retry.backoff.ms': 500,
    'linger.ms': 500,
    'security.protocol': 'SSL',
    'ssl.ca.location': 'REDACTED',
    'ssl.certificate.location': 'REDACTED',
    'ssl.key.location': 'REDACTED'
}
  • [x] CentOS Linux 7 (Core) x64
  • [x] Provided error codes; no logs were printed by the package.
  • [x] Broker log excerpts: https://github.com/redpanda-data/redpanda/issues/17921
  • [x] Critical issue

vncorpacc · Apr 22 '24

Can you share some code to reproduce this?

emasab · Jun 18 '24

Unfortunately, this is not something we were able to reproduce reliably either. It stopped happening after yet another Redpanda restart. I am not sure what code you expect to see; the outline would be:

import confluent_kafka

producer = confluent_kafka.Producer(cfg)
producer.init_transactions()
while not stop_requested():
    msgs = get_msgs_to_send()
    batch_requests_results = []
    producer.begin_transaction()
    for topic, value, partition in msgs:
        batch_requests_results.append(None)
        msg_num = len(batch_requests_results) - 1

        # Bind msg_num as a default argument: a plain closure captures the
        # loop variable by reference, so every callback would write to the
        # last slot otherwise.
        def on_delivery(
            err: confluent_kafka.KafkaError,
            msg: confluent_kafka.Message,
            msg_num: int = msg_num,
        ):
            if err is None or err.code() == confluent_kafka.KafkaError.NO_ERROR:
                batch_requests_results[msg_num] = msg
            else:
                batch_requests_results[msg_num] = err

        # partition must be passed by keyword: the third positional
        # argument of produce() is the message key, not the partition.
        producer.produce(topic, value, partition=partition, on_delivery=on_delivery)
    producer.flush()
    offsets_and_metadata = get_offsets_or_raise(batch_requests_results)
    producer.commit_transaction()
    # We got here! And crashed only on a subsequent produce() in the next batch.
    # Those offsets were not reachable afterwards - they were not really committed.
    store_in_db(offsets_and_metadata)
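The programmatic check mentioned above (reading back the supposedly committed offsets) can be done with a read_committed consumer, which never returns messages from aborted or still-open transactions. A sketch, with hypothetical bootstrap address, group id, and topic name; the helper only builds the config, so it can be inspected without a broker:

```python
def read_committed_config(bootstrap: str, group_id: str) -> dict:
    # 'isolation.level' is read_committed by default in librdkafka,
    # but we set it explicitly: only messages from committed
    # transactions (plus non-transactional messages) are returned.
    return {
        'bootstrap.servers': bootstrap,
        'group.id': group_id,
        'isolation.level': 'read_committed',
        'auto.offset.reset': 'earliest',
        'enable.auto.commit': False,
    }


cfg = read_committed_config('REDACTED:9092', 'txn-verify')

# Usage against a live cluster (topic/offset names are placeholders):
#   consumer = confluent_kafka.Consumer(cfg)
#   consumer.assign([confluent_kafka.TopicPartition('my_topic', 0, suspect_offset)])
#   msg = consumer.poll(10.0)
#   # msg is None when the offset is not readable as committed, which is
#   # exactly the timeout behaviour we observed.
```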

vncorpacc · Jun 18 '24