
[Bug] [MysqlCDC to kafka] org.apache.kafka.common.KafkaException: class org.apache.kafka.common.serialization.ByteArraySerializer is not an instance of org.apache.kafka.common.serialization.Serializer

Open ymm1206 opened this issue 1 month ago • 3 comments

Search before asking

  • [ ] I had searched in the issues and found no similar issues.

What happened

When syncing from MySQL-CDC to Kafka using the built-in engine, a serialization error is reported. I have tried swapping jar versions, but that did not help either. Please help.

2025-12-05 10:18:36,344 ERROR [o.a.s.c.s.SeaTunnel ] [main] - Exception StackTrace:org.apache.seatunnel.core.starter.exception.CommandExecuteException: SeaTunnel job executed failed
    at org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand.execute(ClientExecuteCommand.java:213)
    at org.apache.seatunnel.core.starter.SeaTunnel.run(SeaTunnel.java:40)
    at org.apache.seatunnel.core.starter.seatunnel.SeaTunnelClient.main(SeaTunnelClient.java:34)
Caused by: org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException: org.apache.seatunnel.engine.server.checkpoint.CheckpointException: CheckpointCoordinator inside have error.
    at org.apache.seatunnel.engine.server.checkpoint.CheckpointCoordinator.handleCoordinatorError(CheckpointCoordinator.java:281)
    at org.apache.seatunnel.engine.server.checkpoint.CheckpointCoordinator.handleCoordinatorError(CheckpointCoordinator.java:277)
    at org.apache.seatunnel.engine.server.checkpoint.CheckpointCoordinator.reportCheckpointErrorFromTask(CheckpointCoordinator.java:390)
    at org.apache.seatunnel.engine.server.checkpoint.CheckpointManager.reportCheckpointErrorFromTask(CheckpointManager.java:184)
    at org.apache.seatunnel.engine.server.checkpoint.operation.CheckpointErrorReportOperation.runInternal(CheckpointErrorReportOperation.java:48)
    at org.apache.seatunnel.engine.server.task.operation.TracingOperation.run(TracingOperation.java:44)
    at com.hazelcast.spi.impl.operationservice.Operation.call(Operation.java:189)
    at com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.call(OperationRunnerImpl.java:273)
    at com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.run(OperationRunnerImpl.java:248)
    at com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.run(OperationRunnerImpl.java:213)
    at com.hazelcast.spi.impl.operationexecutor.impl.OperationThread.process(OperationThread.java:175)
    at com.hazelcast.spi.impl.operationexecutor.impl.OperationThread.process(OperationThread.java:139)
    at com.hazelcast.spi.impl.operationexecutor.impl.OperationThread.executeRun(OperationThread.java:123)
    at com.hazelcast.internal.util.executor.HazelcastManagedThread.run(HazelcastManagedThread.java:102)
Caused by: org.apache.seatunnel.common.utils.SeaTunnelException: org.apache.kafka.common.KafkaException: Failed to construct kafka producer
    at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:439)
    at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:289)
    at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:316)
    at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:301)
    at org.apache.seatunnel.connectors.seatunnel.kafka.sink.KafkaInternalProducer.<init>(KafkaInternalProducer.java:46)
    at org.apache.seatunnel.connectors.seatunnel.kafka.sink.KafkaTransactionSender.getTransactionProducer(KafkaTransactionSender.java:128)
    at org.apache.seatunnel.connectors.seatunnel.kafka.sink.KafkaTransactionSender.beginTransaction(KafkaTransactionSender.java:62)
    at org.apache.seatunnel.connectors.seatunnel.kafka.sink.KafkaSinkWriter.<init>(KafkaSinkWriter.java:103)
    at org.apache.seatunnel.connectors.seatunnel.kafka.sink.KafkaSink.createWriter(KafkaSink.java:56)
    at org.apache.seatunnel.engine.server.task.flow.SinkFlowLifeCycle.restoreState(SinkFlowLifeCycle.java:293)
    at org.apache.seatunnel.engine.server.task.SeaTunnelTask.lambda$restoreState$16(SeaTunnelTask.java:426)
    at java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:184)
    at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
    at java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:175)
    at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1374)
    at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
    at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
    at java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:151)
    at java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:174)
    at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
    at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:418)
    at org.apache.seatunnel.engine.server.task.SeaTunnelTask.restoreState(SeaTunnelTask.java:423)
    at org.apache.seatunnel.engine.server.checkpoint.operation.NotifyTaskRestoreOperation.lambda$runInternal$0(NotifyTaskRestoreOperation.java:107)
    at java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1626)
    at org.apache.seatunnel.api.tracing.MDCRunnable.run(MDCRunnable.java:39)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.kafka.common.KafkaException: class org.apache.kafka.common.serialization.ByteArraySerializer is not an instance of org.apache.kafka.common.serialization.Serializer
    at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstance(AbstractConfig.java:403)
    at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstance(AbstractConfig.java:434)
    at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstance(AbstractConfig.java:419)
    at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:365)
    ... 27 more

    ... 12 more

    at org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand.execute(ClientExecuteCommand.java:205)
    ... 2 more

SeaTunnel Version

2.3.8 web 1.0.2

SeaTunnel Config

env {
  parallelism = 2
  job.mode = "STREAMING"
  checkpoint = 10000
}
source {
  MySQL-CDC {
    base-url = "jdbc:mysql://10.1.2.25:33**/***"
    username = "***"
    password = "***"
    table-names = ["c.eda_handle"]
    startup.mode = "initial"
    catalog {
      factory = MySQL
    }
    format = compatible_debezium_json
    debezium = {
      # include schema into kafka message
      key.converter.schemas.enable = false
      value.converter.schemas.enable = false
      # include ddl
      include.schema.changes = true
      # topic prefix
      database.server.name = "mysql_cdc_"
    }
    schema = {
      fields = {
        topic = string
        key = string
        value = string
      }
    }
  }
}
sink {
  kafka {
    topic = "test-topics"
    bootstrap.servers = "10.1.2.22:2161"
    format = compatible_debezium_json
    kafka.request.timeout.ms = 60000
    semantics = EXACTLY_ONCE
    kafka.config {
      acks = "all"
      request.timeout.ms = 60000
      buffer.memory = 33554432
    }
  }
}

Running Command

./bin/seatunnel.sh --config ./config/mysqlcdc_hdfs_.config.template -e local

Error Exception

org.apache.kafka.common.KafkaException: class org.apache.kafka.common.serialization.ByteArraySerializer is not an instance of org.apache.kafka.common.serialization.Serializer

Zeta or Flink or Spark Version

2.3.8

Java or Scala Version

1.8

Screenshots

need help!!!!!!!!

Are you willing to submit PR?

  • [x] Yes I am willing to submit a PR!

Code of Conduct

Dec 05 '25 03:12 ymm1206

How come no one has solved this problem???????

Dec 08 '25 01:12 ymm1206

It looks like you have conflicting Kafka client jars on the classpath. Check whether your SeaTunnel installation contains multiple versions of kafka-clients-*.jar, or whether your job has added other Kafka-related jars.
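
One way to run that check is to scan every jar under the SeaTunnel installation for Kafka's Serializer class, since a copy can also be bundled inside a connector jar under a different file name. The following is a minimal sketch, not an official SeaTunnel tool; the function name `find_jars_with_serializer` is made up for illustration, and it will not detect copies whose package has been relocated by shading.

```python
import zipfile
from pathlib import Path

# Class file path inside a jar for org.apache.kafka.common.serialization.Serializer.
SERIALIZER_ENTRY = "org/apache/kafka/common/serialization/Serializer.class"


def find_jars_with_serializer(root):
    """Return the jars under `root` that contain Kafka's Serializer class.

    Hypothetical helper: more than one hit means the class can be loaded
    from more than one place, which is worth investigating.
    """
    hits = []
    for jar in sorted(Path(root).rglob("*.jar")):
        try:
            with zipfile.ZipFile(jar) as archive:
                if SERIALIZER_ENTRY in archive.namelist():
                    hits.append(jar)
        except zipfile.BadZipFile:
            # Skip files that are named *.jar but are not valid archives.
            continue
    return hits
```

Calling `find_jars_with_serializer("/opt/seatunnel")` (the path is an assumption; use your own installation directory) and printing the result lists each jar that carries its own copy of the class.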

Dec 08 '25 13:12 davidzollo

Thank you, I have solved this issue. It was caused by a version conflict between the kafka-clients jar and the Kafka-related dependencies in the lib or connector package. The underlying problem appears to be a classloader conflict, with the same Kafka class being loaded from more than one jar. My solution was to place the Kafka client jar matching the current Kafka cluster directly in the plugin folder, which resolved the issue.
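
The classloader problem described above explains why the message "ByteArraySerializer is not an instance of Serializer" can appear even though the class clearly implements that interface. The sketch below is only a Python analogy of the JVM behaviour, not SeaTunnel or Kafka code: it loads the same source twice into separate modules, much as two classloaders would each load their own copy from different jars. The module names are invented for illustration.

```python
import types

# The same "library" source, standing in for two copies of kafka-clients.
SOURCE = '''
class Serializer:
    pass

class ByteArraySerializer(Serializer):
    pass
'''


def load_copy(name):
    """Execute SOURCE in a fresh module, like a separate classloader would."""
    module = types.ModuleType(name)
    exec(SOURCE, module.__dict__)
    return module


copy_a = load_copy("kafka_clients_from_lib")        # hypothetical name
copy_b = load_copy("kafka_clients_from_connector")  # hypothetical name

# Within one copy the relationship holds; across copies it does not,
# because each copy defines its own distinct Serializer class object.
same_copy = issubclass(copy_a.ByteArraySerializer, copy_a.Serializer)
cross_copy = issubclass(copy_a.ByteArraySerializer, copy_b.Serializer)
print(same_copy, cross_copy)  # prints "True False"
```

Leaving only one copy of the client library visible to the job, as in the fix above, removes the second definition and with it the mismatch.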

Dec 10 '25 01:12 ymm1206