
[Bug]: No translator known for PrimitiveUnboundedRead after SDF fallback

Open dnamaz opened this issue 3 weeks ago • 0 comments

What happened?

GitHub Issue: Flink Runner - No translator known for PrimitiveUnboundedRead after SDF fallback

Title

[Flink Runner] No translator known for PrimitiveUnboundedRead after SDF-to-primitive-read conversion

Labels

  • flink-runner
  • bug
  • P2

Body

What happened?

When using the Flink classic runner (non-portable) with unbounded source connectors like KinesisIO.read(), the pipeline fails with:

java.lang.IllegalStateException: No translator known for org.apache.beam.sdk.util.construction.SplittableParDo$PrimitiveUnboundedRead

Environment

  • Apache Beam version: 2.60.0+ (tested with 2.69.0)
  • Flink version: 1.18, 1.19, 1.20
  • Runner: Flink classic runner (non-portable, without beam_fn_api experiment)
  • Platform: AWS Managed Apache Flink

Steps to reproduce

  1. Create a Beam pipeline using KinesisIO.read() (or any UnboundedSource-based IO)
  2. Run it with FlinkRunner on AWS Managed Flink (or any Flink environment that does not use the portable runner)
  3. The pipeline fails during translation
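
```java
// options: pipeline options with FlinkRunner as the runner; ProcessFn: the user's DoFn over the Kinesis records
Pipeline p = Pipeline.create(options);
p.apply(KinesisIO.read()
        .withStreamName("my-stream")
        .withInitialPositionInStream(InitialPositionInStream.TRIM_HORIZON))
    .apply(ParDo.of(new ProcessFn()));
p.run(); // translation, and therefore the failure, happens inside run()
```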

Root Cause Analysis

The Flink classic runner calls SplittableParDo.convertReadBasedSplittableDoFnsToPrimitiveReadsIfNecessary() in FlinkRunner.run() when NOT using the beam_fn_api experiment:

```java
// FlinkRunner.java line 76-79
// TODO(https://github.com/apache/beam/issues/20530): Use SDF read as default when we address performance issue.
if (!ExperimentalOptions.hasExperiment(pipeline.getOptions(), "beam_fn_api")) {
  SplittableParDo.convertReadBasedSplittableDoFnsToPrimitiveReadsIfNecessary(pipeline);
}
```

This conversion transforms:

  • Read.Unbounded → SplittableParDo.PrimitiveUnboundedRead
  • Read.Bounded → SplittableParDo.PrimitiveBoundedRead

However, FlinkStreamingTransformTranslators had no registered translators for these PrimitiveUnboundedRead and PrimitiveBoundedRead transforms, causing the "No translator known" error.
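
To make the failure path above concrete, here is a minimal, self-contained Java sketch that models it with plain strings. It is not Beam code: the class PrimitiveReadFailureSketch, its methods, and the contents of its translator table are hypothetical. It only assumes that the runner (1) applies the read rewrite when beam_fn_api is absent and (2) resolves each resulting transform through a lookup table, failing when an entry is missing. Any other conditions the real conversion method may check are ignored.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;

// Toy model of the failure path (hypothetical names; strings stand in for PTransforms).
class PrimitiveReadFailureSketch {

  // Illustrative streaming translator table with no entries for the primitive reads.
  static final Map<String, Function<String, String>> TRANSLATORS = new HashMap<>();

  static {
    TRANSLATORS.put("Read.Unbounded", t -> "translated " + t);
    TRANSLATORS.put("ParDo", t -> "translated " + t);
  }

  // Mirrors the guard in FlinkRunner.run(): rewrite reads unless beam_fn_api is set.
  static List<String> convertIfNecessary(List<String> transforms, Set<String> experiments) {
    if (experiments.contains("beam_fn_api")) {
      return transforms;
    }
    List<String> out = new ArrayList<>();
    for (String t : transforms) {
      if (t.equals("Read.Unbounded")) {
        out.add("SplittableParDo$PrimitiveUnboundedRead");
      } else if (t.equals("Read.Bounded")) {
        out.add("SplittableParDo$PrimitiveBoundedRead");
      } else {
        out.add(t);
      }
    }
    return out;
  }

  // Looks up a translator for each transform; a missing entry fails like the reported error.
  static List<String> translateAll(List<String> transforms) {
    List<String> out = new ArrayList<>();
    for (String t : transforms) {
      Function<String, String> translator = TRANSLATORS.get(t);
      if (translator == null) {
        throw new IllegalStateException("No translator known for " + t);
      }
      out.add(translator.apply(t));
    }
    return out;
  }

  public static void main(String[] args) {
    List<String> converted = convertIfNecessary(List.of("Read.Unbounded", "ParDo"), Set.of());
    System.out.println(converted);
    try {
      translateAll(converted);
    } catch (IllegalStateException e) {
      System.out.println(e.getMessage());
    }
  }
}
```

Running main prints the rewritten transform list and then the "No translator known" message. In this model, putting beam_fn_api in the experiment set skips the rewrite, so every lookup succeeds.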

Related Issues

  • Related to #20530 (Use SDF read as default when performance issue is addressed)

Expected Behavior

The pipeline should successfully translate and execute on the Flink classic runner.

Actual Behavior

Pipeline fails during translation with IllegalStateException: No translator known for PrimitiveUnboundedRead.

Proposed Fix

Add explicit translators for PrimitiveUnboundedRead and PrimitiveBoundedRead in FlinkStreamingTransformTranslators.java that delegate to the existing FlinkUnboundedSource and FlinkBoundedSource implementations.
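
The shape of the proposed fix can be sketched as a toy model under similar simplifying assumptions: register one entry per primitive read, and have each entry hand the underlying source to a stand-in for the existing wrapper. The class name, the string keys, and the string "wrappers" are hypothetical. A real translator in FlinkStreamingTransformTranslators would work with Beam's translation context and construct the actual FlinkUnboundedSource or FlinkBoundedSource, whose constructors are not modeled here.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Toy model of the proposed fix (hypothetical names, not Beam's real translator API).
class PrimitiveReadTranslatorSketch {

  static final Map<String, Function<String, String>> TRANSLATORS = new HashMap<>();

  static {
    // Each entry delegates to a stand-in for the existing source wrapper instead of
    // reimplementing source handling.
    TRANSLATORS.put("SplittableParDo$PrimitiveUnboundedRead",
        source -> "FlinkUnboundedSource(" + source + ")");
    TRANSLATORS.put("SplittableParDo$PrimitiveBoundedRead",
        source -> "FlinkBoundedSource(" + source + ")");
  }

  // Resolves the translator for a transform and applies it to the wrapped source.
  static String translate(String transform, String source) {
    Function<String, String> translator = TRANSLATORS.get(transform);
    if (translator == null) {
      throw new IllegalStateException("No translator known for " + transform);
    }
    return translator.apply(source);
  }

  public static void main(String[] args) {
    // "KinesisSource" and "TextSource" are placeholder source labels.
    System.out.println(translate("SplittableParDo$PrimitiveUnboundedRead", "KinesisSource"));
    System.out.println(translate("SplittableParDo$PrimitiveBoundedRead", "TextSource"));
  }
}
```

Keeping the bounded and unbounded cases as separate entries follows the proposal of one translator per primitive read, each delegating to the wrapper that already exists for that kind of source.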

Issue Priority

Priority: 2 (default / most bugs should be filed as P2)

Issue Components

  • [ ] Component: Python SDK
  • [ ] Component: Java SDK
  • [ ] Component: Go SDK
  • [ ] Component: Typescript SDK
  • [ ] Component: IO connector
  • [ ] Component: Beam YAML
  • [ ] Component: Beam examples
  • [ ] Component: Beam playground
  • [ ] Component: Beam katas
  • [ ] Component: Website
  • [ ] Component: Infrastructure
  • [ ] Component: Spark Runner
  • [x] Component: Flink Runner
  • [ ] Component: Samza Runner
  • [ ] Component: Twister2 Runner
  • [ ] Component: Hazelcast Jet Runner
  • [ ] Component: Google Cloud Dataflow Runner

dnamaz, Dec 06 '25 22:12