hazelcast-jet
Maximum watermark gap is hardcoded to 1000
Currently the maximum watermark gap is hardcoded to 1000. However, this causes too many WMs to be emitted when using "nanoseconds" as the time unit.
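To make the scale of the problem concrete, here is a small back-of-the-envelope sketch. It assumes the gap is interpreted in the job's own time unit and that watermarks end up spaced at least one gap apart; the class and method names are hypothetical and are not part of the Jet API.

```java
import java.util.concurrent.TimeUnit;

public class WatermarkGapDemo {
    // The hardcoded gap from this issue, expressed in the job's own time unit.
    static final long MAX_WATERMARK_GAP = 1000;

    // Upper bound on watermarks per second of event time, assuming
    // consecutive watermarks are at least `gap` time units apart.
    static long maxWatermarksPerSecond(TimeUnit unit, long gap) {
        long unitsPerSecond = unit.convert(1, TimeUnit.SECONDS);
        return unitsPerSecond / gap;
    }

    public static void main(String[] args) {
        TimeUnit[] units = {
            TimeUnit.MILLISECONDS, TimeUnit.MICROSECONDS, TimeUnit.NANOSECONDS
        };
        for (TimeUnit unit : units) {
            System.out.println(unit + ": up to "
                    + maxWatermarksPerSecond(unit, MAX_WATERMARK_GAP)
                    + " watermarks per second of event time");
        }
    }
}
```

Under these assumptions, a gap of 1000 allows about one watermark per second with millisecond timestamps, but up to a million per second with nanosecond timestamps.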
We don't have the config available when converting the pipeline to a DAG, so we can't easily replace the value in the current code. We would have to store a special value and apply it later on the member; even the client doesn't know the member configuration.
The resulting watermark throttling rate is in EventTimePolicy. In the DAG it is hidden inside the processors that generate WMs, and those processors are opaque. The member never sees the pipeline where we could update the value. I see these solutions:
- When converting Pipeline to DAG, use JobConfig. This is not backward-compatible. Alternatively, we can deprecate Pipeline.toDag() and add Pipeline.toDag(JobConfig). Still, there is the problem that the job can actually be submitted with a different JobConfig.
- Submit the Pipeline to the member and convert it to a DAG there. Still, the user could convert it to a DAG themselves and submit that, so this won't work either.
- Remove EventTimePolicy.watermarkThrottlingFrameSize and apply the throttling in EventTimeMapper. But we can't calculate the correct value from a DAG, because the processors are opaque.
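For readers unfamiliar with frame-based throttling, the following is a minimal, self-contained sketch of the general idea: each proposed watermark is rounded down to a frame boundary and is emitted only if it advances past the last emitted value. This is an illustration under assumptions, not Jet's EventTimeMapper implementation; the FrameThrottler class and its method are hypothetical names.

```java
import java.util.OptionalLong;

public class FrameThrottler {
    private final long frameSize;
    private long lastEmitted = Long.MIN_VALUE;

    public FrameThrottler(long frameSize) {
        if (frameSize <= 0) {
            throw new IllegalArgumentException("frameSize must be positive");
        }
        this.frameSize = frameSize;
    }

    // Rounds the proposed watermark down to a multiple of frameSize and
    // returns it only if it is greater than the last emitted watermark.
    // Intended for ordinary timestamps; values near Long.MIN_VALUE are
    // not handled.
    public OptionalLong throttle(long proposedWatermark) {
        long floored = Math.floorDiv(proposedWatermark, frameSize) * frameSize;
        if (floored > lastEmitted) {
            lastEmitted = floored;
            return OptionalLong.of(floored);
        }
        return OptionalLong.empty();
    }

    public static void main(String[] args) {
        FrameThrottler throttler = new FrameThrottler(1000);
        long[] proposed = {1500, 1999, 2000, 2001, 3500};
        for (long wm : proposed) {
            System.out.println(wm + " -> " + throttler.throttle(wm));
        }
    }
}
```

The frame size is the value that has to be chosen with the time unit in mind, which is why it matters where in the submission path that value can still be adjusted.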
My conclusion: leave this alone.
Now that we've deprecated Pipeline.toDag() and we convert the pipeline to the DAG on the server side, this has become doable.