seatunnel icon indicating copy to clipboard operation
seatunnel copied to clipboard

[Bug] [connector-kafka] submit job kafka to kafka failed

Open gitfortian opened this issue 10 months ago • 1 comments

Search before asking

  • [X] I had searched in the issues and found no similar issues.

What happened

Error reported after running the task for a period of time,i use seatunnnel web to recover job then exception appears,job recover failed

SeaTunnel Version

2.3.4

SeaTunnel Config

env {
  parallelism = 1
  job.mode = "STREAMING"
}

source {
    Kafka {
        topic = "xxx"
        bootstrap.servers = "127.0.0.1:9092"
        consumer.group = "xxx"
        kafka.config = {
            security.protocol = SSL
            ssl.disableTLSHostnameVerification = true
            ssl.certificate.location = /xxx/conf/kafka.client.pem
			ssl.ca.location = /xxx/conf/ca-cert
			ssl.key.password = xxx
			ssl.key.location = /xxx/conf/kafka.client.key
			ssl.keystore.location = /xxx/conf/kafka.client.keystore.jks
		    ssl.truststore.password = xxx
		    ssl.truststore.location = /xxx/conf/kafka.client.truststore.jks
        }
    }
}

sink {
  kafka {
      topic = "xxx"
      bootstrap.servers = "127.0.0.1:9092"
      format = json
      kafka.request.timeout.ms = 60000
      semantics = EXACTLY_ONCE
      kafka.config = {
        acks = "all"
        request.timeout.ms = 60000
        buffer.memory = 33554432
        security.protocol=SASL_PLAINTEXT
        sasl.kerberos.service.name=kafka
        sasl.mechanism=GSSAPI
        java.security.krb5.conf="/xxx/krb5.conf"
        sasl.jaas.config="com.sun.security.auth.module.Krb5LoginModule required \n        useKeyTab=true \n        storeKey=true  \n        keyTab=\"/xxx.keytab\" \n        principal=\"xxxCOM\";"
        java.security.auth.login.config="/xxx/config/kafka_client_jaas.conf"
    }
  }
}

Running Command

use seatunnnel web to submit

Error Exception

2024-03-28 16:06:42,809 WARN  [o.a.k.c.u.AppInfoParser       ] [hz.main.seaTunnel.task.thread-7359] - Error registering AppInfo mbean
javax.management.InstanceAlreadyExistsException: kafka.producer:type=app-info,id=producer-1
        at com.sun.jmx.mbeanserver.Repository.addMBean(Repository.java:437) ~[?:1.8.0_352]
        at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerWithRepository(DefaultMBeanServerInterceptor.java:1898) ~[?:1.8.0_352]
        at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerDynamicMBean(DefaultMBeanServerInterceptor.java:966) ~[?:1.8.0_352]
        at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerObject(DefaultMBeanServerInterceptor.java:900) ~[?:1.8.0_352]
        at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerMBean(DefaultMBeanServerInterceptor.java:324) ~[?:1.8.0_352]
        at com.sun.jmx.mbeanserver.JmxMBeanServer.registerMBean(JmxMBeanServer.java:522) ~[?:1.8.0_352]
        at org.apache.kafka.common.utils.AppInfoParser.registerAppInfo(AppInfoParser.java:64) ~[?:?]
        at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:433) ~[?:?]
        at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:289) ~[?:?]
        at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:316) ~[?:?]
        at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:301) ~[?:?]
        at org.apache.seatunnel.connectors.seatunnel.kafka.sink.KafkaNoTransactionSender.<init>(KafkaNoTransactionSender.java:43) ~[?:?]
        at org.apache.seatunnel.connectors.seatunnel.kafka.sink.KafkaSinkWriter.<init>(KafkaSinkWriter.java:107) ~[?:?]
        at org.apache.seatunnel.connectors.seatunnel.kafka.sink.KafkaSink.createWriter(KafkaSink.java:56) ~[?:?]
        at org.apache.seatunnel.engine.server.task.flow.SinkFlowLifeCycle.restoreState(SinkFlowLifeCycle.java:297) ~[seatunnel-starter.jar:2.3.4]
        at org.apache.seatunnel.engine.server.task.SeaTunnelTask.lambda$restoreState$16(SeaTunnelTask.java:425) ~[seatunnel-starter.jar:2.3.4]
        at java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183) ~[?:1.8.0_352]
        at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193) ~[?:1.8.0_352]
        at java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:175) ~[?:1.8.0_352]
        at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1384) ~[?:1.8.0_352]
        at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482) ~[?:1.8.0_352]
        at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472) ~[?:1.8.0_352]
        at java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150) ~[?:1.8.0_352]
        at java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173) ~[?:1.8.0_352]
        at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234) ~[?:1.8.0_352]
        at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:485) ~[?:1.8.0_352]
        at org.apache.seatunnel.engine.server.task.SeaTunnelTask.restoreState(SeaTunnelTask.java:422) ~[seatunnel-starter.jar:2.3.4]
        at org.apache.seatunnel.engine.server.checkpoint.operation.NotifyTaskRestoreOperation.lambda$null$0(NotifyTaskRestoreOperation.java:106) ~[seatunnel-starter.jar:2.3.4]
        at java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1640) ~[?:1.8.0_352]
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_352]
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_352]
        at java.lang.Thread.run(Thread.java:750) [?:1.8.0_352]
