
[FLINK-38477][runtime] Add FINISHED watermark status to support proper watermark aggregation

Open · weiqingy opened this pull request 3 months ago · 2 comments

What is the purpose of the change

This PR fixes a watermark aggregation bug that can cause zero output from time-based operators (e.g., IntervalJoin) when some upstream subtasks finish while others later become temporarily idle.

Details of the issue are described in FLINK-38477.

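Root cause: a finished subtask emits a Long.MAX_VALUE watermark, and Flink lets that value take part in watermark aggregation instead of excluding the finished input the way it excludes idle inputs. Once the remaining inputs become idle, this prematurely advances the downstream watermark to Long.MAX_VALUE (effectively infinity). Event time can no longer progress, every subsequent record is treated as late, and time-based operators produce no output.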

Brief change log

Goal: Ensure a finished input is excluded from operator watermark aggregation, just like an idle input. This prevents Long.MAX_VALUE from dominating aggregation when some channels are finished and others are idle.

Implementation

  • Introduced a new watermark status: FINISHED.
  • When an operator finishes, it emits:
    • status = FINISHED
    • watermark = Long.MAX_VALUE if the watermarks of all input channels are already Long.MAX_VALUE.
  • Adjusted aggregation rules:
    • If there are active channels → status = ACTIVE, watermark = min(all active channel watermarks).
    • Else if there are idle channels → status = IDLE, watermark = max(all idle channel watermarks).
    • Else (all channels finished) → status = FINISHED, watermark = Long.MAX_VALUE.

This ensures finished inputs are ignored during aggregation until every input has completed, at which point the operator watermark correctly advances to Long.MAX_VALUE.
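The aggregation rules above can be sketched as a single pure function. This is a minimal illustration under stated assumptions, not Flink's StatusWatermarkValve: the names ChannelStatus, ChannelState, Aggregated and ThreeStateAggregation are hypothetical, and per-channel alignment, monotonicity checks and output deduplication are left out.

```java
import java.util.List;

/** Hypothetical per-channel status mirroring the three states introduced by this PR. */
enum ChannelStatus { ACTIVE, IDLE, FINISHED }

/** Hypothetical snapshot of one input channel: its status and last received watermark. */
record ChannelState(ChannelStatus status, long watermark) {}

/** Hypothetical result: the status and watermark an operator would forward downstream. */
record Aggregated(ChannelStatus status, long watermark) {}

final class ThreeStateAggregation {
    private ThreeStateAggregation() {}

    /** Active → min(active); else idle → max(idle); else all finished → Long.MAX_VALUE. */
    static Aggregated aggregate(List<ChannelState> channels) {
        long minActive = Long.MAX_VALUE;
        long maxIdle = Long.MIN_VALUE;
        boolean anyActive = false;
        boolean anyIdle = false;
        for (ChannelState c : channels) {
            switch (c.status()) {
                case ACTIVE -> {
                    anyActive = true;
                    minActive = Math.min(minActive, c.watermark());
                }
                case IDLE -> {
                    anyIdle = true;
                    maxIdle = Math.max(maxIdle, c.watermark());
                }
                case FINISHED -> {
                    // Excluded from min/max; only matters once every channel is finished.
                }
            }
        }
        if (anyActive) {
            return new Aggregated(ChannelStatus.ACTIVE, minActive);
        }
        if (anyIdle) {
            return new Aggregated(ChannelStatus.IDLE, maxIdle);
        }
        return new Aggregated(ChannelStatus.FINISHED, Long.MAX_VALUE);
    }
}
```

With one FINISHED channel at Long.MAX_VALUE and two ACTIVE channels at 100 and 50, the sketch returns ACTIVE with watermark 50: the finished channel no longer pulls the result up to Long.MAX_VALUE.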

Core issues identified

  • Current state model: ACTIVE ←→ IDLE

    • Cannot distinguish between:
      • IDLE = “temporarily no data, may resume”
      • FINISHED = “permanently done, never resuming”
  • Rejected approach: treating finished as idle.

    • Issue 1: IDLE status gets overwritten

      • StreamTask.endData() emits WatermarkStatus.IDLE when a source finishes.
      • Then operatorChain.finishOperators() calls WatermarkAssignerOperator.processWatermark().
      • When WatermarkAssignerOperator receives Long.MAX_VALUE, it forcibly emits WatermarkStatus.ACTIVE.
      • Result: finished channels are incorrectly marked as ACTIVE, so the finished tasks still participate in watermark aggregation.
    • Issue 2: Incorrect aggregation when all channels are IDLE

      • Even if Issue 1 is fixed, when all channels become IDLE (including finished tasks whose watermark is Long.MAX_VALUE):
      • StatusWatermarkValve calls findAndOutputMaxWatermarkAcrossAllChannels().
      • This still emits a Long.MAX_VALUE watermark.
      • It cannot distinguish between “all temporarily idle” and “all permanently finished.”
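To make both issues concrete, the sketch below models the pre-fix, two-state rule as this PR describes it: the min over ACTIVE channels, otherwise the max watermark across all channels. It is a simplified, hypothetical model (TwoStateStatus, LegacyChannel and LegacyAggregation are illustrative names), not the actual StatusWatermarkValve code.

```java
import java.util.List;

/** Hypothetical model of the pre-fix, two-state status. */
enum TwoStateStatus { ACTIVE, IDLE }

/** A channel's status and last watermark in the simplified legacy model. */
record LegacyChannel(TwoStateStatus status, long watermark) {}

final class LegacyAggregation {
    private LegacyAggregation() {}

    /**
     * Simplified pre-fix rule: min over ACTIVE channels or, when no channel is ACTIVE,
     * the max watermark across all channels. A finished channel has no status of its own,
     * so it counts as whatever status it last reported.
     */
    static long aggregate(List<LegacyChannel> channels) {
        long minActive = Long.MAX_VALUE;
        long maxAll = Long.MIN_VALUE;
        boolean anyActive = false;
        for (LegacyChannel c : channels) {
            maxAll = Math.max(maxAll, c.watermark());
            if (c.status() == TwoStateStatus.ACTIVE) {
                anyActive = true;
                minActive = Math.min(minActive, c.watermark());
            }
        }
        return anyActive ? minActive : maxAll;
    }
}
```

While the live channel is still ACTIVE, the min hides the problem. Once it goes idle, the finished channel's Long.MAX_VALUE wins in both cases: under Issue 1 it is the only ACTIVE channel left, and under Issue 2 it is the largest idle watermark.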

Conclusion:

  • Introducing the explicit FINISHED status is necessary to create a three-state WatermarkStatus (ACTIVE / IDLE / FINISHED) that properly distinguishes temporary idleness from permanent completion.
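The three-state model and its transition rule can be summarized in a small enum. This is a hypothetical sketch (WatermarkState is an illustrative name, not Flink's WatermarkStatus class) that encodes only what the PR states: IDLE may resume, FINISHED is terminal, and finished channels are skipped by min/max aggregation.

```java
/**
 * Hypothetical three-state watermark status (illustrative only).
 * IDLE means "temporarily no data, may resume"; FINISHED means "permanently done".
 */
enum WatermarkState {
    ACTIVE,
    IDLE,
    FINISHED;

    /** FINISHED is terminal and may only "move" to itself; ACTIVE and IDLE may move freely. */
    boolean canTransitionTo(WatermarkState next) {
        return this != FINISHED || next == FINISHED;
    }

    /**
     * Whether a channel in this state feeds the min/max watermark computation
     * while any non-finished channel remains; FINISHED channels are skipped.
     */
    boolean participatesInAggregation() {
        return this != FINISHED;
    }
}
```

Keeping the terminal rule on the status type itself means every caller applies it the same way, rather than each emission point re-deciding whether a finished input may become ACTIVE again.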

Long-term refactoring (future follow-up)

This PR fixes the immediate bug, but the broader problem is fragmented watermark lifecycle management.

Current problems

  • Multiple, conflicting emission points. For example:
    StreamTask.endData():
      ├─ advanceToEndOfEventTime()
      │   └─ emits Long.MAX_VALUE watermark
      │
      └─ operatorChain.finishOperators():
          ├─ ContinuousFileReaderOperator.finish()
          │   └─ emits Long.MAX_VALUE (deprecated?)
          │
          ├─ WatermarkAssignerOperator.finish()
          │   ├─ switches IDLE → ACTIVE if idleness is enabled
          │   └─ emits Long.MAX_VALUE watermark
          └─ ...
  • Wrapper operators interfere with completion semantics.
  • Sources don’t fully control their own lifecycle, at least from the perspective of watermark and watermark status handling.

Proposed long-term direction

  • Source-owned completion and embedded watermark strategy:
    Sources should declare when they are active, idle, or finished, and emit both status and watermark atomically. Wrapper operators should not override this behavior.

    • Current Architecture:
      Source → WatermarkAssignerOperator (wrapper) → Downstream
               ↑ Adds watermark strategy
               ↑ Can conflict with source completion
      
    • Benefits:
      • No wrapper operator to conflict with source status.
      • Source and watermark strategy have consistent lifecycle.
      • Table API can configure watermark strategy at source creation, not via wrapper.
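The proposed direction can be illustrated with a hypothetical source that owns its watermark strategy and emits status and watermark as one value. Every name here (SourceStatus, StatusUpdate, StatusAwareOutput, SelfManagedSource) is an assumption for illustration, not an existing or planned Flink API, and the bounded-out-of-orderness rule is just one possible embedded strategy.

```java
/** Hypothetical status type for a source that owns its own lifecycle. */
enum SourceStatus { ACTIVE, IDLE, FINISHED }

/** Hypothetical atomic update: status and watermark always travel together. */
record StatusUpdate(SourceStatus status, long watermark) {
    StatusUpdate {
        if (status == SourceStatus.FINISHED && watermark != Long.MAX_VALUE) {
            throw new IllegalArgumentException("a FINISHED update must carry Long.MAX_VALUE");
        }
    }

    static StatusUpdate finished() {
        return new StatusUpdate(SourceStatus.FINISHED, Long.MAX_VALUE);
    }
}

/** Hypothetical downstream output that only accepts combined updates. */
@FunctionalInterface
interface StatusAwareOutput {
    void emit(StatusUpdate update);
}

/** Hypothetical source with an embedded bounded-out-of-orderness watermark rule (no wrapper). */
final class SelfManagedSource {
    private final StatusAwareOutput out;
    private final long maxOutOfOrderness;
    private long maxTimestamp = Long.MIN_VALUE;
    private long currentWatermark = Long.MIN_VALUE;
    private boolean finished;

    SelfManagedSource(StatusAwareOutput out, long maxOutOfOrderness) {
        this.out = out;
        this.maxOutOfOrderness = maxOutOfOrderness;
    }

    /** A record arrived: the source is ACTIVE and its watermark may advance. */
    void onRecord(long timestamp) {
        if (finished) {
            return; // nothing can follow completion
        }
        maxTimestamp = Math.max(maxTimestamp, timestamp);
        currentWatermark = maxTimestamp - maxOutOfOrderness;
        out.emit(new StatusUpdate(SourceStatus.ACTIVE, currentWatermark));
    }

    /** No data for a while: the source itself declares IDLE and keeps its current watermark. */
    void onIdle() {
        if (!finished) {
            out.emit(new StatusUpdate(SourceStatus.IDLE, currentWatermark));
        }
    }

    /** End of input: FINISHED and Long.MAX_VALUE are emitted together, exactly once. */
    void onEndOfInput() {
        if (!finished) {
            finished = true;
            out.emit(StatusUpdate.finished());
        }
    }
}
```

Because StatusUpdate rejects a FINISHED status without Long.MAX_VALUE and there is no status-only emission path, nothing downstream can flip a finished source back to ACTIVE in the way Issue 1 describes.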

This refactoring would prevent the entire class of bugs we've been fixing and make watermark completion semantics clear and maintainable.

Verifying this change

Unit tests (new)

  • Test that FINISHED channels are excluded from min watermark calculation, allowing remaining channels to advance the watermark.
  • Test that FINISHED channels are excluded from max watermark calculation when all other channels become IDLE.
  • Test that FINISHED channels reject non-MAX_VALUE watermarks, accept Long.MAX_VALUE, and properly aggregate across channels to emit Long.MAX_VALUE once when all FINISHED channels receive it.
  • Test that FINISHED is a terminal state and channels cannot transition from FINISHED back to ACTIVE or IDLE.
  • Test ACTIVE to IDLE and ACTIVE to FINISHED transitions in various configurations, including global status changes.
  • Test IDLE to ACTIVE transitions with both aligned and unaligned watermarks, verifying realignment behavior.
  • Test IDLE to FINISHED transitions in various configurations, verifying global status changes and watermark progression.
  • Aggregation rule selection verified:
    • Active → min(active watermarks).
    • Idle (no active channels) → status = IDLE, max(idle watermarks); finished channels excluded.
    • All finished → operator watermark = Long.MAX_VALUE.
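The behaviors these unit tests check can be sketched in a small stand-alone valve. MiniValve and ValveStatus are hypothetical names, and the class is a simplified model under stated assumptions (a watermark is emitted only when it advances, and status changes are not forwarded downstream), not Flink's StatusWatermarkValve.

```java
import java.util.Arrays;
import java.util.OptionalLong;

/** Hypothetical per-channel status used by MiniValve. */
enum ValveStatus { ACTIVE, IDLE, FINISHED }

/** Hypothetical, simplified valve; omits alignment, status output and checkpointing. */
final class MiniValve {
    private final ValveStatus[] status;
    private final long[] watermark;
    private long lastEmitted = Long.MIN_VALUE;

    MiniValve(int numChannels) {
        status = new ValveStatus[numChannels];
        watermark = new long[numChannels];
        Arrays.fill(status, ValveStatus.ACTIVE);
        Arrays.fill(watermark, Long.MIN_VALUE);
    }

    ValveStatus statusOf(int channel) {
        return status[channel];
    }

    /** Applies a status change; FINISHED is terminal, so any later change is ignored. */
    OptionalLong onStatus(int channel, ValveStatus next) {
        if (status[channel] == ValveStatus.FINISHED) {
            return OptionalLong.empty();
        }
        status[channel] = next;
        return maybeEmit();
    }

    /** Records a watermark; a FINISHED channel accepts only Long.MAX_VALUE. */
    OptionalLong onWatermark(int channel, long wm) {
        if (status[channel] == ValveStatus.FINISHED && wm != Long.MAX_VALUE) {
            return OptionalLong.empty();
        }
        watermark[channel] = Math.max(watermark[channel], wm);
        return maybeEmit();
    }

    /** Emits the aggregate only when it advances past the last emitted watermark. */
    private OptionalLong maybeEmit() {
        long minActive = Long.MAX_VALUE;
        long maxIdle = Long.MIN_VALUE;
        boolean anyActive = false;
        boolean anyIdle = false;
        boolean allFinishedAtMax = true;
        for (int i = 0; i < status.length; i++) {
            switch (status[i]) {
                case ACTIVE -> {
                    anyActive = true;
                    minActive = Math.min(minActive, watermark[i]);
                }
                case IDLE -> {
                    anyIdle = true;
                    maxIdle = Math.max(maxIdle, watermark[i]);
                }
                case FINISHED -> {
                    if (watermark[i] != Long.MAX_VALUE) {
                        allFinishedAtMax = false;
                    }
                }
            }
        }
        long candidate;
        if (anyActive) {
            candidate = minActive;
        } else if (anyIdle) {
            candidate = maxIdle;
        } else if (allFinishedAtMax) {
            candidate = Long.MAX_VALUE;
        } else {
            return OptionalLong.empty(); // all finished, but not all have delivered Long.MAX_VALUE
        }
        if (candidate <= lastEmitted) {
            return OptionalLong.empty();
        }
        lastEmitted = candidate;
        return OptionalLong.of(candidate);
    }
}
```

In a two-channel run where channel 0 finishes first, channel 1 alone drives the watermark (for example to 100), a later attempt to set channel 0 back to ACTIVE is ignored, and Long.MAX_VALUE is emitted a single time, only after both channels are FINISHED and have delivered it.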

Manual/e2e tests

  1. Set the job parallelism higher than the number of Kafka partitions and disable dynamic partition discovery.
  2. As a result, some source subtasks finish immediately while others keep running.
  3. Stall the running subtasks until they become idle (table.exec.source.idle-timeout=10s).
  4. Before fix: operator watermark = Long.MAX_VALUE, all records dropped → no output.
    After fix: watermark does not jump; resumed records are processed correctly.

Does this pull request potentially affect one of the following parts:

  • Dependencies: no
  • Public API (@Public(Evolving)): no
  • Serializers: no
  • Runtime per-record code paths: yes (watermark aggregation; minimal guarded changes)
  • Deployment/recovery (JM, checkpointing, K8s/Yarn, ZooKeeper): no
  • S3 file system connector: no

Documentation

  • New feature: no (bug fix only; introduces internal FINISHED watermark status)
  • Documentation: not applicable

weiqingy, Oct 05 '25 20:10

CI report:

  • 943d7e972a84de40815a7b75195162514eb4ad4c UNKNOWN
  • e49e3d413c5b208565ad4cd80c3872e979798658 Azure: FAILURE
Bot commands: the @flinkbot bot supports the following commands:
  • @flinkbot run azure: re-run the last Azure build

flinkbot, Oct 05 '25 20:10

@flinkbot run azure

weiqingy, Oct 07 '25 05:10