beam
beam copied to clipboard
[Bug]: NotSerializableException when serializing RabbitMQ message headers containing LongString values
What happened?
Apache Beam version: 2.55.0; Java version: 17
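When reading messages from RabbitMQ with RabbitMqIO in a streaming Dataflow job, a NotSerializableException is thrown if a message header value is a List. The stack trace below shows that the list elements are RabbitMQ LongString instances (com.rabbitmq.client.impl.LongStringHelper$ByteArrayLongString), which do not implement java.io.Serializable.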
Example:
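If the header values are only String, Number, or Boolean, the pipeline works normally. I tried setting a custom coder with .setCoder(new CustomCoder()) to drop the headers that are irrelevant here, but the error is thrown before that coder is reached. According to the stack trace, the failure happens inside the read step itself, where each element is encoded with SerializableCoder(RabbitMqMessage), so a coder set on the output PCollection never gets a chance to apply.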
Code:
PCollection<RabbitMqMessage> rabbitMessagesCollection = pipeline
.apply("Read RabbitMQ Messages", RabbitMqIO.read()
.withUri(options.getRabbitURI())
.withQueue(options.getInputQueue()))
Stacktrace:
Error message from worker: java.lang.IllegalArgumentException: Unable to encode element 'ValueWithRecordId{id=[], value=org.apache.beam.sdk.io.rabbitmq.RabbitMqMessage@f9a0d014}' with coder 'ValueWithRecordId$ValueWithRecordIdCoder(SerializableCoder(org.apache.beam.sdk.io.rabbitmq.RabbitMqMessage))'.
org.apache.beam.sdk.coders.Coder.getEncodedElementByteSize(Coder.java:300)
org.apache.beam.sdk.coders.Coder.registerByteSizeObserver(Coder.java:291)
org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.registerByteSizeObserver(WindowedValue.java:642)
org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.registerByteSizeObserver(WindowedValue.java:558)
org.apache.beam.runners.dataflow.worker.IntrinsicMapTaskExecutorFactory$ElementByteSizeObservableCoder.registerByteSizeObserver(IntrinsicMapTaskExecutorFactory.java:384)
org.apache.beam.runners.dataflow.worker.util.common.worker.OutputObjectAndByteCounter.update(OutputObjectAndByteCounter.java:128)
org.apache.beam.runners.dataflow.worker.DataflowOutputCounter.update(DataflowOutputCounter.java:67)
org.apache.beam.runners.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:48)
org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.runReadLoop(ReadOperation.java:218)
org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.start(ReadOperation.java:169)
org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:83)
org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1263)
org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.lambda$scheduleWorkItem$11(StreamingDataflowWorker.java:999)
org.apache.beam.runners.dataflow.worker.streaming.Work.run(Work.java:81)
org.apache.beam.runners.dataflow.worker.util.BoundedQueueExecutor.lambda$executeLockHeld$0(BoundedQueueExecutor.java:183)
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
java.base/java.lang.Thread.run(Thread.java:833)
Caused by: java.io.NotSerializableException: com.rabbitmq.client.impl.LongStringHelper$ByteArrayLongString
java.base/java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1187)
java.base/java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:350)
java.base/java.util.ArrayList.writeObject(ArrayList.java:866)
java.base/jdk.internal.reflect.GeneratedMethodAccessor62.invoke(Unknown Source)
java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
java.base/java.lang.reflect.Method.invoke(Method.java:568)
java.base/java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1070)
java.base/java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1516)
java.base/java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1438)
java.base/java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1181)
java.base/java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:350)
java.base/java.util.HashMap.internalWriteEntries(HashMap.java:1944)
java.base/java.util.HashMap.writeObject(HashMap.java:1497)
java.base/jdk.internal.reflect.GeneratedMethodAccessor61.invoke(Unknown Source)
java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
java.base/java.lang.reflect.Method.invoke(Method.java:568)
java.base/java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1070)
java.base/java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1516)
java.base/java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1438)
java.base/java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1181)
java.base/java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1572)
java.base/java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1529)
java.base/java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1438)
java.base/java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1181)
java.base/java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1572)
java.base/java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1529)
java.base/java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1438)
java.base/java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1181)
java.base/java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:350)
org.apache.beam.sdk.coders.SerializableCoder.encode(SerializableCoder.java:192)
org.apache.beam.sdk.coders.SerializableCoder.encode(SerializableCoder.java:57)
org.apache.beam.sdk.values.ValueWithRecordId$ValueWithRecordIdCoder.encode(ValueWithRecordId.java:106)
org.apache.beam.sdk.values.ValueWithRecordId$ValueWithRecordIdCoder.encode(ValueWithRecordId.java:100)
org.apache.beam.sdk.values.ValueWithRecordId$ValueWithRecordIdCoder.encode(ValueWithRecordId.java:82)
org.apache.beam.sdk.coders.Coder.getEncodedElementByteSize(Coder.java:297)
org.apache.beam.sdk.coders.Coder.registerByteSizeObserver(Coder.java:291)
org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.registerByteSizeObserver(WindowedValue.java:642)
org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.registerByteSizeObserver(WindowedValue.java:558)
org.apache.beam.runners.dataflow.worker.IntrinsicMapTaskExecutorFactory$ElementByteSizeObservableCoder.registerByteSizeObserver(IntrinsicMapTaskExecutorFactory.java:384)
org.apache.beam.runners.dataflow.worker.util.common.worker.OutputObjectAndByteCounter.update(OutputObjectAndByteCounter.java:128)
org.apache.beam.runners.dataflow.worker.DataflowOutputCounter.update(DataflowOutputCounter.java:67)
org.apache.beam.runners.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:48)
org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.runReadLoop(ReadOperation.java:218)
org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.start(ReadOperation.java:169)
org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:83)
org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1263)
org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.lambda$scheduleWorkItem$11(StreamingDataflowWorker.java:999)
org.apache.beam.runners.dataflow.worker.streaming.Work.run(Work.java:81)
org.apache.beam.runners.dataflow.worker.util.BoundedQueueExecutor.lambda$executeLockHeld$0(BoundedQueueExecutor.java:183)
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
java.base/java.lang.Thread.run(Thread.java:833)
Issue Priority
Priority: 2 (default / most bugs should be filed as P2)
Issue Components
- [ ] Component: Python SDK
- [X] Component: Java SDK
- [ ] Component: Go SDK
- [ ] Component: Typescript SDK
- [ ] Component: IO connector
- [ ] Component: Beam YAML
- [ ] Component: Beam examples
- [ ] Component: Beam playground
- [ ] Component: Beam katas
- [ ] Component: Website
- [ ] Component: Spark Runner
- [ ] Component: Flink Runner
- [ ] Component: Samza Runner
- [ ] Component: Twister2 Runner
- [ ] Component: Hazelcast Jet Runner
- [X] Component: Google Cloud Dataflow Runner