flink-spector

Find a way to manipulate ProcessingTime

Open rami-alisawi opened this issue 8 years ago • 4 comments

I have a GlobalWindow with a simple trigger, class TimedTrigger extends Trigger<Value, GlobalWindow>, which registers a processing-time timer: ctx.registerProcessingTimeTimer(ctx.getCurrentProcessingTime() + timer);

But it never fires (unless I set 'timer' to the unrealistically small value of 2 milliseconds). I have tried increasing the timeout, with no effect.

rami-alisawi · Sep 25 '16 14:09

Hi,

This is a shortcoming of flinkspector. In the background, flinkspector sets the time characteristic of the StreamExecutionEnvironment to EventTime. When you insert a window that works explicitly with processing time, flinkspector can no longer manipulate time by injecting timestamps and watermarks. I think I've addressed this issue briefly in the documentation.

ProcessingTimeWindows work directly with the system time, so the only way for flinkspector to trigger them from the outside is to actually wait the required amount of time, which could lead to very long-running tests. Also, the emission of records would have to be completely in sync with the processing-time triggers, which would be very hard to achieve. There is an old branch of flinkspector where somebody tried this, but I never got a pull request, so I assume it didn't work as expected.

Long story short: the only workaround I see is to replace the processing-time windows with "normal" (event-time) windows during testing. Alternatively, if you just want to test a single window evaluation, you can insert a mapper with a sleep in it to keep the pipeline running long enough for that window to trigger. But remember to increase flinkspector's timeout accordingly.

Cheers, Alex

lofifnc · Sep 25 '16 14:09

Thanks for the fast and detailed answer. However, regarding this statement:

... actually wait the required amount of time to trigger the window, this could potentially lead to very long running tests.

My timer is passed in as a parameter, so for testing I can pass a much shorter one. I realise this means the records would have to be emitted all at once, but that's a limitation I can live with. If I want to achieve this, I doubt it is as simple as removing testEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);. Any pointers on what else I would have to modify?

  • Adding a sleep to keep the pipeline running sounds like a good workaround, but I could not get it to work. I added a Thread.sleep that is longer than the window timer but shorter than the flinkspector timeout, and the timer still did not fire. Any ideas?

rami-alisawi · Sep 25 '16 15:09

It might be useful to have a dedicated clock object inside Flink or flink-spector to simulate time.
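For illustration, a minimal plain-Java sketch of such a clock, assuming code that reads processing time through an interface instead of calling System.currentTimeMillis() directly. Clock, SystemClock and ManualClock are hypothetical names; neither Flink nor flink-spector provides them.

```java
import java.util.concurrent.atomic.AtomicLong;

public class ClockSketch {

    // Hypothetical abstraction over processing time; not part of Flink's API.
    interface Clock {
        long currentTimeMillis();
    }

    // Production implementation: reads the machine's wall clock.
    static final class SystemClock implements Clock {
        @Override
        public long currentTimeMillis() {
            return System.currentTimeMillis();
        }
    }

    // Test implementation: time only moves when the test advances it explicitly.
    static final class ManualClock implements Clock {
        private final AtomicLong now;

        ManualClock(long startMillis) {
            this.now = new AtomicLong(startMillis);
        }

        @Override
        public long currentTimeMillis() {
            return now.get();
        }

        // Moves the simulated time forward by the given number of milliseconds.
        void advance(long millis) {
            if (millis < 0) {
                throw new IllegalArgumentException("time cannot go backwards");
            }
            now.addAndGet(millis);
        }
    }

    public static void main(String[] args) {
        ManualClock clock = new ManualClock(0L);
        clock.advance(60_000L); // "wait" one minute instantly
        System.out.println(clock.currentTimeMillis()); // prints 60000
    }
}
```

The point of the split is that only the test ever sees ManualClock; code under test depends on Clock alone.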

kaelumania · Nov 14 '16 16:11

A clock object inside Flink that represents processing time and can be manipulated from the outside would be a really neat feature for testing. In a way, EventTime already provides such a mechanism, which is why flink-spector uses it. The downside is that you cannot test the relationship between event time and processing time, which would be nice to have.

Spark Streaming and Storm have a clock object that can be manipulated during local execution and testing. This is realized by running the pipelines in a specialized local runtime environment. When Flink runs locally, it starts up a cluster and runs the job in that cluster, completely isolated from the user. Processing-time windows in Flink directly access the system time of the machine they are running on. Implementing a clock that can be manipulated would mean replacing the System.currentTimeMillis() call inside Flink with an object that can somehow be swapped out during tests. But this object would not be accessible from the tests, which means you would have to implement a messaging system that lets you change the time of that object. This would be a pretty big intrusion into one of Flink's core functionalities.
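A sketch of what that swappable object could look like if it also owned the timers, written as standalone Java with no Flink dependency. ManualTimerService is hypothetical; its method names only mirror the TriggerContext calls from the original question (getCurrentProcessingTime, registerProcessingTimeTimer), and it is not Flink's timer service. It shows the behaviour the thread is after: a long timer fires as soon as the test advances time, without any real waiting.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;
import java.util.function.LongConsumer;

// Hypothetical test-only timer service driven by a manually advanced clock.
public class ManualTimerService {

    // A pending timer: when it should fire and what to run when it does.
    private static final class Timer {
        final long time;
        final LongConsumer callback;

        Timer(long time, LongConsumer callback) {
            this.time = time;
            this.callback = callback;
        }
    }

    // Pending timers ordered by firing time, earliest first.
    private final PriorityQueue<Timer> timers =
            new PriorityQueue<>((a, b) -> Long.compare(a.time, b.time));
    private long now;

    public ManualTimerService(long startMillis) {
        this.now = startMillis;
    }

    public long getCurrentProcessingTime() {
        return now;
    }

    public void registerProcessingTimeTimer(long time, LongConsumer callback) {
        timers.add(new Timer(time, callback));
    }

    // Sets the simulated processing time and fires, in order, every timer that is now due.
    public void advanceTo(long newTime) {
        if (newTime < now) {
            throw new IllegalArgumentException("time cannot go backwards");
        }
        now = newTime;
        while (!timers.isEmpty() && timers.peek().time <= now) {
            Timer due = timers.poll();
            due.callback.accept(due.time);
        }
    }

    public static void main(String[] args) {
        ManualTimerService service = new ManualTimerService(0L);
        List<Long> fired = new ArrayList<>();
        long timer = 10 * 60_000L; // a "realistic" 10-minute timer

        service.registerProcessingTimeTimer(
                service.getCurrentProcessingTime() + timer, t -> fired.add(t));
        service.advanceTo(timer - 1);
        System.out.println(fired); // prints []
        service.advanceTo(timer);
        System.out.println(fired); // prints [600000]
    }
}
```

The hard part described above is not this class but getting such an object into the operators running inside the local cluster and driving it from the test; the sketch does not address that.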

What I have been thinking about is writing my own TimeWindow implementation for use during tests, containing a clock that you can manipulate. The downside is that you would have to replace the normal timeWindow() calls in your code during tests. Also, any other user code that uses some form of system time would still not be testable.
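A rough sketch of that idea in plain Java, not written against Flink's window assigner or trigger interfaces: a tumbling-window assignment that reads time from an injected clock instead of the system clock. ClockTumblingWindows and assignWindow are hypothetical names; a real version would have to implement Flink's interfaces, which this does not attempt.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongSupplier;

// Hypothetical test-friendly tumbling windows; not Flink's API.
public class ClockTumblingWindows {

    private final long sizeMillis;
    // Injected time source: System::currentTimeMillis in production, a controllable value in tests.
    private final LongSupplier clock;

    public ClockTumblingWindows(long sizeMillis, LongSupplier clock) {
        if (sizeMillis <= 0) {
            throw new IllegalArgumentException("window size must be positive");
        }
        this.sizeMillis = sizeMillis;
        this.clock = clock;
    }

    // Returns {start, end} of the tumbling window [start, end) containing the clock's current time.
    public long[] assignWindow() {
        long now = clock.getAsLong();
        long start = now - Math.floorMod(now, sizeMillis);
        return new long[] {start, start + sizeMillis};
    }

    public static void main(String[] args) {
        AtomicLong testTime = new AtomicLong(12_345L);
        ClockTumblingWindows windows = new ClockTumblingWindows(10_000L, testTime::get);

        long[] w = windows.assignWindow();
        System.out.println(w[0] + " - " + w[1]); // prints 10000 - 20000

        testTime.set(25_000L); // jump forward without waiting
        w = windows.assignWindow();
        System.out.println(w[0] + " - " + w[1]); // prints 20000 - 30000
    }
}
```

In production the same class would be constructed with System::currentTimeMillis; in tests the clock is set directly, so a long window can be closed without waiting. As noted above, this only helps code that goes through the injected clock.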

I'm not sure if there is a better way of doing this. Personally, I dropped the idea of just waiting for the clock to progress: the emission of a record and the moment it is processed by the window cannot be synced up.

lofifnc · Nov 15 '16 09:11