beam
beam copied to clipboard
[Bug]: ClassCastException when using custom DynamicDestination in BigQueryIO.Write
What happened?
Beam: 2.40
While using custom DynamicDestination
in BigQueryIO.Write
got the following exception:
java.lang.ClassCastException: org.apache.beam.sdk.io.gcp.bigquery.TableDestination cannot be cast to java.lang.String com.king.da.destinations.KingAppDestinations.getTable(KingAppDestinations.java:17)
org.apache.beam.sdk.io.gcp.bigquery.UpdateSchemaDestination.processElement(UpdateSchemaDestination.java:131)
org.apache.beam.sdk.io.gcp.bigquery.UpdateSchemaDestination$DoFnInvoker.invokeProcessElement(Unknown Source)
Find below test case to reproduce:
import java.io.IOException;
import java.security.SecureRandom;
import java.util.Arrays;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
import org.apache.beam.sdk.io.gcp.bigquery.DynamicDestinations;
import org.apache.beam.sdk.io.gcp.bigquery.TableDestination;
import org.apache.beam.sdk.io.gcp.bigquery.TestBigQueryOptions;
import org.apache.beam.sdk.io.gcp.testing.BigqueryClient;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.ValueInSingleWindow;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;
@RunWith(JUnit4.class)
public class UpdateSchemaDestinationTest {

  // Client used for one-off dataset setup/teardown around the pipeline run.
  static final BigqueryClient BQ_CLIENT =
      new BigqueryClient(UpdateSchemaDestinationTest.class.getName());

  // Unique throwaway dataset name so concurrent runs don't collide.
  static final String DATASET_ID =
      "schema_update_options_class_cast_excepption"
          + System.currentTimeMillis()
          + "_"
          + new SecureRandom().nextInt(32);

  static TestBigQueryOptions options =
      TestPipeline.testingPipelineOptions().as(TestBigQueryOptions.class);
  static Pipeline pipeline;

  @BeforeClass
  public static void setUpAll() throws IOException, InterruptedException {
    // Temp location must be configured before the pipeline is constructed.
    options.setTempLocation(options.getTempRoot() + "/bq_it_temp");
    pipeline = Pipeline.create(options);
    BQ_CLIENT.createNewDataset(options.getProject(), DATASET_ID);
  }

  @AfterClass
  public static void tearDownAll() {
    BQ_CLIENT.deleteDataset(options.getProject(), DATASET_ID);
  }

  /**
   * Regression repro: writing via a custom {@code DynamicDestinations<_, String>} with
   * WRITE_APPEND and a tiny per-partition byte limit triggers the ClassCastException in
   * UpdateSchemaDestination (a TableDestination is handed to getTable(String)).
   */
  @Test
  public void classCastExceptionRegression() {
    DynamicDestinations<KV<String, String>, String> dynamicDestinations =
        new SomeDynamicDestinations();

    PCollection<KV<String, String>> input =
        pipeline.apply(
            Create.of(
                KV.of("table", "foo"),
                KV.of("table", "bar")));

    input.apply(
        BigQueryIO.<KV<String, String>>write()
            .withFormatFunction(kv -> new TableRow().set("name", kv.getValue()))
            .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
            .withWriteDisposition(WriteDisposition.WRITE_APPEND)
            // Force multiple load partitions so the schema-update path is exercised.
            .withMaxBytesPerPartition(1)
            .to(dynamicDestinations));

    pipeline.run().waitUntilFinish();
  }

