Default translation for SchemaTransforms
To make a PTransform upgradable, we need to write some translation logic for it (see TransformPayloadTranslator). Essentially, this is logic that can:
- translate a transform to its corresponding FunctionSpec proto
- produce a configuration Row from a transform instance
- re-construct the transform using a configuration Row
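The three responsibilities listed above can be pictured as a small interface with a handwritten implementation. This is only a sketch under simplifying assumptions: `Translator`, `ToyRead`, `ToyReadTranslator`, and the URN string are hypothetical names, and a `Map<String, Object>` stands in for both the Beam Row and the FunctionSpec payload. It is not the actual TransformPayloadTranslator API.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class TranslatorSketch {
  // Hypothetical transform with two settings.
  static final class ToyRead {
    final String table;
    final int limit;

    ToyRead(String table, int limit) {
      this.table = table;
      this.limit = limit;
    }
  }

  // Hypothetical, simplified shape of the translation logic.
  interface Translator<T> {
    String urn(); // identifies the transform in its FunctionSpec

    Map<String, Object> toConfigRow(T transform); // transform -> configuration

    T fromConfigRow(Map<String, Object> row); // configuration -> transform
  }

  // Handwritten translator: every field is mapped by hand in both directions.
  static final class ToyReadTranslator implements Translator<ToyRead> {
    @Override
    public String urn() {
      return "example:toy_read:v1";
    }

    @Override
    public Map<String, Object> toConfigRow(ToyRead transform) {
      Map<String, Object> row = new LinkedHashMap<>();
      row.put("table", transform.table);
      row.put("limit", transform.limit);
      return row;
    }

    @Override
    public ToyRead fromConfigRow(Map<String, Object> row) {
      return new ToyRead((String) row.get("table"), (Integer) row.get("limit"));
    }
  }

  public static void main(String[] args) {
    ToyReadTranslator translator = new ToyReadTranslator();
    ToyRead original = new ToyRead("db.events", 10);
    ToyRead rebuilt = translator.fromConfigRow(translator.toConfigRow(original));
    System.out.println(translator.urn() + " " + rebuilt.table + " " + rebuilt.limit);
  }
}
```

Each new setting on the transform means touching both mapping methods, which is the maintenance cost the rest of this description aims to remove for SchemaTransforms.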
So far, this has meant handwriting large customized logic for each IO (see translation for BigQueryIO, KafkaIO). While this may be unavoidable for some IOs, the logic can be unified for SchemaTransforms, which already operate using Beam Rows.
With these changes, a SchemaTransformProvider implementation only needs to add one line when building the SchemaTransform:
.register(<config row>, <identifier>) on the SchemaTransform itself, or register(<config>, <schematransform>) after building the SchemaTransform in TypedSchemaTransformProvider.
By storing this information in the transform instance, it can be translated using the SchemaTransformTranslator.
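A rough illustration of the idea follows, assuming toy stand-ins rather than the real Beam classes: `ToySchemaTransform`, `ToyProvider`, `ToyWrite`, and `DefaultTranslator` are hypothetical, and a `Map<String, Object>` replaces the config Row. Only the `register(<config row>, <identifier>)` call shape is taken from the description above.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class RegisterSketch {
  // Hypothetical stand-in for SchemaTransform: it remembers how it was built.
  abstract static class ToySchemaTransform {
    private String identifier;
    private Map<String, Object> configRow;

    ToySchemaTransform register(Map<String, Object> configRow, String identifier) {
      this.configRow = configRow;
      this.identifier = identifier;
      return this;
    }

    String getIdentifier() {
      return identifier;
    }

    Map<String, Object> getConfigRow() {
      return configRow;
    }
  }

  // Hypothetical stand-in for SchemaTransformProvider.
  interface ToyProvider {
    String identifier();

    ToySchemaTransform from(Map<String, Object> configRow);
  }

  static final class ToyWrite extends ToySchemaTransform {
    final String topic;

    ToyWrite(String topic) {
      this.topic = topic;
    }
  }

  static final class ToyWriteProvider implements ToyProvider {
    @Override
    public String identifier() {
      return "example:toy_write:v1";
    }

    @Override
    public ToySchemaTransform from(Map<String, Object> configRow) {
      // The single extra line: attach the config and identifier to the instance.
      return new ToyWrite((String) configRow.get("topic")).register(configRow, identifier());
    }
  }

  // One translator for every registered transform; no per-IO logic.
  static final class DefaultTranslator {
    private final Map<String, ToyProvider> providers = new LinkedHashMap<>();

    void addProvider(ToyProvider provider) {
      providers.put(provider.identifier(), provider);
    }

    Map<String, Object> toConfigRow(ToySchemaTransform transform) {
      if (transform.getConfigRow() == null) {
        throw new IllegalStateException("Transform was never registered");
      }
      return transform.getConfigRow();
    }

    ToySchemaTransform fromConfigRow(String identifier, Map<String, Object> configRow) {
      ToyProvider provider = providers.get(identifier);
      if (provider == null) {
        throw new IllegalArgumentException("Unknown identifier: " + identifier);
      }
      return provider.from(configRow);
    }
  }

  public static void main(String[] args) {
    DefaultTranslator translator = new DefaultTranslator();
    ToyWriteProvider provider = new ToyWriteProvider();
    translator.addProvider(provider);

    Map<String, Object> config = new LinkedHashMap<>();
    config.put("topic", "events");
    ToySchemaTransform original = provider.from(config);

    // Round trip: instance -> (identifier, config) -> new instance.
    ToySchemaTransform rebuilt =
        translator.fromConfigRow(original.getIdentifier(), translator.toConfigRow(original));
    System.out.println(((ToyWrite) rebuilt).topic);
  }
}
```

The translator never inspects the fields of `ToyWrite`; it relies entirely on what the provider registered, which is why the same logic can serve every SchemaTransform.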
This PR includes a few such improvements, removing the handwritten translation logic for Iceberg(Read/Write)SchemaTransform, Kafka(Read/Write)SchemaTransform, and ManagedSchemaTransform. P.S. These were already shorter than usual because of the SchemaTransformPayloadTranslator abstraction introduced in #30910. This PR is a continuation that simplifies it further.
Could you add a description of what is improved here?
Added a description. This is ready for review now
R: @robertwb R: @chamikaramj R: @kennknowles
Stopping reviewer notifications for this pull request: review requested by someone other than the bot, ceding control
Taking a step back, is the problem you're trying to solve that, for transforms that come from a [Typed]SchemaTransformProvider, we already have an (identifier + configRow) -> [Schema]Transform mapping, and we should be able to automatically derive the inverse [Schema]Transform -> (identifier + configRow) mapping for these transforms?
In this case, it's a bit odd because it seems we can only derive the inverse for the specific instances that we created, not generally for all instances of that class. (But maybe that's good enough sometimes?) It might be cleaner in that case to just have a separate (weakref) mapping (registrar) from PTransform instances to their corresponding configs rather than adding a register method and private members to [Schema]Transform itself.
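For illustration, one way such a separate registrar could look in Java is sketched below. All names (`TransformRegistrar`, `Registration`) are hypothetical, and a `Map<String, Object>` stands in for the config Row. The sketch uses `java.util.WeakHashMap`, which compares keys with `equals`/`hashCode`, so it behaves as a per-instance mapping only for transforms that do not override those methods.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.WeakHashMap;

final class WeakRegistrarSketch {
  // What the registrar remembers about one transform instance.
  static final class Registration {
    final String identifier;
    final Map<String, Object> configRow;

    Registration(String identifier, Map<String, Object> configRow) {
      this.identifier = identifier;
      this.configRow = configRow;
    }
  }

  // Side table keyed by transform instance; the transform class itself is untouched.
  static final class TransformRegistrar {
    // Weak keys: an entry becomes eligible for removal once the transform is
    // no longer strongly reachable. The value must not reference the key.
    private final Map<Object, Registration> registrations =
        Collections.synchronizedMap(new WeakHashMap<>());

    void register(Object transform, String identifier, Map<String, Object> configRow) {
      registrations.put(transform, new Registration(identifier, configRow));
    }

    // Returns null when the instance was not created through a registering provider.
    Registration lookup(Object transform) {
      return registrations.get(transform);
    }
  }

  public static void main(String[] args) {
    TransformRegistrar registrar = new TransformRegistrar();
    Object transform = new Object(); // stand-in for a PTransform instance

    Map<String, Object> config = new LinkedHashMap<>();
    config.put("topic", "events");
    registrar.register(transform, "example:toy_write:v1", config);

    Registration found = registrar.lookup(transform);
    System.out.println(found.identifier + " " + found.configRow.get("topic"));
    System.out.println(registrar.lookup(new Object()) == null);
  }
}
```

Compared with a `register` method on the transform, this keeps the bookkeeping out of the transform's public surface, at the cost of a global (or injected) registrar that translators must be able to reach.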
Separately it'd be good to make defining this pair of mappings so easy that it's "just how users write PTransforms" (maybe working it into builder patterns or providing a good annotation or something like that) and we get their externalization and semantic graph representation for free. But this is likely a larger project.
Yep that’s the main problem being solved here. To be clear though, transforms that come from any SchemaTransformProvider (doesn’t have to be [Typed]) already have an identifier and configRow (implicitly) attached. This PR makes this attachment explicit and available for translation.
In this case, it's a bit odd because it seems we can only derive the inverse for the specific instances that we created, not generally for all instances of that class
By design, these SchemaTransforms are always created using the corresponding SchemaTransformProvider.from(configRow). We never instantiate the SchemaTransform directly (there are no public SchemaTransform implementations). So we can safely assume any SchemaTransform instantiation will have come from a SchemaTransformProvider that we created.
I share the aversion towards adding unnecessary things to SchemaTransform though. I’ve been trying to cut things down as I go, but there’s probably still a better solution.
Might be cleaner in that case to just have a separate (weakref) mapping (registrar) from PTransform instances to their corresponding configs
Thanks! Will take a stab at this
This pull request has been marked as stale due to 60 days of inactivity. It will be closed in 1 week if no further activity occurs. If you think that’s incorrect or this pull request requires a review, please simply write any comment. If closed, you can revive the PR at any time and @mention a reviewer or discuss it on the dev@beam.apache.org list. Thank you for your contributions.
This pull request has been closed due to lack of activity. If you think that is incorrect, or the pull request requires review, you can revive the PR at any time.