airbyte
Destination BigQuery: `createTableIfNotExists` does not check if table exists prior to creating table
Tell us about the problem you're trying to solve
Currently, Destination BigQuery calls the method `createTableIfNotExists`; however, nothing checks whether the table with the associated `tableId` exists before attempting to create it. As a result, connectors using BigQuery staging now see errors that a table "ALREADY EXISTS", despite not seeing this message in the past.
Describe the solution you’d like
Get the `Table` object if it exists (BigQuery already has documentation on how to do this here), and ensure the table does not exist before creating it.
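A minimal sketch of the check-then-create idea, not the connector's actual code. `TableClient` and `InMemoryTableClient` are hypothetical stand-ins so the example runs without the BigQuery dependency. With the real client, the existence check would presumably be something like `bigQuery.getTable(tableId) != null`, since `getTable` returns `null` when the table is not found.

```java
import java.util.HashSet;
import java.util.Set;

public class CreateTableIfNotExistsSketch {

  /** Hypothetical stand-in for the small part of the BigQuery client used here. */
  interface TableClient {
    /** True if the table exists (assumed real-client equivalent: getTable(tableId) != null). */
    boolean tableExists(String tableId);

    /** Creates the table; throws IllegalStateException if it already exists. */
    void createTable(String tableId);
  }

  /** In-memory fake that fails on duplicate creation, similar to the 409 in the logs. */
  static final class InMemoryTableClient implements TableClient {
    private final Set<String> tables = new HashSet<>();
    int createCalls = 0;

    @Override
    public boolean tableExists(final String tableId) {
      return tables.contains(tableId);
    }

    @Override
    public void createTable(final String tableId) {
      createCalls++;
      if (!tables.add(tableId)) {
        throw new IllegalStateException("Already Exists: Table " + tableId);
      }
    }
  }

  /** Creates the table only when it is missing. Returns true if a table was created. */
  static boolean createTableIfNotExists(final TableClient client, final String tableId) {
    if (client.tableExists(tableId)) {
      return false; // nothing to do, and no "Already Exists" error is triggered
    }
    client.createTable(tableId);
    return true;
  }

  public static void main(final String[] args) {
    final InMemoryTableClient client = new InMemoryTableClient();
    System.out.println(createTableIfNotExists(client, "_airbyte_raw_transfers")); // true
    System.out.println(createTableIfNotExists(client, "_airbyte_raw_transfers")); // false
  }
}
```

The second call returns without touching the create path, so a repeated sync would no longer log the "Already Exists" stack trace.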
I am seeing this stack trace a lot recently. The jobs continue after these errors, so they are likely a red herring. The problem is that the `createTableIfNotExists` method in `BigQueryGcsOperations` does not check if a table exists.
@Override
public void createTableIfNotExists(final TableId tableId, final Schema tableSchema) {
  LOGGER.info("Creating target table {}", tableId);
  BigQueryUtils.createPartitionedTable(bigQuery, tableId, tableSchema);
}
2023-01-31 20:25:43 destination > Creating dataset airbyte_stripe_main
2023-01-31 20:25:43 destination > Creating target table GenericData{classInfo=[datasetId, projectId, tableId], {datasetId=airbyte_stripe_main, tableId=_airbyte_raw_transfers}}
2023-01-31 20:25:43 destination > Partitioned table was not created: GenericData{classInfo=[datasetId, projectId, tableId], {datasetId=airbyte_stripe_main, tableId=_airbyte_raw_transfers}}
Stack Trace: com.google.cloud.bigquery.BigQueryException: Already Exists: Table data-warehouse-355910:airbyte_stripe_main._airbyte_raw_transfers
at com.google.cloud.bigquery.spi.v2.HttpBigQueryRpc.translate(HttpBigQueryRpc.java:115)
at com.google.cloud.bigquery.spi.v2.HttpBigQueryRpc.create(HttpBigQueryRpc.java:187)
at com.google.cloud.bigquery.BigQueryImpl$2.call(BigQueryImpl.java:291)
at com.google.cloud.bigquery.BigQueryImpl$2.call(BigQueryImpl.java:288)
at com.google.api.gax.retrying.DirectRetryingExecutor.submit(DirectRetryingExecutor.java:105)
at com.google.cloud.RetryHelper.run(RetryHelper.java:76)
at com.google.cloud.RetryHelper.runWithRetries(RetryHelper.java:50)
at com.google.cloud.bigquery.BigQueryImpl.create(BigQueryImpl.java:287)
at io.airbyte.integrations.destination.bigquery.BigQueryUtils.createPartitionedTable(BigQueryUtils.java:223)
at io.airbyte.integrations.destination.bigquery.BigQueryGcsOperations.createTableIfNotExists(BigQueryGcsOperations.java:103)
at io.airbyte.integrations.destination.bigquery.BigQueryStagingConsumerFactory.lambda$onStartFunction$3(BigQueryStagingConsumerFactory.java:123)
at io.airbyte.commons.concurrency.VoidCallable.call(VoidCallable.java:15)
at io.airbyte.integrations.destination.buffered_stream_consumer.BufferedStreamConsumer.startTracked(BufferedStreamConsumer.java:120)
at io.airbyte.integrations.base.FailureTrackingAirbyteMessageConsumer.start(FailureTrackingAirbyteMessageConsumer.java:43)
at io.airbyte.integrations.base.IntegrationRunner.consumeWriteStream(IntegrationRunner.java:200)
at io.airbyte.integrations.base.IntegrationRunner.lambda$runConsumer$2(IntegrationRunner.java:208)
at io.airbyte.integrations.base.IntegrationRunner.watchForOrphanThreads(IntegrationRunner.java:237)
at io.airbyte.integrations.base.IntegrationRunner.runConsumer(IntegrationRunner.java:207)
at io.airbyte.integrations.base.IntegrationRunner.runInternal(IntegrationRunner.java:148)
at io.airbyte.integrations.base.IntegrationRunner.run(IntegrationRunner.java:98)
at io.airbyte.integrations.destination.bigquery.BigQueryDestination.main(BigQueryDestination.java:341)
Caused by: com.google.api.client.googleapis.json.GoogleJsonResponseException: 409 Conflict
POST https://www.googleapis.com/bigquery/v2/projects/data-warehouse-355910/datasets/airbyte_stripe_main/tables?prettyPrint=false
{
"code" : 409,
"errors" : [ {
"domain" : "global",
"message" : "Already Exists: Table data-warehouse-355910:airbyte_stripe_main._airbyte_raw_transfers",
"reason" : "duplicate"
} ],
"message" : "Already Exists: Table data-warehouse-355910:airbyte_stripe_main._airbyte_raw_transfers",
"status" : "ALREADY_EXISTS"
}
at com.google.api.client.googleapis.json.GoogleJsonResponseException.from(GoogleJsonResponseException.java:146)
at com.google.api.client.googleapis.services.json.AbstractGoogleJsonClientRequest.newExceptionOnError(AbstractGoogleJsonClientRequest.java:118)
at com.google.api.client.googleapis.services.json.AbstractGoogleJsonClientRequest.newExceptionOnError(AbstractGoogleJsonClientRequest.java:37)
at com.google.api.client.googleapis.services.AbstractGoogleClientRequest$1.interceptResponse(AbstractGoogleClientRequest.java:428)
at com.google.api.client.http.HttpRequest.execute(HttpRequest.java:1111)
at com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:514)
at com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:455)
at com.google.api.client.googleapis.services.AbstractGoogleClientRequest.execute(AbstractGoogleClientRequest.java:565)
at com.google.cloud.bigquery.spi.v2.HttpBigQueryRpc.create(HttpBigQueryRpc.java:185)
... 19 more
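The response above carries `"code" : 409` and `"reason" : "duplicate"`. Even with an existence check, two writers could still race between the check and the create, so one complementary option is to treat that specific conflict as benign. The sketch below assumes this approach; `ApiException` and `TableCreator` are hypothetical stand-ins, not BigQuery classes. With the real client, the equivalent fields would presumably be read from `BigQueryException` via `getCode()` and `getReason()`.

```java
public class TolerateAlreadyExistsSketch {

  /** Hypothetical error type carrying the fields seen in the 409 response body. */
  static final class ApiException extends RuntimeException {
    final int code;
    final String reason;

    ApiException(final int code, final String reason, final String message) {
      super(message);
      this.code = code;
      this.reason = reason;
    }
  }

  /** Hypothetical stand-in for the call that creates the table. */
  interface TableCreator {
    void create(String tableId);
  }

  /** True only for the "409 Conflict / duplicate" case shown in the stack trace. */
  static boolean isAlreadyExists(final ApiException e) {
    return e.code == 409 && "duplicate".equals(e.reason);
  }

  /**
   * Attempts the create. Returns true if the table was created, false if it
   * already existed. Any other error is rethrown unchanged.
   */
  static boolean createTolerantly(final TableCreator creator, final String tableId) {
    try {
      creator.create(tableId);
      return true;
    } catch (final ApiException e) {
      if (isAlreadyExists(e)) {
        return false; // the table is already there, which is the desired end state
      }
      throw e;
    }
  }

  public static void main(final String[] args) {
    final TableCreator conflicting = id -> {
      throw new ApiException(409, "duplicate", "Already Exists: Table " + id);
    };
    System.out.println(createTolerantly(conflicting, "_airbyte_raw_transfers")); // false
  }
}
```

Only the duplicate conflict is swallowed. Other failures, such as permission errors, still propagate, so real problems are not hidden.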
Putting this at the top of the sprint backlog, because these logs are very distracting.