MissingResult: State data is missing
Occurs when a Dask worker's memory overflows and the worker is restarted (reproduced below with the `with get_dask_client():` line commented out).
import dask.dataframe
import dask.distributed
from prefect import flow, task
from prefect_dask import DaskTaskRunner, get_dask_client
# Start a Dask client/cluster in this process (the logs below show workers on
# 127.0.0.1); its scheduler address is handed to DaskTaskRunner on the flow so
# the tasks run on this cluster.
client = dask.distributed.Client()
@task
def read_data(start: str, end: str) -> dask.dataframe.DataFrame:
    """Build a synthetic time-series Dask DataFrame spanning ``start`` to ``end``.

    Uses ``dask.datasets.timeseries`` with four-week partitions
    (``partition_freq="4w"``). The flow below calls it with "1988".."2022",
    i.e. 34 years of data. The Dask DataFrame itself is the task's result and
    is passed on to ``process_data``.
    """
    df = dask.datasets.timeseries(start, end, partition_freq="4w")
    return df
@task
def process_data(df) -> dask.dataframe.DataFrame:
    """Group ``df`` by the year of its index, take the mean, and compute it.

    ``df`` is the Dask DataFrame produced by ``read_data``. Note that
    ``.compute()`` materializes the result as a pandas DataFrame, so the
    ``dask.dataframe.DataFrame`` return annotation does not match what is
    actually returned.
    """
    # Reproducer: the prefect-dask docs run Dask computations inside a task
    # under ``with get_dask_client():``. It is commented out on purpose here,
    # which per the issue description is the setup in which the worker memory
    # overflow (and the subsequent MissingResult) occurs.
    # with get_dask_client():
    df_yearly_avg = df.groupby(df.index.year).mean()
    return df_yearly_avg.compute()
@flow(task_runner=DaskTaskRunner(address=client.scheduler.address))
def dask_flow():
    """Submit ``read_data`` and then ``process_data`` to the Dask cluster behind ``client``."""
    df = read_data.submit("1988", "2022")
    # Passing the future makes read_data an upstream input of process_data.
    # Prefect resolves it to read_data's result before process_data runs; in
    # the traceback below, that resolution (resolve_inputs -> state.result)
    # is where MissingResult is raised.
    df_yearly_average = process_data.submit(df)
    return df_yearly_average