2024-03-28 16:06:42,984 WARN  [o.a.k.c.s.k.KerberosLogin     ] [kafka-kerberos-refresh-thread-xxxxx] - [Principal=xxxxx]: TGT renewal thread has been interrupted and will exit.
2024-03-28 16:06:43,031 WARN  [o.a.k.c.s.k.KerberosLogin     ] [kafka-kerberos-refresh-thread-xxxxx] - [Principal=xxxxx]: TGT renewal thread has been interrupted and will exit.
2024-03-28 16:06:43,059 WARN  [o.a.k.c.s.k.KerberosLogin     ] [kafka-kerberos-refresh-thread-xxxxx] - [Principal=xxxxx]: TGT renewal thread has been interrupted and will exit.
2024-03-28 16:06:44,628 WARN  [.s.e.s.t.f.SourceFlowLifeCycle] [BlockingWorker-TaskGroupLocation{jobId=823367147232165937, pipelineId=1, taskGroupId=30002}] - source register failed.
java.util.concurrent.ExecutionException: org.apache.seatunnel.engine.server.exception.TaskGroupContextNotFoundException: task group TaskGroupLocation{jobId=823367147232165937, pipelineId=1, taskGroupId=1} not found.
        at com.hazelcast.spi.impl.operationservice.impl.InvocationFuture.returnOrThrowWithGetConventions(InvocationFuture.java:121) ~[seatunnel-starter.jar:2.3.4]
        at com.hazelcast.spi.impl.operationservice.impl.InvocationFuture.resolveAndThrowIfException(InvocationFuture.java:100) ~[seatunnel-starter.jar:2.3.4]
        at com.hazelcast.spi.impl.AbstractInvocationFuture.get(AbstractInvocationFuture.java:617) ~[seatunnel-starter.jar:2.3.4]
        at org.apache.seatunnel.engine.server.task.flow.SourceFlowLifeCycle.register(SourceFlowLifeCycle.java:214) ~[seatunnel-starter.jar:2.3.4]
        at org.apache.seatunnel.engine.server.task.flow.SourceFlowLifeCycle.open(SourceFlowLifeCycle.java:124) ~[seatunnel-starter.jar:2.3.4]
        at org.apache.seatunnel.engine.server.task.SeaTunnelTask.stateProcess(SeaTunnelTask.java:149) ~[seatunnel-starter.jar:2.3.4]
        at org.apache.seatunnel.engine.server.task.SourceSeaTunnelTask.call(SourceSeaTunnelTask.java:121) ~[seatunnel-starter.jar:2.3.4]
        at org.apache.seatunnel.engine.server.TaskExecutionService$BlockingWorker.run(TaskExecutionService.java:643) ~[seatunnel-starter.jar:2.3.4]
        at org.apache.seatunnel.engine.server.TaskExecutionService$NamedTaskWrapper.run(TaskExecutionService.java:944) ~[seatunnel-starter.jar:2.3.4]
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) ~[?:1.8.0_352]
        at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[?:1.8.0_352]
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_352]
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_352]
        at java.lang.Thread.run(Thread.java:750) [?:1.8.0_352]
