[BUG] Broker reports the transaction coordinator as disabled even though transactionCoordinatorEnabled=true
Describe the bug The broker logs constantly show the error below. According to the error message, the transaction coordinator must be enabled. The broker config already contains transactionCoordinatorEnabled=true, but the error keeps appearing.
2022-06-22T21:23:07,078+0000 [pulsar-io-5-4] ERROR io.streamnative.pulsar.handlers.kop.KafkaCommandDecoder - error while handle command:
java.lang.IllegalArgumentException: Broker has disabled transaction coordinator, please enable it before using transaction.
at io.streamnative.pulsar.handlers.kop.KafkaRequestHandler.throwIfTransactionCoordinatorDisabled(KafkaRequestHandler.java:2478) ~[ZTCCJ6BUXIIL7zuZcqCOWA/:?]
at io.streamnative.pulsar.handlers.kop.KafkaRequestHandler.getTransactionCoordinator(KafkaRequestHandler.java:275) ~[ZTCCJ6BUXIIL7zuZcqCOWA/:?]
at io.streamnative.pulsar.handlers.kop.KafkaRequestHandler.handleInitProducerId(KafkaRequestHandler.java:1907) ~[ZTCCJ6BUXIIL7zuZcqCOWA/:?]
at io.streamnative.pulsar.handlers.kop.KafkaCommandDecoder.channelRead(KafkaCommandDecoder.java:319) [ZTCCJ6BUXIIL7zuZcqCOWA/:?]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) [io.netty-netty-transport-4.1.72.Final.jar:4.1.72.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) [io.netty-netty-transport-4.1.72.Final.jar:4.1.72.Final]
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) [io.netty-netty-transport-4.1.72.Final.jar:4.1.72.Final]
at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:324) [io.netty-netty-codec-4.1.72.Final.jar:4.1.72.Final]
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:296) [io.netty-netty-codec-4.1.72.Final.jar:4.1.72.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) [io.netty-netty-transport-4.1.72.Final.jar:4.1.72.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) [io.netty-netty-transport-4.1.72.Final.jar:4.1.72.Final]
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) [io.netty-netty-transport-4.1.72.Final.jar:4.1.72.Final]
at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:286) [io.netty-netty-handler-4.1.72.Final.jar:4.1.72.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) [io.netty-netty-transport-4.1.72.Final.jar:4.1.72.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) [io.netty-netty-transport-4.1.72.Final.jar:4.1.72.Final]
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) [io.netty-netty-transport-4.1.72.Final.jar:4.1.72.Final]
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410) [io.netty-netty-transport-4.1.72.Final.jar:4.1.72.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) [io.netty-netty-transport-4.1.72.Final.jar:4.1.72.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) [io.netty-netty-transport-4.1.72.Final.jar:4.1.72.Final]
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919) [io.netty-netty-transport-4.1.72.Final.jar:4.1.72.Final]
at io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:795) [io.netty-netty-transport-classes-epoll-4.1.72.Final.jar:4.1.72.Final]
at io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:480) [io.netty-netty-transport-classes-epoll-4.1.72.Final.jar:4.1.72.Final]
at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:378) [io.netty-netty-transport-classes-epoll-4.1.72.Final.jar:4.1.72.Final]
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:986) [io.netty-netty-common-4.1.72.Final.jar:4.1.72.Final]
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) [io.netty-netty-common-4.1.72.Final.jar:4.1.72.Final]
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) [io.netty-netty-common-4.1.72.Final.jar:4.1.72.Final]
at java.lang.Thread.run(Thread.java:750) [?:1.8.0_332]
To Reproduce Steps to reproduce the behavior:
- Start the broker with KoP and transactionCoordinatorEnabled=true in the broker config
Expected behavior The error should not appear when the transaction coordinator property is set to true.
Additional context Pulsar version: 2.9.1. KoP version: 2.9.2.17.
You need to set the following configuration to enable KoP transactions:
brokerDeduplicationEnabled=true
kafkaTransactionCoordinatorEnabled=true
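
Note that the key named in the report (transactionCoordinatorEnabled) is not one of the two keys listed above, which would explain why the error persists. As a rough sketch of how one might verify a config before restarting the broker, the following standalone Java check looks for the two keys from this reply. The class name KopTxnConfigCheck and its helper methods are hypothetical and not part of KoP or Pulsar; it only uses java.util.Properties, and it assumes the broker config is a plain key=value properties file.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

// Hypothetical helper, not part of KoP: checks a properties-style config
// for the two settings named in the reply above.
public class KopTxnConfigCheck {
    // Keys copied from the reply; both are expected to be "true".
    static final String[] REQUIRED_KEYS = {
        "brokerDeduplicationEnabled",
        "kafkaTransactionCoordinatorEnabled"
    };

    // Returns the required keys that are absent or not set to "true".
    static List<String> missingSettings(Properties conf) {
        List<String> missing = new ArrayList<>();
        for (String key : REQUIRED_KEYS) {
            String value = conf.getProperty(key, "").trim();
            if (!"true".equalsIgnoreCase(value)) {
                missing.add(key);
            }
        }
        return missing;
    }

    // Parses key=value text into Properties.
    static Properties parse(String text) {
        Properties conf = new Properties();
        try {
            conf.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return conf;
    }

    public static void main(String[] args) {
        // The config as described in the report: a different key is set.
        Properties reported = parse("transactionCoordinatorEnabled=true\n");
        System.out.println("reported config is missing: " + missingSettings(reported));

        // The config suggested in the reply.
        Properties suggested = parse(
            "brokerDeduplicationEnabled=true\n"
            + "kafkaTransactionCoordinatorEnabled=true\n");
        System.out.println("suggested config is missing: " + missingSettings(suggested));
    }
}
```

In practice the text would be read from the broker's own config file, whose location depends on the deployment; the inline strings here only mirror the values quoted in this issue.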