flyte
[Core feature] Add the support for defining/declaring tasks in classes
Motivation: Why do you think this is important?
I wanted to build a Flyte pipeline for an existing project and noticed that the project contains many classes. If Flyte cannot support code like the following:
from flytekit import workflow, task

class MyClass:
    @task
    def t1(self):
        return 'hello world'

@workflow
def wf():
    MyClass().t1()
there will be a lot of work for us, because we would need to convert all the classes to functions so that they can be decorated as tasks. I believe this feature is important for the following reasons:
- Encourage people to build more Flyte pipelines for existing projects
- The code will be more organized and concise
Goal: What should the final outcome look like, ideally?
In the end, I hope Flyte can support something like the following:
from flytekit import workflow, task

class MyClass:
    @task
    def t1(self):
        return 'hello world'

@workflow
def wf():
    MyClass().t1()
That is, I want to be able to define tasks within classes.
Describe alternatives you've considered
I don't think I have any alternative for this issue.
Propose: Link/Inline OR Additional context
No response
Are you sure this issue hasn't been raised already?
- [X] Yes
Have you read the Code of Conduct?
- [X] Yes
Thank you for opening your first issue here! 🛠
@XinEDprob this is in fact already possible using TaskResolvers. Here is a ClassTaskResolver: https://github.com/flyteorg/flytekit/blob/34f80ba12eda64431be4c21c78df81b7afbe2758/flytekit/core/class_based_resolver.py#L9
Here is an example in UnionML https://github.com/unionai-oss/unionml/blob/main/unionml/task_resolver.py
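To make the resolver idea concrete, here is a standalone sketch (plain Python, no flytekit import) of the pattern a class-based resolver follows: at registration time each task is turned into a short list of string "loader args", and at execution time those strings are turned back into the same task object. ToyClassResolver and its use of a list index are illustrative assumptions; the method names loader_args and load_task mirror flytekit's TaskResolverMixin, but the signatures here are simplified and this is not flytekit code.

```python
from typing import Callable, List


class ToyClassResolver:
    """Hypothetical resolver: remembers tasks by their position in a list."""

    def __init__(self) -> None:
        self.mapping: List[Callable] = []

    def add(self, t: Callable) -> Callable:
        # Register a task; usable as a decorator.
        self.mapping.append(t)
        return t

    def loader_args(self, t: Callable) -> List[str]:
        # Serialize a task into strings that could be passed on a command line.
        if t not in self.mapping:
            raise ValueError("task was never registered with this resolver")
        return [str(self.mapping.index(t))]

    def load_task(self, loader_args: List[str]) -> Callable:
        # Reverse of loader_args: turn the strings back into the same task.
        if len(loader_args) != 1:
            raise ValueError(f"expected one loader arg, got {loader_args}")
        return self.mapping[int(loader_args[0])]


resolver = ToyClassResolver()


class Greeter:
    @staticmethod
    @resolver.add
    def hello() -> str:
        return "hello world"


args = resolver.loader_args(Greeter.hello)
print(args)                        # ['0']
print(resolver.load_task(args)())  # hello world
```

This only works because hello never touches self, so nothing about an instance has to be recovered at execution time.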
The problem is that you have written a simple example of a class. But what if the class needs specific values for __init__? You can write a metaclass-based resolver that allows you to instantiate the class using the input values.
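A rough sketch of the metaclass idea, again in plain Python without flytekit: a hypothetical metaclass TaskClassMeta registers every class it creates under its qualified name and records which methods were marked as tasks, so a resolver function can look the class up by name and instantiate it from the inputs it receives at run time. All names here (TaskClassMeta, mark_task, REGISTRY, resolve_and_run) are illustrative assumptions, not flytekit APIs.

```python
from typing import Any, Callable, Dict

# Hypothetical global registry: class name -> class.
REGISTRY: Dict[str, type] = {}


def mark_task(fn: Callable) -> Callable:
    # Tag the function itself so the metaclass can find it in the class namespace.
    fn._is_class_task = True
    return fn


class TaskClassMeta(type):
    def __new__(mcs, name, bases, namespace, **kwargs):
        cls = super().__new__(mcs, name, bases, namespace, **kwargs)
        # Remember which methods were marked with @mark_task.
        cls._task_names = [k for k, v in namespace.items() if getattr(v, "_is_class_task", False)]
        REGISTRY[cls.__qualname__] = cls
        return cls


def resolve_and_run(class_name: str, method: str,
                    init_inputs: Dict[str, Any], call_inputs: Dict[str, Any]) -> Any:
    cls = REGISTRY[class_name]
    if method not in cls._task_names:
        raise LookupError(f"{class_name}.{method} is not a class task")
    instance = cls(**init_inputs)  # build the object from the task's inputs
    return getattr(instance, method)(**call_inputs)


class Adder(metaclass=TaskClassMeta):
    def __init__(self, i: int):
        self.i = i

    @mark_task
    def add(self, a: int, b: int) -> int:
        return a + b + self.i


print(Adder._task_names)                                            # ['add']
print(resolve_and_run("Adder", "add", {"i": 1}, {"a": 2, "b": 3}))  # 6
```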
You can also add support for something like this:
@task_class
class MyTaskClass:
    def __init__(self, i: int):
        ...

    @entrypoint
    def run(self) -> str:
        ...

@workflow
def wf(i: int) -> str:
    return MyTaskClass(i=i)
In this case, a task can be created with input i: int and output str. This is not supported today, but it is possible.
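To illustrate how such an interface could be derived, here is a plain-Python emulation of the proposed decorators. task_class and entrypoint are hypothetical (they do not exist in flytekit): the decorator reads the task's inputs from the __init__ signature and its output from the entrypoint method's return annotation, and calling the decorated class runs the entrypoint.

```python
import inspect
from typing import Any, Callable, Dict


def entrypoint(fn: Callable) -> Callable:
    # Mark the single method that should run when the "task" executes.
    fn._is_entrypoint = True
    return fn


def task_class(cls: type) -> Callable[..., Any]:
    entries = [v for v in vars(cls).values() if getattr(v, "_is_entrypoint", False)]
    if len(entries) != 1:
        raise TypeError(f"{cls.__name__} needs exactly one @entrypoint method")
    run = entries[0]

    # Inputs come from __init__ (minus self); the output comes from the entrypoint.
    params = list(inspect.signature(cls.__init__).parameters.values())[1:]
    inputs: Dict[str, Any] = {p.name: p.annotation for p in params}
    output = inspect.signature(run).return_annotation

    def as_task(**kwargs: Any) -> Any:
        # Construct the object from the task inputs, then run the entrypoint.
        return run(cls(**kwargs))

    as_task.inputs = inputs
    as_task.output = output
    as_task.__name__ = cls.__name__
    return as_task


@task_class
class MyTaskClass:
    def __init__(self, i: int):
        self.i = i

    @entrypoint
    def run(self) -> str:
        return f"got {self.i}"


print(MyTaskClass.inputs, MyTaskClass.output)  # {'i': <class 'int'>} <class 'str'>
print(MyTaskClass(i=3))                        # got 3
```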
The final option is what you described, which is more involved. What you specified is something like MagicMock. Here is what I envision:
class MyTaskClass(metaclass=flytekit.TaskClass):
    def __init__(self, i: int):
        ...

    @class_task
    def t1(self, j: str) -> str:
        ...

@workflow
def wf(i: int, j: str) -> str:
    return MyTaskClass(i=i).t1(j=j)
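One detail this design would have to settle is the interface of the task behind MyTaskClass(i=i).t1(j=j). If it runs as a single task, its inputs would presumably be the union of the __init__ parameters and the method parameters, and the names must not collide. A plain-Python sketch of that merge, where class_task_interface is a hypothetical helper:

```python
import inspect
from typing import Any, Dict, Tuple


def class_task_interface(cls: type, method_name: str) -> Tuple[Dict[str, Any], Any]:
    """Merge __init__ inputs and method inputs into one flat task interface."""
    init_params = list(inspect.signature(cls.__init__).parameters.values())[1:]
    sig = inspect.signature(getattr(cls, method_name))
    method_params = list(sig.parameters.values())[1:]  # drop self

    inputs: Dict[str, Any] = {}
    for p in init_params + method_params:
        if p.name in inputs:
            # e.g. __init__(self, i) together with t1(self, i) would be ambiguous
            raise TypeError(f"input {p.name!r} appears in both __init__ and {method_name}")
        inputs[p.name] = p.annotation
    return inputs, sig.return_annotation


class MyTaskClass:
    def __init__(self, i: int):
        self.i = i

    def t1(self, j: str) -> str:
        return f"{self.i}:{j}"


print(class_task_interface(MyTaskClass, "t1"))
```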
Hi @kumare3, thanks for the comments. I am pretty new to Flyte and it may take some time for me to fully understand your comments.
It seems that a simple (or special) version of what I described can be implemented with TaskResolvers, while the general case is involved enough that Flyte does not support it yet. Are there plans to implement the general case in the future?
Hmm, we have not implemented it, as no one has really asked for it and in general its priority has been lower. Please join slack.Flyte.org; we would love to understand why this is needed and see if there are potential collaboration opportunities.
Got it. I also need to think more about this and collect a solid use case. I joined the Flyte slack workspace and will ping you when I get some results. Thanks!
@XinEDprob can you please comment and tell me more about how you want the class to look? Which of the two versions I recommended would you like an implementation sample for?
Hi @kumare3, sorry for the late reply. The second version (the one that you think is more involved) is more interesting to me.
@XinEDprob I tried this, and it seems harder than I thought. We need a way to resolve the instance of the class. There are a few tricks, but this is more work than we can do in a short time. Here is what I have so far:
from functools import wraps
from typing import Tuple

from flytekit import workflow, task
from flytekit.core.tracker import TrackedInstance


def class_task(fn, **kwargs):
    @wraps(fn)
    def wrapper(*args, **kwds):
        return fn(*args, **kwds)

    # TODO: pass task_resolver=<a resolver that can re-create the class instance>.
    # A placeholder string is not a valid resolver, so it is left out here.
    output_task = task(wrapper, **kwargs)
    # Tag the task object itself, since that is what ends up in the class namespace.
    output_task._flyte_task = True
    return output_task


class PythonClassTask(type(TrackedInstance)):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

    def __new__(cls, name, bases, namespace, **kwargs):
        o = super().__new__(cls, name, bases, namespace, **kwargs)
        # Collect every class attribute produced by @class_task.
        o._tasks = [value for value in namespace.values() if hasattr(value, '_flyte_task')]
        return o


class MyFlyteTaskClass(TrackedInstance, metaclass=PythonClassTask):
    def __init__(self, i: int, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.i = i

    @class_task
    def task1(self, a: int, b: int) -> int:
        return a + b + self.i

    @class_task
    def task2(self, a: int, b: int) -> int:
        return a * b + self.i


# Multiple outputs are declared with typing.Tuple rather than a bare tuple literal.
@workflow
def wf(i: int, j: int, k: int) -> Tuple[int, int]:
    t = MyFlyteTaskClass(i=i)
    return t.task1(a=j, b=k), t.task2(a=j, b=k)


if __name__ == "__main__":
    c = MyFlyteTaskClass(i=1)
    wf(i=1, j=2, k=3)
The task resolver itself needs to be able to create an instance of the class. This seems tricky: doable, but hard.
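One possible way to approach that step, sketched in plain Python under stated assumptions: the loader args only name the class and method (both known when the workflow is registered), while constructor arguments such as i are treated as ordinary task inputs that only arrive at execution time. The hypothetical InstanceResolver below returns a flat-input callable that splits the incoming inputs by the __init__ signature, creates the instance, and calls the method. TrackedInstance and the real TaskResolverMixin are left out; in real flytekit the class would also have to be importable inside the container, and the in-memory dictionary here only stands in for that.

```python
import inspect
from typing import Any, Callable, Dict, List


class InstanceResolver:
    """Hypothetical resolver: loader args are [class name, method name]."""

    def __init__(self) -> None:
        self.classes: Dict[str, type] = {}

    def register(self, cls: type) -> type:
        self.classes[cls.__qualname__] = cls
        return cls

    def loader_args(self, cls: type, method_name: str) -> List[str]:
        # Only static information goes into the loader args.
        return [cls.__qualname__, method_name]

    def load_task(self, loader_args: List[str]) -> Callable[..., Any]:
        class_name, method_name = loader_args
        cls = self.classes[class_name]
        init_names = list(inspect.signature(cls.__init__).parameters)[1:]

        def run(**inputs: Any) -> Any:
            # Split the flat task inputs into constructor and method arguments.
            init_kwargs = {k: v for k, v in inputs.items() if k in init_names}
            call_kwargs = {k: v for k, v in inputs.items() if k not in init_names}
            instance = cls(**init_kwargs)  # recreate the object from runtime inputs
            return getattr(instance, method_name)(**call_kwargs)

        return run


resolver = InstanceResolver()


@resolver.register
class MyFlyteTaskClass:
    def __init__(self, i: int):
        self.i = i

    def task1(self, a: int, b: int) -> int:
        return a + b + self.i

    def task2(self, a: int, b: int) -> int:
        return a * b + self.i


args = resolver.loader_args(MyFlyteTaskClass, "task2")
print(args)                                     # ['MyFlyteTaskClass', 'task2']
print(resolver.load_task(args)(i=1, a=2, b=3))  # 7
```

Keeping the constructor arguments out of the loader args matters here because, in the workflow above, i is a workflow input and is only known when the workflow runs.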
Thanks for letting me know and the code! I will take a look at the above code and see what I can do for this feature.
Hello 👋, this issue has been inactive for over 9 months. To help maintain a clean and focused backlog, we'll be marking this issue as stale and will engage on it to decide if it is still applicable. Thank you for your contribution and understanding! 🙏
@XinEDprob did you actually get this running? We are thinking of working on some things that may help here.
@kumare3, I haven't had the opportunity to address this yet; it remains on my backlog. If you intend to tackle it, that would be appreciated. Otherwise, I'll prioritize it for a later time.