Default translation for SchemaTransforms
To make a PTransform upgradable, we need to write translation logic for it (see TransformPayloadTranslator). Essentially, this logic must be able to:
- translate a transform to its corresponding FunctionSpec proto
- produce a configuration Row from a transform instance
- re-construct the transform using a configuration Row
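As a rough illustration of those three responsibilities, here is a minimal, self-contained Java sketch. It does not use Beam's real classes: `Row` is modeled as an immutable map and `FunctionSpec` as a URN plus a payload Row. `Translator`, `KafkaLikeRead`, `KafkaLikeReadTranslator`, and the URN string are hypothetical names introduced only for this example.

```java
import java.util.Map;
import java.util.Objects;

// Hypothetical stand-ins for Beam's Row and the FunctionSpec proto; not the real classes.
record Row(Map<String, Object> values) {
  Object get(String field) {
    return values.get(field);
  }
}

record FunctionSpec(String urn, Row payload) {}

// The three responsibilities of a payload translator, in simplified form.
interface Translator<T> {
  FunctionSpec toSpec(T transform); // transform -> FunctionSpec
  Row toConfigRow(T transform); // transform -> configuration Row
  T fromConfigRow(Row config); // configuration Row -> re-constructed transform
}

// A hypothetical transform with two configuration fields.
record KafkaLikeRead(String topic, long maxRecords) {}

// Handwritten translator: every field is mapped by hand, which is the
// per-IO work a shared SchemaTransform translator is meant to remove.
final class KafkaLikeReadTranslator implements Translator<KafkaLikeRead> {
  static final String URN = "example:kafka_like_read:v1"; // hypothetical URN

  @Override
  public FunctionSpec toSpec(KafkaLikeRead transform) {
    return new FunctionSpec(URN, toConfigRow(transform));
  }

  @Override
  public Row toConfigRow(KafkaLikeRead transform) {
    return new Row(Map.<String, Object>of(
        "topic", transform.topic(),
        "max_records", transform.maxRecords()));
  }

  @Override
  public KafkaLikeRead fromConfigRow(Row config) {
    return new KafkaLikeRead(
        (String) Objects.requireNonNull(config.get("topic")),
        (Long) Objects.requireNonNull(config.get("max_records")));
  }
}
```

Applying `fromConfigRow` to the output of `toConfigRow` should give back an equal transform; before this change, each IO had to maintain a field-by-field mapping like this by hand.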
So far, this has meant handwriting large, customized translation logic for each IO (see the translations for BigQueryIO and KafkaIO). While this may be unavoidable for some IOs, the logic can be unified for SchemaTransforms, because they already operate on Beam Rows.
With these changes, a SchemaTransformProvider implementation only needs to add one line when building the SchemaTransform, either:
- .register(<config row>, <identifier>) on the SchemaTransform itself, or
- register(<config>, <schematransform>) after building the SchemaTransform in a TypedSchemaTransformProvider.
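A minimal Java sketch of the two registration styles follows. It is not Beam's actual code: `Row` is modeled as a map, `SchemaTransform` and `TypedSchemaTransformProvider` are simplified stand-ins, and `toConfigRow`, `ReadConfig`, `ReadProvider`, and the identifier string are hypothetical. The typed-config-to-Row conversion is written by hand here only to keep the example self-contained.

```java
import java.util.Map;

// Hypothetical stand-in for Beam's Row, modeled as an immutable map.
record Row(Map<String, Object> values) {}

// Simplified SchemaTransform that remembers how it was configured.
abstract class SchemaTransform {
  private Row configurationRow;
  private String identifier;

  // Mirrors ".register(<config row>, <identifier>)"; returns this so it can end a builder chain.
  SchemaTransform register(Row configurationRow, String identifier) {
    this.configurationRow = configurationRow;
    this.identifier = identifier;
    return this;
  }

  Row getConfigurationRow() {
    return configurationRow;
  }

  String getIdentifier() {
    return identifier;
  }
}

// Simplified typed provider with a typed configuration class C.
abstract class TypedSchemaTransformProvider<C> {
  abstract String identifier();

  // Hypothetical hook: converts the typed config into a Row.
  abstract Row toConfigRow(C config);

  abstract SchemaTransform from(C config);

  // Mirrors "register(<config>, <schematransform>)".
  SchemaTransform register(C config, SchemaTransform transform) {
    return transform.register(toConfigRow(config), identifier());
  }
}

// Hypothetical provider showing the one added line.
record ReadConfig(String topic) {}

final class ReadProvider extends TypedSchemaTransformProvider<ReadConfig> {
  @Override
  String identifier() {
    return "example:schematransform:read:v1"; // hypothetical identifier
  }

  @Override
  Row toConfigRow(ReadConfig config) {
    return new Row(Map.<String, Object>of("topic", config.topic()));
  }

  @Override
  SchemaTransform from(ReadConfig config) {
    SchemaTransform transform = new SchemaTransform() {}; // expansion logic omitted
    return register(config, transform); // the single line a provider adds
  }
}
```

The design choice worth noting is that the transform carries its own configuration Row and identifier, so nothing outside the provider needs to know how the transform's fields map to a Row.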
Because this information is stored in the transform instance, the transform can be translated generically by the SchemaTransformTranslator, with no per-IO translation code.
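The following sketch shows why stored registration data makes a single translator sufficient: the translator reads the identifier and configuration Row off the transform, and re-construction looks the identifier up in a table of providers. `GenericSchemaTransformTranslator`, `addProvider`, the simplified `FunctionSpec` record, and the URN are hypothetical; Beam's SchemaTransformTranslator and its payload format may differ.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical stand-ins: Row as an immutable map, and a simplified spec
// carrying the identifier and configuration Row side by side.
record Row(Map<String, Object> values) {}

record FunctionSpec(String urn, String identifier, Row configurationRow) {}

// Minimal SchemaTransform that stores its registration data.
class SchemaTransform {
  private Row configurationRow;
  private String identifier;

  SchemaTransform register(Row configurationRow, String identifier) {
    this.configurationRow = configurationRow;
    this.identifier = identifier;
    return this;
  }

  Row getConfigurationRow() {
    return configurationRow;
  }

  String getIdentifier() {
    return identifier;
  }
}

// One translator for every registered SchemaTransform: no per-IO code.
final class GenericSchemaTransformTranslator {
  static final String URN = "example:schematransform:v1"; // hypothetical URN

  // Maps an identifier to a function that rebuilds the transform from its Row.
  private final Map<String, Function<Row, SchemaTransform>> providers = new HashMap<>();

  void addProvider(String identifier, Function<Row, SchemaTransform> fromRow) {
    providers.put(identifier, fromRow);
  }

  FunctionSpec toSpec(SchemaTransform transform) {
    if (transform.getIdentifier() == null) {
      throw new IllegalArgumentException("transform was never registered");
    }
    return new FunctionSpec(URN, transform.getIdentifier(), transform.getConfigurationRow());
  }

  SchemaTransform fromSpec(FunctionSpec spec) {
    Function<Row, SchemaTransform> fromRow = providers.get(spec.identifier());
    if (fromRow == null) {
      throw new IllegalArgumentException("no provider for " + spec.identifier());
    }
    return fromRow.apply(spec.configurationRow());
  }
}
```

In this sketch, supporting a new SchemaTransform only requires registering its provider, not writing a new translator.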
This PR applies the change to a few existing transforms, removing the handwritten translation logic for Iceberg(Read/Write)SchemaTransform, Kafka(Read/Write)SchemaTransform, and ManagedSchemaTransform. P.S. These translations were already shorter than usual thanks to the SchemaTransformPayloadTranslator abstraction introduced in #30910; this PR continues that work to simplify them further.