[Feature Request]: Enable withFormatRecordOnFailureFunction() equivalent for BigQuery STORAGE_WRITE_API
What would you like to happen?
We have a Dataflow pipeline that reads data from PubSub and writes to BigQuery.
Current status:
- BigQuery write method: STREAMING_INSERTS
- Function used to get BigQuery deadletter: getFailedInsertsWithErr()
- Deadletter format function: withFormatRecordOnFailureFunction()
We have a pipeline that writes multiple events dynamically to different BQ destination tables. withFormatRecordOnFailureFunction() is currently used to transform the bad inserts into the desired format for further deadletter processing: rather than returning the original TableRow itself, we provide a customized function that encodes the returned TableRow object and adds an eventType field, so we can figure out which table it was written to.
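For context, a minimal sketch of how we consume those formatted failures today with Beam's `getFailedInsertsWithErr()` and `BigQueryInsertError` (the `events` and `write` variables and the exact eventType field name are illustrative, not our exact code):

```java
// Sketch only: recovering eventType from failed inserts under STREAMING_INSERTS.
// 'events' and 'write' stand in for our real PCollection and BigQueryIO.Write.
WriteResult result = events.apply("WriteToBQ", write);

result.getFailedInsertsWithErr()
    .apply("KeyFailedRowsByEventType",
        MapElements
            .into(TypeDescriptors.kvs(
                TypeDescriptors.strings(), TypeDescriptor.of(TableRow.class)))
            .via((BigQueryInsertError err) -> {
              TableRow row = err.getRow();
              // eventType exists only because withFormatRecordOnFailureFunction()
              // injected it into the failed TableRow; it is not a real table column.
              return KV.of((String) row.get("eventType"), row);
            }));
```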
As we are enhancing the pipeline to use STORAGE_WRITE_API, we are facing the issue below.
- BigQuery write method: STORAGE_WRITE_API
- Function used to get BigQuery deadletter: getFailedStorageApiInserts() (as getFailedInsertsWithErr() cannot be used for STORAGE_WRITE_API)
- Deadletter format function: N/A
Without a withFormatRecordOnFailureFunction() equivalent, we cannot format the failed-insert TableRows, which is a blocker for our upgrade.
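To make the gap concrete, here is a rough sketch (names illustrative, continuing the `result` from the sketch above) of what `getFailedStorageApiInserts()` gives us; note there is no hook to enrich the failed row:

```java
// Sketch only: with STORAGE_WRITE_API, the failed row is whatever withFormatFunction()
// produced, so any deadletter-only fields (like our eventType) are already gone.
result.getFailedStorageApiInserts()
    .apply("InspectFailedRows",
        MapElements
            .into(TypeDescriptor.of(TableRow.class))
            .via((BigQueryStorageApiInsertError err) -> {
              // Only real table columns are available here; there is no equivalent of
              // withFormatRecordOnFailureFunction() to re-attach the original context.
              return err.getRow();
            }));
```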
How this might work: Add a withFormatRecordOnFailureFunction() equivalent for BigQuery STORAGE_WRITE_API.
As we are dynamically writing to BigQuery tables, the PCollection may contain multiple eventTypes; we will lose the eventType info if a failure transformation function cannot be applied to those failed inserts.
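Purely as an illustration of the request (this does not work today), we would like to be able to write something like:

```java
// Hypothetical: the same failure-format hook we use with STREAMING_INSERTS, but honored
// by the STORAGE_WRITE_API path, so failed rows keep the injected eventType field.
WriteResult result = events.apply(
    BigQueryIO.<AvroGenericRecordMessage>write()
        .withMethod(BigQueryIO.Write.Method.STORAGE_WRITE_API)
        .withFormatFunction(AvroGenericRecordToBigQuery::formatAvroToTableRow)
        // Requested feature: take effect for STORAGE_WRITE_API as well.
        .withFormatRecordOnFailureFunction(
            AvroGenericRecordToBigQuery::formatAvroToFailedTableRow));

// Failed rows would then carry eventType, just as getFailedInsertsWithErr() does today.
result.getFailedStorageApiInserts();
```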
Issue Priority
Priority: 2 (default / most feature requests should be filed as P2)
Issue Components
- [ ] Component: Python SDK
- [X] Component: Java SDK
- [ ] Component: Go SDK
- [ ] Component: Typescript SDK
- [ ] Component: IO connector
- [ ] Component: Beam YAML
- [ ] Component: Beam examples
- [ ] Component: Beam playground
- [ ] Component: Beam katas
- [ ] Component: Website
- [ ] Component: Spark Runner
- [ ] Component: Flink Runner
- [ ] Component: Samza Runner
- [ ] Component: Twister2 Runner
- [ ] Component: Hazelcast Jet Runner
- [ ] Component: Google Cloud Dataflow Runner
.take-issue
There are some difficulties in making such a withFormatRecordOnFailureFunction() equivalent for BigQuery STORAGE_WRITE_API:
- StorageApi has different implementations:
  - BeamRow implementation that transforms the original `ElementT` to `Row`
  - Proto implementation that transforms the original `ElementT` to `byte[]`
  - Avro implementation that transforms the original `ElementT` to `GenericRecord`
  - TableRow implementation that transforms the original `ElementT` to `TableRow`
- After that element-to-payload transformation, at the write-records step, different `BigQueryStorageApiInsertError`s may occur (e.g. payload too large, etc.)
- At this point (for any implementation), the payload will be transformed to `TableRow` in order to output to `failedRowsTag`.
- So it makes no sense to have `withFormatRecordOnFailureFunction()` from `ElementT` to `TableRow` (as it is implemented for the STREAMING_INSERTS method), because at this step we only have a payload.
- There is a possibility to add `withFormatRecordOnFailureFunction()` from `TableRow` to `TableRow`, if it makes sense — see the sketch after this list.
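A hypothetical shape of that last option (this method variant does not exist in Beam today; the signature is only to make the idea concrete):

```java
// Hypothetical TableRow-to-TableRow variant: the hook would receive the already-converted
// payload row and return the TableRow to emit on failedRowsTag. Note it can no longer see
// the original ElementT, only the payload.
BigQueryIO.<AvroGenericRecordMessage>write()
    .withMethod(BigQueryIO.Write.Method.STORAGE_WRITE_API)
    .withFormatRecordOnFailureFunction((TableRow payloadRow) -> {
      TableRow deadLetter = new TableRow();
      deadLetter.set("failed_payload", payloadRow);
      return deadLetter;
    });
```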
@sarinasij Could you please share an example of how you use withFormatRecordOnFailureFunction() with the STREAMING_INSERTS method? I think it may help to clarify the next steps.
@Amar3tto
Below is our current usage description:
To facilitate the processing of failed records, we require the original AvroGenericRecordMessage for further deadletter handling. It's important to note that the AvroGenericRecordMessage contains more information than the TableRow in BigQuery. Specifically, it includes the eventType, which is not part of the table columns but is crucial for dead letter metrics.
To address this, we utilize the withFormatRecordOnFailureFunction() to construct a dummy TableRow that can be decoded back into the original AvroGenericRecordMessage.
```java
return BigQueryIO.<AvroGenericRecordMessage>write()
    .to(input -> getTableDestination(input, outputTableProject, outputTableDataset, outputTableMap))
    .withMethod(STORAGE_WRITE_API)
    .withTriggeringFrequency(Duration.standardSeconds(bqWindowInSec))
    .withNumStorageWriteApiStreams(numStreams)
    .withFormatFunction(AvroGenericRecordToBigQuery::formatAvroToTableRow)
    .withFormatRecordOnFailureFunction(AvroGenericRecordToBigQuery::formatAvroToFailedTableRow)
    .withCreateDisposition(CREATE_NEVER)
    .withWriteDisposition(WRITE_APPEND)
    .withFailedInsertRetryPolicy(InsertRetryPolicy.retryTransientErrors());

private static TableRow formatAvroToFailedTableRow(final AvroGenericRecordMessage message) {
  // Build a dummy table row containing the original Avro payload so it can later be used to
  // dead-letter the event. Unfortunately, we need to encode this into a TableRow since the
  // BigQueryIO module doesn't provide any other way to get back to the original
  // AvroGenericRecordMessage object that failed insertion.
  return FormatBigQueryDeadLetters.encodeAvroRecordToTableRow(message);
}

public static TableRow encodeAvroRecordToTableRow(AvroGenericRecordMessage record) {
  var row = new TableRow();
  row.set(BQ_TABLEROW_COLUMN_EVENT_NAME, record.getEventType());
  row.set(BQ_TABLEROW_COLUMN_PAYLOAD, Base64.getEncoder().encodeToString(record.getBinaryEncoding()));
  return row;
}
```
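For completeness, the deadletter side reverses this encoding; a minimal sketch (decodeBinary() is a hypothetical helper on our side, not part of the code above):

```java
public static AvroGenericRecordMessage decodeTableRowToAvroRecord(TableRow row) {
  String eventType = (String) row.get(BQ_TABLEROW_COLUMN_EVENT_NAME);
  byte[] payload = Base64.getDecoder().decode((String) row.get(BQ_TABLEROW_COLUMN_PAYLOAD));
  // decodeBinary is a hypothetical application-side helper that rebuilds the message
  // from its eventType and Avro binary encoding.
  return AvroGenericRecordMessage.decodeBinary(eventType, payload);
}
```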
Given this use case, I would think "add withFormatRecordOnFailureFunction() from TableRow to TableRow" might not work, since we need additional info for the failed rows that comes from the original AvroGenericRecordMessage. Please kindly advise how we can implement the same with the STORAGE_WRITE_API method.