Scale table writers per task based on throughput
Description
Currently, `task_writer_count` per worker defaults to 1 to avoid producing many small files in some cases. This PR makes it adaptive based on parameters like `physicalWrittenBytes` and buffer size, similar to what we already do for `scale_writers` in `ScaledWriterScheduler`. This feature is gated behind a new flag, `scale_task_writers`. I don't like having two flags, but it gives more control:
- `scale_writers`: scale out writers by scheduling them on different workers based on throughput.
- `scale_task_writers`: scale out local parallel table writer jobs per worker based on throughput.
PS: For now, this only works for unpartitioned table targets; for partitioned targets, I'm planning to create a separate PR.
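The adaptive behavior described above can be sketched roughly as follows. This is a minimal illustration, not the actual Trino implementation: the class name `LocalWriterScaler`, the method names, and the scaling condition are all assumptions made for the example; the real logic lives in the engine's table writer operators.

```java
// Hypothetical sketch of throughput-based local writer scaling:
// start with one writer per task and add another only when the
// writers are a bottleneck (buffers full) AND enough data has been
// physically written to justify more output files.
public final class LocalWriterScaler {
    private final int maxWriterCount;
    private final long minDataProcessedPerWriter; // bytes per writer before scaling up

    private int writerCount = 1; // start with a single local writer

    public LocalWriterScaler(int maxWriterCount, long minDataProcessedPerWriter) {
        this.maxWriterCount = maxWriterCount;
        this.minDataProcessedPerWriter = minDataProcessedPerWriter;
    }

    /**
     * Decide whether to add another local writer, based on how many bytes
     * have been physically written and whether the writer buffers are full
     * (i.e. the writers cannot keep up with the producers).
     */
    public int updateWriterCount(long physicalWrittenBytes, boolean buffersFull) {
        if (buffersFull
                && writerCount < maxWriterCount
                && physicalWrittenBytes >= (long) writerCount * minDataProcessedPerWriter) {
            writerCount++;
        }
        return writerCount;
    }
}
```

The key property of this shape is that writer count only grows when throughput evidence supports it, which is what keeps the small-files problem in check when input is small.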
Initial Results (~6x faster):
Before:
trino:gaurav_test_1> Insert into gaurav_test_1.lineitem_1 select * FROM tpch_sf300_orc_part.lineitem;
INSERT: 1799989091 rows
Query 20220706_113152_00007_2df8m, FINISHED, 7 nodes
Splits: 2,758 total, 2,758 done (100.00%)
26:25 [1.8B rows, 47.5GB] [1.14M rows/s, 30.7MB/s]
After:
trino:gaurav_test_1> set session scale_task_writers=true;
SET SESSION
trino:gaurav_test_1> Insert into gaurav_test_1.lineitem_1 select * FROM tpch_sf300_orc_part.lineitem;
INSERT: 1799989091 rows
Query 20220706_115833_00009_2df8m, FINISHED, 7 nodes
Splits: 2,800 total, 2,800 done (100.00%)
4:09 [1.8B rows, 47.5GB] [7.23M rows/s, 195MB/s]
Is this change a fix, improvement, new feature, refactoring, or other?
improvement
Is this a change to the core query engine, a connector, client library, or the SPI interfaces? (be specific)
core query engine
How would you describe this change to a non-technical end user or system administrator?
Related issues, pull requests, and links
Documentation
( ) No documentation is needed.
( ) Sufficient documentation is included in this PR.
( ) Documentation PR is available with #prnumber.
( ) Documentation issue #issuenumber is filed, and can be handled later.
Release notes
( ) No release notes entries required.
( ) Release notes entries required with the following suggested text:
# Section
* Fix some things. ({issue}`issuenumber`)
@gaurav8297 There are some test failures
All of these test failures seem to be unrelated.
ci / hive-tests (config-empty) and ci / hive-tests (config-hdp3) are failing due to https://github.com/trinodb/trino/issues/13270. ci / test (plugin/trino-hive) is failing due to an OOM, and ci / test (plugin/trino-elasticsearch) because of a container failure. All of these issues seem unrelated to this change.
cc @arhimondr @raunaqmorarka
This experiment was performed to find the best default value of `task.min-scaled-writer-count`.
When the value is 8:
trino:gaurav_test_1> Insert into gaurav_test_1.lineitem_current select * FROM tpch_sf300_orc_part.lineitem;
INSERT: 1799989091 rows
Query 20220722_081653_00003_k3qm4, FINISHED, 7 nodes
Splits: 2,800 total, 2,800 done (100.00%)
4:05 [1.8B rows, 47.5GB] [7.03M rows/s, 190MB/s]
When the value is 4:
trino:gaurav_test_1> Insert into gaurav_test_1.lineitem_current select * FROM tpch_sf300_orc_part.lineitem;
INSERT: 1799989091 rows
Query 20220722_083038_00002_dscnm, FINISHED, 7 nodes
Splits: 2,776 total, 2,776 done (100.00%)
6:26 [1.8B rows, 47.5GB] [4.71M rows/s, 128MB/s]
So the difference is around 2.5 minutes; that's why we are going with 8 as the default.
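Based on this experiment, the relevant coordinator configuration might look like the fragment below. This is a sketch: the property names follow the spellings used in this thread (`task.min-scaled-writer-count`, `task.scale-writers.max-writer-count`), but the exact names and defaults may differ between Trino versions, so check the release documentation before copying.

```properties
# Floor for scaled local writers per task, chosen from the experiment above
# (8 was ~2.5 minutes faster than 4 on the tpch sf300 insert benchmark).
task.min-scaled-writer-count=8

# Cap up to which local writers scale when task-level writer scaling is enabled.
task.scale-writers.max-writer-count=32
```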
@sopel39 PTAL
It lgtm % comments about docs
Test failures are unrelated
Test failures are unrelated
Please rebase to latest master; the CI issues should be resolved now.
For q2 and q4 the initial value of `task_writer_count` is 16. Therefore, they are a bit slower with scaling, because it increases the number of writers gradually.
I think there is now a race condition between scaling at the task level (adding tasks) and scaling at the local level. `ScaledWriterScheduler` should probably only scale tasks when local scaling is already done. Also, a new task created by `ScaledWriterScheduler` will again start scaling from 1 writer, which might mislead `ScaledWriterScheduler` (e.g. 1 existing writer task with 16 writers + 1 new task with 1 writer).
@sopel39 @raunaqmorarka PTAL again
- I've fixed the race condition. Now, worker-level scaling will only happen when local tasks are fully scaled.
- Instead of mixing with `task.writer-count`, I created a separate property, `task.scale-writers.max-writer-count`, up to which we scale when `taskScaleWriters` is enabled.
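The fix described above can be sketched as a simple gate: worker-level scaling may add a new writer task only once every existing task reports that its local writers are fully scaled. The class and method names here are illustrative assumptions, not the actual `ScaledWriterScheduler` API.

```java
import java.util.List;

// Hypothetical sketch of the race-condition fix: gate worker-level
// scaling on local scaling being complete in all running tasks.
public final class ScaledWriterGate {
    private ScaledWriterGate() {}

    /**
     * @param localWriterCounts current local writer count reported by each running task
     * @param maxLocalWriters   cap on local writers per task
     * @return true if worker-level scaling may add another writer task
     */
    public static boolean mayAddTask(List<Integer> localWriterCounts, int maxLocalWriters) {
        // Adding a task earlier would mislead the scheduler: the new task
        // starts at 1 writer while existing tasks may already run at the cap.
        return localWriterCounts.stream().allMatch(count -> count >= maxLocalWriters);
    }
}
```

With this ordering, the scheduler never sees the misleading mix of one fully scaled task and one freshly created single-writer task.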
New benchmarks:
trino:gaurav_test_1> Insert into gaurav_test_1.lineitem_current select * FROM tpch_sf300_orc_part.lineitem;
INSERT: 1799989091 rows
Query 20220728_215651_00000_wynas, FINISHED, 7 nodes
Splits: 2,800 total, 2,800 done (100.00%)
6:06 [1.8B rows, 47.5GB] [4.92M rows/s, 133MB/s]
It is ~2 mins slower after the scaling-related changes, but still faster than the current behavior.
New benchmarks comparison report:
Benchmarks comparison-task-scale-writers.pdf
For q2 and q4 the initial value of `task_writer_count` is 8. Therefore, they are a bit slower with scaling, because it increases the number of writers gradually.
@dain do you ack adding local writer scaling (the meat of this change)?
Simulation:
config:
MAX_WORKERS = 20
TOTAL_DATA_SIZE = 4000
MAX_TASK_WRITER_COUNT = 8
MIN_WRITER_SIZE = 32
Results:
Total Writers: 49
Small Writers: 15
Large Writers: 34
Small/Large Ratio: 0.4411764705882353
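The small/large metric reported by the simulation can be computed as below. This is only a sketch of the metric itself, assuming a writer counts as "small" when its output is below the configured minimum writer size; the simulation's actual data-distribution logic is not reproduced here.

```java
import java.util.List;

// Hypothetical helper computing the small/large writer statistics
// reported by the simulation; a lower ratio means fewer tiny files.
public final class WriterSizeStats {
    public final long smallWriters;
    public final long largeWriters;

    public WriterSizeStats(List<Long> writerOutputSizes, long minWriterSize) {
        this.smallWriters = writerOutputSizes.stream()
                .filter(size -> size < minWriterSize)
                .count();
        this.largeWriters = writerOutputSizes.size() - smallWriters;
    }

    /** Ratio of small to large writers, e.g. 15/34 ~ 0.44 in the run above. */
    public double smallToLargeRatio() {
        return (double) smallWriters / largeWriters;
    }
}
```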
cc @sopel39
benchmarks (4x better): insert_unpart_benchmark.pdf
@gaurav8297 We need to have a plan for config properties when we add local scaling for partitioned inserts