[FLINK-38477][runtime] Add FINISHED watermark status to support proper watermark aggregation
What is the purpose of the change
This PR fixes a watermark aggregation bug that can cause zero output from time-based operators (e.g., IntervalJoin) when some upstream subtasks finish while others later become temporarily idle.
Details of the issue are described in FLINK-38477.
Root cause: Flink currently lets finished subtasks dominate watermark aggregation. They emit Long.MAX_VALUE instead of being excluded like idle inputs. This prematurely advances downstream watermarks to Long.MAX_VALUE (the end of event time), which halts event-time progress and eliminates output, since every record that arrives afterwards is behind the watermark.
Brief change log
Goal: Ensure a finished input is excluded from operator watermark aggregation, just like an idle input. This prevents Long.MAX_VALUE from dominating aggregation when some channels are finished and others are idle.
Implementation
- Introduced a new watermark status: FINISHED.
- When an operator finishes, it emits:
  - status = FINISHED
  - watermark = Long.MAX_VALUE, if the watermarks of all input channels are already Long.MAX_VALUE.
- Adjusted aggregation rules:
  - If there are active channels → status = ACTIVE, watermark = min(all active channel watermarks).
  - Else if there are idle channels → status = IDLE, watermark = max(all idle channel watermarks).
  - Else (all channels finished) → status = FINISHED, watermark = Long.MAX_VALUE.
This ensures finished inputs are ignored during aggregation until every input has completed, at which point the operator watermark correctly advances to Long.MAX_VALUE.
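The aggregation rules above can be sketched as a pure function over per-channel snapshots. This is a minimal Java sketch under stated assumptions, not the StatusWatermarkValve code from this PR: ChannelStatus, ChannelState, Aggregate and ThreeStateAggregation are hypothetical names, and the sketch ignores details such as emitting a watermark only when it advances.

```java
import java.util.List;

/** Hypothetical channel status mirroring the three states described above. */
enum ChannelStatus { ACTIVE, IDLE, FINISHED }

/** Hypothetical per-channel snapshot: last status and last watermark seen on the channel. */
final class ChannelState {
    final ChannelStatus status;
    final long watermark;

    ChannelState(ChannelStatus status, long watermark) {
        this.status = status;
        this.watermark = watermark;
    }
}

/** Operator-level result of combining all input channels. */
final class Aggregate {
    final ChannelStatus status;
    final long watermark;

    Aggregate(ChannelStatus status, long watermark) {
        this.status = status;
        this.watermark = watermark;
    }
}

final class ThreeStateAggregation {
    private ThreeStateAggregation() {}

    static Aggregate aggregate(List<ChannelState> channels) {
        long minActive = Long.MAX_VALUE;
        long maxIdle = Long.MIN_VALUE;
        boolean anyActive = false;
        boolean anyIdle = false;
        for (ChannelState c : channels) {
            switch (c.status) {
                case ACTIVE:
                    anyActive = true;
                    minActive = Math.min(minActive, c.watermark);
                    break;
                case IDLE:
                    anyIdle = true;
                    maxIdle = Math.max(maxIdle, c.watermark);
                    break;
                case FINISHED:
                    // Finished channels never contribute to min or max.
                    break;
            }
        }
        if (anyActive) {
            return new Aggregate(ChannelStatus.ACTIVE, minActive);
        }
        if (anyIdle) {
            return new Aggregate(ChannelStatus.IDLE, maxIdle);
        }
        // Every channel is finished, so event time may advance to its end.
        return new Aggregate(ChannelStatus.FINISHED, Long.MAX_VALUE);
    }
}
```

For example, a FINISHED channel at Long.MAX_VALUE next to an IDLE channel at 1000 aggregates to IDLE with watermark 1000, not Long.MAX_VALUE.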
Core issues identified
- Current state model: ACTIVE ←→ IDLE
  - Cannot distinguish between:
    - IDLE = “temporarily no data, may resume”
    - FINISHED = “permanently done, never resuming”
- Rejected approach: treating finished as idle.
  - Issue 1: IDLE status gets overwritten
    - StreamTask.endData() emits WatermarkStatus.IDLE when a source finishes.
    - Then operatorChain.finishOperators() calls WatermarkAssignerOperator.processWatermark().
    - When WatermarkAssignerOperator receives Long.MAX_VALUE, it forcibly emits WatermarkStatus.ACTIVE.
    - Result: finished channels are incorrectly marked as ACTIVE, so the finished tasks still participate in watermark aggregation.
  - Issue 2: Incorrect aggregation when all channels are IDLE
    - Even if Issue 1 is fixed, when all channels become IDLE (including finished tasks whose watermark is Long.MAX_VALUE):
      - StatusWatermarkValve calls findAndOutputMaxWatermarkAcrossAllChannels().
      - This still emits a Long.MAX_VALUE watermark.
      - It cannot distinguish between “all temporarily idle” and “all permanently finished.”
- Conclusion: introducing the explicit FINISHED status is necessary to create a three-state WatermarkStatus (ACTIVE / IDLE / FINISHED) that properly distinguishes temporary idleness from permanent completion.
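Both failure modes of the rejected approach can be reproduced with a simplified two-state model. This Java sketch assumes the pre-fix rule is "min over ACTIVE channels; if none is ACTIVE, max over all channels", a simplification of the StatusWatermarkValve behavior described in Issue 1 and Issue 2; TwoStateStatus, TwoStateChannel and TwoStateAggregation are hypothetical names.

```java
import java.util.List;

/** Hypothetical two-state status, modelling the situation before this PR. */
enum TwoStateStatus { ACTIVE, IDLE }

/** Hypothetical per-channel snapshot under the two-state model. */
final class TwoStateChannel {
    final TwoStateStatus status;
    final long watermark;

    TwoStateChannel(TwoStateStatus status, long watermark) {
        this.status = status;
        this.watermark = watermark;
    }
}

final class TwoStateAggregation {
    private TwoStateAggregation() {}

    /** Min over ACTIVE channels; if no channel is ACTIVE, max over all channels. */
    static long aggregate(List<TwoStateChannel> channels) {
        long minActive = Long.MAX_VALUE;
        long maxAll = Long.MIN_VALUE;
        boolean anyActive = false;
        for (TwoStateChannel c : channels) {
            maxAll = Math.max(maxAll, c.watermark);
            if (c.status == TwoStateStatus.ACTIVE) {
                anyActive = true;
                minActive = Math.min(minActive, c.watermark);
            }
        }
        return anyActive ? minActive : maxAll;
    }

    /** Issue 1: the finished channel was flipped back to ACTIVE carrying Long.MAX_VALUE. */
    static long issue1() {
        return aggregate(List.of(
                new TwoStateChannel(TwoStateStatus.ACTIVE, Long.MAX_VALUE), // finished subtask
                new TwoStateChannel(TwoStateStatus.IDLE, 1_000L)));        // stalled subtask
    }

    /** Issue 2: the finished channel stays IDLE, but the all-idle max still picks Long.MAX_VALUE. */
    static long issue2() {
        return aggregate(List.of(
                new TwoStateChannel(TwoStateStatus.IDLE, Long.MAX_VALUE),  // finished subtask
                new TwoStateChannel(TwoStateStatus.IDLE, 1_000L)));        // stalled subtask
    }
}
```

Both scenarios yield Long.MAX_VALUE. If the finished channel were instead marked FINISHED and excluded, both cases would be governed by the stalled channel's watermark of 1000.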
Long-term refactoring (future follow-up)
This PR fixes the immediate bug, but the broader problem is fragmented watermark lifecycle management.
Current problems
- Multiple, conflicting emission points, for example:
StreamTask.endData():
├─ advanceToEndOfEventTime()
│ └─ emits Long.MAX_VALUE watermark
│
└─ operatorChain.finishOperators():
├─ ContinuousFileReaderOperator.finish()
│ └─ emits Long.MAX_VALUE (deprecated?)
│
├─ WatermarkAssignerOperator.finish()
│ ├─ switches IDLE → ACTIVE if idleness enabled
│ └─ emits Long.MAX_VALUE watermark
└─ ...
- Wrapper operators interfere with completion semantics (e.g., WatermarkAssignerOperator switching IDLE back to ACTIVE when it receives Long.MAX_VALUE).
- Sources don’t fully control their own lifecycle, at least from the perspective of watermark and watermark status handling.
Proposed long-term direction
- Source-owned completion and embedded watermark strategy: sources should declare when they are active, idle, or finished, and emit both status and watermark atomically. Wrapper operators should not override this behavior.
  - Current architecture: Source → WatermarkAssignerOperator (wrapper) → Downstream. The wrapper adds the watermark strategy and can conflict with source completion.
  - Benefits:
    - No wrapper operator to conflict with source status.
    - Source and watermark strategy share a consistent lifecycle.
    - The Table API can configure the watermark strategy at source creation rather than via a wrapper.
This refactoring would prevent the entire class of bugs we've been fixing and make watermark completion semantics clear and maintainable.
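The proposed direction can be illustrated with a source-side object that owns its watermark strategy and always emits status and watermark as one event. Nothing here is an existing Flink API: SourceStatus, SourceProgress and SourceProgressTracker are hypothetical names, and the bounded-out-of-orderness formula (max timestamp minus the bound minus 1, as in Flink's BoundedOutOfOrdernessWatermarks) is just one example strategy.

```java
/** Hypothetical status type for this sketch; not a Flink class. */
enum SourceStatus { ACTIVE, IDLE, FINISHED }

/** One immutable progress event: status and watermark are always emitted together. */
final class SourceProgress {
    final SourceStatus status;
    final long watermark;

    private SourceProgress(SourceStatus status, long watermark) {
        this.status = status;
        this.watermark = watermark;
    }

    static SourceProgress of(SourceStatus status, long watermark) {
        // FINISHED is always paired with the final watermark, so no wrapper can split them.
        return status == SourceStatus.FINISHED
                ? new SourceProgress(status, Long.MAX_VALUE)
                : new SourceProgress(status, watermark);
    }
}

/** Hypothetical tracker: the source computes its own watermark instead of relying on a wrapper. */
final class SourceProgressTracker {
    private final long maxOutOfOrdernessMillis;
    private long maxTimestamp = Long.MIN_VALUE;
    private boolean finished;

    SourceProgressTracker(long maxOutOfOrdernessMillis) {
        this.maxOutOfOrdernessMillis = maxOutOfOrdernessMillis;
    }

    void onRecord(long timestamp) {
        if (!finished) {
            maxTimestamp = Math.max(maxTimestamp, timestamp);
        }
    }

    /** Current progress; once finished, always FINISHED with Long.MAX_VALUE. */
    SourceProgress progress(boolean idle) {
        if (finished) {
            return SourceProgress.of(SourceStatus.FINISHED, Long.MAX_VALUE);
        }
        // Before any record, report Long.MIN_VALUE; afterwards, max timestamp - bound - 1.
        long wm = maxTimestamp == Long.MIN_VALUE
                ? Long.MIN_VALUE
                : maxTimestamp - maxOutOfOrdernessMillis - 1;
        return SourceProgress.of(idle ? SourceStatus.IDLE : SourceStatus.ACTIVE, wm);
    }

    SourceProgress finish() {
        finished = true;
        return progress(false);
    }
}
```

Because completion is a single FINISHED event carrying Long.MAX_VALUE, there is no later step at which a separate operator could re-mark the channel as ACTIVE.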
Verifying this change
Unit tests (new)
- Test that FINISHED channels are excluded from min watermark calculation, allowing remaining channels to advance the watermark.
- Test that FINISHED channels are excluded from max watermark calculation when all other channels become IDLE.
- Test that FINISHED channels reject non-MAX_VALUE watermarks, accept Long.MAX_VALUE, and properly aggregate across channels to emit Long.MAX_VALUE once when all FINISHED channels receive it.
- Test that FINISHED is a terminal state and channels cannot transition from FINISHED back to ACTIVE or IDLE.
- Test ACTIVE to IDLE and ACTIVE to FINISHED transitions in various configurations, including global status changes.
- Test IDLE to ACTIVE transitions with both aligned and unaligned watermarks, verifying realignment behavior.
- Test IDLE to FINISHED transitions in various configurations, verifying global status changes and watermark progression.
- Aggregation rule selection verified:
  - Active → min(active watermarks).
  - Idle (with no active) → idle status; finished excluded.
  - All finished → operator watermark = Long.MAX_VALUE.
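The per-channel rules these tests exercise (FINISHED is terminal; a FINISHED channel accepts only Long.MAX_VALUE) can be summarized in a small tracker. This Java sketch is illustrative only: InputStatus and ChannelTracker are hypothetical names, "reject" is modelled as returning false, the initial state (ACTIVE at Long.MIN_VALUE) is an assumption, and cross-channel aggregation is not covered.

```java
/** Hypothetical three-state status; FINISHED is terminal. */
enum InputStatus { ACTIVE, IDLE, FINISHED }

/** Hypothetical per-channel tracker enforcing the transition and watermark rules. */
final class ChannelTracker {
    private InputStatus status = InputStatus.ACTIVE;
    private long watermark = Long.MIN_VALUE;

    InputStatus status() {
        return status;
    }

    long watermark() {
        return watermark;
    }

    /** Applies a status change; returns false if the transition is refused. */
    boolean onStatus(InputStatus next) {
        if (status == InputStatus.FINISHED && next != InputStatus.FINISHED) {
            return false; // no way back from FINISHED to ACTIVE or IDLE
        }
        status = next;
        return true;
    }

    /** Applies a watermark; a FINISHED channel accepts only Long.MAX_VALUE. */
    boolean onWatermark(long wm) {
        if (status == InputStatus.FINISHED && wm != Long.MAX_VALUE) {
            return false;
        }
        if (wm > watermark) { // per-channel watermarks never move backwards
            watermark = wm;
        }
        return true;
    }
}
```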
Manual/e2e tests
- Set job parallelism greater than the number of Kafka partitions; disable dynamic partition discovery.
- Some subtasks finish immediately, others keep running.
- Stall the running subtasks until they become idle (table.exec.source.idle-timeout=10s).
- Before fix: operator watermark = Long.MAX_VALUE, all records dropped → no output.
- After fix: watermark does not jump; resumed records are processed correctly.
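Why a Long.MAX_VALUE watermark produces zero output can be seen with a simplified late-data check of the kind time-based operators apply. This Java sketch assumes a record is dropped when its timestamp is below the operator's current watermark; LateRecordFilter is a hypothetical name, and real operators such as IntervalJoin apply their own, more detailed rules.

```java
/** Hypothetical, simplified late-data check; not copied from any Flink operator. */
final class LateRecordFilter {
    private LateRecordFilter() {}

    /** A record is late when its timestamp is behind the current watermark. */
    static boolean isLate(long timestamp, long currentWatermark) {
        return timestamp < currentWatermark;
    }

    /** Counts how many record timestamps survive the late-data check. */
    static int countKept(long[] timestamps, long currentWatermark) {
        int kept = 0;
        for (long ts : timestamps) {
            if (!isLate(ts, currentWatermark)) {
                kept++;
            }
        }
        return kept;
    }
}
```

With the watermark stuck at Long.MAX_VALUE, every possible timestamp except Long.MAX_VALUE itself is late, which matches the "no output" observation before the fix.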
Does this pull request potentially affect one of the following parts:
- Dependencies: no
- Public API (@Public(Evolving)): no
- Serializers: no
- Runtime per-record code paths: yes (watermark aggregation; minimal guarded changes)
- Deployment/recovery (JM, checkpointing, K8s/Yarn, ZooKeeper): no
- S3 file system connector: no
Documentation
- New feature: no (bug fix only; introduces internal FINISHED watermark status)
- Documentation: not applicable
CI report:
- 943d7e972a84de40815a7b75195162514eb4ad4c UNKNOWN
- e49e3d413c5b208565ad4cd80c3872e979798658 Azure: FAILURE