
[Bug]: IcebergIO cannot write data into an hourly partitioned table

Open · DanielMorales9 opened this issue on Oct 18, 2024 · 0 comments

What happened?

When I attempt to write data into an hourly partitioned table, the write fails regardless of whether the Beam schema declares the timestamp as the standard DATETIME field type or as the logical SqlTypes.DATETIME type.

Iceberg expects the partition source value as a long (microseconds since the epoch), but Beam passes the datetime object through unconverted.
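For context, here is a minimal sketch of the mismatch as I read the stack trace (my illustration, not code from Beam or Iceberg): Iceberg represents a timestamp column internally as a Long of microseconds since the epoch, and PartitionKey reads that value through a typed accessor, so a record that still holds the raw datetime object fails exactly as in the error below.

// Minimal sketch, assuming iceberg-core, iceberg-data, and joda-time on the
// classpath. Iceberg stores timestamps internally as a Long of microseconds
// since the epoch; reading anything else through the typed accessor fails.
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.types.Types;

public class TimestampMismatchSketch {
    public static void main(String[] args) {
        org.apache.iceberg.Schema schema = new org.apache.iceberg.Schema(
                Types.NestedField.required(1, "event_time", Types.TimestampType.withZone()));
        GenericRecord record = GenericRecord.create(schema);

        // Works: microseconds since the epoch, boxed as a Long.
        record.setField("event_time", 1_729_267_200_000_000L);
        Long micros = record.get(0, Long.class);

        // Fails like the report: the Joda Instant is stored as-is, and the
        // typed read throws "Not an instance of java.lang.Long".
        record.setField("event_time", org.joda.time.Instant.now());
        record.get(0, Long.class);
    }
}

The full repro follows: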

package com.experiment.beam;

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.managed.Managed;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.logicaltypes.SqlTypes;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.values.Row;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.hadoop.HadoopCatalog;
import org.apache.iceberg.types.Types;
import org.joda.time.Instant;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

import java.io.File;
import java.time.LocalDateTime;
import java.time.temporal.ChronoUnit;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import static org.junit.Assert.assertTrue;

@RunWith(Parameterized.class)
public class HourlyPartitionedIcebergTableTest {

    // Constants for testing
    private static final String WAREHOUSE_DIR =
            new File("src/test/resources/iceberg-test").getAbsolutePath();
    private static final String WAREHOUSE_PATH = String.format("file:///%s", WAREHOUSE_DIR);
    private static final Map<String, Object> CATALOG_CONFIG = new HashMap<String, Object>() {{
        put("warehouse", WAREHOUSE_PATH);
        put("type", "hadoop");
    }};
    private static final Map<String, Object> ICEBERG_CONFIG = new HashMap<String, Object>() {{
        put("catalog_name", "iceberg");
        put("catalog_properties", CATALOG_CONFIG);
    }};
    private static final Configuration CONF = new Configuration();
    private static final HadoopCatalog HADOOP_CATALOG = new HadoopCatalog(CONF, WAREHOUSE_PATH);

    private final String schemaName;
    private final String tableName;
    private final Schema beamSchema;
    private final List<Row> testData;
    private TableIdentifier tableIdentifier;


    @Parameterized.Parameters
    public static Collection<Object[]> data() {
        Schema schemaWithSQLDatetime = Schema.builder()
                .addStringField("id")
                .addLogicalTypeField("event_time", SqlTypes.DATETIME)
                .build();
        Schema schemaWithDatetime = Schema.builder()
                .addStringField("id")
                .addDateTimeField("event_time")
                .build();
        return Arrays.asList(new Object[][]{
                {
                        "myschema",
                        "test1",
                        schemaWithDatetime,
                        Arrays.asList(
                                Row.withSchema(schemaWithDatetime)
                                        .addValues("record-1", Instant.now())
                                        .build(),
                                Row.withSchema(schemaWithDatetime)
                                        .addValues("record-2", Instant.now().minus(3600 * 1000))
                                        .build()
                        )
                },
                {
                        "myschema",
                        "test2",
                        schemaWithSQLDatetime,
                        Arrays.asList(
                                Row.withSchema(schemaWithSQLDatetime)
                                        .addValues("record-3", LocalDateTime.now())
                                        .build(),
                                Row.withSchema(schemaWithSQLDatetime)
                                        .addValues("record-4", LocalDateTime.now().minus(1, ChronoUnit.HOURS))
                                        .build()
                        )
                },
        });
    }

    public HourlyPartitionedIcebergTableTest(String schemaName, String tableName, Schema beamSchema,
                                             List<Row> hardcodedData) {
        this.schemaName = schemaName;
        this.tableName = tableName;
        this.beamSchema = beamSchema;
        this.testData = hardcodedData;
    }

    @Before
    public void setUp() {
        // Create table identifier for the test
        tableIdentifier = TableIdentifier.of(schemaName, tableName);

        // Define Iceberg schema
        org.apache.iceberg.Schema icebergSchema = new org.apache.iceberg.Schema(
                Types.NestedField.required(1, "id", Types.StringType.get()),
                Types.NestedField.required(2, "event_time", Types.TimestampType.withZone())
        );

        PartitionSpec spec = PartitionSpec.builderFor(icebergSchema)
                .hour("event_time")
                .build();

        // Create the Iceberg table
        HADOOP_CATALOG.createTable(tableIdentifier, icebergSchema, spec);
    }

    @Test
    public void testPipeline() {
        // Define Beam pipeline options
        PipelineOptions options = PipelineOptionsFactory.create();
        Pipeline pipeline = Pipeline.create(options);

        ICEBERG_CONFIG.put("table", String.format("%s.%s", schemaName, tableName));

        // Build the pipeline using the hardcoded data
        pipeline
                .apply("CreateHardcodedData", Create.of(testData).withRowSchema(beamSchema))
                .apply("WriteToIceberg", Managed.write(Managed.ICEBERG).withConfig(ICEBERG_CONFIG));

        // Run the pipeline
        pipeline.run().waitUntilFinish();

        // Verify that the table exists after data is written
        assertTrue("Table should exist after writing data.", HADOOP_CATALOG.tableExists(tableIdentifier));
    }

    @After
    public void tearDown() {
        // Drop the table after the test
        if (HADOOP_CATALOG.tableExists(tableIdentifier)) {
            HADOOP_CATALOG.dropTable(tableIdentifier);
        }
    }
}

Test Results (both parameterized cases fail the same way; first the standard DATETIME schema, then SqlTypes.DATETIME):

org.apache.beam.sdk.Pipeline$PipelineExecutionException: java.lang.IllegalStateException: Not an instance of java.lang.Long: 2024-10-18T16:14:32.934Z

	at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:377)
	at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:345)
	at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:218)
	at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:67)
	at org.apache.beam.sdk.Pipeline.run(Pipeline.java:325)
	at org.apache.beam.sdk.Pipeline.run(Pipeline.java:310)
	at com.experiment.beam.HourlyPartitionedIcebergTableTest.testPipeline(HourlyPartitionedIcebergTableTest.java:136)
	... (reflection, JUnit, and IDE runner frames omitted)
Caused by: java.lang.IllegalStateException: Not an instance of java.lang.Long: 2024-10-18T16:14:32.934Z
	at org.apache.iceberg.data.GenericRecord.get(GenericRecord.java:123)
	at org.apache.iceberg.Accessors$PositionAccessor.get(Accessors.java:71)
	at org.apache.iceberg.Accessors$PositionAccessor.get(Accessors.java:58)
	at org.apache.iceberg.PartitionKey.partition(PartitionKey.java:106)
	at org.apache.beam.sdk.io.iceberg.RecordWriterManager$DestinationState.write(RecordWriterManager.java:136)
	at org.apache.beam.sdk.io.iceberg.RecordWriterManager.write(RecordWriterManager.java:270)
	at org.apache.beam.sdk.io.iceberg.WriteUngroupedRowsToFiles$WriteUngroupedRowsToFilesDoFn.processElement(WriteUngroupedRowsToFiles.java:243)

org.apache.beam.sdk.Pipeline$PipelineExecutionException: java.lang.IllegalStateException: Not an instance of java.lang.Long: 2024-10-18T17:14:32.961579Z

	... (same runner and test frames as the first failure)
Caused by: java.lang.IllegalStateException: Not an instance of java.lang.Long: 2024-10-18T17:14:32.961579Z
	... (same frames as the first failure)

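The Caused by frames show the failure occurs while computing the hourly partition key (PartitionKey.partition via the position accessor), before any file is written. For reference, a minimal sketch of the conversion that appears to be missing on the write path; this is my assumption of the expected mapping, not Beam's actual code, the helper name is hypothetical, and the SqlTypes.DATETIME branch assumes UTC because LocalDateTime carries no zone:

import java.time.LocalDateTime;
import java.time.ZoneOffset;

public class BeamToIcebergTimestamps {

    // Hypothetical helper: maps a Beam timestamp field value to the Long of
    // microseconds since the epoch that Iceberg's partition accessor expects.
    static long toIcebergMicros(Object beamValue) {
        if (beamValue instanceof org.joda.time.ReadableInstant) {
            // Beam's standard DATETIME field type carries Joda values with
            // millisecond precision.
            return ((org.joda.time.ReadableInstant) beamValue).getMillis() * 1000L;
        }
        if (beamValue instanceof LocalDateTime) {
            // SqlTypes.DATETIME carries a zone-less java.time.LocalDateTime;
            // UTC is assumed here purely for illustration.
            java.time.Instant utc = ((LocalDateTime) beamValue).toInstant(ZoneOffset.UTC);
            return utc.getEpochSecond() * 1_000_000L + utc.getNano() / 1_000L;
        }
        throw new IllegalArgumentException(
                "Unsupported timestamp value: " + beamValue.getClass());
    }
}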
Issue Priority

Priority: 2 (default / most bugs should be filed as P2)

Issue Components

  • [ ] Component: Python SDK
  • [X] Component: Java SDK
  • [ ] Component: Go SDK
  • [ ] Component: Typescript SDK
  • [X] Component: IO connector
  • [ ] Component: Beam YAML
  • [ ] Component: Beam examples
  • [ ] Component: Beam playground
  • [ ] Component: Beam katas
  • [ ] Component: Website
  • [ ] Component: Infrastructure
  • [ ] Component: Spark Runner
  • [ ] Component: Flink Runner
  • [ ] Component: Samza Runner
  • [ ] Component: Twister2 Runner
  • [ ] Component: Hazelcast Jet Runner
  • [X] Component: Google Cloud Dataflow Runner

DanielMorales9 · Oct 18 '24 16:10