Transactional producer silently fails to commit a transaction.
Description
Versions are listed in the checklist below. We are using the confluent-kafka Python bindings. This is largely a copy-paste from https://github.com/redpanda-data/redpanda/issues/17921; it is not clear to me whether the issue lies with a misbehaving broker or with the client library. Would appreciate any insights.
A transactional producer silently fails to commit a batch: commit_transaction() returns without raising, but inspection of the topic shows the messages were never committed.
We encountered the issue at least three times.
The first two times, our producer detected an error in an unexpected place: a call to produce() (rd_kafka_produce), made some time after begin_transaction(), returned -172 (erroneous state).
After inspecting the state of the topic, it was obvious that the commit_transaction() call preceding the last begin_transaction() had succeeded, since we crash on any error there. But the messages were not actually committed in Redpanda: they did not show up in the Redpanda console, and when we tried to read those supposedly committed messages programmatically, the request timed out. The producer also received offsets for those uncommitted messages in the on_delivery callback, meaning they reached Redpanda but were never committed.
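For context, the programmatic read-back check was along these lines (a sketch; the group id, assignment, and timeout are illustrative, not our actual code). With read_committed isolation, which is librdkafka's default, a consumer only returns messages from committed transactions, so the timeouts are consistent with the transaction never actually committing:

```python
# Sketch of the read-back check; group id and assignment are illustrative.
check_cfg = {
    'bootstrap.servers': 'REDACTED',
    'group.id': 'txn-check',              # hypothetical group id
    'isolation.level': 'read_committed',  # librdkafka default, made explicit
    'auto.offset.reset': 'earliest',
}
# consumer = confluent_kafka.Consumer(check_cfg)
# consumer.assign([confluent_kafka.TopicPartition(topic, partition, offset)])
# msg = consumer.poll(10.0)  # times out for the "committed" offsets
```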
The third time, the producer did throw on commit_transaction(), but I still include it here as it seems relevant.
confluent_kafka did not output any logs while this was happening.
In the two cases where the error only surfaced on a produce() call, all we have is the error code: `cimpl.KafkaException: KafkaError{code=_STATE,val=-172,str="Unable to produce message: Local: Erroneous state"}`.
In the third case, where the producer did fail at commit, the error ended with: `Error: _PURGE_QUEUE. Retriable: False. Fatal: False"}`
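For what it's worth, our handling of a commit failure follows what we understand to be librdkafka's transactional error contract: check fatal first, then txn_requires_abort, then retriable. The helper below is a hypothetical sketch of that decision order, not a confluent_kafka API:

```python
def classify(retriable: bool, txn_requires_abort: bool, fatal: bool) -> str:
    """Hypothetical helper mirroring the KafkaError.fatal() /
    txn_requires_abort() / retriable() checks, in the order the
    librdkafka transactional API documents them."""
    if fatal:
        return 'recreate-producer'      # unrecoverable: tear down the producer
    if txn_requires_abort:
        return 'abort-and-retry-batch'  # abort_transaction(), resend the batch
    if retriable:
        return 'retry-commit'           # retry commit_transaction()
    return 'treat-as-fatal'             # none of the flags set: we crash here
```

Under this logic, the `_PURGE_QUEUE` error above (retriable False, fatal False) would land in the last bucket, unless it was marked txn-abortable, which the truncated message does not show.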
How to reproduce
We could not reproduce the issue in any other scenario: the exact same producer code does not fail in our testing environment. It does, however, manifest itself reliably in the production environment.
Checklist
IMPORTANT: We will close issues where the checklist has not been completed.
Please provide the following information:
- [x] librdkafka: v2.2.0, python-confluent-kafka: v2.2.0
- [x] Broker: Redpanda v23.1.17
- [x] Producer Configuration:
```python
{
    'transactional.id': 'REDACTED',
    'enable.idempotence': True,
    'acks': 'all',
    'max.in.flight.requests.per.connection': 5,
    'retries': 5,
    'retry.backoff.ms': 500,
    'linger.ms': 500,
    'security.protocol': 'SSL',
    'ssl.ca.location': 'REDACTED',
    'ssl.certificate.location': 'REDACTED',
    'ssl.key.location': 'REDACTED'
}
```
- [x] CentOS Linux 7 (Core) x64
- [x] Provided error codes, no logs were printed by the package.
- [x] Broker log excerpts: https://github.com/redpanda-data/redpanda/issues/17921
- [x] Critical issue
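A side note on the "no logs" point: librdkafka stays quiet unless the `debug` property is set and a logger is attached. If this recurs, we plan to run with something like the following (a sketch; the `eos` debug context covers the idempotent/transactional state machine):

```python
import logging

# Route librdkafka logs into Python logging so they are not lost.
logging.basicConfig(level=logging.DEBUG)
librdkafka_logger = logging.getLogger('librdkafka')

debug_cfg = {
    # merged into the producer config shown above
    'debug': 'eos,broker,protocol',  # librdkafka debug contexts
    'logger': librdkafka_logger,     # confluent_kafka accepts a logger in config
}
```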
Can you share some code to reproduce this?
Unfortunately, this is not something even we were able to reproduce reliably, and it stopped happening after yet another Redpanda restart. I am not sure what code you expect to see; the outline would be:
```python
producer = confluent_kafka.Producer(cfg)
producer.init_transactions()
while not stop_requested():
    msgs = get_msgs_to_send()
    batch_requests_results = []
    producer.begin_transaction()
    for topic, value, partition in msgs:
        batch_requests_results.append(None)
        msg_num = len(batch_requests_results) - 1

        # Bind msg_num as a default argument: a plain closure over the
        # loop variable would see only its final value when the
        # callbacks fire during flush().
        def on_delivery(
            err: confluent_kafka.KafkaError,
            msg: confluent_kafka.Message,
            msg_num: int = msg_num,
        ):
            if err is None or err.code() == confluent_kafka.KafkaError.NO_ERROR:
                batch_requests_results[msg_num] = msg
            else:
                batch_requests_results[msg_num] = err

        # partition must be passed by keyword: the third positional
        # argument of produce() is the message key.
        producer.produce(topic, value, partition=partition, on_delivery=on_delivery)
    producer.flush()
    offsets_and_metadata = get_offsets_or_raise(batch_requests_results)
    producer.commit_transaction()
    # We got here! And crashed only on a subsequent produce() in the next batch.
    # Those offsets were not reachable afterwards - they were not really committed.
    store_in_db(offsets_and_metadata)
```
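A caveat about the outline above, unrelated to the broker issue: if on_delivery closes over the loop variable msg_num directly, every callback fires with its final value at flush() time and writes to the last slot of batch_requests_results. Binding the index as a default argument avoids this. A plain-Python demonstration (no Kafka involved):

```python
def make_callbacks_buggy(n):
    # Closure over the loop variable: late binding, all callbacks
    # see the final value of msg_num.
    results = [None] * n
    callbacks = []
    for msg_num in range(n):
        def cb(value):
            results[msg_num] = value  # always writes to index n - 1
        callbacks.append(cb)
    return results, callbacks

def make_callbacks_fixed(n):
    # Default argument binds the current index at definition time.
    results = [None] * n
    callbacks = []
    for msg_num in range(n):
        def cb(value, msg_num=msg_num):
            results[msg_num] = value
        callbacks.append(cb)
    return results, callbacks
```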