hazelcast-jet

Maximum watermark gap is hardcoded to 1000

Open cangencer opened this issue 5 years ago • 2 comments

Currently, the maximum watermark gap is hardcoded as 1000. However, this causes too many watermarks (WMs) to be emitted when nanoseconds are used as the time unit.
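A rough upper bound shows why. Assume that the gap of 1000 acts as a throttling frame, so at most one watermark is emitted per 1000 units of event time. The number of units per second depends on the time unit, so the worst-case watermark rate grows with the timestamp resolution:

```latex
\text{WMs per second of event time} \le \frac{\text{time units per second}}{\text{gap}}
\qquad
\begin{aligned}
\text{milliseconds:}\quad & 10^{3} / 10^{3} = 1 \\
\text{microseconds:}\quad & 10^{6} / 10^{3} = 10^{3} \\
\text{nanoseconds:}\quad & 10^{9} / 10^{3} = 10^{6}
\end{aligned}
```

With millisecond timestamps the gap covers one second of event time. With nanosecond timestamps it covers only one microsecond, which allows up to a million watermarks per second of event time.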

cangencer · Apr 10 '19 11:04

We don't have the config available when converting the pipeline to a DAG, so we can't easily replace the value in the current code. We would have to store a special value and resolve it later on the member. Even the client doesn't know the member configuration. The resulting watermark throttling rate is stored in EventTimePolicy. In the DAG, it is hidden inside the processors that generate WMs, and those processors are opaque. The member never sees the pipeline, which is where we could update the value. I see these solutions:

  1. Use the JobConfig when converting the Pipeline to a DAG. This is not backward-compatible. Alternatively, we could deprecate Pipeline.toDag() and add Pipeline.toDag(JobConfig). Even then, the job can still be submitted with a different JobConfig.
  2. Submit the Pipeline to the member and convert it to a DAG there. However, users could still convert the pipeline to a DAG themselves and submit that, so this won't work either.
  3. Remove EventTimePolicy.watermarkThrottlingFrameSize and apply the throttling in EventTimeMapper. But we can't calculate the correct value from a DAG, because the processors are opaque.
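Option 3 moves the throttling into the component that generates watermarks. The following is a minimal, self-contained sketch of frame-based throttling. It rounds each candidate watermark down to a frame boundary and emits it only when it reaches a new frame. `WatermarkThrottler`, `frameSize`, `frameOffset` and `throttle` are hypothetical names for illustration. This is not Jet's actual EventTimeMapper API.

```java
/**
 * Hypothetical sketch (not Jet's EventTimeMapper API): coalesces candidate
 * watermarks to frame boundaries, so at most one watermark is emitted per
 * frame of event time.
 */
final class WatermarkThrottler {
    /** Sentinel returned when no watermark should be emitted. */
    static final long NO_WATERMARK = Long.MIN_VALUE;

    private final long frameSize;
    private final long frameOffset;
    private long lastEmittedWm = NO_WATERMARK;

    WatermarkThrottler(long frameSize, long frameOffset) {
        if (frameSize <= 0) {
            throw new IllegalArgumentException("frameSize must be positive");
        }
        this.frameSize = frameSize;
        // Normalize the offset into [0, frameSize).
        this.frameOffset = Math.floorMod(frameOffset, frameSize);
    }

    /** Returns the watermark to emit, or NO_WATERMARK if it should be suppressed. */
    long throttle(long candidateWm) {
        // Round down to the start of the frame containing candidateWm.
        // Rounding down keeps the watermark safe: it never claims more progress
        // than the candidate did.
        long frameStart = Math.floorDiv(candidateWm - frameOffset, frameSize) * frameSize + frameOffset;
        if (frameStart <= lastEmittedWm) {
            return NO_WATERMARK; // this frame is already covered
        }
        lastEmittedWm = frameStart;
        return frameStart;
    }
}
```

Assume the frame size is 1000 and the input is nanosecond timestamps that advance one microsecond at a time. In that case, this sketch still emits one watermark per microsecond. Moving the throttling alone does not help. The frame size also has to suit the time unit, which is why the value would need to come from configuration.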

My conclusion: leave this alone.

viliam-durina · May 21 '19 15:05

Now that we've deprecated Pipeline.toDag() and convert the pipeline to a DAG on the server side, this has become doable.

mtopolnik · Sep 23 '20 12:09