Adding a test parallelization strategy (#1128)
Description
Adds a test parallelization option to run Delta tests on different JVMs, addressing #1128:
- Implements a generic abstraction called GroupingStrategy and a default implementation SimpleHashStrategy that uses a fixed number of Test.Groups, each forked in its own JVM.
- Provides two collections of settings: one that can be added to enable test parallelization, and one that applies the default strategy, which uses 4 JVMs unless overridden via an environment variable called DELTA_TEST_JVM_COUNT.
- Adds those two settings to the core package.
Resolves #1128 on a local developer machine, but not yet on the CI pipeline.
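As a rough illustration of the hash-based grouping idea (a sketch, not the exact code from this PR), each test suite can be assigned to one of N buckets by hashing its fully qualified name; the object and method names below are illustrative:

```scala
// Minimal sketch of a hash-based test grouping strategy, mirroring the
// idea behind SimpleHashStrategy: hash each suite name into one of
// `groupCount` buckets. The modulo is normalized so negative hash codes
// still map to a non-negative bucket index.
object SimpleHashSketch {
  def groupIndex(suiteName: String, groupCount: Int): Int =
    ((suiteName.hashCode % groupCount) + groupCount) % groupCount

  def group(suiteNames: Seq[String], groupCount: Int): Map[Int, Seq[String]] =
    suiteNames.groupBy(name => groupIndex(name, groupCount))

  def main(args: Array[String]): Unit = {
    val suites = (1 to 129).map(i => s"org.apache.spark.sql.delta.Suite$i")
    group(suites, 4).toSeq.sortBy(_._1).foreach { case (idx, members) =>
      println(s"Test group $idx contains ${members.size} tests")
    }
  }
}
```

A suite's bucket depends only on its name, so the assignment is stable across runs, but it says nothing about runtime: a few slow suites can still land in the same bucket.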
How was this patch tested?
Logging has been introduced using the SBT logger to gather some statistics about the distribution of tests:
sbt:delta-core> Test/testGrouping
[info] scalastyle using config /Users/edmondoporcu/Development/personal/delta/scalastyle-config.xml
[info] scalastyle Processed 151 file(s)
[info] scalastyle Found 0 errors
[info] scalastyle Found 0 warnings
[info] scalastyle Found 0 infos
[info] scalastyle Finished in 25 ms
[success] created output: /Users/edmondoporcu/Development/personal/delta/core/target
[info] Tests will be grouped in 4 groups
[info] Test group 0 contains 34 tests
[info] Test group 1 contains 31 tests
[info] Test group 2 contains 34 tests
[info] Test group 3 contains 30 tests
[success] Total time: 6 s, completed Jul 3, 2022 3:39:29 PM
Additionally, running top on my MacBook shows 4 JVMs running as expected, all with the same parent PID.
This still needs to be tested by triggering a build on CI/CD.
Does this PR introduce any user-facing changes?
No
@tdas this works only locally, as the major blocker on CI is the worker type that GitHub provides (https://docs.github.com/en/actions/using-github-hosted-runners/about-github-hosted-runners#supported-runners-and-hardware-resources). 2 cores and 7 GB of RAM are really small for compiling large Scala projects.
I can use this PR as a basis in the future to allow a more sophisticated parallelization strategy, like so:
- using dependent jobs in GitHub Actions, splitting the setup part from the test run and running the tests as X different jobs, each with its own runner node
- using self-hosted runners. One option I like a lot is the idea of using AWS Batch with Fargate; however, it is not clear who would own the AWS account. An alternative would be to host the runner on Kubernetes.
@edmondo1984 thank you for doing this and my apologies for not getting to this earlier (was down with covid). gonna review this today
- Implemented
- Automatically done by SBT, see below
- Not much of a difference, because the hosted runners from GitHub are single-core with 7 GB of RAM. We could set up a more advanced strategy, such as using a Kubernetes cluster on AWS or an AWS Batch based system.
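The sbt wiring described above could look roughly like the following build.sbt fragment; the setting names and defaults are assumptions for illustration and may not match the PR exactly:

```scala
// build.sbt sketch (illustrative, not necessarily the exact code in this PR):
// split the defined tests into N groups, each forked in its own JVM, where N
// comes from DELTA_TEST_JVM_COUNT and defaults to 4.
Test / testGrouping := {
  val groupCount = sys.env.get("DELTA_TEST_JVM_COUNT").map(_.toInt).getOrElse(4)
  val tests = (Test / definedTests).value
  tests
    .groupBy(test => ((test.name.hashCode % groupCount) + groupCount) % groupCount)
    .toSeq
    .sortBy(_._1)
    .map { case (idx, groupTests) =>
      Tests.Group(
        name = s"group-$idx",
        tests = groupTests,
        runPolicy = Tests.SubProcess(ForkOptions())
      )
    }
}
```

sbt runs each Tests.Group with a SubProcess run policy in its own forked JVM, which is what produces the "Test group N contains M tests" distribution shown above.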
This is the difference between the four-way parallel run and the sequential run on a four-core machine:
➜ delta git:(issue-1128) ✗ tail -6 parallel.log
[info] Total number of tests run: 3579
[info] Suites: completed 129, aborted 0
[info] Tests: succeeded 3579, failed 0, canceled 0, ignored 880, pending 0
[info] All tests passed.
[info] Passed: Total 3597, Failed 0, Errors 0, Passed 3597, Ignored 880
[success] Total time: 2748 s (45:48), completed Jul 15, 2022 12:56:13 PM
➜ delta git:(issue-1128) ✗ tail -6 sequential.log
[info] Total number of tests run: 3579
[info] Suites: completed 129, aborted 0
[info] Tests: succeeded 3579, failed 0, canceled 0, ignored 880, pending 0
[info] All tests passed.
[info] Passed: Total 3597, Failed 0, Errors 0, Passed 3597, Ignored 880
[success] Total time: 6265 s (01:44:25), completed Jul 15, 2022 10:32:57 AM
Some useful metrics trying with 8 cores:
➜ delta git:(issue-1128) ✗ export TEST_RUN_IN_PARALLEL=TRUE
➜ delta git:(issue-1128) ✗ export TEST_PARALLELISM_COUNT=8
➜ delta git:(issue-1128) ✗ sbt test > highparallelism.log
➜ delta git:(issue-1128) ✗ tail -f highparallelism.log
[info] Test org.apache.spark.sql.delta.MergeIntoJavaSuite.checkUpdateAllAndInsertAll started
[info] Test org.apache.spark.sql.delta.MergeIntoJavaSuite.checkBasicApi started
[info] Test run finished: 0 failed, 0 ignored, 4 total, 6.978s
[info] ScalaTest
[info] Run completed in 31 minutes, 29 seconds.
[info] Total number of tests run: 3579
[info] Suites: completed 129, aborted 0
[info] Tests: succeeded 3579, failed 0, canceled 0, ignored 880, pending 0
[info] All tests passed.
[info] Passed: Total 3597, Failed 0, Errors 0, Passed 3597, Ignored 880
Hey @edmondo1984. Thank you for answering my questions and all the updates. Most of the changes look good, except the need for two env variables. Just trying to minimize the configs needed. I think it's fine to just use TEST_PARALLELISM_COUNT, which by default is set to 1, making it serial, and it becomes parallel when set to more than 1. What do you think?
Regarding parallelization on GitHub-hosted runners, I thought the runners have 2 cores? Did you try with parallelism = 2? Does it not help? And if not, I wonder why! :(
Using external infra like AWS is definitely possible but quite a bit challenging, I guess. Another alternative that will still use GitHub runners is to run N runners (per Scala version), each running a 1/N split of the tests.
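The single-variable scheme suggested here (serial when the count is 1, parallel when it is greater) could be sketched as follows; ParallelismConfig and its parsing rules are assumptions for illustration, not code from the PR:

```scala
// Sketch of the single-env-variable scheme: TEST_PARALLELISM_COUNT
// defaults to 1 (serial); any value greater than 1 enables parallel
// test groups. Malformed or non-positive values fall back to 1.
object ParallelismConfig {
  def parallelismCount(env: Map[String, String]): Int =
    env.get("TEST_PARALLELISM_COUNT")
      .flatMap(v => scala.util.Try(v.toInt).toOption)
      .filter(_ >= 1)
      .getOrElse(1)

  def runInParallel(env: Map[String, String]): Boolean =
    parallelismCount(env) > 1
}
```

Taking the environment as a Map parameter (rather than reading sys.env directly) keeps the parsing logic trivially testable.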
With two cores it looks like there is no improvement with the simple hash strategy. I imagine that if the longest tests all end up in the same bucket, there is no luck :(
I think larger workers are more user friendly than breaking down the tests into splits in the CI/CD pipeline, what do you think?
I agree that larger workers (self-hosted runners) are a better long-term solution. But it will be quite a lot of work to integrate with an AWS-based solution and manage it. Lots of uncertainties. That's the only reason why I am considering an ugly but predictable short-term solution.
For this PR, I think if you update the env variable stuff (make it just 1), then I can merge this PR and we can continue brainstorming. For example, can we get a more detailed report of how long each bucket runs? How long does each test suite run? Maybe we can do a more explicit assignment of the larger test suites into different buckets?
@tdas does this last change make sense to you? https://github.com/delta-io/delta/pull/1249/commits/d688779abc902c5d314e18e82e7333995aab8184
Yep. LGTM.
Hi @edmondo1984 - one thing I've noticed after this PR change is that, for example, when my parallelism is set to 20, the order of my sbt logs is all mixed up.
e.g. the example below says that all tests are from SchemaValidationSuite, yet some are actually from MergeIntoScalaSuite:
[info] SchemaValidationSuite:
[info] - merge without table alias (1 second, 144 milliseconds) <---- MergeIntoScalaSuite
[info] - write - add a column concurrently (1 second, 364 milliseconds)
[info] - disallow to change schema after starting a streaming query (2 seconds, 499 milliseconds)
[info] - pre-resolved exprs: should work in all expressions in absence of duplicate refs (1 second, 318 milliseconds) <---- MergeIntoScalaSuite
[info] - pre-resolved exprs: negative cases with refs resolved to wrong Dataframes (395 milliseconds) <---- MergeIntoScalaSuite
[info] - write - remove a column concurrently (1 second, 218 milliseconds)
Is this a known problem? Do you know of any solution?
@scottsand-db
Logging frameworks have a two-tier architecture where events are placed in a queue and then emitted to the screen by a consumer of that queue. When the queue is within a single JVM, the code can, for example, collect all logs for one test before emitting them to the console, even if tests are run in parallel. When you fork, this is no longer possible, and you would have to use inter-process communication primitives or a mutex.
You can see here how a separate test logger is created per test, and how the loggers are flushed: https://github.com/sbt/sbt/blob/1f29c9053cdc52128fa45cfe0b22e9a4d085cc1b/testing/src/main/scala/sbt/internal/testing/TestLogger.scala#L31
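The single-JVM behaviour described above can be illustrated with a toy buffered logger; this is a simplification of the idea behind sbt's TestLogger, not its actual API:

```scala
import scala.collection.mutable

// Toy illustration of per-suite log buffering inside one JVM: events are
// queued per suite and flushed together when the suite finishes, so
// interleaved suites still produce contiguous output. This simplifies
// what sbt's TestLogger does; it is not sbt's API.
final class BufferedSuiteLogger {
  private val buffers = mutable.Map.empty[String, mutable.Buffer[String]]
  val flushed = mutable.Buffer.empty[String]

  def log(suite: String, message: String): Unit =
    buffers.getOrElseUpdate(suite, mutable.Buffer.empty) += s"[$suite] $message"

  def suiteFinished(suite: String): Unit =
    buffers.remove(suite).foreach(flushed ++= _)
}
```

Because the buffers live in one process, this scheme breaks down as soon as suites run in forked JVMs: each fork writes to the console directly, and their lines interleave.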
I see. @edmondo1984 - do you know of any workaround for this?
- We might try to pipe the forked processes' output and then merge it sequentially. However, we need to be careful, because it will seem that all tests executed by forked process 1 ran before those of forked process 2, and so on.
- We might try to implement a custom logger that flushes to disk and reports the results, giving priority to the test cases that started first.
I am happy to explore both of these options.
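The first option could be sketched as follows; the commands stand in for forked test JVMs, and for simplicity this sketch runs them one at a time (a real implementation would start them concurrently and join their output threads before flushing):

```scala
import scala.sys.process._

// Sketch of the "pipe and merge" option: capture each process's stdout
// as a single block and return the blocks in order, so logs from
// different processes never interleave. The commands here are
// placeholders for forked test JVMs.
object MergeForkedOutput {
  def run(commands: Seq[Seq[String]]): Seq[String] =
    commands.map(cmd => Process(cmd).!!)
}
```

The caveat from the first bullet applies: printing the blocks in this order makes it look as if all of process 1's tests finished before process 2's started, regardless of actual timing.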