[sdk] Unable to aggregate results over ParallelFor in Kubeflow V2 using V1 workarounds such as `.after()`
Environment
- KFP version: 2.0.1 (V2 backend)
- KFP SDK version: 2.3.0 (failing), 1.8.21 (working)
- All dependencies version:
  - kfp 2.3.0
  - kfp-pipeline-spec 0.2.2
  - kfp-server-api 2.0.1
Steps to reproduce
The new Kubeflow V2 backend does not yet support the use of dsl.Collected(), which is a functionality I was really looking forward to. I have a process where I run several components within a dsl.ParallelFor and then want to aggregate the results of all of these components after the dsl.ParallelFor finishes.
In V1 using SDK version kfp==1.8.21, I was able to work around the lack of any released fan-in mechanism by using the .after() method. A dummy snippet of code would be as follows, where comp_in_parfor executes some process and saves the results to S3, and then collector collects the results after the comp_in_parfor has finished by reading in the results from S3.
with dsl.ParallelFor(parfor_list) as args:
    # saves out some artifacts to S3 to make DIY fan-in logic
    comp_in_parfor = comp_template(a=args.a, b=args.b)

# reads in those same artifacts from S3
collector = collector_template(...).after(comp_in_parfor)
However, if I try to do the same thing using kfp==2.3.0 for the SDK, I get the following error, which means I cannot use the .after() method this way in V2. And because the new dsl.Collected also does not work in the V2 open-source backend (only on Vertex AI, from what I can tell), there is currently no way to fan in from a ParallelFor at all, DIY or otherwise.
kfp.compiler.compiler_utils.InvalidTopologyException: Illegal task dependency across DSL context managers. A downstream task cannot depend on an upstream task within a dsl.ParallelFor context unless the downstream is within that context too or the outputs are begin fanned-in to a list using dsl.Collected. Found task square-root which depends on upstream task square-and-sum within an uncommon dsl.ParallelFor context.
This error can easily be reproduced with this example:
import os
import kfp
from kfp import dsl
from typing import List


@dsl.component
def add(x: float, y: float) -> float:
    return x + y


@dsl.component
def square_root(x: float) -> float:
    return x ** .5


@dsl.pipeline
def dummy(parfor_list: List[float] = [1.2, 2.4, 3.6], b: float = 1.2) -> float:
    with dsl.ParallelFor(parfor_list) as args:
        sq_and_sum_task = add(x=args, y=args)
    square_root_comp = square_root(x=b).after(sq_and_sum_task)
    return square_root_comp.output
Expected result
My expected result can be one of two things:
- dsl.Collected() is properly implemented in the V2 KFP backend, rendering the need for .after() moot.
- .after() works, so that users can properly lay out the sequential structure of their DAG and work around the lack of fan-in logic in Kubeflow.
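For reference, the first option would look roughly like this once dsl.Collected is supported in the open-source backend (a minimal sketch based on the linked control-flow docs; sum_all is a hypothetical fan-in component, not something from my real pipeline):

from typing import List
from kfp import dsl


@dsl.component
def add(x: float, y: float) -> float:
    return x + y


# hypothetical fan-in component that receives the collected outputs as a list
@dsl.component
def sum_all(xs: List[float]) -> float:
    return sum(xs)


@dsl.pipeline
def dummy(parfor_list: List[float] = [1.2, 2.4, 3.6]) -> float:
    with dsl.ParallelFor(parfor_list) as item:
        sq_and_sum_task = add(x=item, y=item)
    # dsl.Collected fans the per-iteration outputs in as a single list input
    total_task = sum_all(xs=dsl.Collected(sq_and_sum_task.output))
    return total_task.output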
Am I missing a workaround that allows users to fan in from a ParallelFor in the V2 SDK? If not, is there any way that .after() can be restored until dsl.Collected() is implemented properly in the backend? Until then, everyone blocked by this issue will be unable to use V2 Kubeflow even with their own DIY fan-in logic. It's a major bummer because V2 has some great functionality; in particular, I'm quite excited to have sub-DAGs to logically break up my repeated ParallelFor units and reduce UI strain from overly large DAG structures.
Materials and Reference
- https://github.com/kubeflow/pipelines/issues/3412
- https://www.kubeflow.org/docs/components/pipelines/v2/pipelines/control-flow/#parallel-looping-dslparallelfor
Impacted by this bug? Give it a 👍.
/assign @connor-mccarthy
This issue has been automatically marked as stale because it has not had recent activity. It will be closed if no further activity occurs. Thank you for your contributions.
This is fixed by https://github.com/kubeflow/pipelines/pull/10257 and will be released in kfp==2.5.0.
This issue is present in the newest release, kfp==2.6.0, but not in kfp==2.5.0.
I can confirm the presence of the issue on 2.6.0
We are also impacted by this. We see this as a critical blocker since we have more and more tasks that we want to run in parallel, especially since we are dealing with gigantic images (GBs per image), so running them sequentially is not an option. However, we must aggregate the results after we are done.
so it was fixed in 2.5.0 but regressed in 2.6.0?
are we missing a test?
I just tested this on Kubeflow 1.8.1 using KFP SDK 2.5.0 and 2.8.0, and neither works. Here's the sample dummy pipeline:
from kfp import compiler, dsl
from typing import List


@dsl.component
def print_op(message: str):
    print(message)


@dsl.pipeline()
def my_pipeline():
    with dsl.ParallelFor([1, 2, 3]):
        one = print_op(message='foo')
        two = print_op(message='bar').after(one)


if __name__ == "__main__":
    compiler.Compiler().compile(my_pipeline, __file__ + ".yaml")
There are no compilation errors, but when I try to run the pipeline I get this error:
{"error":"Failed to create a new run: InternalServerError: Failed to validate workflow for (): templates.entrypoint.tasks.root templates.root sorting failed: invalid dependency for-loop-2","code":13,"message":"Failed to create a new run: InternalServerError: Failed to validate workflow for (): templates.entrypoint.tasks.root templates.root sorting failed: invalid dependency for-loop-2","details":[{"@type":"type.googleapis.com/google.rpc.Status","code":13,"message":"Internal Server Error"}]}
My guess is that something changed in the KFP backend, not in the KFP SDK, and that broke this.
@connor-mccarthy could we reopen this issue based on the comments above?
/reopen
@gregsheremeta: Reopened this issue.
In response to this:
/reopen
This issue has been automatically marked as stale because it has not had recent activity. It will be closed if no further activity occurs. Thank you for your contributions.
/lifecycle frozen
I use kfp==2.9.0
This is a major issue for me in V2. I have a V1 pipeline that used .after() to collect ParallelFor results. Any workaround for this? It is not working now and I don't see an alternative...
I really don't understand the V2 development priorities. I'm upgrading a Kubeflow 1.x installation, and I'd expect the very first thing for a V2 release to be supporting the V1 capabilities. Really frustrating...
We have many new ML use cases that we are onboarding, and we are forced to do so in V1 pipelines because of this bug. Thanks everybody for the hard work 👍, I hope we can get a permanent solution for this soon.
One possible way of replacing .after seems to be using dsl.ExitHandler with the exit_task as the one to execute after.
This has a lot of limitations:
- The exit handler will always be executed, even if earlier tasks fail. It should be possible to use the kfp sdk within the exit task to determine if tasks have failed, but this will be complicated.
- Doesn't play nicely if you already have an exit handler - have to add a sub-pipeline between them, messy.
- Can't pass outputs into it, so you have to use some form of external storage (I think this is just the same as using .after, though).
Not a simple workaround, but maybe useful for someone too desperate to wait for a proper fix.
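In case it helps anyone, here is a minimal sketch of that pattern, assuming hypothetical work and collect components and some external storage (e.g. S3) for the actual results:

from typing import List
from kfp import dsl


# hypothetical per-item component; writes its result to external storage
@dsl.component
def work(item: int):
    print(f"processing {item}")


# hypothetical fan-in component; exit tasks cannot take outputs from other tasks,
# so it has to read the results back from external storage itself
@dsl.component
def collect():
    print("aggregating results from external storage")


@dsl.pipeline
def my_pipeline(items: List[int] = [1, 2, 3]):
    # the exit task runs after everything inside the ExitHandler finishes,
    # whether those tasks succeeded or failed
    exit_task = collect()
    with dsl.ExitHandler(exit_task):
        with dsl.ParallelFor(items) as item:
            work(item=item)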
Thanks for the idea @gfkeith. Since we already have an exit handler, I believe it would be best to wait for a proper solution. I'm curious whether this is part of the triage @gregsheremeta @juliusvonkohout?
/assign @zazulam
@rimolive: GitHub didn't allow me to assign the following users: zazulam.
Note that only kubeflow members with read permissions, repo collaborators and people who have commented on this issue/PR can be assigned. Additionally, issues/PRs can only have 10 assignees at the same time. For more information please see the contributor guide
In response to this:
/assign @zazulam
cc @HumairAK
@papagala @gfkeith We discussed that issue in the Pipelines WG meeting today. We have the SDK bits implemented, but the backend part is still missing. We targeted this issue to be done in KFP 2.5.0, so as of now I'd suggest using the workaround until the final solution is available.
Hope that helps.
This is great news @rimolive 🎉 . We are very much looking forward to getting a permanent fix so we can have a more robust way of working with highly parallelized pipelines. I'm assuming this fix will be rolled out as part of Kubeflow 1.10? Thank you again!
Kubeflow 1.10 will ship KFP 2.4.0; we don't have time to upgrade to KFP 2.5.0.
Noted @rimolive thank you for the quick feedback. I'll cross my fingers for Kubeflow 1.10.1 😄. Still happy to hear that this got attention.
/assign @zazulam
I will probably merge KFP 2.5.0 to the kubeflow/manifests master branch as soon as possible after the kf 1.10.0 release so no worries.
com.google.cloud.ai.platform.common.errors.AiPlatformException: code=INVALID_ARGUMENT, message=Maximum number of events exceeded. Maximum allowed: 100, requested: 101, cause=null
I got this error while running a Vertex pipeline. Are there any size limitations for dsl.Collected?
@shehsu can you create a new issue and add more information, such as the version of kfp used, along with a sample to reproduce the error you are seeing? I'm not fully aware of the divergence between the Vertex and open-source KFP backends, so there is a chance this might not be solvable from our side.