Cannot pass dict as a parameter 'data' to Completed state on Prefect v3.0.0rc16
Bug summary
I cannot pass dict as parameter 'data' on Completed state on Prefect v3.0.0rc16. This is the code on which it fails:
from prefect import flow, task
from prefect.states import Completed
@task
def task_a():
return Completed(message='A', data={'A': 1})
@flow
def some_flow():
a = task_a.submit().result()
return
if __name__ == '__main__':
some_flow()
As result of running this code, I get such log and traceback:
logs and traceback
17:28:58.177 | INFO | prefect.engine - Created flow run 'energetic-coot' for flow 'some-flow'
17:28:58.179 | INFO | prefect.engine - View at http://127.0.0.1:4200/runs/flow-run/7b207023-a0a2-4eda-816b-218ace94fb46
17:28:58.271 | INFO | Flow run 'energetic-coot' - Submitting task task_a to thread pool executor...
17:28:58.373 | INFO | Task run 'task_a-0' - Created task run 'task_a-0' for task 'task_a'
17:28:58.458 | ERROR | Task run 'task_a-0' - Task run failed with exception: TypeError("Can't instantiate abstract class BaseResult with abstract methods create, get") - Retries are exhausted
Traceback (most recent call last):
File "My_Path\lib\site-packages\prefect\task_engine.py", line 804, in run_context
yield self
File "My_Path\lib\site-packages\prefect\task_engine.py", line 1420, in run_task_sync
engine.call_task_fn(txn)
File "My_Path\lib\site-packages\prefect\task_engine.py", line 832, in call_task_fn
result = call_with_parameters(self.task.fn, parameters)
File "My_Path\lib\site-packages\prefect\utilities\callables.py", line 208, in call_with_parameters
return fn(*args, **kwargs)
File "C:\Users\ihor.ramskyi\PycharmProjects\Prefect_Tutorial\test_flow4.py", line 6, in task_a
return Completed(message='A', data={'A': 1})
File "My_Path\lib\site-packages\prefect\states.py", line 556, in Completed
return cls(type=StateType.COMPLETED, **kwargs)
File "My_Path\lib\site-packages\pydantic\main.py", line 176, in __init__
self.__pydantic_validator__.validate_python(data, self_instance=self)
File "My_Path\lib\site-packages\prefect\results.py", line 405, in __new__
return super().__new__(cls)
TypeError: Can't instantiate abstract class BaseResult with abstract methods create, get
17:28:58.495 | ERROR | Task run 'task_a-0' - Finished in state Failed("Task run encountered an exception TypeError: Can't instantiate abstract class BaseResult with abstract methods create, get")
17:28:58.498 | ERROR | Flow run 'energetic-coot' - Encountered exception during execution: TypeError("Can't instantiate abstract class BaseResult with abstract methods create, get")
Traceback (most recent call last):
File "My_Path\lib\site-packages\prefect\flow_engine.py", line 636, in run_context
yield self
File "My_Path\lib\site-packages\prefect\flow_engine.py", line 680, in run_flow_sync
engine.call_flow_fn()
File "My_Path\lib\site-packages\prefect\flow_engine.py", line 659, in call_flow_fn
result = call_with_parameters(self.flow.fn, self.parameters)
File "My_Path\lib\site-packages\prefect\utilities\callables.py", line 208, in call_with_parameters
return fn(*args, **kwargs)
File "C:\Users\ihor.ramskyi\PycharmProjects\Prefect_Tutorial\test_flow4.py", line 11, in some_flow
a = task_a.submit().result()
File "My_Path\lib\site-packages\prefect\futures.py", line 164, in result
_result = self._final_state.result(
File "My_Path\lib\site-packages\prefect\client\schemas\objects.py", line 261, in result
return get_state_result(
File "My_Path\lib\site-packages\prefect\states.py", line 68, in get_state_result
return _get_state_result(
File "My_Path\lib\site-packages\prefect\utilities\asyncutils.py", line 392, in coroutine_wrapper
return run_coro_as_sync(ctx_call())
File "My_Path\lib\site-packages\prefect\utilities\asyncutils.py", line 243, in run_coro_as_sync
return call.result()
File "My_Path\lib\site-packages\prefect\_internal\concurrency\calls.py", line 312, in result
return self.future.result(timeout=timeout)
File "My_Path\lib\site-packages\prefect\_internal\concurrency\calls.py", line 182, in result
return self.__get_result()
File "C:\Users\ihor.ramskyi\AppData\Local\Programs\Python\Python310\lib\concurrent\futures\_base.py", line 403, in __get_result
raise self._exception
File "My_Path\lib\site-packages\prefect\_internal\concurrency\calls.py", line 383, in _run_async
result = await coro
File "My_Path\lib\site-packages\prefect\utilities\asyncutils.py", line 225, in coroutine_wrapper
return await task
File "My_Path\lib\site-packages\prefect\utilities\asyncutils.py", line 382, in ctx_call
result = await async_fn(*args, **kwargs)
File "My_Path\lib\site-packages\prefect\states.py", line 127, in _get_state_result
raise await get_state_exception(state)
File "My_Path\lib\site-packages\prefect\task_engine.py", line 804, in run_context
yield self
File "My_Path\lib\site-packages\prefect\task_engine.py", line 1420, in run_task_sync
engine.call_task_fn(txn)
File "My_Path\lib\site-packages\prefect\task_engine.py", line 832, in call_task_fn
result = call_with_parameters(self.task.fn, parameters)
File "My_Path\lib\site-packages\prefect\utilities\callables.py", line 208, in call_with_parameters
return fn(*args, **kwargs)
File "C:\Users\ihor.ramskyi\PycharmProjects\Prefect_Tutorial\test_flow4.py", line 6, in task_a
return Completed(message='A', data={'A': 1})
File "My_Path\lib\site-packages\prefect\states.py", line 556, in Completed
return cls(type=StateType.COMPLETED, **kwargs)
File "My_Path\lib\site-packages\pydantic\main.py", line 176, in __init__
self.__pydantic_validator__.validate_python(data, self_instance=self)
File "My_Path\lib\site-packages\prefect\results.py", line 405, in __new__
return super().__new__(cls)
TypeError: Can't instantiate abstract class BaseResult with abstract methods create, get
17:28:58.543 | ERROR | Flow run 'energetic-coot' - Finished in state Failed("Flow run encountered an exception: TypeError: Can't instantiate abstract class BaseResult with abstract methods create, get")
Traceback (most recent call last):
File "C:\Users\ihor.ramskyi\PycharmProjects\Prefect_Tutorial\test_flow4.py", line 15, in <module>
some_flow()
File "My_Path\lib\site-packages\prefect\flows.py", line 1322, in __call__
return run_flow(
File "My_Path\lib\site-packages\prefect\flow_engine.py", line 802, in run_flow
return run_flow_sync(**kwargs)
File "My_Path\lib\site-packages\prefect\flow_engine.py", line 682, in run_flow_sync
return engine.state if return_type == "state" else engine.result()
File "My_Path\lib\site-packages\prefect\flow_engine.py", line 246, in result
raise self._raised
File "My_Path\lib\site-packages\prefect\flow_engine.py", line 636, in run_context
yield self
File "My_Path\lib\site-packages\prefect\flow_engine.py", line 680, in run_flow_sync
engine.call_flow_fn()
File "My_Path\lib\site-packages\prefect\flow_engine.py", line 659, in call_flow_fn
result = call_with_parameters(self.flow.fn, self.parameters)
File "My_Path\lib\site-packages\prefect\utilities\callables.py", line 208, in call_with_parameters
return fn(*args, **kwargs)
File "C:\Users\ihor.ramskyi\PycharmProjects\Prefect_Tutorial\test_flow4.py", line 11, in some_flow
a = task_a.submit().result()
File "My_Path\lib\site-packages\prefect\futures.py", line 164, in result
_result = self._final_state.result(
File "My_Path\lib\site-packages\prefect\client\schemas\objects.py", line 261, in result
return get_state_result(
File "My_Path\lib\site-packages\prefect\states.py", line 68, in get_state_result
return _get_state_result(
File "My_Path\lib\site-packages\prefect\utilities\asyncutils.py", line 392, in coroutine_wrapper
return run_coro_as_sync(ctx_call())
File "My_Path\lib\site-packages\prefect\utilities\asyncutils.py", line 243, in run_coro_as_sync
return call.result()
File "My_Path\lib\site-packages\prefect\_internal\concurrency\calls.py", line 312, in result
return self.future.result(timeout=timeout)
File "My_Path\lib\site-packages\prefect\_internal\concurrency\calls.py", line 182, in result
return self.__get_result()
File "C:\Users\ihor.ramskyi\AppData\Local\Programs\Python\Python310\lib\concurrent\futures\_base.py", line 403, in __get_result
raise self._exception
File "My_Path\lib\site-packages\prefect\_internal\concurrency\calls.py", line 383, in _run_async
result = await coro
File "My_Path\lib\site-packages\prefect\utilities\asyncutils.py", line 225, in coroutine_wrapper
return await task
File "My_Path\lib\site-packages\prefect\utilities\asyncutils.py", line 382, in ctx_call
result = await async_fn(*args, **kwargs)
File "My_Path\lib\site-packages\prefect\states.py", line 127, in _get_state_result
raise await get_state_exception(state)
File "My_Path\lib\site-packages\prefect\task_engine.py", line 804, in run_context
yield self
File "My_Path\lib\site-packages\prefect\task_engine.py", line 1420, in run_task_sync
engine.call_task_fn(txn)
File "My_Path\lib\site-packages\prefect\task_engine.py", line 832, in call_task_fn
result = call_with_parameters(self.task.fn, parameters)
File "My_Path\lib\site-packages\prefect\utilities\callables.py", line 208, in call_with_parameters
return fn(*args, **kwargs)
File "C:\Users\ihor.ramskyi\PycharmProjects\Prefect_Tutorial\test_flow4.py", line 6, in task_a
return Completed(message='A', data={'A': 1})
File "My_Path\lib\site-packages\prefect\states.py", line 556, in Completed
return cls(type=StateType.COMPLETED, **kwargs)
File "My_Path\lib\site-packages\pydantic\main.py", line 176, in __init__
self.__pydantic_validator__.validate_python(data, self_instance=self)
File "My_Path\lib\site-packages\prefect\results.py", line 405, in __new__
return super().__new__(cls)
TypeError: Can't instantiate abstract class BaseResult with abstract methods create, get
However, same code works correctly (returning Complete state from both the task and the flow), when I run it on Prefect v2.20.0 or when I run it on Prefect v3.0.0rc16 but pass data=1, data='abcde', or even data=(1, {'A': 1}).
Version info (prefect version output)
Version: 3.0.0rc16
API version: 0.8.4
Python version: 3.10.11
Git commit: aad66936
Built: Mon, Aug 12, 2024 3:51 PM
OS/Arch: win32/AMD64
Profile: default
Server type: server
Pydantic version: 2.7.1
Additional context
No response
thanks for the issue @ihor-ramskyi-globallogic - we'll take a look here
just out of curiosity, what's your use case for passing data to a Completed state? i.e. how did you use data downstream in 2.x?
note to self, here's a more concise repro
In [1]: from prefect.states import Completed
In [2]: Completed(data={"A": 1})
...
File ~/github.com/prefecthq/prefect/src/prefect/results.py:407, in BaseResult.__new__(cls, **kwargs)
405 return super().__new__(subcls)
406 else:
--> 407 return super().__new__(cls)
TypeError: Can't instantiate abstract class BaseResult without an implementation for abstract methods 'create', 'get'
@zzstoatzz seems like in this case we should dispatch to an UnpersistedResult type in the creation of the BaseResult?
thanks for the issue @ihor-ramskyi-globallogic - we'll take a look here
just out of curiosity, what's your use case for passing
datato aCompletedstate? i.e. how did you usedatadownstream in 2.x?
I return Failed if task should be failed for any reason - and for unified approach, I use Completed otherwise. So, I still have to return data from the task, and I put it as data of Completed state.
However, I cannot use type dict for data for Failed state anyways... Because it says it cannot resolve dict in exception.
@zzstoatzz sorry to bother you, but is there a plan to merge the fix in the near future?
hi @ihor-ramskyi-globallogic - thanks for the bump
i hesitated with that PR because I couldn't reason about the solution, even though it did appear to accomplish the desired outcome. I'll revisit it as soon as I can
Hi again Thank you for response! However, I have to bother you again - this issue blocks my team from upgrading to Prefect 3 without changing bunch of code. Is there any visible point in time when this gonna be merged?
We just made a lot of updates to our results implementation so we will review @zzstoatzz 's for next week's release. If for some reason we decide it's not a viable solution, we will let you know and give you an update on timing.
@cicdw thanks for the update!