
Spark worker scaling parameters

ceresek opened this issue 3 years ago • 10 comments

Currently, the performance of the Spark benchmarks does not change with the configured number of executors, except for ALS, which partitions the input data based on the configuration. This may be a relevant note in the Spark documentation:

Spark properties mainly can be divided into two kinds: one is related to deploy, like “spark.driver.memory”, “spark.executor.instances”, this kind of properties may not be affected when setting programmatically through SparkConf in runtime, or the behavior is depending on which cluster manager and deploy mode you choose, so it would be suggested to set through configuration file or spark-submit command line options; another is mainly related to Spark runtime control, like “spark.task.maxFailures”, this kind of properties can be set in either way.

This is quite vague, but may explain why our code (https://github.com/renaissance-benchmarks/renaissance/blob/510b3b9e8f01dc397f35b64a7b5f8a943b75d012/benchmarks/apache-spark/src/main/scala/org/renaissance/apache/spark/SparkUtil.scala#L66) does not behave as expected.
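To make the distinction from the quote concrete, here is a minimal sketch of the pattern it warns about (not a copy of SparkUtil.scala; the property values and names are placeholders):

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical configuration, for illustration only.
val conf = new SparkConf()
  .setAppName("renaissance-spark")
  // Deploy-related property: when set programmatically like this, it may be
  // ignored, depending on the cluster manager and deploy mode.
  .set("spark.executor.instances", "4")
  // Runtime-control property: honoured whether set here or via spark-submit.
  .set("spark.task.maxFailures", "8")

val sc = new SparkContext(conf)
```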

ceresek avatar Jun 07 '21 10:06 ceresek

[plot: workers-page-rank]

ceresek avatar Jun 07 '21 10:06 ceresek

... seeing how our Spark instance is hardcoded to run locally (https://github.com/renaissance-benchmarks/renaissance/blob/510b3b9e8f01dc397f35b64a7b5f8a943b75d012/benchmarks/apache-spark/src/main/scala/org/renaissance/apache/spark/SparkUtil.scala#L61), I'm not sure what we're trying to do with the worker instance count?

ceresek avatar Jun 07 '21 10:06 ceresek

I was under the impression that the number of executors influenced the default number of partitions created when building RDDs directly from files on disk. It was also one of the reasons I avoided creating RDDs by simply preparing data collections in memory and calling parallelize() on them, which would avoid I/O but use a different RDD class underneath. Maybe that is something we should actually do (or maybe repartition the existing RDDs explicitly).
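For illustration, a rough sketch of the options mentioned above (sc is a SparkContext; the input path, the in-memory collection, and the partition counts are placeholders):

```scala
// Reading from a file: partitioning is driven by the input splits
// (and sc.defaultMinPartitions), not by a configured executor count.
val fromFile = sc.textFile("data/input.txt")

// Building the collection in memory and parallelizing it: avoids the I/O,
// but uses a ParallelCollectionRDD underneath instead of a file-backed RDD.
val fromMemory = sc.parallelize(linesInMemory, numSlices = 8)

// Repartitioning an existing RDD explicitly, regardless of how it was created.
val repartitioned = fromFile.repartition(8)
```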

lbulej avatar Jun 07 '21 10:06 lbulej

... seeing how our Spark instance is hardcoded to run locally (https://github.com/renaissance-benchmarks/renaissance/blob/510b3b9e8f01dc397f35b64a7b5f8a943b75d012/benchmarks/apache-spark/src/main/scala/org/renaissance/apache/spark/SparkUtil.scala#L61), I'm not sure what we're trying to do with the worker instance count?

This was in the original code from the very beginning. I probably did a cursory check of the documentation at some point to see what that means, but at this point I only recall that the whole idea was to control the number of executors (thread pools) and the number of threads per executor. If this does not work as expected, then we should revisit this completely.

lbulej avatar Jun 07 '21 11:06 lbulej

I think that in local mode there is always only one in-process Executor (with one thread pool), created here https://github.com/apache/spark/blob/7ce7aa47585f579a84dc6dd8f116a48174cba988/core/src/main/scala/org/apache/spark/SparkContext.scala#L2873 and here https://github.com/apache/spark/blob/7ce7aa47585f579a84dc6dd8f116a48174cba988/core/src/main/scala/org/apache/spark/scheduler/local/LocalSchedulerBackend.scala#L62. We can only tweak the number of threads.

I'm not sure whether other functionality (such as partitioning) reacts to the spark.executor.instances setting even in local mode; my guess is that this is unlikely (there is also the option to allocate executors dynamically, so a static value could well be wrong).

If we aim for single-JVM execution, then I think we can drop the executor count thing, as well as a few other config bits, and just set the master to local[N], where N is the number of cores used.
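A minimal sketch of that proposal (names and values are assumptions, not the actual Renaissance code):

```scala
import org.apache.spark.sql.SparkSession

// Desired parallelism: the number of cores to use, e.g. a benchmark parameter.
val n = Runtime.getRuntime.availableProcessors()

val spark = SparkSession.builder()
  .appName("renaissance-spark")
  .master(s"local[$n]")   // one in-process executor with N task threads
  .getOrCreate()
```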

ceresek avatar Jun 07 '21 11:06 ceresek

The idea of controlling the number of executor instances appears to originate from #145 and #147, and at that time it seemed to work for @farquet.

Looking at the allowed master URLs, the recommendation for a local master appears to be to set the number of worker threads to the number of cores on the machine, i.e., local[*]; that, however, is what led to the problem @farquet observed in #145 (on a big machine) in the first place.

I wonder if we should perhaps set the master to local[*] and see whether we can limit the level of parallelism by explicitly setting the number of partitions? It's probably not much different from using local[N] with a benchmark-specific N; I was just thinking that if partitioning can control the parallelism, I would not push it into the Spark configuration and would instead make it part of the "job description".
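Roughly, that alternative would look something like the following sketch (the partition count and input path are placeholders):

```scala
import org.apache.spark.sql.SparkSession

// Let Spark use all cores of the machine...
val spark = SparkSession.builder()
  .appName("page-rank")
  .master("local[*]")
  .getOrCreate()

// ...and bound the benchmark's parallelism through the partition count,
// carried in the "job description" rather than in the Spark configuration.
val jobPartitions = 8
val lines = spark.sparkContext.textFile("data/input.txt", minPartitions = jobPartitions)
```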

lbulej avatar Jun 07 '21 13:06 lbulej

#274 removes the configuration of executor instances (along with the benchmark parameter) as well as explicit input data partitioning (for now).

I was wondering whether it would make sense to have a parameter, e.g., input_partitions_scaling, as a multiplier of spark_thread_count to control the number of input data partitions. Reasonable values of the parameter would probably be limited to 0 (producing 1 partition), 0.5, 1, 1.5 (maybe), and 2.
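A sketch of how such a parameter could map to a partition count (the helper function is hypothetical; the names come from the comment above):

```scala
// Derive the number of input partitions from the Spark thread count and the
// scaling factor; a factor of 0 collapses the input into a single partition.
def inputPartitions(sparkThreadCount: Int, inputPartitionsScaling: Double): Int =
  math.max(1, math.round(inputPartitionsScaling * sparkThreadCount).toInt)

inputPartitions(16, 0.0)  // 1
inputPartitions(16, 0.5)  // 8
inputPartitions(16, 1.5)  // 24
```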

lbulej avatar Jun 21 '21 17:06 lbulej

I have updated the PR and the measurement bundle (plugins work now). For testing, I added an als-ml benchmark, which uses ml.ALS instead of mllib.ALS. Both do a conversion to RDDs, but the mllib version seems more efficient.
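For context, a rough comparison of the two APIs (ratingsRdd and ratingsDf are placeholder inputs; the parameter values are not the benchmark's actual settings):

```scala
import org.apache.spark.ml.recommendation.{ALS => MlALS}
import org.apache.spark.mllib.recommendation.{ALS => MllibALS, Rating}

// RDD-based mllib API: ALS.train(ratings, rank, iterations, lambda),
// where ratingsRdd is an RDD[Rating].
val mllibModel = MllibALS.train(ratingsRdd, 10, 10, 0.1)

// DataFrame-based ml API used by als-ml; it converts the DataFrame to RDDs
// internally before running the factorization.
val mlModel = new MlALS()
  .setRank(10)
  .setMaxIter(10)
  .setRegParam(0.1)
  .setUserCol("user")
  .setItemCol("item")
  .setRatingCol("rating")
  .fit(ratingsDf)
```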

lbulej avatar Jun 23 '21 06:06 lbulej

Adding a whole bunch of plots showing how performance depends on the number of threads for the individual benchmarks, collected on a dual-socket Xeon Gold 6230 machine (2 packages, 20 cores per package, 2 threads per core).


[plots of performance vs. thread count: threads-als, threads-als-ml, threads-chi-square, threads-dec-tree, threads-gauss-mix, threads-log-regression, threads-movie-lens, threads-naive-bayes, threads-page-rank]

ceresek avatar Jul 01 '21 08:07 ceresek

Assuming other machines behave similarly, I think we should cap the number of threads used as follows (with a warning if more cores are available); a rough sketch of such a cap follows the list:

  • 2 or slightly more (exact value not important) - chi-square, gauss-mix
  • 4 or slightly more (exact value not important) - dec-tree
  • 8 - movie-lens
  • 12 - als, als-ml, log-regression, page-rank
  • unlimited - naive-bayes

That is, until we tackle the scaling issue more systematically.
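A minimal sketch of such a cap (the helper name and warning message are made up):

```scala
// Cap the number of Spark threads for a benchmark and warn when the machine
// could have provided more hardware threads.
def cappedThreadCount(benchmarkCap: Int): Int = {
  val available = Runtime.getRuntime.availableProcessors()
  if (available > benchmarkCap)
    println(s"WARNING: using $benchmarkCap of $available available hardware threads")
  math.min(available, benchmarkCap)
}

// e.g. for movie-lens: .master(s"local[${cappedThreadCount(8)}]")
```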

ceresek avatar Jul 01 '21 08:07 ceresek