Add primitive read translators

Open dnamaz opened this issue 3 weeks ago • 10 comments

Pull Request: Add PrimitiveUnboundedRead/PrimitiveBoundedRead Translators to Flink Runner

Title

[Flink Runner] Add translators for PrimitiveUnboundedRead and PrimitiveBoundedRead

Description

What is this PR doing?

This PR adds explicit translators for SplittableParDo.PrimitiveUnboundedRead and SplittableParDo.PrimitiveBoundedRead to the Flink streaming transform translators. These translators handle the case where Read.Unbounded and Read.Bounded are converted to primitive reads by SplittableParDo.convertReadBasedSplittableDoFnsToPrimitiveReadsIfNecessary().

Why is this needed?

The Flink classic runner (non-portable) calls convertReadBasedSplittableDoFnsToPrimitiveReadsIfNecessary() when NOT using the beam_fn_api experiment. This converts SDF-wrapped reads to PrimitiveUnboundedRead and PrimitiveBoundedRead transforms. However, there were no registered translators for these transforms, causing pipelines using unbounded sources (like KinesisIO.read()) to fail with:

java.lang.IllegalStateException: No translator known for org.apache.beam.sdk.util.construction.SplittableParDo$PrimitiveUnboundedRead

How does it work?

The new translators:

  1. PrimitiveUnboundedReadTranslator: Extracts the UnboundedSource from PrimitiveUnboundedRead.getSource() and creates a FlinkUnboundedSource, following the same pattern as the existing UnboundedReadSourceTranslator.

  2. PrimitiveBoundedReadTranslator: Extracts the BoundedSource from PrimitiveBoundedRead.getSource() and creates a FlinkBoundedSource, following the same pattern as the existing BoundedReadSourceTranslator.

The key difference from the existing translators is that they retrieve the source directly from the transform (transform.getSource()) rather than using ReadTranslation.unboundedSourceFromTransform(), since PrimitiveUnboundedRead and PrimitiveBoundedRead are not standard Read transforms with URNs.
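
The lookup order described above can be sketched as follows. This is only an illustration under stated assumptions, not the code from the PR: the `Fake*` types, the string translator names, and the `example:read` URN are hypothetical stand-ins so the snippet runs without Beam on the classpath.

```java
import java.util.HashMap;
import java.util.Map;

final class TranslatorLookupSketch {
  // Hypothetical stand-ins for the Beam transform types discussed above.
  interface Transform {}

  static final class FakePrimitiveUnboundedRead implements Transform {}

  static final class FakePrimitiveBoundedRead implements Transform {}

  static final class FakeUrnTransform implements Transform {
    final String urn;

    FakeUrnTransform(String urn) {
      this.urn = urn;
    }
  }

  // URN-keyed registry standing in for the existing translator map (URN is made up).
  private static final Map<String, String> TRANSLATORS_BY_URN = new HashMap<>();

  static {
    TRANSLATORS_BY_URN.put("example:read", "ReadSourceTranslator");
  }

  // Returns the name of the translator that would handle the given transform.
  static String getTranslator(Transform transform) {
    // Primitive reads have no URN, so they are matched by type before the URN lookup.
    if (transform instanceof FakePrimitiveUnboundedRead) {
      return "PrimitiveUnboundedReadTranslator";
    }
    if (transform instanceof FakePrimitiveBoundedRead) {
      return "PrimitiveBoundedReadTranslator";
    }
    String translator =
        transform instanceof FakeUrnTransform
            ? TRANSLATORS_BY_URN.get(((FakeUrnTransform) transform).urn)
            : null;
    if (translator == null) {
      throw new IllegalStateException(
          "No translator known for " + transform.getClass().getName());
    }
    return translator;
  }

  public static void main(String[] args) {
    System.out.println(getTranslator(new FakePrimitiveUnboundedRead()));
    System.out.println(getTranslator(new FakeUrnTransform("example:read")));
  }
}
```

Checking the type first keeps the URN registry untouched, which is why existing URN-based translation should behave as before.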

Changes

  • FlinkStreamingTransformTranslators.java:

    • Added PrimitiveUnboundedReadTranslator<T> class
    • Added PrimitiveBoundedReadTranslator<T> class
    • Modified getTranslator() to check for PrimitiveUnboundedRead and PrimitiveBoundedRead instances before URN lookup
  • FlinkStreamingTransformTranslatorsTest.java:

    • Added getTranslatorReturnsPrimitiveUnboundedReadTranslator() test
    • Added getTranslatorReturnsPrimitiveBoundedReadTranslator() test
    • Added primitiveUnboundedReadTranslatorProducesCorrectSource() test
    • Added primitiveBoundedReadTranslatorProducesCorrectSource() test
  • CHANGES.md:

    • Added bugfix entry for 2.71.0

Issue

Fixes #37035

Created issue: #37035

Related to #20530 (Use SDF read as default)

Checklist

  • [x] Code formatted with ./gradlew :runners:flink:1.18:spotlessApply
  • [x] Unit tests added in FlinkStreamingTransformTranslatorsTest.java
  • [x] All Flink runner tests pass (./gradlew :runners:flink:1.18:test)
  • [x] CHANGES.md updated and formatted with ./gradlew formatChanges
  • [x] No breaking changes to public API

Testing

Unit Tests

./gradlew :runners:flink:1.18:test
# BUILD SUCCESSFUL - all tests pass

Integration Testing

Tested on AWS Managed Apache Flink with a real pipeline using KinesisIO.read():

  1. Before fix: Pipeline fails during translation with "No translator known for PrimitiveUnboundedRead"
  2. After fix: Pipeline successfully translates and runs, reading records from Kinesis

Test environment:

  • AWS Managed Apache Flink (FLINK-1_18 runtime)
  • Apache Beam 2.71.0-SNAPSHOT (with this fix)
  • KinesisIO.read() connector

Backwards Compatibility

This change is fully backwards compatible:

  1. No public API changes: Only internal translator classes are added
  2. No behavior changes for existing code: The new translators only activate when PrimitiveUnboundedRead or PrimitiveBoundedRead transforms are present (which previously caused failures)
  3. Existing URN-based translation unchanged: Standard Read.Bounded and Read.Unbounded with URNs continue to use the existing ReadSourceTranslator

Performance

No performance impact expected. The new translators use the same FlinkUnboundedSource and FlinkBoundedSource implementations as the existing translators.

dnamaz avatar Dec 06 '25 23:12 dnamaz

Summary of Changes

Hello @dnamaz, I'm Gemini Code Assist[^1]! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request resolves a critical issue in the Flink classic runner where pipelines using certain read operations, specifically those internally converted to "PrimitiveUnboundedRead" or "PrimitiveBoundedRead" by "SplittableParDo", would fail during translation due to the absence of appropriate translators. By implementing dedicated translators for these primitive read types, the change ensures that such pipelines, including those leveraging unbounded sources like KinesisIO, can now execute successfully, thereby enhancing the robustness and compatibility of the Flink runner.

Highlights

  • Flink Runner Bug Fix: Addresses an "IllegalStateException" in the Flink classic runner when using unbounded sources (e.g., KinesisIO.read()) without the "beam_fn_api" experiment, caused by missing translators for "PrimitiveUnboundedRead" and "PrimitiveBoundedRead" transforms.
  • New Translators Added: Introduces "PrimitiveUnboundedReadTranslator" and "PrimitiveBoundedReadTranslator" to translate the primitive forms that "SplittableParDo.convertReadBasedSplittableDoFnsToPrimitiveReadsIfNecessary()" produces from "Read.Unbounded" and "Read.Bounded".
  • Translation Mechanism: The new translators directly extract the "UnboundedSource" or "BoundedSource" from the respective primitive read transform, bypassing the URN-based translation used for standard "Read" transforms.

[^1]: Review the Privacy Notices, Generative AI Prohibited Use Policy, Terms of Service, and learn how to configure Gemini Code Assist in GitHub here. Gemini can make mistakes, so double check it and use code with caution.

gemini-code-assist[bot] avatar Dec 06 '25 23:12 gemini-code-assist[bot]

Checks are failing. Will not request review until checks are succeeding. If you'd like to override that behavior, comment assign set of reviewers

github-actions[bot] avatar Dec 07 '25 00:12 github-actions[bot]

assign set of reviewers

There is a problem with one of the tests. It originates in this file:

sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/StateFetchingIterators.java

on line 302:

    public static <T> Block<T> fromValues(List<T> values, @Nullable ByteString nextToken) {
      if (values.isEmpty() && nextToken == null) {
        return emptyBlock();
      }
      ImmutableList<T> immutableValues = ImmutableList.copyOf(values);
      long listWeight = immutableValues.size() * Caches.REFERENCE_SIZE;
      for (T value : immutableValues) {
        listWeight = LongMath.saturatedAdd(listWeight, Caches.weigh(value));
      }
      return fromValues(immutableValues, listWeight, nextToken);
    }

This will throw a NullPointerException if any value in the list is null, because ImmutableList.copyOf() rejects null elements. I believe this is what causes the PR check to fail in the test testSideInputWithNullDefault.

Should the test be updated to expect this failure, or should the collection be changed to a null-safe list? Should the fix be part of this PR, or should I file a separate bug? Does this currently happen in other PR checks? Am I missing something?

The call chain is:

  1. ViewTest.processElement (line 824) calls c.sideInput(view)
  2. FnApiDoFnRunner.sideInput()
  3. FnApiStateAccessor.get()
  4. SingletonViewFn2.apply() iterates through the side input
  5. CachingStateIterator.hasNext() loads the next block
  6. Block.fromValues() at line 315 calls ImmutableList.copyOf(), which fails on null
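
The failure mode described in this comment can be reproduced in isolation. The sketch below is an assumption-laden stand-in, not the harness code: it uses the JDK's `List.copyOf` (Java 10+) in place of Guava's `ImmutableList.copyOf`, since both reject null elements, and `nullTolerantCopy` is a hypothetical name for one possible null-safe alternative.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

final class NullElementCopySketch {
  // Returns true if making an immutable copy fails with NullPointerException.
  static boolean copyRejects(List<String> values) {
    try {
      List.copyOf(values); // stand-in for ImmutableList.copyOf(values)
      return false;
    } catch (NullPointerException e) {
      return true;
    }
  }

  // One null-tolerant alternative: an unmodifiable view over a defensive copy.
  static List<String> nullTolerantCopy(List<String> values) {
    return Collections.unmodifiableList(new ArrayList<>(values));
  }

  public static void main(String[] args) {
    List<String> withNull = new ArrayList<>();
    withNull.add(null); // models a side input whose single value is null

    System.out.println("immutable copy rejects null: " + copyRejects(withNull));
    System.out.println("null-tolerant copy size: " + nullTolerantCopy(withNull).size());
  }
}
```

Whether a null-tolerant list is the right fix for the harness is a separate question; the sketch only shows why a null side-input value cannot pass through an immutable copy that forbids nulls.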

dnamaz avatar Dec 08 '25 05:12 dnamaz

/gemini review

dnamaz avatar Dec 08 '25 23:12 dnamaz

/gemini review

dnamaz avatar Dec 09 '25 00:12 dnamaz

/gemini review

dnamaz avatar Dec 09 '25 00:12 dnamaz

/gemini summary

dnamaz avatar Dec 09 '25 01:12 dnamaz

Summary of Changes

This pull request addresses a critical bug in the Flink classic runner where pipelines utilizing certain read operations, specifically those internally converted to PrimitiveUnboundedRead or PrimitiveBoundedRead by SplittableParDo, would fail during translation due to the absence of appropriate translators. By introducing dedicated translators and refactoring common source translation logic, this change ensures that such pipelines, including those using unbounded sources like KinesisIO, can now execute successfully, significantly improving the robustness and compatibility of the Flink runner.

Highlights

  • Fix for Flink Classic Runner: Resolves an IllegalStateException in the Flink classic runner when PrimitiveUnboundedRead or PrimitiveBoundedRead transforms are encountered, particularly with unbounded sources like KinesisIO, due to missing translators.
  • New Translators Introduced: Adds PrimitiveUnboundedReadTranslator and PrimitiveBoundedReadTranslator to explicitly handle primitive read transforms generated by SplittableParDo's conversion logic.
  • Refactored Source Translation: Common logic for translating unbounded and bounded sources has been extracted into dedicated helper methods (translateUnboundedSource, translateBoundedSource) to reduce code duplication and improve maintainability.
  • Enhanced Test Coverage: New unit tests have been added to verify the correct functioning and source production of the newly introduced primitive read translators.

Changelog

  • CHANGES.md
    • Added a bugfix entry for Flink classic runner failing with "No translator known for PrimitiveUnboundedRead" when using unbounded source connectors like KinesisIO after SDF-to-primitive-read conversion.
  • runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
    • Introduced PrimitiveUnboundedReadTranslator<T> and PrimitiveBoundedReadTranslator<T> classes to handle primitive read transforms.
    • Modified the getTranslator() method to prioritize lookup for PrimitiveUnboundedRead and PrimitiveBoundedRead instances.
    • Extracted common source translation logic into new private static helper methods: translateUnboundedSource and translateBoundedSource.
    • Extracted a private static helper method getSourceParallelism to determine source operator parallelism.
    • Refactored UnboundedReadSourceTranslator and BoundedReadSourceTranslator to utilize the new common translation helper methods.
  • runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslatorsTest.java
    • Added new unit tests: getTranslatorReturnsPrimitiveUnboundedReadTranslator, getTranslatorReturnsPrimitiveBoundedReadTranslator, primitiveUnboundedReadTranslatorProducesCorrectSource, and primitiveBoundedReadTranslatorProducesCorrectSource.
    • Introduced a new helper method applyReadSourceTransformWithTranslator for testing purposes.
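
The refactoring listed above, where two translators share one helper, might look roughly like the sketch below. Everything here is a simplified stand-in: `Source`, the string result, and the method bodies are hypothetical, and the rule in `getSourceParallelism` (prefer the max parallelism when it is set) is an assumption rather than something stated in this thread.

```java
import java.util.function.Supplier;

final class SharedSourceTranslationSketch {
  // Hypothetical stand-in for an unbounded source.
  interface Source {
    String name();
  }

  // Assumed rule: use max parallelism when configured, else the default parallelism.
  static int getSourceParallelism(int maxParallelism, int parallelism) {
    return maxParallelism > 0 ? maxParallelism : parallelism;
  }

  // Shared helper: both translators end up here once they hold a source.
  static String translateUnboundedSource(Source source, String stepName, int parallelism) {
    return stepName + ":" + source.name() + "@" + parallelism;
  }

  // URN-based path: the source is decoded from the transform (modelled as a supplier).
  static String translateViaUrn(Supplier<Source> decoded, String stepName, int maxP, int p) {
    return translateUnboundedSource(decoded.get(), stepName, getSourceParallelism(maxP, p));
  }

  // Primitive path: the source is read directly from the transform.
  static String translatePrimitive(Source source, String stepName, int maxP, int p) {
    return translateUnboundedSource(source, stepName, getSourceParallelism(maxP, p));
  }

  public static void main(String[] args) {
    Source kinesis = () -> "kinesis";
    System.out.println(translatePrimitive(kinesis, "Read", 0, 4));
    System.out.println(translateViaUrn(() -> kinesis, "Read", 0, 4));
  }
}
```

The two entry points differ only in how they obtain the source, so both paths produce the same result for the same source.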

Activity

  • A bot provided an initial summary of the pull request.
  • The bot noted that checks were failing and would not request a review.
  • The author identified a potential NullPointerException in StateFetchingIterators.java affecting a test (testSideInputWithNullDefault) and sought guidance on whether to fix it in this PR or a new one.
  • The author issued multiple /gemini review commands and a /gemini summary command.
  • A bot review highlighted that the PrimitiveBoundedReadTranslator was missing important logic (try-catch, .returns(), batch mode handling, consistent stepName) present in BoundedReadSourceTranslator.
  • A bot review suggested replacing the XXXXX placeholder in CHANGES.md with the actual issue number 37035.
  • A bot review recommended extracting common logic from UnboundedReadSourceTranslator and PrimitiveUnboundedReadTranslator into a shared helper method.
  • A bot review suggested extracting the source parallelism determination logic into a private helper method to reduce duplication.
  • A bot review suggested extracting the logic for retrieving the single output PCollection into a helper method, to which the author responded, arguing against over-abstraction for a 2-3 line duplication, and the bot acknowledged the valid point.

gemini-code-assist[bot] avatar Dec 09 '25 01:12 gemini-code-assist[bot]

assign set of reviewers

dnamaz avatar Dec 09 '25 01:12 dnamaz

Assigning reviewers:

R: @shunping added as fallback since no labels match configuration

Note: If you would like to opt out of this review, comment assign to next reviewer.

Available commands:

  • stop reviewer notifications - opt out of the automated review tooling
  • remind me after tests pass - tag the comment author after tests pass
  • waiting on author - shift the attention set back to the author (any comment or push by the author will return the attention set to the reviewers)

The PR bot will only process comments in the main thread (not review comments).

github-actions[bot] avatar Dec 09 '25 01:12 github-actions[bot]

Reminder, please take a look at this pr: @shunping

github-actions[bot] avatar Dec 16 '25 12:12 github-actions[bot]