spring-kafka
Perform rollback processing on ProducerFencedException when it is caused by the same thread
Affects Version(s): 2.3.6.RELEASE
🎁 Enhancement
Scenario (using transactions and a non-batch listener) - (points 1-4 in the attached log at the bottom):
- Producer (clientId=producer-12) is created because there is a partition assignment and a transactional operation needs to be performed (ListenerConsumerRebalanceListener)
- ListenerConsumerRebalanceListener fails with: Timeout expired after 30000milliseconds while awaiting InitProducerId
- A new producer is created (clientId=producer-13) and it also initializes a transaction, which calls InitProducerId
- InitProducerId finishes for both producers, but it looks like there was a race condition because the second producer got an older epoch than the first one: producer (clientId=producer-12) got epoch 32 and producer (clientId=producer-13) got epoch 31 (see logs)
- The currently used producer is producer-13, but it has the old epoch, so the first transactional operation while processing a record will be fenced:
- currently, as I am still using 2.3.1.RELEASE, producers are not closed when failing in transactions, so it just fails on every processed record but no offsets are committed until there is a restart or a rebalance - this is not so bad
- if I switch to 2.3.6 (which has the closing fix https://github.com/spring-projects/spring-kafka/pull/1378), then (I assume) the producer will be closed when processing the first record, but since rollback processing is not done for ProducerFencedException, the record that was being processed won't be processed again - another poll won't return it.
Is it possible to find out that a producer was fenced by another producer created on the same thread? In that case it would be good to roll back so the same records are processed again, or to send them to a DLT, since the next poll will be able to fetch the next record on the partition.
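For reference, the rollback/DLT handling requested above is the path that an AfterRollbackProcessor normally provides for other exceptions. A minimal sketch of that wiring, assuming spring-kafka 2.3.x on the classpath (bean and parameter names are illustrative); the point of this issue is that a ProducerFencedException currently bypasses this processing:

```java
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.AfterRollbackProcessor;
import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;
import org.springframework.kafka.listener.DefaultAfterRollbackProcessor;
import org.springframework.util.backoff.FixedBackOff;

public class RollbackConfig {

    @Bean
    public AfterRollbackProcessor<Object, Object> afterRollbackProcessor(
            KafkaTemplate<Object, Object> template) {
        // After a rollback, re-seek and retry the failed record twice; once
        // retries are exhausted, publish it to a <topic>.DLT topic via the
        // DeadLetterPublishingRecoverer.
        return new DefaultAfterRollbackProcessor<>(
                new DeadLetterPublishingRecoverer(template),
                new FixedBackOff(0L, 2L));
    }

}
```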
Mar 3, 2020 @ 15:25:56.701 [Producer clientId=producer-12, transactionalId=some-processor.some.topic.event.0] ProducerId set to 18085 with epoch 32
Mar 3, 2020 @ 15:25:56.596 [Producer clientId=producer-12, transactionalId=some-processor.some.topic.event.0] Cluster ID: K3pyMpjRRHqd0goAdvtJ2Q
Mar 3, 2020 @ 15:25:56.202 [Producer clientId=producer-13, transactionalId=some-processor.some.topic.event.0] ProducerId set to 18085 with epoch 31
Mar 3, 2020 @ 15:25:56.097 [Producer clientId=producer-13, transactionalId=some-processor.some.topic.event.0] Cluster ID: K3pyMpjRRHqd0goAdvtJ2Q
Mar 3, 2020 @ 15:25:55.996 [Producer clientId=producer-13, transactionalId=some-processor.some.topic.event.0] ProducerId set to -1 with epoch -1
Mar 3, 2020 @ 15:25:55.994 [Producer clientId=producer-13, transactionalId=some-processor.some.topic.event.0] Overriding the default retries config to the recommended value of 2147483647 since the idempotent producer is enabled.
Mar 3, 2020 @ 15:25:55.993 ProducerConfig values:
Mar 3, 2020 @ 15:25:55.993 [Producer clientId=producer-13, transactionalId=some-processor.some.topic.event.0] Instantiated a transactional producer.
Mar 3, 2020 @ 15:25:55.974 [Consumer clientId=some-processor-0, groupId=some-processor] User provided listener org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer$ListenerConsumerRebalanceListener failed on partition assignment (org.springframework.transaction.CannotCreateTransactionException: Could not create Kafka transaction; nested exception is org.apache.kafka.common.errors.TimeoutException: Timeout expired after 30000milliseconds while awaiting InitProducerId)
Mar 3, 2020 @ 15:25:25.973 [Producer clientId=producer-12, transactionalId=some-processor.some.topic.event.0] ProducerId set to -1 with epoch -1
Mar 3, 2020 @ 15:25:25.972 [Producer clientId=producer-12, transactionalId=some-processor.some.topic.event.0] Overriding the default retries config to the recommended value of 2147483647 since the idempotent producer is enabled.
Mar 3, 2020 @ 15:25:25.971 ProducerConfig values:
Mar 3, 2020 @ 15:25:25.971 [Producer clientId=producer-12, transactionalId=some-processor.some.topic.event.0] Instantiated a transactional producer.
@bartosz-stasikowski-projectdrgn Interesting; as a workaround, 2.3.6 also added the AssignmentCommitOption
container property (https://github.com/spring-projects/spring-kafka/pull/1371).
If you set it to NEVER or LATEST_ONLY_NO_TX, you can avoid creating a transactional producer during the rebalance.
/**
 * Set the assignment commit option. Default {@link AssignmentCommitOption#ALWAYS}.
 * In a future release it will default to {@link AssignmentCommitOption#LATEST_ONLY}.
 * @param assignmentCommitOption the option.
 * @since 2.3.6
 */
public void setAssignmentCommitOption(AssignmentCommitOption assignmentCommitOption) {
    Assert.notNull(assignmentCommitOption, "'assignmentCommitOption' cannot be null");
    this.assignmentCommitOption = assignmentCommitOption;
}

public enum AssignmentCommitOption {

    /**
     * Always commit the current offset during partition assignment.
     */
    ALWAYS,

    /**
     * Never commit the current offset during partition assignment.
     */
    NEVER,

    /**
     * Commit the current offset during partition assignment when auto.offset.reset is
     * 'latest'; transactional if so configured.
     */
    LATEST_ONLY,

    /**
     * Commit the current offset during partition assignment when auto.offset.reset is
     * 'latest'; use consumer commit even when transactions are being used.
     */
    LATEST_ONLY_NO_TX

}
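A sketch of applying the workaround on a container factory, assuming spring-kafka 2.3.6+ where AssignmentCommitOption is a nested enum of ContainerProperties (the bean and type parameters here are illustrative):

```java
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.listener.ContainerProperties;

public class ListenerConfig {

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(
            ConsumerFactory<String, String> consumerFactory) {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);
        // NEVER (or LATEST_ONLY_NO_TX) avoids creating a transactional
        // producer during the rebalance, sidestepping the fencing race.
        factory.getContainerProperties().setAssignmentCommitOption(
                ContainerProperties.AssignmentCommitOption.NEVER);
        return factory;
    }

}
```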
I am not sure if we will be able to detect that we fenced ourselves.