
Default translation for SchemaTransforms

Open ahmedabu98 opened this issue 8 months ago • 6 comments

To make a PTransform upgradable, we need to write some translation logic for it (see TransformPayloadTranslator). Essentially, this is logic that can:

  • translate a transform to its corresponding FunctionSpec proto
  • produce a configuration Row from a transform instance
  • re-construct the transform using a configuration Row
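The three responsibilities above can be sketched in plain Java. This is only an illustration and does not use Beam's real classes: `ExampleRead`, `Spec`, `Translator`, and the URN string are hypothetical stand-ins, and a `Map<String, Object>` takes the place of a Beam configuration Row.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;

public class TranslationSketch {
  // Hypothetical transform with two configuration fields.
  static final class ExampleRead {
    final String topic;
    final int maxRecords;

    ExampleRead(String topic, int maxRecords) {
      this.topic = topic;
      this.maxRecords = maxRecords;
    }
  }

  // Hypothetical stand-in for a FunctionSpec: a URN plus a payload.
  static final class Spec {
    final String urn;
    final Map<String, Object> payload;

    Spec(String urn, Map<String, Object> payload) {
      this.urn = urn;
      this.payload = payload;
    }
  }

  // The three pieces of translation logic, as one hypothetical interface.
  interface Translator<T> {
    String urn();

    // Produce a configuration (a Map here, a Row in Beam) from an instance.
    Map<String, Object> toConfig(T transform);

    // Re-construct the transform from that configuration.
    T fromConfig(Map<String, Object> config);

    // Translate the transform to its spec.
    default Spec toSpec(T transform) {
      return new Spec(urn(), toConfig(transform));
    }
  }

  // Handwritten translator: every field is mapped by hand in both directions.
  static final class ExampleReadTranslator implements Translator<ExampleRead> {
    @Override
    public String urn() {
      return "sketch:example_read:v1"; // made-up URN for illustration
    }

    @Override
    public Map<String, Object> toConfig(ExampleRead transform) {
      Map<String, Object> config = new LinkedHashMap<>();
      config.put("topic", transform.topic);
      config.put("max_records", transform.maxRecords);
      return Collections.unmodifiableMap(config);
    }

    @Override
    public ExampleRead fromConfig(Map<String, Object> config) {
      return new ExampleRead(
          (String) config.get("topic"), (Integer) config.get("max_records"));
    }
  }

  public static void main(String[] args) {
    ExampleReadTranslator translator = new ExampleReadTranslator();
    Spec spec = translator.toSpec(new ExampleRead("events", 100));
    ExampleRead rebuilt = translator.fromConfig(spec.payload);
    System.out.println(spec.urn + " " + spec.payload + " -> " + rebuilt.topic);
  }
}
```

Note that the field-by-field mapping in `toConfig` and `fromConfig` is the part that has to be repeated for every transform when translators are written by hand.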

So far, this has meant handwriting large customized logic for each IO (see translation for BigQueryIO, KafkaIO). While this may be unavoidable for some IOs, the logic can be unified for SchemaTransforms, which already operate using Beam Rows.

With these changes, a SchemaTransformProvider implementation only needs to add one line when building the SchemaTransform:

  • .register(<config row>, <identifier>) on the SchemaTransform itself, or
  • register(<config>, <schematransform>) after building the SchemaTransform in TypedSchemaTransformProvider.

By storing this information in the transform instance, it can be translated using the SchemaTransformTranslator.
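A rough sketch of why storing the configuration and identifier on the instance allows a single translator to serve every transform. None of this is Beam's actual code: `SketchTransform`, `GenericTranslator`, `ExampleWrite`, `buildExampleWrite`, and the identifier string are hypothetical names, and a `Map<String, Object>` again stands in for the configuration Row.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

public class RegisterSketch {
  // Hypothetical base class: remembers the config and identifier it was built from.
  abstract static class SketchTransform {
    private Map<String, Object> config;
    private String identifier;

    SketchTransform register(Map<String, Object> config, String identifier) {
      this.config = Collections.unmodifiableMap(new HashMap<>(config));
      this.identifier = identifier;
      return this;
    }

    Map<String, Object> getConfig() {
      return config;
    }

    String getIdentifier() {
      return identifier;
    }
  }

  // Hypothetical concrete transform.
  static final class ExampleWrite extends SketchTransform {
    final String table;

    ExampleWrite(String table) {
      this.table = table;
    }
  }

  // One translator for all registered transforms: no per-transform field mapping.
  static final class GenericTranslator {
    private final Map<String, Function<Map<String, Object>, SketchTransform>> providers =
        new HashMap<>();

    void addProvider(
        String identifier, Function<Map<String, Object>, SketchTransform> provider) {
      providers.put(identifier, provider);
    }

    // Reads the stored config instead of inspecting transform-specific fields.
    Map<String, Object> toConfig(SketchTransform transform) {
      if (transform.getConfig() == null) {
        throw new IllegalStateException("transform was never registered");
      }
      return transform.getConfig();
    }

    // Rebuilds the transform by handing the config back to its provider.
    SketchTransform fromConfig(String identifier, Map<String, Object> config) {
      Function<Map<String, Object>, SketchTransform> provider = providers.get(identifier);
      if (provider == null) {
        throw new IllegalArgumentException("no provider for " + identifier);
      }
      return provider.apply(config);
    }
  }

  // Provider-style factory: the register(...) call is the one extra line.
  static SketchTransform buildExampleWrite(Map<String, Object> config) {
    return new ExampleWrite((String) config.get("table"))
        .register(config, "sketch:example_write:v1");
  }

  public static void main(String[] args) {
    Map<String, Object> config = new HashMap<>();
    config.put("table", "db.events");

    GenericTranslator translator = new GenericTranslator();
    translator.addProvider("sketch:example_write:v1", RegisterSketch::buildExampleWrite);

    SketchTransform original = buildExampleWrite(config);
    SketchTransform rebuilt =
        translator.fromConfig(original.getIdentifier(), translator.toConfig(original));
    System.out.println(((ExampleWrite) rebuilt).table);
  }
}
```

The design trade-off in this sketch is that a transform which skips the `register` call cannot be translated at all, which is why the translator fails loudly in that case.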

This PR includes a few such improvements, removing the handwritten translation logic for Iceberg(Read/Write)SchemaTransform, Kafka(Read/Write)SchemaTransform, and ManagedSchemaTransform. P.S. These translators were already shorter than usual because of the SchemaTransformPayloadTranslator abstraction introduced in #30910. This PR continues that work and simplifies them further.

ahmedabu98, Jun 10 '24 18:06