Caused by: org.apache.seatunnel.engine.server.exception.TaskGroupContextNotFoundException: task group TaskGroupLocation{jobId=823367147232165937, pipelineId=1, taskGroupId=1} not found.
        at org.apache.seatunnel.engine.server.TaskExecutionService.getActiveExecutionContext(TaskExecutionService.java:193) ~[seatunnel-starter.jar:2.3.4]
        at org.apache.seatunnel.engine.server.TaskExecutionService.getTask(TaskExecutionService.java:259) ~[seatunnel-starter.jar:2.3.4]
        at org.apache.seatunnel.engine.server.task.operation.source.SourceRegisterOperation.lambda$run$1(SourceRegisterOperation.java:65) ~[seatunnel-starter.jar:2.3.4]
        at org.apache.seatunnel.common.utils.RetryUtils.retryWithException(RetryUtils.java:48) ~[seatunnel-starter.jar:2.3.4]
        at org.apache.seatunnel.engine.server.task.operation.source.SourceRegisterOperation.run(SourceRegisterOperation.java:57) ~[seatunnel-starter.jar:2.3.4]
        at com.hazelcast.spi.impl.operationservice.Operation.call(Operation.java:189) ~[seatunnel-starter.jar:2.3.4]
        at com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.call(OperationRunnerImpl.java:273) ~[seatunnel-starter.jar:2.3.4]
        at com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.run(OperationRunnerImpl.java:248) ~[seatunnel-starter.jar:2.3.4]
        at com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.run(OperationRunnerImpl.java:471) ~[seatunnel-starter.jar:2.3.4]
        at com.hazelcast.spi.impl.operationexecutor.impl.OperationThread.process(OperationThread.java:197) ~[seatunnel-starter.jar:2.3.4]
        at com.hazelcast.spi.impl.operationexecutor.impl.OperationThread.process(OperationThread.java:137) ~[seatunnel-starter.jar:2.3.4]
        at com.hazelcast.spi.impl.operationexecutor.impl.OperationThread.executeRun(OperationThread.java:123) ~[seatunnel-starter.jar:2.3.4]
        at com.hazelcast.internal.util.executor.HazelcastManagedThread.run(HazelcastManagedThread.java:102) ~[seatunnel-starter.jar:2.3.4]
        at ------ submitted from ------.() ~[?:?]
        at com.hazelcast.internal.util.ExceptionUtil.cloneExceptionWithFixedAsyncStackTrace(ExceptionUtil.java:336) ~[seatunnel-starter.jar:2.3.4]
        at com.hazelcast.spi.impl.operationservice.impl.InvocationFuture.returnOrThrowWithGetConventions(InvocationFuture.java:112) ~[seatunnel-starter.jar:2.3.4]
        ... 13 more
2024-03-28 16:06:44,628 WARN  [o.a.s.e.s.TaskExecutionService] [BlockingWorker-TaskGroupLocation{jobId=823367147232165937, pipelineId=1, taskGroupId=30002}] - [127.0.0.1]:5801 [seatunnel] [5.1] Exception in org.apache.seatunnel.engine.server.task.SourceSeaTunnelTask@72c22100
java.lang.RuntimeException: java.util.concurrent.ExecutionException: org.apache.seatunnel.engine.server.exception.TaskGroupContextNotFoundException: task group TaskGroupLocation{jobId=823367147232165937, pipelineId=1, taskGroupId=1} not found.
        at org.apache.seatunnel.engine.server.task.flow.SourceFlowLifeCycle.register(SourceFlowLifeCycle.java:217) ~[seatunnel-starter.jar:2.3.4]
        at org.apache.seatunnel.engine.server.task.flow.SourceFlowLifeCycle.open(SourceFlowLifeCycle.java:124) ~[seatunnel-starter.jar:2.3.4]
        at org.apache.seatunnel.engine.server.task.SeaTunnelTask.stateProcess(SeaTunnelTask.java:149) ~[seatunnel-starter.jar:2.3.4]
        at org.apache.seatunnel.engine.server.task.SourceSeaTunnelTask.call(SourceSeaTunnelTask.java:121) ~[seatunnel-starter.jar:2.3.4]
        at org.apache.seatunnel.engine.server.TaskExecutionService$BlockingWorker.run(TaskExecutionService.java:643) ~[seatunnel-starter.jar:2.3.4]
        at org.apache.seatunnel.engine.server.TaskExecutionService$NamedTaskWrapper.run(TaskExecutionService.java:944) ~[seatunnel-starter.jar:2.3.4]
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) ~[?:1.8.0_352]
        at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[?:1.8.0_352]
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_352]
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_352]
        at java.lang.Thread.run(Thread.java:750) [?:1.8.0_352]
