
[BEAM-14534] Allow users to compress values being shuffled in dataflow

Open · steveniemitz opened this pull request 3 years ago · 15 comments

This PR adds a new interface, ShuffleCompressor, which allows users to plug in compression for values just before they're written to shuffle.

The interface is a little odd. We originally exposed it simply as wrapping Input/Output streams, but the interface here is much more efficient:

  • In the compression path, the implementor can use the RandomAccessData as a buffer pool (for example, using scratch space at the end) and can also compress the data "in place" if possible. Additionally, many compression algorithms operate more efficiently on a fixed-size buffer than on an unknown amount of input data.
  • In the decompression path, using ByteBuffer also allows efficiently slicing the input and output buffers. Ideally I think this interface would use ByteString as well, but we didn't want to expose the shaded protobuf library in the public API.
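The PR's exact signatures aren't reproduced in this thread, but the shape described above can be sketched roughly as follows. The names and signatures here are illustrative assumptions, with ByteBuffer standing in for Beam's RandomAccessData on the compression path:

```java
import java.nio.ByteBuffer;

// Hypothetical sketch of the interface shape described above; the actual PR
// signatures are not shown in this thread. The idea is that compress() may
// reuse caller-owned scratch space, and decompress() works on a slice of the
// shuffle read buffer without copying it.
class ShuffleCompressorSketch {
  interface ShuffleCompressor {
    /** Compress {@code input}; implementations may compress "in place". */
    ByteBuffer compress(ByteBuffer input);

    /** Decompress a slice of the buffer received from the shuffle reader. */
    ByteBuffer decompress(ByteBuffer input);
  }

  /** Identity implementation, useful as a no-op default. */
  static final ShuffleCompressor IDENTITY = new ShuffleCompressor() {
    public ByteBuffer compress(ByteBuffer input) { return input.duplicate(); }
    public ByteBuffer decompress(ByteBuffer input) { return input.duplicate(); }
  };

  public static void main(String[] args) {
    ByteBuffer in = ByteBuffer.wrap("hello".getBytes());
    ByteBuffer out = IDENTITY.decompress(IDENTITY.compress(in));
    byte[] round = new byte[out.remaining()];
    out.get(round);
    System.out.println(new String(round)); // prints "hello"
  }
}
```

A real implementation would plug in a block compressor such as zstd behind this interface; the identity implementation above only demonstrates the buffer-slicing shape.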

Additionally this also changes most places in the shuffle IO path to use ByteString rather than byte[]. This allows efficiently "slicing" the buffer received from the shuffle reader, removing a significant number of byte[] copies.

Internally we have an implementation of ShuffleCompressor that uses zstd, and we see a significant benefit from using it. For example, at level 3 (the default), a simple read -> reshuffle -> do something pipeline sees a 50% reduction in data shuffled for our representative test datasets.

R: @lukecwik


Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:

  • [x] Choose reviewer(s) and mention them in a comment (R: @username).
  • [x] Format the pull request title like [BEAM-XXX] Fixes bug in ApproximateQuantiles, where you replace BEAM-XXX with the appropriate JIRA issue, if applicable. This will automatically link the pull request to the issue.
  • [ ] Update CHANGES.md with noteworthy changes.
  • [x] If this contribution is large, please file an Apache Individual Contributor License Agreement.

See the Contributor Guide for more tips on how to make review process smoother.

To check the build health, please visit https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md

steveniemitz avatar May 31 '22 17:05 steveniemitz

Can one of the admins verify this patch?

asf-ci avatar May 31 '22 17:05 asf-ci

I see that BEAM-14534 does specifically mention using a wrapping coder to compress the data, and I think that would still be a better solution since it would allow you to:

  • plug compression into more places, such as compressing the shuffle key, user state, and side inputs
  • support all runners, not just Dataflow runner v1 (note that this solution won't work for any portable runner, such as Dataflow runner v2, since you aren't editing the shuffle code)
  • configure compression per GroupByKey, since in some places it won't make sense because the values are not easily compressible

You can also create a PipelineVisitor that walks all transforms and replaces the KV<KeyCoder, ValueCoder> with KV<KeyCoder, CompressingCoder<ValueCoder>>, so that users don't need to do this themselves. That would give similar behavior to specifying the shuffle compressor pipeline option.
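As a rough illustration of the wrapping-coder idea: on encode, compress the element's already-encoded bytes; on decode, decompress and hand the bytes to the inner coder. This sketch uses plain static methods and JDK deflate so it stays self-contained; a real CompressingCoder would extend Beam's Coder and delegate to the wrapped element coder (the class and method names here are hypothetical):

```java
import java.io.ByteArrayOutputStream;
import java.util.zip.Deflater;
import java.util.zip.Inflater;

// Sketch of a "wrapping coder": the inner coder's encoded bytes go through a
// compressor on the way out and a decompressor on the way back in.
class CompressingCoderSketch {
  /** Compress the inner coder's encoded bytes. */
  static byte[] encode(byte[] innerEncoded) {
    Deflater deflater = new Deflater();
    deflater.setInput(innerEncoded);
    deflater.finish();
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    byte[] buf = new byte[4096];
    while (!deflater.finished()) {
      out.write(buf, 0, deflater.deflate(buf));
    }
    deflater.end();
    return out.toByteArray();
  }

  /** Recover the inner coder's encoded bytes. */
  static byte[] decode(byte[] compressed) throws Exception {
    Inflater inflater = new Inflater();
    inflater.setInput(compressed);
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    byte[] buf = new byte[4096];
    while (!inflater.finished()) {
      out.write(buf, 0, inflater.inflate(buf));
    }
    inflater.end();
    return out.toByteArray();
  }

  public static void main(String[] args) throws Exception {
    byte[] payload = "some repetitive payload ".repeat(100).getBytes();
    byte[] packed = encode(payload);
    byte[] unpacked = decode(packed);
    System.out.println(packed.length < payload.length); // true: repetitive data compresses well
    System.out.println(java.util.Arrays.equals(unpacked, payload)); // true: lossless round trip
  }
}
```

Because the compression lives in the coder, it applies wherever the coder is used to materialize data, which is what makes it runner-agnostic.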

The other part of the change makes sense to reduce byte[] copies by using ByteString.

CC: @tudorm

lukecwik avatar Jun 01 '22 15:06 lukecwik

The other part of the change makes sense to reduce byte[] copies by using ByteString.

CC: @tudorm

Maybe I'll pull the ByteString refactoring stuff out into another review just to make this easier? Do you have any particular issues with it using ByteString there?

The downsides of using Output/Input streams are really too big to ignore here; the performance differences are orders of magnitude in our tests. The main problem is that most "stream" compressor implementations are designed to compress a large amount of data, but in this case we're usually compressing only a few hundred bytes to 1 KB. That makes the overhead of creating/destroying the compressor streams very high (comparatively, at least). We ran into this problem with both deflate and zstd, and it's one of the reasons we ended up with an interface like this. If it's really a non-starter putting this in OSS with a similar interface, that's fine though; we can continue maintaining this in our own fork for the time being.
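A minimal sketch of the reuse pattern that avoids per-record stream construction, using the JDK's Deflater (the original work used zstd; Deflater is just a stand-in here, and the helper is hypothetical):

```java
import java.io.ByteArrayOutputStream;
import java.util.zip.Deflater;

// Rather than building a new compressor (or compressor stream) per record,
// keep one Deflater alive and reset() it between small records. This sketch
// only shows the pattern; the orders-of-magnitude claim is from the comment
// above, not measured here.
class DeflaterReuseSketch {
  static byte[] compressWith(Deflater d, byte[] record) {
    d.reset();          // clears state so the instance can be reused
    d.setInput(record);
    d.finish();
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    byte[] buf = new byte[1024];
    while (!d.finished()) {
      out.write(buf, 0, d.deflate(buf));
    }
    return out.toByteArray();
  }

  public static void main(String[] args) {
    Deflater shared = new Deflater(); // created once, reused for every record
    byte[] record = "a small shuffle value, a few hundred bytes".getBytes();
    byte[] a = compressWith(shared, record);
    byte[] b = compressWith(shared, record); // reset() makes reuse safe
    shared.end();
    System.out.println(java.util.Arrays.equals(a, b)); // prints "true"
  }
}
```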

The PipelineVisitor idea is interesting, although I'm skeptical of how well it'd work in practice. For example, with a Combine the coder for the data being shuffled is the accumulator coder, not the value coder of the KV. I bet you'd need a bunch of special cases to pick the "right" coder to wrap for various transforms.

steveniemitz avatar Jun 01 '22 17:06 steveniemitz

I opened PR #17802 with just the ByteString changes if you want to get those first.

steveniemitz avatar Jun 01 '22 17:06 steveniemitz

The other part of the change makes sense to reduce byte[] copies by using ByteString. CC: @tudorm

Maybe I'll pull the ByteString refactoring stuff out into another review just to make this easier? Do you have any particular issues with it using ByteString there?

The downsides of using Output/Input streams are really too big to ignore here; the performance differences are orders of magnitude in our tests. The main problem is that most "stream" compressor implementations are designed to compress a large amount of data, but in this case we're usually compressing only a few hundred bytes to 1 KB. That makes the overhead of creating/destroying the compressor streams very high (comparatively, at least). We ran into this problem with both deflate and zstd, and it's one of the reasons we ended up with an interface like this. If it's really a non-starter putting this in OSS with a similar interface, that's fine though; we can continue maintaining this in our own fork for the time being.

The PipelineVisitor idea is interesting, although I'm skeptical of how well it'd work in practice. For example, with a Combine the coder for the data being shuffled is the accumulator coder, not the value coder of the KV. I bet you'd need a bunch of special cases to pick the "right" coder to wrap for various transforms.

I'm happy with using ByteStrings, since it reduces copies. I'll review and merge that, and then we can revisit this.

I was unaware of the additional overhead of using a stream compressor/decompressor versus one that works on fixed-length byte strings; I'm curious to learn more about it, as I might see something that wasn't considered before. Also, with the migration to Dataflow runner v2 you'll lose the ability for this option to have any effect unless you go the coder route. The coder route has additional benefits: you'll reduce the amount of data being sent over gRPC between the SDK harness and the runner, reduce how much the runner materializes in shuffle, and cover those other use cases such as side inputs/user state/...

Using the PipelineVisitor should work quite well, since you really only need to handle GBK, CoGBK, and Combine; to my knowledge, that would cover everything that is optimized across runners today.

lukecwik avatar Jun 03 '22 20:06 lukecwik

Is this blocked on https://github.com/apache/beam/pull/17802 ?

aaltay avatar Jun 18 '22 03:06 aaltay

Is this blocked on #17802 ?

Yes, it is the precursor.

lukecwik avatar Jun 22 '22 22:06 lukecwik

Have you tried the pipeline visitor using a SnappyCoder that wraps existing encodings?

lukecwik avatar Jul 13 '22 19:07 lukecwik

Have you tried the pipeline visitor using a SnappyCoder that wraps existing encodings?

Now that we have compression working well end-to-end in our internal fork (and a good baseline for performance), I was going to re-evaluate options for making a "coder wrapper" that can do the compression/decompression. I really do agree that a coder wrapper would be much cleaner here; I'm just worried we won't be able to reach the same level of performance with that interface.

steveniemitz avatar Jul 14 '22 21:07 steveniemitz

Have you tried the pipeline visitor using a SnappyCoder that wraps existing encodings?

Now that we have compression working well end-to-end in our internal fork (and a good baseline for performance), I was going to re-evaluate options for making a "coder wrapper" that can do the compression/decompression. I really do agree that a coder wrapper would be much cleaner here; I'm just worried we won't be able to reach the same level of performance with that interface.

It's not just about being cleaner; it's about the value being added to other runners and to Dataflow Prime, where you wouldn't have access to edit the shuffle code.

lukecwik avatar Jul 14 '22 21:07 lukecwik

Any updates on using the pipeline visitor vs this approach?

lukecwik avatar Aug 04 '22 21:08 lukecwik

I just stumbled upon this PR while searching for the ZstdCoder PR. I had been thinking of writing a PipelineVisitor to do this, so it's a fun surprise to see it suggested here as well. @steveniemitz I would be happy to pick up the PipelineVisitor feature if you have other priorities.

sjvanrossum avatar Nov 17 '22 16:11 sjvanrossum

I just stumbled upon this PR while searching for the ZstdCoder PR. I had been thinking of writing a PipelineVisitor to do this, so it's a fun surprise to see it suggested here as well. @steveniemitz I would be happy to pick up the PipelineVisitor feature if you have other priorities.

It would make sense to have an approved and/or denied list of coders that we should or shouldn't compress. Coders whose encodings are already small, like int/double/..., would make sense on the denied list. Making it user-configurable would also be nice. Lots of options here.
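A trivial sketch of the deny-list idea, with class names standing in for Beam coder classes (the names and the helper are hypothetical):

```java
import java.util.Set;

// A pipeline visitor deciding whether to wrap a value coder could consult a
// deny list of coders whose encodings are already tiny, where compression
// would only add overhead. A user-configurable list could replace this
// hard-coded one.
class CompressionDenyListSketch {
  static final Set<String> DENY =
      Set.of("VarIntCoder", "DoubleCoder", "BooleanCoder");

  /** Returns true if a coder with this (simple) name is worth wrapping. */
  static boolean shouldCompress(String coderName) {
    return !DENY.contains(coderName);
  }

  public static void main(String[] args) {
    System.out.println(shouldCompress("StringUtf8Coder")); // prints "true"
    System.out.println(shouldCompress("VarIntCoder"));     // prints "false"
  }
}
```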

lukecwik avatar Nov 17 '22 23:11 lukecwik

This pull request has been marked as stale due to 60 days of inactivity. It will be closed in 1 week if no further activity occurs. If you think that’s incorrect or this pull request requires a review, please simply write any comment. If closed, you can revive the PR at any time and @mention a reviewer or discuss it on the [email protected] list. Thank you for your contributions.

github-actions[bot] avatar Jun 11 '23 12:06 github-actions[bot]

This pull request has been closed due to lack of activity. If you think that is incorrect, or the pull request requires review, you can revive the PR at any time.

github-actions[bot] avatar Jun 19 '23 12:06 github-actions[bot]