
[Bug] [sqlserver to kafka] Caused by: org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException: com.hazelcast.nio.serialization.HazelcastSerializationException: java.lang.NegativeArraySizeException

Open aa8808021 opened this issue 1 year ago • 0 comments

Search before asking

  • [X] I had searched in the issues and found no similar issues.

What happened

The job fails with the error below when synchronizing multiple tables from SQL Server (MSSQL) to Kafka using the SqlServer-CDC source.

SeaTunnel Version

2.3.5

SeaTunnel Config

env {
  # You can set engine configuration here
  parallelism = 1
  job.mode = "STREAMING"
  checkpoint.interval = 5000
}

source {
  # This is an example source plugin **only for testing and demonstrating the source plugin feature**
  SqlServer-CDC {
    base-url = "jdbc:sqlserver://localhost:1433;databaseName=test_01"
    username = "sa"
    password = "password"
    database-names = ["test_01"]
    table-names = ["test_01.dbo.table_a","test_01.dbo.table_b"]
    startup.mode = "initial"
  }
}

transform {
}

sink {
  Kafka {
    bootstrap.servers = "xxxx"
    topic = "SYNC-TEST"

    kafka.request.timeout.ms = 60000
    semantics = EXACTLY_ONCE
  }
}

Running Command

./bin/seatunnel.sh --config ./config/test.config -e local

Error Exception

2024-05-16 21:08:04,646 ERROR [o.a.k.c.p.KafkaProducer       ] [BlockingWorker-TaskGroupLocation{jobId=843474790923632641, pipelineId=1, taskGroupId=90000}] - [Producer clientId=producer-SeaTunnel4023-1, transactionalId=SeaTunnel4023-1] Interrupted while joining ioThread
java.lang.InterruptedException: null
	at java.lang.Object.wait(Native Method) ~[?:1.8.0_391]
	at java.lang.Thread.join(Thread.java:1265) ~[?:1.8.0_391]
	at org.apache.kafka.clients.producer.KafkaProducer.close(KafkaProducer.java:1265) ~[?:?]
	at org.apache.kafka.clients.producer.KafkaProducer.close(KafkaProducer.java:1242) ~[?:?]
	at org.apache.kafka.clients.producer.KafkaProducer.close(KafkaProducer.java:1218) ~[?:?]
	at org.apache.seatunnel.connectors.seatunnel.kafka.sink.KafkaTransactionSender.close(KafkaTransactionSender.java:119) ~[?:?]
	at org.apache.seatunnel.connectors.seatunnel.kafka.sink.KafkaSinkWriter.close(KafkaSinkWriter.java:140) ~[?:?]
	at org.apache.seatunnel.engine.server.task.flow.SinkFlowLifeCycle.close(SinkFlowLifeCycle.java:158) ~[seatunnel-starter.jar:2.3.5]
	at org.apache.seatunnel.engine.server.task.SeaTunnelTask.lambda$close$5(SeaTunnelTask.java:330) ~[seatunnel-starter.jar:2.3.5]
	at java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:184) [?:1.8.0_391]
	at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1384) [?:1.8.0_391]
	at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482) [?:1.8.0_391]
	at java.util.stream.ForEachOps$ForEachTask.compute(ForEachOps.java:291) [?:1.8.0_391]
	at java.util.concurrent.CountedCompleter.exec(CountedCompleter.java:731) [?:1.8.0_391]
	at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289) [?:1.8.0_391]
	at java.util.concurrent.ForkJoinPool.helpComplete(ForkJoinPool.java:1881) [?:1.8.0_391]
	at java.util.concurrent.ForkJoinPool.externalHelpComplete(ForkJoinPool.java:2478) [?:1.8.0_391]
	at java.util.concurrent.ForkJoinTask.externalAwaitDone(ForkJoinTask.java:324) [?:1.8.0_391]
	at java.util.concurrent.ForkJoinTask.doInvoke(ForkJoinTask.java:405) [?:1.8.0_391]
	at java.util.concurrent.ForkJoinTask.invoke(ForkJoinTask.java:734) [?:1.8.0_391]
	at java.util.stream.ForEachOps$ForEachOp.evaluateParallel(ForEachOps.java:160) [?:1.8.0_391]
	at java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateParallel(ForEachOps.java:174) [?:1.8.0_391]
	at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:233) [?:1.8.0_391]
	at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:418) [?:1.8.0_391]
	at java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:583) [?:1.8.0_391]
	at org.apache.seatunnel.engine.server.task.SeaTunnelTask.close(SeaTunnelTask.java:327) [seatunnel-starter.jar:2.3.5]
	at org.apache.seatunnel.engine.server.TaskExecutionService$BlockingWorker.run(TaskExecutionService.java:720) [seatunnel-starter.jar:2.3.5]
	at org.apache.seatunnel.engine.server.TaskExecutionService$NamedTaskWrapper.run(TaskExecutionService.java:1004) [seatunnel-starter.jar:2.3.5]
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [?:1.8.0_391]
	at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_391]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_391]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_391]
	at java.lang.Thread.run(Thread.java:750) [?:1.8.0_391]
2024-05-16 21:08:04,646 INFO  [o.a.k.c.p.KafkaProducer       ] [BlockingWorker-TaskGroupLocation{jobId=843474790923632641, pipelineId=1, taskGroupId=90000}] - [Producer clientId=producer-SeaTunnel4023-1, transactionalId=SeaTunnel4023-1] Proceeding to force close the producer since pending requests could not be completed within timeout 9223372036854775807 ms.
2024-05-16 21:08:04,647 INFO  [o.a.k.c.m.Metrics             ] [BlockingWorker-TaskGroupLocation{jobId=843474790923632641, pipelineId=1, taskGroupId=90000}] - Metrics scheduler closed
2024-05-16 21:08:04,647 INFO  [o.a.k.c.m.Metrics             ] [BlockingWorker-TaskGroupLocation{jobId=843474790923632641, pipelineId=1, taskGroupId=90000}] - Closing reporter org.apache.kafka.common.metrics.JmxReporter
2024-05-16 21:08:04,647 INFO  [o.a.k.c.m.Metrics             ] [BlockingWorker-TaskGroupLocation{jobId=843474790923632641, pipelineId=1, taskGroupId=90000}] - Metrics reporters closed
2024-05-16 21:08:04,647 INFO  [o.a.k.c.u.AppInfoParser       ] [BlockingWorker-TaskGroupLocation{jobId=843474790923632641, pipelineId=1, taskGroupId=90000}] - App info kafka.producer for producer-SeaTunnel4023-1 unregistered
2024-05-16 21:08:04,665 INFO  [c.h.i.i.NodeExtension         ] [main] - [localhost]:5801 [seatunnel-253591] [5.1] Destroying node NodeExtension.
2024-05-16 21:08:04,666 INFO  [c.h.i.i.Node                  ] [main] - [localhost]:5801 [seatunnel-253591] [5.1] Hazelcast Shutdown is completed in 28 ms.
2024-05-16 21:08:04,666 INFO  [c.h.c.LifecycleService        ] [main] - [localhost]:5801 [seatunnel-253591] [5.1] [localhost]:5801 is SHUTDOWN
2024-05-16 21:08:04,666 INFO  [s.c.s.s.c.ClientExecuteCommand] [main] - Closed HazelcastInstance ......
2024-05-16 21:08:04,666 INFO  [s.c.s.s.c.ClientExecuteCommand] [main] - Closed metrics executor service ......
2024-05-16 21:08:04,666 ERROR [o.a.s.c.s.SeaTunnel           ] [main] -

===============================================================================


2024-05-16 21:08:04,666 ERROR [o.a.s.c.s.SeaTunnel           ] [main] - Fatal Error,

2024-05-16 21:08:04,666 ERROR [o.a.s.c.s.SeaTunnel           ] [main] - Please submit bug report in https://github.com/apache/seatunnel/issues

2024-05-16 21:08:04,666 ERROR [o.a.s.c.s.SeaTunnel           ] [main] - Reason:SeaTunnel job executed failed

2024-05-16 21:08:04,667 ERROR [o.a.s.c.s.SeaTunnel           ] [main] - Exception StackTrace:org.apache.seatunnel.core.starter.exception.CommandExecuteException: SeaTunnel job executed failed
	at org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand.execute(ClientExecuteCommand.java:202)
	at org.apache.seatunnel.core.starter.SeaTunnel.run(SeaTunnel.java:40)
	at org.apache.seatunnel.core.starter.seatunnel.SeaTunnelClient.main(SeaTunnelClient.java:34)
Caused by: org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException: com.hazelcast.nio.serialization.HazelcastSerializationException: java.lang.NegativeArraySizeException
	at com.hazelcast.internal.serialization.impl.SerializationUtil.handleException(SerializationUtil.java:111)
	at com.hazelcast.internal.serialization.impl.AbstractSerializationService.toObject(AbstractSerializationService.java:274)
	at com.hazelcast.spi.impl.NodeEngineImpl.toObject(NodeEngineImpl.java:385)
	at com.hazelcast.collection.impl.queue.QueueProxyImpl.drainTo(QueueProxyImpl.java:134)
	at org.apache.seatunnel.engine.server.task.flow.ShuffleSourceFlowLifeCycle.collect(ShuffleSourceFlowLifeCycle.java:94)
	at org.apache.seatunnel.engine.server.task.TransformSeaTunnelTask.collect(TransformSeaTunnelTask.java:73)
	at org.apache.seatunnel.engine.server.task.SeaTunnelTask.stateProcess(SeaTunnelTask.java:168)
	at org.apache.seatunnel.engine.server.task.TransformSeaTunnelTask.call(TransformSeaTunnelTask.java:78)
	at org.apache.seatunnel.engine.server.TaskExecutionService$BlockingWorker.run(TaskExecutionService.java:703)
	at org.apache.seatunnel.engine.server.TaskExecutionService$NamedTaskWrapper.run(TaskExecutionService.java:1004)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:750)
Caused by: java.lang.NegativeArraySizeException
	at org.apache.seatunnel.api.table.type.SeaTunnelRow.<init>(SeaTunnelRow.java:38)
	at org.apache.seatunnel.engine.server.serializable.RecordSerializer.read(RecordSerializer.java:75)
	at org.apache.seatunnel.engine.server.serializable.RecordSerializer.read(RecordSerializer.java:33)
	at com.hazelcast.internal.serialization.impl.StreamSerializerAdapter.read(StreamSerializerAdapter.java:44)
	at com.hazelcast.internal.serialization.impl.AbstractSerializationService.toObject(AbstractSerializationService.java:268)
	... 13 more

	at org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand.execute(ClientExecuteCommand.java:194)
	... 2 more

2024-05-16 21:08:04,667 ERROR [o.a.s.c.s.SeaTunnel           ] [main] -
===============================================================================



Exception in thread "main" org.apache.seatunnel.core.starter.exception.CommandExecuteException: SeaTunnel job executed failed
	at org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand.execute(ClientExecuteCommand.java:202)
	at org.apache.seatunnel.core.starter.SeaTunnel.run(SeaTunnel.java:40)
	at org.apache.seatunnel.core.starter.seatunnel.SeaTunnelClient.main(SeaTunnelClient.java:34)
Caused by: org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException: com.hazelcast.nio.serialization.HazelcastSerializationException: java.lang.NegativeArraySizeException
	at com.hazelcast.internal.serialization.impl.SerializationUtil.handleException(SerializationUtil.java:111)
	at com.hazelcast.internal.serialization.impl.AbstractSerializationService.toObject(AbstractSerializationService.java:274)
	at com.hazelcast.spi.impl.NodeEngineImpl.toObject(NodeEngineImpl.java:385)
	at com.hazelcast.collection.impl.queue.QueueProxyImpl.drainTo(QueueProxyImpl.java:134)
	at org.apache.seatunnel.engine.server.task.flow.ShuffleSourceFlowLifeCycle.collect(ShuffleSourceFlowLifeCycle.java:94)
	at org.apache.seatunnel.engine.server.task.TransformSeaTunnelTask.collect(TransformSeaTunnelTask.java:73)
	at org.apache.seatunnel.engine.server.task.SeaTunnelTask.stateProcess(SeaTunnelTask.java:168)
	at org.apache.seatunnel.engine.server.task.TransformSeaTunnelTask.call(TransformSeaTunnelTask.java:78)
	at org.apache.seatunnel.engine.server.TaskExecutionService$BlockingWorker.run(TaskExecutionService.java:703)
	at org.apache.seatunnel.engine.server.TaskExecutionService$NamedTaskWrapper.run(TaskExecutionService.java:1004)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:750)
Caused by: java.lang.NegativeArraySizeException
	at org.apache.seatunnel.api.table.type.SeaTunnelRow.<init>(SeaTunnelRow.java:38)
	at org.apache.seatunnel.engine.server.serializable.RecordSerializer.read(RecordSerializer.java:75)
	at org.apache.seatunnel.engine.server.serializable.RecordSerializer.read(RecordSerializer.java:33)
	at com.hazelcast.internal.serialization.impl.StreamSerializerAdapter.read(StreamSerializerAdapter.java:44)
	at com.hazelcast.internal.serialization.impl.AbstractSerializationService.toObject(AbstractSerializationService.java:268)
	... 13 more

	at org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand.execute(ClientExecuteCommand.java:194)
	... 2 more
2024-05-16 21:08:09,127 INFO  [r.IncrementalSourceSplitReader] [Source Data Fetcher for BlockingWorker-TaskGroupLocation{jobId=843474790923632641, pipelineId=1, taskGroupId=30000}] - Close current fetcher org.apache.seatunnel.connectors.cdc.base.source.reader.external.IncrementalSourceScanFetcher
2024-05-16 21:08:09,138 INFO  [i.d.j.JdbcConnection          ] [pool-26-thread-1] - Connection gracefully closed
2024-05-16 21:08:09,139 INFO  [i.d.j.JdbcConnection          ] [pool-27-thread-1] - Connection gracefully closed
2024-05-16 21:08:09,140 INFO  [o.a.s.c.s.c.s.r.f.SplitFetcher] [Source Data Fetcher for BlockingWorker-TaskGroupLocation{jobId=843474790923632641, pipelineId=1, taskGroupId=30000}] - Split fetcher 0 exited.
2024-05-16 21:08:14,521 INFO  [r.IncrementalSourceSplitReader] [Source Data Fetcher for BlockingWorker-TaskGroupLocation{jobId=843474790923632641, pipelineId=1, taskGroupId=30000}] - Close current fetcher org.apache.seatunnel.connectors.cdc.base.source.reader.external.IncrementalSourceScanFetcher
2024-05-16 21:08:14,525 INFO  [i.d.j.JdbcConnection          ] [pool-28-thread-1] - Connection gracefully closed
2024-05-16 21:08:14,527 INFO  [i.d.j.JdbcConnection          ] [pool-29-thread-1] - Connection gracefully closed
2024-05-16 21:08:14,527 INFO  [o.a.s.c.s.c.s.r.f.SplitFetcher] [Source Data Fetcher for BlockingWorker-TaskGroupLocation{jobId=843474790923632641, pipelineId=1, taskGroupId=30000}] - Split fetcher 0 exited.
2024-05-16 21:08:18,585 INFO  [o.a.k.c.m.Metrics             ] [ForkJoinPool.commonPool-worker-5] - Metrics scheduler closed
2024-05-16 21:08:18,585 INFO  [o.a.k.c.m.Metrics             ] [ForkJoinPool.commonPool-worker-5] - Closing reporter org.apache.kafka.common.metrics.JmxReporter
2024-05-16 21:08:18,585 INFO  [o.a.k.c.m.Metrics             ] [ForkJoinPool.commonPool-worker-5] - Metrics reporters closed
2024-05-16 21:08:18,585 INFO  [o.a.k.c.u.AppInfoParser       ] [ForkJoinPool.commonPool-worker-5] - App info kafka.producer for producer-SeaTunnel6694-1 unregistered
2024-05-16 21:08:18,597 INFO  [o.a.k.c.m.Metrics             ] [ForkJoinPool.commonPool-worker-3] - Metrics scheduler closed
2024-05-16 21:08:18,597 INFO  [o.a.k.c.m.Metrics             ] [ForkJoinPool.commonPool-worker-3] - Closing reporter org.apache.kafka.common.metrics.JmxReporter
2024-05-16 21:08:18,597 INFO  [o.a.k.c.m.Metrics             ] [ForkJoinPool.commonPool-worker-3] - Metrics reporters closed
2024-05-16 21:08:18,597 INFO  [o.a.k.c.u.AppInfoParser       ] [ForkJoinPool.commonPool-worker-3] - App info kafka.producer for producer-SeaTunnel3841-1 unregistered
2024-05-16 21:08:29,196 INFO  [o.a.k.c.m.Metrics             ] [ForkJoinPool.commonPool-worker-4] - Metrics scheduler closed
2024-05-16 21:08:29,196 INFO  [o.a.k.c.m.Metrics             ] [ForkJoinPool.commonPool-worker-4] - Closing reporter org.apache.kafka.common.metrics.JmxReporter
2024-05-16 21:08:29,196 INFO  [o.a.k.c.m.Metrics             ] [ForkJoinPool.commonPool-worker-4] - Metrics reporters closed
2024-05-16 21:08:29,197 INFO  [o.a.k.c.u.AppInfoParser       ] [ForkJoinPool.commonPool-worker-4] - App info kafka.producer for producer-SeaTunnel4066-1 unregistered
2024-05-16 21:08:29,197 INFO  [o.a.k.c.m.Metrics             ] [ForkJoinPool.commonPool-worker-2] - Metrics scheduler closed
2024-05-16 21:08:29,197 INFO  [o.a.k.c.m.Metrics             ] [ForkJoinPool.commonPool-worker-2] - Closing reporter org.apache.kafka.common.metrics.JmxReporter
2024-05-16 21:08:29,198 INFO  [o.a.k.c.m.Metrics             ] [ForkJoinPool.commonPool-worker-2] - Metrics reporters closed
2024-05-16 21:08:29,198 INFO  [o.a.k.c.u.AppInfoParser       ] [ForkJoinPool.commonPool-worker-2] - App info kafka.producer for producer-SeaTunnel7334-1 unregistered
2024-05-16 21:08:34,594 INFO  [o.a.k.c.m.Metrics             ] [ForkJoinPool.commonPool-worker-6] - Metrics scheduler closed
2024-05-16 21:08:34,595 INFO  [o.a.k.c.m.Metrics             ] [ForkJoinPool.commonPool-worker-6] - Closing reporter org.apache.kafka.common.metrics.JmxReporter
2024-05-16 21:08:34,595 INFO  [o.a.k.c.m.Metrics             ] [ForkJoinPool.commonPool-worker-6] - Metrics reporters closed
2024-05-16 21:08:34,596 INFO  [o.a.k.c.u.AppInfoParser       ] [ForkJoinPool.commonPool-worker-6] - App info kafka.producer for producer-SeaTunnel3209-1 unregistered
2024-05-16 21:08:34,595 INFO  [o.a.k.c.m.Metrics             ] [ForkJoinPool.commonPool-worker-1] - Metrics scheduler closed
2024-05-16 21:08:34,598 INFO  [o.a.k.c.m.Metrics             ] [ForkJoinPool.commonPool-worker-1] - Closing reporter org.apache.kafka.common.metrics.JmxReporter
2024-05-16 21:08:34,598 INFO  [o.a.k.c.m.Metrics             ] [ForkJoinPool.commonPool-worker-1] - Metrics reporters closed
2024-05-16 21:08:34,599 INFO  [o.a.k.c.u.AppInfoParser       ] [ForkJoinPool.commonPool-worker-1] - App info kafka.producer for producer-SeaTunnel5301-1 unregistered
2024-05-16 21:08:34,604 INFO  [s.c.s.s.c.ClientExecuteCommand] [ForkJoinPool.commonPool-worker-1] - run shutdown hook because get close signal
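
A note on the innermost cause: the trace ends in `SeaTunnelRow.<init>` called from `RecordSerializer.read`, which suggests the row's field array was sized from a negative value read off the stream. The Java sketch below only illustrates that general failure mode. It is not SeaTunnel's serializer code, and `NegativeAritySketch`, `readRow`, and `encode` are hypothetical names made up for this illustration.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

// Hypothetical illustration only; not the actual RecordSerializer/SeaTunnelRow code.
public class NegativeAritySketch {

    // Reads a length-prefixed "row": an int field count followed by that many UTF strings.
    static Object[] readRow(byte[] bytes) throws IOException {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes))) {
            int arity = in.readInt();
            // If the stream is corrupt or misaligned, arity may be negative,
            // and this allocation throws NegativeArraySizeException.
            Object[] fields = new Object[arity];
            for (int i = 0; i < arity; i++) {
                fields[i] = in.readUTF();
            }
            return fields;
        }
    }

    // Writes the field count exactly as given (even if wrong), then the values.
    static byte[] encode(int arity, String... values) throws IOException {
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(buf)) {
            out.writeInt(arity);
            for (String v : values) {
                out.writeUTF(v);
            }
        }
        return buf.toByteArray();
    }

    public static void main(String[] args) throws IOException {
        // A well-formed payload round-trips normally.
        System.out.println(readRow(encode(2, "a", "b")).length);

        // A payload whose length prefix is negative fails at array allocation.
        try {
            readRow(encode(-1));
        } catch (NegativeArraySizeException e) {
            System.out.println("caught NegativeArraySizeException");
        }
    }
}
```

Whether the negative size in this job comes from a mismatch between writer and reader formats, from the multi-table records, or from something else is not established by the log above.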

Zeta or Flink or Spark Version

Zeta

Java or Scala Version

No response

Screenshots

No response

Are you willing to submit PR?

  • [X] Yes I am willing to submit a PR!

Code of Conduct

Posted by aa8808021 on May 16 '24 13:05