scio

Refactor BQ to expose all of Beam's configurations

Open • RustedBones opened this issue 1 year ago • 2 comments

Here are the main changes:

  • The BQ Table source now has a single normalized definition with multiple constructors (from a string spec or a TableReference). It also includes an optional Table.Filter that the Storage Read API can use for projection and filtering.

  • The read API changes as follows:

API                     Read method   Returned type
bigQuerySelect          export        TableRow
bigQuerySelectFormat    export        T
bigQueryTable           export        TableRow
bigQueryTableFormat     export        T
bigQueryStorage         direct        TableRow
bigQueryStorageFormat   direct        T
typedBigQuery           export        T
typedBigQueryStorage    direct        T
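The first change, a single Table definition with several constructors and an optional Table.Filter, can be sketched as a small self-contained model. This is plain Scala with no scio or Beam dependency. TableRef stands in for BigQuery's TableReference, and every name here (TableModel, TableRef, Table, Filter, parseSpec) is hypothetical, not scio's actual API. The spec parsing is deliberately simplified.

```scala
object TableModel {
  // Hypothetical stand-in for com.google.api.services.bigquery.model.TableReference.
  final case class TableRef(project: Option[String], dataset: String, table: String)

  // Single normalized definition: always a reference plus an optional filter.
  final case class Table(ref: TableRef, filter: Option[Table.Filter])

  object Table {
    // Hypothetical filter: projection (selectedFields) and a row restriction,
    // the kind of settings the Storage Read API can push down.
    final case class Filter(selectedFields: List[String], rowRestriction: Option[String])

    // Constructor from a TableReference-like value.
    def apply(ref: TableRef): Table = Table(ref, None)

    // Constructors from a "[project:]dataset.table" string spec.
    def apply(spec: String): Table = Table(parseSpec(spec), None)
    def apply(spec: String, filter: Filter): Table = Table(parseSpec(spec), Some(filter))

    // Simplified parser: everything before the last ':' is the project.
    private def parseSpec(spec: String): TableRef = {
      val i = spec.lastIndexOf(':')
      val project = if (i >= 0) Some(spec.substring(0, i)) else None
      val rest = spec.substring(i + 1)
      rest.split("\\.", 2) match {
        case Array(dataset, table) => TableRef(project, dataset, table)
        case _ => throw new IllegalArgumentException(s"Invalid table spec: $spec")
      }
    }
  }
}
```

Whatever constructor is used, downstream code only ever sees one Table shape, which is the point of normalizing the definition.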

The Format APIs take a BigqueryIO.Format object that converts to T from either GenericRecord (preferred) or TableRow.
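To illustrate what a Format object carries, here is a hedged, dependency-free sketch: a sealed ADT whose two cases say which representation to read and how to turn it into T. AvroRecord and JsonRow are hypothetical stand-ins for GenericRecord and TableRow, and FormatModel, Format and read are illustrative names, not the real BigqueryIO.Format API. The string conversion in the TableRow branch is purely for the example.

```scala
object FormatModel {
  // Hypothetical stand-ins for org.apache.avro.generic.GenericRecord
  // and com.google.api.services.bigquery.model.TableRow.
  final case class AvroRecord(fields: Map[String, Any])
  final case class JsonRow(fields: Map[String, String])

  // Hypothetical Format ADT: each case picks a representation and
  // supplies the user's conversion from it to T.
  sealed trait Format[T]
  object Format {
    final case class GenericRecord[T](parse: AvroRecord => T) extends Format[T]
    final case class TableRow[T](parse: JsonRow => T) extends Format[T]
  }

  // Hypothetical reader: materializes each raw record in the representation
  // requested by the format, then applies the conversion.
  def read[T](raw: Seq[AvroRecord], format: Format[T]): Seq[T] = format match {
    case Format.GenericRecord(parse) => raw.map(parse)
    case Format.TableRow(parse) =>
      // Illustrative only: render every value as a string, like a JSON-style row.
      raw.map(r => parse(JsonRow(r.fields.map { case (k, v) => k -> v.toString })))
  }
}
```

Both branches yield the same T, so a caller can switch formats without changing what the resulting collection contains.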

  • The Storage API now accepts an ErrorHandler (fixes #5530). To preserve a flat structure, ScioContext.errorSink(): ErrorSink has been added, which allows the following:
val errorSink = sc.errorSink()
sc.bigQueryStorageFormat[MyType](
  table,
  format,
  errorHandler = errorSink.handler
)
val errors: SCollection[BadRecord] = errorSink.sink()

The same handler can be passed to multiple IOs before the sink is materialized; the sink then flattens the errors from all of those IOs into a single collection.
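The share-then-materialize pattern can be modeled in plain Scala, with ordinary collections standing in for SCollections. ErrorSinkModel, ErrorHandler, addErrorCollection, readInts and this BadRecord are all hypothetical stand-ins chosen for illustration, not scio or Beam signatures. The model enforces the rule above: IOs register errors through the shared handler, and once sink() is called the handler refuses further registrations.

```scala
import scala.collection.mutable.ListBuffer

object ErrorSinkModel {
  // Hypothetical stand-in for Beam's BadRecord.
  final case class BadRecord(source: String, reason: String)

  // Hypothetical handler: each IO registers the records it failed on.
  final class ErrorHandler {
    private val collections = ListBuffer.empty[Seq[BadRecord]]
    private var closed = false

    def addErrorCollection(errors: Seq[BadRecord]): Unit = {
      require(!closed, "error sink already materialized")
      collections += errors
    }

    // Flattens every registered collection and blocks further registrations.
    private[ErrorSinkModel] def close(): List[BadRecord] = {
      closed = true
      collections.toList.flatten
    }
  }

  // Hypothetical counterpart of sc.errorSink(): one shared handler, one output.
  final class ErrorSink {
    val handler: ErrorHandler = new ErrorHandler
    def sink(): List[BadRecord] = handler.close()
  }

  // Hypothetical IO: parses integers and routes unparsable lines to the handler.
  def readInts(name: String, lines: Seq[String], errorHandler: ErrorHandler): Seq[Int] = {
    val (good, bad) = lines.partition(_.toIntOption.isDefined)
    errorHandler.addErrorCollection(bad.map(s => BadRecord(name, s"not an int: $s")))
    good.map(_.toInt)
  }
}
```

Keeping a single sink per context, rather than one per IO, is what lets the pipeline expose one flat SCollection of errors regardless of how many IOs reported into it.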

RustedBones • Aug 20 '24 08:08