Initial DaskRunner for Beam
Here, I've created a minimum viable Apache Beam runner for Dask. My approach is to visit a Beam Pipeline and translate PCollections into Dask Bags, and PTransforms into Bag methods.
In this version, I have supported enough operations to make the test pipeline asserts work. The tests themselves are not comprehensive. Further, there are many Bag operations that could be translated for greater efficiency.
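For intuition, here is a rough sketch of what that translation can look like. This is a hedged illustration with made-up helper names, not the actual `dask_runner.py` code:

```python
import dask.bag as db

# Hypothetical translators for three Beam primitives; the names and
# dispatch are illustrative, not the actual dask_runner.py code.
def translate_create(values):
    # beam.Create -> an in-memory Bag.
    return db.from_sequence(values)

def translate_pardo(bag, do_fn):
    # beam.ParDo -> Bag.map plus flatten, since a DoFn may emit zero
    # or more elements per input.
    return bag.map(do_fn.process).flatten()

def translate_group_by_key(bag):
    # beam.GroupByKey -> Bag.groupby on the key of (key, value) pairs.
    return bag.groupby(lambda kv: kv[0])
```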
CC: @pabloem
Fixes: #18962
Original PR discussion can be found here: https://github.com/alxmrs/beam/pull/1
Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:

- [x] Choose reviewer(s) and mention them in a comment (`R: @username`).
- [x] Mention the appropriate issue in your description (for example: `addresses #123`), if applicable. This will automatically add a link to the pull request in the issue. If you would like the issue to automatically close on merging the pull request, comment `fixes #<ISSUE NUMBER>` instead.
- [ ] Update `CHANGES.md` with noteworthy changes.
- [ ] If this contribution is large, please file an Apache Individual Contributor License Agreement.
See the Contributor Guide for more tips on how to make review process smoother.
To check the build health, please visit https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md
GitHub Actions Tests Status (on master branch)
See CI.md for more information about GitHub Actions CI.
Codecov Report
Merging #22421 (f9cf45a) into master (107a43d) will decrease coverage by 0.14%. The diff coverage is 82.04%.
@@ Coverage Diff @@
## master #22421 +/- ##
==========================================
- Coverage 73.35% 73.21% -0.15%
==========================================
Files 719 728 +9
Lines 95800 96272 +472
==========================================
+ Hits 70276 70482 +206
- Misses 24212 24479 +267
+ Partials 1312 1311 -1
| Flag | Coverage Δ | |
|---|---|---|
| python | 82.80% <86.85%> (-0.25%) | :arrow_down: |
Flags with carried forward coverage won't be shown.
| Impacted Files | Coverage Δ | |
|---|---|---|
| sdks/go/pkg/beam/core/runtime/graphx/translate.go | 38.42% <0.00%> (ø) | |
| sdks/go/pkg/beam/core/runtime/xlangx/expand.go | 0.00% <0.00%> (ø) | |
| sdks/go/pkg/beam/schema.go | 35.29% <ø> (ø) | |
| ...ython/apache_beam/runners/interactive/sql/utils.py | 76.09% <ø> (ø) | |
| sdks/python/apache_beam/transforms/combiners.py | 93.43% <ø> (ø) | |
| sdks/python/apache_beam/typehints/row_type.py | 100.00% <ø> (ø) | |
| ...apache_beam/typehints/native_type_compatibility.py | 85.52% <33.33%> (-1.06%) | :arrow_down: |
| sdks/python/apache_beam/typehints/opcodes.py | 85.35% <50.00%> (-0.26%) | :arrow_down: |
| ...dks/python/apache_beam/runners/dask/dask_runner.py | 86.45% <86.45%> (ø) | |
| .../python/apache_beam/typehints/trivial_inference.py | 96.15% <87.50%> (-0.27%) | :arrow_down: |
| ... and 42 more | | |
@TomAugspurger: I'm having trouble running my unit tests. They used to work, but now I'm noticing infinite loops when running them on a local cluster (the default scheduler).
In my last commit, I changed the `client.gather` call to async mode, and this let me hit a timeout error. It appears that the Beam tests pass (asserts behave as expected within a compute graph); however, the client never stops running and times out after 4 seconds.
Do you have any idea what's going on? One key difference between my setup now and when I wrote this is that I'm now on an M1 Mac (ARM64). Could this be causing my problem?
CC: @rabernat @cisaacstern @pabloem
> In my last commit, I changed the `client.gather` call to async mode, and this let me hit a timeout error.
The `self.client.gather(self.futures, errors='raise', asynchronous=True)` call looks incorrect inside of a regular `def` function. That would typically need to be `await self.client.gather` inside of an `async def` function, since `asynchronous=True` makes that call return a coroutine that needs to be awaited.
Can you expand on the desire for `asynchronous=True` there? The timeout wasn't working properly without it? FWIW, I don't see the infinite loops locally, even with `asynchronous=True`.
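For reference, here is a minimal sketch of the two usage patterns under discussion, assuming a local `distributed.Client`; this is illustrative, not code from the PR:

```python
from distributed import Client

def gather_sync():
    # Synchronous client: gather blocks and returns results directly.
    client = Client(processes=False)
    futures = client.map(lambda x: x + 1, range(4))
    return client.gather(futures)  # no await, no asynchronous=True

async def gather_async():
    # Asynchronous client: gather returns a coroutine, so it must be
    # awaited inside an async def, with the client itself created
    # with asynchronous=True.
    async with Client(processes=False, asynchronous=True) as client:
        futures = client.map(lambda x: x + 1, range(4))
        return await client.gather(futures)
```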
> looks incorrect inside of a regular `def` function.
Yes – thanks for pointing this out. This makes sense to me, looking further at the documentation.
> Can you expand on the desire for `asynchronous=True` there?
I... really am just trying things to stop hitting an infinite loop. This got me to a timeout error when running the tests. Though, when running end-to-end in Pangeo-Forge, I definitely experienced a runtime error complaining that I wasn't in an `async def`.
> FWIW, I don't see the infinite loops locally, even with `asynchronous=True`.
Interesting! Do the tests pass for you? What is your environment like? I'm concerned that I'm hitting another architecture issue with ARM.
Thanks for taking a look at this, Tom.
Assigning reviewers. If you would like to opt out of this review, comment `assign to next reviewer`:
R: @tvalentyn for label python.
Available commands:
- `stop reviewer notifications` - opt out of the automated review tooling
- `remind me after tests pass` - tag the comment author after tests pass
- `waiting on author` - shift the attention set back to the author (any comment or push by the author will return the attention set to the reviewers)
The PR bot will only process comments in the main thread (not review comments).
I'll take a look at this today!
oops, it seems like there's a failure :/
Run Python PreCommit
Run Python PreCommit
Run Python PreCommit
the only test that is giving trouble should be easy to fix or skip for now. I'll review the PR as is and maybe we'll merge it soon
Thanks Pablo. I think I can easily fix it – I'm having trouble reproducing the issue on my local environment due to my M1 woes.
ok I've taken a look. The code, in fact, looks so clean that I'm very happy to merge.
Run Python PreCommit
Run Python PreCommit
Run Python PreCommit
ugggg haha can't get a passing precommit even though the failing tests are unrelated.
Run Python PreCommit
Run Python PreCommit
sorry about the crazy flakiness. Something is going on recently with our precommits...
ugggg incredibly enough, this issue reproduces only very occasionally in my environment.
Run Python PreCommit
given no changes anywhere close to the current flaky tests, I will merge.
LGTM
Wohoo!
Thanks. The Python PreCommit is showing the following test failures: https://ci-beam.apache.org/job/beam_PreCommit_Python_Cron/6286/
apache_beam.runners.dask.dask_runner_test.DaskOptionsTest.test_parser_destinations__agree_with_dask_client
AssertionError: 'vpt_vp_arg3' not found in ['address', 'loop', 'timeout', 'set_as_default', 'scheduler_file', 'security', 'asynchronous', 'name', 'heartbeat_interval', 'serializers', 'deserializers', 'extensions', 'direct_to_workers', 'connection_limit', 'kwargs']
apache_beam.runners.dask.dask_runner_test.DaskRunnerRunPipelineTest.test_create
TypeError: __init__() got an unexpected keyword argument 'vpt_vp_arg3'
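For context, the failing options test appears to check an invariant of roughly this shape; the sketch below is a guess at that shape, not the actual test code. Judging by its name, the stray `vpt_vp_arg3` destination leaks in from Beam's value-provider test machinery and has no matching `distributed.Client` keyword argument:

```python
import inspect

from distributed import Client

def check_parser_destinations_agree(parser):
    # Hypothetical re-creation of the failing check: every argparse
    # destination defined for DaskOptions should name a keyword
    # argument of distributed.Client.__init__.
    client_args = set(inspect.signature(Client.__init__).parameters)
    for action in parser._actions:  # argparse internals, but stable
        if action.dest != 'help':
            assert action.dest in client_args, (
                f'{action.dest!r} not found in {sorted(client_args)}')
```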
thanks Yi for pointing this out