beam icon indicating copy to clipboard operation
beam copied to clipboard

[Bug]: ClassCastException when using custom DynamicDestination in BigQueryIO.Write

Open dartiga opened this issue 2 years ago • 15 comments

What happened?

Beam: 2.40

While using custom DynamicDestination in BigQueryIO.Write got the following exception:

java.lang.ClassCastException: org.apache.beam.sdk.io.gcp.bigquery.TableDestination cannot be cast to java.lang.String com.king.da.destinations.KingAppDestinations.getTable(KingAppDestinations.java:17)
 org.apache.beam.sdk.io.gcp.bigquery.UpdateSchemaDestination.processElement(UpdateSchemaDestination.java:131)
   org.apache.beam.sdk.io.gcp.bigquery.UpdateSchemaDestination$DoFnInvoker.invokeProcessElement(Unknown Source)

Find below test case to reproduce:

import java.io.IOException;
import java.security.SecureRandom;
import java.util.Arrays;

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
import org.apache.beam.sdk.io.gcp.bigquery.DynamicDestinations;
import org.apache.beam.sdk.io.gcp.bigquery.TableDestination;
import org.apache.beam.sdk.io.gcp.bigquery.TestBigQueryOptions;
import org.apache.beam.sdk.io.gcp.testing.BigqueryClient;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.ValueInSingleWindow;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;

@RunWith(JUnit4.class)
@RunWith(JUnit4.class)
public class UpdateSchemaDestinationTest {

    /** BigQuery client used to create and tear down the throwaway test dataset. */
    static final BigqueryClient BQ_CLIENT = new BigqueryClient(UpdateSchemaDestinationTest.class.getName());

    // Unique dataset name per run: timestamp plus a random suffix avoids
    // collisions between concurrent test executions. ("excepption" typo fixed.)
    static final String DATASET_ID =
            "schema_update_options_class_cast_exception"
                + System.currentTimeMillis()
                + "_"
                + new SecureRandom().nextInt(32);

    // Options shared by the whole class; tempLocation is filled in by setUpAll.
    static TestBigQueryOptions options = TestPipeline.testingPipelineOptions().as(TestBigQueryOptions.class);

    static Pipeline pipeline;

    /** Creates the pipeline and the test dataset once before any test runs. */
    @BeforeClass
    public static void setUpAll() throws IOException, InterruptedException {
        // FILE_LOADS writes need a GCS temp location; run with --tempRoot=gs://<bucket>
        // so getTempRoot() is non-null here.
        options.setTempLocation(options.getTempRoot() + "/bq_it_temp");

        pipeline = Pipeline.create(options);

        BQ_CLIENT.createNewDataset(options.getProject(), DATASET_ID);
    }

    /** Deletes the test dataset after all tests have finished. */
    @AfterClass
    public static void tearDownAll() {
        BQ_CLIENT.deleteDataset(options.getProject(), DATASET_ID);
    }

    /**
     * Regression test for the ClassCastException: BatchLoads' schema-update step
     * handed a TableDestination to a custom DynamicDestinations whose destination
     * type is String. withMaxBytesPerPartition(1) keeps load partitions tiny so
     * the multi-partition (schema update) code path is exercised.
     */
    @Test
    public void classCastExceptionRegression() {
        DynamicDestinations<KV<String, String>, String> destinations = new SomeDynamicDestinations();

        PCollection<KV<String, String>> rows = pipeline
            .apply(Create.of(
                KV.of("table","foo"),
                KV.of("table","bar")
            ));

        rows.apply(BigQueryIO.<KV<String, String>>write()
            .withFormatFunction(kv -> new TableRow().set("name", kv.getValue()))
            .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
            .withWriteDisposition(WriteDisposition.WRITE_APPEND)
            .withMaxBytesPerPartition(1)
            .to(destinations));

        pipeline.run().waitUntilFinish();
    }

    /**
     * Routes each element by its key; uses String (not TableDestination) as the
     * destination type, which is what triggered the reported cast failure.
     */
    private static final class SomeDynamicDestinations extends DynamicDestinations<KV<String, String>, String> {

        private static final long serialVersionUID = 1L;

        /** Returns the element's key as the logical destination. */
        @Override
        public String getDestination(@Nullable ValueInSingleWindow<KV<String, String>> element) {
            return element.getValue().getKey();
        }

        /** Maps a logical destination onto a concrete table in the test dataset. */
        @Override
        public TableDestination getTable(String destination) {
            return new TableDestination(DATASET_ID + "." + destination, "a table");
        }

        /** Same schema for every destination: a single STRING column "name". */
        @Override
        public @Nullable TableSchema getSchema(String destination) {
            return new TableSchema().setFields(Arrays.asList(new TableFieldSchema().setName("name").setType("STRING")));
        }
    }
}

Issue Priority

Priority: 1

