kafka-rest icon indicating copy to clipboard operation
kafka-rest copied to clipboard

How to set up Idempotent producer inside of Kafka REST Proxy

Open miguelpais opened this issue 8 years ago • 5 comments

Hi everyone,

I've seen there is a setting called "enabled.idempotent=true" which when enabled on producers will give exactly-once guarantees.

How could this be enabled in the context of a Kafka REST proxy instance communicating with Kafka Brokers?

miguelpais avatar Mar 21 '18 14:03 miguelpais

As far as I understand, for an exactly once producer you have to set two things:

enable.idempotence = true transactional.id = {someId}

You can pass Kafka REST proxy the producer config settings of the java Kafka producer. Sadly, transactional does not work for me. I get an exception that the producer invokes the send method before he initializes a transaction with initTransaction method.

StefanRoesch avatar Apr 12 '18 13:04 StefanRoesch

+1

urgelacko avatar Jun 11 '18 14:06 urgelacko

Hi everyone , Do we have any update on this issue ?

thyagu90 avatar Oct 08 '18 12:10 thyagu90

Hi there. Version: confluentinc/cp-kafka-rest:5.3.1 this is my configuration:

- env:
            - name: KAFKA_REST_HOST_NAME
              value: "kafkarest"
            - name: KAFKA_REST_ENABLE_IDEMPOTENCE
              value: "true"
            - name: KAFKA_REST_ACKS
              value: "-1"
            - name: KAFKA_REST_TRANSACTIONAL_ID
              valueFrom:
                fieldRef:
                  fieldPath: metadata.name
            - name: KAFKA_REST_LISTENERS
              value: "http://0.0.0.0:8082"
            - name: KAFKA_REST_ACCESS_CONTROL_ALLOW_ORIGN
              value: "*"
            - name: KAFKA_REST_CONSUMER_REQUEST_TIMEOUT_MS
              value: "30000"
            - name: KAFKA_REST_ZOOKEEPER_CONNECT
              value: "zookeeper:2181"
            - name: KAFKA_REST_BOOTSTRAP_SERVERS
              value: "PLAINTEXT://kafka:9092"
            - name: KAFKA_REST_CONSUMER_METADATA_BROKER_LIST
              value: "kafka:9092"
            - name: KAFKA_REST_PRODUCER_METADATA_BROKER_LIST
              value: "kafka:9092"

Don't mind to Kubernetes specific environment variable. After I set up this environment variables, if I try to send some message to kafka-rest, it shows an exception:

This is the full stack trace:

