[Bug]: IcebergIO cannot write data into an hourly partitioned table
What happened?
When I attempt to write data into an hourly partitioned Iceberg table, the write fails regardless of whether the row's timestamp field uses Beam's standard DATETIME type or the SqlTypes.DATETIME logical type.
Iceberg expects the timestamp as a long (microseconds since the epoch) when computing the partition key, but Beam passes the Datetime object itself.
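For reference, here is a minimal sketch (not part of the original report) of the value Iceberg's partitioning machinery expects for the event_time column. Iceberg stores timestamp/timestamptz values as microseconds since the epoch, so PartitionKey reads the field as a Long; handing it a date/time object instead is what produces the IllegalStateException shown in the test results below. The schema and partition spec mirror the ones in the test; the class name and record values are illustrative only.

import org.apache.iceberg.PartitionKey;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.types.Types;

// Sketch: what Iceberg's hourly partition transform expects for a timestamp column.
public class IcebergPartitionKeyExpectation {
    public static void main(String[] args) {
        Schema icebergSchema = new Schema(
                Types.NestedField.required(1, "id", Types.StringType.get()),
                Types.NestedField.required(2, "event_time", Types.TimestampType.withZone()));
        PartitionSpec spec = PartitionSpec.builderFor(icebergSchema).hour("event_time").build();

        GenericRecord record = GenericRecord.create(icebergSchema);
        record.setField("id", "record-1");
        record.setField("event_time", System.currentTimeMillis() * 1_000L); // epoch micros as a Long

        PartitionKey partitionKey = new PartitionKey(spec, icebergSchema);
        partitionKey.partition(record); // works: the position accessor reads a Long

        // Storing a Joda/java.time date-time object in event_time instead of a Long is what
        // appears to trigger "Not an instance of java.lang.Long" in the traces below.
        System.out.println("hour partition value: " + partitionKey.get(0, Integer.class));
    }
}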
package com.experiment.beam;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.managed.Managed;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.logicaltypes.SqlTypes;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.values.Row;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.hadoop.HadoopCatalog;
import org.apache.iceberg.types.Types;
import org.joda.time.Instant;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import java.io.File;
import java.time.LocalDateTime;
import java.time.temporal.ChronoUnit;
import java.util.*;
import static org.junit.Assert.assertTrue;
@RunWith(Parameterized.class)
public class HourlyPartitionedIcebergTableTest {
    // Constants for testing
    private static final String WAREHOUSE_DIR =
            new File("src/test/resources/iceberg-test").getAbsolutePath();
    private static final String WAREHOUSE_PATH = String.format("file:///%s", WAREHOUSE_DIR);
    private static final Map<String, Object> CATALOG_CONFIG = new HashMap<String, Object>() {{
        put("warehouse", WAREHOUSE_PATH);
        put("type", "hadoop");
    }};
    private static final Map<String, Object> ICEBERG_CONFIG = new HashMap<String, Object>() {{
        put("catalog_name", "iceberg");
        put("catalog_properties", CATALOG_CONFIG);
    }};
    private static final Configuration CONF = new Configuration();
    private static final HadoopCatalog HADOOP_CATALOG = new HadoopCatalog(CONF, WAREHOUSE_PATH);

    private final String schemaName;
    private final String tableName;
    private final Schema beamSchema;
    private final List<Row> testData;
    private TableIdentifier tableIdentifier;
    @Parameterized.Parameters
    public static Collection<Object[]> data() {
        Schema schemaWithSQLDatetime = Schema.builder()
                .addStringField("id")
                .addLogicalTypeField("event_time", SqlTypes.DATETIME)
                .build();
        Schema schemaWithDatetime = Schema.builder()
                .addStringField("id")
                .addDateTimeField("event_time")
                .build();
        return Arrays.asList(new Object[][]{
                {
                        "myschema",
                        "test1",
                        schemaWithDatetime,
                        Arrays.asList(
                                Row.withSchema(schemaWithDatetime)
                                        .addValues("record-1", Instant.now())
                                        .build(),
                                Row.withSchema(schemaWithDatetime)
                                        .addValues("record-2", Instant.now().minus(3600 * 1000))
                                        .build()
                        )
                },
                {
                        "myschema",
                        "test2",
                        schemaWithSQLDatetime,
                        Arrays.asList(
                                Row.withSchema(schemaWithSQLDatetime)
                                        .addValues("record-3", LocalDateTime.now())
                                        .build(),
                                Row.withSchema(schemaWithSQLDatetime)
                                        .addValues("record-4", LocalDateTime.now().minus(1, ChronoUnit.HOURS))
                                        .build()
                        )
                },
        });
    }
    public HourlyPartitionedIcebergTableTest(String schemaName, String tableName, Schema beamSchema,
                                             List<Row> hardcodedData) {
        this.schemaName = schemaName;
        this.tableName = tableName;
        this.beamSchema = beamSchema;
        this.testData = hardcodedData;
    }
    @Before
    public void setUp() {
        // Create table identifier for the test
        tableIdentifier = TableIdentifier.of(schemaName, tableName);

        // Define Iceberg schema
        org.apache.iceberg.Schema icebergSchema = new org.apache.iceberg.Schema(
                Types.NestedField.required(1, "id", Types.StringType.get()),
                Types.NestedField.required(2, "event_time", Types.TimestampType.withZone())
        );
        PartitionSpec spec = PartitionSpec.builderFor(icebergSchema)
                .hour("event_time")
                .build();

        // Create the Iceberg table
        HADOOP_CATALOG.createTable(tableIdentifier, icebergSchema, spec);
    }
    @Test
    public void testPipeline() {
        // Define Beam pipeline options
        PipelineOptions options = PipelineOptionsFactory.create();
        Pipeline pipeline = Pipeline.create(options);
        ICEBERG_CONFIG.put("table", String.format("%s.%s", schemaName, tableName));

        // Build the pipeline using the hardcoded data
        pipeline
                .apply("CreateHardcodedData", Create.of(testData).withRowSchema(beamSchema))
                .apply("WriteToIceberg", Managed.write(Managed.ICEBERG).withConfig(ICEBERG_CONFIG));

        // Run the pipeline
        pipeline.run().waitUntilFinish();

        // Verify that the table exists after data is written
        assertTrue("Table should exist after writing data.", HADOOP_CATALOG.tableExists(tableIdentifier));
    }
    @After
    public void tearDown() {
        // Drop the table after the test
        if (HADOOP_CATALOG.tableExists(tableIdentifier)) {
            HADOOP_CATALOG.dropTable(tableIdentifier);
        }
    }
}
Test Results:
org.apache.beam.sdk.Pipeline$PipelineExecutionException: java.lang.IllegalStateException: Not an instance of java.lang.Long: 2024-10-18T16:14:32.934Z
at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:377)
at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:345)
at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:218)
at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:67)
at org.apache.beam.sdk.Pipeline.run(Pipeline.java:325)
at org.apache.beam.sdk.Pipeline.run(Pipeline.java:310)
at com.experiment.beam.HourlyPartitionedIcebergTableTest.testPipeline(HourlyPartitionedIcebergTableTest.java:136)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
at org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
at org.junit.runners.ParentRunner.run(ParentRunner.java:413)
at org.junit.runners.Suite.runChild(Suite.java:128)
at org.junit.runners.Suite.runChild(Suite.java:27)
at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
at org.junit.runners.ParentRunner.run(ParentRunner.java:413)
at org.junit.runners.Suite.runChild(Suite.java:128)
at org.junit.runners.Suite.runChild(Suite.java:27)
at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
at org.junit.runners.ParentRunner.run(ParentRunner.java:413)
at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:69)
at com.intellij.rt.junit.IdeaTestRunner$Repeater$1.execute(IdeaTestRunner.java:38)
at com.intellij.rt.execution.junit.TestsRepeater.repeat(TestsRepeater.java:11)
at com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:35)
at com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:235)
at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:54)
Caused by: java.lang.IllegalStateException: Not an instance of java.lang.Long: 2024-10-18T16:14:32.934Z
at org.apache.iceberg.data.GenericRecord.get(GenericRecord.java:123)
at org.apache.iceberg.Accessors$PositionAccessor.get(Accessors.java:71)
at org.apache.iceberg.Accessors$PositionAccessor.get(Accessors.java:58)
at org.apache.iceberg.PartitionKey.partition(PartitionKey.java:106)
at org.apache.beam.sdk.io.iceberg.RecordWriterManager$DestinationState.write(RecordWriterManager.java:136)
at org.apache.beam.sdk.io.iceberg.RecordWriterManager.write(RecordWriterManager.java:270)
at org.apache.beam.sdk.io.iceberg.WriteUngroupedRowsToFiles$WriteUngroupedRowsToFilesDoFn.processElement(WriteUngroupedRowsToFiles.java:243)
org.apache.beam.sdk.Pipeline$PipelineExecutionException: java.lang.IllegalStateException: Not an instance of java.lang.Long: 2024-10-18T17:14:32.961579Z
at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:377)
at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:345)
at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:218)
at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:67)
at org.apache.beam.sdk.Pipeline.run(Pipeline.java:325)
at org.apache.beam.sdk.Pipeline.run(Pipeline.java:310)
at com.experiment.beam.HourlyPartitionedIcebergTableTest.testPipeline(HourlyPartitionedIcebergTableTest.java:136)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
at org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
at org.junit.runners.ParentRunner.run(ParentRunner.java:413)
at org.junit.runners.Suite.runChild(Suite.java:128)
at org.junit.runners.Suite.runChild(Suite.java:27)
at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
at org.junit.runners.ParentRunner.run(ParentRunner.java:413)
at org.junit.runners.Suite.runChild(Suite.java:128)
at org.junit.runners.Suite.runChild(Suite.java:27)
at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
at org.junit.runners.ParentRunner.run(ParentRunner.java:413)
at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:69)
at com.intellij.rt.junit.IdeaTestRunner$Repeater$1.execute(IdeaTestRunner.java:38)
at com.intellij.rt.execution.junit.TestsRepeater.repeat(TestsRepeater.java:11)
at com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:35)
at com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:235)
at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:54)
Caused by: java.lang.IllegalStateException: Not an instance of java.lang.Long: 2024-10-18T17:14:32.961579Z
at org.apache.iceberg.data.GenericRecord.get(GenericRecord.java:123)
at org.apache.iceberg.Accessors$PositionAccessor.get(Accessors.java:71)
at org.apache.iceberg.Accessors$PositionAccessor.get(Accessors.java:58)
at org.apache.iceberg.PartitionKey.partition(PartitionKey.java:106)
at org.apache.beam.sdk.io.iceberg.RecordWriterManager$DestinationState.write(RecordWriterManager.java:136)
at org.apache.beam.sdk.io.iceberg.RecordWriterManager.write(RecordWriterManager.java:270)
at org.apache.beam.sdk.io.iceberg.WriteUngroupedRowsToFiles$WriteUngroupedRowsToFilesDoFn.processElement(WriteUngroupedRowsToFiles.java:243)
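The caused-by frames land in Iceberg's PartitionKey/Accessors when it reads event_time: the record handed to Iceberg still carries the SDK's date/time object rather than the epoch-microsecond long Iceberg stores for timestamptz columns. For illustration only, a small sketch of the conversions that would yield the value Iceberg expects; the class and method names are hypothetical and not taken from the connector.

import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.temporal.ChronoUnit;

// Hypothetical helpers: convert the two Beam timestamp representations used in the test
// above into the epoch-microsecond long that Iceberg stores for timestamp columns.
public class BeamToIcebergMicros {
    // Joda Instant (standard Beam DATETIME field, millisecond precision) -> timestamp micros
    static long fromJoda(org.joda.time.Instant instant) {
        return instant.getMillis() * 1_000L;
    }

    // java.time.LocalDateTime (SqlTypes.DATETIME logical type) -> timestamp micros, assuming UTC
    static long fromLocalDateTime(LocalDateTime ldt) {
        return ChronoUnit.MICROS.between(java.time.Instant.EPOCH, ldt.toInstant(ZoneOffset.UTC));
    }

    public static void main(String[] args) {
        System.out.println(fromJoda(org.joda.time.Instant.now()));
        System.out.println(fromLocalDateTime(LocalDateTime.now()));
    }
}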
Issue Priority
Priority: 2 (default / most bugs should be filed as P2)
Issue Components
- [ ] Component: Python SDK
- [X] Component: Java SDK
- [ ] Component: Go SDK
- [ ] Component: Typescript SDK
- [X] Component: IO connector
- [ ] Component: Beam YAML
- [ ] Component: Beam examples
- [ ] Component: Beam playground
- [ ] Component: Beam katas
- [ ] Component: Website
- [ ] Component: Infrastructure
- [ ] Component: Spark Runner
- [ ] Component: Flink Runner
- [ ] Component: Samza Runner
- [ ] Component: Twister2 Runner
- [ ] Component: Hazelcast Jet Runner
- [X] Component: Google Cloud Dataflow Runner