
[BUG] Spark Pod Template Resources not getting applied

Open fiedlerNr9 opened this issue 9 months ago • 16 comments

Describe the bug

Resources defined in Flyte pod templates do not seem to get applied to Flyte Spark tasks. I am able to submit the following sample code, but I don't see the resources getting propagated down to the driver or executor pods. The sample workflow I'm using:

from operator import add

import flytekit
from flytekit import ImageSpec, Resources, task, workflow, dynamic, PodTemplate
from flytekitplugins.spark import Spark
from utils import f  # user-defined helper used in the map step below
from kubernetes.client import (
    V1Container,
    V1PodSpec,
)

container_image = ImageSpec(
    name="sparkfun",
    registry="ghcr.io/fiedlernr9",
    python_version="3.9",
    packages=["flytekitplugins-spark", "kubernetes"],
)

pod_template = PodTemplate(
    pod_spec=V1PodSpec(
        containers=[
            V1Container(
                # name="spark-kubernetes-executor",
                name="primary",
                resources={
                    "requests": {"cpu": "4" ,"ephemeral-storage": "10Gi"},
                    "limits": {"cpu": "4", "ephemeral-storage": "10Gi"},
                },
            )
        ]
    )
)


@task(
    task_config=Spark(
        # This configuration is applied to the Spark cluster
        spark_conf={
            "spark.driver.memory": "1000M",
            "spark.executor.memory": "4000M",
            "spark.executor.cores": "3",
            "spark.executor.instances": "5",
            "spark.driver.cores": "2",
        }
    ),
    pod_template=pod_template,
    container_image=container_image,
)
def hello_spark(partitions: int) -> float:
    print("Starting Spark with partitions: {}".format(partitions))

    n = 1 * partitions
    sess = flytekit.current_context().spark_session
    count = (
        sess.sparkContext.parallelize(range(1, n + 1), partitions).map(f).reduce(add)
    )
    print("I'm done")
    pi_val = 4.0 * count / n
    return pi_val


@task(
    container_image=container_image,
    pod_template=pod_template,
)
def print_every_time(value_to_print: float):
    print("My printed value: {}".format(value_to_print))


@workflow
def wf(partitions: int = 1000) -> float:
    pi = hello_spark(partitions=partitions)
    print_every_time(value_to_print=pi)
    return pi

Describing Resources for driver:

Limits:
  cpu:     1
  memory:  2867Mi
Requests:
  cpu:     1
  memory:  2867Mi

Describing Resources for executors:

Limits:
  cpu:     3
  memory:  5600Mi
Requests:
  cpu:     3
  memory:  5600Mi
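
An aside on the executor numbers above: Spark on Kubernetes requests pod memory as the configured executor memory plus a memory overhead, and for non-JVM (PySpark) jobs the default overhead factor is 0.4, which would explain `spark.executor.memory: 4000M` surfacing as 5600Mi. A rough sketch of that arithmetic (the 0.4 factor and 384MiB minimum are Spark defaults assumed here; this does not account for the driver numbers):

```python
# Sketch: Spark on Kubernetes sizes the pod memory request as the configured
# JVM/process memory plus an overhead. For PySpark (non-JVM) workloads the
# default spark.kubernetes.memoryOverheadFactor is 0.4 (0.1 for JVM jobs),
# with a 384MiB minimum overhead.

def pod_memory_mib(configured_mib: int, overhead_factor: float = 0.4,
                   min_overhead_mib: int = 384) -> int:
    """Approximate pod memory request: configured memory + overhead."""
    overhead = max(int(configured_mib * overhead_factor), min_overhead_mib)
    return configured_mib + overhead

# An executor configured with 4000M:
print(pod_memory_mib(4000))  # 5600
```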

Expected behavior

I would expect the pod template resources to take precedence, and to see them reflected in the underlying k8s pod definition.

Additional context to reproduce

No response

Screenshots

No response

Are you sure this issue hasn't been raised already?

  • [x] Yes

Have you read the Code of Conduct?

  • [x] Yes

fiedlerNr9 avatar Mar 19 '25 15:03 fiedlerNr9

@fiedlerNr9 I'm not sure if it completely addresses your issue, but support for driver/executor pod templates was added in 1.15.3 via https://github.com/flyteorg/flytekit/pull/3016

davidmirror-ops avatar Mar 19 '25 17:03 davidmirror-ops

Thanks for the hint @davidmirror-ops. I just tried that myself but am experiencing the same outcome. cc @machichima

fiedlerNr9 avatar Mar 19 '25 19:03 fiedlerNr9

Hi @fiedlerNr9, you should be able to set the resources for Spark via spark_conf; is it also not working when you use spark_conf?

machichima avatar Mar 20 '25 02:03 machichima
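
For reference, the spark_conf route suggested here covers CPU and memory through documented Spark configuration keys; a minimal sketch (all keys below are standard Spark / Spark-on-Kubernetes properties, and the values are illustrative):

```python
# Sketch: driver/executor CPU and memory for Spark pods can be driven by
# standard Spark configuration keys, without a pod template.
spark_conf = {
    "spark.driver.memory": "1000M",
    "spark.executor.memory": "4000M",
    # On Kubernetes, CPU requests/limits can also be set explicitly:
    "spark.kubernetes.executor.request.cores": "3",
    "spark.kubernetes.executor.limit.cores": "4",
}

# Caveat (the point of this issue): there is no documented Spark key for
# ephemeral-storage, so that resource still needs the pod-template route.
assert "ephemeral-storage" not in " ".join(spark_conf)
```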

Thanks for coming back @machichima, and yes, I'm aware of setting resources in spark_conf, e.g. "spark.executor.memory": "4000M". Maybe I should have given more context: the reason I'm raising this is that I want to set ephemeral-storage for the Spark driver/executor pods. According to the Spark documentation there is no config such as spark.executor.ephemeral_storage, which is why I was looking for a workaround using pod templates.

fiedlerNr9 avatar Mar 20 '25 02:03 fiedlerNr9

Hi @fiedlerNr9, thanks for the additional information. I'll work on this and ping you when there's an update. 😄

machichima avatar Mar 21 '25 08:03 machichima

Quick update here: to set ephemeral-storage in resources, we need to update the package github.com/GoogleCloudPlatform/spark-on-k8s-operator to its newer version, github.com/kubeflow/spark-operator. However, there are some dependency version conflicts to fix.

I am not sure if there's any workaround without upgrading this package.

machichima avatar Mar 23 '25 13:03 machichima

@fiedlerNr9 , can we confirm which version of flyte (backend) you're running this on?

eapolinario avatar Mar 27 '25 17:03 eapolinario

I ran this in Union, with the latest flyte fork on our side @eapolinario

fiedlerNr9 avatar Mar 27 '25 18:03 fiedlerNr9

@pingsutw, can you take a look when you have a moment?

eapolinario avatar Mar 27 '25 21:03 eapolinario

@machichima

Quick update here: to set ephemeral-storage in resources, we need to update the package github.com/GoogleCloudPlatform/spark-on-k8s-operator to its newer version, github.com/kubeflow/spark-operator. However, there are some dependency version conflicts to fix.

Could you elaborate a bit more? I think we are getting to the point where we'll need to bump this dependency version

davidmirror-ops avatar Jun 10 '25 17:06 davidmirror-ops

I've just been discussing a related issue with @davidmirror-ops. For us it's priorityClassName that is causing issues, but we would also be interested in ephemeral-storage.

I also came to the conclusion that the spark-operator dependency will need to be updated. I believe setting priorityClassName requires at least v2.0.0, and ephemeral storage requires at least v2.1.0.

I can also share that we've been using Flyte with spark operator 2.1.1 for some time, so probably the upgrade is not too painful.

@machichima are you actively working on this? If not then we might make a contribution.

Tom-Newton avatar Jun 10 '25 19:06 Tom-Newton
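
To make the goal concrete, the pod-level fields this thread wants propagated can be sketched as a plain Kubernetes pod spec fragment (the field names are standard k8s API fields; the class name is hypothetical, and the claim that spark-operator >= v2.0.0/v2.1.0 honors them comes from the comment above, not verified here):

```python
# Hypothetical sketch of the desired pod spec fields, as a plain dict
# mirroring the Kubernetes PodSpec schema (camelCase field names).
desired_pod_spec = {
    "priorityClassName": "high-priority",  # hypothetical priority class
    "containers": [
        {
            "name": "primary",
            "resources": {
                "requests": {"cpu": "4", "ephemeral-storage": "10Gi"},
                "limits": {"cpu": "4", "ephemeral-storage": "10Gi"},
            },
        }
    ],
}

print(desired_pod_spec["containers"][0]["resources"]["requests"])
```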

@machichima are you actively working on this? If not then we might make a contribution.

@Tom-Newton

No, I'm not currently working on this. Feel free to take over 😁

machichima avatar Jun 10 '25 23:06 machichima

Hello 👋, this issue has been inactive for over 90 days. To help maintain a clean and focused backlog, we'll be marking this issue as stale and will close the issue if we detect no activity in the next 7 days. Thank you for your contribution and understanding! 🙏

github-actions[bot] avatar Sep 09 '25 00:09 github-actions[bot]

hey @Tom-Newton any chance you could contribute? It'd be great to enable more flexibility here

davidmirror-ops avatar Oct 15 '25 15:10 davidmirror-ops

hey @Tom-Newton any chance you could contribute? It'd be great to enable more flexibility here

Maybe, but I can't make any promises. We ended up changing the priorityClassName on basically everything else in our cluster to work around not being able to configure it on Spark tasks through Flyte.

Tom-Newton avatar Oct 15 '25 15:10 Tom-Newton

I think we might be able to do this in v2.0?

kumare3 avatar Oct 16 '25 03:10 kumare3