Issue Component

Component: io-java-gcp

dartiga avatar Jul 30 '22 07:07 dartiga

I am not able to replicate this exception with the code you provided.

I suspect the issue is that your dynamicDestinations expects a String argument to the getTable method, but a TableDestination is being passed instead, causing the casting exception

johnjcasey avatar Aug 04 '22 14:08 johnjcasey

Can you update your replication code to make this reproducible?

johnjcasey avatar Aug 04 '22 14:08 johnjcasey

I am not able to replicate this exception with the code you provided.

Are you getting any error or just green test? I am able to replicate the issue using the code I posted:

org.apache.beam.sdk.Pipeline$PipelineExecutionException: java.lang.ClassCastException: org.apache.beam.sdk.io.gcp.bigquery.TableDestination cannot be cast to java.lang.String
	at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:373)
	at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:341)
	at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:218)
	at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:67)
	at org.apache.beam.sdk.Pipeline.run(Pipeline.java:323)
	at org.apache.beam.sdk.Pipeline.run(Pipeline.java:309)
	at com.king.da.UpdateSchemaDestinationTest.classCastExceptionRegression(UpdateSchemaDestinationTest.java:78)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
	at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
	at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
	at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
	at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
	at org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
	at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
	at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
	at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
	at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
	at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
	at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
	at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
	at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
	at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
	at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
	at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
	at org.junit.runners.ParentRunner.run(ParentRunner.java:413)
	at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
	at org.junit.runner.JUnitCore.run(JUnitCore.java:115)
	at org.junit.vintage.engine.execution.RunnerExecutor.execute(RunnerExecutor.java:42)
	at org.junit.vintage.engine.VintageTestEngine.executeAllChildren(VintageTestEngine.java:80)
	at org.junit.vintage.engine.VintageTestEngine.execute(VintageTestEngine.java:72)
	at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:107)
	at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:88)
	at org.junit.platform.launcher.core.EngineExecutionOrchestrator.lambda$execute$0(EngineExecutionOrchestrator.java:54)
	at org.junit.platform.launcher.core.EngineExecutionOrchestrator.withInterceptedStreams(EngineExecutionOrchestrator.java:67)
	at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:52)
	at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:114)
	at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:95)
	at org.junit.platform.launcher.core.DefaultLauncherSession$DelegatingLauncher.execute(DefaultLauncherSession.java:91)
	at org.junit.platform.launcher.core.SessionPerRequestLauncher.execute(SessionPerRequestLauncher.java:60)
	at org.eclipse.jdt.internal.junit5.runner.JUnit5TestReference.run(JUnit5TestReference.java:98)
	at org.eclipse.jdt.internal.junit.runner.TestExecution.run(TestExecution.java:40)
	at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:529)
	at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:756)
	at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.run(RemoteTestRunner.java:452)
	at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.main(RemoteTestRunner.java:210)
Caused by: java.lang.ClassCastException: org.apache.beam.sdk.io.gcp.bigquery.TableDestination cannot be cast to java.lang.String
	at com.king.da.UpdateSchemaDestinationTest$SomeDynamicDestinations.getTable(UpdateSchemaDestinationTest.java:1)
	at org.apache.beam.sdk.io.gcp.bigquery.UpdateSchemaDestination.processElement(UpdateSchemaDestination.java:131)

dartiga avatar Aug 04 '22 16:08 dartiga

Updated code with missing imports just in case.

dartiga avatar Aug 04 '22 16:08 dartiga

I'm running into a separate issue

com.google.api.client.googleapis.json.GoogleJsonResponseException: 400 Bad Request POST https://bigquery.googleapis.com/bigquery/v2/projects/google.com:clouddfe/jobs?prettyPrint=false { "code" : 400, "errors" : [ { "domain" : "global", "message" : "Invalid path: /Users/johnjcasey/vc/beam-java/sdks/java/io/google-cloud-platform/null/bq_it_temp/BigQueryWriteTemp/beam_bq_job_LOAD_testpipelinejohnjcasey0804135744c736b253_9d89ed77264a49a3a5d5d94dca3004c4/b7a2b0b3-859e-4869-957f-a41375fb5d96", "reason" : "invalid" } ], "message" : "Invalid path: /Users/johnjcasey/vc/beam-java/sdks/java/io/google-cloud-platform/null/bq_it_temp/BigQueryWriteTemp/beam_bq_job_LOAD_testpipelinejohnjcasey0804135744c736b253_9d89ed77264a49a3a5d5d94dca3004c4/b7a2b0b3-859e-4869-957f-a41375fb5d96", "status" : "INVALID_ARGUMENT" }

which would appear to be after any destination resolution

johnjcasey avatar Aug 04 '22 16:08 johnjcasey

That sounds like your tempLocation is not valid. Let me see if I can reproduce your error and update the code.

