Add TaskOutput Operator for shuffling with a single destination
Summary: Currently we use the same logic whether a PartitionedOutput operator has one destination (OutputBuffer) or many.
With the BatchVectorSerializer we can take advantage of the fact that the data is going to a single destination and preserve encodings in the serialized data, which reduces the amount of data shuffled. To use it, we need to change the way input RowVectors are divided into pages. With the IterativeVectorSerializer we can add rows one by one until a byte size or row count limit is hit. With the BatchVectorSerializer the sizes of individual rows are no longer independent, and neither is their serialization, so we need to estimate the size of batches of rows (splits) from the same RowVector and serialize each batch in a single request.
This fundamentally changes both the strategy for preparing rows to be serialized and the process of serializing them, so I thought it made sense to split it into a separate Operator, which I've called TaskOutput (a name borrowed from Presto). This operator is much simpler overall than PartitionedOutput, with only a small amount of duplicated code.
The general flow is:
- addInput constructs the output from the input based on the outputChannels
- the output is divided into splits by recursively splitting it in half until the estimated size of each split is less than the limit
- getOutput serializes one batch at a time to the OutputBuffer until all the input is written out or the Buffer blocks
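
The splitting step in the flow above can be sketched roughly as follows. This is a minimal illustration, not the code in this diff: `RowRange`, `SizeEstimator`, and `splitRange` are hypothetical names, and the estimator stands in for whatever size estimate the serializer provides. It also assumes that a single row that exceeds the limit is emitted as its own split, since it cannot be divided further.

```cpp
#include <cstddef>
#include <cstdint>
#include <functional>
#include <vector>

// Hypothetical stand-in for a contiguous range of rows within one RowVector.
struct RowRange {
  std::size_t offset;
  std::size_t size;
};

// Hypothetical estimator: returns the estimated serialized size in bytes of a
// range. With a batch serializer this must be computed per range, because the
// sizes of individual rows are not independent.
using SizeEstimator = std::function<std::uint64_t(const RowRange&)>;

// Recursively halves 'range' until each piece is estimated to be smaller than
// 'maxBytes', appending the resulting splits to 'out' in row order.
void splitRange(
    const RowRange& range,
    const SizeEstimator& estimate,
    std::uint64_t maxBytes,
    std::vector<RowRange>& out) {
  if (range.size == 0) {
    return;
  }
  // Assumption: a single row cannot be split, so it is emitted even if it is
  // estimated to exceed the limit.
  if (range.size == 1 || estimate(range) < maxBytes) {
    out.push_back(range);
    return;
  }
  const std::size_t half = range.size / 2;
  splitRange({range.offset, half}, estimate, maxBytes, out);
  splitRange({range.offset + half, range.size - half}, estimate, maxBytes, out);
}
```

Under these assumptions, getOutput would then walk the resulting splits in order, serializing one per call and stopping early whenever the OutputBuffer blocks.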
Note that I did not implement the row limit from PartitionedOutput. I assumed it was added to make sure we don't buffer many rows when the rows are really small, which could delay processing (we could be processing rather than waiting for more). Since TaskOutput is guaranteed to produce output after each input, there's no need to worry about buffering for too long.
Differential Revision: D53717840
Deploy Preview for meta-velox canceled.
Name | Link |
---|---|
Latest commit | 0763d624d288c5ed9960f03e371fbca8766ee6cc |
Latest deploy log | https://app.netlify.com/sites/meta-velox/deploys/665e1408cd97a6000892346e |
This pull request was exported from Phabricator. Differential Revision: D53717840
This pull request has been automatically marked as stale because it has not had recent activity. If you'd still like this PR merged, please comment on the PR, make sure you've addressed reviewer comments, and rebase on the latest main. Thank you for your contributions!