  /** Routes each element by its KV key; the destination type is a plain String. */
  private static final class SomeDynamicDestinations
      extends DynamicDestinations<KV<String, String>, String> {

    private static final long serialVersionUID = 1L;

    @Override
    public String getDestination(@Nullable ValueInSingleWindow<KV<String, String>> element) {
      KV<String, String> kv = element.getValue();
      return kv.getKey();
    }

    @Override
    public TableDestination getTable(String destination) {
      // NOTE(review): per the stack trace, Beam passes a TableDestination here instead of
      // the declared String destination, which is the reported bug.
      return new TableDestination(DATASET_ID + "." + destination, "a table");
    }

    @Override
    public @Nullable TableSchema getSchema(String destination) {
      TableFieldSchema nameField = new TableFieldSchema().setName("name").setType("STRING");
      return new TableSchema().setFields(Arrays.asList(nameField));
    }
  }
}
Issue Priority
Priority: 1
Issue Component
Component: io-java-gcp
I am not able to replicate this exception with the code you provided.
I suspect the issue is that your DynamicDestinations implementation expects a String argument to the getTable method, but a TableDestination is being passed instead, causing the ClassCastException.
Can you update your replication code to make this reproducible?
I am not able to replicate this exception with the code you provided.
Are you getting any error, or just a green test? I am able to replicate the issue using the code I posted:
org.apache.beam.sdk.Pipeline$PipelineExecutionException: java.lang.ClassCastException: org.apache.beam.sdk.io.gcp.bigquery.TableDestination cannot be cast to java.lang.String
at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:373)
at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:341)
at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:218)
at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:67)
at org.apache.beam.sdk.Pipeline.run(Pipeline.java:323)
at org.apache.beam.sdk.Pipeline.run(Pipeline.java:309)
at com.king.da.UpdateSchemaDestinationTest.classCastExceptionRegression(UpdateSchemaDestinationTest.java:78)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
at org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
at org.junit.runners.ParentRunner.run(ParentRunner.java:413)
at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
at org.junit.runner.JUnitCore.run(JUnitCore.java:115)
at org.junit.vintage.engine.execution.RunnerExecutor.execute(RunnerExecutor.java:42)
at org.junit.vintage.engine.VintageTestEngine.executeAllChildren(VintageTestEngine.java:80)
at org.junit.vintage.engine.VintageTestEngine.execute(VintageTestEngine.java:72)
at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:107)
at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:88)
at org.junit.platform.launcher.core.EngineExecutionOrchestrator.lambda$execute$0(EngineExecutionOrchestrator.java:54)
at org.junit.platform.launcher.core.EngineExecutionOrchestrator.withInterceptedStreams(EngineExecutionOrchestrator.java:67)
at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:52)
at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:114)
at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:95)
at org.junit.platform.launcher.core.DefaultLauncherSession$DelegatingLauncher.execute(DefaultLauncherSession.java:91)
at org.junit.platform.launcher.core.SessionPerRequestLauncher.execute(SessionPerRequestLauncher.java:60)
at org.eclipse.jdt.internal.junit5.runner.JUnit5TestReference.run(JUnit5TestReference.java:98)
at org.eclipse.jdt.internal.junit.runner.TestExecution.run(TestExecution.java:40)
at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:529)
at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:756)
at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.run(RemoteTestRunner.java:452)
at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.main(RemoteTestRunner.java:210)
Caused by: java.lang.ClassCastException: org.apache.beam.sdk.io.gcp.bigquery.TableDestination cannot be cast to java.lang.String
at com.king.da.UpdateSchemaDestinationTest$SomeDynamicDestinations.getTable(UpdateSchemaDestinationTest.java:1)
at org.apache.beam.sdk.io.gcp.bigquery.UpdateSchemaDestination.processElement(UpdateSchemaDestination.java:131)
Updated code with missing imports just in case.
I'm running into a separate issue
com.google.api.client.googleapis.json.GoogleJsonResponseException: 400 Bad Request POST https://bigquery.googleapis.com/bigquery/v2/projects/google.com:clouddfe/jobs?prettyPrint=false { "code" : 400, "errors" : [ { "domain" : "global", "message" : "Invalid path: /Users/johnjcasey/vc/beam-java/sdks/java/io/google-cloud-platform/null/bq_it_temp/BigQueryWriteTemp/beam_bq_job_LOAD_testpipelinejohnjcasey0804135744c736b253_9d89ed77264a49a3a5d5d94dca3004c4/b7a2b0b3-859e-4869-957f-a41375fb5d96", "reason" : "invalid" } ], "message" : "Invalid path: /Users/johnjcasey/vc/beam-java/sdks/java/io/google-cloud-platform/null/bq_it_temp/BigQueryWriteTemp/beam_bq_job_LOAD_testpipelinejohnjcasey0804135744c736b253_9d89ed77264a49a3a5d5d94dca3004c4/b7a2b0b3-859e-4869-957f-a41375fb5d96", "status" : "INVALID_ARGUMENT" }
which would appear to be after any destination resolution
That sounds like your tempLocation
is not valid. Let me see if I can reproduce your error and update the code.
In order to fix your issue you might need to provide a valid GS bucket (where you should have permissions for writing) when setting tempLocation
option.
So either you replace
options.setTempLocation(options.getTempRoot() + "/bq_it_temp");
with something like:
options.setTempLocation("gs://<bucket>/bq_it_temp");
Or you run the test specifying --tempRoot=gs://<bucket>
in the command line.
Beam ITs follow the same approach https://github.com/apache/beam/blob/04f49848d4b037a5935036927e54c8eb8ed8c361/sdks/java/io/google-cloud-platform/build.gradle#L186
I'm replicating your issue now, and what I'm seeing is what I thought.
The destination that is being passed in is already a TableDestination, which is causing the class cast exception.
@ahmedabu98 this is the BQ issue.
The write behaves pretty strangely. Here's what I found:
The values do end up being written into BQ but a new table is created for each value:
This is for FILE_LOADS. I'm not observing the same behavior when switching the write method to STREAMING_INSERTS. Currently trying with STORAGE_WRITE_API though I'm running into separate issues there.
That's because UpdateSchemaDestination
is used in BatchLoads
only. It was introduced in 2.40.
As @johnjcasey mentioned, our custom DynamicDestinations
expects String
to be received, while UpdateSchemaDestination
assumes TableDestination
instances always.
These two will fail for sure.
Looks like the user-specified DestinationT
type is first replaced with TableDestination
during the write here (DestinationT
in, TableDestination
out).
The WriteTables
DoFn returns TableDestination
types instead of DestinationT
, which is carried downstream to UpdateSchemaDestination
.
.take-issue
With the changes in #22624 I was able to write successfully with the following pipeline:
import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;
import java.util.Arrays;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryOptions;
import org.apache.beam.sdk.io.gcp.bigquery.DynamicDestinations;
import org.apache.beam.sdk.io.gcp.bigquery.TableDestination;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.ValueInSingleWindow;
import org.checkerframework.checker.nullness.qual.Nullable;
public class UpdateSchemaDestinationWrite {

