seatunnel
seatunnel copied to clipboard
[Bug] [Kafka] Use EXACTLY_ONCE commit error for kafka sink
Search before asking
- [X] I had searched in the issues and found no similar issues.
What happened
https://mp.weixin.qq.com/s/DSv76j1riGMNBkR0VWNEhg
kafka clientId duplicate for writer and committer
SeaTunnel Version
dev
SeaTunnel Config
-
Running Command
-
Error Exception
-
Zeta or Flink or Spark Version
No response
Java or Scala Version
No response
Screenshots
No response
Are you willing to submit PR?
- [ ] Yes I am willing to submit a PR!
Code of Conduct
- [X] I agree to follow this project's Code of Conduct
Supplement: Recently,I have also tested zeta, and the corresponding error is as follows. I think this is the zeta error described in the article.(guess)
related fix https://github.com/apache/seatunnel/pull/4469#issuecomment-2372833258, but it not update long time. @fcb-xiaobo hi, can you help fix this issue?
I also looked into it and provided some information, it should be that write and commit both hold the same client.id producer, conflict
I am currently trying to reproduce this issue,and I will try to fix it later
related fix #4469 (comment), but it not update long time. @fcb-xiaobo hi, can you help fix this issue?
@fcb-xiaobo As long as you configure 'semantics = EXACTLY_ONCE' to create this problem, pr referenced by @liunaijie is a good idea but needs some optimization,I briefly tried it
I am currently trying to reproduce this issue,and I will try to fix it later
@fcb-xiaobo Hi, is there any progress here?
I am currently trying to reproduce this issue,and I will try to fix it later
@fcb-xiaobo Hi, is there any progress here?
The local code has been modified, but there are still issues with the e2e test case running on Docker, which I am still working on resolving
at_least_once mode,this problem also occurs
at_least_once mode,this problem also occurs
[903461970382946316] 2024-10-29 10:01:43,713 ERROR [.s.e.s.c.CheckpointCoordinator] [seatunnel-coordinator-service-268] - notify checkpoint completed failed java.util.concurrent.CompletionException: org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException: org.apache.kafka.common.errors.InvalidTxnStateException: The producer attempted a transactional operation in an invalid state.
at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292) ~[?:1.8.0_202]
at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308) ~[?:1.8.0_202]
at java.util.concurrent.CompletableFuture.biRelay(CompletableFuture.java:1286) ~[?:1.8.0_202]
at java.util.concurrent.CompletableFuture$BiRelay.tryFire(CompletableFuture.java:1270) ~[?:1.8.0_202]
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) ~[?:1.8.0_202]
at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977) ~[?:1.8.0_202]
at com.hazelcast.spi.impl.AbstractInvocationFuture.onComplete(AbstractInvocationFuture.java:1243) ~[seatunnel-starter.jar:2.3.8]
at com.hazelcast.spi.impl.AbstractInvocationFuture.complete0(AbstractInvocationFuture.java:1234) ~[seatunnel-starter.jar:2.3.8]
at com.hazelcast.spi.impl.AbstractInvocationFuture.completeExceptionallyInternal(AbstractInvocationFuture.java:1223) ~[seatunnel-starter.jar:2.3.8]
at com.hazelcast.spi.impl.operationservice.impl.Invocation.completeExceptionally(Invocation.java:680) ~[seatunnel-starter.jar:2.3.8]
at com.hazelcast.spi.impl.operationservice.impl.Invocation.notifyThrowable(Invocation.java:386) ~[seatunnel-starter.jar:2.3.8]
at com.hazelcast.spi.impl.operationservice.impl.Invocation.notifyError(Invocation.java:330) ~[seatunnel-starter.jar:2.3.8]
at com.hazelcast.spi.impl.operationservice.impl.InboundResponseHandler.notifyErrorResponse(InboundResponseHandler.java:145) ~[seatunnel-starter.jar:2.3.8]
at com.hazelcast.spi.impl.operationservice.impl.InboundResponseHandler.accept(InboundResponseHandler.java:101) ~[seatunnel-starter.jar:2.3.8]
at com.hazelcast.spi.impl.operationservice.impl.InboundResponseHandlerSupplier$ResponseThread.doRun(InboundResponseHandlerSupplier.java:297) ~[seatunnel-starter.jar:2.3.8]
at com.hazelcast.spi.impl.operationservice.impl.InboundResponseHandlerSupplier$ResponseThread.executeRun(InboundResponseHandlerSupplier.java:284) ~[seatunnel-starter.jar:2.3.8]
at com.hazelcast.internal.util.executor.HazelcastManagedThread.run(HazelcastManagedThread.java:102) ~[seatunnel-starter.jar:2.3.8]
Caused by: org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException: org.apache.kafka.common.errors.InvalidTxnStateException: The producer attempted a transactional operation in an invalid state.
at org.apache.seatunnel.engine.server.checkpoint.operation.CheckpointFinishedOperation.lambda$runInternal$0(CheckpointFinishedOperation.java:99) ~[seatunnel-starter.jar:2.3.8]
at org.apache.seatunnel.common.utils.RetryUtils.retryWithException(RetryUtils.java:48) ~[seatunnel-starter.jar:2.3.8]
at org.apache.seatunnel.engine.server.checkpoint.operation.CheckpointFinishedOperation.runInternal(CheckpointFinishedOperation.java:81) ~[seatunnel-starter.jar:2.3.8]
at org.apache.seatunnel.engine.server.task.operation.TracingOperation.run(TracingOperation.java:44) ~[seatunnel-starter.jar:2.3.8]
at com.hazelcast.spi.impl.operationservice.Operation.call(Operation.java:189) ~[seatunnel-starter.jar:2.3.8]
at com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.call(OperationRunnerImpl.java:273) ~[seatunnel-starter.jar:2.3.8]
at com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.run(OperationRunnerImpl.java:248) ~[seatunnel-starter.jar:2.3.8]
at com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.run(OperationRunnerImpl.java:471) ~[seatunnel-starter.jar:2.3.8]
at com.hazelcast.spi.impl.operationexecutor.impl.OperationThread.process(OperationThread.java:197) ~[seatunnel-starter.jar:2.3.8]
at com.hazelcast.spi.impl.operationexecutor.impl.OperationThread.process(OperationThread.java:137) ~[seatunnel-starter.jar:2.3.8]
at com.hazelcast.spi.impl.operationexecutor.impl.OperationThread.executeRun(OperationThread.java:123) ~[seatunnel-starter.jar:2.3.8]
... 1 more
Version 2.3.8, this problem still exists. When I tested doris or clickhouse sink to kafka, I reported this error: org.apache.kafka.common.errors.InvalidTxnStateException: The producer attempted a transactional operation in an invalid state
2.3.8版本,这个问题还存在,我测试了从doris或者clickhouse sink到kafka时,就会报这个错:org.apache.kafka.common.errors.InvalidTxnStateException: The producer attempted a transactional operation in an invalid state
Version 2.3.8, this problem still exists. When I tested doris or clickhouse sink to kafka, I reported this error: org.apache.kafka.common.errors.InvalidTxnStateException: The producer attempted a transactional operation in an invalid state
2.3.8版本,这个问题还存在,我测试了从doris或者clickhouse sink到kafka时,就会报这个错:org.apache.kafka.common.errors.InvalidTxnStateException: The producer attempted a transactional operation in an invalid state
wait -> https://github.com/apache/seatunnel/pull/7857 @aiyi926 you can pay attention and provide corresponding comments.