Adding a test parallelization strategy (#1128)

Open edmondop opened this issue 2 years ago • 9 comments

Description

Adds a test parallelization option that runs Delta tests on separate JVMs, addressing #1128:

  • Implements a generic abstraction called GroupingStrategy and a default implementation, SimpleHashStrategy, that uses a fixed number of sbt Tests.Group instances, each forked in its own JVM.
  • Provides two collections of settings: one that can be added to enable test parallelization, and one that uses the default strategy, which runs 4 JVMs unless otherwise specified by an environment variable called DELTA_TEST_JVM_COUNT.
  • Adds those two settings to the core package.
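To make the grouping idea concrete, here is a minimal Python sketch of a hash-based strategy. It is only an illustration of the technique, not the sbt/Scala code in this PR: the function names are hypothetical, and `zlib.crc32` is an assumption chosen because it is stable across runs (the hash actually used by SimpleHashStrategy is not shown in this description).

```python
import os
import zlib


def group_count(env=None, default=4):
    """Number of forked JVMs: DELTA_TEST_JVM_COUNT if set, otherwise the default of 4."""
    env = os.environ if env is None else env
    return int(env.get("DELTA_TEST_JVM_COUNT", default))


def simple_hash_grouping(suite_names, groups):
    """Assign each suite to one of `groups` buckets using a stable hash of its name."""
    buckets = {g: [] for g in range(groups)}
    for name in suite_names:
        # crc32 is an assumption made for determinism; any stable hash would do.
        buckets[zlib.crc32(name.encode("utf-8")) % groups].append(name)
    return buckets
```

Because the bucket depends only on the suite name, the assignment is reproducible between runs, but nothing in it accounts for how long each suite takes.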

Resolves #1128 on a local developer machine, but not on the CI pipeline.

How was this patch tested?

Logging has been added using the SBT logger to get some statistics on the distribution of tests:

sbt:delta-core> Test/testGrouping
[info] scalastyle using config /Users/edmondoporcu/Development/personal/delta/scalastyle-config.xml
[info] scalastyle Processed 151 file(s)
[info] scalastyle Found 0 errors
[info] scalastyle Found 0 warnings
[info] scalastyle Found 0 infos
[info] scalastyle Finished in 25 ms
[success] created output: /Users/edmondoporcu/Development/personal/delta/core/target
[info] Tests will be grouped in 4 groups
[info] Test group 0 contains 34 tests
[info] Test group 1 contains 31 tests
[info] Test group 2 contains 34 tests
[info] Test group 3 contains 30 tests
[success] Total time: 6 s, completed Jul 3, 2022 3:39:29 PM

Additionally, running top on my MacBook shows 4 JVMs running as expected, all with the same parent PID. [Screenshot: Screen Shot 2022-07-03 at 3 40 53 PM]

This still needs to be tested on CI/CD, which requires triggering a build there.

Does this PR introduce any user-facing changes?

No

edmondop avatar Jul 03 '22 19:07 edmondop

@tdas this works only locally, as the major blocker on CI is the worker type that GitHub provides (https://docs.github.com/en/actions/using-github-hosted-runners/about-github-hosted-runners#supported-runners-and-hardware-resources). 2 cores and 7 GB of RAM are really small for compiling large Scala projects.

I can use this PR as a basis in the future to allow a more sophisticated parallelization strategy, like so:

  • using dependent jobs in GitHub, splitting the setup part from the test run, and running the tests as X different jobs, each with its own runner node
  • using hosted runners. One option I like a lot is the idea of using AWS Batch with Fargate, however it is not clear who would own the AWS account. An alternative would be to host the runner on Kubernetes

edmondop avatar Jul 04 '22 03:07 edmondop

@edmondo1984 thank you for doing this and my apologies for not getting to this earlier (was down with covid). gonna review this today

tdas avatar Jul 13 '22 15:07 tdas

  1. Implemented
  2. Automatically done by SBT, see below
  3. Not much of a difference, because the hosted runners from GitHub are single core with 7 GB of RAM. We could set up a more advanced strategy, such as using a Kubernetes cluster on AWS or an AWS Batch based system.

This is the difference between running with four-way parallelization and running sequentially on a four-core machine:

➜  delta git:(issue-1128) ✗ tail -6 parallel.log
[info] Total number of tests run: 3579
[info] Suites: completed 129, aborted 0
[info] Tests: succeeded 3579, failed 0, canceled 0, ignored 880, pending 0
[info] All tests passed.
[info] Passed: Total 3597, Failed 0, Errors 0, Passed 3597, Ignored 880
[success] Total time: 2748 s (45:48), completed Jul 15, 2022 12:56:13 PM
➜  delta git:(issue-1128) ✗ tail -6 sequential.log
[info] Total number of tests run: 3579
[info] Suites: completed 129, aborted 0
[info] Tests: succeeded 3579, failed 0, canceled 0, ignored 880, pending 0
[info] All tests passed.
[info] Passed: Total 3597, Failed 0, Errors 0, Passed 3597, Ignored 880
[success] Total time: 6265 s (01:44:25), completed Jul 15, 2022 10:32:57 AM
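
As a quick worked check of the two totals above, the following Python snippet computes the speedup and the per-worker efficiency. The helper names are illustrative, and efficiency is defined here as speedup divided by the number of workers.

```python
def speedup(sequential_s, parallel_s):
    """How many times faster the parallel run finished."""
    return sequential_s / parallel_s


def efficiency(sequential_s, parallel_s, workers):
    """Fraction of the ideal linear speedup that was actually achieved."""
    return speedup(sequential_s, parallel_s) / workers


sequential_s = 6265  # total time from sequential.log
parallel_s = 2748    # total time from parallel.log, 4 groups

print(f"speedup: {speedup(sequential_s, parallel_s):.2f}x")           # speedup: 2.28x
print(f"efficiency: {efficiency(sequential_s, parallel_s, 4):.0%}")   # efficiency: 57%
```

A speedup well below 4x is consistent with uneven group run times, since the whole run lasts as long as the slowest group.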

edmondop avatar Jul 13 '22 15:07 edmondop

Some useful metrics trying with 8 cores:

➜  delta git:(issue-1128) ✗ export TEST_RUN_IN_PARALLEL=TRUE
➜  delta git:(issue-1128) ✗ export TEST_PARALLELISM_COUNT=8
➜  delta git:(issue-1128) ✗ sbt test > highparallelism.log
➜  delta git:(issue-1128) ✗ tail -f highparallelism.log
[info] Test org.apache.spark.sql.delta.MergeIntoJavaSuite.checkUpdateAllAndInsertAll started
[info] Test org.apache.spark.sql.delta.MergeIntoJavaSuite.checkBasicApi started
[info] Test run finished: 0 failed, 0 ignored, 4 total, 6.978s
[info] ScalaTest
[info] Run completed in 31 minutes, 29 seconds.
[info] Total number of tests run: 3579
[info] Suites: completed 129, aborted 0
[info] Tests: succeeded 3579, failed 0, canceled 0, ignored 880, pending 0
[info] All tests passed.
[info] Passed: Total 3597, Failed 0, Errors 0, Passed 3597, Ignored 880

edmondop avatar Jul 18 '22 19:07 edmondop

Hey @edmondo1984. Thank you for answering my questions and for all the updates. Most of the changes look good, except the need for two env variables. I'm just trying to minimize the configs needed. I think it's fine to just use TEST_PARALLELISM_COUNT, which by default is set to 1, making it serial, and it becomes parallel when set to more than 1. What do you think?
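
A minimal Python sketch of the single-variable behaviour proposed here, for illustration only (the real logic would live in the sbt build, and the function names are hypothetical):

```python
import os


def parallelism(env=None):
    """Read TEST_PARALLELISM_COUNT; an unset variable means 1, i.e. a serial run."""
    env = os.environ if env is None else env
    count = int(env.get("TEST_PARALLELISM_COUNT", "1"))
    if count < 1:
        raise ValueError("TEST_PARALLELISM_COUNT must be at least 1")
    return count


def run_in_parallel(env=None):
    """Parallel mode is implied by a count above 1, so no second flag is needed."""
    return parallelism(env) > 1
```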

Regarding parallelization on GitHub-hosted runners, I thought the runners have 2 cores? Did you try with parallelism = 2? Does it not help? And if not, I wonder why! :(

Using external infra like AWS is definitely possible, but quite challenging I guess. Another alternative that would still use GitHub runners is to run N runners (per Scala version), each running a 1/N split of the tests.
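
The 1/N split could be sketched roughly as follows in Python. This is an assumption-laden illustration: `shard_index` and `num_shards` are hypothetical inputs that each runner would receive (for example from its job configuration), and `zlib.crc32` is chosen only because it gives every runner the same answer for the same suite name.

```python
import zlib


def suites_for_shard(suite_names, shard_index, num_shards):
    """Return the suites that the runner with this shard index should execute."""
    if not 0 <= shard_index < num_shards:
        raise ValueError("shard_index must be in [0, num_shards)")
    return [
        name
        for name in sorted(suite_names)
        # Every runner evaluates the same stable hash, so shards never overlap.
        if zlib.crc32(name.encode("utf-8")) % num_shards == shard_index
    ]
```

Each suite lands in exactly one shard, so the union of all shards covers the full test set without any coordination between runners.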

tdas avatar Jul 19 '22 15:07 tdas

With two cores, there appears to be no improvement with the simple hash strategy. I imagine that if the longest tests all end up in the same bucket, we are out of luck :(

I think larger workers are more user friendly than breaking down the tests into splits in the CI/CD pipeline, what do you think?

edmondop avatar Jul 19 '22 20:07 edmondop

I agree that larger workers (self-hosted runners) are a better long-term solution. But it will be quite a lot of work to integrate with an AWS-based solution and manage it. Lots of uncertainties. That's the only reason why I am considering an ugly but predictable short-term solution.

For this PR, I think if you update the env variable stuff (make it just 1), then I can merge this PR and we can continue brainstorming. For example, can we get a more detailed report of how long each bucket runs? How long each test suite runs? Maybe we can do a more explicit assignment of the larger test suites into different buckets?
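
One way such an explicit assignment could work, sketched in Python under the assumption that per-suite durations have already been measured: sort suites by duration and always give the next one to the least-loaded bucket (a greedy longest-first heuristic). The suite names and durations used in the usage note are made up for illustration.

```python
import heapq


def assign_longest_first(durations, buckets):
    """Greedy assignment: the longest remaining suite goes to the least-loaded bucket."""
    heap = [(0.0, index) for index in range(buckets)]  # (current load, bucket index)
    heapq.heapify(heap)
    assignment = {index: [] for index in range(buckets)}
    # Sort by descending duration; the name breaks ties so the result is deterministic.
    for suite, seconds in sorted(durations.items(), key=lambda kv: (-kv[1], kv[0])):
        load, index = heapq.heappop(heap)
        assignment[index].append(suite)
        heapq.heappush(heap, (load + seconds, index))
    return assignment


def bucket_loads(durations, assignment):
    """Total expected run time per bucket, useful for the per-bucket report."""
    return {index: sum(durations[s] for s in suites) for index, suites in assignment.items()}
```

With hypothetical durations `{"SuiteA": 600, "SuiteB": 500, "SuiteC": 100, "SuiteD": 90}` and 2 buckets, the two long suites end up in different buckets and the loads come out as 690 and 600 seconds.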

tdas avatar Jul 19 '22 21:07 tdas

@tdas does this last change make sense to you? https://github.com/delta-io/delta/pull/1249/commits/d688779abc902c5d314e18e82e7333995aab8184

edmondop avatar Jul 21 '22 23:07 edmondop

Yep. LGTM.

tdas avatar Jul 22 '22 20:07 tdas

Hi @edmondo1984 - one thing I've noticed after this PR change is that, for example, when my parallelism is set to 20, the order of my sbt logs is all mixed up.

E.g., the log below attributes all tests to SchemaValidationSuite, yet some are actually from MergeIntoScalaSuite:

[info] SchemaValidationSuite:
[info] - merge without table alias (1 second, 144 milliseconds) <---- MergeIntoScalaSuite
[info] - write - add a column concurrently (1 second, 364 milliseconds)
[info] - disallow to change schema after starting a streaming query (2 seconds, 499 milliseconds)
[info] - pre-resolved exprs: should work in all expressions in absence of duplicate refs (1 second, 318 milliseconds) <---- MergeIntoScalaSuite
[info] - pre-resolved exprs: negative cases with refs resolved to wrong Dataframes (395 milliseconds) <---- MergeIntoScalaSuite
[info] - write - remove a column concurrently (1 second, 218 milliseconds)

Is this a known problem? Do you know of any solution?

scottsand-db avatar Aug 06 '22 02:08 scottsand-db

@scottsand-db

Logging frameworks have a two-tier architecture in which events are placed in a queue and then emitted to the screen by a consumer of that queue. When the queue lives within a single JVM, the code can, for example, collect all logs for one test before emitting them to the console, even if tests run in parallel. When you fork, this is no longer possible, and you would have to use inter-process communication primitives or a mutex.

You can see here how a separate Test logger is created PerTest, and how they are flushed https://github.com/sbt/sbt/blob/1f29c9053cdc52128fa45cfe0b22e9a4d085cc1b/testing/src/main/scala/sbt/internal/testing/TestLogger.scala#L31
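
The single-process case can be illustrated with a small Python sketch. This is not sbt's TestLogger, only the general idea under simplified assumptions: each test collects its lines in a private buffer and writes them out in one step while holding a lock, so output from concurrent tests never interleaves. The class and parameter names are hypothetical.

```python
import io
import threading


class BufferedTestLog:
    """Collects each test's output privately and emits it as one contiguous block."""

    def __init__(self, sink):
        self._sink = sink
        self._lock = threading.Lock()  # only works because all tests share one process

    def run(self, suite_name, body):
        buffer = [f"[info] {suite_name}:"]
        body(buffer.append)  # the test logs by appending lines to its own buffer
        with self._lock:
            for line in buffer:
                self._sink.write(line + "\n")


def sample_suite(name):
    """Build a fake suite body that logs three test cases."""
    def body(emit):
        for case in range(3):
            emit(f"[info] - {name} case {case}")
    return body


sink = io.StringIO()
log = BufferedTestLog(sink)
workers = [
    threading.Thread(target=log.run, args=(name, sample_suite(name)))
    for name in ("SuiteA", "SuiteB")
]
for worker in workers:
    worker.start()
for worker in workers:
    worker.join()
print(sink.getvalue(), end="")
```

Once the suites run in separate forked processes, the in-memory lock and buffers are no longer shared, which is why the output gets mixed.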

edmondop avatar Aug 09 '22 21:08 edmondop

I see. @edmondo1984 - do you know of any workaround for this?

scottsand-db avatar Aug 10 '22 03:08 scottsand-db

  • We might try to pipe the output of the forked processes and then merge it sequentially. However, we need to be careful, because it will then seem that all tests executed by forked process 1 ran before those of forked process 2, etc.
  • We might try to implement a custom logger that flushes to disk and reports the results by giving priority to the test cases that started first
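
A rough Python sketch of the second option, under stated assumptions: every forked process is assumed to write one record per suite (start time, suite name, buffered lines) in start-time order, and a reporter merges those per-process streams by start time. The record type and function name are hypothetical, and the on-disk format is left out.

```python
import heapq
from typing import List, NamedTuple, Tuple


class SuiteRecord(NamedTuple):
    started_at: float       # when the suite started, e.g. seconds since the epoch
    suite: str              # suite name printed as the block header
    lines: Tuple[str, ...]  # the buffered log lines of that suite


def merge_by_start_time(per_process_records: List[List[SuiteRecord]]) -> List[str]:
    """Merge per-process record lists (each sorted by start time) into one report."""
    merged = heapq.merge(*per_process_records, key=lambda record: record.started_at)
    report = []
    for record in merged:
        report.append(f"[info] {record.suite}:")
        report.extend(record.lines)
    return report
```

Each suite's lines stay together in the report, and suites from different processes appear in the order in which they started rather than grouped by process.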

Happy to explore both of these options

edmondop avatar Aug 10 '22 03:08 edmondop