[kafka-rest-0] [2019-11-21 14:50:00,004] ERROR Unhandled exception (io.confluent.rest.exceptions.KafkaExceptionMapper)
[kafka-rest-0] java.lang.IllegalStateException: Cannot perform a 'send' before completing a call to initTransactions when transactions are enabled.
[kafka-rest-0]  at org.apache.kafka.clients.producer.internals.TransactionManager.failIfNotReadyForSend(TransactionManager.java:362)
[kafka-rest-0]  at org.apache.kafka.clients.producer.internals.TransactionManager.maybeAddPartitionToTransaction(TransactionManager.java:341)
[kafka-rest-0]  at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:915)
[kafka-rest-0]  at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:856)
[kafka-rest-0]  at io.confluent.kafkarest.NoSchemaRestProducer.produce(NoSchemaRestProducer.java:48)
[kafka-rest-0]  at io.confluent.kafkarest.ProducerPool.produce(ProducerPool.java:170)
[kafka-rest-0]  at io.confluent.kafkarest.resources.TopicsResource.produce(TopicsResource.java:148)
[kafka-rest-0]  at io.confluent.kafkarest.resources.TopicsResource.produceJson(TopicsResource.java:109)
[kafka-rest-0]  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
[kafka-rest-0]  at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
[kafka-rest-0]  at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
[kafka-rest-0]  at java.lang.reflect.Method.invoke(Method.java:498)
[kafka-rest-0]  at org.glassfish.jersey.server.model.internal.ResourceMethodInvocationHandlerFactory.lambda$static$0(ResourceMethodInvocationHandlerFactory.java:76)
[kafka-rest-0]  at org.glassfish.jersey.server.model.internal.AbstractJavaResourceMethodDispatcher$1.run(AbstractJavaResourceMethodDispatcher.java:148)     [kafka-rest-0]  at org.glassfish.jersey.server.model.internal.AbstractJavaResourceMethodDispatcher.invoke(AbstractJavaResourceMethodDispatcher.java:191)
[kafka-rest-0]  at org.glassfish.jersey.server.model.internal.JavaResourceMethodDispatcherProvider$VoidOutInvoker.doDispatch(JavaResourceMethodDispatcherProvider.java:183)
[kafka-rest-0]  at org.glassfish.jersey.server.model.internal.AbstractJavaResourceMethodDispatcher.dispatch(AbstractJavaResourceMethodDispatcher.java:103)
[kafka-rest-0]  at org.glassfish.jersey.server.model.ResourceMethodInvoker.invoke(ResourceMethodInvoker.java:493)
[kafka-rest-0]  at org.glassfish.jersey.server.model.ResourceMethodInvoker.apply(ResourceMethodInvoker.java:415)
[kafka-rest-0]  at org.glassfish.jersey.server.model.ResourceMethodInvoker.apply(ResourceMethodInvoker.java:104)
[kafka-rest-0]  at org.glassfish.jersey.server.ServerRuntime$1.run(ServerRuntime.java:277)
[kafka-rest-0]  at org.glassfish.jersey.internal.Errors$1.call(Errors.java:272)
[kafka-rest-0]  at org.glassfish.jersey.internal.Errors$1.call(Errors.java:268)
[kafka-rest-0]  at org.glassfish.jersey.internal.Errors.process(Errors.java:316)
[kafka-rest-0]  at org.glassfish.jersey.internal.Errors.process(Errors.java:298)
[kafka-rest-0]  at org.glassfish.jersey.internal.Errors.process(Errors.java:268)
[kafka-rest-0]  at org.glassfish.jersey.process.internal.RequestScope.runInScope(RequestScope.java:289)
[kafka-rest-0]  at org.glassfish.jersey.server.ServerRuntime.process(ServerRuntime.java:256)
[kafka-rest-0]  at org.glassfish.jersey.server.ApplicationHandler.handle(ApplicationHandler.java:703)
[kafka-rest-0]  at org.glassfish.jersey.servlet.WebComponent.serviceImpl(WebComponent.java:416)
[kafka-rest-0]  at org.glassfish.jersey.servlet.ServletContainer.serviceImpl(ServletContainer.java:409)
[kafka-rest-0]  at org.glassfish.jersey.servlet.ServletContainer.doFilter(ServletContainer.java:584)
[kafka-rest-0]  at org.glassfish.jersey.servlet.ServletContainer.doFilter(ServletContainer.java:525)
[kafka-rest-0]  at org.glassfish.jersey.servlet.ServletContainer.doFilter(ServletContainer.java:462)
[kafka-rest-0]  at org.eclipse.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1610)
[kafka-rest-0]  at org.eclipse.jetty.servlet.ServletHandler.doHandle(ServletHandler.java:540)
[kafka-rest-0]  at org.eclipse.jetty.server.handler.ScopedHandler.nextHandle(ScopedHandler.java:255)
[kafka-rest-0]  at org.eclipse.jetty.server.session.SessionHandler.doHandle(SessionHandler.java:1700)
[kafka-rest-0]  at org.eclipse.jetty.server.handler.ScopedHandler.nextHandle(ScopedHandler.java:255)
[kafka-rest-0]  at org.eclipse.jetty.server.handler.ContextHandler.doHandle(ContextHandler.java:1345)
[kafka-rest-0]  at org.eclipse.jetty.server.handler.ScopedHandler.nextScope(ScopedHandler.java:203)
[kafka-rest-0]  at org.eclipse.jetty.servlet.ServletHandler.doScope(ServletHandler.java:480)
[kafka-rest-0]  at org.eclipse.jetty.server.session.SessionHandler.doScope(SessionHandler.java:1667)
[kafka-rest-0]  at org.eclipse.jetty.server.handler.ScopedHandler.nextScope(ScopedHandler.java:201)
[kafka-rest-0]  at org.eclipse.jetty.server.handler.ContextHandler.doScope(ContextHandler.java:1247)
[kafka-rest-0]  at org.eclipse.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:144)
[kafka-rest-0]  at org.eclipse.jetty.server.handler.HandlerCollection.handle(HandlerCollection.java:152)
[kafka-rest-0]  at org.eclipse.jetty.server.handler.StatisticsHandler.handle(StatisticsHandler.java:174)
[kafka-rest-0]  at org.eclipse.jetty.server.handler.ContextHandlerCollection.handle(ContextHandlerCollection.java:220)
[kafka-rest-0]  at org.eclipse.jetty.server.handler.gzip.GzipHandler.handle(GzipHandler.java:753)
[kafka-rest-0]  at org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:132)
[kafka-rest-0]  at org.eclipse.jetty.server.Server.handle(Server.java:505)
[kafka-rest-0]  at org.eclipse.jetty.server.HttpChannel.handle(HttpChannel.java:370)
[kafka-rest-0]  at org.eclipse.jetty.server.HttpConnection.onFillable(HttpConnection.java:267)
[kafka-rest-0]  at org.eclipse.jetty.io.AbstractConnection$ReadCallback.succeeded(AbstractConnection.java:305)
[kafka-rest-0]  at org.eclipse.jetty.io.FillInterest.fillable(FillInterest.java:103)
[kafka-rest-0]  at org.eclipse.jetty.io.ChannelEndPoint$2.run(ChannelEndPoint.java:117)
[kafka-rest-0]  at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.runTask(EatWhatYouKill.java:333)
[kafka-rest-0]  at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.doProduce(EatWhatYouKill.java:310)
[kafka-rest-0]  at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.tryProduce(EatWhatYouKill.java:168)
[kafka-rest-0]  at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.run(EatWhatYouKill.java:126)
[kafka-rest-0]  at org.eclipse.jetty.util.thread.ReservedThreadExecutor$ReservedThread.run(ReservedThreadExecutor.java:366)
[kafka-rest-0]  at org.eclipse.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:698)
[kafka-rest-0]  at org.eclipse.jetty.util.thread.QueuedThreadPool$Runner.run(QueuedThreadPool.java:804)
[kafka-rest-0]  at java.lang.Thread.run(Thread.java:748)
[kafka-rest-0] [2019-11-21 14:50:00,005] ERROR Unhandled exception resulting in internal server error response (io.confluent.rest.exceptions.GenericExceptionMapper)
[kafka-rest-0] java.lang.IllegalStateException: Cannot perform a 'send' before completing a call to initTransactions when transactions are enabled.
[kafka-rest-0]  at org.apache.kafka.clients.producer.internals.TransactionManager.failIfNotReadyForSend(TransactionManager.java:362)
[kafka-rest-0]  at org.apache.kafka.clients.producer.internals.TransactionManager.maybeAddPartitionToTransaction(TransactionManager.java:341)
[kafka-rest-0]  at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:915)
[kafka-rest-0]  at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:856)
[kafka-rest-0]  at io.confluent.kafkarest.NoSchemaRestProducer.produce(NoSchemaRestProducer.java:48)
[kafka-rest-0]  at io.confluent.kafkarest.ProducerPool.produce(ProducerPool.java:170)
[kafka-rest-0]  at io.confluent.kafkarest.resources.TopicsResource.produce(TopicsResource.java:148)
[kafka-rest-0]  at io.confluent.kafkarest.resources.TopicsResource.produceJson(TopicsResource.java:109)
[kafka-rest-0]  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
[kafka-rest-0]  at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
[kafka-rest-0]  at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
[kafka-rest-0]  at java.lang.reflect.Method.invoke(Method.java:498)
[kafka-rest-0]  at org.glassfish.jersey.server.model.internal.ResourceMethodInvocationHandlerFactory.lambda$static$0(ResourceMethodInvocationHandlerFactory.java:76)
[kafka-rest-0]  at org.glassfish.jersey.server.model.internal.AbstractJavaResourceMethodDispatcher$1.run(AbstractJavaResourceMethodDispatcher.java:148)     [kafka-rest-0]  at org.glassfish.jersey.server.model.internal.AbstractJavaResourceMethodDispatcher.invoke(AbstractJavaResourceMethodDispatcher.java:191)
[kafka-rest-0]  at org.glassfish.jersey.server.model.internal.JavaResourceMethodDispatcherProvider$VoidOutInvoker.doDispatch(JavaResourceMethodDispatcherProvider.java:183)
[kafka-rest-0]  at org.glassfish.jersey.server.model.internal.AbstractJavaResourceMethodDispatcher.dispatch(AbstractJavaResourceMethodDispatcher.java:103)
[kafka-rest-0]  at org.glassfish.jersey.server.model.ResourceMethodInvoker.invoke(ResourceMethodInvoker.java:493)
[kafka-rest-0]  at org.glassfish.jersey.server.model.ResourceMethodInvoker.apply(ResourceMethodInvoker.java:415)
[kafka-rest-0]  at org.glassfish.jersey.server.model.ResourceMethodInvoker.apply(ResourceMethodInvoker.java:104)
[kafka-rest-0]  at org.glassfish.jersey.server.ServerRuntime$1.run(ServerRuntime.java:277)
[kafka-rest-0]  at org.glassfish.jersey.internal.Errors$1.call(Errors.java:272)
[kafka-rest-0]  at org.glassfish.jersey.internal.Errors$1.call(Errors.java:268)
[kafka-rest-0]  at org.glassfish.jersey.internal.Errors.process(Errors.java:316)
[kafka-rest-0]  at org.glassfish.jersey.internal.Errors.process(Errors.java:298)
[kafka-rest-0]  at org.glassfish.jersey.internal.Errors.process(Errors.java:268)
[kafka-rest-0]  at org.glassfish.jersey.process.internal.RequestScope.runInScope(RequestScope.java:289)
[kafka-rest-0]  at org.glassfish.jersey.server.ServerRuntime.process(ServerRuntime.java:256)
[kafka-rest-0]  at org.glassfish.jersey.server.ApplicationHandler.handle(ApplicationHandler.java:703)
[kafka-rest-0]  at org.glassfish.jersey.servlet.WebComponent.serviceImpl(WebComponent.java:416)
[kafka-rest-0]  at org.glassfish.jersey.servlet.ServletContainer.serviceImpl(ServletContainer.java:409)
[kafka-rest-0]  at org.glassfish.jersey.servlet.ServletContainer.doFilter(ServletContainer.java:584)
[kafka-rest-0]  at org.glassfish.jersey.servlet.ServletContainer.doFilter(ServletContainer.java:525)
[kafka-rest-0]  at org.glassfish.jersey.servlet.ServletContainer.doFilter(ServletContainer.java:462)
[kafka-rest-0]  at org.eclipse.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1610)
[kafka-rest-0]  at org.eclipse.jetty.servlet.ServletHandler.doHandle(ServletHandler.java:540)
[kafka-rest-0]  at org.eclipse.jetty.server.handler.ScopedHandler.nextHandle(ScopedHandler.java:255)
[kafka-rest-0]  at org.eclipse.jetty.server.session.SessionHandler.doHandle(SessionHandler.java:1700)
[kafka-rest-0]  at org.eclipse.jetty.server.handler.ScopedHandler.nextHandle(ScopedHandler.java:255)
[kafka-rest-0]  at org.eclipse.jetty.server.handler.ContextHandler.doHandle(ContextHandler.java:1345)
[kafka-rest-0]  at org.eclipse.jetty.server.handler.ScopedHandler.nextScope(ScopedHandler.java:203)
[kafka-rest-0]  at org.eclipse.jetty.servlet.ServletHandler.doScope(ServletHandler.java:480)
[kafka-rest-0]  at org.eclipse.jetty.server.session.SessionHandler.doScope(SessionHandler.java:1667)
[kafka-rest-0]  at org.eclipse.jetty.server.handler.ScopedHandler.nextScope(ScopedHandler.java:201)
[kafka-rest-0]  at org.eclipse.jetty.server.handler.ContextHandler.doScope(ContextHandler.java:1247)
[kafka-rest-0]  at org.eclipse.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:144)
[kafka-rest-0]  at org.eclipse.jetty.server.handler.HandlerCollection.handle(HandlerCollection.java:152)
[kafka-rest-0]  at org.eclipse.jetty.server.handler.StatisticsHandler.handle(StatisticsHandler.java:174)
[kafka-rest-0]  at org.eclipse.jetty.server.handler.ContextHandlerCollection.handle(ContextHandlerCollection.java:220)
[kafka-rest-0]  at org.eclipse.jetty.server.handler.gzip.GzipHandler.handle(GzipHandler.java:753)
[kafka-rest-0]  at org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:132)
[kafka-rest-0]  at org.eclipse.jetty.server.Server.handle(Server.java:505)
[kafka-rest-0]  at org.eclipse.jetty.server.HttpChannel.handle(HttpChannel.java:370)
[kafka-rest-0]  at org.eclipse.jetty.server.HttpConnection.onFillable(HttpConnection.java:267)
[kafka-rest-0]  at org.eclipse.jetty.io.AbstractConnection$ReadCallback.succeeded(AbstractConnection.java:305)
[kafka-rest-0]  at org.eclipse.jetty.io.FillInterest.fillable(FillInterest.java:103)
[kafka-rest-0]  at org.eclipse.jetty.io.ChannelEndPoint$2.run(ChannelEndPoint.java:117)
[kafka-rest-0]  at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.runTask(EatWhatYouKill.java:333)
[kafka-rest-0]  at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.doProduce(EatWhatYouKill.java:310)
[kafka-rest-0]  at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.tryProduce(EatWhatYouKill.java:168)
[kafka-rest-0]  at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.run(EatWhatYouKill.java:126)
[kafka-rest-0]  at org.eclipse.jetty.util.thread.ReservedThreadExecutor$ReservedThread.run(ReservedThreadExecutor.java:366)
[kafka-rest-0]  at org.eclipse.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:698)
[kafka-rest-0]  at org.eclipse.jetty.util.thread.QueuedThreadPool$Runner.run(QueuedThreadPool.java:804)
[kafka-rest-0]  at java.lang.Thread.run(Thread.java:748)