dartiga avatar Aug 04 '22 16:08 dartiga

In order to fix your issue you might need to provide a valid GS bucket (where you should have permissions for writing) when setting tempLocation option.

So either you replace

options.setTempLocation(options.getTempRoot() + "/bq_it_temp");

with something like:

options.setTempLocation("gs://<bucket>/bq_it_temp");

Or you run the test specifying --tempRoot=gs://<bucket> in the command line.

dartiga avatar Aug 04 '22 17:08 dartiga

Beam ITs follow the same approach https://github.com/apache/beam/blob/04f49848d4b037a5935036927e54c8eb8ed8c361/sdks/java/io/google-cloud-platform/build.gradle#L186

dartiga avatar Aug 04 '22 17:08 dartiga

I'm replicating your issue now, and what I'm seeing confirms what I thought.

The destination that is being passed in is already a TableDestination, which is causing the class cast exception

johnjcasey avatar Aug 04 '22 17:08 johnjcasey

@ahmedabu98 this is the BQ issue.

johnjcasey avatar Aug 04 '22 17:08 johnjcasey

The write behaves pretty strangely. Here's what I found:

The values do end up being written into BQ but a new table is created for each value: image

This is for FILE_LOADS. I'm not observing the same behavior when switching the write method to STREAMING_INSERTS. Currently trying with STORAGE_WRITE_API though I'm running into separate issues there.

ahmedabu98 avatar Aug 04 '22 20:08 ahmedabu98

That's because UpdateSchemaDestination is used in BatchLoads only. It was introduced in 2.40.

As @johnjcasey mentioned, our custom DynamicDestinations expects String to be received, while UpdateSchemaDestination assumes TableDestination instances always.

These two will fail for sure.

dartiga avatar Aug 04 '22 22:08 dartiga

Looks like the user-specified DestinationT type is first replaced with TableDestination during the write here (DestinationT in, TableDestination out).

The WriteTables DoFn returns TableDestination types instead of DestinationT, which is carried downstream to UpdateSchemaDestinations.

ahmedabu98 avatar Aug 05 '22 22:08 ahmedabu98

.take-issue

ahmedabu98 avatar Aug 08 '22 16:08 ahmedabu98

With the changes in #22624 I was able to write successfully with the following pipeline:

import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;
import java.util.Arrays;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryOptions;
import org.apache.beam.sdk.io.gcp.bigquery.DynamicDestinations;
import org.apache.beam.sdk.io.gcp.bigquery.TableDestination;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.ValueInSingleWindow;
import org.checkerframework.checker.nullness.qual.Nullable;
 
public class UpdateSchemaDestinationWrite {

  static final String PROJECT_AND_DATASET_ID = "<myproject>:<mydataset>.";

  /** Routes rows by key; the destination type is String, not TableDestination. */
  private static final class SomeDynamicDestinations
      extends DynamicDestinations<KV<String, String>, String> {

    private static final long serialVersionUID = 1L;

    /** The element's key names the logical destination. */
    @Override
    public String getDestination(@Nullable ValueInSingleWindow<KV<String, String>> element) {
      KV<String, String> kv = element.getValue();
      return kv.getKey();
    }

    /** Resolves a logical destination into a concrete BigQuery table spec. */
    @Override
    public TableDestination getTable(String destination) {
      String tableSpec = PROJECT_AND_DATASET_ID + destination;
      return new TableDestination(tableSpec, "a table");
    }

    /** Every destination shares one schema: a single STRING column "name". */
    @Override
    public @Nullable TableSchema getSchema(String destination) {
      TableFieldSchema nameField = new TableFieldSchema().setName("name").setType("STRING");
      return new TableSchema().setFields(Arrays.asList(nameField));
    }
  }

  /** Builds and runs a small batch write through the custom destinations. */
  public static void main(String[] args) {

    BigQueryOptions bqOptions = PipelineOptionsFactory.fromArgs(args).as(BigQueryOptions.class);
    Pipeline pipeline = Pipeline.create(bqOptions);

    PCollection<KV<String, String>> rows =
        pipeline.apply(Create.of(KV.of("my_table", "hi"), KV.of("my_table", "hello")));

    DynamicDestinations<KV<String, String>, String> destinations = new SomeDynamicDestinations();

    // withMaxBytesPerPartition(1) keeps load partitions tiny so multiple
    // partitions are produced, exercising the schema-update path.
    rows.apply(
        BigQueryIO.<KV<String, String>>write()
            .withFormatFunction(kv -> new TableRow().set("name", kv.getValue()))
            .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
            .withWriteDisposition(WriteDisposition.WRITE_APPEND)
            .withMaxBytesPerPartition(1)
            .to(destinations));

    pipeline.run().waitUntilFinish();
  }
}

ahmedabu98 avatar Aug 08 '22 20:08 ahmedabu98