[Core feature] Add the support for defining/declaring tasks in classes

Open XinEDprob opened this issue 1 year ago • 12 comments

Motivation: Why do you think this is important?

I wanted to build a Flyte pipeline for an existing project and noticed that the project contains many classes. If Flyte cannot support code like the following:

from flytekit import workflow, task


class MyClass:

    @task
    def t1(self):
        return 'hello world'


@workflow
def wf():
    MyClass().t1()

there will be a lot of work for us, because we would need to convert all the classes into functions so that those functions can be decorated as tasks. I believe this feature is important for the following reasons:

  • It encourages people to build more Flyte pipelines for existing projects
  • The code will be more organized and concise

Goal: What should the final outcome look like, ideally?

In the end, I hope Flyte can support something like the following

from flytekit import workflow, task


class MyClass:

    @task
    def t1(self):
        return 'hello world'


@workflow
def wf():
    MyClass().t1()

That is, to define tasks within classes.

Describe alternatives you've considered

I don't think I have any alternative for this issue.

Propose: Link/Inline OR Additional context

No response

Are you sure this issue hasn't been raised already?

  • [X] Yes

Have you read the Code of Conduct?

  • [X] Yes

XinEDprob avatar Mar 17 '23 21:03 XinEDprob

Thank you for opening your first issue here! 🛠

welcome[bot] avatar Mar 17 '23 21:03 welcome[bot]

@XinEDprob this is in fact already possible using TaskResolvers. Here is a ClassTaskResolver: https://github.com/flyteorg/flytekit/blob/34f80ba12eda64431be4c21c78df81b7afbe2758/flytekit/core/class_based_resolver.py#L9

Here is an example in UnionML: https://github.com/unionai-oss/unionml/blob/main/unionml/task_resolver.py

The problem is that you have written a simple example of a class. But what if the class needs specific values for __init__? In that case you can write a metaclass-based resolver that instantiates the class using the input values.
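To make the resolver idea concrete, here is a minimal plain-Python sketch. It does not import flytekit, and `ListBackedResolver` is a hypothetical stand-in rather than the real `ClassStorageTaskResolver`. The method names `loader_args` and `load_task` are borrowed from the resolver concept, but the signatures here are simplified assumptions. The point is only the round trip: a registered callable is turned into strings at registration time and looked up again from those strings at execution time.

```python
from typing import Callable, List


class ListBackedResolver:
    """Hypothetical stand-in for a task resolver; it does not use flytekit."""

    def __init__(self) -> None:
        self._tasks: List[Callable] = []

    def add(self, fn: Callable) -> Callable:
        # Register a callable and hand it back unchanged.
        self._tasks.append(fn)
        return fn

    def loader_args(self, fn: Callable) -> List[str]:
        # Turn a registered callable into strings that can be stored.
        return [str(self._tasks.index(fn))]

    def load_task(self, loader_args: List[str]) -> Callable:
        # Look the callable up again from the stored strings.
        if len(loader_args) != 1:
            raise ValueError(f"expected exactly one loader arg, got {loader_args}")
        return self._tasks[int(loader_args[0])]


class MyClass:
    def t1(self) -> str:
        return "hello world"


resolver = ListBackedResolver()

# The instance is created at module level, so it exists again whenever
# the module is re-imported and the resolver is asked to load the task.
instance = MyClass()
bound_t1 = resolver.add(instance.t1)

args = resolver.loader_args(bound_t1)
restored = resolver.load_task(args)
print(args, restored())
```

This works only because the instance takes no constructor arguments and is created on import, so the same registration order is reproduced every time the module loads.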

You can also add support for something like this

@task_class
class MyTaskClass:

    def __init__(self, i: int):
        ...

    @entrypoint
    def run(self) -> str:
        ...


@workflow
def wf(i: int) -> str:
    return MyTaskClass(i=i)

In this case a task can be created with input i: int and output str. This is not supported today, but it is possible.

The final option is what you described, which is more involved. What you specified behaves something like MagicMock. Here is what I envision:
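As a rough illustration of how such an interface could be derived, the sketch below uses only the standard library `inspect` module. The names `entrypoint`, `derive_interface`, and `run_as_task` are hypothetical and are not part of flytekit. The inputs are read from the `__init__` signature and the output from the entrypoint's return annotation.

```python
import inspect
from typing import Any, Callable, Dict, Tuple


def entrypoint(fn: Callable) -> Callable:
    # Hypothetical marker decorator: flags the method that does the work.
    fn._is_entrypoint = True
    return fn


def find_entrypoint(cls: type) -> Callable:
    # Collect the methods defined on the class that carry the marker.
    marked = [m for m in vars(cls).values() if getattr(m, "_is_entrypoint", False)]
    if len(marked) != 1:
        raise TypeError(f"{cls.__name__} needs exactly one @entrypoint method")
    return marked[0]


def derive_interface(cls: type) -> Tuple[Dict[str, Any], Any]:
    # Inputs come from __init__ (minus self), the output from the entrypoint.
    params = inspect.signature(cls.__init__).parameters
    inputs = {name: p.annotation for name, p in params.items() if name != "self"}
    output = inspect.signature(find_entrypoint(cls)).return_annotation
    return inputs, output


def run_as_task(cls: type, **kwargs: Any) -> Any:
    # Build the instance from the inputs, then call its entrypoint.
    return find_entrypoint(cls)(cls(**kwargs))


class MyTaskClass:
    def __init__(self, i: int):
        self.i = i

    @entrypoint
    def run(self) -> str:
        return f"got {self.i}"


inputs, output = derive_interface(MyTaskClass)
result = run_as_task(MyTaskClass, i=3)
```

A real implementation would still have to wrap this in an actual Flyte task and hook it into serialization, which this sketch does not attempt.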


class MyTaskClass(metaclass=flytekit.TaskClass):
    def __init__(self, i: int):
        ...

    @class_task
    def t1(self, j: str) -> str:
        ...


@workflow
def wf(i: int, j: str) -> str:
    return MyTaskClass(i=i).t1(j=j)

kumare3 avatar Mar 18 '23 00:03 kumare3

Hi @kumare3, thanks for the comments. I am pretty new to Flyte and it may take some time for me to fully understand them.

It seems that a simple (or special) version of what I described can be implemented with TaskResolvers, while the general case is so involved that Flyte does not support it yet. Are there plans to implement the general case in the future?

XinEDprob avatar Mar 21 '23 00:03 XinEDprob

Hmm, we have not implemented it because no one has really asked for it, and in general the priority has been lower. Please join slack.Flyte.org. I would love to understand why this is needed and see whether there are potential collaboration opportunities.

kumare3 avatar Mar 21 '23 01:03 kumare3

Got it. I also need to think more about this and collect a solid use case. I joined the Flyte slack workspace and will ping you when I get some results. Thanks!

XinEDprob avatar Mar 21 '23 02:03 XinEDprob

@XinEDprob can you please comment and tell me more about how you want the class to look? Which of the two versions I recommended would you like an implementation sample for?

kumare3 avatar May 03 '23 04:05 kumare3

Hi @kumare3, sorry for the late reply. The second version (the one that you think is more involved) is more interesting to me.

XinEDprob avatar May 07 '23 21:05 XinEDprob

@XinEDprob I tried this, and it seems to be harder than I thought. We need a way to resolve the instance of the class. There are a few tricks, but this is more work than we can do in a short time. I can give you what I have:

from functools import wraps

from flytekit import workflow, task
from flytekit.core.tracker import TrackedInstance


def class_task(fn, **kwargs):
    fn._flyte_task = True

    @wraps(fn)
    def wrapper(*args, **kwds):
        return fn(*args, **kwds)
    # The task_resolver value is still a placeholder; a real resolver is needed here.
    output_task = task(wrapper, task_resolver="TODO Add task resolver here", **kwargs)
    return output_task


class PythonClassTask(type(TrackedInstance)):

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

    def __new__(cls, name, bases, namespace, **kwargs):
        o = super().__new__(cls, name, bases, namespace, **kwargs)
        o._tasks = [value for value in namespace.values() if hasattr(value, '_flyte_task')]
        return o


class MyFlyteTaskClass(TrackedInstance, metaclass=PythonClassTask):

    def __init__(self, i: int, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.i = i

    @class_task
    def task1(self, a: int, b: int) -> int:
        return a + b + self.i

    @class_task
    def task2(self, a: int, b: int) -> int:
        return a * b + self.i


@workflow
def wf(i: int, j: int, k: int) -> (int, int):
    t = MyFlyteTaskClass(i=i)
    return t.task1(a=j, b=k), t.task2(a=j, b=k)


if __name__ == "__main__":
    c = MyFlyteTaskClass(i=1)
    wf(i=1, j=2, k=3)

The task resolver itself needs to be able to create the object of the class. This seems tricky: doable, but hard.
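One way the instance could be rebuilt is sketched below in plain Python, without flytekit. Everything here is hypothetical: `RecordsInitArgs`, the class registry, and the simplified `loader_args` / `load_task` functions are illustrative names, not existing APIs. The assumption is that the constructor is called with keyword arguments that are JSON-serializable, so they can be stored as strings and replayed later.

```python
import json
from typing import Any, Callable, Dict, List


class RecordsInitArgs:
    """Hypothetical base class that remembers how each instance was built."""

    _registry: Dict[str, type] = {}

    def __init_subclass__(cls, **kwargs: Any) -> None:
        super().__init_subclass__(**kwargs)
        # Remember every subclass by name so it can be found again later.
        RecordsInitArgs._registry[cls.__qualname__] = cls

    def __new__(cls, *args: Any, **kwargs: Any):
        if args:
            raise TypeError("this sketch only supports keyword arguments")
        obj = super().__new__(cls)
        obj._init_kwargs = dict(kwargs)
        return obj


def loader_args(bound_method: Callable) -> List[str]:
    # Encode the class name, the constructor kwargs, and the method name.
    obj = bound_method.__self__
    return [type(obj).__qualname__, json.dumps(obj._init_kwargs), bound_method.__name__]


def load_task(args: List[str]) -> Callable:
    # Find the class, rebuild the instance, and return the bound method.
    class_name, init_json, method_name = args
    cls = RecordsInitArgs._registry[class_name]
    return getattr(cls(**json.loads(init_json)), method_name)


class MyFlyteTaskClass(RecordsInitArgs):
    def __init__(self, i: int):
        self.i = i

    def task1(self, a: int, b: int) -> int:
        return a + b + self.i


original = MyFlyteTaskClass(i=1)
stored = loader_args(original.task1)
rebuilt = load_task(stored)
print(rebuilt(a=2, b=3))
```

The sketch depends on the constructor arguments being concrete values when the loader args are produced. Inside a workflow, `i` is a workflow input rather than a concrete value at that point, which is likely a large part of why the general case is hard.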

kumare3 avatar May 25 '23 05:05 kumare3

Thanks for letting me know, and for the code! I will take a look at it and see what I can do for this feature.

XinEDprob avatar May 29 '23 21:05 XinEDprob

Hello 👋, this issue has been inactive for over 9 months. To help maintain a clean and focused backlog, we'll be marking this issue as stale and will engage on it to decide if it is still applicable. Thank you for your contribution and understanding! 🙏

github-actions[bot] avatar Feb 24 '24 00:02 github-actions[bot]

@XinEDprob did you actually get this running? We are thinking of working on something that may help here.

kumare3 avatar Apr 26 '24 05:04 kumare3

> @XinEDprob did you actually get this running? We are thinking of working on something that may help here.

@kumare3, I haven't had the opportunity to address this yet; it remains on my backlog. If you intend to tackle it, that would be appreciated. Otherwise, I'll prioritize it for a later time.

XinEDprob avatar Apr 28 '24 21:04 XinEDprob