Test CometDriverPlugin in Kube/Yarn

Open comphead opened this issue 1 year ago • 5 comments

Comet ships a Spark driver plugin that overrides the spark.executor.memoryOverhead value to account for the native memory Comet needs.

Resource managers like YARN/Kube consider the spark.executor.memoryOverhead param when they start containers.

We need to test the plugin and check that containers are created with the extra native memory amount set with

spark.comet.memoryOverhead
spark.comet.memory.overhead.factor
--conf spark.plugins=org.apache.spark.CometPlugin

Originally posted by @comphead in https://github.com/apache/datafusion-comet/issues/684#issuecomment-2287067051

comphead • Aug 13 '24 20:08

Related to https://github.com/apache/datafusion-comet/issues/605

comphead • Aug 13 '24 20:08

I tested this on YARN and I can confirm that --conf spark.plugins=org.apache.spark.CometPlugin increases Spark memory overhead, but this is not visible in the session parameters.

I launch a spark shell with two executor containers on YARN with no Comet. YARN reports 7168 MB allocated, which corresponds to (2048 MB memory + 1024 MB memory overhead) * 2 executors + 1024 MB for the YARN AM container, all of them the cluster's default settings.

spark-shell \
    --master yarn \
    --num-executors 2
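As a sanity check on that baseline figure, here is a minimal Python sketch of the arithmetic. The constants are the cluster defaults quoted above; the function name is purely illustrative and not part of Spark or YARN.

```python
# Cluster defaults quoted in this comment, all in MB.
EXECUTOR_MEMORY_MB = 2048
EXECUTOR_OVERHEAD_MB = 1024
AM_CONTAINER_MB = 1024


def baseline_allocation_mb(num_executors: int) -> int:
    """Total memory YARN should report with no Comet plugin loaded."""
    per_executor = EXECUTOR_MEMORY_MB + EXECUTOR_OVERHEAD_MB
    return num_executors * per_executor + AM_CONTAINER_MB


print(baseline_allocation_mb(2))  # 7168
```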

Then I launch a spark shell with Comet enabled:

spark-shell \
    --master yarn \
    --num-executors 2 \
    --jars $COMET_JAR_LOCATION \
    --conf spark.driver.extraClassPath=$COMET_JAR \
    --conf spark.executor.extraClassPath=$COMET_JAR \
    --conf spark.plugins=org.apache.spark.CometPlugin \
    --conf spark.sql.extensions=org.apache.comet.CometSparkSessionExtensions \
    --conf spark.comet.nativeLoadRequired=true \
    --conf spark.comet.exec.enabled=true \
    --conf spark.comet.exec.all.enabled=true \
    --conf spark.comet.explainFallback.enabled=true \
    --conf spark.shuffle.manager=org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager \
    --conf spark.comet.exec.shuffle.enabled=true \
    --conf spark.comet.exec.shuffle.mode=auto \
    --conf spark.sql.adaptive.coalescePartitions.enabled=false

Spark session doesn't report anything unusual:

+-------------------------------------+-----+
|key                                  |value|
+-------------------------------------+-----+
|spark.comet.debug.enabled            |true |
|spark.comet.exceptionOnDatetimeRebase|true |
|spark.comet.exec.all.enabled         |true |
|spark.comet.exec.enabled             |true |
|spark.comet.exec.shuffle.enabled     |true |
|spark.comet.exec.shuffle.mode        |auto |
|spark.comet.explainFallback.enabled  |true |
|spark.comet.nativeLoadRequired       |true |
|spark.driver.memory                  |4g   |
|spark.driver.memoryOverhead          |1g   |
|spark.executor.memory                |2g   |
|spark.executor.memoryOverhead        |1g   |
+-------------------------------------+-----+

but YARN reports 9216 MB in use, an increase of 1024 MB per container.

Spark History Server reports that the new memory overhead is 1433M, an increase of just 409 MB per container. This matches the expected increase, since 2048 * 0.2 is about 409, where 0.2 is the default value of spark.comet.memory.overhead.factor.
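The 1433M figure can be reproduced with a short Python sketch. This only models the numbers observed here, not Comet's actual source: the function name is made up, and truncating the fractional megabytes is an assumption that happens to match what the History Server shows.

```python
def expected_overhead_mb(executor_memory_mb: int,
                         base_overhead_mb: int,
                         factor: float = 0.2) -> int:
    """Base Spark overhead plus the Comet share derived from the factor."""
    # Assumption: the fractional part (409.6 -> 409) is dropped.
    comet_extra_mb = int(executor_memory_mb * factor)
    return base_overhead_mb + comet_extra_mb


print(expected_overhead_mb(2048, 1024))  # 1433
```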

The difference is explained by YARN's minimum allocation setting: it allocates memory in chunks of 1 GB, which I confirmed by running:

spark-shell \
    --master yarn \
    --num-executors 2 \
    --jars $COMET_JAR_LOCATION \
    --conf spark.executor.memoryOverhead=1025M

and getting 9216 MB allocated in YARN.
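The rounding can be sketched in Python as follows. The 1024 MB chunk size is the value observed on this cluster and the helper name is hypothetical; other clusters may be configured differently.

```python
def round_up_to_chunk(requested_mb: int, chunk_mb: int = 1024) -> int:
    """Round a container request up to the next multiple of chunk_mb."""
    return -(-requested_mb // chunk_mb) * chunk_mb


# 2048 MB executor memory + 1025 MB overhead needs 3073 MB, rounded to 4096 MB.
manual_container = round_up_to_chunk(2048 + 1025)
# 2048 MB executor memory + 1433 MB Comet-adjusted overhead rounds to the same size.
comet_container = round_up_to_chunk(2048 + 1433)

print(manual_container, comet_container)  # 4096 4096
print(2 * manual_container + 1024)        # 9216
```

Both requests land in the same 4096 MB container, which is why the 1025M experiment and the Comet run show the same total.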


Changing spark.comet.memoryOverhead works: YARN reports 11264 MB in use when I add --conf spark.comet.memoryOverhead=2048, since (2048+1024+2048)*2+1024=11264.


Changing spark.comet.memory.overhead.factor works: YARN reports 15360 MB in use when I add --conf spark.comet.memory.overhead.factor=2.0, since (2048+1024+2048*2)*2+1024=15360.
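All three Comet runs can be checked against one small Python model. It is only a sketch fitted to the figures in this comment: the function and parameter names are invented, and it assumes that an explicit spark.comet.memoryOverhead replaces the factor-based amount rather than adding to it, which is what the 11264 MB result suggests.

```python
from typing import Optional


def yarn_allocation_mb(comet_overhead_mb: Optional[int] = None,
                       factor: float = 0.2,
                       executor_memory_mb: int = 2048,
                       base_overhead_mb: int = 1024,
                       num_executors: int = 2,
                       am_mb: int = 1024,
                       chunk_mb: int = 1024) -> int:
    """Estimate the total YARN allocation for a Comet-enabled spark-shell."""
    if comet_overhead_mb is None:
        # Assumption: without an explicit override, the factor decides.
        comet_overhead_mb = int(executor_memory_mb * factor)
    container = executor_memory_mb + base_overhead_mb + comet_overhead_mb
    # Round each executor container up to the 1 GB chunk seen on this cluster.
    container = -(-container // chunk_mb) * chunk_mb
    return num_executors * container + am_mb


print(yarn_allocation_mb())                        # 9216
print(yarn_allocation_mb(comet_overhead_mb=2048))  # 11264
print(yarn_allocation_mb(factor=2.0))              # 15360
```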

orthoxerox • Aug 14 '24 10:08

Thanks @orthoxerox for running the experiment. Please correct me if I'm wrong: both Comet memory params increase the YARN container memory usage, while spark.executor.memoryOverhead still shows the original value (without Comet) in the session.

comphead • Aug 14 '24 18:08

@comphead That's right, but this applies only to introspection from inside the Spark application. If you open the Spark History Server or the application master UI, the Executor tab shows the amended value of spark.executor.memoryOverhead.

orthoxerox • Aug 15 '24 06:08

The History Server has a slightly different implementation. Great, thanks for the help @orthoxerox, I'm planning to document this today.

comphead • Aug 15 '24 15:08