prefect
prefect copied to clipboard
can task decorater supports Object.method?
First check
- [X] I added a descriptive title to this issue.
- [X] I used the GitHub search to find a similar request and didn't find it.
- [X] I searched the Prefect documentation for this feature.
Prefect Version
2.x
Describe the current behavior
from prefect import flow as prefect_flow
from prefect import task as prefect_task
class Foo():
def __init__(self,a=1):
self.a = a
#print(a)
def do_method():
print(self.a)
def create_foo():
foo = Foo()
print("get foo!")
return foo
def dummy_flow():
foo = prefect_task(create_foo)()
prefect_task(foo.do_method)()
prefect_flow(dummy_flow)()
File ~/.conda/envs/prefect/lib/python3.8/inspect.py:1808, in _signature_bound_method(sig)
1805 params = tuple(sig.parameters.values())
1807 if not params or params[0].kind in (_VAR_KEYWORD, _KEYWORD_ONLY):
-> 1808 raise ValueError('invalid method signature')
1810 kind = params[0].kind
1811 if kind in (_POSITIONAL_OR_KEYWORD, _POSITIONAL_ONLY):
1812 # Drop first parameter:
1813 # '(p1, p2[, ...])' -> '(p2[, ...])'
ValueError: invalid method signature
Describe the proposed behavior
Such a "prefect_task(foo.do_method)" can be successfully executed.
Example Use
i believe the current behaviour can be executed.
Additional context
Hi, Prefect group! it is exciting that Prefect orion has break the wall of DAG concept, which we can build tasks/flows in a nested way. Dynamically generating task in executing make the "loop" condition possible.
To make such a "task" decorater more convient, can "prefect_task" decorator support the feature of "decorate class method"? I believe in inspect module, we can get the "method" belonging to the object by using "foo.do_method._self_". Maybe we can do something to get such a feature. in my imagination: we can using like this or rename it?
prefect_task(foo.do_method , ismethod=True, task_name= "balabala")
Many thanks ! : )
Thanks for the request! We do not recommend using methods for tasks as managing state during concurrent and distributed execution can be confusing. That said, if someone wants to try it it should be possible. We should handle this without requiring an ismethod
on the task decorator — I believe we can inspect if it is a method. Can you share the full traceback for the failure?
@madkinsz Thanks for your kind reply.
tasks as managing state during concurrent and distributed execution can be confusing
In my case, Is the object created the one I get in each prefect task? I mean every prefect task having foo instance has the same address in memory? Operations on the same object do have potential risks, but i think it depends on the user's usage of the object, i guess.... One thing to confirm is that such an object should be picklable without lock or in some io considering...
I believe we can inspect if it is a method
Yes, i think you are right. inspect module can judge whether the object is method or function, "ismethod" is extra. But I don't know if I should distinguish between func and method, like you consider on concurrent.
And this is the total error.
/home/ubuntu/.conda/envs/prefect/lib/python3.8/site-packages/prefect/flows.py:199: UserWarning: A flow named 'dummy-flow' and defined at '/tmp/ipykernel_237630/3946684294.py:18' conflicts with another flow. Consider specifying a unique `name` parameter in the flow definition:
`@flow(name='my_unique_name', ...)`
warnings.warn(
10:43:07.804 | INFO | prefect.engine - Created flow run 'ultraviolet-cow' for flow 'dummy-flow'
/home/ubuntu/.conda/envs/prefect/lib/python3.8/site-packages/prefect/tasks.py:185: UserWarning: A task named 'create_foo' and defined at '/tmp/ipykernel_237630/3946684294.py:9' conflicts with another task. Consider specifying a unique `name` parameter in the task definition:
`@task(name='my_unique_name', ...)`
warnings.warn(
10:43:07.960 | INFO | Flow run 'ultraviolet-cow' - Created task run 'create_foo-8b412ec3-0' for task 'create_foo'
10:43:07.961 | INFO | Flow run 'ultraviolet-cow' - Executing 'create_foo-8b412ec3-0' immediately...
10:43:08.035 | INFO | Task run 'create_foo-8b412ec3-0' - Finished in state Completed()
10:43:08.038 | ERROR | Flow run 'ultraviolet-cow' - Encountered exception during execution:
Traceback (most recent call last):
File "/home/ubuntu/.conda/envs/prefect/lib/python3.8/site-packages/prefect/engine.py", line 578, in orchestrate_flow_run
result = await run_sync(flow_call)
File "/home/ubuntu/.conda/envs/prefect/lib/python3.8/site-packages/prefect/utilities/asyncutils.py", line 68, in run_sync_in_worker_thread
return await anyio.to_thread.run_sync(call, cancellable=True)
File "/home/ubuntu/.conda/envs/prefect/lib/python3.8/site-packages/anyio/to_thread.py", line 31, in run_sync
return await get_asynclib().run_sync_in_worker_thread(
File "/home/ubuntu/.conda/envs/prefect/lib/python3.8/site-packages/anyio/_backends/_asyncio.py", line 937, in run_sync_in_worker_thread
return await future
File "/home/ubuntu/.conda/envs/prefect/lib/python3.8/site-packages/anyio/_backends/_asyncio.py", line 867, in run
result = context.run(func, *args)
File "/tmp/ipykernel_237630/3946684294.py", line 21, in dummy_flow
foo = prefect_task(foo.do_method)()
File "/home/ubuntu/.conda/envs/prefect/lib/python3.8/site-packages/prefect/tasks.py", line 828, in task
Task(
File "/home/ubuntu/.conda/envs/prefect/lib/python3.8/site-packages/prefect/context.py", line 163, in __register_init__
__init__(__self__, *args, **kwargs)
File "/home/ubuntu/.conda/envs/prefect/lib/python3.8/site-packages/prefect/tasks.py", line 153, in __init__
raise_for_reserved_arguments(self.fn, ["return_state", "wait_for"])
File "/home/ubuntu/.conda/envs/prefect/lib/python3.8/site-packages/prefect/utilities/callables.py", line 177, in raise_for_reserved_arguments
function_paremeters = inspect.signature(fn).parameters
File "/home/ubuntu/.conda/envs/prefect/lib/python3.8/inspect.py", line 3093, in signature
return Signature.from_callable(obj, follow_wrapped=follow_wrapped)
File "/home/ubuntu/.conda/envs/prefect/lib/python3.8/inspect.py", line 2842, in from_callable
return _signature_from_callable(obj, sigcls=cls,
File "/home/ubuntu/.conda/envs/prefect/lib/python3.8/inspect.py", line 2228, in _signature_from_callable
return _signature_bound_method(sig)
File "/home/ubuntu/.conda/envs/prefect/lib/python3.8/inspect.py", line 1808, in _signature_bound_method
raise ValueError('invalid method signature')
ValueError: invalid method signature
10:43:08.069 | ERROR | Flow run 'ultraviolet-cow' - Finished in state Failed('Flow run encountered an exception. ValueError: invalid method signature\n')
get foo!
---------------------------------------------------------------------------
ValueError Traceback (most recent call last)
Cell In [9], line 23
20 foo = prefect_task(create_foo)()
21 foo = prefect_task(foo.do_method)()
---> 23 prefect_flow(dummy_flow)()
File ~/.conda/envs/prefect/lib/python3.8/site-packages/prefect/flows.py:429, in Flow.__call__(self, return_state, *args, **kwargs)
425 parameters = get_call_parameters(self.fn, args, kwargs)
427 return_type = "state" if return_state else "result"
--> 429 return enter_flow_run_engine_from_flow_call(
430 self, parameters, return_type=return_type
431 )
File ~/.conda/envs/prefect/lib/python3.8/site-packages/prefect/engine.py:147, in enter_flow_run_engine_from_flow_call(flow, parameters, return_type)
143 elif in_async_main_thread():
144 # An event loop is already running and we must create a blocking portal to
145 # run async code from this synchronous context
146 with start_blocking_portal() as portal:
--> 147 return portal.call(begin_run)
148 else:
149 # An event loop is not running so we will create one
150 return anyio.run(begin_run)
File ~/.conda/envs/prefect/lib/python3.8/site-packages/anyio/from_thread.py:283, in BlockingPortal.call(self, func, *args)
268 def call(
269 self,
270 func: Callable[..., Union[Coroutine[Any, Any, T_Retval], T_Retval]],
271 *args: object
272 ) -> T_Retval:
273 """
274 Call the given function in the event loop thread.
275
(...)
281
282 """
--> 283 return cast(T_Retval, self.start_task_soon(func, *args).result())
File ~/.conda/envs/prefect/lib/python3.8/concurrent/futures/_base.py:439, in Future.result(self, timeout)
437 raise CancelledError()
438 elif self._state == FINISHED:
--> 439 return self.__get_result()
440 else:
441 raise TimeoutError()
File ~/.conda/envs/prefect/lib/python3.8/concurrent/futures/_base.py:388, in Future.__get_result(self)
386 def __get_result(self):
387 if self._exception:
--> 388 raise self._exception
389 else:
390 return self._result
File ~/.conda/envs/prefect/lib/python3.8/site-packages/anyio/from_thread.py:219, in BlockingPortal._call_func(self, func, args, kwargs, future)
216 else:
217 future.add_done_callback(callback)
--> 219 retval = await retval
220 except self._cancelled_exc_class:
221 future.cancel()
File ~/.conda/envs/prefect/lib/python3.8/site-packages/prefect/client/utilities.py:47, in inject_client.<locals>.with_injected_client(*args, **kwargs)
45 async with client_context as new_client:
46 kwargs.setdefault("client", new_client or client)
---> 47 return await fn(*args, **kwargs)
File ~/.conda/envs/prefect/lib/python3.8/site-packages/prefect/engine.py:229, in create_then_begin_flow_run(flow, parameters, return_type, client)
227 return state
228 elif return_type == "result":
--> 229 return await state.result(fetch=True)
230 else:
231 raise ValueError(f"Invalid return type for flow engine {return_type!r}.")
File ~/.conda/envs/prefect/lib/python3.8/site-packages/prefect/states.py:74, in _get_state_result(state, raise_on_failure)
70 """
71 Internal implementation for `get_state_result` without async backwards compatibility
72 """
73 if raise_on_failure and (state.is_crashed() or state.is_failed()):
---> 74 raise await get_state_exception(state)
76 if isinstance(state.data, DataDocument):
77 result = result_from_state_with_data_document(
78 state, raise_on_failure=raise_on_failure
79 )
File ~/.conda/envs/prefect/lib/python3.8/site-packages/prefect/engine.py:578, in orchestrate_flow_run(flow, flow_run, parameters, interruptible, client, partial_flow_run_context)
572 else:
573 run_sync = (
574 run_sync_in_interruptible_worker_thread
575 if interruptible or timeout_scope
576 else run_sync_in_worker_thread
577 )
--> 578 result = await run_sync(flow_call)
580 waited_for_task_runs = await wait_for_task_runs_and_report_crashes(
581 flow_run_context.task_run_futures, client=client
582 )
584 except Exception as exc:
File ~/.conda/envs/prefect/lib/python3.8/site-packages/prefect/utilities/asyncutils.py:68, in run_sync_in_worker_thread(__fn, *args, **kwargs)
57 """
58 Runs a sync function in a new worker thread so that the main thread's event loop
59 is not blocked
(...)
65 thread may continue running — the outcome will just be ignored.
66 """
67 call = partial(__fn, *args, **kwargs)
---> 68 return await anyio.to_thread.run_sync(call, cancellable=True)
File ~/.conda/envs/prefect/lib/python3.8/site-packages/anyio/to_thread.py:31, in run_sync(func, cancellable, limiter, *args)
10 async def run_sync(
11 func: Callable[..., T_Retval],
12 *args: object,
13 cancellable: bool = False,
14 limiter: Optional[CapacityLimiter] = None
15 ) -> T_Retval:
16 """
17 Call the given function with the given arguments in a worker thread.
18
(...)
29
30 """
---> 31 return await get_asynclib().run_sync_in_worker_thread(
32 func, *args, cancellable=cancellable, limiter=limiter
33 )
File ~/.conda/envs/prefect/lib/python3.8/site-packages/anyio/_backends/_asyncio.py:937, in run_sync_in_worker_thread(func, cancellable, limiter, *args)
935 context.run(sniffio.current_async_library_cvar.set, None)
936 worker.queue.put_nowait((context, func, args, future))
--> 937 return await future
File ~/.conda/envs/prefect/lib/python3.8/site-packages/anyio/_backends/_asyncio.py:867, in WorkerThread.run(self)
865 exception: Optional[BaseException] = None
866 try:
--> 867 result = context.run(func, *args)
868 except BaseException as exc:
869 exception = exc
Cell In [9], line 21, in dummy_flow()
18 def dummy_flow():
20 foo = prefect_task(create_foo)()
---> 21 foo = prefect_task(foo.do_method)()
File ~/.conda/envs/prefect/lib/python3.8/site-packages/prefect/tasks.py:828, in task(__fn, name, description, tags, version, cache_key_fn, cache_expiration, retries, retry_delay_seconds, persist_result, result_storage, result_serializer)
745 """
746 Decorator to designate a function as a task in a Prefect workflow.
747
(...)
823 >>> return "hello"
824 """
825 if __fn:
826 return cast(
827 Task[P, R],
--> 828 Task(
829 fn=__fn,
830 name=name,
831 description=description,
832 tags=tags,
833 version=version,
834 cache_key_fn=cache_key_fn,
835 cache_expiration=cache_expiration,
836 retries=retries,
837 retry_delay_seconds=retry_delay_seconds,
838 persist_result=persist_result,
839 result_storage=result_storage,
840 result_serializer=result_serializer,
841 ),
842 )
843 else:
844 return cast(
845 Callable[[Callable[P, R]], Task[P, R]],
846 partial(
(...)
859 ),
860 )
File ~/.conda/envs/prefect/lib/python3.8/site-packages/prefect/context.py:163, in PrefectObjectRegistry.register_instances.<locals>.__register_init__(__self__, *args, **kwargs)
161 registry = cls.get()
162 try:
--> 163 __init__(__self__, *args, **kwargs)
164 except Exception as exc:
165 if not registry or not registry.capture_failures:
File ~/.conda/envs/prefect/lib/python3.8/site-packages/prefect/tasks.py:153, in Task.__init__(self, fn, name, description, tags, version, cache_key_fn, cache_expiration, retries, retry_delay_seconds, persist_result, result_storage, result_serializer)
150 self.name = name or self.fn.__name__
151 self.version = version
--> 153 raise_for_reserved_arguments(self.fn, ["return_state", "wait_for"])
155 self.tags = set(tags if tags else [])
156 self.task_key = to_qualified_name(self.fn)
File ~/.conda/envs/prefect/lib/python3.8/site-packages/prefect/utilities/callables.py:177, in raise_for_reserved_arguments(fn, reserved_arguments)
174 def raise_for_reserved_arguments(fn: Callable, reserved_arguments: Iterable[str]):
175 """Raise a ReservedArgumentError if `fn` has any parameters that conflict
176 with the names contained in `reserved_arguments`."""
--> 177 function_paremeters = inspect.signature(fn).parameters
179 for argument in reserved_arguments:
180 if argument in function_paremeters:
File ~/.conda/envs/prefect/lib/python3.8/inspect.py:3093, in signature(obj, follow_wrapped)
3091 def signature(obj, *, follow_wrapped=True):
3092 """Get a signature object for the passed callable."""
-> 3093 return Signature.from_callable(obj, follow_wrapped=follow_wrapped)
File ~/.conda/envs/prefect/lib/python3.8/inspect.py:2842, in Signature.from_callable(cls, obj, follow_wrapped)
2839 @classmethod
2840 def from_callable(cls, obj, *, follow_wrapped=True):
2841 """Constructs Signature for the given callable object."""
-> 2842 return _signature_from_callable(obj, sigcls=cls,
2843 follow_wrapper_chains=follow_wrapped)
File ~/.conda/envs/prefect/lib/python3.8/inspect.py:2228, in _signature_from_callable(obj, follow_wrapper_chains, skip_bound_arg, sigcls)
2221 sig = _signature_from_callable(
2222 obj.__func__,
2223 follow_wrapper_chains=follow_wrapper_chains,
2224 skip_bound_arg=skip_bound_arg,
2225 sigcls=sigcls)
2227 if skip_bound_arg:
-> 2228 return _signature_bound_method(sig)
2229 else:
2230 return sig
File ~/.conda/envs/prefect/lib/python3.8/inspect.py:1808, in _signature_bound_method(sig)
1805 params = tuple(sig.parameters.values())
1807 if not params or params[0].kind in (_VAR_KEYWORD, _KEYWORD_ONLY):
-> 1808 raise ValueError('invalid method signature')
1810 kind = params[0].kind
1811 if kind in (_POSITIONAL_OR_KEYWORD, _POSITIONAL_ONLY):
1812 # Drop first parameter:
1813 # '(p1, p2[, ...])' -> '(p2[, ...])'
ValueError: invalid method signature
Hello, I found a workaround for running a class method as a prefect task. This might be useful for someone. I just declare a task decorated function outside the class scope, and run it using the "pipe" method. Using this, I was able to run the method as a prefect task and use the class object without significant changes to the code. Here's an example:
from time import sleep
from prefect import flow, task
class MyClass:
def __init__(self) -> None:
pass
def pipe(self, func, *args, **kwargs):
try:
self = func(self, *args, **kwargs)
except Exception as e:
print(f'Piping {func.__name__} failed. Exception: {e.message}')
finally:
return self
def f1(self, time=2):
print(f'My time is {time} seconds')
def f2(self, time=2):
print(f'Sleeping {time} seconds')
sleep(time)
@task
def f3(self, time=2):
sleep(time)
print('aaa')
@task
def f1(self, *args, **kwargs):
print('This is a piped function')
return MyClass.f1(self, *args, **kwargs)
@flow(name='encapsulated tasks', log_prints=True)
def main(*args):
c = MyClass()
c.f1()
c.f2()
c.pipe(f1, 5)
try:
c.f3()
except Exception as e:
print(str(e))
main()
Output:
11:05:26.793 | INFO | prefect.engine - Created flow run 'emerald-tody' for flow 'encapsulated tasks'
11:05:26.795 | INFO | Flow run 'emerald-tody' - View at http://127.0.0.1:4200/flow-runs/flow-run/ea0d6fe1-e373-4aa0-8a1b-a621913644a8
11:05:26.948 | INFO | Flow run 'emerald-tody' - My time is 2 seconds
11:05:26.950 | INFO | Flow run 'emerald-tody' - Sleeping 2 seconds
11:05:29.903 | INFO | Flow run 'emerald-tody' - Created task run 'f1-0' for task 'f1'
11:05:29.907 | INFO | Flow run 'emerald-tody' - Executing 'f1-0' immediately...
11:05:30.093 | INFO | Task run 'f1-0' - This is a piped function
11:05:30.094 | INFO | Task run 'f1-0' - My time is 5 seconds
11:05:30.214 | INFO | Task run 'f1-0' - Finished in state Completed()
11:05:30.214 | INFO | Flow run 'emerald-tody' - Error binding parameters for function 'f3': missing a required argument: 'self'.
Function 'f3' has signature 'self, time=2' but received args: () and kwargs: [].
11:05:30.340 | INFO | Flow run 'emerald-tody' - Finished in state Completed('All states completed.')
@canuters how to deal with the f3 method? Is there any way to decorate f3 method as a task?