
[sdk] Unable to aggregate results over ParallelFor in Kubeflow V2 using V1 workarounds such as `.after()`

Open TristanGreathouse opened this issue 2 years ago • 12 comments

Environment

  • KFP version: V2 backend. 2.0.1.
  • KFP SDK version: 2.3.0 (failing), 1.8.21 (working)
  • All dependencies version:
kfp                        2.3.0     
kfp-pipeline-spec          0.2.2     
kfp-server-api             2.0.1 

Steps to reproduce

The new Kubeflow V2 backend does not yet support dsl.Collected(), a feature I was really looking forward to. I have a process where I run several components within a dsl.ParallelFor and then want to aggregate the results of all of those components after the dsl.ParallelFor finishes.

In V1, using SDK version kfp==1.8.21, I was able to work around the lack of any released fan-in mechanism by using the .after() method. A dummy snippet would be as follows, where comp_in_parfor executes some process and saves its results to S3, and then collector aggregates the results after comp_in_parfor has finished by reading them back from S3.

with dsl.ParallelFor(parfor_list) as args:
    # saves out some artifacts to S3 to make DIY fan-in logic
    comp_in_parfor = comp_template(a=args.a, b=args.b)

# reads in those same artifacts from S3
collector = collector_template(...).after(comp_in_parfor)
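
To make the DIY pattern a bit more concrete, here is a rough sketch of what those two components could look like, written in the v2-style component syntax; the bucket name, key layout, and aggregation logic are purely illustrative:

from kfp import dsl


@dsl.component(packages_to_install=['boto3'])
def comp_template(a: float, b: float, results_prefix: str):
    import json

    import boto3

    # do the per-item work, then write the result to S3 under a shared prefix
    result = {'value': a + b}
    boto3.client('s3').put_object(
        Bucket='my-bucket',
        Key=f'{results_prefix}/{a}-{b}.json',
        Body=json.dumps(result))


@dsl.component(packages_to_install=['boto3'])
def collector_template(results_prefix: str) -> float:
    import json

    import boto3

    # read every per-item result the loop wrote and aggregate them
    s3 = boto3.client('s3')
    listing = s3.list_objects_v2(Bucket='my-bucket', Prefix=results_prefix)
    total = 0.0
    for obj in listing.get('Contents', []):
        body = s3.get_object(Bucket='my-bucket', Key=obj['Key'])['Body'].read()
        total += json.loads(body)['value']
    return total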

However, if I try to do the same thing using kfp==2.3.0 for the SDK, I get the following error, which means I cannot use the .after() method in V2. And because the new dsl.Collected also does not work in the open-source V2 backend (only on Vertex AI, from what I can tell), there is no way to fan in from a ParallelFor at all, neither DIY nor native.

kfp.compiler.compiler_utils.InvalidTopologyException: Illegal task dependency across DSL context managers. A downstream task cannot depend on an upstream task within a dsl.ParallelFor context unless the downstream is within that context too or the outputs are begin fanned-in to a list using dsl.Collected. Found task square-root which depends on upstream task square-and-sum within an uncommon dsl.ParallelFor context.

This error can easily be reproduced with this example:

from typing import List

from kfp import dsl


@dsl.component
def add(x: float, y: float) -> float:
    return x + y

@dsl.component
def square_root(x: float) -> float:
    return x ** .5

@dsl.pipeline
def dummy(parfor_list: List[float] = [1.2, 2.4, 3.6], b: float = 1.2) -> float:
    with dsl.ParallelFor(parfor_list) as args:
        sq_and_sum_task = add(x=args, y=args)
    # .after() across the ParallelFor boundary is what raises the error above
    square_root_comp = square_root(x=b).after(sq_and_sum_task)
    return square_root_comp.output

Expected result

My expected result can be one of two things:

  1. dsl.Collected() is properly implemented in the V2 KFP backend, rendering the need for .after() moot (a sketch of the intended dsl.Collected() usage follows below).
  2. .after() works, so that users can properly lay out the sequential structure of their DAG and work around the lack of fan-in logic in Kubeflow.
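
For reference, the intended dsl.Collected() fan-in described in the docs linked below would look roughly like this; it should compile with the V2 SDK but, as noted above, is not supported by the open-source V2 backend yet:

from typing import List

from kfp import dsl


@dsl.component
def add(x: float, y: float) -> float:
    return x + y


@dsl.component
def sum_all(values: List[float]) -> float:
    return sum(values)


@dsl.pipeline
def dummy(parfor_list: List[float] = [1.2, 2.4, 3.6]) -> float:
    with dsl.ParallelFor(parfor_list) as item:
        add_task = add(x=item, y=item)
    # dsl.Collected gathers the per-iteration outputs into a single list
    return sum_all(values=dsl.Collected(add_task.output)).output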

Am I missing a workaround that allows users to fan in from a ParallelFor in the V2 SDK? If not, is there any way that .after() can be restored until dsl.Collected() is properly implemented in the backend? Until then, everyone blocked by this issue is unable to use the V2 Kubeflow backend even with their own DIY fan-in logic. It's a major bummer, because V2 has some great functionality; in particular, I'm excited about sub-DAGs to logically break up my repeated ParallelFor units and reduce UI strain from overly large DAG structures.

Materials and Reference

  • https://github.com/kubeflow/pipelines/issues/3412
  • https://www.kubeflow.org/docs/components/pipelines/v2/pipelines/control-flow/#parallel-looping-dslparallelfor

Impacted by this bug? Give it a 👍.

TristanGreathouse avatar Oct 02 '23 23:10 TristanGreathouse

/assign @connor-mccarthy

zijianjoy avatar Oct 05 '23 22:10 zijianjoy

This issue has been automatically marked as stale because it has not had recent activity. It will be closed if no further activity occurs. Thank you for your contributions.

github-actions[bot] avatar Jan 04 '24 07:01 github-actions[bot]

This is fixed by https://github.com/kubeflow/pipelines/pull/10257 and will be released in kfp==2.5.0.

connor-mccarthy avatar Jan 05 '24 19:01 connor-mccarthy

This issue is present in the newest release kfp==2.6.0 but not kfp==2.5.0

mitchell-lawson avatar Feb 01 '24 19:02 mitchell-lawson

I can confirm the presence of the issue on 2.6.0

espoirMur avatar Feb 06 '24 15:02 espoirMur

We are also impacted by this. We see this as a critical blocker for us, since we have more and more tasks that we want to run in parallel. We are dealing with gigantic images, in the GB range per image, so we cannot avoid running in parallel. However, we must aggregate the results after we are done.

papagala avatar Aug 01 '24 07:08 papagala

so it was fixed in 2.5.0 but regressed in 2.6.0?

are we missing a test?

gregsheremeta avatar Aug 04 '24 19:08 gregsheremeta

I just tested this on Kubeflow 1.8.1 with KFP SDK 2.5.0 and 2.8.0, and neither works. Here's the sample dummy pipeline:

from kfp import compiler, dsl


@dsl.component
def print_op(message: str):
    print(message)


@dsl.pipeline()
def my_pipeline():
    with dsl.ParallelFor([1, 2, 3]):
        one = print_op(message='foo')
    two = print_op(message='bar').after(one)

if __name__ == "__main__":
    compiler.Compiler().compile(my_pipeline, __file__ + ".yaml")

There are no compilation errors, but when I try to run the pipeline I get this error:

{"error":"Failed to create a new run: InternalServerError: Failed to validate workflow for (): templates.entrypoint.tasks.root templates.root sorting failed: invalid dependency for-loop-2","code":13,"message":"Failed to create a new run: InternalServerError: Failed to validate workflow for (): templates.entrypoint.tasks.root templates.root sorting failed: invalid dependency for-loop-2","details":[{"@type":"type.googleapis.com/google.rpc.Status","code":13,"message":"Internal Server Error"}]}

My guess is that something changed in the KFP backend, not in the KFP SDK, that broke this.

papagala avatar Aug 21 '24 12:08 papagala

@connor-mccarthy could we reopen this issue based on the comments above?

papagala avatar Aug 21 '24 14:08 papagala

/reopen

gregsheremeta avatar Aug 23 '24 21:08 gregsheremeta

@gregsheremeta: Reopened this issue.

In response to this:

/reopen

Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes/test-infra repository.

google-oss-prow[bot] avatar Aug 23 '24 21:08 google-oss-prow[bot]

This issue has been automatically marked as stale because it has not had recent activity. It will be closed if no further activity occurs. Thank you for your contributions.

github-actions[bot] avatar Oct 23 '24 07:10 github-actions[bot]

/lifecycle frozen

gregsheremeta avatar Oct 23 '24 11:10 gregsheremeta

I use kfp==2.9.0. This is a major issue for me in V2. I have a V1 pipeline that used .after() to collect ParallelFor results. Is there any workaround for this? It is not working now and I don't see an alternative... I really don't understand the V2 development priorities. I'm upgrading a Kubeflow 1.x installation, and I'd expect that the very first thing for a V2 release would be to support the V1 capabilities. Really frustrating...

asaff1 avatar Oct 27 '24 17:10 asaff1

We have many new ML use cases that we are onboarding, and we are forced to build them as V1 pipelines because of this bug. Thanks everybody for the hard work 👍, I hope we can get a permanent solution for this soon.

papagala avatar Nov 04 '24 09:11 papagala

One possible way of replacing .after seems to be using dsl.ExitHandler with the exit_task as the task to execute afterwards (a rough sketch follows after the list below).

This has a lot of limitations:

  • The exit handler will always be executed, even if earlier tasks fail. It should be possible to use the KFP SDK within the exit task to determine whether tasks have failed, but this would be complicated.
  • Doesn't play nicely if you already have an exit handler; you have to add a sub-pipeline between them, which is messy.
  • Can't pass task outputs into it, so you have to use some form of external storage (though I think that's the same limitation as with .after).

Not a simple workaround, but maybe useful for someone too desperate to wait for a proper fix.
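
A minimal sketch of what I mean, assuming the per-item results are written to external storage keyed off a pipeline parameter (the component names and the results_uri parameter are just illustrative):

from kfp import dsl


@dsl.component
def work(item: int, results_uri: str):
    # do the per-item work and write its result under results_uri
    print(f'processing {item}, writing result to {results_uri}')


@dsl.component
def collect_results(results_uri: str):
    # read the per-item results back from external storage and aggregate them
    print(f'aggregating results from {results_uri}')


@dsl.pipeline
def my_pipeline(results_uri: str = 's3://my-bucket/run-outputs'):
    # the exit task runs once everything inside the ExitHandler scope has
    # finished, even if some of those tasks failed (the first caveat above)
    collector = collect_results(results_uri=results_uri)
    with dsl.ExitHandler(exit_task=collector):
        with dsl.ParallelFor([1, 2, 3]) as item:
            work(item=item, results_uri=results_uri)

As far as I know, the exit task can only consume pipeline parameters (or a dsl.PipelineTaskFinalStatus parameter to check whether the run succeeded), not task outputs, which is why everything has to round-trip through external storage.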

gfkeith avatar Jan 13 '25 05:01 gfkeith

Thanks for the idea @gfkeith. Since we already have an exit handler, I believe it would be best to wait for a proper solution. I'm curious whether this is part of the triage @gregsheremeta @juliusvonkohout?

papagala avatar Jan 27 '25 15:01 papagala

/assign @zazulam

rimolive avatar Jan 29 '25 18:01 rimolive

@rimolive: GitHub didn't allow me to assign the following users: zazulam.

Note that only kubeflow members with read permissions, repo collaborators and people who have commented on this issue/PR can be assigned. Additionally, issues/PRs can only have 10 assignees at the same time. For more information please see the contributor guide

In response to this:

/assign @zazulam

Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes/test-infra repository.

google-oss-prow[bot] avatar Jan 29 '25 18:01 google-oss-prow[bot]

@rimolive: GitHub didn't allow me to assign the following users: zazulam.

Note that only kubeflow members with read permissions, repo collaborators and people who have commented on this issue/PR can be assigned. Additionally, issues/PRs can only have 10 assignees at the same time. For more information please see the contributor guide

cc @HumairAK

rimolive avatar Jan 29 '25 18:01 rimolive

@papagala @gfkeith We discussed that issue in the Pipelines WG meeting today. We have the SDK bits implemented, but the backend part is still missing. We targeted this issue to be done in KFP 2.5.0, so for now I'd suggest using the workaround until the final solution is available.

Hope that helps.

rimolive avatar Jan 29 '25 19:01 rimolive

This is great news @rimolive 🎉. We are very much looking forward to getting a permanent fix so we can have a more robust way of working with highly parallelized pipelines. I'm assuming this will be rolled out as part of Kubeflow 1.10? Thank you again!

papagala avatar Feb 03 '25 10:02 papagala

Kubeflow 1.10 will ship KFP 2.4.0; we don't have time to upgrade to KFP 2.5.0.

rimolive avatar Feb 03 '25 11:02 rimolive

Noted @rimolive thank you for the quick feedback. I'll cross my fingers for Kubeflow 1.10.1 😄. Still happy to hear that this got attention.

papagala avatar Feb 03 '25 11:02 papagala

/assign @zazulam

zazulam avatar Feb 13 '25 18:02 zazulam

I will probably merge KFP 2.5.0 into the kubeflow/manifests master branch as soon as possible after the KF 1.10.0 release, so no worries.

juliusvonkohout avatar Feb 22 '25 11:02 juliusvonkohout

com.google.cloud.ai.platform.common.errors.AiPlatformException: code=INVALID_ARGUMENT, message=Maximum number of events exceeded. Maximum allowed: 100, requested: 101, cause=null

I got this error while running a Vertex pipeline. Are there any size limitations for dsl.Collected?

shehsu avatar Sep 05 '25 12:09 shehsu

@shehsu can you create a new issue and add more information, such as the version of kfp used, along with a sample to reproduce the error you are seeing? I'm not fully aware of the divergence between the Vertex and KFP backends, so there's a chance this might not be solvable on our side.

zazulam avatar Sep 08 '25 14:09 zazulam