`state.result` is empty in `terminal_state_handler` when running through an agent

Open Kazy opened this issue 4 years ago • 4 comments

Description

Following this discussion on Slack, I tried to implement a terminal_state_handler to output the results of some of the tasks to Slack. This worked great locally (i.e. through flow.run()). But when moving to the Cloud, I kept getting an empty dictionary in state.result inside the handler.

I looked into Prefect's code, and the reason is that the return_tasks argument of FlowRunner.run is always None, which means that return_states is empty in get_flow_run_state. If I'm understanding things correctly, an agent-triggered run first goes through cli/execute.py, where no additional argument is passed to run(). In Flow._run, however, which is what gets called when I'm developing locally, we have return_tasks=self.tasks, which gives the expected behavior.
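To make the difference between the two code paths concrete, here is a toy model in plain Python. It is not Prefect's implementation: collect_return_states and the string states are hypothetical stand-ins, and the only thing it tries to show is how a return_tasks of None leads to an empty mapping while passing every task leads to a full one.

```python
from typing import Dict, Iterable, Optional


def collect_return_states(
    task_states: Dict[str, str],
    return_tasks: Optional[Iterable[str]] = None,
) -> Dict[str, str]:
    """Keep only the states of the tasks listed in return_tasks.

    Hypothetical simplification of the filtering described above,
    not a copy of Prefect's code.
    """
    wanted = set(return_tasks or [])
    return {name: state for name, state in task_states.items() if name in wanted}


all_states = {"foo": "Success", "it_fails": "Failed"}

# Agent-style call: no return_tasks given, so nothing is kept.
agent_result = collect_return_states(all_states)

# Local-style call: every task is requested, so every state is kept.
local_result = collect_return_states(all_states, return_tasks=all_states.keys())
```

In this model the handler would see agent_result (an empty dict) on an agent run and local_result on a local run, which matches the symptom described in this issue.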

Expected Behavior

One of two things:

  • Have access to state.result in terminal_state_handler. A lesser requirement is to just have access to a mapping of tasks to their results.
  • Or have another way to implement what I'm trying to do, that is, a handler that runs at the end of a flow, with access to the tasks and their associated results, in order to send a summary message on Slack.
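As an illustration of the kind of summary I have in mind, here is a minimal sketch of the message-building part only. build_summary is a hypothetical helper, states are represented as plain strings for simplicity, and how the mapping is obtained (and how the message is posted to Slack) is deliberately left out.

```python
from typing import Mapping


def build_summary(flow_name: str, task_states: Mapping[str, str]) -> str:
    """Build a short, human-readable summary from task name -> state name."""
    # Anything that is not "Success" is reported by name.
    not_successful = sorted(
        name for name, state in task_states.items() if state != "Success"
    )
    succeeded = len(task_states) - len(not_successful)
    header = f"Flow '{flow_name}': {succeeded}/{len(task_states)} tasks succeeded"
    if not not_successful:
        return header
    return header + "\nNot successful: " + ", ".join(not_successful)


message = build_summary("test-flow", {"foo": "Success", "it_fails": "Failed"})
```

With an empty mapping, as currently received on an agent run, this would only produce "0/0 tasks succeeded", which is why the handler needs the task states in the first place.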

Reproduction

from typing import Set

import prefect
from prefect import Flow, task
from prefect.engine.state import State


@task
def foo() -> None:
    return None


@task
def it_fails() -> None:
    raise RuntimeError()


def terminal_state_handler(
    flow: Flow, state: State, reference_task_states: Set[State]
) -> None:
    logger = prefect.context.get("logger")
    logger.info(state.result.items())


with Flow("test-flow", terminal_state_handler=terminal_state_handler) as flow:
    foo()
    it_fails()

WORKING_PATH = True

if WORKING_PATH:
    # state.result won't be empty
    flow.run()
else:
    # Adapt to your own environment, and then run the flow after registering it
    flow.register(project_name="dev")

Environment

Kazy avatar May 24 '21 13:05 Kazy

I got an answer from Kevin Kho on Slack the moment I posted this: this seems to be as expected, and one should go through Prefect's API to obtain the task states. I'll let you decide whether this should be closed or not!
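For anyone landing here, a rough sketch of what going through the API could look like. The query shape (task_run filtered by flow_run_id, with state and task { name } fields) is my assumption about the backend's GraphQL schema and should be checked against the interactive API before relying on it. fetch_task_states is a hypothetical helper that takes the GraphQL call as a parameter, so it can be exercised without a backend.

```python
from typing import Any, Callable, Dict

# Assumed query shape; verify the field names against your backend's schema.
QUERY_TEMPLATE = """
query {
  task_run(where: {flow_run_id: {_eq: "%s"}}) {
    state
    task { name }
  }
}
"""


def fetch_task_states(
    run_graphql: Callable[[str], Any], flow_run_id: str
) -> Dict[str, str]:
    """Return a task name -> state name mapping for one flow run.

    run_graphql is any callable that takes a query string and returns a
    dict-like response with a "data" key (an assumption of this sketch).
    Tasks sharing a name (e.g. mapped tasks) would overwrite each other here.
    """
    response = run_graphql(QUERY_TEMPLATE % flow_run_id)
    return {
        task_run["task"]["name"]: task_run["state"]
        for task_run in response["data"]["task_run"]
    }
```

Inside a handler, run_graphql could presumably be prefect.Client().graphql and the run id could come from prefect.context.get("flow_run_id"). I have not verified this end to end on Cloud.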

Kazy avatar May 24 '21 13:05 Kazy

I think we should clarify the documentation here or improve this state handler to meet expectations (or both).

zanieb avatar May 26 '21 05:05 zanieb

+1 @madkinsz, I think we should do both. I was hoping to wait until your flow.run improvements were shipped before taking another pass at both the state handler and the docs.

zangell44 avatar May 27 '21 15:05 zangell44

Adding this issue here since it seems to be the same. A user asked on Slack why a list of task run states can be accessed in a state handler in a local flow run, but not in a backend-triggered flow run. Here is an example that shows the issue:

import prefect
from prefect import task, Flow
from prefect.engine.state import Finished


@task
def say_hi():
    print("Hi")


@task
def say_hello():
    print("Hello")


def flow_state_handler(obj, old_state, new_state):
    if isinstance(new_state, Finished):
        logger = prefect.context.get("logger")
        logger.info(new_state)
        logger.info(new_state.result)
        logger.info(new_state.result.items())


with Flow("print_states_flow_level", state_handlers=[flow_state_handler]) as flow:
    t1 = say_hi()
    t2 = say_hello()
    flow.set_reference_tasks([t1, t2])


if __name__ == "__main__":
    state = flow.run()

The output:

[2022-01-14 15:00:19+0100] INFO - prefect.FlowRunner | Beginning Flow run for 'print_states_flow_level'
[2022-01-14 15:00:19+0100] INFO - prefect.TaskRunner | Task 'say_hi': Starting task run...
Hi
[2022-01-14 15:00:19+0100] INFO - prefect.TaskRunner | Task 'say_hi': Finished task run for task with final state: 'Success'
[2022-01-14 15:00:19+0100] INFO - prefect.TaskRunner | Task 'say_hello': Starting task run...
Hello
[2022-01-14 15:00:19+0100] INFO - prefect.TaskRunner | Task 'say_hello': Finished task run for task with final state: 'Success'
[2022-01-14 15:00:19+0100] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded
[2022-01-14 15:00:19+0100] INFO - prefect | <Success: "All reference tasks succeeded.">
[2022-01-14 15:00:19+0100] INFO - prefect | {<Task: say_hi>: <Success: "Task run succeeded.">, <Task: say_hello>: <Success: "Task run succeeded.">}
[2022-01-14 15:00:19+0100] INFO - prefect | dict_items([(<Task: say_hi>, <Success: "Task run succeeded.">), (<Task: say_hello>, <Success: "Task run succeeded.">)])

But running the same through Prefect Cloud results in an empty dictionary:

└── 15:00:02 | DEBUG   | Checking flow run state...
└── 15:00:02 | INFO    | Flow run SUCCESS: all reference tasks succeeded
└── 15:00:02 | DEBUG   | Flow 'print_states_flow_level': Handling state change from Running to Success
└── 15:00:02 | INFO    | <Success: "All reference tasks succeeded.">
└── 15:00:02 | INFO    | {}
└── 15:00:02 | INFO    | dict_items([])
└── 15:00:03 | DEBUG   | Exited flow run subprocess.
Flow run succeeded!

The same is true when attaching a terminal_state_handler instead of a normal flow-level state handler:

import prefect
from prefect import task, Flow


@task
def say_hi():
    print("Hi")


@task
def say_hello():
    print("Hello")


def flow_state_handler(flow, state, reference_task_states):
    logger = prefect.context.get("logger")
    logger.info(state)
    logger.info(state.result)
    logger.info(state.result.items())


with Flow("print_states_terminal", terminal_state_handler=flow_state_handler) as flow:
    t1 = say_hi()
    t2 = say_hello()


if __name__ == "__main__":
    state = flow.run()

Local:

[2022-01-14 15:03:37+0100] INFO - prefect | <Success: "All reference tasks succeeded.">
[2022-01-14 15:03:37+0100] INFO - prefect | {<Task: say_hi>: <Success: "Task run succeeded.">, <Task: say_hello>: <Success: "Task run succeeded.">}
[2022-01-14 15:03:37+0100] INFO - prefect | dict_items([(<Task: say_hi>, <Success: "Task run succeeded.">), (<Task: say_hello>, <Success: "Task run succeeded.">)])

Cloud:

...
└── 15:04:54 | DEBUG   | Checking flow run state...
└── 15:04:54 | INFO    | Flow run SUCCESS: all reference tasks succeeded
└── 15:04:54 | INFO    | <Success: "All reference tasks succeeded.">
└── 15:04:54 | INFO    | {}
└── 15:04:54 | INFO    | dict_items([])
└── 15:04:54 | DEBUG   | Flow 'print_states_terminal': Handling state change from Running to Success
└── 15:04:55 | DEBUG   | Exited flow run subprocess.
Flow run succeeded!

anna-geller avatar Jan 14 '22 14:01 anna-geller

This issue is stale because it has been open 30 days with no activity. To keep this issue open remove stale label or comment.

github-actions[bot] avatar Jan 17 '23 10:01 github-actions[bot]

This issue was closed because it has been stale for 14 days with no activity. If this issue is important or you have more to add feel free to re-open it.

github-actions[bot] avatar Mar 11 '23 17:03 github-actions[bot]