Any suggestion?

justpolidor avatar Nov 21 '19 14:11 justpolidor

I am using 6.1.1 and it works:

version: '3.8'
services:
  kafka-rest-proxy:
    image: confluentinc/cp-kafka-rest:6.1.1
    ports:
      - 8082:8082
    hostname: kafka-rest-proxy
    container_name: kafka-rest-proxy.local
    environment:
      KAFKA_REST_HOST_NAME: kafka-rest-proxy.local
      KAFKA_REST_LISTENERS: "http://0.0.0.0:8082"
      KAFKA_REST_BOOTSTRAP_SERVERS: kafka1.local:9092
      KAFKA_REST_PRODUCER_THREADS: 5
      KAFKA_REST_PRODUCER_BATCH_SIZE: 0
      KAFKA_REST_PRODUCER_RETRIES: 1
      KAFKA_REST_PRODUCER_LINGER_MS: 0
      KAFKA_REST_PRODUCER_ENABLE_IDEMPOTENCE: true

When sending the first POST request, it creates a producer with the following config:

[2022-04-12 14:03:52,509] INFO ProducerConfig values: 
	acks = -1
	batch.size = 0
	bootstrap.servers = [kafka1.local:9092]
	buffer.memory = 33554432
	client.dns.lookup = use_all_dns_ips
	client.id = producer-5
	compression.type = none
	connections.max.idle.ms = 540000
	delivery.timeout.ms = 120000
	enable.idempotence = true
[...]

And successfully stores the message on the Kafka topic.

siom79 avatar Apr 12 '22 14:04 siom79

For a modern Kafka producer, the default is to enable idempotence. The REST Proxy does not support transactions.

AndrewJSchofield avatar Mar 23 '23 17:03 AndrewJSchofield