
Kafka producer instance throws exception when multiple concurrent calls are made transactionally.

Open · burimkrasniqi opened this issue 4 years ago · 9 comments

The same Kafka producer instance cannot be used concurrently for more than one request. The reason is that in the producer options we specify a fixed transactional id via ProducerConfig.TRANSACTIONAL_ID_CONFIG:

```java
protected Map<String, Object> producerOptions(boolean transactional) {
    val props = new HashMap<String, Object>();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
    props.put(ProducerConfig.ACKS_CONFIG, "all");
    if (transactional) {
        props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "my-transaction-id");
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
    }
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    if (tClass.equals(String.class)) {
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    } else {
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
    }
    return props;
}
```

A workaround would be to create a new Kafka producer with a different transactionId for each request. I tried this workaround and, besides being inefficient, it does not work properly: each new producer must be closed once we are done with it, otherwise it leaks memory, and producer.close() is blocking, which makes this workaround impractical.
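One way to keep a blocking close() off the calling thread is to hand it to a dedicated executor. This is a plain-JDK sketch, not reactor-kafka API; `AsyncCloser` is a hypothetical helper name. In a Reactor pipeline the idiomatic equivalent would be wrapping the close in a `Mono` subscribed on `Schedulers.boundedElastic()`.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical helper: runs a blocking close() on its own thread so the
// caller (e.g. a reactive request handler) is never blocked by it.
public class AsyncCloser {
    private final ExecutorService closeExecutor = Executors.newSingleThreadExecutor();

    public CompletableFuture<Void> closeAsync(AutoCloseable resource) {
        return CompletableFuture.runAsync(() -> {
            try {
                resource.close(); // the blocking call happens off the caller's thread
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }, closeExecutor);
    }

    public void shutdown() {
        closeExecutor.shutdown();
    }
}
```

This does not remove the cost of creating and closing a producer per request, but it keeps the blocking call out of the request path.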

The best option would be for the library to generate this transactionId internally instead of requiring us to specify it, so that the same producer could be used by multiple requests in parallel.

The error is like this:

```
org.apache.kafka.common.KafkaException: TransactionalId my-transaction-id: Invalid transition attempted from state IN_TRANSACTION to state IN_TRANSACTION
	at org.apache.kafka.clients.producer.internals.TransactionManager.transitionTo(TransactionManager.java:1078) ~[kafka-clients-2.7.1.jar:na]
	at org.apache.kafka.clients.producer.internals.TransactionManager.transitionTo(TransactionManager.java:1071) ~[kafka-clients-2.7.1.jar:na]
	at org.apache.kafka.clients.producer.internals.TransactionManager.beginTransaction(TransactionManager.java:357) ~[kafka-clients-2.7.1.jar:na]
	at org.apache.kafka.clients.producer.KafkaProducer.beginTransaction(KafkaProducer.java:620) ~[kafka-clients-2.7.1.jar:na]
	at reactor.kafka.sender.internals.DefaultTransactionManager.lambda$null$0(DefaultTransactionManager.java:43) ~[reactor-kafka-1.3.5.jar:1.3.5]
	at reactor.core.publisher.MonoRunnable.call(MonoRunnable.java:73) ~[reactor-core-3.4.9.jar:3.4.9]
	at reactor.core.publisher.MonoRunnable.call(MonoRunnable.java:32) ~[reactor-core-3.4.9.jar:3.4.9]
	at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:139) ~[reactor-core-3.4.9.jar:3.4.9]
	at reactor.core.publisher.MonoPublishOn$PublishOnSubscriber.run(MonoPublishOn.java:181) ~[reactor-core-3.4.9.jar:3.4.9]
	at reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:68) ~[reactor-core-3.4.9.jar:3.4.9]
	at reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:28) ~[reactor-core-3.4.9.jar:3.4.9]
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[na:na]
	at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304) ~[na:na]
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1130) ~[na:na]
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:630) ~[na:na]
	at java.base/java.lang.Thread.run(Thread.java:832) ~[na:na]
```

burimkrasniqi avatar Oct 12 '21 16:10 burimkrasniqi

In spring-kafka, we cache transactional producers.

There is currently no caching producer factory in reactor-kafka.

You could maintain your own pool of senders with different transactional.ids.
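Such a pool can be sketched with nothing but the JDK. This is illustrative code, not reactor-kafka API: `SenderPool` and the `"my-transaction-id-"` prefix are hypothetical names, and the factory function would build a KafkaSender whose ProducerConfig.TRANSACTIONAL_ID_CONFIG is the id passed in.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Function;

// Hypothetical fixed-size pool: each pooled sender is built with its own
// unique transactional.id, so no two in-flight transactions share one id.
public class SenderPool<S> {
    private final BlockingQueue<S> idle = new LinkedBlockingQueue<>();

    // The factory receives a distinct transactional.id for each sender it creates.
    public SenderPool(int size, Function<String, S> factory) {
        for (int i = 0; i < size; i++) {
            idle.add(factory.apply("my-transaction-id-" + i));
        }
    }

    // Borrow a sender; blocks until one is free, capping concurrency at `size`.
    public S acquire() {
        try {
            return idle.take();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for a sender", e);
        }
    }

    // Return a sender only after its transaction committed or aborted cleanly.
    public void release(S sender) {
        idle.add(sender);
    }
}
```

A sender must only go back into the pool after its transaction has completed; a sender that failed mid-transaction should be closed and replaced instead (see below in the thread).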

garyrussell avatar Oct 12 '21 16:10 garyrussell

is this issue resolved?

kunallawand5007 avatar Mar 21 '22 18:03 kunallawand5007

It is not; you need to maintain a pool of producers in your code. Only one transaction can be in progress at a time per producer.

garyrussell avatar Mar 21 '22 20:03 garyrussell

Hi Gary ,

Thanks for Help !!

As per your suggestion I maintained a pool of KafkaSenders. It works well in functional testing, but under load testing it started failing with the exception below:

org.apache.kafka.common.KafkaException: Cannot execute transactional method because we are in an error state

Code snippet:

[code snippet attached as images]

Due to some security constraints I am not able to attach the code, which is why I added it as an image snippet. Can you please check what the possible root cause of this exception would be?

Thanks, Kunal Lawand

kunallawand5007 avatar Apr 06 '22 14:04 kunallawand5007

If you get any exceptions (e.g. on beginTransaction(), commitTransaction() or abortTransaction()), you must close the producer and remove it from the pool.
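That advice can be sketched as follows. This is a hypothetical plain-JDK wrapper (the `ReplacingSender` name and `"tx-"` id scheme are made up): after any transactional failure the broken sender is closed and replaced by a fresh one under a brand-new transactional.id, so a producer stuck in an error state never goes back into rotation.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

// Hypothetical wrapper: on a transactional error, close the current
// sender and build a replacement with a new transactional.id.
public class ReplacingSender<S extends AutoCloseable> {
    private final AtomicInteger idCounter = new AtomicInteger();
    private final Function<String, S> factory;
    private S current;

    public ReplacingSender(Function<String, S> factory) {
        this.factory = factory;
        this.current = factory.apply(nextId());
    }

    private String nextId() {
        return "tx-" + idCounter.getAndIncrement();
    }

    public S sender() {
        return current;
    }

    // Call this from the error path of beginTransaction()/commitTransaction()/
    // abortTransaction(): the broken producer is closed, never reused.
    public void replaceAfterError() {
        try {
            current.close();
        } catch (Exception ignored) {
            // the producer is already in an error state; close is best-effort
        }
        current = factory.apply(nextId());
    }
}
```

Combined with a pool, `replaceAfterError()` would run before the slot is handed back, so the pool only ever contains healthy senders.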

garyrussell avatar Apr 06 '22 17:04 garyrussell

> In spring-kafka, we cache transactional producers.
>
> There is currently no caching producer factory in reactor-kafka.
>
> You could maintain your own pool of senders with different transactional.ids.

Hi @garyrussell, is there any plan to support a caching producer factory? Thanks.

qavid avatar Apr 25 '22 07:04 qavid

No immediate plans, but contributions are always welcome.

garyrussell avatar Apr 25 '22 14:04 garyrussell

Hi @garyrussell, does Kafka support multiple ongoing transactions at a time? I am expecting that, if Kafka supported multiple concurrent transactions, consumers would receive messages based on commit order. Is that actually what happens?

Manju050 avatar May 16 '24 09:05 Manju050

I am no longer involved with this project.

No; the KafkaProducer does not support multiple concurrent transactions; you need to maintain a pool.

https://github.com/reactor/reactor-kafka/issues/248#issuecomment-941181586

garyrussell avatar May 16 '24 12:05 garyrussell