
[FLINK-34252][table] Fix lastRecordTime tracking in WatermarkAssignerOperator

Open dchristle opened this issue 1 year ago • 2 comments

What is the purpose of the change

In the current implementation, the lastRecordTime variable, which tracks the time of the last received data element, is updated only when the WatermarkStatus transitions from IDLE to ACTIVE. It is not updated while the WatermarkStatus stays ACTIVE. As a result, even under a continuous data flow, the condition (currentTime - lastRecordTime > idleTimeout) eventually becomes true once idleTimeout has elapsed since lastRecordTime was last set, and the WatermarkStatus is erroneously marked IDLE.

I believe this bug technically causes incorrect outputs, since downstream watermarks can advance earlier than they otherwise would while this input is wrongly treated as idle. The incorrect state doesn't last forever, though: while the WatermarkStatus is IDLE, the next processElement call emits WatermarkStatus.ACTIVE again.
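To make the failure mode concrete, here is a minimal Java sketch of the idleness bookkeeping. It is a simplified model, not the Flink source: the class IdlenessModel, its simulate helper, and passing processing time in explicitly are all hypothetical, and the idleness check is assumed to run in a periodic timer callback. With updateOnEveryRecord = false it reproduces the IDLE/ACTIVE flip-flop even though a record arrives every 10 ms against a 50 ms idleTimeout; with updateOnEveryRecord = true (the behaviour this change proposes) no status change is emitted at all.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical, simplified model of the idleness bookkeeping discussed above.
 * Not the Flink source: processing time is passed in explicitly and emitted
 * statuses are collected in a list instead of being sent downstream.
 */
public class IdlenessModel {
    enum Status { ACTIVE, IDLE }

    private final long idleTimeout;
    private final boolean updateOnEveryRecord; // false models the behaviour before the fix
    private Status currentStatus = Status.ACTIVE;
    private long lastRecordTime;
    final List<Status> emitted = new ArrayList<>();

    IdlenessModel(long idleTimeout, boolean updateOnEveryRecord, long startTime) {
        this.idleTimeout = idleTimeout;
        this.updateOnEveryRecord = updateOnEveryRecord;
        this.lastRecordTime = startTime;
    }

    /** Called for every incoming record at processing time {@code now}. */
    void processElement(long now) {
        if (currentStatus == Status.IDLE) {
            // IDLE -> ACTIVE: the only place the pre-fix code refreshed lastRecordTime
            emit(Status.ACTIVE);
            lastRecordTime = now;
        }
        if (updateOnEveryRecord) {
            // The fix: refresh lastRecordTime for every record, not just on the transition
            lastRecordTime = now;
        }
    }

    /** Periodic timer callback that marks the input idle after idleTimeout without records. */
    void onTimer(long now) {
        if (currentStatus == Status.ACTIVE && now - lastRecordTime > idleTimeout) {
            emit(Status.IDLE);
        }
    }

    private void emit(Status status) {
        currentStatus = status;
        emitted.add(status);
    }

    /** Sends a record every recordEvery ms and fires the timer every timerEvery ms. */
    static List<Status> simulate(boolean fixed, long idleTimeout, long recordEvery,
                                 long timerEvery, long duration) {
        IdlenessModel op = new IdlenessModel(idleTimeout, fixed, 0);
        for (long t = 1; t <= duration; t++) {
            if (t % recordEvery == 0) {
                op.processElement(t);
            }
            if (t % timerEvery == 0) {
                op.onTimer(t);
            }
        }
        return op.emitted;
    }

    public static void main(String[] args) {
        // A record every 10 ms is far more frequent than the 50 ms idleTimeout.
        System.out.println("before fix: " + simulate(false, 50, 10, 20, 500));
        System.out.println("after fix:  " + simulate(true, 50, 10, 20, 500));
    }
}
```

In the pre-fix run the first emitted status is IDLE (at t = 60 in this model), followed by alternating ACTIVE and IDLE entries, the same pattern the failing unit test reports.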

The new unit test illustrates the flip-flop behavior before the fix:

[ERROR] org.apache.flink.table.runtime.operators.wmassigners.WatermarkAssignerOperatorTest.testIdleStateAvoidanceWithConsistentDataFlow -- Time elapsed: 0.013 s <<< FAILURE!
java.lang.AssertionError:

Expecting
  [WatermarkStatus(IDLE),
    WatermarkStatus(ACTIVE),
    WatermarkStatus(IDLE),
    WatermarkStatus(ACTIVE),
    WatermarkStatus(IDLE),
    WatermarkStatus(ACTIVE),
    WatermarkStatus(IDLE),
    WatermarkStatus(ACTIVE),
    WatermarkStatus(IDLE)]
not to contain
  [WatermarkStatus(IDLE)]
but found
  [WatermarkStatus(IDLE)]

Brief change log

  • Update lastRecordTime in table WatermarkAssignerOperator on each record to prevent the stream from incorrectly being marked idle

Verifying this change

This change added tests and can be verified as follows:

  • Added a test that validates the WatermarkStatus is not set to IDLE when records arrive more often than once per idleTimeout

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): (no)
  • The public API, i.e., is any changed class annotated with @Public(Evolving): (no)
  • The serializers: (no)
  • The runtime per-record code paths (performance sensitive): (yes)
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no)
  • The S3 file system connector: (no)

Documentation

  • Does this pull request introduce a new feature? (no)
  • If yes, how is the feature documented? (not applicable)

dchristle • Jan 29 '24 02:01

CI report:

  • 3b898a0c74a5dcb957ab7ab30b79dd82d01e8c0e Azure: SUCCESS
Bot commands
The @flinkbot bot supports the following commands:
  • @flinkbot run azure re-run the last Azure build

flinkbot • Jan 29 '24 02:01

Do we know if watermarkInterval == 0 is a valid case where we are sure we need to periodically emit WatermarkStatus? My interpretation when first reading this code was that a zero value is a sentinel indicating the user isn't using watermarks. If that's the case, does the concept of "idleness" make sense anymore? Perhaps we don't emit anything for watermarkInterval == 0.

Yes, that's a valid combination with idleness in at least a couple of cases:

  • the watermark is emitted in onEvent, for example for every record, or because the source already contains special records that represent watermarks
  • the given Source/SourceReader handles emission of watermarks on its own, but doesn't handle idleness

In both cases, watermarkInterval = 0 and idleTimeout > 0 could be a desired configuration.
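The first case can be sketched with hypothetical stand-ins that mirror the shape of Flink's WatermarkGenerator interface (an onEvent and an onPeriodicEmit callback). Output, Generator and EveryRecordGenerator below are illustrative types, not the real Flink API, and the "max timestamp minus one" watermark is an assumed convention. Because every record already produces a watermark, nothing has to happen periodically; but once records stop arriving, no further watermark is emitted, which is why an idleTimeout can still be useful with watermarkInterval = 0.

```java
import java.util.ArrayList;
import java.util.List;

public class PerRecordWatermarks {
    /** Hypothetical stand-in for a watermark output; not Flink's WatermarkOutput. */
    interface Output {
        void emitWatermark(long watermark);
    }

    /** Hypothetical stand-in mirroring the two callbacks of Flink's WatermarkGenerator. */
    interface Generator<T> {
        void onEvent(T event, long eventTimestamp, Output output);

        void onPeriodicEmit(Output output);
    }

    /** Emits a watermark for every record, so no periodic emission is needed. */
    static final class EveryRecordGenerator<T> implements Generator<T> {
        private long maxTimestamp = Long.MIN_VALUE;

        @Override
        public void onEvent(T event, long eventTimestamp, Output output) {
            maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
            // Watermark one millisecond below the largest timestamp seen (assumed convention)
            output.emitWatermark(maxTimestamp - 1);
        }

        @Override
        public void onPeriodicEmit(Output output) {
            // Nothing to do here: onEvent already emitted the watermark
        }
    }

    public static void main(String[] args) {
        List<Long> watermarks = new ArrayList<>();
        Generator<String> generator = new EveryRecordGenerator<>();
        generator.onEvent("a", 100L, watermarks::add);
        generator.onEvent("b", 90L, watermarks::add); // out of order: watermark does not go back
        generator.onEvent("c", 120L, watermarks::add);
        System.out.println(watermarks); // [99, 99, 119]
        // Once records stop, nothing more is emitted; only an idleness check
        // (idleTimeout > 0) could tell downstream operators to stop waiting for this input.
    }
}
```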

It seems like the intent was to ensure watermark advancement under high load, rather than to support the watermarkInterval == 0 case. Without this eager advancement, the code would never emit a watermark when watermarkInterval == 0, since no timers would be scheduled and we would never try to detect idleness.
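A simplified, hypothetical model of such an eager per-record check is sketched below. The exact condition (currentWatermark - lastEmittedWatermark > watermarkInterval) and the assumption that a periodic timer is registered only when watermarkInterval > 0 are illustrative choices, not taken from the Flink source. With an interval of 0 the per-record path emits on any watermark progress; with a larger interval it only fires once the watermark has run more than one interval ahead of the last emitted value.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical model of eager, per-record watermark advancement. The condition
 * and the timer assumption are illustrative, not copied from Flink.
 */
public class EagerAdvanceModel {
    private final long watermarkInterval;
    // Event timestamps are assumed to be non-negative epoch millis, so 0 is a safe start.
    private long currentWatermark = 0;
    private long lastEmittedWatermark = 0;
    final List<Long> emitted = new ArrayList<>();

    EagerAdvanceModel(long watermarkInterval) {
        // Assumed: a periodic timer calling advanceWatermark() is registered only when
        // watermarkInterval > 0, so with 0 the per-record path is the only emitter.
        this.watermarkInterval = watermarkInterval;
    }

    void processElement(long recordWatermark) {
        currentWatermark = Math.max(currentWatermark, recordWatermark);
        // Eager advancement: emit once the watermark is more than one interval ahead of
        // the last emitted value; with watermarkInterval == 0 any progress qualifies.
        if (currentWatermark - lastEmittedWatermark > watermarkInterval) {
            advanceWatermark();
        }
    }

    /** Emits the current watermark if it moved forward; a periodic timer would also call this. */
    void advanceWatermark() {
        if (currentWatermark > lastEmittedWatermark) {
            lastEmittedWatermark = currentWatermark;
            emitted.add(currentWatermark);
        }
    }

    public static void main(String[] args) {
        EagerAdvanceModel noTimer = new EagerAdvanceModel(0);
        for (long wm : new long[] {10, 10, 25}) {
            noTimer.processElement(wm);
        }
        System.out.println(noTimer.emitted); // [10, 25]

        EagerAdvanceModel withInterval = new EagerAdvanceModel(100);
        for (long wm : new long[] {10, 50, 150}) {
            withInterval.processElement(wm);
        }
        // Only 150 is more than 100 ahead of the last emitted value (0); the smaller
        // steps would be left to the periodic timer.
        System.out.println(withInterval.emitted); // [150]
    }
}
```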

That's a good catch @dchristle!

I think this safety net for an overloaded subtask thread might no longer be needed. In the past, the timer thread emitting periodic watermarks and the subtask/source thread competed to acquire the same lock. AFAIK the original authors were worried about lock starvation (I'm not even sure this was really needed: AFAIK kernels do not guarantee fairness, but they do guarantee that threads won't be starved during lock acquisition). The synchronisation mechanism is very different now, and I would say there is no real need for this extra code path.
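As a rough illustration of a design without that lock contention, the Java sketch below shows a single-threaded mailbox-style loop: timer work and record processing are both enqueued as actions and executed one after another on the same thread, so they never compete for a shared lock. Flink's task threads moved to a mailbox model along these lines, but MailboxSketch and its methods are hypothetical and far simpler than Flink's implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

/**
 * Hypothetical sketch of a single-threaded mailbox loop. Other threads (such as a
 * timer thread) only enqueue actions; all state is touched by the one thread that
 * runs the loop, so no lock is shared between timer and record processing.
 */
public class MailboxSketch {
    private final BlockingQueue<Runnable> mailbox = new LinkedBlockingQueue<>();
    boolean running = true;                            // only accessed by the mailbox thread
    final List<String> processed = new ArrayList<>();  // only accessed by the mailbox thread

    /** Thread-safe: any thread may enqueue an action. */
    void submit(Runnable action) {
        mailbox.add(action);
    }

    /** Executes queued actions one at a time on the calling thread until stopped. */
    List<String> runUntilStopped() {
        try {
            while (running) {
                mailbox.take().run();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // keep the interrupt flag and leave the loop
        }
        return processed;
    }

    public static void main(String[] args) throws InterruptedException {
        MailboxSketch task = new MailboxSketch();
        // A "timer" thread enqueues its work instead of locking and mutating state itself.
        Thread timerThread = new Thread(() -> {
            for (int i = 0; i < 3; i++) {
                task.submit(() -> task.processed.add("timer"));
            }
        });
        timerThread.start();
        for (int i = 0; i < 5; i++) {
            final int n = i;
            task.submit(() -> task.processed.add("record-" + n));
        }
        timerThread.join();
        task.submit(() -> task.running = false); // poison pill, enqueued after all other mails
        // The interleaving of timer and record mails may vary, but each runs to completion alone.
        System.out.println(task.runUntilStopped().size()); // 8
    }
}
```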

So if it makes things simpler I would be fine removing this safety net.

pnowojski • Feb 23 '24 08:02

Due to inactivity I've prepared an improved version of this PR: https://github.com/apache/flink/pull/24941

Closing this in favour of the newer one.

pnowojski • Jun 14 '24 13:06