beam icon indicating copy to clipboard operation
beam copied to clipboard

[prism] Processing Time handling (TestStream, Timers, ProcessContinuations)

Open lostluck opened this issue 1 year ago • 0 comments

Sharded out this batch of ProcesssingTime features to complete simultaneously, once TestStream #29917 and to allow #29772 to be closed.

After TestStream is in, there's a very clear hook for synthetically advancing the Processing Time for a pipeline, we must now make use of it. In particular, for handling Processing Time Timers and executing ProcessContinuations. Processing Time Triggers will build upon the implementation we end up with from this work, and are out of scope for this issue.

For ProcessContinuations, ProcessingTime is a Runner Relative time. The user is scheduling a recommendation about when the runner should perform the next action WRT wall time. For ProcessingTime timers, an "absolute" time is being provided based on the SDK worker's time. This means clockskew is going to be introduced invariably.

In a production focused runner, the goal is of course to adhere to these targets as closely as possible for predictable execution.

However, prism isn't currently intending to be a production focused runner. It's intending to be a test runner, which means we can largely rely on a notion of synthetic time. There are 3 cases to consider:

  • When operating in a production mode.
  • When operating in a test mode.
  • When there's a TestStream in the pipeline.

In a production mode, this can default to using time.Now() for advancing the realtime clock. Nothing special in this case. An advantage of this is being able to use facilities from the time.Time package more easily in prism if we need to.

In a test mode, it's valuable for the behavior tests to execute fast, and faster than normal. This is essentially how Prism executes presently for ProcessContinuations. The residuals are scheduled to be reprocessed immeadiately like any other inputs, no waiting involved. Assuming we have a queue of ProcessingTime elements, we would want to maintain this behavior somehow.

When a TestStream is in the pipeline, Prism should use the synthetic clock in TestStream while there are events remaining in the queue. Once the final event has occured, the synthetic clock should fallback to the Test behavior above.


I will note the other place "time" is implicilty considered in Prism is with the Progress loop during bundle processing. Presently that loop uses a ticker and does ordinary splits and such. That progress loop will not be considered for using the synthetic clock since it's important for prism to behave normally WRT the SDK in most ways. The split algorithm shouldn't need to be adjusted when the synthetic clock is in use, but there may be something I'm missing.


For handling Relative cases (ProcessContinuations and Triggers), the handling is straightforward: When we receive them, we calculate their firing time based on the current clock (synthetic or real). The Element Manager then needs to introduce them when that clock reaches the next time.

Residual Elements do need to hold back the watermark as appropriate, but given such transforms should be setting an Estimated Output Watermark time, this should avoid issues with holding back processing time.

ProcessingTime Timers are a complication because they are set absolutely and not relatively. This means the SDK and user could be imposing difficult constraints on the execution of the pipeline, that may not be the most testable. While this can't support 100% of cases, most TestStream uses are for unit testing within a pipeline, which usually has the requirement of fast and correct execution. We can make a simplifying assumption that the user genuinely wants to test their code without the true constraints of "real time".

This implies that we need to re-write the received processing time timer to be in terms of the synthetic clock, instead of the real time clock. Since Prism is notionally a single machine local runner, there should not be a great deal of clock skew between SDK workers and the Prism core. We can assume then that skew is going to be small, if any.

Essentially, we take the received absolute timestamp, subtract the current walltime from it to get a relative duration, and then re-apply that duration to the synthetic clock to get the new expected processing time. Then we handle the progression as normal. This is essentially the same as normal relative, except we must calculate the relative duration first. This logic will apply in test mode executions, not production mode executions.

This approach should work in most cases, but may still lead to flaky behavior for non-conforming tests.

In test mode executions, we can continue to use the same notion of synthetic time, but use the "event" queue for the various ProcessingTime based events to define advancement, since we already have defined the ProcessingTime events to be against the synthetic clock. To a first approximation, it should happen similarly to when TestStream events proc: When runner processing is blocked with pending events, but nothing is in progress. This can happen "one event at a time" as well, and must be prior to the TestStream event handling, since those much happen when no further processing is possible.

lostluck avatar Jan 23 '24 18:01 lostluck