
[Bug]: IcebergIO fails when the table's partition layout is changed at runtime

Open · DanielMorales9 opened this issue 1 year ago • 1 comment

What happened?

A streaming pipeline repeatedly throws the RuntimeException below when the partition layout (partition spec) of an Iceberg table is changed while the pipeline is running.

SEVERE: 2024-10-18T13:53:19.171Z: java.lang.RuntimeException: org.apache.beam.sdk.util.UserCodeException: java.lang.IllegalStateException: Invalid partition spec id '1'. This DataFile was originally created with spec id '0'.
        org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowsParDoFn$1.output(GroupAlsoByWindowsParDoFn.java:187)
        org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowFnRunner$1.outputWindowedValue(GroupAlsoByWindowFnRunner.java:108)
        org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.ReduceFnRunner.lambda$onTrigger$1(ReduceFnRunner.java:1058)
        org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.ReduceFnContextFactory$OnTriggerContextImpl.output(ReduceFnContextFactory.java:445)
        org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SystemReduceFn.onTrigger(SystemReduceFn.java:130)
        org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.ReduceFnRunner.onTrigger(ReduceFnRunner.java:1061)
        org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.ReduceFnRunner.emit(ReduceFnRunner.java:932)
        org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.ReduceFnRunner.onTimers(ReduceFnRunner.java:793)
        org.apache.beam.runners.dataflow.worker.StreamingGroupAlsoByWindowViaWindowSetFn.processElement(StreamingGroupAlsoByWindowViaWindowSetFn.java:97)
        org.apache.beam.runners.dataflow.worker.StreamingGroupAlsoByWindowViaWindowSetFn.processElement(StreamingGroupAlsoByWindowViaWindowSetFn.java:43)
        org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowFnRunner.invokeProcessElement(GroupAlsoByWindowFnRunner.java:121)
        org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowFnRunner.processElement(GroupAlsoByWindowFnRunner.java:73)
        org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.LateDataDroppingDoFnRunner.processElement(LateDataDroppingDoFnRunner.java:80)
        org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowsParDoFn.processElement(GroupAlsoByWindowsParDoFn.java:137)
        org.apache.beam.runners.dataflow.worker.util.common.worker.ParDoOperation.process(ParDoOperation.java:44)
        org.apache.beam.runners.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:54)
        org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.runReadLoop(ReadOperation.java:218)
        org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.start(ReadOperation.java:169)
        org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:83)
        org.apache.beam.runners.dataflow.worker.streaming.ComputationWorkExecutor.executeWork(ComputationWorkExecutor.java:78)
        org.apache.beam.runners.dataflow.worker.windmill.work.processing.StreamingWorkScheduler.executeWork(StreamingWorkScheduler.java:382)
        org.apache.beam.runners.dataflow.worker.windmill.work.processing.StreamingWorkScheduler.processWork(StreamingWorkScheduler.java:255)
        org.apache.beam.runners.dataflow.worker.windmill.work.processing.StreamingWorkScheduler.lambda$processWork$4(StreamingWorkScheduler.java:269)
        org.apache.beam.runners.dataflow.worker.streaming.ExecutableWork.run(ExecutableWork.java:38)
        org.apache.beam.runners.dataflow.worker.util.BoundedQueueExecutor.lambda$executeMonitorHeld$0(BoundedQueueExecutor.java:234)
        java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.beam.sdk.util.UserCodeException: java.lang.IllegalStateException: Invalid partition spec id '1'. This DataFile was originally created with spec id '0'.
        org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:39)
        org.apache.beam.sdk.io.iceberg.AppendFilesToTables$AppendFilesToTablesDoFn$DoFnInvoker.invokeProcessElement(Unknown Source)
        org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:212)
        org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:186)
        org.apache.beam.runners.dataflow.worker.SimpleParDoFn.processElement(SimpleParDoFn.java:340)
        org.apache.beam.runners.dataflow.worker.util.common.worker.ParDoOperation.process(ParDoOperation.java:44)
        org.apache.beam.runners.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:54)
        org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowsParDoFn$1.output(GroupAlsoByWindowsParDoFn.java:185)
        org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowFnRunner$1.outputWindowedValue(GroupAlsoByWindowFnRunner.java:108)
        org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.ReduceFnRunner.lambda$onTrigger$1(ReduceFnRunner.java:1058)
        org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.ReduceFnContextFactory$OnTriggerContextImpl.output(ReduceFnContextFactory.java:445)
        org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SystemReduceFn.onTrigger(SystemReduceFn.java:130)
        org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.ReduceFnRunner.onTrigger(ReduceFnRunner.java:1061)
        org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.ReduceFnRunner.emit(ReduceFnRunner.java:932)
        org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.ReduceFnRunner.onTimers(ReduceFnRunner.java:793)
        org.apache.beam.runners.dataflow.worker.StreamingGroupAlsoByWindowViaWindowSetFn.processElement(StreamingGroupAlsoByWindowViaWindowSetFn.java:97)
        org.apache.beam.runners.dataflow.worker.StreamingGroupAlsoByWindowViaWindowSetFn.processElement(StreamingGroupAlsoByWindowViaWindowSetFn.java:43)
        org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowFnRunner.invokeProcessElement(GroupAlsoByWindowFnRunner.java:121)
        org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowFnRunner.processElement(GroupAlsoByWindowFnRunner.java:73)
        org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.LateDataDroppingDoFnRunner.processElement(LateDataDroppingDoFnRunner.java:80)
        org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowsParDoFn.processElement(GroupAlsoByWindowsParDoFn.java:137)
        org.apache.beam.runners.dataflow.worker.util.common.worker.ParDoOperation.process(ParDoOperation.java:44)
        org.apache.beam.runners.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:54)
        org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.runReadLoop(ReadOperation.java:218)
        org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.start(ReadOperation.java:169)
        org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:83)
        org.apache.beam.runners.dataflow.worker.streaming.ComputationWorkExecutor.executeWork(ComputationWorkExecutor.java:78)
        org.apache.beam.runners.dataflow.worker.windmill.work.processing.StreamingWorkScheduler.executeWork(StreamingWorkScheduler.java:382)
        org.apache.beam.runners.dataflow.worker.windmill.work.processing.StreamingWorkScheduler.processWork(StreamingWorkScheduler.java:255)
        org.apache.beam.runners.dataflow.worker.windmill.work.processing.StreamingWorkScheduler.lambda$processWork$4(StreamingWorkScheduler.java:269)
        org.apache.beam.runners.dataflow.worker.streaming.ExecutableWork.run(ExecutableWork.java:38)
        org.apache.beam.runners.dataflow.worker.util.BoundedQueueExecutor.lambda$executeMonitorHeld$0(BoundedQueueExecutor.java:234)
        java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.lang.IllegalStateException: Invalid partition spec id '1'. This DataFile was originally created with spec id '0'.
        org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState(Preconditions.java:686)
        org.apache.beam.sdk.io.iceberg.SerializableDataFile.createDataFile(SerializableDataFile.java:145)
        org.apache.beam.sdk.io.iceberg.FileWriteResult.getDataFile(FileWriteResult.java:51)
        org.apache.beam.sdk.io.iceberg.AppendFilesToTables$AppendFilesToTablesDoFn.processElement(AppendFilesToTables.java:115)
Oct 18, 2024 3:53:30 PM org.apache.beam.runners.dataflow.util.MonitoringUtil$LoggingHandler process
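The check that fails is the Preconditions.checkState call in SerializableDataFile.createDataFile. Judging from the message, each written file records the spec id that was current when it was written (0), while the spec supplied when AppendFilesToTables rebuilds the DataFile appears to be the table's new spec (1). The sketch below is a simplified, hypothetical model of that mismatch. Spec, WrittenFile and TableModel are illustrative stand-ins, not Beam or Iceberg types; they only mirror the fact that an Iceberg Table exposes both its current spec (spec()) and every known spec keyed by id (specs()). The alternative lookup shows one possible direction, resolving the spec by the file's recorded id. It is an assumption for illustration and is not necessarily what #32879 implements.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of the spec-id check seen in the trace.
// None of these types are Beam or Iceberg classes.
public class PartitionSpecCheckSketch {

    // Stand-in for a partition spec: only the id matters here.
    record Spec(int specId, String description) {}

    // Stand-in for a written file: remembers the spec id current at write time.
    record WrittenFile(String path, int specId) {}

    // Stand-in for a table: all known specs by id, plus the current spec id.
    static final class TableModel {
        final Map<Integer, Spec> specs = new HashMap<>();
        int currentSpecId;

        TableModel(Spec initial) {
            specs.put(initial.specId(), initial);
            currentSpecId = initial.specId();
        }

        // Simulates partition evolution: register a new spec and make it current.
        void evolve(Spec next) {
            specs.put(next.specId(), next);
            currentSpecId = next.specId();
        }

        Spec currentSpec() {
            return specs.get(currentSpecId);
        }
    }

    // Fragile: always use the table's current spec. After evolution this throws,
    // mirroring the IllegalStateException reported in the issue.
    static Spec resolveWithCurrentSpec(TableModel table, WrittenFile file) {
        Spec spec = table.currentSpec();
        if (spec.specId() != file.specId()) {
            throw new IllegalStateException(String.format(
                "Invalid partition spec id '%d'. This DataFile was originally created with spec id '%d'.",
                spec.specId(), file.specId()));
        }
        return spec;
    }

    // Alternative: look up the spec the file was written with by its recorded id.
    static Spec resolveWithRecordedSpec(TableModel table, WrittenFile file) {
        Spec spec = table.specs.get(file.specId());
        if (spec == null) {
            throw new IllegalStateException("Unknown partition spec id " + file.specId());
        }
        return spec;
    }

    public static void main(String[] args) {
        TableModel table = new TableModel(new Spec(0, "unpartitioned"));
        // A file is written while spec 0 is current...
        WrittenFile file = new WrittenFile("data/file-0.parquet", 0);
        // ...then the partition layout changes before the commit step runs.
        table.evolve(new Spec(1, "day(ts)"));

        try {
            resolveWithCurrentSpec(table, file);
        } catch (IllegalStateException e) {
            System.out.println("current-spec lookup failed: " + e.getMessage());
        }
        System.out.println("recorded-spec lookup: " + resolveWithRecordedSpec(table, file).description());
    }
}
```

Running main prints the same "Invalid partition spec id '1'" message for the current-spec lookup, while the recorded-id lookup returns the original spec 0. Whether committing files under an older spec is acceptable for a given table is a separate design question the model does not address.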

Issue Priority

Priority: 2 (default / most bugs should be filed as P2)

Issue Components

  • [ ] Component: Python SDK
  • [x] Component: Java SDK
  • [ ] Component: Go SDK
  • [ ] Component: Typescript SDK
  • [x] Component: IO connector
  • [ ] Component: Beam YAML
  • [ ] Component: Beam examples
  • [ ] Component: Beam playground
  • [ ] Component: Beam katas
  • [ ] Component: Website
  • [ ] Component: Infrastructure
  • [ ] Component: Spark Runner
  • [ ] Component: Flink Runner
  • [ ] Component: Samza Runner
  • [ ] Component: Twister2 Runner
  • [ ] Component: Hazelcast Jet Runner
  • [x] Component: Google Cloud Dataflow Runner

DanielMorales9 · Oct 18 '24 14:10

Hey @DanielMorales9 👋🏽

Can you take a look at #32879?

ahmedabu98 · Oct 18 '24 23:10