[BUG] Flytekit >= 1.15: new .with_overrides(pod_template) feature fails to override the pod_template
Describe the bug
@arbaobao @pingsutw
Hi, I intend to use the with_overrides method to dynamically select which pod_template should be used for a given pod at run time. This is to allow, say, users to choose an AWS EC2 instance type, which is associated with a managed node group in my EKS cluster.
I saw in this PR that flytekit>=1.15 onwards should support the with_overrides method for overriding a task's pod_template at runtime.
However, in my testing with flytekit==1.15.4, the pod that is spun up in my Flyte workflow always fails to be configured with the pod_template that is passed to with_overrides.
I am wondering whether this is a bug or a gap in my understanding of how the with_overrides(pod_template) feature works, because intuitively this use case seems like one of the primary reasons for introducing the feature.
Expected behavior
Say I have the Flyte workflow below: pod_b always fails to spin up with the appropriate pod_template, regardless of whether task pod_b has a compile-time pod_template defined or not.
I expect that if I launch a workflow execution with a selected pod_template, pod_b would be spun up with that pod_template.
@workflow
def wf_a(.., pod_template="a"):
    pod_a(.., pod_template="a")

@dynamic
def pod_a(..., pod_template="a"):
    pod_b(..).with_overrides(requests=.., limits=.., pod_template="a")

@task
def pod_b(..):
    <other_logic>
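To make the intent concrete, the runtime selection step described above can be sketched in plain Python. The dicts below are stand-ins for real flytekit PodTemplate objects, and all names here are illustrative, not part of the actual workflow:

```python
# Hypothetical mapping from a user-selected instance size to a pod template.
# In the real workflow these values would be flytekit PodTemplate objects
# built from node labels, tolerations, and affinity rules.
POD_TEMPLATES = {
    "nano": {"node_selector": {"size": "nano"}},
    "small": {"node_selector": {"size": "small"}},
}

def select_pod_template(instance_size: str) -> dict:
    """Pick the pod template matching the instance size chosen at launch time."""
    try:
        return POD_TEMPLATES[instance_size]
    except KeyError:
        raise ValueError(
            f"Unknown instance size: {instance_size!r}; choose from {sorted(POD_TEMPLATES)}"
        )
```

The selected template would then be passed to `.with_overrides(pod_template=...)` inside the @dynamic workflow, which is exactly the step that is not taking effect.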
Additional context to reproduce
No response
Screenshots
No response
Are you sure this issue hasn't been raised already?
- [x] Yes
Have you read the Code of Conduct?
- [x] Yes
Thank you for opening your first issue here! 🛠
Yes, starting with Flytekit 1.15, the .with_overrides(pod_template=...) feature is officially supported and should allow you to override the pod template at runtime for a task invocation, as described in the 1.15.0 release notes and the relevant PR. This is also validated by Flytekit unit tests, which show that the pod template can be overridden using .with_overrides(pod_template=...) in a workflow context (test_node_creation.py).
If you are using Flytekit ≥1.15 and the override is not being applied at runtime, this may indicate a bug or a backend/platform configuration issue. There are no known limitations in the cited sources for using .with_overrides(pod_template=...) in dynamic workflows or tasks, but correct backend support (FlytePropeller, FlyteAdmin) and proper evaluation order of pod templates are required (pod template precedence docs). If the override is not respected, please ensure your backend is also up to date and that no higher-precedence pod template or plugin configuration is interfering.
Would you like a step-by-step troubleshooting guide or a minimal working example?
Sources:
- Flyte 1.15.0 release notes
- Support overriding task pod_template via with_overrides PR
- Flytekit test_node_creation.py (pod_template override)
- pod template precedence docs
Hi @Clement-Tapway,
Thank you for your issue. Could you please post your entire workflow? I think you shouldn’t put the pod_template in the function parameters — instead, you can specify pod_template in the task decorator and override the pod_template in the runtime workflow, as shown in the example below.
@task(pod_template="...")
def t1():
    ...

@workflow
def wf():
    t1().with_overrides(pod_template="...")
We just upgraded from flytekit v1.14.6 to v1.15.4, and all tasks using pod_template with_overrides seem to be broken. We are seeing BadTaskSpecification errors in propeller because the pod spec has an empty containers list and a primary container name configured as "primary". This seems to imply that flytekit is serializing a task template with an empty containers list.
I'm not sure if it's relevant, but we're calling the task and setting overrides inside a conditional. Pseudocode is below.
@task(requests=..., pod_template=...)
def run_gpu_task():
    ...

@workflow
def single_sim():
    sim_result = (
        conditional("sim")
        .if_(sim_type == ...)
        .then(
            run_gpu_task().with_overrides(pod_template=xxx)
        )
        .elif_(sim_type == ...)
        .then(
            run_gpu_task().with_overrides(pod_template=xxx)
        )
        .else_()
        .fail(f"Unsupported sim_type: {sim_type}")
    )
Removing pod_template from with_overrides has fixed the issue for us, but that is not ideal.
This feature also doesn't work when we remove pod_template from the task decorator and only set it using with_overrides.
Hi @Sovietaced, May I ask what your flyte version is?
v1.15.3 on the server
@Sovietaced I created a workflow using your pseudocode with Flyte v1.15.3 and Flytekit v1.15.4, but I still can’t reproduce your error. My example code is below. Is there anything I might be missing?
@task(pod_template=PodTemplate(
    primary_container_name="primary",
    labels={"lKeyA": "lValA", "lKeyB": "lValB"},
    annotations={"aKeyA": "aValA", "aKeyB": "aValB"},
    pod_spec=V1PodSpec(
        containers=[
            V1Container(
                name="primary-nelson",
                image="cr.flyte.org/flyteorg/flytekit:py3.10-latest",
                env=[V1EnvVar(name="eKeyC", value="eValC"), V1EnvVar(name="eKeyD", value="eValD")],
            ),
            V1Container(
                name="primary-nelson2",
                image="cr.flyte.org/flyteorg/flytekit:py3.10-latest",
                env=[V1EnvVar(name="eKeyC", value="eValC"), V1EnvVar(name="eKeyD", value="eValD")],
            ),
        ],
    ),
))
def say_hello() -> str:
    return "Hello, World!"
@workflow
def hello_world_wf3(sim_type: int = 1) -> str:
    sim_result = (
        conditional("sim")
        .if_(sim_type == 1)
        .then(
            say_hello().with_overrides(pod_template=PodTemplate(
                primary_container_name="primary",
                labels={"lKeyA": "lValA", "lKeyB": "lValB"},
                annotations={"aKeyA": "aValA", "aKeyB": "aValB"},
                pod_spec=V1PodSpec(
                    containers=[
                        V1Container(
                            name="primary-nelson",
                            image="cr.flyte.org/flyteorg/flytekit:py3.11-latest",
                            env=[V1EnvVar(name="eKeyC", value="eValC"), V1EnvVar(name="eKeyD", value="eValD")],
                        ),
                        V1Container(
                            name="primary-nelson2",
                            image="cr.flyte.org/flyteorg/flytekit:py3.12-latest",
                            env=[V1EnvVar(name="eKeyC", value="eValC"), V1EnvVar(name="eKeyD", value="eValD")],
                        ),
                    ],
                ),
            ))
        )
        .else_()
        .fail(f"Unsupported sim_type: {sim_type}")
    )
    return sim_result
Not sure if you have more sample code, but that looks like a pod template for a single task. Also, the container names don't match the primary container name, which seems problematic.
The pod templates look more like
PodTemplate(
    primary_container_name=PodConstants.PRIMARY_CONTAINER_NAME,
    pod_spec=V1PodSpec(
        volumes=[
            V1Volume(
                name=PodConstants.DSHM_VOLUME_NAME,
                empty_dir=V1EmptyDirVolumeSource(
                    size_limit=str(shared_volume_memory.size),
                    medium="Memory",
                ),
            ),
        ],
        containers=[
            V1Container(
                name=PodConstants.PRIMARY_CONTAINER_NAME,
                volume_mounts=[
                    V1VolumeMount(
                        name=PodConstants.DSHM_VOLUME_NAME,
                        mount_path=PodConstants.DSHM_VOLUME_MOUNT_PATH,
                    ),
                ],
            ),
        ],
    ),
)
@arbaobao I tried both variants, and neither works.
The pseudocode I shared highlights what I intend to do with my Flyte workflow: I pass a variable (which I name pod_template for easier reference) as a workflow input; it is forwarded to the @dynamic workflow, and should override the @task's pod_template at run time.
Please let me know whether this is a bug that should work, or simply not a feasible option.
From my understanding, the purpose of with_overrides support for pod_template is to allow overriding a given task's pod template dynamically at runtime (hence the @dynamic workflow between my workflow and task).
variant A: flyte task with pod_template in the task decorator

@workflow
def wf_a(.., pod_template="a"):
    pod_a(.., pod_template="a")

@dynamic
def pod_a(..., pod_template="a"):
    pod_b(..).with_overrides(requests=.., limits=.., pod_template="a")

@task(pod_template="...")
def pod_b(..):
    <other_logic>

variant B: flyte task without pod_template in the task decorator

@workflow
def wf_a(.., pod_template="a"):
    pod_a(.., pod_template="a")

@dynamic
def pod_a(..., pod_template="a"):
    pod_b(..).with_overrides(requests=.., limits=.., pod_template="a")

@task
def pod_b(..):
    <other_logic>
I have found the issue and will have a pull request shortly. The problem is that the propeller-side implementation does not correctly overlay the pod template onto the base pod spec generated by Flyte. It just uses the pod_template as the base pod spec unconditionally, so it blows away anything in the base pod spec that isn't in the pod template.
Simple test case to illustrate the issue.
t.Run("Override pod template", func(t *testing.T) {
    taskContext := dummyExecContext(dummyTaskTemplate(), &v1.ResourceRequirements{
        Requests: v1.ResourceList{
            v1.ResourceCPU: resource.MustParse("1024m"),
        }}, nil, "", &core.K8SPod{
        PrimaryContainerName: "foo",
        PodSpec:              podSpecStruct,
        Metadata:             metadata,
    })
    p, m, _, err := ToK8sPodSpec(context.TODO(), taskContext)
    assert.NoError(t, err)
    assert.Equal(t, "a", m.Labels["l"])
    assert.Equal(t, "b", m.Annotations["a"])
    assert.Equal(t, "foo:latest", p.Containers[0].Image)
    assert.Equal(t, "foo", p.Containers[0].Name)
    assert.Equal(t, []string{"command"}, p.Containers[0].Command) // <-- added to the existing test case; this assertion fails
})
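In other words, the override needs overlay semantics rather than wholesale replacement. A minimal dict-based sketch of the intended merge (illustrative Python only; the actual propeller code is Go, and this helper is hypothetical, not Flyte's implementation):

```python
def overlay_pod_template(base: dict, override: dict) -> dict:
    """Overlay an override pod spec onto the base pod spec generated by the platform.

    Containers are merged by name, so fields set in the base (image, command)
    survive unless the override explicitly replaces them. Replacement semantics
    would instead discard the whole base spec, which is the reported bug.
    """
    # Top-level fields: override wins, containers handled separately below.
    merged = {**base, **{k: v for k, v in override.items() if k != "containers"}}
    base_by_name = {c["name"]: c for c in base.get("containers", [])}
    overridden = {c["name"] for c in override.get("containers", [])}
    # Merge each overriding container onto its base counterpart, if any.
    containers = [
        {**base_by_name.get(c["name"], {}), **c}
        for c in override.get("containers", [])
    ]
    # Keep base containers the override does not mention.
    containers += [c for c in base.get("containers", []) if c["name"] not in overridden]
    merged["containers"] = containers
    return merged
```

With this overlay, a base container's `command` and `image` survive an override that only sets `env`, which is exactly what the added assertion in the Go test checks.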
@Sovietaced
When we use with_overrides(pod_template=...), flytekit serializes the pod spec so that the overridden pod spec can be used directly by propeller. Hence, in this test case, I think it should not have command.
I rewrote my test code according to your sample code, and I still can't reproduce your error. I'm wondering what your error messages are.
class PodConstants:
    PRIMARY_CONTAINER_NAME = "primary-nelson"
    DSHM_VOLUME_MOUNT_PATH = "/mnt/dshm"
    DSHM_VOLUME_NAME = "nelson-volume"

@task
def say_hello() -> str:
    return "Hello, World!"

@workflow
def hello_world_wf3(sim_type: int = 1) -> str:
    sim_result = (
        conditional("sim")
        .if_(sim_type == 1)
        .then(
            say_hello().with_overrides(pod_template=PodTemplate(
                primary_container_name=PodConstants.PRIMARY_CONTAINER_NAME,
                pod_spec=V1PodSpec(
                    volumes=[
                        V1Volume(
                            name=PodConstants.DSHM_VOLUME_NAME,
                            empty_dir=V1EmptyDirVolumeSource(
                                size_limit="10",
                                medium="Memory",
                            ),
                        ),
                    ],
                    containers=[
                        V1Container(
                            name=PodConstants.PRIMARY_CONTAINER_NAME,
                            env=[V1EnvVar(name="eKeyC", value="eValC"), V1EnvVar(name="eKeyD", value="eValD")],
                            volume_mounts=[
                                V1VolumeMount(
                                    name=PodConstants.DSHM_VOLUME_NAME,
                                    mount_path=PodConstants.DSHM_VOLUME_MOUNT_PATH,
                                ),
                            ],
                        ),
                    ],
                ),
            ))
        )
        .else_()
        .fail(f"Unsupported sim_type: {sim_type}")
    )
    return sim_result
The most immediate issue we're seeing is that we're getting bad task specification errors from Flyte propeller.
RuntimeExecutionError: max number of system retry attempts [31/30] exhausted. Last known status message: failed at Node[n0-n0]. RuntimeExecutionError: failed during plugin execution, caused by: failed to execute handle for plugin [armada]: generating job request: [BadTaskSpecification] invalid TaskSpecification, primary container [primary] not defined
I added a lot of debugging statements to a custom image and found that the pod_spec of the pod_template is empty when it is read in propeller.
podTemplate=metadata:{} pod_spec:{} primary_container_name:"primary"
I assume that @Clement-Tapway probably doesn't specify a primary_container_name in his pod template override so the whole thing is just a no-op for him and doesn't blow up like it does for us.
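For reference, the BadTaskSpecification failure above is consistent with a primary-container check along these lines (an illustrative Python sketch, not the actual Go plugin code):

```python
def validate_primary_container(pod_spec: dict, primary_container_name: str) -> dict:
    """Reject a task spec whose pod spec does not define the primary container.

    A serialized override with an empty containers list fails here, which
    matches the "primary container [primary] not defined" error. A template
    that never sets a primary_container_name would not reach this check,
    which is why the broken override is a silent no-op in that case.
    """
    for container in pod_spec.get("containers", []):
        if container.get("name") == primary_container_name:
            return container
    raise ValueError(
        f"invalid TaskSpecification, primary container [{primary_container_name}] not defined"
    )
```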
Hi @Sovietaced, yes, you are correct: I only specify pod_spec in my PodTemplate. May I know if there is any workaround for my case, or will a pull request be opened to fix this issue?
I am attaching the function that generates my pod template for your reference: pod_template_generator.py
from kubernetes.client.models import (
    V1EnvVar,
    V1EnvVarSource,
    V1ObjectFieldSelector,
    V1PodSpec,
    V1Toleration,
    V1Affinity,
    V1NodeAffinity,
    V1NodeSelector,
    V1NodeSelectorTerm,
    V1PodAntiAffinity,
    V1PodAffinityTerm,
    V1LabelSelector,
    V1Volume,
    V1VolumeMount,
    V1EmptyDirVolumeSource,
    V1Container,
)
import os
import typing
import yaml
from flytekit import PodTemplate


def load_yaml_config(file_name: str = "pod_template.yaml") -> dict:
    """Load and return the YAML configuration."""
    file_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), file_name)
    with open(file_path, "r") as f:
        return yaml.safe_load(f)


def generate_train_pod_template(
    instance_size: str = "nano",
    config_path: str = "pod_template.yaml",
):
    """
    Generate a pod template for training.

    This function constructs a pod template based on the provided node labels
    and configuration file (pod_template.yaml).

    Args:
        instance_size (str): Type of EC2 instance to launch for training. Defaults to "nano".
            "nano" is a t2.small instance without GPU, just for the entrypoint pod.
            "small", "medium", "large", and "xlarge" are instances with GPU for the training pod.
        config_path (str): Path to the config file containing pod template values. Defaults to "pod_template.yaml".

    Returns:
        PodTemplate: Pod template object for a given pod.
    """
    INSTANCE_SIZE_OPTIONS = ["nano", "small", "medium", "large", "xlarge"]
    if instance_size not in INSTANCE_SIZE_OPTIONS:
        raise ValueError(
            f"Invalid instance size: {instance_size}. Choose from {INSTANCE_SIZE_OPTIONS}."
        )

    # Instance labels for selecting the appropriate EC2 instance type
    instance_labels = [
        {
            "key": "size",
            "operator": "In",
            "values": [instance_size],
        }
    ]

    # Load YAML configuration
    config = load_yaml_config(config_path)
    train_pod_config = config.get("train_pod", {})

    # 1. Construct container
    container_config = train_pod_config.get("containers", [{}])[0]
    env_vars = []
    for env in container_config.get("env", []):
        value_from = env.get("value_from", {})
        field_ref = value_from.get("field_ref", {})
        env_vars.append(
            V1EnvVar(
                name=env.get("name"),
                value_from=V1EnvVarSource(
                    field_ref=V1ObjectFieldSelector(field_path=field_ref.get("field_path"))
                ) if value_from else None,
            )
        )
    train_container = V1Container(
        name=container_config.get("name", "primary"),
        env=env_vars,
    )
    train_container.volume_mounts = [
        V1VolumeMount(
            name=mount.get("name"),
            mount_path=mount.get("mount_path"),
        )
        for mount in container_config.get("volume_mounts", [])
    ]

    # 2. Construct volumes
    volumes = []
    for vol in train_pod_config.get("volumes", []):
        empty_dir = vol.get("empty_dir", {})
        volumes.append(
            V1Volume(
                name=vol.get("name"),
                empty_dir=V1EmptyDirVolumeSource(medium=empty_dir.get("medium")) if empty_dir else None,
            )
        )

    # 3. Construct tolerations
    tolerations = [
        V1Toleration(
            key=tol.get("key"),
            value=tol.get("value"),
            operator=tol.get("operator"),
            effect=tol.get("effect"),
        )
        for tol in train_pod_config.get("tolerations", [])
        if not (instance_size == "nano" and tol.get("key") == "gpu")
    ]

    # 4a. Construct node affinity
    node_affinity_config = train_pod_config.get("affinity", {}).get("node_affinity", {})
    required_node = node_affinity_config.get("required_during_scheduling_ignored_during_execution", {})
    node_selector_terms = []
    for term in required_node.get("node_selector_terms", []):
        match_expressions = [
            {
                "key": expr.get("key"),
                "operator": expr.get("operator"),
                "values": expr.get("values") if not (instance_size == "nano" and expr.get("key") == "spot") else ["true"],
            }
            for expr in term.get("match_expressions", [])
            if not (instance_size == "nano" and expr.get("key") == "gpu")
        ]
        node_selector_terms.append(
            V1NodeSelectorTerm(match_expressions=match_expressions + instance_labels)
        )
    node_affinity = V1NodeAffinity(
        required_during_scheduling_ignored_during_execution=V1NodeSelector(
            node_selector_terms=node_selector_terms
        ) if node_selector_terms else None
    )

    # 4b. Construct pod anti-affinity
    pod_anti_affinity_config = train_pod_config.get("affinity", {}).get("pod_anti_affinity", {})
    pod_anti_affinity_terms = []
    for term in pod_anti_affinity_config.get("required_during_scheduling_ignored_during_execution", []):
        label_selector = term.get("label_selector", {})
        match_expressions = [
            {
                "key": expr.get("key"),
                "operator": expr.get("operator"),
                "values": expr.get("values"),
            }
            for expr in label_selector.get("match_expressions", [])
        ]
        pod_anti_affinity_terms.append(
            V1PodAffinityTerm(
                label_selector=V1LabelSelector(match_expressions=match_expressions),
                topology_key=term.get("topology_key"),
                namespaces=term.get("namespaces"),
            )
        )
    pod_anti_affinity = V1PodAntiAffinity(
        required_during_scheduling_ignored_during_execution=pod_anti_affinity_terms
    ) if pod_anti_affinity_terms else None

    # Final: Construct pod spec
    pod_spec = V1PodSpec(
        service_account_name=train_pod_config.get("service_account_name", "default"),
        containers=[train_container],
        tolerations=tolerations,
        volumes=volumes,
        affinity=V1Affinity(
            node_affinity=node_affinity,
            pod_anti_affinity=pod_anti_affinity,
        ) if node_affinity or pod_anti_affinity else None,
    )

    # Output: Pod template
    train_pod_template = PodTemplate(pod_spec=pod_spec)
    return train_pod_template
No workaround yet. I'm not as familiar with the flytekit side of things so hopefully @arbaobao can come up with something. But I think even when that is fixed there will be more work to do on the server side of things as I've mentioned above.
Actually this seems to have allowed things to progress at least: https://github.com/flyteorg/flytekit/pull/3270
@Clement-Tapway Does your pod_template work when it is added to the task decorator?
i only specify pod_spec in my PodTemplate
I think this should be fine.
I'm also having trouble with the new pod template override feature, using flytekit==1.15.0.
Using flytekit.Workflow, registering using FlyteRemote.register_workflow(), and specifying the pod_template...
- Only in the task decorator: the pod spec shows up in flyte, but the task hangs.
- Only using an override: the pod spec no longer shows up in flyte, and the task hangs.
- In both the task decorator and overrides: the original pod spec shows up in flyte, and the task hangs.
Using @workflow, registering using the CLI, and specifying the pod_template...
- Only in the task decorator: the pod spec shows up in flyte and the task runs.
- Only using an override: the pod spec no longer shows up in flyte, and the task hangs.
- In both the task decorator and overrides: the original pod spec shows up in flyte, and the task hangs.
The hang state is always the same: the task waits in a queued state until flytepropeller exhausts its retries.
The only way I can get a pod template to work is in a @workflow, in the task decorator, and without specifying an override.
The pod spec I'm using works fine when it is supplied using flytekitplugins.pod via task_config.
@arbaobao Yes, the pod_template works when added to the task decorator, but not when using .with_overrides on the @task call.
@samhita-alla @Sovietaced may I know whether this issue is fixed, and which version of flytekit I should install to receive the fix?
The issue is fixed; I think it will be in the next version of flytekit. I'd subscribe to new releases to get notified.
Thanks for your great help! @Sovietaced