Initial DaskRunner for Beam

Open alxmrs opened this issue 3 years ago • 1 comments

Here, I've created a minimum viable Apache Beam runner for Dask. My approach is to visit a Beam Pipeline and translate PCollections into Dask Bags, and PTransforms into Bag methods.
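As a rough illustration of that translation idea (not the code in this PR), the sketch below walks a list of steps and dispatches each one to a handler. Everything here is a hypothetical stand-in: `Step`, `TRANSLATIONS`, and `run` are invented for the example, and plain Python lists play the role that Dask Bags would play in the runner.

```python
from dataclasses import dataclass
from typing import Any, Callable, Dict, List


@dataclass
class Step:
    """Hypothetical stand-in for one transform in a pipeline."""
    kind: str
    arg: Any = None


# Each handler takes the current collection and a step, and returns a new
# collection. In a real runner the collection would be a Dask Bag and the
# handler would call the matching Bag method instead of a list comprehension.
TRANSLATIONS: Dict[str, Callable[[List[Any], Step], List[Any]]] = {
    "create": lambda items, step: list(step.arg),
    "map": lambda items, step: [step.arg(x) for x in items],
    "filter": lambda items, step: [x for x in items if step.arg(x)],
}


def run(steps: List[Step]) -> List[Any]:
    """Visit the steps in order, translating each through the lookup table."""
    items: List[Any] = []
    for step in steps:
        items = TRANSLATIONS[step.kind](items, step)
    return items


result = run([
    Step("create", [1, 2, 3]),
    Step("map", lambda x: x + 1),
    Step("filter", lambda x: x % 2 == 0),
])
```

A table-driven dispatch like this makes it easy to see which operations are supported so far and to add more translations later.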

In this version, I have supported enough operations to make test pipeline asserts work. The tests themselves are not comprehensive. Further, there are many Bag operations that could be translated for greater efficiency.

CC: @pabloem

Fixes: #18962

Original PR discussion can be found here: https://github.com/alxmrs/beam/pull/1


Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:

  • [x] Choose reviewer(s) and mention them in a comment (R: @username).
  • [x] Mention the appropriate issue in your description (for example: addresses #123), if applicable. This will automatically add a link to the pull request in the issue. If you would like the issue to automatically close on merging the pull request, comment fixes #<ISSUE NUMBER> instead.
  • [ ] Update CHANGES.md with noteworthy changes.
  • [ ] If this contribution is large, please file an Apache Individual Contributor License Agreement.

See the Contributor Guide for more tips on how to make the review process smoother.

To check the build health, please visit https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md

See CI.md for more information about GitHub Actions CI.

alxmrs avatar Jul 22 '22 21:07 alxmrs

Codecov Report

Merging #22421 (f9cf45a) into master (107a43d) will decrease coverage by 0.14%. The diff coverage is 82.04%.

@@            Coverage Diff             @@
##           master   #22421      +/-   ##
==========================================
- Coverage   73.35%   73.21%   -0.15%     
==========================================
  Files         719      728       +9     
  Lines       95800    96272     +472     
==========================================
+ Hits        70276    70482     +206     
- Misses      24212    24479     +267     
+ Partials     1312     1311       -1     
Flag Coverage Δ
python 82.80% <86.85%> (-0.25%) :arrow_down:

Impacted Files Coverage Δ
sdks/go/pkg/beam/core/runtime/graphx/translate.go 38.42% <0.00%> (ø)
sdks/go/pkg/beam/core/runtime/xlangx/expand.go 0.00% <0.00%> (ø)
sdks/go/pkg/beam/schema.go 35.29% <ø> (ø)
...ython/apache_beam/runners/interactive/sql/utils.py 76.09% <ø> (ø)
sdks/python/apache_beam/transforms/combiners.py 93.43% <ø> (ø)
sdks/python/apache_beam/typehints/row_type.py 100.00% <ø> (ø)
...apache_beam/typehints/native_type_compatibility.py 85.52% <33.33%> (-1.06%) :arrow_down:
sdks/python/apache_beam/typehints/opcodes.py 85.35% <50.00%> (-0.26%) :arrow_down:
...dks/python/apache_beam/runners/dask/dask_runner.py 86.45% <86.45%> (ø)
.../python/apache_beam/typehints/trivial_inference.py 96.15% <87.50%> (-0.27%) :arrow_down:
... and 42 more

codecov[bot] avatar Sep 05 '22 22:09 codecov[bot]

@TomAugspurger: I'm having trouble running my unit tests. My tests used to work, but now I'm noticing infinite loops when running them on a local cluster (the default scheduler).

In my last commit, I changed the client.gather command to async mode, and this let me hit a timeout error. Here, it appears that the Beam tests pass (asserts behave as expected within a compute graph); however, the client never stops running and times out after 4 seconds.

Do you have any idea what's going on? One key difference between my setup now and when I wrote this is that I'm now on an M1 Mac (ARM64). Could this cause my problem?

CC: @rabernat @cisaacstern @pabloem

alxmrs avatar Sep 21 '22 21:09 alxmrs

In my last commit, I changed the client.gather command to async mode, and this let me hit a timeout error

The

self.client.gather(self.futures, errors='raise', asynchronous=True)

looks incorrect inside of a regular def function. That would typically need to be await self.client.gather inside of an async function, since asynchronous=True makes that return a coroutine that needs to be awaited.
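To make the difference concrete, here is a minimal sketch using only `asyncio`. `fake_gather` is a hypothetical stand-in for an asynchronous gather call, not the Dask API; the point is only that calling a coroutine function from a regular def produces a coroutine object and runs nothing until it is awaited.

```python
import asyncio


async def fake_gather(futures):
    """Hypothetical stand-in for a gather call made in asynchronous mode."""
    await asyncio.sleep(0)  # yield to the event loop once
    return [f * 2 for f in futures]


def wait_sync(futures):
    # Inside a regular def, the call only builds a coroutine object.
    # No work has happened yet, so there are no results to use.
    pending = fake_gather(futures)
    is_coroutine = asyncio.iscoroutine(pending)
    pending.close()  # discard it cleanly to avoid a "never awaited" warning
    return is_coroutine


async def wait_async(futures):
    # Inside an async def, awaiting the call actually runs it.
    return await fake_gather(futures)


results = asyncio.run(wait_async([1, 2, 3]))
```

Here `wait_sync` never sees any results, while `wait_async` returns them once the event loop has run the coroutine.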

Can you expand on the desire for asynchronous=True there? The timeout wasn't working properly without it? FWIW, I don't see the infinite loops locally, even with asynchronous=True.

TomAugspurger avatar Sep 22 '22 11:09 TomAugspurger

looks incorrect inside of a regular def function.

Yes – thanks for pointing this out. This makes sense to me, looking further at the documentation.

Can you expand on the desire for asynchronous=True there?

I... really am just trying things to stop hitting an infinite loop. This got me to a timeout error when run in tests. Though, when running e2e in Pangeo-Forge, I definitely experience a runtime error complaining that I wasn't in an async def.

FWIW, I don't see the infinite loops locally, even with asynchronous=True.

Interesting! Do the tests pass for you? What is your environment like? I'm concerned that I'm hitting another architecture issue with ARM.

Thanks for taking a look at this, Tom.

alxmrs avatar Sep 22 '22 15:09 alxmrs

Assigning reviewers. If you would like to opt out of this review, comment assign to next reviewer:

R: @tvalentyn for label python.

Available commands:

  • stop reviewer notifications - opt out of the automated review tooling
  • remind me after tests pass - tag the comment author after tests pass
  • waiting on author - shift the attention set back to the author (any comment or push by the author will return the attention set to the reviewers)

The PR bot will only process comments in the main thread (not review comments).

github-actions[bot] avatar Oct 10 '22 20:10 github-actions[bot]

I'll take a look at this today!

pabloem avatar Oct 17 '22 21:10 pabloem

oops it seems like there's a failure : /

pabloem avatar Oct 18 '22 16:10 pabloem

Run Python PreCommit

pabloem avatar Oct 19 '22 15:10 pabloem

Run Python PreCommit

pabloem avatar Oct 19 '22 18:10 pabloem

Run Python PreCommit

pabloem avatar Oct 19 '22 20:10 pabloem

the only test that is giving trouble should be easy to fix or skip for now. I'll review the PR as is and maybe we'll merge it soon

pabloem avatar Oct 20 '22 00:10 pabloem

Thanks Pablo. I think I can easily fix it – I'm having trouble reproducing the issue on my local environment due to my M1 woes.

alxmrs avatar Oct 20 '22 02:10 alxmrs

ok I've taken a look. The code, in fact, looks so clean that I'm very happy to merge.

pabloem avatar Oct 21 '22 05:10 pabloem

Run Python PreCommit

pabloem avatar Oct 21 '22 16:10 pabloem

Run Python PreCommit

pabloem avatar Oct 21 '22 17:10 pabloem

Run Python PreCommit

pabloem avatar Oct 21 '22 20:10 pabloem

ugggg haha can't get a passing precommit even though the tests are unrelated.

pabloem avatar Oct 24 '22 16:10 pabloem

Run Python PreCommit

pabloem avatar Oct 24 '22 16:10 pabloem

Run Python PreCommit

pabloem avatar Oct 24 '22 21:10 pabloem

sorry about the crazy flakiness. Something is going on recently with our precommits...

pabloem avatar Oct 24 '22 21:10 pabloem

ugggg incredibly enough, this issue reproduces only very occasionally in my environment.

pabloem avatar Oct 25 '22 02:10 pabloem

Run Python PreCommit

pabloem avatar Oct 25 '22 02:10 pabloem

given no changes anywhere close to the current flaky tests, I will merge.

pabloem avatar Oct 25 '22 16:10 pabloem

LGTM

pabloem avatar Oct 25 '22 16:10 pabloem

Wohoo!

alxmrs avatar Oct 25 '22 17:10 alxmrs

Thanks. Python Precommit is showing the following test failures: https://ci-beam.apache.org/job/beam_PreCommit_Python_Cron/6286/

apache_beam.runners.dask.dask_runner_test.DaskOptionsTest.test_parser_destinations__agree_with_dask_client

AssertionError: 'vpt_vp_arg3' not found in ['address', 'loop', 'timeout', 'set_as_default', 'scheduler_file', 'security', 'asynchronous', 'name', 'heartbeat_interval', 'serializers', 'deserializers', 'extensions', 'direct_to_workers', 'connection_limit', 'kwargs']

apache_beam.runners.dask.dask_runner_test.DaskRunnerRunPipelineTest.test_create

TypeError: __init__() got an unexpected keyword argument 'vpt_vp_arg3'
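Both messages are consistent with an option name reaching a constructor that does not accept it. The sketch below shows that kind of check with the standard library only. `FakeClient` and `unknown_destinations` are hypothetical names made up for illustration, and the thread does not say how `vpt_vp_arg3` ended up among the options; only the name itself is taken from the log above.

```python
import argparse
import inspect


class FakeClient:
    """Hypothetical stand-in for a client with a fixed set of keyword arguments."""

    def __init__(self, address=None, timeout=None, name=None):
        self.address, self.timeout, self.name = address, timeout, name


def accepted_names(cls):
    # Parameter names of the constructor, excluding self.
    params = inspect.signature(cls.__init__).parameters
    return [p for p in params if p != "self"]


def unknown_destinations(parser, cls):
    # Parser destinations that the constructor would reject as keywords.
    dests = vars(parser.parse_args([])).keys()
    allowed = set(accepted_names(cls))
    return sorted(d for d in dests if d not in allowed)


parser = argparse.ArgumentParser()
parser.add_argument("--address")
parser.add_argument("--timeout", type=float)
parser.add_argument("--vpt_vp_arg3")  # stray option; name taken from the failure log

stray = unknown_destinations(parser, FakeClient)
```

Passing the stray name straight through, as in `FakeClient(vpt_vp_arg3=None)`, raises a `TypeError` about an unexpected keyword argument, which matches the shape of the second failure.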

Abacn avatar Oct 25 '22 21:10 Abacn

thanks Yi for pointing this out

pabloem avatar Oct 26 '22 04:10 pabloem