
[BUG] Flytekit > 1.15, new feature of .with_overrides(pod_template) fails to override the pod_template

Open Clement-Tapway opened this issue 9 months ago • 17 comments

Describe the bug

@arbaobao @pingsutw hi, I intend to use the with_overrides method to dynamically select which pod_template is used for a given pod at runtime. This is to allow users to choose, say, an AWS EC2 instance type that is associated with a managed node group in my EKS cluster.

I saw in this PR that flytekit >= 1.15 onwards should support the with_overrides method to override a task's pod_template at runtime, is that right?

However, in my testing with flytekit==1.15.4, the pod that is spun up in my Flyte workflow always fails to be configured with the pod_template that is passed to the with_overrides method.

I am wondering if this is a bug, or a gap in my understanding of how the with_overrides(pod_template) feature works? Intuitively, I would expect this use case to be one of the primary reasons for introducing the feature.

Expected behavior

Say I have this Flyte workflow: pod_b always fails to spin up with the appropriate pod_template, regardless of whether task pod_b has a compile-time pod_template defined or not.

I expect that if I launch a Flyte workflow execution with a selected pod_template, pod_b would spin up with the correct pod_template.

@workflow
def wf_a(.., pod_template="a"):
  pod_a(.., pod_template="a")  

@dynamic
def pod_a(..., pod_template="a"):
  pod_b(..).with_overrides(requests=.., limits=.., pod_template="a")

@task
def pod_b(..):
  <other_logic>
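The selection I have in mind is just a lookup from the workflow input to a pre-built template. A stdlib-only sketch of that lookup (the template dicts and the select_pod_template helper are hypothetical names for illustration; in the real workflow the values would be flytekit PodTemplate objects passed to .with_overrides(pod_template=...)):

```python
# Illustrative only: the workflow input string picks a pre-built pod template.
# Both POD_TEMPLATES and select_pod_template are hypothetical; real templates
# would be flytekit PodTemplate objects, not dicts.
POD_TEMPLATES = {
    "a": {"node_group": "cpu-small"},
    "b": {"node_group": "gpu-large"},
}

def select_pod_template(name: str) -> dict:
    """Return the pod template selected at runtime, failing fast on typos."""
    try:
        return POD_TEMPLATES[name]
    except KeyError:
        raise ValueError(
            f"Unknown pod_template {name!r}; choose from {sorted(POD_TEMPLATES)}"
        )
```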

Additional context to reproduce

No response

Screenshots

No response

Are you sure this issue hasn't been raised already?

  • [x] Yes

Have you read the Code of Conduct?

  • [x] Yes

Clement-Tapway avatar May 23 '25 01:05 Clement-Tapway

Thank you for opening your first issue here! 🛠

welcome[bot] avatar May 23 '25 01:05 welcome[bot]

Yes, starting with Flytekit 1.15, the .with_overrides(pod_template=...) feature is officially supported and should allow you to override the pod template at runtime for a task invocation, as described in the 1.15.0 release notes and the relevant PR. This is also validated by Flytekit unit tests, which show that the pod template can be overridden using .with_overrides(pod_template=...) in a workflow context (test_node_creation.py).

If you are using Flytekit ≥1.15 and the override is not being applied at runtime, this may indicate a bug or a backend/platform configuration issue. There are no known limitations in the cited sources for using .with_overrides(pod_template=...) in dynamic workflows or tasks, but correct backend support (FlytePropeller, FlyteAdmin) and proper evaluation order of pod templates are required (pod template precedence docs). If the override is not respected, please ensure your backend is also up to date and that no higher-precedence pod template or plugin configuration is interfering.
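For intuition, the precedence just described can be sketched as a first-non-None selection. The exact ordering below (runtime override, then task-level template, then platform default) is my reading of the precedence docs, not verified against the implementation:

```python
# Assumed pod-template precedence: with_overrides at runtime beats the task
# decorator's template, which beats any cluster/namespace default. This is a
# simplified sketch, not Flyte code.
from typing import Optional

def resolve_pod_template(
    runtime_override: Optional[str],
    task_level: Optional[str],
    cluster_default: Optional[str],
) -> Optional[str]:
    for candidate in (runtime_override, task_level, cluster_default):
        if candidate is not None:
            return candidate
    return None
```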

Would you like a step-by-step troubleshooting guide or a minimal working example?


runllm[bot] avatar May 23 '25 01:05 runllm[bot]

Hi @Clement-Tapway, Thank you for your issue. Could you please post your entire workflow? I think you shouldn’t put the pod_template in the function parameters — instead, you can specify pod_template in the task decorator and override the pod_template in the runtime workflow, as shown in the example below.

@task(pod_template="...")
def t1():
    ....


@workflow
def wf():
    t1().with_overrides(pod_template="...")

arbaobao avatar May 29 '25 07:05 arbaobao

We just upgraded from flytekit v1.14.6 to v1.15.4, and all tasks using pod_template with_overrides seem to be broken. We are seeing bad task specification errors in propeller because the pod spec has an empty containers list and a primary container name configured as primary. This seems to imply that flytekit is serializing a task template with an empty containers list.

I'm not sure if it's relevant, but we are calling the task and setting overrides inside of a conditional. Pseudocode is below.

@task(requests=...., pod_template=...)
def run_gpu_task():
	....

@workflow
def single_sim():
	sim_result = (
	        conditional("sim")
	        .if_(sim_type == ...)
	        .then(
	            run_gpu_task().with_overrides(pod_template=xxx)
	        )
	        .elif_(sim_type == ...)
	        .then(
	            run_gpu_task().with_overrides(pod_template=xxx)
	        )
	        .else_()
	        .fail(f"Unsupported sim_type: {sim_type}")
	    )

Removing pod_template from with_overrides has fixed the issue for us, but it is not ideal.

Sovietaced avatar Jun 03 '25 22:06 Sovietaced

This feature also doesn't work when we eliminate the pod_template from the task annotation and only set it using with_overrides.

Sovietaced avatar Jun 03 '25 22:06 Sovietaced

Hi @Sovietaced, May I ask what your flyte version is?

arbaobao avatar Jun 04 '25 07:06 arbaobao

Hi @Sovietaced, May I ask what your flyte version is?

V1.15.3 on the server

Sovietaced avatar Jun 04 '25 15:06 Sovietaced

@Sovietaced I created a workflow using your pseudocode with Flyte v1.15.3 and Flytekit v1.15.4, but I still can’t reproduce your error. My example code is below. Is there anything I might be missing?


@task(pod_template=PodTemplate(
    primary_container_name="primary",
    labels={"lKeyA": "lValA", "lKeyB": "lValB"},
    annotations={"aKeyA": "aValA", "aKeyB": "aValB"},
    pod_spec=V1PodSpec(
        containers=[
            V1Container(
                name="primary-nelson",
                image="cr.flyte.org/flyteorg/flytekit:py3.10-latest",
                env=[V1EnvVar(name="eKeyC", value="eValC"), V1EnvVar(name="eKeyD", value="eValD")],
            ),
            V1Container(
                name="primary-nelson2",
                image="cr.flyte.org/flyteorg/flytekit:py3.10-latest",
                env=[V1EnvVar(name="eKeyC", value="eValC"), V1EnvVar(name="eKeyD", value="eValD")],
            ),
        ],
    )))
def say_hello() -> str:
    return "Hello, World!"


@workflow
def hello_world_wf3(sim_type: int = 1) -> str:
    sim_result = (
        conditional("sim")
        .if_(sim_type == 1)
        .then(
            say_hello().with_overrides(pod_template=PodTemplate(
                primary_container_name="primary",
                labels={"lKeyA": "lValA", "lKeyB": "lValB"},
                annotations={"aKeyA": "aValA", "aKeyB": "aValB"},
                pod_spec=V1PodSpec(
                    containers=[
                        V1Container(
                            name="primary-nelson",
                            image="cr.flyte.org/flyteorg/flytekit:py3.11-latest",
                            env=[V1EnvVar(name="eKeyC", value="eValC"), V1EnvVar(name="eKeyD", value="eValD")],
                        ),
                        V1Container(
                            name="primary-nelson2",
                            image="cr.flyte.org/flyteorg/flytekit:py3.12-latest",
                            env=[V1EnvVar(name="eKeyC", value="eValC"), V1EnvVar(name="eKeyD", value="eValD")],
                        ),
                    ],
                )
            ))
        )
        .else_()
        .fail(f"Unsupported sim_type: {sim_type}")
    )
    
    return sim_result

arbaobao avatar Jun 04 '25 18:06 arbaobao

I created a workflow using your pseudocode with Flyte v1.15.3 and Flytekit v1.15.4, but I still can’t reproduce your error. My example code is below. Is there anything I might be missing?

Not sure if you have more sample code, but that looks like a pod template for a single task. Also, the container names don't match the primary container name, which seems problematic.

The pod templates look more like

PodTemplate(
        primary_container_name=PodConstants.PRIMARY_CONTAINER_NAME,
        pod_spec=V1PodSpec(
            volumes=[
                V1Volume(
                    name=PodConstants.DSHM_VOLUME_NAME,
                    empty_dir=V1EmptyDirVolumeSource(
                        size_limit=str(shared_volume_memory.size),
                        medium="Memory",
                    ),
                ),
            ],
            containers=[
                V1Container(
                    name=PodConstants.PRIMARY_CONTAINER_NAME,
                    volume_mounts=[
                        V1VolumeMount(
                            name=PodConstants.DSHM_VOLUME_NAME,
                            mount_path=PodConstants.DSHM_VOLUME_MOUNT_PATH,
                        ),
                    ],
                ),
            ],
        ),
    )

Sovietaced avatar Jun 05 '25 01:06 Sovietaced

Hi @Clement-Tapway, Thank you for your issue. Could you please post your entire workflow? I think you shouldn’t put the pod_template in the function parameters — instead, you can specify pod_template in the task decorator and override the pod_template in the runtime workflow, as shown in the example below.

@task(pod_template="...")
def t1():
    ....


@workflow
def wf():
    t1().with_overrides(pod_template="...")

@arbaobao I tried both variants, and neither works. The pseudocode I shared highlights what I intend to do with my Flyte workflow: I pass a variable (named pod_template for easier reference) as a workflow input, which is forwarded to the @dynamic workflow and should override the @task's pod_template at runtime. Please let me know whether this is a bug that should work, or simply not a feasible option. From my understanding, the purpose of with_overrides support for pod_template is to let me override a given task's pod template dynamically at runtime (hence the @dynamic workflow between my workflow and task).

variant A: flyte task with pod_template in task decorator

@workflow
def wf_a(.., pod_template="a"):
  pod_a(.., pod_template="a")  

@dynamic
def pod_a(..., pod_template="a"):
  pod_b(..).with_overrides(requests=.., limits=.., pod_template="a")

@task(pod_template="...")
def pod_b(..):
  <other_logic>

variant B: flyte task without pod_template in task decorator

@workflow
def wf_a(.., pod_template="a"):
  pod_a(.., pod_template="a")  

@dynamic
def pod_a(..., pod_template="a"):
  pod_b(..).with_overrides(requests=.., limits=.., pod_template="a")

@task
def pod_b(..):
  <other_logic>

Clement-Tapway avatar Jun 05 '25 03:06 Clement-Tapway

I have found the issue and will have a pull request shortly. The propeller-side implementation does not correctly overlay the pod template onto the base pod spec generated by Flyte. It just uses the pod_template as the base pod spec unconditionally, so it blows away anything in the base pod spec that isn't in the pod template.

A simple test case to illustrate the issue:

	t.Run("Override pod template", func(t *testing.T) {
		taskContext := dummyExecContext(dummyTaskTemplate(), &v1.ResourceRequirements{
			Requests: v1.ResourceList{
				v1.ResourceCPU: resource.MustParse("1024m"),
			}}, nil, "", &core.K8SPod{
			PrimaryContainerName: "foo",
			PodSpec:              podSpecStruct,
			Metadata:             metadata,
		})
		p, m, _, err := ToK8sPodSpec(context.TODO(), taskContext)
		assert.NoError(t, err)
		assert.Equal(t, "a", m.Labels["l"])
		assert.Equal(t, "b", m.Annotations["a"])
		assert.Equal(t, "foo:latest", p.Containers[0].Image)
		assert.Equal(t, "foo", p.Containers[0].Name)
		assert.Equal(t, []string{"command"}, p.Containers[0].Command) // <-- added to existing test case which fails
	})
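For intuition, the intended behavior is an overlay (merge) of the override template onto the base pod spec, not a wholesale replacement. A stdlib-only sketch of that merge semantics (illustrative only; the real fix would use Kubernetes strategic merge, which additionally merges container lists by name rather than replacing them):

```python
def overlay(base: dict, override: dict) -> dict:
    """Recursively overlay `override` onto `base`: the override wins where
    both define a key, but base keys absent from the override survive.
    Simplification: lists are replaced wholesale here, which is exactly why
    a naive merge (or a naive replace, as in the bug) can drop fields like
    the container command; strategic merge merges containers by name."""
    merged = dict(base)
    for key, value in override.items():
        if isinstance(value, dict) and isinstance(merged.get(key), dict):
            merged[key] = overlay(merged[key], value)
        else:
            merged[key] = value
    return merged

# Hypothetical pod-spec-shaped dicts mirroring the test case above.
base_spec = {
    "containers": [{"name": "primary", "command": ["command"]}],
    "serviceAccountName": "default",
}
override_spec = {"containers": [{"name": "primary", "image": "foo:latest"}]}
merged = overlay(base_spec, override_spec)
```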

Sovietaced avatar Jun 09 '25 17:06 Sovietaced

@Sovietaced When we use with_overrides(pod_template=...), Flytekit serializes the pod spec so that the overridden pod spec can be used directly by propeller. Hence, in this test case, I think it should not have command.

I rewrote my test code according to your sample code, and I still can't reproduce your error. I am wondering what your error messages are.

class PodConstants:
    PRIMARY_CONTAINER_NAME = "primary-nelson"
    DSHM_VOLUME_MOUNT_PATH = "/mnt/dshm"
    DSHM_VOLUME_NAME = "nelson-volume"

@task
def say_hello() -> str:
    return "Hello, World!"

@workflow
def hello_world_wf3(sim_type: int = 1) -> str:
    sim_result = (
        conditional("sim")
        .if_(sim_type == 1)
        .then(
            say_hello().with_overrides(pod_template=PodTemplate(
                primary_container_name=PodConstants.PRIMARY_CONTAINER_NAME,
                pod_spec=V1PodSpec(
                    volumes=[
                        V1Volume(
                            name=PodConstants.DSHM_VOLUME_NAME,
                            empty_dir=V1EmptyDirVolumeSource(
                                size_limit=str("10"),
                                medium="Memory",
                            ),
                        ),
                    ],
                    containers=[
                        V1Container(
                            name=PodConstants.PRIMARY_CONTAINER_NAME,
                            env=[V1EnvVar(name="eKeyC", value="eValC"), V1EnvVar(name="eKeyD", value="eValD")],
                            volume_mounts=[
                                V1VolumeMount(
                                    name=PodConstants.DSHM_VOLUME_NAME,
                                    mount_path=PodConstants.DSHM_VOLUME_MOUNT_PATH,
                                ),
                            ],
                        ),
                    ],
                )
            ))
        )
        .else_()
        .fail(f"Unsupported sim_type: {sim_type}")
    )

    return sim_result

arbaobao avatar Jun 09 '25 23:06 arbaobao

The most immediate issue we're seeing is that we're getting bad task specification errors from Flyte propeller.

RuntimeExecutionError: max number of system retry attempts [31/30] exhausted. Last known status message: failed at Node[n0-n0]. RuntimeExecutionError: failed during plugin execution, caused by: failed to execute handle for plugin [armada]: generating job request: [BadTaskSpecification] invalid TaskSpecification, primary container [primary] not defined

I added a lot of debugging statements to a custom image and found that the pod_spec of the pod_template is empty when it is read in propeller.

podTemplate=metadata:{} pod_spec:{} primary_container_name:"primary"

I assume that @Clement-Tapway probably doesn't specify a primary_container_name in his pod template override so the whole thing is just a no-op for him and doesn't blow up like it does for us.
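That failure mode is consistent with a simple validation on the server side: a primary container name is set, but the (empty) pod spec contains no container with that name. A stdlib-only sketch of such a check, mirroring the error text above (hypothetical code, not propeller's actual implementation):

```python
def check_primary_container(pod_spec: dict, primary_container_name: str) -> None:
    """Raise if the named primary container is absent from the pod spec,
    mimicking the [BadTaskSpecification] 'primary container [...] not
    defined' error. With an empty pod_spec, this always fails."""
    names = [c.get("name") for c in pod_spec.get("containers", [])]
    if primary_container_name not in names:
        raise ValueError(
            f"invalid TaskSpecification, primary container "
            f"[{primary_container_name}] not defined"
        )
```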

Sovietaced avatar Jun 10 '25 07:06 Sovietaced

The most immediate issue we're seeing is that we're getting bad task specification errors from Flyte propeller.

RuntimeExecutionError: max number of system retry attempts [31/30] exhausted. Last known status message: failed at Node[n0-n0]. RuntimeExecutionError: failed during plugin execution, caused by: failed to execute handle for plugin [armada]: generating job request: [BadTaskSpecification] invalid TaskSpecification, primary container [primary] not defined

I added a lot of debugging statements to a custom image and found that the pod_spec of the pod_template is empty when it is read in propeller.

podTemplate=metadata:{} pod_spec:{} primary_container_name:"primary"

I assume that @Clement-Tapway probably doesn't specify a primary_container_name in his pod template override so the whole thing is just a no-op for him and doesn't blow up like it does for us.

Hi @Sovietaced, yes, you are correct, I only specify pod_spec in my PodTemplate. May I know if there is any workaround for my case, or will a pull request be opened to fix this issue?

I am attaching the function that generates my pod template for your reference: pod_template_generator.py

from kubernetes.client.models import (
    V1EnvVar,
    V1EnvVarSource,
    V1ObjectFieldSelector,
    V1PodSpec,
    V1Toleration,
    V1Affinity,
    V1NodeAffinity,
    V1NodeSelector,
    V1NodeSelectorTerm,
    V1PodAntiAffinity,
    V1PodAffinityTerm,
    V1LabelSelector,
    V1Volume,
    V1VolumeMount,
    V1EmptyDirVolumeSource,
    V1Container,
)
import os
import typing
import yaml
from flytekit import PodTemplate

def load_yaml_config(file_name: str = "pod_template.yaml") -> dict:
    """Load and return the YAML configuration."""
    file_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), file_name)
    with open(file_path, "r") as f:
        return yaml.safe_load(f)

def generate_train_pod_template(
    instance_size: str = "nano",
    config_path: str = "pod_template.yaml"
):
    """
    Function to generate a pod template for training.
    This function constructs a pod template based on the provided node labels and configuration file (pod_template.yaml).

    Args:
        instance_size (str): Type of EC2 instance to launch for training. Defaults to "nano".
                             "nano" is a t2.small instance without a GPU, used only for the entrypoint pod.
                             "small", "medium", "large", and "xlarge" are instances with a GPU for the training pod.
        config_path (str): Path to config file containing pod template values. Defaults to "pod_template.yaml".

    Returns:
        PodTemplate: Pod template object for a given pod.
    """
    INSTANCE_SIZE_OPTIONS = ["nano", "small", "medium", "large", "xlarge"]
    if instance_size not in INSTANCE_SIZE_OPTIONS:
        raise ValueError(
            f"Invalid instance size: {instance_size}. Choose from {INSTANCE_SIZE_OPTIONS}."
        )
    
    # Instance labels for selecting the appropriate EC2 instance type
    instance_labels = [
        {
            "key": "size",
            "operator": "In",
            "values": [instance_size], 
        }
    ]
    
    # Load YAML configuration
    config = load_yaml_config(config_path)
    train_pod_config = config.get("train_pod", {})

    # 1. Construct container
    container_config = train_pod_config.get("containers", [{}])[0]
    env_vars = []
    for env in container_config.get("env", []):
        value_from = env.get("value_from", {})
        field_ref = value_from.get("field_ref", {})
        env_vars.append(
            V1EnvVar(
                name=env.get("name"),
                value_from=V1EnvVarSource(
                    field_ref=V1ObjectFieldSelector(field_path=field_ref.get("field_path"))
                ) if value_from else None,
            )
        )

    train_container = V1Container(
        name=container_config.get("name", "primary"),
        env=env_vars
    )
    train_container.volume_mounts = [
        V1VolumeMount(
            name=mount.get("name"),
            mount_path=mount.get("mount_path")
        ) for mount in container_config.get("volume_mounts", [])
    ]

    # 2. Construct volumes
    volumes = []
    for vol in train_pod_config.get("volumes", []):
        empty_dir = vol.get("empty_dir", {})
        volumes.append(
            V1Volume(
                name=vol.get("name"),
                empty_dir=V1EmptyDirVolumeSource(medium=empty_dir.get("medium")) if empty_dir else None
            )
        )

    # 3. Construct tolerations
    tolerations = [
        V1Toleration(
            key=tol.get("key"),
            value=tol.get("value"),
            operator=tol.get("operator"),
            effect=tol.get("effect")
        ) for tol in train_pod_config.get("tolerations", [])
        if not (instance_size == "nano" and tol.get("key") == "gpu")
    ]

    # 4a. Construct node affinity
    node_affinity_config = train_pod_config.get("affinity", {}).get("node_affinity", {})
    required_node = node_affinity_config.get("required_during_scheduling_ignored_during_execution", {})
    node_selector_terms = []
    for term in required_node.get("node_selector_terms", []):
        match_expressions = [
            {
                "key": expr.get("key"),
                "operator": expr.get("operator"),
                "values": expr.get("values") if not (instance_size == "nano" and expr.get("key") == "spot") else ["true"]
            } for expr in term.get("match_expressions", [])
            if not (instance_size == "nano" and expr.get("key") == "gpu")
        ]
        node_selector_terms.append(
            V1NodeSelectorTerm(match_expressions=match_expressions + instance_labels)
        )

    node_affinity = V1NodeAffinity(
        required_during_scheduling_ignored_during_execution=V1NodeSelector(
            node_selector_terms=node_selector_terms
        ) if node_selector_terms else None
    )

    # 4b. Construct pod anti-affinity
    pod_anti_affinity_config = train_pod_config.get("affinity", {}).get("pod_anti_affinity", {})
    pod_anti_affinity_terms = []
    for term in pod_anti_affinity_config.get("required_during_scheduling_ignored_during_execution", []):
        label_selector = term.get("label_selector", {})
        match_expressions = [
            {
                "key": expr.get("key"),
                "operator": expr.get("operator"),
                "values": expr.get("values")
            } for expr in label_selector.get("match_expressions", [])
        ]
        pod_anti_affinity_terms.append(
            V1PodAffinityTerm(
                label_selector=V1LabelSelector(match_expressions=match_expressions),
                topology_key=term.get("topology_key"),
                namespaces=term.get("namespaces")
            )
        )

    pod_anti_affinity = V1PodAntiAffinity(
        required_during_scheduling_ignored_during_execution=pod_anti_affinity_terms
    ) if pod_anti_affinity_terms else None

    # Final: Construct pod spec
    pod_spec = V1PodSpec(
        service_account_name=train_pod_config.get("service_account_name", "default"),
        containers=[train_container],
        tolerations=tolerations,
        volumes=volumes,
        affinity=V1Affinity(
            node_affinity=node_affinity,
            pod_anti_affinity=pod_anti_affinity
        ) if node_affinity or pod_anti_affinity else None
    )
    
    # Output: Pod template
    train_pod_template = PodTemplate(pod_spec=pod_spec)

    return train_pod_template

Clement-Tapway avatar Jun 10 '25 07:06 Clement-Tapway

Hi @Sovietaced, yes, you are correct, I only specify pod_spec in my PodTemplate. May I know if there is any workaround for my case, or will a pull request be opened to fix this issue?

No workaround yet. I'm not as familiar with the flytekit side of things so hopefully @arbaobao can come up with something. But I think even when that is fixed there will be more work to do on the server side of things as I've mentioned above.

Sovietaced avatar Jun 10 '25 07:06 Sovietaced

No workaround yet. I'm not as familiar with the flytekit side of things so hopefully @arbaobao can come up with something. But I think even when that is fixed there will be more work to do on the server side of things as I've mentioned above.

Actually this seems to have allowed things to progress at least: https://github.com/flyteorg/flytekit/pull/3270

Sovietaced avatar Jun 10 '25 07:06 Sovietaced

@Clement-Tapway Does your pod_template work when it is added to the task decorator?

i only specify pod_spec in my PodTemplate

I think this should be fine.

arbaobao avatar Jun 10 '25 08:06 arbaobao

I'm also having trouble with the new pod template override feature, using flytekit==1.15.0.

Using flytekit.Workflow, registering using FlyteRemote.register_workflow(), and specifying the pod_template...

  • Only in the task decorator: the pod spec shows up in flyte, but the task hangs.
  • Only using an override: the pod spec no longer shows up in flyte, and the task hangs.
  • In both the task decorator and overrides: the original pod spec shows up in flyte, and the task hangs.

Using @workflow, registering using the CLI, and specifying the pod_template...

  • Only in the task decorator: the pod spec shows up in flyte and the task runs.
  • Only using an override: the pod spec no longer shows up in flyte, and the task hangs.
  • In both the task decorator and overrides: the original pod spec shows up in flyte, and the task hangs.

The hang state is always the same: the task waits in a queued state until flytepropeller exhausts its retries.

The only way I can get a pod template to work is in a @workflow, in the task decorator, and without specifying an override.

The pod spec I'm using works fine when it is supplied using flytekitplugins.pod via task_config.

geekysuavo avatar Jun 24 '25 18:06 geekysuavo

@Clement-Tapway Does your pod_template work when it is added to the task decorator?

i only specify pod_spec in my PodTemplate

I think this should be fine.

@arbaobao Yes, the pod template works when added to the task decorator, but it does not work when using .with_overrides on the @task call.

Clement-Tapway avatar Jun 25 '25 01:06 Clement-Tapway

@samhita-alla @Sovietaced may I know if this issue is fixed? And which version of flytekit should I install to receive the fixes?

Clement-Tapway avatar Jun 25 '25 08:06 Clement-Tapway

@samhita-alla @Sovietaced may I know if this issue is fixed? And which version of flytekit should I install to receive the fixes?

The issue is fixed; I think it will be in the next version of flytekit. I'd subscribe to new releases to get notified.

Sovietaced avatar Jun 29 '25 01:06 Sovietaced

Thanks for your great help! @Sovietaced

Clement-Tapway avatar Jun 30 '25 02:06 Clement-Tapway