seatunnel
seatunnel copied to clipboard
[Bug] [connector-kafka] submit job kafka to kafka failed
Search before asking
- [X] I had searched in the issues and found no similar issues.
What happened
Error reported after running the task for a period of time,i use seatunnnel web to recover job then exception appears,job recover failed
SeaTunnel Version
2.3.4
SeaTunnel Config
env {
parallelism = 1
job.mode = "STREAMING"
}
source {
Kafka {
topic = "xxx"
bootstrap.servers = "127.0.0.1:9092"
consumer.group = "xxx"
kafka.config = {
security.protocol = SSL
ssl.disableTLSHostnameVerification = true
ssl.certificate.location = /xxx/conf/kafka.client.pem
ssl.ca.location = /xxx/conf/ca-cert
ssl.key.password = xxx
ssl.key.location = /xxx/conf/kafka.client.key
ssl.keystore.location = /xxx/conf/kafka.client.keystore.jks
ssl.truststore.password = xxx
ssl.truststore.location = /xxx/conf/kafka.client.truststore.jks
}
}
}
sink {
kafka {
topic = "xxx"
bootstrap.servers = "127.0.0.1:9092"
format = json
kafka.request.timeout.ms = 60000
semantics = EXACTLY_ONCE
kafka.config = {
acks = "all"
request.timeout.ms = 60000
buffer.memory = 33554432
security.protocol=SASL_PLAINTEXT
sasl.kerberos.service.name=kafka
sasl.mechanism=GSSAPI
java.security.krb5.conf="/xxx/krb5.conf"
sasl.jaas.config="com.sun.security.auth.module.Krb5LoginModule required \n useKeyTab=true \n storeKey=true \n keyTab=\"/xxx.keytab\" \n principal=\"xxxCOM\";"
java.security.auth.login.config="/xxx/config/kafka_client_jaas.conf"
}
}
}
Running Command
use seatunnnel web to submit
Error Exception
2024-03-28 16:06:42,809 WARN [o.a.k.c.u.AppInfoParser ] [hz.main.seaTunnel.task.thread-7359] - Error registering AppInfo mbean
javax.management.InstanceAlreadyExistsException: kafka.producer:type=app-info,id=producer-1
at com.sun.jmx.mbeanserver.Repository.addMBean(Repository.java:437) ~[?:1.8.0_352]
at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerWithRepository(DefaultMBeanServerInterceptor.java:1898) ~[?:1.8.0_352]
at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerDynamicMBean(DefaultMBeanServerInterceptor.java:966) ~[?:1.8.0_352]
at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerObject(DefaultMBeanServerInterceptor.java:900) ~[?:1.8.0_352]
at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerMBean(DefaultMBeanServerInterceptor.java:324) ~[?:1.8.0_352]
at com.sun.jmx.mbeanserver.JmxMBeanServer.registerMBean(JmxMBeanServer.java:522) ~[?:1.8.0_352]
at org.apache.kafka.common.utils.AppInfoParser.registerAppInfo(AppInfoParser.java:64) ~[?:?]
at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:433) ~[?:?]
at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:289) ~[?:?]
at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:316) ~[?:?]
at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:301) ~[?:?]
at org.apache.seatunnel.connectors.seatunnel.kafka.sink.KafkaNoTransactionSender.<init>(KafkaNoTransactionSender.java:43) ~[?:?]
at org.apache.seatunnel.connectors.seatunnel.kafka.sink.KafkaSinkWriter.<init>(KafkaSinkWriter.java:107) ~[?:?]
at org.apache.seatunnel.connectors.seatunnel.kafka.sink.KafkaSink.createWriter(KafkaSink.java:56) ~[?:?]
at org.apache.seatunnel.engine.server.task.flow.SinkFlowLifeCycle.restoreState(SinkFlowLifeCycle.java:297) ~[seatunnel-starter.jar:2.3.4]
at org.apache.seatunnel.engine.server.task.SeaTunnelTask.lambda$restoreState$16(SeaTunnelTask.java:425) ~[seatunnel-starter.jar:2.3.4]
at java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183) ~[?:1.8.0_352]
at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193) ~[?:1.8.0_352]
at java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:175) ~[?:1.8.0_352]
at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1384) ~[?:1.8.0_352]
at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482) ~[?:1.8.0_352]
at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472) ~[?:1.8.0_352]
at java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150) ~[?:1.8.0_352]
at java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173) ~[?:1.8.0_352]
at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234) ~[?:1.8.0_352]
at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:485) ~[?:1.8.0_352]
at org.apache.seatunnel.engine.server.task.SeaTunnelTask.restoreState(SeaTunnelTask.java:422) ~[seatunnel-starter.jar:2.3.4]
at org.apache.seatunnel.engine.server.checkpoint.operation.NotifyTaskRestoreOperation.lambda$null$0(NotifyTaskRestoreOperation.java:106) ~[seatunnel-starter.jar:2.3.4]
at java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1640) ~[?:1.8.0_352]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_352]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_352]
at java.lang.Thread.run(Thread.java:750) [?:1.8.0_352]
2024-03-28 16:06:42,984 WARN [o.a.k.c.s.k.KerberosLogin ] [kafka-kerberos-refresh-thread-xxxxx] - [Principal=xxxxx]: TGT renewal thread has been interrupted and will exit.
2024-03-28 16:06:43,031 WARN [o.a.k.c.s.k.KerberosLogin ] [kafka-kerberos-refresh-thread-xxxxx] - [Principal=xxxxx]: TGT renewal thread has been interrupted and will exit.
2024-03-28 16:06:43,059 WARN [o.a.k.c.s.k.KerberosLogin ] [kafka-kerberos-refresh-thread-xxxxx] - [Principal=xxxxx]: TGT renewal thread has been interrupted and will exit.
2024-03-28 16:06:44,628 WARN [.s.e.s.t.f.SourceFlowLifeCycle] [BlockingWorker-TaskGroupLocation{jobId=823367147232165937, pipelineId=1, taskGroupId=30002}] - source register failed.
java.util.concurrent.ExecutionException: org.apache.seatunnel.engine.server.exception.TaskGroupContextNotFoundException: task group TaskGroupLocation{jobId=823367147232165937, pipelineId=1, taskGroupId=1} not found.
at com.hazelcast.spi.impl.operationservice.impl.InvocationFuture.returnOrThrowWithGetConventions(InvocationFuture.java:121) ~[seatunnel-starter.jar:2.3.4]
at com.hazelcast.spi.impl.operationservice.impl.InvocationFuture.resolveAndThrowIfException(InvocationFuture.java:100) ~[seatunnel-starter.jar:2.3.4]
at com.hazelcast.spi.impl.AbstractInvocationFuture.get(AbstractInvocationFuture.java:617) ~[seatunnel-starter.jar:2.3.4]
at org.apache.seatunnel.engine.server.task.flow.SourceFlowLifeCycle.register(SourceFlowLifeCycle.java:214) ~[seatunnel-starter.jar:2.3.4]
at org.apache.seatunnel.engine.server.task.flow.SourceFlowLifeCycle.open(SourceFlowLifeCycle.java:124) ~[seatunnel-starter.jar:2.3.4]
at org.apache.seatunnel.engine.server.task.SeaTunnelTask.stateProcess(SeaTunnelTask.java:149) ~[seatunnel-starter.jar:2.3.4]
at org.apache.seatunnel.engine.server.task.SourceSeaTunnelTask.call(SourceSeaTunnelTask.java:121) ~[seatunnel-starter.jar:2.3.4]
at org.apache.seatunnel.engine.server.TaskExecutionService$BlockingWorker.run(TaskExecutionService.java:643) ~[seatunnel-starter.jar:2.3.4]
at org.apache.seatunnel.engine.server.TaskExecutionService$NamedTaskWrapper.run(TaskExecutionService.java:944) ~[seatunnel-starter.jar:2.3.4]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) ~[?:1.8.0_352]
at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[?:1.8.0_352]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_352]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_352]
at java.lang.Thread.run(Thread.java:750) [?:1.8.0_352]
Caused by: org.apache.seatunnel.engine.server.exception.TaskGroupContextNotFoundException: task group TaskGroupLocation{jobId=823367147232165937, pipelineId=1, taskGroupId=1} not found.
at org.apache.seatunnel.engine.server.TaskExecutionService.getActiveExecutionContext(TaskExecutionService.java:193) ~[seatunnel-starter.jar:2.3.4]
at org.apache.seatunnel.engine.server.TaskExecutionService.getTask(TaskExecutionService.java:259) ~[seatunnel-starter.jar:2.3.4]
at org.apache.seatunnel.engine.server.task.operation.source.SourceRegisterOperation.lambda$run$1(SourceRegisterOperation.java:65) ~[seatunnel-starter.jar:2.3.4]
at org.apache.seatunnel.common.utils.RetryUtils.retryWithException(RetryUtils.java:48) ~[seatunnel-starter.jar:2.3.4]
at org.apache.seatunnel.engine.server.task.operation.source.SourceRegisterOperation.run(SourceRegisterOperation.java:57) ~[seatunnel-starter.jar:2.3.4]
at com.hazelcast.spi.impl.operationservice.Operation.call(Operation.java:189) ~[seatunnel-starter.jar:2.3.4]
at com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.call(OperationRunnerImpl.java:273) ~[seatunnel-starter.jar:2.3.4]
at com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.run(OperationRunnerImpl.java:248) ~[seatunnel-starter.jar:2.3.4]
at com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.run(OperationRunnerImpl.java:471) ~[seatunnel-starter.jar:2.3.4]
at com.hazelcast.spi.impl.operationexecutor.impl.OperationThread.process(OperationThread.java:197) ~[seatunnel-starter.jar:2.3.4]
at com.hazelcast.spi.impl.operationexecutor.impl.OperationThread.process(OperationThread.java:137) ~[seatunnel-starter.jar:2.3.4]
at com.hazelcast.spi.impl.operationexecutor.impl.OperationThread.executeRun(OperationThread.java:123) ~[seatunnel-starter.jar:2.3.4]
at com.hazelcast.internal.util.executor.HazelcastManagedThread.run(HazelcastManagedThread.java:102) ~[seatunnel-starter.jar:2.3.4]
at ------ submitted from ------.() ~[?:?]
at com.hazelcast.internal.util.ExceptionUtil.cloneExceptionWithFixedAsyncStackTrace(ExceptionUtil.java:336) ~[seatunnel-starter.jar:2.3.4]
at com.hazelcast.spi.impl.operationservice.impl.InvocationFuture.returnOrThrowWithGetConventions(InvocationFuture.java:112) ~[seatunnel-starter.jar:2.3.4]
... 13 more
2024-03-28 16:06:44,628 WARN [o.a.s.e.s.TaskExecutionService] [BlockingWorker-TaskGroupLocation{jobId=823367147232165937, pipelineId=1, taskGroupId=30002}] - [127.0.0.1]:5801 [seatunnel] [5.1] Exception in org.apache.seatunnel.engine.server.task.SourceSeaTunnelTask@72c22100
java.lang.RuntimeException: java.util.concurrent.ExecutionException: org.apache.seatunnel.engine.server.exception.TaskGroupContextNotFoundException: task group TaskGroupLocation{jobId=823367147232165937, pipelineId=1, taskGroupId=1} not found.
at org.apache.seatunnel.engine.server.task.flow.SourceFlowLifeCycle.register(SourceFlowLifeCycle.java:217) ~[seatunnel-starter.jar:2.3.4]
at org.apache.seatunnel.engine.server.task.flow.SourceFlowLifeCycle.open(SourceFlowLifeCycle.java:124) ~[seatunnel-starter.jar:2.3.4]
at org.apache.seatunnel.engine.server.task.SeaTunnelTask.stateProcess(SeaTunnelTask.java:149) ~[seatunnel-starter.jar:2.3.4]
at org.apache.seatunnel.engine.server.task.SourceSeaTunnelTask.call(SourceSeaTunnelTask.java:121) ~[seatunnel-starter.jar:2.3.4]
at org.apache.seatunnel.engine.server.TaskExecutionService$BlockingWorker.run(TaskExecutionService.java:643) ~[seatunnel-starter.jar:2.3.4]
at org.apache.seatunnel.engine.server.TaskExecutionService$NamedTaskWrapper.run(TaskExecutionService.java:944) ~[seatunnel-starter.jar:2.3.4]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) ~[?:1.8.0_352]
at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[?:1.8.0_352]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_352]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_352]
at java.lang.Thread.run(Thread.java:750) [?:1.8.0_352]
Caused by: java.util.concurrent.ExecutionException: org.apache.seatunnel.engine.server.exception.TaskGroupContextNotFoundException: task group TaskGroupLocation{jobId=823367147232165937, pipelineId=1, taskGroupId=1} not found.
at com.hazelcast.spi.impl.operationservice.impl.InvocationFuture.returnOrThrowWithGetConventions(InvocationFuture.java:121) ~[seatunnel-starter.jar:2.3.4]
at com.hazelcast.spi.impl.operationservice.impl.InvocationFuture.resolveAndThrowIfException(InvocationFuture.java:100) ~[seatunnel-starter.jar:2.3.4]
at com.hazelcast.spi.impl.AbstractInvocationFuture.get(AbstractInvocationFuture.java:617) ~[seatunnel-starter.jar:2.3.4]
at org.apache.seatunnel.engine.server.task.flow.SourceFlowLifeCycle.register(SourceFlowLifeCycle.java:214) ~[seatunnel-starter.jar:2.3.4]
... 10 more
Caused by: org.apache.seatunnel.engine.server.exception.TaskGroupContextNotFoundException: task group TaskGroupLocation{jobId=823367147232165937, pipelineId=1, taskGroupId=1} not found.
at org.apache.seatunnel.engine.server.TaskExecutionService.getActiveExecutionContext(TaskExecutionService.java:193) ~[seatunnel-starter.jar:2.3.4]
at org.apache.seatunnel.engine.server.TaskExecutionService.getTask(TaskExecutionService.java:259) ~[seatunnel-starter.jar:2.3.4]
at org.apache.seatunnel.engine.server.task.operation.source.SourceRegisterOperation.lambda$run$1(SourceRegisterOperation.java:65) ~[seatunnel-starter.jar:2.3.4]
at org.apache.seatunnel.common.utils.RetryUtils.retryWithException(RetryUtils.java:48) ~[seatunnel-starter.jar:2.3.4]
at org.apache.seatunnel.engine.server.task.operation.source.SourceRegisterOperation.run(SourceRegisterOperation.java:57) ~[seatunnel-starter.jar:2.3.4]
at com.hazelcast.spi.impl.operationservice.Operation.call(Operation.java:189) ~[seatunnel-starter.jar:2.3.4]
at com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.call(OperationRunnerImpl.java:273) ~[seatunnel-starter.jar:2.3.4]
at com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.run(OperationRunnerImpl.java:248) ~[seatunnel-starter.jar:2.3.4]
at com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.run(OperationRunnerImpl.java:471) ~[seatunnel-starter.jar:2.3.4]
at com.hazelcast.spi.impl.operationexecutor.impl.OperationThread.process(OperationThread.java:197) ~[seatunnel-starter.jar:2.3.4]
at com.hazelcast.spi.impl.operationexecutor.impl.OperationThread.process(OperationThread.java:137) ~[seatunnel-starter.jar:2.3.4]
at com.hazelcast.spi.impl.operationexecutor.impl.OperationThread.executeRun(OperationThread.java:123) ~[seatunnel-starter.jar:2.3.4]
at com.hazelcast.internal.util.executor.HazelcastManagedThread.run(HazelcastManagedThread.java:102) ~[seatunnel-starter.jar:2.3.4]
at ------ submitted from ------.() ~[?:?]
at com.hazelcast.internal.util.ExceptionUtil.cloneExceptionWithFixedAsyncStackTrace(ExceptionUtil.java:336) ~[seatunnel-starter.jar:2.3.4]
at com.hazelcast.spi.impl.operationservice.impl.InvocationFuture.returnOrThrowWithGetConventions(InvocationFuture.java:112) ~[seatunnel-starter.jar:2.3.4]
at com.hazelcast.spi.impl.operationservice.impl.InvocationFuture.resolveAndThrowIfException(InvocationFuture.java:100) ~[seatunnel-starter.jar:2.3.4]
at com.hazelcast.spi.impl.AbstractInvocationFuture.get(AbstractInvocationFuture.java:617) ~[seatunnel-starter.jar:2.3.4]
at org.apache.seatunnel.engine.server.task.flow.SourceFlowLifeCycle.register(SourceFlowLifeCycle.java:214) ~[seatunnel-starter.jar:2.3.4]
... 10 more
Zeta or Flink or Spark Version
2.3.4
Java or Scala Version
1.8_335
Screenshots
No response
Are you willing to submit PR?
- [ ] Yes I am willing to submit a PR!
Code of Conduct
- [X] I agree to follow this project's Code of Conduct
This issue has been automatically marked as stale because it has not had recent activity for 30 days. It will be closed in next 7 days if no further activity occurs.
This issue has been closed because it has not received response for too long time. You could reopen it if you encountered similar problems in the future.