  // Fully-qualified "project:dataset." prefix; the per-element destination key is appended.
  static final String PROJECT_AND_DATASET_ID = "<myproject>:<mydataset>.";

  /** Routes each element by its KV key; the destination type is a plain String. */
  private static final class SomeDynamicDestinations
      extends DynamicDestinations<KV<String, String>, String> {

    private static final long serialVersionUID = 1L;

    @Override
    public String getDestination(@Nullable ValueInSingleWindow<KV<String, String>> element) {
      KV<String, String> kv = element.getValue();
      return kv.getKey();
    }

    @Override
    public TableDestination getTable(String destination) {
      return new TableDestination(PROJECT_AND_DATASET_ID + destination, "a table");
    }

    @Override
    public @Nullable TableSchema getSchema(String destination) {
      TableFieldSchema nameField = new TableFieldSchema().setName("name").setType("STRING");
      return new TableSchema().setFields(Arrays.asList(nameField));
    }
  }

  /**
   * Verification pipeline for the fix in #22624: two rows routed to the same dynamic
   * destination, forced onto multiple load partitions via withMaxBytesPerPartition(1).
   */
  public static void main(String[] args) {
    BigQueryOptions bqOptions = PipelineOptionsFactory.fromArgs(args).as(BigQueryOptions.class);
    Pipeline pipeline = Pipeline.create(bqOptions);

    DynamicDestinations<KV<String, String>, String> destinations = new SomeDynamicDestinations();

    PCollection<KV<String, String>> rows =
        pipeline.apply(
            Create.of(
                KV.of("my_table", "hi"),
                KV.of("my_table", "hello")));

    rows.apply(
        BigQueryIO.<KV<String, String>>write()
            .withFormatFunction(kv -> new TableRow().set("name", kv.getValue()))
            .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
            .withWriteDisposition(WriteDisposition.WRITE_APPEND)
            // Force multiple load partitions so the schema-update path is exercised.
            .withMaxBytesPerPartition(1)
            .to(destinations));

    pipeline.run().waitUntilFinish();
  }
}