Default translation for SchemaTransforms

Open ahmedabu98 opened this issue 1 year ago • 6 comments

To make a PTransform upgradable, we need to write some translation logic for it (see TransformPayloadTranslator). Essentially, this is logic that can:

  • translate a transform to its corresponding FunctionSpec proto
  • produce a configuration Row from a transform instance
  • re-construct the transform using a configuration Row

So far, this has meant handwriting large customized logic for each IO (see translation for BigQueryIO, KafkaIO). While this may be unavoidable for some IOs, the logic can be unified for SchemaTransforms, which already operate using Beam Rows.

With these changes, the SchemaTransformProvider implementation needs to add just one line when building the SchemaTransform:

  • .register(<config row>, <identifier>) on the SchemaTransform itself, or
  • register(<config>, <schematransform>) after building the SchemaTransform in TypedSchemaTransformProvider.

By storing this information in the transform instance, it can be translated using the SchemaTransformTranslator.
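The register-then-translate idea can be sketched without any Beam dependency. The following is only an illustrative model, not the PR's code: `SketchTransform`, `SketchProvider`, `SketchTranslator`, and the identifier string are all hypothetical names, and a plain `Map` stands in for a Beam Row.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;

public class RegisterSketch {

  /** Hypothetical stand-in for a SchemaTransform; a Map plays the role of a Beam Row. */
  static final class SketchTransform {
    final String label;
    Map<String, Object> configRow; // set by register()
    String identifier;             // set by register()

    SketchTransform(String label) {
      this.label = label;
    }

    /** Hypothetical analogue of the one-line register call described above. */
    SketchTransform register(Map<String, Object> configRow, String identifier) {
      this.configRow = Collections.unmodifiableMap(new LinkedHashMap<>(configRow));
      this.identifier = identifier;
      return this;
    }

    String label() {
      return label;
    }
  }

  /** Hypothetical stand-in for a SchemaTransformProvider: the only place transforms are built. */
  static final class SketchProvider {
    static final String IDENTIFIER = "example:schematransform:sketch_read:v1"; // made-up value

    SketchTransform from(Map<String, Object> configRow) {
      // The single extra line: remember the config and identifier on the instance.
      return new SketchTransform("read " + configRow.get("topic"))
          .register(configRow, IDENTIFIER);
    }
  }

  /** One generic translator that works for every registered transform. */
  static final class SketchTranslator {
    private final SketchProvider provider;

    SketchTranslator(SketchProvider provider) {
      this.provider = provider;
    }

    String urn(SketchTransform t) {
      return requireRegistered(t).identifier;
    }

    Map<String, Object> toConfigRow(SketchTransform t) {
      return requireRegistered(t).configRow;
    }

    SketchTransform fromConfigRow(Map<String, Object> row) {
      return provider.from(row);
    }

    private static SketchTransform requireRegistered(SketchTransform t) {
      if (t.identifier == null || t.configRow == null) {
        throw new IllegalStateException("transform was never registered");
      }
      return t;
    }
  }

  public static void main(String[] args) {
    SketchProvider provider = new SketchProvider();
    SketchTranslator translator = new SketchTranslator(provider);

    Map<String, Object> config = new LinkedHashMap<>();
    config.put("topic", "events");

    SketchTransform original = provider.from(config);
    SketchTransform rebuilt = translator.fromConfigRow(translator.toConfigRow(original));
    System.out.println(translator.urn(original) + " -> " + rebuilt.label());
  }
}
```

In this model the translator needs no per-IO logic: everything it requires is read back from the instance that the provider registered.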

This PR includes a few such improvements, removing the handwritten translation logic for Iceberg(Read/Write)SchemaTransform, Kafka(Read/Write)SchemaTransform, and ManagedSchemaTransform. P.S. these were already shorter than usual because of the SchemaTransformPayloadTranslator abstraction introduced in #30910. This PR is a continuation of that work and simplifies the translation further.

ahmedabu98 avatar Jun 10 '24 18:06 ahmedabu98

Could you add a description of what is improved here?

robertwb avatar Jun 10 '24 19:06 robertwb

Added a description. This is ready for review now

ahmedabu98 avatar Jun 10 '24 20:06 ahmedabu98

R: @robertwb R: @chamikaramj R: @kennknowles

ahmedabu98 avatar Jun 10 '24 20:06 ahmedabu98

Stopping reviewer notifications for this pull request: review requested by someone other than the bot, ceding control

github-actions[bot] avatar Jun 10 '24 20:06 github-actions[bot]

Taking a step back, is the problem you're trying to solve that, for transforms that come from a [Typed]SchemaTransformProvider, we already have an (identifier + configRow) -> [Schema]Transform mapping, and we should be able to automatically derive the inverse [Schema]Transform -> (identifier + configRow) mapping for these transforms?

In this case, it's a bit odd because it seems we can only derive the inverse for the specific instances that we created, not generally for all instances of that class. (But maybe that's good enough sometimes?) Might be cleaner in that case to just have a separate (weakref) mapping (registrar) from PTransform instances to their corresponding configs rather than add a register method and private members to [Schema]Transform itself.
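As a rough illustration of the separate weak-reference registrar suggested here, the sketch below keeps the mapping outside the transform class. It is a minimal model under stated assumptions: `TransformRegistrarSketch` and `Registration` are hypothetical names, transforms are plain `Object`s, and a `Map` stands in for the config Row. Note that `java.util.WeakHashMap` compares keys with `equals()`, so this behaves as a per-instance mapping only for classes that do not override `equals()`.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;
import java.util.WeakHashMap;

public class TransformRegistrarSketch {

  /** Identifier plus configuration recorded for one transform instance (hypothetical type). */
  static final class Registration {
    final String identifier;
    final Map<String, Object> configRow;

    Registration(String identifier, Map<String, Object> configRow) {
      this.identifier = identifier;
      this.configRow = Collections.unmodifiableMap(new LinkedHashMap<>(configRow));
    }
  }

  // Keys are held weakly, so an entry can be collected once the transform is unreachable.
  // The value must not reference the key strongly, or the entry would never be cleared.
  private static final Map<Object, Registration> REGISTRY =
      Collections.synchronizedMap(new WeakHashMap<>());

  /** Called by the provider right after it builds the transform. */
  static void register(Object transform, String identifier, Map<String, Object> configRow) {
    REGISTRY.put(transform, new Registration(identifier, configRow));
  }

  /** Called by a generic translator; empty if the instance was built some other way. */
  static Optional<Registration> lookup(Object transform) {
    return Optional.ofNullable(REGISTRY.get(transform));
  }

  public static void main(String[] args) {
    Object transform = new Object(); // stand-in for a SchemaTransform instance
    Map<String, Object> config = new LinkedHashMap<>();
    config.put("table", "db.events");

    register(transform, "example:schematransform:sketch_write:v1", config); // made-up identifier
    lookup(transform)
        .ifPresent(r -> System.out.println(r.identifier + " " + r.configRow));
  }
}
```

The trade-off in this design is that the transform class stays untouched, while the translator has to handle the case where a lookup returns nothing.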

Separately it'd be good to make defining this pair of mappings so easy that it's "just how users write PTransforms" (maybe working it into builder patterns or providing a good annotation or something like that) and we get their externalization and semantic graph representation for free. But this is likely a larger project.

robertwb avatar Jun 13 '24 16:06 robertwb

Yep that’s the main problem being solved here. To be clear though, transforms that come from any SchemaTransformProvider (doesn’t have to be [Typed]) already have an identifier and configRow (implicitly) attached. This PR makes this attachment explicit and available for translation.

> In this case, it's a bit odd because it seems we can only derive the inverse for the specific instances that we created, not generally for all instances of that class

By design, these SchemaTransforms are always created using the corresponding SchemaTransformProvider.from(configRow). We never instantiate the SchemaTransform directly (there are no public SchemaTransform implementations). So we can safely assume any SchemaTransform instantiation will have come from a SchemaTransformProvider that we created.

I share the aversion towards adding unnecessary things to SchemaTransform though. I’ve been trying to cut things down as I go, but there’s probably still a better solution.

> Might be cleaner in that case to just have a separate (weakref) mapping (registrar) from PTransform instances to their corresponding configs

Thanks! Will take a stab at this

ahmedabu98 avatar Jun 17 '24 16:06 ahmedabu98

This pull request has been marked as stale due to 60 days of inactivity. It will be closed in 1 week if no further activity occurs. If you think that’s incorrect or this pull request requires a review, please simply write any comment. If closed, you can revive the PR at any time and @mention a reviewer or discuss it on the dev@beam.apache.org list. Thank you for your contributions.

github-actions[bot] avatar Aug 17 '24 12:08 github-actions[bot]

This pull request has been closed due to lack of activity. If you think that is incorrect, or the pull request requires review, you can revive the PR at any time.

github-actions[bot] avatar Aug 24 '24 12:08 github-actions[bot]