[Bug]: No translator known for PrimitiveUnboundedRead after SDF fallback
What happened?
GitHub Issue: Flink Runner - No translator known for PrimitiveUnboundedRead after SDF fallback
Title
[Flink Runner] No translator known for PrimitiveUnboundedRead after SDF-to-primitive-read conversion
Labels
flink-runner, bug, P2
Body
When using the Flink classic runner (non-portable) with unbounded source connectors like KinesisIO.read(), the pipeline fails with:
java.lang.IllegalStateException: No translator known for org.apache.beam.sdk.util.construction.SplittableParDo$PrimitiveUnboundedRead
Environment
- Apache Beam version: 2.60.0+ (tested with 2.69.0)
- Flink version: 1.18, 1.19, 1.20
- Runner: Flink classic runner (non-portable, without the beam_fn_api experiment)
- Platform: AWS Managed Apache Flink
Steps to reproduce
- Create a Beam pipeline using KinesisIO.read() (or any UnboundedSource-based IO)
- Run it with FlinkRunner on AWS Managed Flink (or any Flink environment without the portable runner)
- The pipeline fails during translation
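// Assumes the AWS SDK v2 connector (beam-sdks-java-io-amazon-web-services2).
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.aws2.kinesis.KinesisIO;
import org.apache.beam.sdk.transforms.ParDo;
import software.amazon.kinesis.common.InitialPositionInStream;
// options: FlinkPipelineOptions using FlinkRunner; ProcessFn: any DoFn<KinesisRecord, ...>
Pipeline p = Pipeline.create(options);
p.apply(KinesisIO.read()
        .withStreamName("my-stream")
        .withInitialPositionInStream(InitialPositionInStream.TRIM_HORIZON))
    .apply(ParDo.of(new ProcessFn()));
p.run();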
Root Cause Analysis
The Flink classic runner calls SplittableParDo.convertReadBasedSplittableDoFnsToPrimitiveReadsIfNecessary() in FlinkRunner.run() when NOT using the beam_fn_api experiment:
// FlinkRunner.java, lines 76-79
// TODO(https://github.com/apache/beam/issues/20530): Use SDF read as default when we address performance issue.
if (!ExperimentalOptions.hasExperiment(pipeline.getOptions(), "beam_fn_api")) {
SplittableParDo.convertReadBasedSplittableDoFnsToPrimitiveReadsIfNecessary(pipeline);
}
This conversion transforms:
- Read.Unbounded → SplittableParDo.PrimitiveUnboundedRead
- Read.Bounded → SplittableParDo.PrimitiveBoundedRead
However, FlinkStreamingTransformTranslators has no registered translators for the PrimitiveUnboundedRead and PrimitiveBoundedRead transforms, so translation fails with the "No translator known" error.
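The failure mode can be reproduced in a small toy model. This is not Beam code: ReadUnbounded, PrimitiveUnboundedRead, convertToPrimitiveReads and the TRANSLATORS map are hypothetical stand-ins, and keying translators by Java class is a simplification of however the real runner performs its lookup. The sketch shows a pipeline that translates fine before the override and hits the same kind of IllegalStateException after it.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

/** Toy model (NOT Beam's API): the override introduces a transform type with no translator. */
public class MissingTranslatorModel {
  interface Transform {}

  // Hypothetical stand-ins for Read.Unbounded and SplittableParDo.PrimitiveUnboundedRead.
  record ReadUnbounded(String source) implements Transform {}
  record PrimitiveUnboundedRead(String source) implements Transform {}

  // Mimics the conversion step: every ReadUnbounded node becomes a PrimitiveUnboundedRead.
  static List<Transform> convertToPrimitiveReads(List<Transform> pipeline) {
    List<Transform> out = new ArrayList<>();
    for (Transform t : pipeline) {
      out.add(t instanceof ReadUnbounded r ? new PrimitiveUnboundedRead(r.source()) : t);
    }
    return out;
  }

  // Registry with an entry for ReadUnbounded only, mirroring the bug.
  static final Map<Class<?>, Consumer<Transform>> TRANSLATORS = new LinkedHashMap<>();

  static {
    TRANSLATORS.put(ReadUnbounded.class, t -> System.out.println("translated " + t));
  }

  static void translate(List<Transform> pipeline) {
    for (Transform t : pipeline) {
      Consumer<Transform> translator = TRANSLATORS.get(t.getClass());
      if (translator == null) {
        throw new IllegalStateException("No translator known for " + t.getClass().getName());
      }
      translator.accept(t);
    }
  }

  public static void main(String[] args) {
    List<Transform> pipeline = List.of(new ReadUnbounded("my-stream"));
    translate(pipeline); // succeeds: ReadUnbounded has a translator
    try {
      translate(convertToPrimitiveReads(pipeline)); // fails: no entry for PrimitiveUnboundedRead
    } catch (IllegalStateException e) {
      System.out.println(e.getMessage());
    }
  }
}
```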
Related Issues
- Related to #20530 (Use SDF read as default when the performance issue is addressed)
Expected Behavior
The pipeline should successfully translate and execute on the Flink classic runner.
Actual Behavior
Pipeline fails during translation with IllegalStateException: No translator known for PrimitiveUnboundedRead.
Proposed Fix
Add explicit translators for PrimitiveUnboundedRead and PrimitiveBoundedRead in FlinkStreamingTransformTranslators.java that delegate to the existing FlinkUnboundedSource and FlinkBoundedSource implementations.
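The delegation idea can be sketched with the same kind of toy model, again not Beam's actual translator API. The hypothetical entries for PrimitiveUnboundedRead and PrimitiveBoundedRead unwrap the source and call the same helper the existing Read translators use, so both paths yield the same Flink-side source. The helper names and their string results are assumptions standing in for wrapping the source in FlinkUnboundedSource or FlinkBoundedSource.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

/** Toy model (NOT Beam's API) of registering delegating translators for primitive reads. */
public class DelegatingTranslatorsModel {
  interface Transform {}

  // Hypothetical stand-ins for Read.Unbounded/Bounded and the SplittableParDo primitive reads.
  record ReadUnbounded(String source) implements Transform {}
  record ReadBounded(String source) implements Transform {}
  record PrimitiveUnboundedRead(String source) implements Transform {}
  record PrimitiveBoundedRead(String source) implements Transform {}

  // Shared helpers; the returned strings only label which wrapper would be built.
  static String unboundedSourceOperator(String source) {
    return "FlinkUnboundedSource(" + source + ")";
  }

  static String boundedSourceOperator(String source) {
    return "FlinkBoundedSource(" + source + ")";
  }

  static final Map<Class<?>, Function<Transform, String>> TRANSLATORS = new HashMap<>();

  static {
    // Existing translators for the Read transforms.
    TRANSLATORS.put(ReadUnbounded.class,
        t -> unboundedSourceOperator(((ReadUnbounded) t).source()));
    TRANSLATORS.put(ReadBounded.class,
        t -> boundedSourceOperator(((ReadBounded) t).source()));
    // Proposed additions: same helpers, keyed by the primitive read types.
    TRANSLATORS.put(PrimitiveUnboundedRead.class,
        t -> unboundedSourceOperator(((PrimitiveUnboundedRead) t).source()));
    TRANSLATORS.put(PrimitiveBoundedRead.class,
        t -> boundedSourceOperator(((PrimitiveBoundedRead) t).source()));
  }

  static String translate(Transform t) {
    Function<Transform, String> translator = TRANSLATORS.get(t.getClass());
    if (translator == null) {
      throw new IllegalStateException("No translator known for " + t.getClass().getName());
    }
    return translator.apply(t);
  }

  public static void main(String[] args) {
    System.out.println(translate(new PrimitiveUnboundedRead("my-stream")));
  }
}
```

Delegating to the existing helpers rather than duplicating their logic keeps the primitive-read path and the Read path from drifting apart.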
Issue Priority
Priority: 2 (default / most bugs should be filed as P2)
Issue Components
- [ ] Component: Python SDK
- [ ] Component: Java SDK
- [ ] Component: Go SDK
- [ ] Component: Typescript SDK
- [ ] Component: IO connector
- [ ] Component: Beam YAML
- [ ] Component: Beam examples
- [ ] Component: Beam playground
- [ ] Component: Beam katas
- [ ] Component: Website
- [ ] Component: Infrastructure
- [ ] Component: Spark Runner
- [x] Component: Flink Runner
- [ ] Component: Samza Runner
- [ ] Component: Twister2 Runner
- [ ] Component: Hazelcast Jet Runner
- [ ] Component: Google Cloud Dataflow Runner