beam
beam copied to clipboard
Clear side input caches of dofninvoker when finishing bundle. Add microbenchmarks for map_fn with side inputs.
Improve the map_fn test by using common utilities. Add support to benchmarks to add profiling. I was originally trying to optimize the code other ways but most of the overhead appears to be due to cython/Python interactions so the fast-past cache for repeated process invocations is important.
Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:
- [ ] Mention the appropriate issue in your description (for example:
addresses #123), if applicable. This will automatically add a link to the pull request in the issue. If you would like the issue to automatically close on merging the pull request, commentfixes #<ISSUE NUMBER>instead. - [ ] Update
CHANGES.mdwith noteworthy changes. - [ ] If this contribution is large, please file an Apache Individual Contributor License Agreement.
See the Contributor Guide for more tips on how to make review process smoother.
To check the build health, please visit https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md
GitHub Actions Tests Status (on master branch)
See CI.md for more information about GitHub Actions CI or the workflows README to see a list of phrases to trigger workflows.
fixes #28776
Checks are failing. Will not request review until checks are succeeding. If you'd like to override that behavior, comment assign set of reviewers
Assigning reviewers. If you would like to opt out of this review, comment assign to next reviewer:
R: @shunping for label python.
Available commands:
stop reviewer notifications- opt out of the automated review toolingremind me after tests pass- tag the comment author after tests passwaiting on author- shift the attention set back to the author (any comment or push by the author will return the attention set to the reviewers)
The PR bot will only process comments in the main thread (not review comments).
fixed lint issue, remaining test failures seem unrelated but they are rerunning anyway
@robertwb @tvalentyn as you have context from before and had performance concerns. From the benchmark, clearing on finish_bundle doesn't appear to impact runtime. You were correct that doing it every time slowed down the process time by about 25%. I tried to optimize by using a placeholder approach similar to other placeholders but it still was slow, I believe due to going back and forth between cython/python as shown by cython annotate see see test.html.txt (uploading as text to workaround github filter). It might be something to investigate to improve performance if desired for process()
Thanks, @scwhittle !
doing it every time slowed down the process time by about 25%.
Have you observed this regression on Dataflow runner as well?
From the benchmark, clearing on finish_bundle doesn't appear to impact runtime.
Seems like a great idea! Have you checked whether Dataflow runtime has changed on any of the perf tests using side-inputs ?
Without the sdk cache enabled (not on by default), removing the PerWindowInvoker cache mans that there is lots of FnApi traffic to fetch side inputs which contributed to latency in Dataflow runner.
I think that we should postpone making this always on until the sdk cache is enabled by default. If that is too far out we could modify this to not clear after every bundle but modify finish_bundle to clear it only after some timeout.
Enabling the sdk cache will let the runner control the refresh via the side-input cache token.
Reminder, please take a look at this pr: @shunping
I agree, and this will likely slow things down a lot more on a real runner without SDK cache than is shown in the microbenchmarks. How hard would it be to only clear the cache if it's reached a certain age (at least until we have sdk cache is enabled by default).
Reminder, please take a look at this pr: @shunping
It would be pretty easy to do so but it could still cause latency regressions on pipelines that have a global side input that is not refreshing.
I was looking into doing so only if the pcollection generating the side input was unbounded as a non-updating side input seems like it would be bounded. I was using the AsSideInput pcollection but then it looked like some additional plumbing was necessary to keep this metadata in SideInputData which is what is unpickled and used for execution. I didn't get a chance to get it all working yet.
@robertwb Do you forsee issues with that approach before I work further on completing it? IIUC the pickling just needs to be consistent between pipeline submission and execution so it wouldn't be an update compatibility issue.
waiting on author
FWIW I am planning to look into enabling state cache again next quarter
This pull request has been marked as stale due to 60 days of inactivity. It will be closed in 1 week if no further activity occurs. If you think that’s incorrect or this pull request requires a review, please simply write any comment. If closed, you can revive the PR at any time and @mention a reviewer or discuss it on the [email protected] list. Thank you for your contributions.
This pull request has been closed due to lack of activity. If you think that is incorrect, or the pull request requires review, you can revive the PR at any time.