Caused by: java.util.concurrent.ExecutionException: org.apache.seatunnel.engine.server.exception.TaskGroupContextNotFoundException: task group TaskGroupLocation{jobId=823367147232165937, pipelineId=1, taskGroupId=1} not found.
        at com.hazelcast.spi.impl.operationservice.impl.InvocationFuture.returnOrThrowWithGetConventions(InvocationFuture.java:121) ~[seatunnel-starter.jar:2.3.4]
        at com.hazelcast.spi.impl.operationservice.impl.InvocationFuture.resolveAndThrowIfException(InvocationFuture.java:100) ~[seatunnel-starter.jar:2.3.4]
        at com.hazelcast.spi.impl.AbstractInvocationFuture.get(AbstractInvocationFuture.java:617) ~[seatunnel-starter.jar:2.3.4]
        at org.apache.seatunnel.engine.server.task.flow.SourceFlowLifeCycle.register(SourceFlowLifeCycle.java:214) ~[seatunnel-starter.jar:2.3.4]
        ... 10 more
Caused by: org.apache.seatunnel.engine.server.exception.TaskGroupContextNotFoundException: task group TaskGroupLocation{jobId=823367147232165937, pipelineId=1, taskGroupId=1} not found.
        at org.apache.seatunnel.engine.server.TaskExecutionService.getActiveExecutionContext(TaskExecutionService.java:193) ~[seatunnel-starter.jar:2.3.4]
        at org.apache.seatunnel.engine.server.TaskExecutionService.getTask(TaskExecutionService.java:259) ~[seatunnel-starter.jar:2.3.4]
        at org.apache.seatunnel.engine.server.task.operation.source.SourceRegisterOperation.lambda$run$1(SourceRegisterOperation.java:65) ~[seatunnel-starter.jar:2.3.4]
        at org.apache.seatunnel.common.utils.RetryUtils.retryWithException(RetryUtils.java:48) ~[seatunnel-starter.jar:2.3.4]
        at org.apache.seatunnel.engine.server.task.operation.source.SourceRegisterOperation.run(SourceRegisterOperation.java:57) ~[seatunnel-starter.jar:2.3.4]
        at com.hazelcast.spi.impl.operationservice.Operation.call(Operation.java:189) ~[seatunnel-starter.jar:2.3.4]
        at com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.call(OperationRunnerImpl.java:273) ~[seatunnel-starter.jar:2.3.4]
        at com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.run(OperationRunnerImpl.java:248) ~[seatunnel-starter.jar:2.3.4]
        at com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.run(OperationRunnerImpl.java:471) ~[seatunnel-starter.jar:2.3.4]
        at com.hazelcast.spi.impl.operationexecutor.impl.OperationThread.process(OperationThread.java:197) ~[seatunnel-starter.jar:2.3.4]
        at com.hazelcast.spi.impl.operationexecutor.impl.OperationThread.process(OperationThread.java:137) ~[seatunnel-starter.jar:2.3.4]
        at com.hazelcast.spi.impl.operationexecutor.impl.OperationThread.executeRun(OperationThread.java:123) ~[seatunnel-starter.jar:2.3.4]
        at com.hazelcast.internal.util.executor.HazelcastManagedThread.run(HazelcastManagedThread.java:102) ~[seatunnel-starter.jar:2.3.4]
        at ------ submitted from ------.() ~[?:?]
        at com.hazelcast.internal.util.ExceptionUtil.cloneExceptionWithFixedAsyncStackTrace(ExceptionUtil.java:336) ~[seatunnel-starter.jar:2.3.4]
        at com.hazelcast.spi.impl.operationservice.impl.InvocationFuture.returnOrThrowWithGetConventions(InvocationFuture.java:112) ~[seatunnel-starter.jar:2.3.4]
        at com.hazelcast.spi.impl.operationservice.impl.InvocationFuture.resolveAndThrowIfException(InvocationFuture.java:100) ~[seatunnel-starter.jar:2.3.4]
        at com.hazelcast.spi.impl.AbstractInvocationFuture.get(AbstractInvocationFuture.java:617) ~[seatunnel-starter.jar:2.3.4]
        at org.apache.seatunnel.engine.server.task.flow.SourceFlowLifeCycle.register(SourceFlowLifeCycle.java:214) ~[seatunnel-starter.jar:2.3.4]
        ... 10 more

Zeta or Flink or Spark Version

2.3.4

Java or Scala Version

1.8_335

Screenshots

No response

Are you willing to submit PR?

  • [ ] Yes I am willing to submit a PR!

Code of Conduct

gitfortian avatar Mar 28 '24 08:03 gitfortian

This issue has been automatically marked as stale because it has not had recent activity for 30 days. It will be closed in next 7 days if no further activity occurs.

github-actions[bot] avatar Apr 28 '24 00:04 github-actions[bot]

This issue has been closed because it has not received response for too long time. You could reopen it if you encountered similar problems in the future.

github-actions[bot] avatar May 06 '24 00:05 github-actions[bot]