
Scale table writers per task based on throughput

Open gaurav8297 opened this issue 2 years ago • 12 comments

Description

Currently, the task_writer_count per worker defaults to 1 to avoid producing many small files in some cases. This PR makes it adaptive based on parameters like physicalWrittenBytes and buffer size, similar to what we already do for scale_writers in ScaledWriterScheduler. The feature is behind a new flag, scale_task_writers. Although I don't like having two flags, it gives more control:

  • scale_writers: To scale out writers by scheduling them on different workers based on throughput.
  • scale_task_writers: To scale out local parallel table writer jobs per worker based on throughput.

PS: For now, this only works for unpartitioned table targets; for partitioned targets, I'm planning to create a separate PR.
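For illustration, here is a minimal sketch of the kind of throughput-based check described above, in the spirit of ScaledWriterScheduler. The class and method names are hypothetical and the thresholds are assumptions; the real logic lives in Trino's local exchange and table writer code.

// Hypothetical sketch of throughput-based local writer scaling.
// LocalWriterScaler and nextWriterCount are illustrative names, not Trino's API.
public class LocalWriterScaler
{
    private final long minDataProcessedPerWriter; // assumed threshold, e.g. 32MB
    private final int maxWriterCount;             // upper bound for local writers

    public LocalWriterScaler(long minDataProcessedPerWriter, int maxWriterCount)
    {
        this.minDataProcessedPerWriter = minDataProcessedPerWriter;
        this.maxWriterCount = maxWriterCount;
    }

    // Double the writer count only when the output buffer is saturated (writers
    // are the bottleneck) and enough data has already been written per writer
    // (so scaling up will not produce many small files).
    public int nextWriterCount(int currentWriters, long physicalWrittenBytes, double bufferUtilization)
    {
        boolean writersAreBottleneck = bufferUtilization > 0.5;
        boolean enoughDataPerWriter = physicalWrittenBytes >= (long) currentWriters * minDataProcessedPerWriter;
        if (writersAreBottleneck && enoughDataPerWriter) {
            return Math.min(currentWriters * 2, maxWriterCount);
        }
        return currentWriters;
    }
}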

Initial Results (~6x faster):

Before:

trino:gaurav_test_1> Insert into gaurav_test_1.lineitem_1 select * FROM tpch_sf300_orc_part.lineitem;
INSERT: 1799989091 rows

Query 20220706_113152_00007_2df8m, FINISHED, 7 nodes
Splits: 2,758 total, 2,758 done (100.00%)
26:25 [1.8B rows, 47.5GB] [1.14M rows/s, 30.7MB/s]

After:

trino:gaurav_test_1> set session scale_task_writers=true;
SET SESSION
trino:gaurav_test_1> Insert into gaurav_test_1.lineitem_1 select * FROM tpch_sf300_orc_part.lineitem;
INSERT: 1799989091 rows

Query 20220706_115833_00009_2df8m, FINISHED, 7 nodes
Splits: 2,800 total, 2,800 done (100.00%)
4:09 [1.8B rows, 47.5GB] [7.23M rows/s, 195MB/s]

Is this change a fix, improvement, new feature, refactoring, or other?

improvement

Is this a change to the core query engine, a connector, client library, or the SPI interfaces? (be specific)

core query engine

How would you describe this change to a non-technical end user or system administrator?

Related issues, pull requests, and links

Documentation

( ) No documentation is needed.
( ) Sufficient documentation is included in this PR.
( ) Documentation PR is available with #prnumber.
( ) Documentation issue #issuenumber is filed, and can be handled later.

Release notes

( ) No release notes entries required.
( ) Release notes entries required with the following suggested text:

# Section
* Fix some things. ({issue}`issuenumber`)

gaurav8297 avatar Jul 07 '22 01:07 gaurav8297

@gaurav8297 There are some test failures

arhimondr avatar Jul 20 '22 13:07 arhimondr

All of these test failures seem to be unrelated.

gaurav8297 avatar Jul 20 '22 15:07 gaurav8297

ci / hive-tests (config-empty) and ci / hive-tests (config-hdp3) are failing due to this issue: https://github.com/trinodb/trino/issues/13270

ci / test (plugin/trino-hive) is failing due to an OOM, and ci / test (plugin/trino-elasticsearch) because of a container failure.

All of these issues seem unrelated to this change.

cc @arhimondr @raunaqmorarka

gaurav8297 avatar Jul 21 '22 07:07 gaurav8297

This experiment was performed to find the best default value of task.min-scaled-writer-count.

When the value is 8:

trino:gaurav_test_1> Insert into gaurav_test_1.lineitem_current select * FROM tpch_sf300_orc_part.lineitem;
INSERT: 1799989091 rows

Query 20220722_081653_00003_k3qm4, FINISHED, 7 nodes
Splits: 2,800 total, 2,800 done (100.00%)
4:05 [1.8B rows, 47.5GB] [7.03M rows/s, 190MB/s]

When the value is 4:

trino:gaurav_test_1> Insert into gaurav_test_1.lineitem_current select * FROM tpch_sf300_orc_part.lineitem;
INSERT: 1799989091 rows

Query 20220722_083038_00002_dscnm, FINISHED, 7 nodes
Splits: 2,776 total, 2,776 done (100.00%)
6:26 [1.8B rows, 47.5GB] [4.71M rows/s, 128MB/s]

So the difference is around 2.5 minutes. That's why we are going with 8 as the default.

gaurav8297 avatar Jul 22 '22 09:07 gaurav8297

@sopel39 PTAL. It lgtm % comments about docs.

raunaqmorarka avatar Jul 22 '22 09:07 raunaqmorarka

Test failures are unrelated

gaurav8297 avatar Jul 22 '22 15:07 gaurav8297

Test failures are unrelated

Please rebase to the latest master; the CI issues should be resolved now.

raunaqmorarka avatar Jul 23 '22 05:07 raunaqmorarka

benchmark-scale-writers.pdf

For q2 and q4 the initial value of task_writer_count is 16. Therefore, they are a bit slower with scaling, because the writer count ramps up gradually instead of starting at 16.

gaurav8297 avatar Jul 25 '22 15:07 gaurav8297

I think there is now a race condition between scaling at the task level (adding tasks) and scaling at the local level. ScaledWriterScheduler should probably only scale out tasks when local scaling is already done. Also, a new task created by ScaledWriterScheduler will again start scaling from 1 writer, which might mislead ScaledWriterScheduler (e.g. 1 existing writer task with 16 writers + 1 new task with 1 writer).

sopel39 avatar Jul 25 '22 15:07 sopel39

@sopel39 @raunaqmorarka PTAL again

  • I've fixed the race condition. Now, worker-level scaling will only happen when local tasks are fully scaled (see the sketch below).
  • Instead of overloading task.writer-count, I created a separate property, task.scale-writers.max-writer-count, which sets the limit up to which we scale when taskScaleWriters is enabled.
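As a minimal illustration of that gating check, assuming the scheduler can observe each task's current and maximum local writer counts (the names below are hypothetical, not Trino's actual API):

import java.util.List;

// Hypothetical gating check: only add new writer tasks once every existing
// task has fully scaled its local writers, so that a freshly added task
// starting at 1 writer cannot mislead the scheduler's throughput estimates.
record WriterTaskStats(int currentLocalWriters, int maxLocalWriters) {}

final class ScaleOutGate
{
    private ScaleOutGate() {}

    static boolean canAddWriterTask(List<WriterTaskStats> tasks)
    {
        return tasks.stream()
                .allMatch(task -> task.currentLocalWriters() >= task.maxLocalWriters());
    }
}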

gaurav8297 avatar Jul 27 '22 22:07 gaurav8297

New benchmarks:

trino:gaurav_test_1> Insert into gaurav_test_1.lineitem_current select * FROM tpch_sf300_orc_part.lineitem;
INSERT: 1799989091 rows

Query 20220728_215651_00000_wynas, FINISHED, 7 nodes
Splits: 2,800 total, 2,800 done (100.00%)
6:06 [1.8B rows, 47.5GB] [4.92M rows/s, 133MB/s]

It is ~2 mins slower after the scaling-related changes, but still faster than the current default.

gaurav8297 avatar Jul 28 '22 22:07 gaurav8297

New benchmarks comparison report:

Benchmarks comparison-task-scale-writers.pdf

For q2 and q4 the initial value of task_writer_count is 8. Therefore, they are a bit slower with scaling, because the writer count ramps up gradually instead of starting at 8.

gaurav8297 avatar Jul 29 '22 11:07 gaurav8297

@dain do you ack adding local writer scaling (the meat of this change)?

sopel39 avatar Aug 19 '22 12:08 sopel39

Simulation:

config:

MAX_WORKERS = 20
TOTAL_DATA_SIZE = 4000
MAX_TASK_WRITER_COUNT = 8
MIN_WRITER_SIZE = 32

Results:

Total Writers: 49
Small Writers: 15
Large Writers: 34
Small/Large Ratio: 0.4411764705882353
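For context, here is a minimal sketch of the kind of simulation this could be, under assumed mechanics: data is spread round-robin across tasks, a task doubles its local writers once each writer has received MIN_WRITER_SIZE on average, a new task is added only after all existing tasks are fully scaled, and a writer that ends with less than MIN_WRITER_SIZE of data counts as small. It is illustrative only, not the script that produced the numbers above.

// Illustrative simulation of combined local and task-level writer scaling.
public class ScaleWritersSimulation
{
    static final int MAX_WORKERS = 20;
    static final int TOTAL_DATA_SIZE = 4000;
    static final int MAX_TASK_WRITER_COUNT = 8;
    static final int MIN_WRITER_SIZE = 32;

    public static void main(String[] args)
    {
        int[] taskWriters = new int[MAX_WORKERS]; // local writer count per task
        long[] taskData = new long[MAX_WORKERS];  // data routed to each task
        int tasks = 1;
        taskWriters[0] = 1;

        for (int written = 0; written < TOTAL_DATA_SIZE; written++) {
            int target = written % tasks; // route one unit of data round-robin
            taskData[target]++;

            // Local scaling: double writers once each has MIN_WRITER_SIZE on average.
            if (taskWriters[target] < MAX_TASK_WRITER_COUNT
                    && taskData[target] >= (long) taskWriters[target] * MIN_WRITER_SIZE) {
                taskWriters[target] = Math.min(taskWriters[target] * 2, MAX_TASK_WRITER_COUNT);
            }

            // Task scaling: add a task only when all tasks are fully scaled locally.
            boolean allFullyScaled = true;
            for (int i = 0; i < tasks; i++) {
                allFullyScaled &= taskWriters[i] == MAX_TASK_WRITER_COUNT;
            }
            if (allFullyScaled && tasks < MAX_WORKERS) {
                taskWriters[tasks++] = 1;
            }
        }

        int totalWriters = 0;
        int smallWriters = 0;
        for (int i = 0; i < tasks; i++) {
            totalWriters += taskWriters[i];
            // Approximate per-writer output as the task's data split evenly.
            if (taskData[i] / taskWriters[i] < MIN_WRITER_SIZE) {
                smallWriters += taskWriters[i];
            }
        }
        System.out.printf("Total Writers: %d, Small Writers: %d, Large Writers: %d%n",
                totalWriters, smallWriters, totalWriters - smallWriters);
    }
}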

cc @sopel39

gaurav8297 avatar Aug 26 '22 10:08 gaurav8297

benchmarks (4x better): insert_unpart_benchmark.pdf

gaurav8297 avatar Aug 28 '22 22:08 gaurav8297

@gaurav8297 We need to have a plan for config properties when we add local scaling for partitioned inserts

sopel39 avatar Aug 29 '22 14:08 sopel39