prefect icon indicating copy to clipboard operation
prefect copied to clipboard

Cannot pass dict as a parameter 'data' to Completed state on Prefect v3.0.0rc16

Open ihor-ramskyi-globallogic opened this issue 1 year ago • 4 comments

Bug summary

I cannot pass dict as parameter 'data' on Completed state on Prefect v3.0.0rc16. This is the code on which it fails:

from prefect import flow, task
from prefect.states import Completed

@task
def task_a():
    return Completed(message='A', data={'A': 1})

@flow
def some_flow():
    a = task_a.submit().result()
    return

if __name__ == '__main__':
    some_flow()

As result of running this code, I get such log and traceback:

logs and traceback
17:28:58.177 | INFO    | prefect.engine - Created flow run 'energetic-coot' for flow 'some-flow'
17:28:58.179 | INFO    | prefect.engine - View at http://127.0.0.1:4200/runs/flow-run/7b207023-a0a2-4eda-816b-218ace94fb46
17:28:58.271 | INFO    | Flow run 'energetic-coot' - Submitting task task_a to thread pool executor...
17:28:58.373 | INFO    | Task run 'task_a-0' - Created task run 'task_a-0' for task 'task_a'
17:28:58.458 | ERROR   | Task run 'task_a-0' - Task run failed with exception: TypeError("Can't instantiate abstract class BaseResult with abstract methods create, get") - Retries are exhausted
Traceback (most recent call last):
  File "My_Path\lib\site-packages\prefect\task_engine.py", line 804, in run_context
    yield self
  File "My_Path\lib\site-packages\prefect\task_engine.py", line 1420, in run_task_sync
    engine.call_task_fn(txn)
  File "My_Path\lib\site-packages\prefect\task_engine.py", line 832, in call_task_fn
    result = call_with_parameters(self.task.fn, parameters)
  File "My_Path\lib\site-packages\prefect\utilities\callables.py", line 208, in call_with_parameters
    return fn(*args, **kwargs)
  File "C:\Users\ihor.ramskyi\PycharmProjects\Prefect_Tutorial\test_flow4.py", line 6, in task_a
    return Completed(message='A', data={'A': 1})
  File "My_Path\lib\site-packages\prefect\states.py", line 556, in Completed
    return cls(type=StateType.COMPLETED, **kwargs)
  File "My_Path\lib\site-packages\pydantic\main.py", line 176, in __init__
    self.__pydantic_validator__.validate_python(data, self_instance=self)
  File "My_Path\lib\site-packages\prefect\results.py", line 405, in __new__
    return super().__new__(cls)
