[Bug]: RunInference with automatic refresh using a global window side input sometimes takes too long to update
What happened?
Sometimes, a global window side input takes too long to update on a Dataflow job.
The automatic model refresh feature of RunInference uses the WatchFilePattern transform, which uses a global windowed side input to fetch the latest model path. If a pipeline using this feature runs as a Dataflow job and the model path is updated, it can take a long time for the model to update, because the workers are sometimes busy dealing with backlog.
The code I used to run is at https://github.com/apache/beam/blob/11e2bae4cbee4cc4f9d200a71511d921e8591dcd/examples/notebooks/beam-ml/automatic_model_refresh.ipynb
The workaround I found is to set num_workers to a value greater than 1; I usually set it to 5.
Issue Priority
Priority: 2 (default / most bugs should be filed as P2)
Issue Components
- [X] Component: Python SDK
- [ ] Component: Java SDK
- [ ] Component: Go SDK
- [ ] Component: Typescript SDK
- [ ] Component: IO connector
- [ ] Component: Beam YAML
- [ ] Component: Beam examples
- [ ] Component: Beam playground
- [ ] Component: Beam katas
- [ ] Component: Website
- [ ] Component: Spark Runner
- [ ] Component: Flink Runner
- [ ] Component: Samza Runner
- [ ] Component: Twister2 Runner
- [ ] Component: Hazelcast Jet Runner
- [X] Component: Google Cloud Dataflow Runner
Does this reproduce on runners other than Dataflow? Can we isolate it to the SDK harness (possibly a cache that keeps getting its invalidation re-upped?) or is it runner-specific?
Maybe CC @lostluck regarding Prism's ability to repro this (a triggered global window side input never updating as long as the worker is kept busy).
Triggers are not yet implemented in Prism, so a global window side input will not re-execute. (At present, the pipeline fails before starting.) Triggers are the last thing to implement since they have the most complicated effects on downstream computations.
Overall Prism feature status is tracked in https://github.com/apache/beam/issues/29650
I don't know how RunInference is implemented, but if it's not adding the trigger itself, then the linked code as written won't execute RunInference at all, since the global window side input will never complete, and the consuming transform won't start until the side input is ready.
Ok, so it's WatchFilePattern applying an AfterProcessingTime trigger.
I believe this is a long-standing bug within the Python SDK. Side inputs within the global window are cached in PerWindowInvoker without respecting the side input cache token.
This is part of the bundle processor, which is reused across bundles. Outside of that cache, an attempt is made to reset the side input values here, or the runner resets them by modifying the side input cache token.
Since bundle processors stay cached as long as there is a steady rate of input, so that the last accessed time is less than 60 seconds ago (here), this can lead to extended periods where the captured global side input value is used without refresh.
I think that we should remove the caching at the invoker level, as it does not respect the cache token and StateBackedSideInput supports caching itself. This may be a performance regression, though, as the state cache is currently disabled by default.
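To make the failure mode above concrete, here is a toy model in plain Python. It is only a sketch: `FakeSideInputSource`, `InvokerLevelCache`, and `TokenAwareCache` are hypothetical names invented for illustration and are not the Beam classes. It assumes the source bumps a cache token whenever the side input is updated.

```python
class FakeSideInputSource:
    """Hypothetical stand-in for a side input whose value can change."""

    def __init__(self):
        self.value = "model_v1"
        self.cache_token = 0  # bumped whenever the value changes

    def update(self, value):
        self.value = value
        self.cache_token += 1

    def fetch(self):
        return self.value


class InvokerLevelCache:
    """Captures the value once and never consults the cache token."""

    def __init__(self, source):
        self._source = source
        self._has_value = False
        self._cached = None

    def get(self):
        if not self._has_value:
            self._cached = self._source.fetch()
            self._has_value = True
        return self._cached


class TokenAwareCache:
    """Refetches whenever the source's cache token has changed."""

    def __init__(self, source):
        self._source = source
        self._token = None
        self._cached = None

    def get(self):
        if self._token != self._source.cache_token:
            self._cached = self._source.fetch()
            self._token = self._source.cache_token
        return self._cached


source = FakeSideInputSource()
stale = InvokerLevelCache(source)
fresh = TokenAwareCache(source)

before = (stale.get(), fresh.get())
source.update("model_v2")  # e.g. a new model path arrives
after = (stale.get(), fresh.get())
```

In this sketch the token-unaware cache keeps returning the first value for as long as the object holding it stays alive, which mirrors a bundle processor that is never evicted under steady input.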
Nice find! I have seen quite a few disconnected issues around side input refresh in Python. I hope this is the real root cause.
@AnandInguva Can you verify that the above fixes your issue? It does for my separate testing pipeline.
@liferoad - Could this be closed?
Asked @AnandInguva to confirm this.
Note that the fix was put behind an experiment due to potential performance concerns in cases where refreshing was undesired and the short time frame to get the original PR in for the release. See https://github.com/apache/beam/commit/bcb40cf4e4a9b9045b51162edab09cf245456038
I hope to verify there is no significant regression, or to improve the caching logic to take side input generation triggering into account, and was keeping the issue open until the experiment is no longer required.
The experiment is disable_global_windowed_args_caching
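For anyone who wants to try it, a minimal sketch of passing the experiment through the standard `--experiments` pipeline flag is below. The other flags are placeholder assumptions for a streaming Dataflow job; project, region, and similar settings are omitted.

```python
# Pipeline flags as they might be passed on the command line.
# Only the --experiments value comes from this issue; the rest are
# illustrative assumptions for a streaming Dataflow job.
pipeline_args = [
    "--runner=DataflowRunner",
    "--streaming",
    "--experiments=disable_global_windowed_args_caching",
]

# With apache_beam installed, these would typically be used as:
#   from apache_beam.options.pipeline_options import PipelineOptions
#   options = PipelineOptions(pipeline_args)
```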
Without the SDK cache enabled (it is not on by default), removing the PerWindowInvoker cache means that there is a lot of FnApi traffic to fetch side inputs, which contributes to latency on the Dataflow runner. This could be a noticeable regression, with no functional improvement, for pipelines that don't actually refresh the global window side input.
I think that we should postpone making this always on until the SDK cache is enabled by default. If that is too far out, we could modify this to not clear after every bundle, but instead modify finish_bundle to clear it only after some timeout. Or perhaps we can somehow access the triggering of the side input to observe whether it refreshes or is calculated once.
Enabling the SDK cache will let the runner control the refresh via the side-input cache token.
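The timeout idea above could look roughly like the following. This is a sketch in plain Python, not the actual invoker code: `TimedArgsCache`, `load_fn`, and `max_age_seconds` are hypothetical names, and the 60-second default is only a placeholder. The clock is injectable so the behavior can be checked without sleeping.

```python
import time


class TimedArgsCache:
    """Hypothetical cache that is cleared at bundle end only once it is old enough."""

    def __init__(self, load_fn, max_age_seconds=60.0, clock=time.monotonic):
        self._load_fn = load_fn
        self._max_age = max_age_seconds
        self._clock = clock
        self._value = None
        self._loaded_at = None  # None means nothing is cached

    def get(self):
        # Load lazily and remember when the value was captured.
        if self._loaded_at is None:
            self._value = self._load_fn()
            self._loaded_at = self._clock()
        return self._value

    def finish_bundle(self):
        # Keep the cached value across bundles until it exceeds max age.
        if self._loaded_at is None:
            return
        if self._clock() - self._loaded_at >= self._max_age:
            self._value = None
            self._loaded_at = None


# Demonstration with a fake clock.
now = [0.0]
loads = []


def load():
    loads.append(now[0])
    return "side_input@%s" % now[0]


cache = TimedArgsCache(load, max_age_seconds=60.0, clock=lambda: now[0])
first = cache.get()
cache.finish_bundle()   # age 0s: value retained
now[0] = 30.0
second = cache.get()    # still the first value
cache.finish_bundle()   # age 30s: value retained
now[0] = 61.0
cache.finish_bundle()   # age 61s: value cleared
third = cache.get()     # reloaded
```

The trade-off in this sketch is that staleness is bounded by the timeout plus the length of one bundle, while pipelines whose side input never changes pay for one reload per timeout rather than one per bundle.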