# Run the flow. In the reported run, a Dask worker is restarted after exceeding
# its memory budget, and this call then raises MissingResult (see the traceback below).
dask_flow()
Logs:
15:23:47.601 | INFO | Task run 'read_data-5bc97744-0' - Finished in state Completed()
2023-01-10 15:23:51,373 - distributed.nanny.memory - WARNING - Worker tcp://127.0.0.1:51660 (pid=5119) exceeded 95% memory budget. Restarting...
15:23:51.373 | WARNING | distributed.nanny.memory - Worker tcp://127.0.0.1:51660 (pid=5119) exceeded 95% memory budget. Restarting...
15:23:51.463 | INFO | distributed.nanny - Worker process 5119 was killed by signal 15
15:23:51.465 | INFO | distributed.core - Connection to tcp://127.0.0.1:51668 has been closed.
15:23:51.466 | INFO | distributed.scheduler - Remove worker <WorkerState 'tcp://127.0.0.1:51660', name: 1, status: running, memory: 1, processing: 1>
15:23:51.466 | INFO | distributed.core - Removing comms to tcp://127.0.0.1:51660
2023-01-10 15:23:51,469 - distributed.nanny - WARNING - Restarting worker
15:23:51.469 | WARNING | distributed.nanny - Restarting worker
15:23:51.890 | INFO | distributed.scheduler - Register worker <WorkerState 'tcp://127.0.0.1:53762', name: 1, status: init, memory: 0, processing: 0>
15:23:51.892 | INFO | distributed.scheduler - Starting worker compute stream, tcp://127.0.0.1:53762
15:23:51.893 | INFO | distributed.core - Starting established connection to tcp://127.0.0.1:53765
15:23:52.804 | INFO | Task run 'read_data-5bc97744-0' - Task run '9b5215ca-1822-4c6e-930d-34480fca578e' already finished.
15:23:53.025 | ERROR | Task run 'process_data-090555ba-0' - Crash detected! Execution was interrupted by an unexpected exception: MissingResult: State data is missing. Typically, this occurs when result persistence is disabled and the state has been retrieved from the API.
2023-01-10 15:23:53,499 - distributed.worker - WARNING - Compute Failed
Key: process_data-090555ba-0-fa34a3b0fd834a6aa2fd9ece3d2c233c-1
Function: begin_task_run
args: ()
kwargs: {'task': <prefect.tasks.Task object at 0x10cc5b6d0>, 'task_run': TaskRun(id=UUID('fa34a3b0-fd83-4a6a-a2fd-9ece3d2c233c'), name='process_data-090555ba-0', flow_run_id=UUID('51a4a2f7-bdbe-4433-b54f-bb0b60b9e95a'), task_key='__main__.process_data', dynamic_key='0', cache_key=None, cache_expiration=None, task_version=None, empirical_policy=TaskRunPolicy(max_retries=0, retry_delay_seconds=0.0, retries=0, retry_delay=0, retry_jitter_factor=None), tags=[], state_id=UUID('3a84e6eb-1a87-487f-8956-7a268a8fb1c3'), task_inputs={'df': [TaskRunResult(input_type='task_run', id=UUID('9b5215ca-1822-4c6e-930d-34480fca578e'))]}, state_type=StateType.PENDING, state_name='Pending', run_count=0, flow_run_run_count=0, expected_start_time=DateTime(2023, 1, 10, 23, 23, 46, 173061, tzinfo=Timezone('+00:00')), next_scheduled_start_time=None, start_time=None, end_time=None, total_run_time=datetime.timedelta(0), estimated_run_time=datetime.timedelta(0), estimated_start_time_delta=datetime.timedelta(microseconds=41
Exception: "MissingResult('State data is missing. Typically, this occurs when result persistence is disabled and the state has been retrieved from the API.')"
15:23:53.499 | WARNING | distributed.worker - Compute Failed
Key: process_data-090555ba-0-fa34a3b0fd834a6aa2fd9ece3d2c233c-1
Function: begin_task_run
args: ()
kwargs: {'task': <prefect.tasks.Task object at 0x10cc5b6d0>, 'task_run': TaskRun(id=UUID('fa34a3b0-fd83-4a6a-a2fd-9ece3d2c233c'), name='process_data-090555ba-0', flow_run_id=UUID('51a4a2f7-bdbe-4433-b54f-bb0b60b9e95a'), task_key='__main__.process_data', dynamic_key='0', cache_key=None, cache_expiration=None, task_version=None, empirical_policy=TaskRunPolicy(max_retries=0, retry_delay_seconds=0.0, retries=0, retry_delay=0, retry_jitter_factor=None), tags=[], state_id=UUID('3a84e6eb-1a87-487f-8956-7a268a8fb1c3'), task_inputs={'df': [TaskRunResult(input_type='task_run', id=UUID('9b5215ca-1822-4c6e-930d-34480fca578e'))]}, state_type=StateType.PENDING, state_name='Pending', run_count=0, flow_run_run_count=0, expected_start_time=DateTime(2023, 1, 10, 23, 23, 46, 173061, tzinfo=Timezone('+00:00')), next_scheduled_start_time=None, start_time=None, end_time=None, total_run_time=datetime.timedelta(0), estimated_run_time=datetime.timedelta(0), estimated_start_time_delta=datetime.timedelta(microseconds=41
Exception: "MissingResult('State data is missing. Typically, this occurs when result persistence is disabled and the state has been retrieved from the API.')"
Traceback:
---------------------------------------------------------------------------
MissingResult Traceback (most recent call last)
Cell In[1], line 24
21 df_yearly_average = process_data.submit(df)
22 return df_yearly_average
---> 24 dask_flow()
File ~/mambaforge/envs/dask/lib/python3.9/site-packages/prefect/flows.py:448, in Flow.__call__(self, return_state, wait_for, *args, **kwargs)
444 parameters = get_call_parameters(self.fn, args, kwargs)
446 return_type = "state" if return_state else "result"
--> 448 return enter_flow_run_engine_from_flow_call(
449 self,
450 parameters,
451 wait_for=wait_for,
452 return_type=return_type,
453 )
File ~/mambaforge/envs/dask/lib/python3.9/site-packages/prefect/engine.py:161, in enter_flow_run_engine_from_flow_call(flow, parameters, wait_for, return_type)
157 elif in_async_main_thread():
158 # An event loop is already running and we must create a blocking portal to
159 # run async code from this synchronous context
160 with start_blocking_portal() as portal:
--> 161 return portal.call(begin_run)
162 else:
163 # An event loop is not running so we will create one
164 return anyio.run(begin_run)
File ~/mambaforge/envs/dask/lib/python3.9/site-packages/anyio/from_thread.py:283, in BlockingPortal.call(self, func, *args)
268 def call(
269 self,
270 func: Callable[..., Union[Coroutine[Any, Any, T_Retval], T_Retval]],
271 *args: object
272 ) -> T_Retval:
273 """
274 Call the given function in the event loop thread.
275
(...)
281
282 """
--> 283 return cast(T_Retval, self.start_task_soon(func, *args).result())
File ~/mambaforge/envs/dask/lib/python3.9/concurrent/futures/_base.py:446, in Future.result(self, timeout)
444 raise CancelledError()
445 elif self._state == FINISHED:
--> 446 return self.__get_result()
447 else:
448 raise TimeoutError()
File ~/mambaforge/envs/dask/lib/python3.9/concurrent/futures/_base.py:391, in Future.__get_result(self)
389 if self._exception:
390 try:
--> 391 raise self._exception
392 finally:
393 # Break a reference cycle with the exception in self._exception
394 self = None
File ~/mambaforge/envs/dask/lib/python3.9/site-packages/anyio/from_thread.py:219, in BlockingPortal._call_func(self, func, args, kwargs, future)
216 else:
217 future.add_done_callback(callback)
--> 219 retval = await retval
220 except self._cancelled_exc_class:
221 future.cancel()
File ~/mambaforge/envs/dask/lib/python3.9/site-packages/prefect/client/utilities.py:47, in inject_client.<locals>.with_injected_client(*args, **kwargs)
45 async with client_context as new_client:
46 kwargs.setdefault("client", new_client or client)
---> 47 return await fn(*args, **kwargs)
File ~/mambaforge/envs/dask/lib/python3.9/site-packages/prefect/engine.py:244, in create_then_begin_flow_run(flow, parameters, wait_for, return_type, client)
242 return state
243 elif return_type == "result":
--> 244 return await state.result(fetch=True)
245 else:
246 raise ValueError(f"Invalid return type for flow engine {return_type!r}.")
File ~/mambaforge/envs/dask/lib/python3.9/site-packages/prefect/states.py:89, in _get_state_result(state, raise_on_failure)
84 raise PausedRun("Run paused.")
86 if raise_on_failure and (
87 state.is_crashed() or state.is_failed() or state.is_cancelled()
88 ):
---> 89 raise await get_state_exception(state)
91 if isinstance(state.data, DataDocument):
92 result = result_from_state_with_data_document(
93 state, raise_on_failure=raise_on_failure
94 )
File ~/Applications/python/prefect-dask/prefect_dask/task_runners.py:269, in DaskTaskRunner.wait(self, key, timeout)
267 future = self._get_dask_future(key)
268 try:
--> 269 return await future.result(timeout=timeout)
270 except distributed.TimeoutError:
271 return None
File ~/Applications/python/test/distributed/distributed/client.py:296, in Future._result(self, raiseit)
294 if raiseit:
295 typ, exc, tb = exc
--> 296 raise exc.with_traceback(tb)
297 else:
298 return exc
File ~/mambaforge/envs/dask/lib/python3.9/site-packages/prefect/engine.py:1303, in begin_task_run()
1297 raise RuntimeError(
1298 f"Cannot orchestrate task run '{task_run.id}'. "
1299 f"Failed to connect to API at {client.api_url}."
1300 ) from connect_error
1302 try:
-> 1303 state = await orchestrate_task_run(
1304 task=task,
1305 task_run=task_run,
1306 parameters=parameters,
1307 wait_for=wait_for,
1308 result_factory=result_factory,
1309 log_prints=log_prints,
1310 interruptible=interruptible,
1311 client=client,
1312 )
1314 if not maybe_flow_run_context:
1315 # When a a task run finishes on a remote worker flush logs to prevent
1316 # loss if the process exits
1317 OrionHandler.flush(block=True)
File ~/mambaforge/envs/dask/lib/python3.9/site-packages/prefect/engine.py:1378, in orchestrate_task_run()
1367 partial_task_run_context = PartialModel(
1368 TaskRunContext,
1369 task_run=task_run,
(...)
1373 log_prints=log_prints,
1374 )
1376 try:
1377 # Resolve futures in parameters into data
-> 1378 resolved_parameters = await resolve_inputs(parameters)
1379 # Resolve futures in any non-data dependencies to ensure they are ready
1380 await resolve_inputs(wait_for, return_data=False)
File ~/mambaforge/envs/dask/lib/python3.9/site-packages/prefect/engine.py:1671, in resolve_inputs()
1668 # Only retrieve the result if requested as it may be expensive
1669 return state.result(raise_on_failure=False, fetch=True) if return_data else None
-> 1671 return await run_sync_in_worker_thread(
1672 visit_collection,
1673 parameters,
1674 visit_fn=resolve_input,
1675 return_data=return_data,
1676 max_depth=max_depth,
1677 remove_annotations=True,
1678 context={},
1679 )
File ~/mambaforge/envs/dask/lib/python3.9/site-packages/prefect/utilities/asyncutils.py:91, in run_sync_in_worker_thread()
80 """
81 Runs a sync function in a new worker thread so that the main thread's event loop
82 is not blocked
(...)
88 thread may continue running — the outcome will just be ignored.
89 """
90 call = partial(__fn, *args, **kwargs)
---> 91 return await anyio.to_thread.run_sync(
92 call, cancellable=True, limiter=get_thread_limiter()
93 )
File ~/mambaforge/envs/dask/lib/python3.9/site-packages/anyio/to_thread.py:31, in run_sync()
10 async def run_sync(
11 func: Callable[..., T_Retval],
12 *args: object,
13 cancellable: bool = False,
14 limiter: Optional[CapacityLimiter] = None
15 ) -> T_Retval:
16 """
17 Call the given function with the given arguments in a worker thread.
18
(...)
29
30 """
---> 31 return await get_asynclib().run_sync_in_worker_thread(
32 func, *args, cancellable=cancellable, limiter=limiter
33 )
File ~/mambaforge/envs/dask/lib/python3.9/site-packages/anyio/_backends/_asyncio.py:937, in run_sync_in_worker_thread()
935 context.run(sniffio.current_async_library_cvar.set, None)
936 worker.queue.put_nowait((context, func, args, future))
--> 937 return await future
File ~/mambaforge/envs/dask/lib/python3.9/site-packages/anyio/_backends/_asyncio.py:867, in run()
865 exception: Optional[BaseException] = None
866 try:
--> 867 result = context.run(func, *args)
868 except BaseException as exc:
869 exception = exc
File ~/mambaforge/envs/dask/lib/python3.9/site-packages/prefect/utilities/collections.py:318, in visit_collection()
316 elif typ in (dict, OrderedDict):
317 assert isinstance(expr, (dict, OrderedDict)) # typecheck assertion
--> 318 items = [(visit_nested(k), visit_nested(v)) for k, v in expr.items()]
319 result = typ(items) if return_data else None
321 elif is_dataclass(expr) and not isinstance(expr, type):
File ~/mambaforge/envs/dask/lib/python3.9/site-packages/prefect/utilities/collections.py:318, in <listcomp>()
316 elif typ in (dict, OrderedDict):
317 assert isinstance(expr, (dict, OrderedDict)) # typecheck assertion
--> 318 items = [(visit_nested(k), visit_nested(v)) for k, v in expr.items()]
319 result = typ(items) if return_data else None
321 elif is_dataclass(expr) and not isinstance(expr, type):
File ~/mambaforge/envs/dask/lib/python3.9/site-packages/prefect/utilities/collections.py:264, in visit_nested()
262 def visit_nested(expr):
263 # Utility for a recursive call, preserving options and updating the depth.
--> 264 return visit_collection(
265 expr,
266 visit_fn=visit_fn,
267 return_data=return_data,
268 remove_annotations=remove_annotations,
269 max_depth=max_depth - 1,
270 # Copy the context on nested calls so it does not "propagate up"
271 context=context.copy() if context is not None else None,
272 )
File ~/mambaforge/envs/dask/lib/python3.9/site-packages/prefect/utilities/collections.py:281, in visit_collection()
278 return visit_fn(expr)
280 # Visit every expression
--> 281 result = visit_expression(expr)
283 if return_data:
284 # Only mutate the expression while returning data, otherwise it could be null
285 expr = result
File ~/mambaforge/envs/dask/lib/python3.9/site-packages/prefect/utilities/collections.py:276, in visit_expression()
274 def visit_expression(expr):
275 if context is not None:
--> 276 return visit_fn(expr, context)
277 else:
278 return visit_fn(expr)
File ~/mambaforge/envs/dask/lib/python3.9/site-packages/prefect/engine.py:1669, in resolve_input()
1664 raise UpstreamTaskError(
1665 f"Upstream task run '{state.state_details.task_run_id}' did not reach a 'COMPLETED' state."
1666 )
1668 # Only retrieve the result if requested as it may be expensive
-> 1669 return state.result(raise_on_failure=False, fetch=True) if return_data else None
File ~/mambaforge/envs/dask/lib/python3.9/site-packages/prefect/client/schemas.py:107, in result()
35 """
36 Retrieve the result attached to this state.
37
(...)
103 hello
104 """
105 from prefect.states import get_state_result
--> 107 return get_state_result(self, raise_on_failure=raise_on_failure, fetch=fetch)
File ~/mambaforge/envs/dask/lib/python3.9/site-packages/prefect/states.py:74, in get_state_result()
72 return state.data
73 else:
---> 74 return _get_state_result(state, raise_on_failure=raise_on_failure)
File ~/mambaforge/envs/dask/lib/python3.9/site-packages/prefect/utilities/asyncutils.py:226, in coroutine_wrapper()
222 return async_fn(*args, **kwargs)
223 elif in_async_worker_thread():
224 # In a sync context but we can access the event loop thread; send the async
225 # call to the parent
--> 226 return run_async_from_worker_thread(async_fn, *args, **kwargs)
227 else:
228 # In a sync context and there is no event loop; just create an event loop
229 # to run the async code then tear it down
230 return run_async_in_new_loop(async_fn, *args, **kwargs)
File ~/mambaforge/envs/dask/lib/python3.9/site-packages/prefect/utilities/asyncutils.py:177, in run_async_from_worker_thread()
172 """
173 Runs an async function in the main thread's event loop, blocking the worker
174 thread until completion
175 """
176 call = partial(__fn, *args, **kwargs)
--> 177 return anyio.from_thread.run(call)
File ~/mambaforge/envs/dask/lib/python3.9/site-packages/anyio/from_thread.py:49, in run()
46 except AttributeError:
47 raise RuntimeError("This function can only be run from an AnyIO worker thread")
---> 49 return asynclib.run_async_from_thread(func, *args)
File ~/mambaforge/envs/dask/lib/python3.9/site-packages/anyio/_backends/_asyncio.py:970, in run_async_from_thread()
964 def run_async_from_thread(
965 func: Callable[..., Coroutine[Any, Any, T_Retval]], *args: object
966 ) -> T_Retval:
967 f: concurrent.futures.Future[T_Retval] = asyncio.run_coroutine_threadsafe(
968 func(*args), threadlocals.loop
969 )
--> 970 return f.result()
File ~/mambaforge/envs/dask/lib/python3.9/concurrent/futures/_base.py:446, in result()
444 raise CancelledError()
445 elif self._state == FINISHED:
--> 446 return self.__get_result()
447 else:
448 raise TimeoutError()
File ~/mambaforge/envs/dask/lib/python3.9/concurrent/futures/_base.py:391, in __get_result()
389 if self._exception:
390 try:
--> 391 raise self._exception
392 finally:
393 # Break a reference cycle with the exception in self._exception
394 self = None
File ~/mambaforge/envs/dask/lib/python3.9/site-packages/prefect/states.py:101, in _get_state_result()
99 return await get_state_exception(state)
100 else:
--> 101 raise MissingResult(
102 "State data is missing. "
103 "Typically, this occurs when result persistence is disabled and the "
104 "state has been retrieved from the API."
105 )
107 else:
108 # The result is attached directly
109 result = state.data
MissingResult: State data is missing. Typically, this occurs when result persistence is disabled and the state has been retrieved from the API.
@ahuang11, @madkinsz hello, did you resolve this issue?
I have run into the same issue :(
It also seems somewhat random: sometimes my flows run without this problem.
I have the same issue with RayTaskRunner. :(
For me, the issue reproduces consistently with the following setup:
@task(tags={'load'}, log_prints=True, retries=3, retry_delay_seconds=5,)
async def load_raw(url:str, auth:Auth|None=None, **kwds) -> bytes:
    """Return the bytes produced by ``_load_raw_inner`` for ``url``.

    ``auth`` and any extra keyword arguments are forwarded unchanged to
    ``_load_raw_inner`` (not shown here; presumably an HTTP fetch). The task
    retries up to 3 times, 5 seconds apart. In this version, no caching
    options are set on the decorator.
    """
    content = await _load_raw_inner(url, auth=auth, **kwds)
    return content
# Variant of load_raw named/tagged with PROV_NAME (presumably one per provider).
# Here the caching options (cache_key_fn, persist_result, cache_expiration) are
# applied through with_options rather than on the @task decorator; the reporter
# says this setup consistently reproduces MissingResult.
_load_raw = load_raw.with_options(
    name=f'load-raw-{PROV_NAME.lower()}', tags=load_raw.tags | {PROV_NAME},
    cache_key_fn=task_input_hash, persist_result=True,
    cache_expiration=_cache_expiration, retries=3, retry_delay_seconds=5,)
@flow
async def epool_rem_gr_active_group_csv():
    """Load ``URL`` through the cached ``_load_raw`` task (rest of the flow elided)."""
    # verify/timeout travel through load_raw's **kwds to _load_raw_inner.
    # NOTE(review): verify=False presumably disables TLS certificate
    # verification; confirm that is intended.
    cont = await _load_raw(URL, verify=False, timeout=15.)
    # Remainder of the flow omitted in the original report.
    ...
I fixed it by moving the caching parameters (`cache_key_fn`, `persist_result`) from `with_options` into the `@task` decorator:
# Attempted fix: cache_key_fn and persist_result are set on the decorator
# itself instead of via with_options.
@task(
    tags={'load'}, log_prints=True,
    cache_key_fn=task_input_hash, persist_result=True, retries=3, retry_delay_seconds=5,)
async def load_raw(url:str, auth:Auth|None=None, **kwds) -> bytes:
    """Return the bytes produced by ``_load_raw_inner`` for ``url``.

    ``auth`` and any extra keyword arguments are forwarded unchanged to
    ``_load_raw_inner`` (not shown here; presumably an HTTP fetch). Results
    are cached by input hash and persisted; the task retries up to 3 times,
    5 seconds apart.
    """
    content = await _load_raw_inner(url, auth=auth, **kwds)
    return content
# Only name, tags, and cache_expiration are overridden here; cache_key_fn,
# persist_result, and the retry settings come from the @task decorator on load_raw.
_load_raw = load_raw.with_options(
    name=f'load-raw-{PROV_NAME.lower()}', tags=load_raw.tags | {PROV_NAME},
    cache_expiration=_cache_expiration,)
@flow
async def epool_rem_gr_active_group_csv():
    """Load ``URL`` through the cached ``_load_raw`` task (rest of the flow elided)."""
    # verify/timeout travel through load_raw's **kwds to _load_raw_inner.
    # NOTE(review): verify=False presumably disables TLS certificate
    # verification; confirm that is intended.
    cont = await _load_raw(URL, verify=False, timeout=15.)
    # Remainder of the flow omitted in the original report.
    ...
The fix does not work for me. :(
see also #8228
and #8415
After downgrading Prefect to 2.16.9, the error has almost disappeared.
This pattern of result persistence is updated and fixed in 3.0 - I'm going to close this but if there are other issues that arise, please open a new issue.