[BUG] Spark Pod Template Resources not getting applied
Describe the bug
Resources defined in Flyte pod templates do not seem to get applied to Flyte Spark tasks. I am able to submit the following sample code, but I don't see the resources getting propagated down to the driver or executor pods. Sample workflow I'm using:
from operator import add

import flytekit
from flytekit import ImageSpec, Resources, task, workflow, dynamic, PodTemplate
from flytekitplugins.spark import Spark
from utils import f
from kubernetes.client import (
    V1Container,
    V1PodSpec,
)

container_image = ImageSpec(
    name="sparkfun",
    registry="ghcr.io/fiedlernr9",
    python_version="3.9",
    packages=["flytekitplugins-spark", "kubernetes"],
)

pod_template = PodTemplate(
    pod_spec=V1PodSpec(
        containers=[
            V1Container(
                # name="spark-kubernetes-executor",
                name="primary",
                resources={
                    "requests": {"cpu": "4", "ephemeral-storage": "10Gi"},
                    "limits": {"cpu": "4", "ephemeral-storage": "10Gi"},
                },
            )
        ]
    )
)

@task(
    task_config=Spark(
        # This configuration is applied to the Spark cluster
        spark_conf={
            "spark.driver.memory": "1000M",
            "spark.executor.memory": "4000M",
            "spark.executor.cores": "3",
            "spark.executor.instances": "5",
            "spark.driver.cores": "2",
        }
    ),
    pod_template=pod_template,
    container_image=container_image,
)
def hello_spark(partitions: int) -> float:
    print("Starting Spark with partitions: {}".format(partitions))
    n = 1 * partitions
    sess = flytekit.current_context().spark_session
    count = (
        sess.sparkContext.parallelize(range(1, n + 1), partitions).map(f).reduce(add)
    )
    print("I'm done")
    pi_val = 4.0 * count / n
    return pi_val

@task(
    container_image=container_image,
    pod_template=pod_template,
)
def print_every_time(value_to_print: float):
    print("My printed value: {}".format(value_to_print))

@workflow
def wf(partitions: int = 1000) -> float:
    pi = hello_spark(partitions=partitions)
    print_every_time(value_to_print=pi)
    return pi
Describing resources for the driver:

    Limits:
      cpu:     1
      memory:  2867Mi
    Requests:
      cpu:     1
      memory:  2867Mi

Describing resources for the executors:

    Limits:
      cpu:     3
      memory:  5600Mi
    Requests:
      cpu:     3
      memory:  5600Mi
Expected behavior
I would expect the pod template resources to take precedence and to see them reflected in the underlying k8s pod definitions.
Additional context to reproduce
No response
Screenshots
No response
Are you sure this issue hasn't been raised already?
- [x] Yes
Have you read the Code of Conduct?
- [x] Yes
@fiedlerNr9 not sure if it completely addresses your issue, but support for driver/executor pod templates was added in 1.15.3 via https://github.com/flyteorg/flytekit/pull/3016
Thanks for the hint @davidmirror-ops. I just tried that myself but I'm experiencing the same outcome. cc @machichima
Hi @fiedlerNr9,
You should be able to set the resources for Spark via spark_conf. Is it also not working for you when using spark_conf?
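For CPU and memory, that suggestion can be sketched with the standard Spark properties below. The keys are stock Spark / Spark-on-Kubernetes configuration properties; the values are examples only, mirroring the ones in the reported workflow:

```python
# Resource-related Spark properties that can be carried in spark_conf.
# Keys are standard Spark / Spark-on-K8s properties; values are examples.
spark_conf = {
    "spark.driver.memory": "1000M",
    "spark.driver.cores": "2",
    "spark.executor.memory": "4000M",
    "spark.executor.cores": "3",
    "spark.executor.instances": "5",
    # Spark-on-K8s also exposes explicit CPU request/limit knobs:
    "spark.kubernetes.executor.request.cores": "3",
    "spark.kubernetes.executor.limit.cores": "3",
}

# Sanity check: every entry is a plain Spark property string.
assert all(key.startswith("spark.") for key in spark_conf)
```

Note there is no analogous `spark.*` key here for ephemeral storage, which is the gap discussed below.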
Thanks for coming back @machichima, and yes, I'm aware of setting resources in spark_conf like "spark.executor.memory": "4000M". Maybe I should have given more context: the reason I'm raising this is that I want to set ephemeral-storage for Spark driver/executor pods. According to the Spark documentation there is no config such as spark.executor.ephemeral_storage, which is why I was searching for a workaround using pod templates.
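One possible interim angle, assuming the Flyte Spark plugin forwards spark_conf entries untouched (an assumption, not confirmed in this thread): Spark 3.0+ can load driver/executor pod templates itself via the standard `spark.kubernetes.driver.podTemplateFile` / `spark.kubernetes.executor.podTemplateFile` properties. A sketch, with hypothetical file paths:

```python
# Sketch of Spark's own pod template support (Spark 3.0+). The two
# podTemplateFile keys are standard Spark-on-K8s properties; the file
# paths are hypothetical, and whether Flyte's Spark plugin forwards
# these keys untouched is an assumption.
executor_template = """\
apiVersion: v1
kind: Pod
spec:
  containers:
    - name: spark-kubernetes-executor
      resources:
        requests:
          ephemeral-storage: 10Gi
        limits:
          ephemeral-storage: 10Gi
"""

spark_conf = {
    # Templates would need to be present in the task image at these paths.
    "spark.kubernetes.driver.podTemplateFile": "/opt/templates/driver.yaml",
    "spark.kubernetes.executor.podTemplateFile": "/opt/templates/executor.yaml",
}
```

Whether the spark-operator version in use honors these template fields is a separate question from the operator upgrade discussed below.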
Hi @fiedlerNr9 , thanks for giving more information. I'll work on this and ping you if any update is done. 😄
Quick update here: to set ephemeral-storage in resources, we need to update the package github.com/GoogleCloudPlatform/spark-on-k8s-operator/ to its newer version, github.com/kubeflow/spark-operator. However, there are some dependency version conflicts to fix.
I am not sure if there's any workaround without upgrading this package.
@fiedlerNr9 , can we confirm which version of flyte (backend) you're running this on?
I ran this in Union, with the latest flyte fork on our side @eapolinario
@pingsutw, can you take a look when you have a moment?
@machichima
Quick update here: to set ephemeral-storage in resources, we need to update the package github.com/GoogleCloudPlatform/spark-on-k8s-operator/ to its newer version, github.com/kubeflow/spark-operator. However, there are some dependency version conflicts to fix.
Could you elaborate a bit more? I think we are getting to the point where we'll need to bump this dependency version
I've just been discussing a related issue with @davidmirror-ops. For us it's priorityClassName that is causing issues, but we would also be interested in ephemeral-storage.
I also came to the conclusion that the spark-operator dependency will need to be updated. I think setting priorityClassName requires at least v2.0.0, and ephemeral storage requires at least v2.1.0.
I can also share that we've been using Flyte with spark operator 2.1.1 for some time, so probably the upgrade is not too painful.
@machichima are you actively working on this? If not then we might make a contribution.
@Tom-Newton
No, I'm not currently working on this. Feel free to take over 😁
"Hello 👋, this issue has been inactive for over 90 days. To help maintain a clean and focused backlog, we'll be marking this issue as stale and will close the issue if we detect no activity in the next 7 days. Thank you for your contribution and understanding! 🙏"
hey @Tom-Newton any chance you could contribute? It'd be great to enable more flexibility here
Maybe, but I can't make any promises. We ended up changing the priorityClassName on basically everything else in our cluster to work around not being able to configure it on Spark tasks through Flyte.
I think we might be able to do this in v2.0?