
[Bug]: error when trying to serialize headers with LongString

Open devfreitag opened this issue 1 year ago • 0 comments

What happened?

Apache Beam version: 2.55.0
Java version: 17

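When reading messages from RabbitMQ whose headers contain List values (streaming Dataflow pipeline), the job fails with a NotSerializableException. The list elements are RabbitMQ LongString values (com.rabbitmq.client.impl.LongStringHelper$ByteArrayLongString), which do not implement Serializable.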
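The failure can likely be reproduced without Beam or RabbitMQ. The Caused by section of the stack trace shows ObjectOutputStream walking a HashMap, then an ArrayList, and stopping at the ByteArrayLongString element. The minimal sketch below mimics that path with plain Java serialization, which is what SerializableCoder uses. FakeLongString is a hypothetical stand-in for the client's LongString, and the header names and values are made up for illustration.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class HeaderSerializationDemo {

    // Hypothetical stand-in for the RabbitMQ client's LongString: like
    // LongStringHelper$ByteArrayLongString in the trace, it is NOT Serializable.
    static final class FakeLongString {
        private final byte[] bytes;

        FakeLongString(String s) {
            this.bytes = s.getBytes(StandardCharsets.UTF_8);
        }

        @Override
        public String toString() {
            return new String(bytes, StandardCharsets.UTF_8);
        }
    }

    // Returns true if plain Java serialization (the mechanism behind
    // SerializableCoder) succeeds, false on NotSerializableException.
    static boolean isSerializable(Object value) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(value);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        // Scalar headers (String, Number, Boolean) serialize fine.
        Map<String, Object> scalarHeaders = new HashMap<>();
        scalarHeaders.put("retries", 3);
        scalarHeaders.put("source", "orders");
        scalarHeaders.put("replay", true);

        // A List-typed header whose elements are LongString-like objects.
        List<Object> tags = new ArrayList<>();
        tags.add(new FakeLongString("a"));
        Map<String, Object> listHeaders = new HashMap<>();
        listHeaders.put("tags", tags);

        System.out.println(isSerializable(scalarHeaders)); // true
        System.out.println(isSerializable(listHeaders));   // false
        // The list itself is Serializable; only its elements are not.
        System.out.println(tags instanceof Serializable);  // true
    }
}
```

The last line matters: a check that only inspects the top-level header values would see a Serializable ArrayList and let it through, while serialization still fails one level deeper.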


Headers of type String, Number, or Boolean work normally. I tried setting a custom coder with .setCoder(new CustomCoder()) to skip the headers that are irrelevant here, but the error is thrown before that coder is reached: the stack trace shows the failure inside the ValueWithRecordIdCoder(SerializableCoder(RabbitMqMessage)) that the read itself uses.

Code:

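import org.apache.beam.sdk.io.rabbitmq.RabbitMqIO;
import org.apache.beam.sdk.io.rabbitmq.RabbitMqMessage;
import org.apache.beam.sdk.values.PCollection;

PCollection<RabbitMqMessage> rabbitMessagesCollection = pipeline
            .apply("Read RabbitMQ Messages", RabbitMqIO.read()
                    .withUri(options.getRabbitURI())
                    .withQueue(options.getInputQueue()));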

Stack trace:

Error message from worker: java.lang.IllegalArgumentException: Unable to encode element 'ValueWithRecordId{id=[], value=org.apache.beam.sdk.io.rabbitmq.RabbitMqMessage@f9a0d014}' with coder 'ValueWithRecordId$ValueWithRecordIdCoder(SerializableCoder(org.apache.beam.sdk.io.rabbitmq.RabbitMqMessage))'.
        org.apache.beam.sdk.coders.Coder.getEncodedElementByteSize(Coder.java:300)
        org.apache.beam.sdk.coders.Coder.registerByteSizeObserver(Coder.java:291)
        org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.registerByteSizeObserver(WindowedValue.java:642)
        org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.registerByteSizeObserver(WindowedValue.java:558)
        org.apache.beam.runners.dataflow.worker.IntrinsicMapTaskExecutorFactory$ElementByteSizeObservableCoder.registerByteSizeObserver(IntrinsicMapTaskExecutorFactory.java:384)
        org.apache.beam.runners.dataflow.worker.util.common.worker.OutputObjectAndByteCounter.update(OutputObjectAndByteCounter.java:128)
        org.apache.beam.runners.dataflow.worker.DataflowOutputCounter.update(DataflowOutputCounter.java:67)
        org.apache.beam.runners.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:48)
        org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.runReadLoop(ReadOperation.java:218)
        org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.start(ReadOperation.java:169)
        org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:83)
        org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1263)
        org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.lambda$scheduleWorkItem$11(StreamingDataflowWorker.java:999)
        org.apache.beam.runners.dataflow.worker.streaming.Work.run(Work.java:81)
        org.apache.beam.runners.dataflow.worker.util.BoundedQueueExecutor.lambda$executeLockHeld$0(BoundedQueueExecutor.java:183)
        java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
        java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
        java.base/java.lang.Thread.run(Thread.java:833)
Caused by: java.io.NotSerializableException: com.rabbitmq.client.impl.LongStringHelper$ByteArrayLongString
        java.base/java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1187)
        java.base/java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:350)
        java.base/java.util.ArrayList.writeObject(ArrayList.java:866)
        java.base/jdk.internal.reflect.GeneratedMethodAccessor62.invoke(Unknown Source)
        java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        java.base/java.lang.reflect.Method.invoke(Method.java:568)
        java.base/java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1070)
        java.base/java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1516)
        java.base/java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1438)
        java.base/java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1181)
        java.base/java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:350)
        java.base/java.util.HashMap.internalWriteEntries(HashMap.java:1944)
        java.base/java.util.HashMap.writeObject(HashMap.java:1497)
        java.base/jdk.internal.reflect.GeneratedMethodAccessor61.invoke(Unknown Source)
        java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        java.base/java.lang.reflect.Method.invoke(Method.java:568)
        java.base/java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1070)
        java.base/java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1516)
        java.base/java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1438)
        java.base/java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1181)
        java.base/java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1572)
        java.base/java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1529)
        java.base/java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1438)
        java.base/java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1181)
        java.base/java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1572)
        java.base/java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1529)
        java.base/java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1438)
        java.base/java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1181)
        java.base/java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:350)
        org.apache.beam.sdk.coders.SerializableCoder.encode(SerializableCoder.java:192)
        org.apache.beam.sdk.coders.SerializableCoder.encode(SerializableCoder.java:57)
        org.apache.beam.sdk.values.ValueWithRecordId$ValueWithRecordIdCoder.encode(ValueWithRecordId.java:106)
        org.apache.beam.sdk.values.ValueWithRecordId$ValueWithRecordIdCoder.encode(ValueWithRecordId.java:100)
        org.apache.beam.sdk.values.ValueWithRecordId$ValueWithRecordIdCoder.encode(ValueWithRecordId.java:82)
        org.apache.beam.sdk.coders.Coder.getEncodedElementByteSize(Coder.java:297)
        org.apache.beam.sdk.coders.Coder.registerByteSizeObserver(Coder.java:291)
        org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.registerByteSizeObserver(WindowedValue.java:642)
        org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.registerByteSizeObserver(WindowedValue.java:558)
        org.apache.beam.runners.dataflow.worker.IntrinsicMapTaskExecutorFactory$ElementByteSizeObservableCoder.registerByteSizeObserver(IntrinsicMapTaskExecutorFactory.java:384)
        org.apache.beam.runners.dataflow.worker.util.common.worker.OutputObjectAndByteCounter.update(OutputObjectAndByteCounter.java:128)
        org.apache.beam.runners.dataflow.worker.DataflowOutputCounter.update(DataflowOutputCounter.java:67)
        org.apache.beam.runners.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:48)
        org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.runReadLoop(ReadOperation.java:218)
        org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.start(ReadOperation.java:169)
        org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:83)
        org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1263)
        org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.lambda$scheduleWorkItem$11(StreamingDataflowWorker.java:999)
        org.apache.beam.runners.dataflow.worker.streaming.Work.run(Work.java:81)
        org.apache.beam.runners.dataflow.worker.util.BoundedQueueExecutor.lambda$executeLockHeld$0(BoundedQueueExecutor.java:183)
        java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
        java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
        java.base/java.lang.Thread.run(Thread.java:833)
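One possible reading of the trace, offered as an assumption rather than a confirmed diagnosis: if non-Serializable header values are only converted at the top level of the headers map, an ArrayList passes that check (ArrayList implements Serializable) while its LongString elements do not. A hedged sketch of a recursive alternative follows. HeaderSanitizer and sanitize are hypothetical names, not part of RabbitMqIO, and replacing non-serializable leaves with their toString() form is an assumed fallback; with the real client, decoding LongString.getBytes() as UTF-8 may be the more explicit choice.

```java
import java.io.Serializable;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper, not part of RabbitMqIO.
public final class HeaderSanitizer {

    private HeaderSanitizer() {}

    // Returns a deep copy of the headers in which maps and lists are rebuilt
    // element by element and every leaf is Java-serializable.
    public static Map<String, Object> sanitize(Map<String, Object> headers) {
        Map<String, Object> copy = new HashMap<>();
        if (headers == null) {
            return copy;
        }
        for (Map.Entry<String, Object> e : headers.entrySet()) {
            copy.put(e.getKey(), sanitizeValue(e.getValue()));
        }
        return copy;
    }

    private static Object sanitizeValue(Object value) {
        if (value == null) {
            return null;
        }
        // Containers are checked BEFORE the Serializable test: HashMap and
        // ArrayList are Serializable even when their contents are not.
        if (value instanceof Map) {
            Map<String, Object> nested = new HashMap<>();
            for (Map.Entry<?, ?> e : ((Map<?, ?>) value).entrySet()) {
                nested.put(String.valueOf(e.getKey()), sanitizeValue(e.getValue()));
            }
            return nested;
        }
        if (value instanceof List) {
            List<Object> items = new ArrayList<>();
            for (Object item : (List<?>) value) {
                items.add(sanitizeValue(item));
            }
            return items;
        }
        if (value instanceof Serializable) {
            return value;
        }
        // Assumed fallback: keep a text form of anything else
        // (e.g. a LongString-like value).
        return value.toString();
    }
}
```

Because a conversion like this would have to run inside the source, before the message is encoded, it is not something a downstream setCoder call can replace.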

Issue Priority

Priority: 2 (default / most bugs should be filed as P2)

Issue Components

  • [ ] Component: Python SDK
  • [X] Component: Java SDK
  • [ ] Component: Go SDK
  • [ ] Component: Typescript SDK
  • [ ] Component: IO connector
  • [ ] Component: Beam YAML
  • [ ] Component: Beam examples
  • [ ] Component: Beam playground
  • [ ] Component: Beam katas
  • [ ] Component: Website
  • [ ] Component: Spark Runner
  • [ ] Component: Flink Runner
  • [ ] Component: Samza Runner
  • [ ] Component: Twister2 Runner
  • [ ] Component: Hazelcast Jet Runner
  • [X] Component: Google Cloud Dataflow Runner

devfreitag, May 24 '24 17:05