
Methods from saveAsBigQuery category don't expose crucial Beam BigQueryIO.writers parameters

Open mkuthan opened this issue 4 years ago • 3 comments

I would like to discuss the API of the methods in the saveAsBigQuery category. These methods provide a very high level of abstraction over the underlying Beam API. In general this is the expected behaviour, since it keeps the Scio API much friendlier than the original Beam API, but sometimes a more sophisticated setup is really needed, e.g.:

  • to force the FILE_LOADS write method for an unbounded collection
  • to define triggeringFrequency and numFileShards for FILE_LOADS
  • to define failedInsertRetryPolicy for STREAMING_INSERTS
  • to enable or disable Avro logical types
  • and probably many other use cases, one for each BigQueryIO.writer configuration knob

For sure, there is a nice Scio API extension point, "saveAsCustomOutput", but it would be useful to have more specific saveAsBigQuery methods with access to the underlying Beam BigQueryIO. This is not an easy task, because the BigQueryIO API is, let's say, far from a concise design: you can build a writer in 100 different ways, but 95 of them will fail with an error during pipeline startup (e.g. this parameter is also required if that parameter is used).
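For readers unfamiliar with the extension point, here is a minimal sketch of the "saveAsCustomOutput" route with a FILE_LOADS writer for an unbounded collection. It assumes Scio and the Beam GCP IO module are on the classpath; the object name, step name, shard count, and triggering interval are illustrative placeholders, not recommendations, and this is not the code from the linked repository.

```scala
import com.google.api.services.bigquery.model.TableRow
import com.spotify.scio.values.SCollection
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.{CreateDisposition, Method, WriteDisposition}
import org.joda.time.Duration

// Hypothetical helper object, used only for illustration.
object FileLoadsSketch {
  // Builds a Beam writer that forces FILE_LOADS. Triggering frequency and
  // file shards are set together, as the bullet list above describes.
  def fileLoadsWrite(tableSpec: String): BigQueryIO.Write[TableRow] =
    BigQueryIO
      .writeTableRows()
      .to(tableSpec)
      .withMethod(Method.FILE_LOADS)
      .withTriggeringFrequency(Duration.standardMinutes(5)) // placeholder value
      .withNumFileShards(10)                                // placeholder value
      .withCreateDisposition(CreateDisposition.CREATE_NEVER)
      .withWriteDisposition(WriteDisposition.WRITE_APPEND)

  // Hands the raw Beam transform to Scio's generic extension point.
  def save(rows: SCollection[TableRow], tableSpec: String): Unit = {
    rows.saveAsCustomOutput("SaveToBigQuery", fileLoadsWrite(tableSpec))
    ()
  }
}
```

Beam validates the combination of options when the transform is applied to the pipeline, not when the builder is assembled, so a misconfigured writer only fails at pipeline startup.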

I prepared a minimal abstraction over "saveAsCustomOutput" and it has simplified my pipelines a lot. Please look at the code as a starting point for further discussion: https://github.com/mkuthan/beam-examples/blob/master/src/main/scala/org/mkuthan/beam/examples/bigquery/SaveSpecificRecord.scala

Thanks for Scio, it saves a lot of my time!

mkuthan avatar Sep 21 '20 17:09 mkuthan

Yeah, this is one area where the two styles don't mix well, i.e. default arguments vs. the ever-growing list of Beam builder setters. :shrug:

I don't have a better idea for this. One thing we could possibly do is add an f: BigQueryIO => BigQueryIO parameter to every read/write sugar method, so that users can set additional params via the lambda, but it'll also make it easy to shoot oneself in the foot. Also, there might be IOs with specialized types like {Generic,Specific}RecordAvroIO; not sure how well they fit in here.
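To make the lambda idea concrete, here is a self-contained sketch in plain Scala. WriteConfig is a hypothetical stand-in for a Beam writer, modelling only two setters, and LambdaSketch.saveAsBigQuery is an assumed signature for illustration, not Scio's actual method.

```scala
// Hypothetical stand-in for a Beam writer; only models two of its setters.
final case class WriteConfig(
  table: String,
  method: String = "DEFAULT",
  numFileShards: Int = 0
) {
  def withMethod(m: String): WriteConfig = copy(method = m)
  def withNumFileShards(n: Int): WriteConfig = copy(numFileShards = n)
}

object LambdaSketch {
  // The sugar method builds the writer with its defaults, then gives the
  // caller a chance to adjust it through the lambda before it is used.
  def saveAsBigQuery(table: String)(
    configOverride: WriteConfig => WriteConfig = (w: WriteConfig) => w
  ): WriteConfig =
    configOverride(WriteConfig(table))
}

object LambdaSketchUsage {
  // Defaults only: the lambda is the identity function.
  val plain: WriteConfig = LambdaSketch.saveAsBigQuery("project:dataset.table")()

  // Extra knobs set through the lambda, without changing the method signature.
  val tuned: WriteConfig =
    LambdaSketch.saveAsBigQuery("project:dataset.table")(
      _.withMethod("FILE_LOADS").withNumFileShards(10)
    )
}
```

The lambda runs after the defaults are applied, so it can also overwrite settings the sugar method relies on, which is the foot-gun mentioned above.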

Thoughts?

nevillelyh avatar Sep 21 '20 19:09 nevillelyh

Thanks for the quick response! Are there any advantages of exposing f: BigQueryIO => BigQueryIO over saveAsCustomOutput? Testability? Specialized IOs are useful, since different types need different formatters / converters. I also found that the insert methods (FILE_LOADS vs. STREAMING_INSERTS) require very different sets of options, and this could be mapped explicitly in the API.

mkuthan avatar Sep 23 '20 10:09 mkuthan

I think exposing a lambda specifically for configuring options will make the API more extensible and compatible with Beam (Beam might add more setters, and that would mean Scio adding more functions to a builder rather than changing the function signature).

What we already have is a dependent WriteParam and ReadParam in ScioIO[T], with concrete types defined in the specific IOs. That makes the ScioIO layer extensible, but not the user-facing API. I am thinking that if the params are exposed to the user-facing API as a separate trait, the configurations will become more extensible.

Here is what I was thinking: introduce a new trait, ScioIoOptions, somewhat similar to the {Read, Write}Param in ScioIO.

trait ScioIoOptions[T, IO <: ScioIO[_]] {

}

And then have this as an implicit in the user-facing API (much like a typeclass).

def saveToBigQuery(tableSpec: Spec)(implicit options: BqIoOptions[T]) = {
...
}

Have the options defined with an implicit apply for default args, and builder style methods for overriding configurations.

sealed trait BqIoOptions[T] extends ScioIoOptions[T, BigQueryIO] {
  def withOption1: BqIoOptions[T]  = ???
  def withOption2(value..): BqIoOptions[T] = ???
}

object BqIoOptions {
  // default BqIoOptions
  implicit def apply[T]: BqIoOptions[T] = new BqIoOptions[T] {
    override def withOption1 = ???
    override def withOption2(value..) = ???
  }
}

usage:


.saveToBigQuery(Table.Spec(...)) // default

.saveToBigQuery(Table.Spec(...))(BqIoOptions[T].withOption1.withOption2(..)) // override options.
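
The proposal above can be turned into a compiling sketch along the following lines. The option fields (option1, option2) are placeholders with no real Beam meaning, OptionsSketch is a hypothetical stand-in for the sugar method, and the table spec is a plain String here to keep the example free of Scio dependencies.

```scala
// Hypothetical options type; the fields are placeholders, not real settings.
final case class BqIoOptions[T](option1: Boolean = false, option2: Int = 0) {
  def withOption1: BqIoOptions[T] = copy(option1 = true)
  def withOption2(value: Int): BqIoOptions[T] = copy(option2 = value)
}

object BqIoOptions {
  // Default instance, picked up from the companion's implicit scope
  // whenever the caller does not pass options explicitly.
  implicit def default[T]: BqIoOptions[T] = BqIoOptions[T]()
}

object OptionsSketch {
  // Stand-in for the sugar method: it returns the resolved options so the
  // effect of implicit vs. explicit passing is visible to the caller.
  def saveToBigQuery[T](tableSpec: String)(
    implicit options: BqIoOptions[T]
  ): BqIoOptions[T] = options
}

object OptionsSketchUsage {
  // Default: the implicit instance from the companion object is used.
  val defaults: BqIoOptions[String] =
    OptionsSketch.saveToBigQuery[String]("project:dataset.table")

  // Override: the options are built explicitly, builder style.
  val custom: BqIoOptions[String] =
    OptionsSketch.saveToBigQuery[String]("project:dataset.table")(
      BqIoOptions[String]().withOption1.withOption2(3)
    )
}
```

Compared with the lambda variant, new options here require new builder methods on the options type, but the set of allowed combinations stays under the library's control.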

anish749 avatar Sep 30 '20 10:09 anish749

A configOverride param will be added for this purpose. The other missing params were added in 0.13.

RustedBones avatar Jun 12 '23 14:06 RustedBones