ai-flow
ai-flow copied to clipboard
action_on_event_received only process events sent by current workflow execution
Describe the feature
Currently, the event is broadcasting, once the wanted event is sent, all workflow executions of this workflow would be triggered.
Describe the solution you'd like
As the scheduler dispatcher would inspect the context of each event to figure out if it contains the workflow execution id, we can inject the runtime context of each task execution to the user-defined event to make sure the event will only effect on the current workflow execution.
We need the following changes:
- Add a global variable
_CURRENT_TASK_CONTEXT
incontext.py
, it is used to store the runtime context of each task execution. - To read and write the
_CURRENT_TASK_CONTEXT
, addget_runtime_task_context
andset_runtime_task_context
functions. - Add a public API called
wrap_execution_context
to inject the runtime info to the context of the event before sending it.
_CURRENT_TASK_CONTEXT: TaskExecutionContext = None
def set_runtime_task_context(context: TaskExecutionContext):
global _CURRENT_TASK_CONTEXT
_CURRENT_TASK_CONTEXT = context
def get_runtime_task_context():
return _CURRENT_TASK_CONTEXT
def wrap_execution_info_to_context(event: Event):
"""
The event whose context is wrapped with workflow execution info would only be processed by specific workflow execution.
"""
pass
How to use it?
def func():
notification_client = get_notification_client()
event = Event(event_key=EVENT_KEY, message='This is a custom message.')
// wrap event with context
wrap_execution_info_to_context(event)
// send event
notification_client.send_event(event)
with Workflow(name='workflow') as w1:
task = PythonOperator(name='task', python_callable=func)