[Bug]: Dataflow fails to drain a job when using BigQuery (java sdk v.2.38)

Open vbshnsk opened this issue 2 years ago • 0 comments

What happened?

We are seeing a BigQuery error with no indication of the underlying cause. It only occurs when we start draining the job; as far as I can tell, we do not hit any problems while actually processing the collections.

The JSON payload for the error message is as follows (some values redacted):

exception: "java.lang.NullPointerException: Both parameters are null
	at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.MoreObjects.firstNonNull(MoreObjects.java:61)
	at org.apache.beam.sdk.io.gcp.bigquery.StorageApiWritesShardedRecords$WriteRecordsDoFn.finalizeStream(StorageApiWritesShardedRecords.java:516)
	at org.apache.beam.sdk.io.gcp.bigquery.StorageApiWritesShardedRecords$WriteRecordsDoFn.onWindowExpiration(StorageApiWritesShardedRecords.java:546)
"
job: "2022-07-25_07_47_46-13052079299893341614"
logger: "org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker"
message: "Execution of work for computation 'P33' on key '<tenant_id_here>' failed with uncaught exception. Work will be retried locally."
stage: "P33"
thread: "519795"
work: "22dd1724b5cfa5bb-1085e5c9eb8a571d"
worker: "<worker_here>"
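
For context, "Both parameters are null" is the message Guava's `MoreObjects.firstNonNull` raises when both of its arguments are null. The trace does not show which two values were null inside `finalizeStream`. Below is a minimal, self-contained sketch of that behavior; the class `FirstNonNullSketch` and the helper `describe` are hypothetical names used only for illustration, and this is a stand-in, not the Beam or Guava source.

```java
// Illustrative stand-in for Guava's MoreObjects.firstNonNull (assumption: same contract).
final class FirstNonNullSketch {

    // Returns the first argument if it is non-null, otherwise the second;
    // throws NullPointerException when both are null.
    static <T> T firstNonNull(T first, T second) {
        if (first != null) {
            return first;
        }
        if (second != null) {
            return second;
        }
        throw new NullPointerException("Both parameters are null");
    }

    // Hypothetical helper: reports either the chosen value or the exception message.
    static String describe(String first, String second) {
        try {
            return "value=" + firstNonNull(first, second);
        } catch (NullPointerException e) {
            return "NPE: " + e.getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println(describe("primary", null));
        System.out.println(describe(null, "fallback"));
        System.out.println(describe(null, null));
    }
}
```

Only the last call, where both arguments are null, produces the message seen in the payload above.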

I am fairly sure that we handle errors during processing, so I am at a loss as to what goes wrong during the drain :(

Issue Priority

Priority: 1

Issue Component

Component: io-java-gcp

vbshnsk, Aug 09 '22 13:08

Bump.

Also, this is our BigQueryIO config:

BigQueryIO
    .write<T>()
    .to(dynamicDestination)
    .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
    .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
    .withoutValidation()
    .withFormatFunction(formatFunction)
    .optimizedWrites()
    .withFailedInsertRetryPolicy(InsertRetryPolicy.retryTransientErrors())
    .withSchemaUpdateOptions(
        setOf(
            BigQueryIO.Write.SchemaUpdateOption.ALLOW_FIELD_ADDITION,
            BigQueryIO.Write.SchemaUpdateOption.ALLOW_FIELD_RELAXATION
        )
    )
    .withMethod(BigQueryIO.Write.Method.STORAGE_WRITE_API)

vbshnsk, Aug 15 '22 07:08

This has been updated in Beam 2.39; please upgrade to that version or higher.

johnjcasey, Aug 24 '22 17:08