TypeError: Can't instantiate abstract class BaseResult with abstract methods create, get
17:28:58.495 | ERROR   | Task run 'task_a-0' - Finished in state Failed("Task run encountered an exception TypeError: Can't instantiate abstract class BaseResult with abstract methods create, get")
17:28:58.498 | ERROR   | Flow run 'energetic-coot' - Encountered exception during execution: TypeError("Can't instantiate abstract class BaseResult with abstract methods create, get")
Traceback (most recent call last):
  File "My_Path\lib\site-packages\prefect\flow_engine.py", line 636, in run_context
    yield self
  File "My_Path\lib\site-packages\prefect\flow_engine.py", line 680, in run_flow_sync
    engine.call_flow_fn()
  File "My_Path\lib\site-packages\prefect\flow_engine.py", line 659, in call_flow_fn
    result = call_with_parameters(self.flow.fn, self.parameters)
  File "My_Path\lib\site-packages\prefect\utilities\callables.py", line 208, in call_with_parameters
    return fn(*args, **kwargs)
  File "C:\Users\ihor.ramskyi\PycharmProjects\Prefect_Tutorial\test_flow4.py", line 11, in some_flow
    a = task_a.submit().result()
  File "My_Path\lib\site-packages\prefect\futures.py", line 164, in result
    _result = self._final_state.result(
  File "My_Path\lib\site-packages\prefect\client\schemas\objects.py", line 261, in result
    return get_state_result(
  File "My_Path\lib\site-packages\prefect\states.py", line 68, in get_state_result
    return _get_state_result(
  File "My_Path\lib\site-packages\prefect\utilities\asyncutils.py", line 392, in coroutine_wrapper
    return run_coro_as_sync(ctx_call())
  File "My_Path\lib\site-packages\prefect\utilities\asyncutils.py", line 243, in run_coro_as_sync
    return call.result()
  File "My_Path\lib\site-packages\prefect\_internal\concurrency\calls.py", line 312, in result
    return self.future.result(timeout=timeout)
  File "My_Path\lib\site-packages\prefect\_internal\concurrency\calls.py", line 182, in result
    return self.__get_result()
  File "C:\Users\ihor.ramskyi\AppData\Local\Programs\Python\Python310\lib\concurrent\futures\_base.py", line 403, in __get_result
    raise self._exception
  File "My_Path\lib\site-packages\prefect\_internal\concurrency\calls.py", line 383, in _run_async
    result = await coro
  File "My_Path\lib\site-packages\prefect\utilities\asyncutils.py", line 225, in coroutine_wrapper
    return await task
  File "My_Path\lib\site-packages\prefect\utilities\asyncutils.py", line 382, in ctx_call
    result = await async_fn(*args, **kwargs)
  File "My_Path\lib\site-packages\prefect\states.py", line 127, in _get_state_result
    raise await get_state_exception(state)
  File "My_Path\lib\site-packages\prefect\task_engine.py", line 804, in run_context
    yield self
  File "My_Path\lib\site-packages\prefect\task_engine.py", line 1420, in run_task_sync
    engine.call_task_fn(txn)
  File "My_Path\lib\site-packages\prefect\task_engine.py", line 832, in call_task_fn
    result = call_with_parameters(self.task.fn, parameters)
  File "My_Path\lib\site-packages\prefect\utilities\callables.py", line 208, in call_with_parameters
    return fn(*args, **kwargs)
  File "C:\Users\ihor.ramskyi\PycharmProjects\Prefect_Tutorial\test_flow4.py", line 6, in task_a
    return Completed(message='A', data={'A': 1})
  File "My_Path\lib\site-packages\prefect\states.py", line 556, in Completed
    return cls(type=StateType.COMPLETED, **kwargs)
  File "My_Path\lib\site-packages\pydantic\main.py", line 176, in __init__
    self.__pydantic_validator__.validate_python(data, self_instance=self)
  File "My_Path\lib\site-packages\prefect\results.py", line 405, in __new__
    return super().__new__(cls)
TypeError: Can't instantiate abstract class BaseResult with abstract methods create, get
17:28:58.543 | ERROR   | Flow run 'energetic-coot' - Finished in state Failed("Flow run encountered an exception: TypeError: Can't instantiate abstract class BaseResult with abstract methods create, get")
Traceback (most recent call last):
  File "C:\Users\ihor.ramskyi\PycharmProjects\Prefect_Tutorial\test_flow4.py", line 15, in <module>
    some_flow()
  File "My_Path\lib\site-packages\prefect\flows.py", line 1322, in __call__
    return run_flow(
  File "My_Path\lib\site-packages\prefect\flow_engine.py", line 802, in run_flow
    return run_flow_sync(**kwargs)
  File "My_Path\lib\site-packages\prefect\flow_engine.py", line 682, in run_flow_sync
    return engine.state if return_type == "state" else engine.result()
  File "My_Path\lib\site-packages\prefect\flow_engine.py", line 246, in result
    raise self._raised
  File "My_Path\lib\site-packages\prefect\flow_engine.py", line 636, in run_context
    yield self
  File "My_Path\lib\site-packages\prefect\flow_engine.py", line 680, in run_flow_sync
    engine.call_flow_fn()
  File "My_Path\lib\site-packages\prefect\flow_engine.py", line 659, in call_flow_fn
    result = call_with_parameters(self.flow.fn, self.parameters)
  File "My_Path\lib\site-packages\prefect\utilities\callables.py", line 208, in call_with_parameters
    return fn(*args, **kwargs)
  File "C:\Users\ihor.ramskyi\PycharmProjects\Prefect_Tutorial\test_flow4.py", line 11, in some_flow
    a = task_a.submit().result()
  File "My_Path\lib\site-packages\prefect\futures.py", line 164, in result
    _result = self._final_state.result(
  File "My_Path\lib\site-packages\prefect\client\schemas\objects.py", line 261, in result
    return get_state_result(
  File "My_Path\lib\site-packages\prefect\states.py", line 68, in get_state_result
    return _get_state_result(
  File "My_Path\lib\site-packages\prefect\utilities\asyncutils.py", line 392, in coroutine_wrapper
    return run_coro_as_sync(ctx_call())
  File "My_Path\lib\site-packages\prefect\utilities\asyncutils.py", line 243, in run_coro_as_sync
    return call.result()
  File "My_Path\lib\site-packages\prefect\_internal\concurrency\calls.py", line 312, in result
    return self.future.result(timeout=timeout)
  File "My_Path\lib\site-packages\prefect\_internal\concurrency\calls.py", line 182, in result
    return self.__get_result()
  File "C:\Users\ihor.ramskyi\AppData\Local\Programs\Python\Python310\lib\concurrent\futures\_base.py", line 403, in __get_result
    raise self._exception
  File "My_Path\lib\site-packages\prefect\_internal\concurrency\calls.py", line 383, in _run_async
    result = await coro
  File "My_Path\lib\site-packages\prefect\utilities\asyncutils.py", line 225, in coroutine_wrapper
    return await task
  File "My_Path\lib\site-packages\prefect\utilities\asyncutils.py", line 382, in ctx_call
    result = await async_fn(*args, **kwargs)
  File "My_Path\lib\site-packages\prefect\states.py", line 127, in _get_state_result
    raise await get_state_exception(state)
  File "My_Path\lib\site-packages\prefect\task_engine.py", line 804, in run_context
    yield self
  File "My_Path\lib\site-packages\prefect\task_engine.py", line 1420, in run_task_sync
    engine.call_task_fn(txn)
  File "My_Path\lib\site-packages\prefect\task_engine.py", line 832, in call_task_fn
    result = call_with_parameters(self.task.fn, parameters)
  File "My_Path\lib\site-packages\prefect\utilities\callables.py", line 208, in call_with_parameters
    return fn(*args, **kwargs)
  File "C:\Users\ihor.ramskyi\PycharmProjects\Prefect_Tutorial\test_flow4.py", line 6, in task_a
    return Completed(message='A', data={'A': 1})
  File "My_Path\lib\site-packages\prefect\states.py", line 556, in Completed
    return cls(type=StateType.COMPLETED, **kwargs)
  File "My_Path\lib\site-packages\pydantic\main.py", line 176, in __init__
    self.__pydantic_validator__.validate_python(data, self_instance=self)
  File "My_Path\lib\site-packages\prefect\results.py", line 405, in __new__
    return super().__new__(cls)
TypeError: Can't instantiate abstract class BaseResult with abstract methods create, get

However, same code works correctly (returning Complete state from both the task and the flow), when I run it on Prefect v2.20.0 or when I run it on Prefect v3.0.0rc16 but pass data=1, data='abcde', or even data=(1, {'A': 1}).

Version info (prefect version output)

Version:             3.0.0rc16
API version:         0.8.4
Python version:      3.10.11
Git commit:          aad66936
Built:               Mon, Aug 12, 2024 3:51 PM
OS/Arch:             win32/AMD64
Profile:             default
Server type:         server
Pydantic version:    2.7.1

Additional context

No response

thanks for the issue @ihor-ramskyi-globallogic - we'll take a look here

just out of curiosity, what's your use case for passing data to a Completed state? i.e. how did you use data downstream in 2.x?

zzstoatzz avatar Aug 14 '24 14:08 zzstoatzz

note to self, here's a more concise repro

In [1]: from prefect.states import Completed

In [2]: Completed(data={"A": 1})
...
File ~/github.com/prefecthq/prefect/src/prefect/results.py:407, in BaseResult.__new__(cls, **kwargs)
    405     return super().__new__(subcls)
    406 else:
--> 407     return super().__new__(cls)

TypeError: Can't instantiate abstract class BaseResult without an implementation for abstract methods 'create', 'get'

zzstoatzz avatar Aug 14 '24 15:08 zzstoatzz

@zzstoatzz seems like in this case we should dispatch to an UnpersistedResult type in the creation of the BaseResult?

cicdw avatar Aug 14 '24 16:08 cicdw

thanks for the issue @ihor-ramskyi-globallogic - we'll take a look here

just out of curiosity, what's your use case for passing data to a Completed state? i.e. how did you use data downstream in 2.x?

I return Failed if task should be failed for any reason - and for unified approach, I use Completed otherwise. So, I still have to return data from the task, and I put it as data of Completed state.

However, I cannot use type dict for data for Failed state anyways... Because it says it cannot resolve dict in exception.

@zzstoatzz sorry to bother you, but is there a plan to merge the fix in the near future?

hi @ihor-ramskyi-globallogic - thanks for the bump

i hesitated with that PR because I couldn't reason about the solution, even though it did appear to accomplish the desired outcome. I'll revisit it as soon as I can

zzstoatzz avatar Sep 06 '24 00:09 zzstoatzz

Hi again Thank you for response! However, I have to bother you again - this issue blocks my team from upgrading to Prefect 3 without changing bunch of code. Is there any visible point in time when this gonna be merged?

We just made a lot of updates to our results implementation so we will review @zzstoatzz 's for next week's release. If for some reason we decide it's not a viable solution, we will let you know and give you an update on timing.

cicdw avatar Sep 19 '24 07:09 cicdw

@cicdw thanks for the update!