Generate external transform wrappers using a script
Implementing a script that generates wrappers for external SchemaTransforms, according to Option #3 in the following design doc: https://s.apache.org/autogen-wrappers
The script's workflow takes place in setup.py, which can be invoked for local setup or for building the SDK for a Beam release. Files are generated in a subdirectory that is ignored by git. From there, the wrappers can be imported into relevant __init__.py files.
Wrappers are generated along with any documentation provided by the underlying SchemaTransform, and they comply with existing linting and pydoc rules.
Includes documentation for how to use the script. Includes unit tests for different parts of the script.
Also adding a gradle command ./gradlew generateExternalTransformWrappers to build the configs and generate wrappers from scratch. This takes care of building the relevant expansion jars beforehand.
Also adding a PreCommit that generates the transform config from scratch and compares it with the existing one. This indicates whether a change would leave the checked-in config out of sync. To resolve, re-generate the config (with ./gradlew generateExternalTransformWrappers) and commit the changes.
With the expansion service YAML config, the following transform YAML config is generated:
Transform YAML config
- default_service: sdks:java:io:expansion-service:shadowJar
  description: 'Outputs a PCollection of Beam Rows, each containing a single INT64
    number called "value". The count is produced from the given "start" value and either
    up to the given "end" or until 2^63 - 1.
    To produce an unbounded PCollection, simply do not specify an "end" value. Unbounded
    sequences can specify a "rate" for output elements.
    In all cases, the sequence of numbers is generated in parallel, so there is no
    inherent ordering between the generated values'
  destinations:
    python: apache_beam/io
  fields:
    end:
      description: The maximum number to generate (exclusive). Will be an unbounded
        sequence if left unspecified.
      nullable: true
      type: numpy.int64
    rate:
      description: Specifies the rate to generate a given number of elements per a
        given number of seconds. Applicable only to unbounded sequences.
      nullable: true
      type: Row(seconds=typing.Union[numpy.int64, NoneType], elements=<class 'numpy.int64'>)
    start:
      description: The minimum number to generate (inclusive).
      nullable: false
      type: numpy.int64
  identifier: beam:schematransform:org.apache.beam:generate_sequence:v1
  name: GenerateSequence
From this config, external transform wrappers are created and stored in appropriate modules. For example, our config gives us the following apache_beam/transforms/xlang/io.py file:
GenerateSequence wrapper
# NOTE: This file contains autogenerated external transform(s)
# and should not be edited by hand.
# Refer to gen_xlang_wrappers.py for more info.

"""Cross-language transforms in this module can be imported from the
:py:mod:`apache_beam.io` package."""

# pylint:disable=line-too-long

from apache_beam.transforms.external import BeamJarExpansionService
from apache_beam.transforms.external_transform_provider import ExternalTransform


class GenerateSequence(ExternalTransform):
  """
  Outputs a PCollection of Beam Rows, each containing a single INT64 number
  called "value". The count is produced from the given "start" value and either
  up to the given "end" or until 2^63 - 1.

  To produce an unbounded PCollection, simply do not specify an "end" value.
  Unbounded sequences can specify a "rate" for output elements.

  In all cases, the sequence of numbers is generated in parallel, so there is no
  inherent ordering between the generated values
  """
  identifier = "beam:schematransform:org.apache.beam:generate_sequence:v1"

  def __init__(self, start, end=None, rate=None, expansion_service=None):
    """
    :param start: (numpy.int64)
      The minimum number to generate (inclusive).
    :param end: (numpy.int64)
      The maximum number to generate (exclusive). Will be an unbounded
      sequence if left unspecified.
    :param rate: (Row(seconds=typing.Union[numpy.int64, NoneType], elements=<class 'numpy.int64'>))
      Specifies the rate to generate a given number of elements per a given
      number of seconds. Applicable only to unbounded sequences.
    """
    self.default_expansion_service = BeamJarExpansionService(
        "sdks:java:io:expansion-service:shadowJar")
    super().__init__(
        start=start, end=end, rate=rate, expansion_service=expansion_service)
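For illustration, using the generated wrapper in a pipeline could look like the following. This is a sketch rather than part of the generated code; it assumes the Java expansion service jar can be built locally and that each output Row exposes its single field as row.value:

```python
import apache_beam as beam
from apache_beam.transforms.xlang.io import GenerateSequence

# Sketch only: a bounded sequence of Rows, each carrying a single "value" field.
with beam.Pipeline() as pipeline:
  _ = (
      pipeline
      | GenerateSequence(start=0, end=10)
      | beam.Map(lambda row: row.value)
      | beam.Map(print))
```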
Assigning reviewers. If you would like to opt out of this review, comment assign to next reviewer:
R: @jrmccluskey for label python. R: @damccorm for label build.
Available commands:
- stop reviewer notifications - opt out of the automated review tooling
- remind me after tests pass - tag the comment author after tests pass
- waiting on author - shift the attention set back to the author (any comment or push by the author will return the attention set to the reviewers)
The PR bot will only process comments in the main thread (not review comments).
R: @robertwb @tvalentyn @chamikaramj
(manually requesting to make the review bot happy)
Stopping reviewer notifications for this pull request: review requested by someone other than the bot, ceding control
friendly ping @chamikaramj @robertwb
Thanks for looking @chamikaramj. I addressed these comments, PTAL
You can make the generated transforms more discoverable by importing them in __init__.py files. That might mean generating an additional file (or files) _external_transforms.py, and importing _external_transforms.py in __init__.py. That would make it easier for users to reference particular transforms without importing them from the _et modules.
So would users be expected to import these transforms from the _et paths directly? One transform per module (which is not how the others are typically arranged, e.g. with the Read and the Write in the same place)? I am also a bit wary of scattering these files all over the place (for both tooling and human reasons).
What if instead they were in a single subdirectory (like with protos) and on import this would then dynamically insert any specifically placed transforms into the right modules (which could include modules that already exist)?
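Rough sketch of what I mean (hypothetical; the helper name and wiring are made up, this is not something the PR implements):

```python
import importlib


def _attach_generated_transform(transform_cls, destination):
  """Expose a generated transform on its destination module, e.g. apache_beam.io."""
  module = importlib.import_module(destination)
  setattr(module, transform_cls.__name__, transform_cls)
```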
What if instead they were in a single subdirectory (like with protos)
I actually also thought about it and hesitated to suggest it only because it felt a bit late in the game to do so.
So would users be expected to import these transforms from the _et paths directly
Having generated files in one directory would also require updating / maintaining only one __init__.py file to facilitate the imports.
This seems to be under fairly active work, and doesn't feel quite like a release blocker (new functionality, not a regression, not a bug fix etc).
Is there a compelling reason this should be in the 2.54.0 release instead of getting more time to bake and being in the 2.55.0 release? (There's still time, as I have other cherry picks to make tomorrow, and other issues to triage).
Thank you @lostluck, yes best to let it bake and get it in the next release
What if instead they were in a single subdirectory (like with protos)
I like this alternative more actually. I've made some changes to instead generate wrappers inside the apache_beam.transforms._external_transforms package. Inside there, each destination gets its own file (e.g. transforms with destination apache_beam/io will be written to apache_beam/transforms/_external_transforms/io.py).
Then we can make these transforms available to the apache_beam/io package by importing everything from _external_transforms/io.py.
@robertwb @tvalentyn let me know if this is what you had in mind
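Concretely, the destination's __init__.py would only need a single re-export line, something like this (a sketch; the module paths follow the layout described above and may still change):

```python
# In apache_beam/io/__init__.py (sketch): expose the transforms generated for
# the apache_beam/io destination.
from apache_beam.transforms._external_transforms.io import *  # pylint: disable=wildcard-import
```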
Having generated files in one directory would also require updating / maintaining only one __init__.py file to facilitate the imports.
Yeah I'd rather do this manually because I'm wary about altering existing files with a script.
I actually also thought about it and hesitated to suggest it only because it felt a bit late in the game to do so.
Not too late! The feedback helps to get this in a good shape before we release it to users
Successful PreCommit run: https://github.com/ahmedabu98/beam/actions/runs/7714357594/job/21026416076
R: @tvalentyn R: @robertwb
Ready for another review!
i think my comments were addressed. LGTM from my side as long as tests pass.
actually, i tried running gradlew generateExternalTransformWrappers in a clean environment and it failed.
I think the intent of that command should be to regenerate the yaml portion only, but not generate the python wrappers. Wrappers should be generated from the yaml config, when setup.py is called.
you could consider extracting the portion that generates the yaml into a separate script. If you keep the script in one file that can be fine too, but then generating the yaml portion (from gradle) shouldn't require installing jinja.
Running ./gradlew generateExternalTransformWrappers fails on my side because of this line in apache_beam.io: from apache_beam.transforms.xlang.io import *. It fails because the script initially cleans up the generated modules, then imports apache_beam to rediscover + regenerate the config. At import though, apache_beam.transforms.xlang.io doesn't exist because it's been previously cleaned up.
I think we need to put the import back in the try:, except: pass block?
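i.e. something like this (a sketch of the guarded import; I'm using ImportError here rather than a bare except):

```python
# In apache_beam/io/__init__.py (sketch): tolerate the generated module being
# absent, e.g. right after the script's cleanup step.
try:
  from apache_beam.transforms.xlang.io import *  # pylint: disable=wildcard-import
except ImportError:
  pass
```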
you could consider extracting the portion that generates the yaml
Makes sense to have the gradle command only generate the config. Then each remote SDK can have its own command to generate wrappers from there (in the future we can combine them all into one command if it makes sense).
We can still have the script in one file, I'll just move imports around to the relevant portions.
Ah, yes, that's right. Hopefully the file generation itself isn't that slow.
In sdks/python/setup.py (https://github.com/apache/beam/pull/29834#discussion_r1486898981):
# if exists, this directory will have at least its __init__.py file
if (not os.path.exists(generated_transforms_dir) or
    len(os.listdir(generated_transforms_dir)) <= 1):
  message = 'External transform wrappers have not been generated '
  if not script_exists:
    message += 'and the generation script `gen_xlang_wrappers.py`'
  if not config_exists:
    message += 'and the standard external transforms config'
  message += ' could not be found'
  warnings.warn(message)
else:
  warnings.warn(
      'Skipping external transform wrapper generation as they '
      'are already generated.')
  return
- out = subprocess.run([
This will make pretty much every run of setup.py slow, right?
Not anymore, since we decided to decouple the generation steps. setup.py expects the transform config to already exist, so no expansion + discovery is done here. All it does is generate files based on the existing config.
I think we need to put the import back in the try:, except: pass block?
That will work. Alternatively, you can have dedicated functions to generate the config and to generate the wrappers, and make the necessary imports only in the functions where they are required; you might have to add # pylint: disable=g-import-not-at-top.
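For example (a sketch with hypothetical function names, just to show imports kept local to the function that needs them):

```python
# Sketch only: hypothetical function names. Heavy imports live inside the
# function that needs them, so generating wrappers works in a clean environment.
def generate_transform_config(output_path):
  # Discovery requires importing apache_beam (and starting expansion services).
  # pylint: disable=g-import-not-at-top
  import apache_beam  # noqa: F401
  ...


def generate_wrappers(config_path, output_dir):
  # Rendering the wrappers only needs the YAML config and the template engine.
  # pylint: disable=g-import-not-at-top
  import jinja2  # noqa: F401
  import yaml  # noqa: F401
  ...
```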
@tvalentyn this is already the case, the steps are in different functions. e.g. we're only importing apache_beam when using the function to generate transform configs.
But this function is unusable in a clean setup (one that doesn't have generated modules) unless we make imports like from apache_beam.transforms.xlang.io import * optional.
The newly added precommit that performs a transform config sync check (beam_PreCommit_Xlang_Generated_Transforms) is running green on my fork: https://github.com/ahmedabu98/beam/actions/runs/7933231703/
Will merge after current tests pass