Add primitive read translators
Pull Request: Add PrimitiveUnboundedRead/PrimitiveBoundedRead Translators to Flink Runner
Title
[Flink Runner] Add translators for PrimitiveUnboundedRead and PrimitiveBoundedRead
Description
What is this PR doing?
This PR adds explicit translators for SplittableParDo.PrimitiveUnboundedRead and SplittableParDo.PrimitiveBoundedRead to the Flink streaming transform translators. These translators handle the case where Read.Unbounded and Read.Bounded are converted to primitive reads by SplittableParDo.convertReadBasedSplittableDoFnsToPrimitiveReadsIfNecessary().
Why is this needed?
The Flink classic runner (non-portable) calls convertReadBasedSplittableDoFnsToPrimitiveReadsIfNecessary() when NOT using the beam_fn_api experiment. This converts SDF-wrapped reads to PrimitiveUnboundedRead and PrimitiveBoundedRead transforms. However, there were no registered translators for these transforms, causing pipelines using unbounded sources (like KinesisIO.read()) to fail with:
java.lang.IllegalStateException: No translator known for org.apache.beam.sdk.util.construction.SplittableParDo$PrimitiveUnboundedRead
How does it work?
The new translators:
- PrimitiveUnboundedReadTranslator: Extracts the UnboundedSource from PrimitiveUnboundedRead.getSource() and creates a FlinkUnboundedSource, following the same pattern as the existing UnboundedReadSourceTranslator.
- PrimitiveBoundedReadTranslator: Extracts the BoundedSource from PrimitiveBoundedRead.getSource() and creates a FlinkBoundedSource, following the same pattern as the existing BoundedReadSourceTranslator.
The key difference from the existing translators is that they retrieve the source directly from the transform (transform.getSource()) rather than using ReadTranslation.unboundedSourceFromTransform(), since PrimitiveUnboundedRead and PrimitiveBoundedRead are not standard Read transforms with URNs.
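The lookup order described above can be illustrated with a minimal, self-contained sketch. Every type in it (StubTransform, StubUrnRead, StubPrimitiveUnboundedRead, StubPrimitiveBoundedRead) and the URN string are hypothetical stand-ins invented for this example, not the Beam or Flink runner classes. It only shows the shape of the idea: check the transform's type first, then fall back to the URN registry.

```java
import java.util.HashMap;
import java.util.Map;

/** Illustrative sketch only; every type here is a hypothetical stand-in, not a Beam class. */
final class TranslatorLookupSketch {
  // Placeholder URN-to-translator registry; the URN string is invented for this sketch.
  private static final Map<String, String> TRANSLATORS_BY_URN = new HashMap<>();

  static {
    TRANSLATORS_BY_URN.put("urn:example:read", "ReadSourceTranslator");
  }

  /** URN-only lookup: fails for transforms that carry no registered URN. */
  static String lookupByUrnOnly(StubTransform transform) {
    String urn = transform.urn();
    String translator = urn == null ? null : TRANSLATORS_BY_URN.get(urn);
    if (translator == null) {
      throw new IllegalStateException(
          "No translator known for " + transform.getClass().getName());
    }
    return translator;
  }

  /** Type check first, then fall back to the URN lookup. */
  static String lookupWithTypeCheck(StubTransform transform) {
    if (transform instanceof StubPrimitiveUnboundedRead) {
      return "PrimitiveUnboundedReadTranslator";
    }
    if (transform instanceof StubPrimitiveBoundedRead) {
      return "PrimitiveBoundedReadTranslator";
    }
    return lookupByUrnOnly(transform);
  }

  public static void main(String[] args) {
    StubTransform primitive = new StubPrimitiveUnboundedRead();
    System.out.println(lookupWithTypeCheck(new StubUrnRead()));
    System.out.println(lookupWithTypeCheck(primitive));
    try {
      lookupByUrnOnly(primitive);
    } catch (IllegalStateException e) {
      System.out.println("URN-only lookup failed: " + e.getMessage());
    }
  }
}

/** Hypothetical transform abstraction; urn() returns null when no URN is registered. */
interface StubTransform {
  String urn();
}

/** Stand-in for a standard Read transform that has a URN. */
final class StubUrnRead implements StubTransform {
  @Override
  public String urn() {
    return "urn:example:read";
  }
}

/** Stand-in for a primitive unbounded read, which has no URN. */
final class StubPrimitiveUnboundedRead implements StubTransform {
  @Override
  public String urn() {
    return null;
  }
}

/** Stand-in for a primitive bounded read, which has no URN. */
final class StubPrimitiveBoundedRead implements StubTransform {
  @Override
  public String urn() {
    return null;
  }
}
```

In this sketch, the URN-only path reproduces the kind of IllegalStateException quoted earlier, while the type-checked path resolves a translator for the primitive reads and leaves URN-based transforms on their existing route.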
Changes
- FlinkStreamingTransformTranslators.java:
  - Added PrimitiveUnboundedReadTranslator<T> class
  - Added PrimitiveBoundedReadTranslator<T> class
  - Modified getTranslator() to check for PrimitiveUnboundedRead and PrimitiveBoundedRead instances before URN lookup
- FlinkStreamingTransformTranslatorsTest.java:
  - Added getTranslatorReturnsPrimitiveUnboundedReadTranslator() test
  - Added getTranslatorReturnsPrimitiveBoundedReadTranslator() test
  - Added primitiveUnboundedReadTranslatorProducesCorrectSource() test
  - Added primitiveBoundedReadTranslatorProducesCorrectSource() test
- CHANGES.md:
  - Added bugfix entry for 2.71.0
Issue
Fixes #37035
Created issue: #37035
Related to #20530 (Use SDF read as default)
Checklist
- [x] Code formatted with ./gradlew :runners:flink:1.18:spotlessApply
- [x] Unit tests added in FlinkStreamingTransformTranslatorsTest.java
- [x] All Flink runner tests pass (./gradlew :runners:flink:1.18:test)
- [x] CHANGES.md updated and formatted with ./gradlew formatChanges
- [x] No breaking changes to public API
Testing
Unit Tests
./gradlew :runners:flink:1.18:test
# BUILD SUCCESSFUL - all tests pass
Integration Testing
Tested on AWS Managed Apache Flink with a real pipeline using KinesisIO.read():
- Before fix: Pipeline fails during translation with "No translator known for PrimitiveUnboundedRead"
- After fix: Pipeline successfully translates and runs, reading records from Kinesis
Test environment:
- AWS Managed Apache Flink (FLINK-1_18 runtime)
- Apache Beam 2.71.0-SNAPSHOT (with this fix)
- KinesisIO.read() connector
Backwards Compatibility
This change is fully backwards compatible:
- No public API changes: Only internal translator classes are added
- No behavior changes for existing code: The new translators only activate when PrimitiveUnboundedRead or PrimitiveBoundedRead transforms are present (which previously caused failures)
- Existing URN-based translation unchanged: Standard Read.Bounded and Read.Unbounded with URNs continue to use the existing ReadSourceTranslator
Performance
No performance impact expected. The new translators use the same FlinkUnboundedSource and FlinkBoundedSource implementations as the existing translators.
Summary of Changes
Hello @dnamaz, I'm Gemini Code Assist[^1]! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!
This pull request resolves a critical issue in the Flink classic runner where pipelines using certain read operations, specifically those internally converted to "PrimitiveUnboundedRead" or "PrimitiveBoundedRead" by "SplittableParDo", would fail during translation due to the absence of appropriate translators. By implementing dedicated translators for these primitive read types, the change ensures that such pipelines, including those leveraging unbounded sources like KinesisIO, can now execute successfully, thereby enhancing the robustness and compatibility of the Flink runner.
Highlights
- Flink Runner Bug Fix: Addresses an "IllegalStateException" in the Flink classic runner when using unbounded sources (e.g., KinesisIO.read()) without the "beam_fn_api" experiment, caused by missing translators for "PrimitiveUnboundedRead" and "PrimitiveBoundedRead" transforms.
- New Translators Added: Introduces "PrimitiveUnboundedReadTranslator" and "PrimitiveBoundedReadTranslator" to translate the primitive reads that "SplittableParDo.convertReadBasedSplittableDoFnsToPrimitiveReadsIfNecessary()" produces from "Read.Unbounded" and "Read.Bounded".
- Translation Mechanism: The new translators directly extract the "UnboundedSource" or "BoundedSource" from the respective primitive read transform, bypassing the URN-based translation used for standard "Read" transforms.
[^1]: Review the Privacy Notices, Generative AI Prohibited Use Policy, Terms of Service, and learn how to configure Gemini Code Assist in GitHub here. Gemini can make mistakes, so double check it and use code with caution.
Checks are failing. Will not request review until checks are succeeding. If you'd like to override that behavior, comment assign set of reviewers
assign set of reviewers
There is a problem with one of the tests. It originates in this file:
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/StateFetchingIterators.java
On line 302,

    public static <T> Block<T> fromValues(List<T> values, @Nullable ByteString nextToken) {
      if (values.isEmpty() && nextToken == null) {
        return emptyBlock();
      }
      ImmutableList<T> immutableValues = ImmutableList.copyOf(values);
      long listWeight = immutableValues.size() * Caches.REFERENCE_SIZE;
      for (T value : immutableValues) {
        listWeight = LongMath.saturatedAdd(listWeight, Caches.weigh(value));
      }
      return fromValues(immutableValues, listWeight, nextToken);
    }

will throw a NullPointerException if any value in the list is null, because ImmutableList.copyOf() rejects null elements. I believe this is what causes the PR check to fail in the test testSideInputWithNullDefault.
Should the test be changed to expect this, or should the collection be modified to use a null-safe list? Should it be part of this PR, or should I file a separate bug? Does this currently happen in other PR checks? Am I missing something?
The call chain is: ViewTest.processElement (line 824) calls c.sideInput(view) → FnApiDoFnRunner.sideInput() → FnApiStateAccessor.get() → SingletonViewFn2.apply() iterates through the side input → CachingStateIterator.hasNext() loads the next block → Block.fromValues() at line 315, which calls ImmutableList.copyOf() and fails on a null element
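The failure mode described above can be reproduced in isolation with a small sketch. It uses the JDK's List.copyOf as a stand-in for Guava's ImmutableList.copyOf, since both reject null elements with a NullPointerException. The class name NullElementCopySketch and the null-tolerant alternative are illustrative assumptions, not a proposed change to StateFetchingIterators.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Illustrative sketch only; List.copyOf stands in for Guava's ImmutableList.copyOf. */
final class NullElementCopySketch {

  /** Returns true if the strict immutable copy rejects the list with a NullPointerException. */
  static <T> boolean strictCopyThrows(List<T> values) {
    try {
      List.copyOf(values); // rejects null elements, as ImmutableList.copyOf does
      return false;
    } catch (NullPointerException e) {
      return true;
    }
  }

  /** One possible null-tolerant alternative: a defensive copy behind an unmodifiable view. */
  static <T> List<T> nullTolerantCopy(List<T> values) {
    return Collections.unmodifiableList(new ArrayList<>(values));
  }

  public static void main(String[] args) {
    // A single null element, similar in spirit to a side input with a null default.
    List<String> withNull = Collections.singletonList((String) null);
    System.out.println("strict copy throws: " + strictCopyThrows(withNull));
    System.out.println("tolerant copy size: " + nullTolerantCopy(withNull).size());
  }
}
```

Whether a null-tolerant copy is appropriate for the real Block.fromValues() depends on how downstream code, such as the weighing loop, treats null values. That question is not answered by this sketch.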
/gemini review
/gemini review
/gemini review
/gemini summary
Summary of Changes
This pull request addresses a critical bug in the Flink classic runner where pipelines utilizing certain read operations, specifically those internally converted to PrimitiveUnboundedRead or PrimitiveBoundedRead by SplittableParDo, would fail during translation due to the absence of appropriate translators. By introducing dedicated translators and refactoring common source translation logic, this change ensures that such pipelines, including those using unbounded sources like KinesisIO, can now execute successfully, significantly improving the robustness and compatibility of the Flink runner.
Highlights
- Fix for Flink Classic Runner: Resolves an IllegalStateException in the Flink classic runner when PrimitiveUnboundedRead or PrimitiveBoundedRead transforms are encountered, particularly with unbounded sources like KinesisIO, due to missing translators.
- New Translators Introduced: Adds PrimitiveUnboundedReadTranslator and PrimitiveBoundedReadTranslator to explicitly handle primitive read transforms generated by SplittableParDo's conversion logic.
- Refactored Source Translation: Common logic for translating unbounded and bounded sources has been extracted into dedicated helper methods (translateUnboundedSource, translateBoundedSource) to reduce code duplication and improve maintainability.
- Enhanced Test Coverage: New unit tests have been added to verify the correct functioning and source production of the newly introduced primitive read translators.
Changelog
- CHANGES.md
  - Added a bugfix entry for Flink classic runner failing with "No translator known for PrimitiveUnboundedRead" when using unbounded source connectors like KinesisIO after SDF-to-primitive-read conversion.
- runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
  - Introduced PrimitiveUnboundedReadTranslator<T> and PrimitiveBoundedReadTranslator<T> classes to handle primitive read transforms.
  - Modified the getTranslator() method to prioritize lookup for PrimitiveUnboundedRead and PrimitiveBoundedRead instances.
  - Extracted common source translation logic into new private static helper methods: translateUnboundedSource and translateBoundedSource.
  - Extracted a private static helper method getSourceParallelism to determine source operator parallelism.
  - Refactored UnboundedReadSourceTranslator and BoundedReadSourceTranslator to utilize the new common translation helper methods.
- runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslatorsTest.java
  - Added new unit tests: getTranslatorReturnsPrimitiveUnboundedReadTranslator, getTranslatorReturnsPrimitiveBoundedReadTranslator, primitiveUnboundedReadTranslatorProducesCorrectSource, and primitiveBoundedReadTranslatorProducesCorrectSource.
  - Introduced a new helper method applyReadSourceTransformWithTranslator for testing purposes.
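The refactoring listed in the changelog can be pictured with a rough sketch: two entry points that differ only in how they obtain the source, both delegating to one shared helper. The helper names translateUnboundedSource and getSourceParallelism are borrowed from the summary, but their signatures and bodies here are hypothetical, as are the stub types. In particular, the rule "use max parallelism when set, else parallelism" is an assumption made for illustration.

```java
/** Illustrative sketch only; the stub types and method bodies are hypothetical, not Beam code. */
final class SharedSourceTranslationSketch {

  /** Stand-in for a URN-based Read transform whose source is decoded from a payload. */
  static final class StubUrnRead {
    private final String payload;

    StubUrnRead(String payload) {
      this.payload = payload;
    }

    String decodeSourceFromPayload() {
      return payload;
    }
  }

  /** Stand-in for a primitive read that exposes its source directly. */
  static final class StubPrimitiveRead {
    private final String source;

    StubPrimitiveRead(String source) {
      this.source = source;
    }

    String getSource() {
      return source;
    }
  }

  /** Assumed rule: prefer max parallelism when it is set, otherwise use the default. */
  static int getSourceParallelism(int maxParallelism, int parallelism) {
    return maxParallelism > 0 ? maxParallelism : parallelism;
  }

  /** Shared helper: everything after "obtain the source" lives in one place. */
  static String translateUnboundedSource(String source, String stepName, int parallelism) {
    return "FlinkUnboundedSource(" + source + ", step=" + stepName
        + ", parallelism=" + parallelism + ")";
  }

  /** URN-based path: decode the source, then delegate. */
  static String translateUrnRead(StubUrnRead read, String stepName, int parallelism) {
    return translateUnboundedSource(read.decodeSourceFromPayload(), stepName, parallelism);
  }

  /** Primitive path: read the source straight off the transform, then delegate. */
  static String translatePrimitiveRead(StubPrimitiveRead read, String stepName, int parallelism) {
    return translateUnboundedSource(read.getSource(), stepName, parallelism);
  }

  public static void main(String[] args) {
    int parallelism = getSourceParallelism(0, 4);
    System.out.println(translateUrnRead(new StubUrnRead("stub-source"), "read", parallelism));
    System.out.println(
        translatePrimitiveRead(new StubPrimitiveRead("stub-source"), "read", parallelism));
  }
}
```

Because both paths funnel into the same helper, any logic added there applies to the URN-based and primitive translators alike, which is the duplication concern the review comments raised.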
Activity
- A bot provided an initial summary of the pull request.
- The bot noted that checks were failing and would not request a review.
- The author identified a potential NullPointerException in StateFetchingIterators.java affecting a test (testSideInputWithNullDefault) and sought guidance on whether to fix it in this PR or a new one.
- The author issued multiple /gemini review commands and a /gemini summary command.
- A bot review highlighted that the PrimitiveBoundedReadTranslator was missing important logic (try-catch, .returns(), batch mode handling, consistent stepName) present in BoundedReadSourceTranslator.
- A bot review suggested replacing the XXXXX placeholder in CHANGES.md with the actual issue number 37035.
- A bot review recommended extracting common logic from UnboundedReadSourceTranslator and PrimitiveUnboundedReadTranslator into a shared helper method.
- A bot review suggested extracting the source parallelism determination logic into a private helper method to reduce duplication.
- A bot review suggested extracting the logic for retrieving the single output PCollection into a helper method, to which the author responded, arguing against over-abstraction for a 2-3 line duplication, and the bot acknowledged the valid point.
assign set of reviewers
Assigning reviewers:
R: @shunping added as fallback since no labels match configuration
Note: If you would like to opt out of this review, comment assign to next reviewer.
Available commands:
- stop reviewer notifications - opt out of the automated review tooling
- remind me after tests pass - tag the comment author after tests pass
- waiting on author - shift the attention set back to the author (any comment or push by the author will return the attention set to the reviewers)
The PR bot will only process comments in the main thread (not review comments).
Reminder, please take a look at this pr: @shunping