How to set up an idempotent producer in Kafka REST Proxy
Hi everyone,
I've seen there is a producer setting, "enable.idempotence=true", which, when enabled, gives exactly-once guarantees.
How could this be enabled in the context of a Kafka REST proxy instance communicating with Kafka Brokers?
As far as I understand, for an exactly-once producer you have to set two things:
enable.idempotence = true
transactional.id = {someId}
You can pass the Java Kafka producer's config settings to the Kafka REST Proxy. Sadly, the transactional setting does not work for me: I get an exception saying that the producer invoked the send method before initializing transactions with the initTransactions method.
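To illustrate why that exception appears, here is a toy Python model of the ordering a transactional producer enforces. It is not Kafka code: the class ToyTransactionalProducer and its method names are made up for this sketch, and it only mirrors the rule visible in the error message (once transactional.id is set, send is rejected until initTransactions has completed and a transaction has been started).

```python
class ToyTransactionalProducer:
    """Toy stand-in for a producer's transaction checks (hypothetical, not Kafka code)."""

    def __init__(self, transactional_id=None):
        self.transactional_id = transactional_id
        self.initialized = False      # set by init_transactions()
        self.in_transaction = False   # set by begin_transaction()
        self.sent = []                # records accepted by send()

    def init_transactions(self):
        if self.transactional_id is None:
            raise RuntimeError("transactional.id is not set")
        self.initialized = True

    def begin_transaction(self):
        if not self.initialized:
            raise RuntimeError("init_transactions() has not been called")
        self.in_transaction = True

    def send(self, topic, value):
        # The checks only apply when a transactional.id is configured.
        if self.transactional_id is not None:
            if not self.initialized:
                raise RuntimeError("send before init_transactions()")
            if not self.in_transaction:
                raise RuntimeError("send outside of a transaction")
        self.sent.append((topic, value))

    def commit_transaction(self):
        if not self.in_transaction:
            raise RuntimeError("no transaction to commit")
        self.in_transaction = False


if __name__ == "__main__":
    producer = ToyTransactionalProducer(transactional_id="some-id")
    try:
        producer.send("demo-topic", "hello")   # rejected: nothing initialized yet
    except RuntimeError as err:
        print("rejected:", err)

    producer.init_transactions()
    producer.begin_transaction()
    producer.send("demo-topic", "hello")       # accepted inside a transaction
    producer.commit_transaction()
```

A REST client has no way to trigger the init/begin/commit steps around a POST, so a producer configured with transactional.id would stay in the first, rejecting state, which matches the exception described above.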
+1
Hi everyone, do we have any update on this issue?
Hi there. I am using version confluentinc/cp-kafka-rest:5.3.1, and this is my configuration:
- env:
  - name: KAFKA_REST_HOST_NAME
    value: "kafkarest"
  - name: KAFKA_REST_ENABLE_IDEMPOTENCE
    value: "true"
  - name: KAFKA_REST_ACKS
    value: "-1"
  - name: KAFKA_REST_TRANSACTIONAL_ID
    valueFrom:
      fieldRef:
        fieldPath: metadata.name
  - name: KAFKA_REST_LISTENERS
    value: "http://0.0.0.0:8082"
  - name: KAFKA_REST_ACCESS_CONTROL_ALLOW_ORIGN
    value: "*"
  - name: KAFKA_REST_CONSUMER_REQUEST_TIMEOUT_MS
    value: "30000"
  - name: KAFKA_REST_ZOOKEEPER_CONNECT
    value: "zookeeper:2181"
  - name: KAFKA_REST_BOOTSTRAP_SERVERS
    value: "PLAINTEXT://kafka:9092"
  - name: KAFKA_REST_CONSUMER_METADATA_BROKER_LIST
    value: "kafka:9092"
  - name: KAFKA_REST_PRODUCER_METADATA_BROKER_LIST
    value: "kafka:9092"
Never mind the Kubernetes-specific environment variable syntax. After I set these environment variables, any attempt to send a message to kafka-rest fails with an exception.
This is the full stack trace:
[kafka-rest-0] [2019-11-21 14:50:00,004] ERROR Unhandled exception (io.confluent.rest.exceptions.KafkaExceptionMapper)
[kafka-rest-0] java.lang.IllegalStateException: Cannot perform a 'send' before completing a call to initTransactions when transactions are enabled.
[kafka-rest-0] at org.apache.kafka.clients.producer.internals.TransactionManager.failIfNotReadyForSend(TransactionManager.java:362)
[kafka-rest-0] at org.apache.kafka.clients.producer.internals.TransactionManager.maybeAddPartitionToTransaction(TransactionManager.java:341)
[kafka-rest-0] at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:915)
[kafka-rest-0] at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:856)
[kafka-rest-0] at io.confluent.kafkarest.NoSchemaRestProducer.produce(NoSchemaRestProducer.java:48)
[kafka-rest-0] at io.confluent.kafkarest.ProducerPool.produce(ProducerPool.java:170)
[kafka-rest-0] at io.confluent.kafkarest.resources.TopicsResource.produce(TopicsResource.java:148)
[kafka-rest-0] at io.confluent.kafkarest.resources.TopicsResource.produceJson(TopicsResource.java:109)
[kafka-rest-0] at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
[kafka-rest-0] at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
[kafka-rest-0] at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
[kafka-rest-0] at java.lang.reflect.Method.invoke(Method.java:498)
[kafka-rest-0] at org.glassfish.jersey.server.model.internal.ResourceMethodInvocationHandlerFactory.lambda$static$0(ResourceMethodInvocationHandlerFactory.java:76)
[kafka-rest-0] at org.glassfish.jersey.server.model.internal.AbstractJavaResourceMethodDispatcher$1.run(AbstractJavaResourceMethodDispatcher.java:148)
[kafka-rest-0] at org.glassfish.jersey.server.model.internal.AbstractJavaResourceMethodDispatcher.invoke(AbstractJavaResourceMethodDispatcher.java:191)
[kafka-rest-0] at org.glassfish.jersey.server.model.internal.JavaResourceMethodDispatcherProvider$VoidOutInvoker.doDispatch(JavaResourceMethodDispatcherProvider.java:183)
[kafka-rest-0] at org.glassfish.jersey.server.model.internal.AbstractJavaResourceMethodDispatcher.dispatch(AbstractJavaResourceMethodDispatcher.java:103)
[kafka-rest-0] at org.glassfish.jersey.server.model.ResourceMethodInvoker.invoke(ResourceMethodInvoker.java:493)
[kafka-rest-0] at org.glassfish.jersey.server.model.ResourceMethodInvoker.apply(ResourceMethodInvoker.java:415)
[kafka-rest-0] at org.glassfish.jersey.server.model.ResourceMethodInvoker.apply(ResourceMethodInvoker.java:104)
[kafka-rest-0] at org.glassfish.jersey.server.ServerRuntime$1.run(ServerRuntime.java:277)
[kafka-rest-0] at org.glassfish.jersey.internal.Errors$1.call(Errors.java:272)
[kafka-rest-0] at org.glassfish.jersey.internal.Errors$1.call(Errors.java:268)
[kafka-rest-0] at org.glassfish.jersey.internal.Errors.process(Errors.java:316)
[kafka-rest-0] at org.glassfish.jersey.internal.Errors.process(Errors.java:298)
[kafka-rest-0] at org.glassfish.jersey.internal.Errors.process(Errors.java:268)
[kafka-rest-0] at org.glassfish.jersey.process.internal.RequestScope.runInScope(RequestScope.java:289)
[kafka-rest-0] at org.glassfish.jersey.server.ServerRuntime.process(ServerRuntime.java:256)
[kafka-rest-0] at org.glassfish.jersey.server.ApplicationHandler.handle(ApplicationHandler.java:703)
[kafka-rest-0] at org.glassfish.jersey.servlet.WebComponent.serviceImpl(WebComponent.java:416)
[kafka-rest-0] at org.glassfish.jersey.servlet.ServletContainer.serviceImpl(ServletContainer.java:409)
[kafka-rest-0] at org.glassfish.jersey.servlet.ServletContainer.doFilter(ServletContainer.java:584)
[kafka-rest-0] at org.glassfish.jersey.servlet.ServletContainer.doFilter(ServletContainer.java:525)
[kafka-rest-0] at org.glassfish.jersey.servlet.ServletContainer.doFilter(ServletContainer.java:462)
[kafka-rest-0] at org.eclipse.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1610)
[kafka-rest-0] at org.eclipse.jetty.servlet.ServletHandler.doHandle(ServletHandler.java:540)
[kafka-rest-0] at org.eclipse.jetty.server.handler.ScopedHandler.nextHandle(ScopedHandler.java:255)
[kafka-rest-0] at org.eclipse.jetty.server.session.SessionHandler.doHandle(SessionHandler.java:1700)
[kafka-rest-0] at org.eclipse.jetty.server.handler.ScopedHandler.nextHandle(ScopedHandler.java:255)
[kafka-rest-0] at org.eclipse.jetty.server.handler.ContextHandler.doHandle(ContextHandler.java:1345)
[kafka-rest-0] at org.eclipse.jetty.server.handler.ScopedHandler.nextScope(ScopedHandler.java:203)
[kafka-rest-0] at org.eclipse.jetty.servlet.ServletHandler.doScope(ServletHandler.java:480)
[kafka-rest-0] at org.eclipse.jetty.server.session.SessionHandler.doScope(SessionHandler.java:1667)
[kafka-rest-0] at org.eclipse.jetty.server.handler.ScopedHandler.nextScope(ScopedHandler.java:201)
[kafka-rest-0] at org.eclipse.jetty.server.handler.ContextHandler.doScope(ContextHandler.java:1247)
[kafka-rest-0] at org.eclipse.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:144)
[kafka-rest-0] at org.eclipse.jetty.server.handler.HandlerCollection.handle(HandlerCollection.java:152)
[kafka-rest-0] at org.eclipse.jetty.server.handler.StatisticsHandler.handle(StatisticsHandler.java:174)
[kafka-rest-0] at org.eclipse.jetty.server.handler.ContextHandlerCollection.handle(ContextHandlerCollection.java:220)
[kafka-rest-0] at org.eclipse.jetty.server.handler.gzip.GzipHandler.handle(GzipHandler.java:753)
[kafka-rest-0] at org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:132)
[kafka-rest-0] at org.eclipse.jetty.server.Server.handle(Server.java:505)
[kafka-rest-0] at org.eclipse.jetty.server.HttpChannel.handle(HttpChannel.java:370)
[kafka-rest-0] at org.eclipse.jetty.server.HttpConnection.onFillable(HttpConnection.java:267)
[kafka-rest-0] at org.eclipse.jetty.io.AbstractConnection$ReadCallback.succeeded(AbstractConnection.java:305)
[kafka-rest-0] at org.eclipse.jetty.io.FillInterest.fillable(FillInterest.java:103)
[kafka-rest-0] at org.eclipse.jetty.io.ChannelEndPoint$2.run(ChannelEndPoint.java:117)
[kafka-rest-0] at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.runTask(EatWhatYouKill.java:333)
[kafka-rest-0] at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.doProduce(EatWhatYouKill.java:310)
[kafka-rest-0] at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.tryProduce(EatWhatYouKill.java:168)
[kafka-rest-0] at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.run(EatWhatYouKill.java:126)
[kafka-rest-0] at org.eclipse.jetty.util.thread.ReservedThreadExecutor$ReservedThread.run(ReservedThreadExecutor.java:366)
[kafka-rest-0] at org.eclipse.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:698)
[kafka-rest-0] at org.eclipse.jetty.util.thread.QueuedThreadPool$Runner.run(QueuedThreadPool.java:804)
[kafka-rest-0] at java.lang.Thread.run(Thread.java:748)
[kafka-rest-0] [2019-11-21 14:50:00,005] ERROR Unhandled exception resulting in internal server error response (io.confluent.rest.exceptions.GenericExceptionMapper)
[kafka-rest-0] java.lang.IllegalStateException: Cannot perform a 'send' before completing a call to initTransactions when transactions are enabled.
[kafka-rest-0] at org.apache.kafka.clients.producer.internals.TransactionManager.failIfNotReadyForSend(TransactionManager.java:362)
[kafka-rest-0] at org.apache.kafka.clients.producer.internals.TransactionManager.maybeAddPartitionToTransaction(TransactionManager.java:341)
[kafka-rest-0] at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:915)
[kafka-rest-0] at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:856)
[kafka-rest-0] at io.confluent.kafkarest.NoSchemaRestProducer.produce(NoSchemaRestProducer.java:48)
[kafka-rest-0] at io.confluent.kafkarest.ProducerPool.produce(ProducerPool.java:170)
[kafka-rest-0] at io.confluent.kafkarest.resources.TopicsResource.produce(TopicsResource.java:148)
[kafka-rest-0] at io.confluent.kafkarest.resources.TopicsResource.produceJson(TopicsResource.java:109)
[kafka-rest-0] at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
[kafka-rest-0] at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
[kafka-rest-0] at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
[kafka-rest-0] at java.lang.reflect.Method.invoke(Method.java:498)
[kafka-rest-0] at org.glassfish.jersey.server.model.internal.ResourceMethodInvocationHandlerFactory.lambda$static$0(ResourceMethodInvocationHandlerFactory.java:76)
[kafka-rest-0] at org.glassfish.jersey.server.model.internal.AbstractJavaResourceMethodDispatcher$1.run(AbstractJavaResourceMethodDispatcher.java:148)
[kafka-rest-0] at org.glassfish.jersey.server.model.internal.AbstractJavaResourceMethodDispatcher.invoke(AbstractJavaResourceMethodDispatcher.java:191)
[kafka-rest-0] at org.glassfish.jersey.server.model.internal.JavaResourceMethodDispatcherProvider$VoidOutInvoker.doDispatch(JavaResourceMethodDispatcherProvider.java:183)
[kafka-rest-0] at org.glassfish.jersey.server.model.internal.AbstractJavaResourceMethodDispatcher.dispatch(AbstractJavaResourceMethodDispatcher.java:103)
[kafka-rest-0] at org.glassfish.jersey.server.model.ResourceMethodInvoker.invoke(ResourceMethodInvoker.java:493)
[kafka-rest-0] at org.glassfish.jersey.server.model.ResourceMethodInvoker.apply(ResourceMethodInvoker.java:415)
[kafka-rest-0] at org.glassfish.jersey.server.model.ResourceMethodInvoker.apply(ResourceMethodInvoker.java:104)
[kafka-rest-0] at org.glassfish.jersey.server.ServerRuntime$1.run(ServerRuntime.java:277)
[kafka-rest-0] at org.glassfish.jersey.internal.Errors$1.call(Errors.java:272)
[kafka-rest-0] at org.glassfish.jersey.internal.Errors$1.call(Errors.java:268)
[kafka-rest-0] at org.glassfish.jersey.internal.Errors.process(Errors.java:316)
[kafka-rest-0] at org.glassfish.jersey.internal.Errors.process(Errors.java:298)
[kafka-rest-0] at org.glassfish.jersey.internal.Errors.process(Errors.java:268)
[kafka-rest-0] at org.glassfish.jersey.process.internal.RequestScope.runInScope(RequestScope.java:289)
[kafka-rest-0] at org.glassfish.jersey.server.ServerRuntime.process(ServerRuntime.java:256)
[kafka-rest-0] at org.glassfish.jersey.server.ApplicationHandler.handle(ApplicationHandler.java:703)
[kafka-rest-0] at org.glassfish.jersey.servlet.WebComponent.serviceImpl(WebComponent.java:416)
[kafka-rest-0] at org.glassfish.jersey.servlet.ServletContainer.serviceImpl(ServletContainer.java:409)
[kafka-rest-0] at org.glassfish.jersey.servlet.ServletContainer.doFilter(ServletContainer.java:584)
[kafka-rest-0] at org.glassfish.jersey.servlet.ServletContainer.doFilter(ServletContainer.java:525)
[kafka-rest-0] at org.glassfish.jersey.servlet.ServletContainer.doFilter(ServletContainer.java:462)
[kafka-rest-0] at org.eclipse.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1610)
[kafka-rest-0] at org.eclipse.jetty.servlet.ServletHandler.doHandle(ServletHandler.java:540)
[kafka-rest-0] at org.eclipse.jetty.server.handler.ScopedHandler.nextHandle(ScopedHandler.java:255)
[kafka-rest-0] at org.eclipse.jetty.server.session.SessionHandler.doHandle(SessionHandler.java:1700)
[kafka-rest-0] at org.eclipse.jetty.server.handler.ScopedHandler.nextHandle(ScopedHandler.java:255)
[kafka-rest-0] at org.eclipse.jetty.server.handler.ContextHandler.doHandle(ContextHandler.java:1345)
[kafka-rest-0] at org.eclipse.jetty.server.handler.ScopedHandler.nextScope(ScopedHandler.java:203)
[kafka-rest-0] at org.eclipse.jetty.servlet.ServletHandler.doScope(ServletHandler.java:480)
[kafka-rest-0] at org.eclipse.jetty.server.session.SessionHandler.doScope(SessionHandler.java:1667)
[kafka-rest-0] at org.eclipse.jetty.server.handler.ScopedHandler.nextScope(ScopedHandler.java:201)
[kafka-rest-0] at org.eclipse.jetty.server.handler.ContextHandler.doScope(ContextHandler.java:1247)
[kafka-rest-0] at org.eclipse.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:144)
[kafka-rest-0] at org.eclipse.jetty.server.handler.HandlerCollection.handle(HandlerCollection.java:152)
[kafka-rest-0] at org.eclipse.jetty.server.handler.StatisticsHandler.handle(StatisticsHandler.java:174)
[kafka-rest-0] at org.eclipse.jetty.server.handler.ContextHandlerCollection.handle(ContextHandlerCollection.java:220)
[kafka-rest-0] at org.eclipse.jetty.server.handler.gzip.GzipHandler.handle(GzipHandler.java:753)
[kafka-rest-0] at org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:132)
[kafka-rest-0] at org.eclipse.jetty.server.Server.handle(Server.java:505)
[kafka-rest-0] at org.eclipse.jetty.server.HttpChannel.handle(HttpChannel.java:370)
[kafka-rest-0] at org.eclipse.jetty.server.HttpConnection.onFillable(HttpConnection.java:267)
[kafka-rest-0] at org.eclipse.jetty.io.AbstractConnection$ReadCallback.succeeded(AbstractConnection.java:305)
[kafka-rest-0] at org.eclipse.jetty.io.FillInterest.fillable(FillInterest.java:103)
[kafka-rest-0] at org.eclipse.jetty.io.ChannelEndPoint$2.run(ChannelEndPoint.java:117)
[kafka-rest-0] at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.runTask(EatWhatYouKill.java:333)
[kafka-rest-0] at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.doProduce(EatWhatYouKill.java:310)
[kafka-rest-0] at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.tryProduce(EatWhatYouKill.java:168)
[kafka-rest-0] at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.run(EatWhatYouKill.java:126)
[kafka-rest-0] at org.eclipse.jetty.util.thread.ReservedThreadExecutor$ReservedThread.run(ReservedThreadExecutor.java:366)
[kafka-rest-0] at org.eclipse.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:698)
[kafka-rest-0] at org.eclipse.jetty.util.thread.QueuedThreadPool$Runner.run(QueuedThreadPool.java:804)
[kafka-rest-0] at java.lang.Thread.run(Thread.java:748)
Any suggestion?
I am using 6.1.1 and it works:
version: '3.8'
services:
  kafka-rest-proxy:
    image: confluentinc/cp-kafka-rest:6.1.1
    ports:
      - 8082:8082
    hostname: kafka-rest-proxy
    container_name: kafka-rest-proxy.local
    environment:
      KAFKA_REST_HOST_NAME: kafka-rest-proxy.local
      KAFKA_REST_LISTENERS: "http://0.0.0.0:8082"
      KAFKA_REST_BOOTSTRAP_SERVERS: kafka1.local:9092
      KAFKA_REST_PRODUCER_THREADS: 5
      KAFKA_REST_PRODUCER_BATCH_SIZE: 0
      KAFKA_REST_PRODUCER_RETRIES: 1
      KAFKA_REST_PRODUCER_LINGER_MS: 0
      KAFKA_REST_PRODUCER_ENABLE_IDEMPOTENCE: true
When sending the first POST request, it creates a producer with the following config:
[2022-04-12 14:03:52,509] INFO ProducerConfig values:
acks = -1
batch.size = 0
bootstrap.servers = [kafka1.local:9092]
buffer.memory = 33554432
client.dns.lookup = use_all_dns_ips
client.id = producer-5
compression.type = none
connections.max.idle.ms = 540000
delivery.timeout.ms = 120000
enable.idempotence = true
[...]
The message is then successfully stored on the Kafka topic.
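Note that the working setup uses the KAFKA_REST_PRODUCER_ prefix, while the earlier 5.3.1 configuration in this thread used KAFKA_REST_ENABLE_IDEMPOTENCE without it. Below is a small Python sketch of how a property name could be turned into the container's environment variable name. The function name rest_proxy_env_name is made up for this example. The dot-to-underscore rule matches the variables shown in this thread; the extra escaping of underscores and dashes is how I recall the Confluent Docker image convention, so treat that part as an assumption and check it against the image documentation.

```python
def rest_proxy_env_name(prop, prefix="KAFKA_REST_"):
    """Map a REST Proxy property name to a cp-kafka-rest environment variable name.

    Assumed convention: upper-case the name, '.' becomes '_',
    '_' becomes '__' and '-' becomes '___'.
    """
    # Escape underscores and dashes first, so the underscores produced
    # from dots in the last step are not escaped a second time.
    escaped = prop.replace("_", "__").replace("-", "___").replace(".", "_")
    return prefix + escaped.upper()


if __name__ == "__main__":
    # Producer overrides carry the "producer." prefix, as in the compose file above.
    for name in ("producer.enable.idempotence", "producer.retries", "bootstrap.servers"):
        print(name, "->", rest_proxy_env_name(name))
```

For producer.enable.idempotence this yields KAFKA_REST_PRODUCER_ENABLE_IDEMPOTENCE, the variable used in the working configuration.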
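For a modern Kafka producer (Apache Kafka 3.0 and later), idempotence is enabled by default. The REST Proxy does not support transactions, so transactional.id should not be set: as the stack trace above shows, the proxy calls send without ever calling initTransactions.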
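As a rough way to catch this before deploying, here is a Python sketch that checks a dict of producer properties for the problems discussed in this thread. The function check_producer_overrides is hypothetical and not part of any Kafka or Confluent tool. The idempotence constraints it checks (acks must be all or -1, retries must be greater than 0, max.in.flight.requests.per.connection must be at most 5) are the ones the Java producer enforces when enable.idempotence is true. The checks only run when the property is set explicitly, because its default depends on the client version.

```python
def check_producer_overrides(overrides):
    """Return a list of problems found in a dict of producer properties."""
    problems = []

    # Transactions need initTransactions(), which the REST Proxy never calls.
    if "transactional.id" in overrides:
        problems.append("transactional.id is set, but the REST Proxy does not support transactions")

    # Only validate idempotence settings when it is explicitly enabled.
    if str(overrides.get("enable.idempotence", "")).lower() == "true":
        acks = str(overrides.get("acks", "all"))
        if acks not in ("all", "-1"):
            problems.append("enable.idempotence requires acks=all (or -1), got " + acks)
        if "retries" in overrides and int(overrides["retries"]) < 1:
            problems.append("enable.idempotence requires retries > 0")
        key = "max.in.flight.requests.per.connection"
        if key in overrides and int(overrides[key]) > 5:
            problems.append("enable.idempotence requires " + key + " <= 5")

    return problems


if __name__ == "__main__":
    # Settings corresponding to the 5.3.1 configuration earlier in the thread.
    failing = {"enable.idempotence": "true", "acks": "-1", "transactional.id": "kafka-rest-0"}
    # Settings corresponding to the 6.1.1 configuration that works.
    working = {"enable.idempotence": "true", "batch.size": 0, "retries": 1, "linger.ms": 0}
    print(check_producer_overrides(failing))
    print(check_producer_overrides(working))
```

Run against the two configurations from this thread, the first is flagged only for transactional.id and the second passes.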