Implemented MSGraphSensor as a deferrable sensor
Implemented a default response handler which suppresses the JSONDecodeError raised when the response is empty or isn't JSON even though the default response type json was specified; in that case it returns the response content, or the response headers when the content is empty. This can occur, for example, when triggering PowerBI dataset refreshes. A test was added in TestResponseHandler, along with an example showing how to use the operator to refresh a PowerBI dataset (see the example DAG below).

This PR also removes the response_handler param, because lambda expressions can't be serialized, and triggers are always serialized to the database when deferred, before being executed.

The most important change in this PR is that the MSGraphSensor now uses the deferrable task mechanism, like the MSGraphAsyncOperator, so it no longer relies on the classic blocking poke method but defers the task in two steps. First it executes the call in a deferred way and checks the event; if the condition is met, the sensor stops. If not, the sensor defers a TimeDeltaTrigger with the specified retry_delay (default is 60 seconds for MSGraphSensor), and once that trigger completes it re-defers the poll task. That way the sensor doesn't block the workers.
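For illustration, here is a minimal sketch of that fallback logic. The function name is hypothetical and the response object is assumed to behave like an httpx.Response; this is not the exact provider code:

```python
from contextlib import suppress
from json import JSONDecodeError
from typing import Any


def default_response_handler(response) -> Any:
    """Sketch only: decode JSON, else fall back to the content, else the headers.

    `response` is assumed to expose json(), content and headers,
    like an httpx.Response.
    """
    with suppress(JSONDecodeError):
        return response.json()  # empty or non-JSON bodies raise JSONDecodeError
    if response.content:
        return response.content
    # Empty body (e.g. a PowerBI dataset refresh trigger): return the headers.
    return dict(response.headers)
```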
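And a rough sketch of the two-step defer loop described above. `build_api_trigger` and `is_condition_met` are hypothetical helpers standing in for the sensor's real trigger and event check; `defer()` and `TimeDeltaTrigger` are the actual Airflow APIs:

```python
from __future__ import annotations

from datetime import timedelta

from airflow.sensors.base import BaseSensorOperator
from airflow.triggers.temporal import TimeDeltaTrigger


class TwoStepDeferSensor(BaseSensorOperator):
    """Sketch only: poll via deferral instead of the blocking poke() method."""

    def __init__(self, retry_delay: float = 60, **kwargs):
        super().__init__(**kwargs)
        self.retry_delay = timedelta(seconds=retry_delay)

    def execute(self, context):
        # Step 1: run the API call in the triggerer, freeing the worker slot.
        self.defer(trigger=self.build_api_trigger(), method_name="execute_complete")

    def execute_complete(self, context, event=None):
        if self.is_condition_met(event):  # hypothetical check on the event payload
            return event  # condition met: the sensor is done
        # Step 2: wait retry_delay in the triggerer, then poll again.
        self.defer(
            trigger=TimeDeltaTrigger(self.retry_delay),
            method_name="retry_execute",
        )

    def retry_execute(self, context, event=None):
        # The wait has elapsed: re-defer the poll task.
        self.execute(context)
```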
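A usage example along the lines of the one added in this PR: trigger a PowerBI dataset refresh, then let the sensor poll until the latest refresh leaves the "Unknown" state. The connection id, URL, group/dataset ids and the event_processor signature are illustrative assumptions:

```python
from __future__ import annotations

from datetime import datetime

from airflow import DAG
from airflow.providers.microsoft.azure.operators.msgraph import MSGraphAsyncOperator
from airflow.providers.microsoft.azure.sensors.msgraph import MSGraphSensor

with DAG(
    dag_id="example_powerbi_refresh",
    start_date=datetime(2024, 1, 1),
    schedule=None,
    catchup=False,
):
    # Trigger the refresh: the POST returns an empty body, which the new
    # default response handler tolerates instead of raising JSONDecodeError.
    refresh_dataset = MSGraphAsyncOperator(
        task_id="refresh_powerbi_dataset",
        conn_id="powerbi_api",  # placeholder connection id
        url="myorg/groups/GROUP_ID/datasets/DATASET_ID/refreshes",
        method="POST",
    )

    # Poll the refresh history, re-deferring with the default 60s retry_delay
    # between polls so no worker slot is blocked while waiting.
    wait_for_refresh = MSGraphSensor(
        task_id="wait_for_powerbi_refresh",
        conn_id="powerbi_api",
        url="myorg/groups/GROUP_ID/datasets/DATASET_ID/refreshes",
        event_processor=lambda context, event: event["value"][0]["status"] != "Unknown",
        timeout=60 * 60,
    )

    refresh_dataset >> wait_for_refresh
```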
FYI: I've seen across different PRs that the following test sometimes fails randomly:
```
=================================== FAILURES ===================================
_______ TestCeleryExecutor.test_celery_integration[redis://redis:6379/0] _______

self = <tests.integration.executors.test_celery_executor.TestCeleryExecutor object at 0x7f26999c6be0>
broker_url = 'redis://redis:6379/0'

    @pytest.mark.flaky(reruns=3)
    @pytest.mark.parametrize("broker_url", _prepare_test_bodies())
    def test_celery_integration(self, broker_url):
        from airflow.providers.celery.executors import celery_executor, celery_executor_utils

        success_command = ["airflow", "tasks", "run", "true", "some_parameter"]
        fail_command = ["airflow", "version"]

        def fake_execute_command(command):
            if command != success_command:
                raise AirflowException("fail")

        with _prepare_app(broker_url, execute=fake_execute_command) as app:
            executor = celery_executor.CeleryExecutor()
            assert executor.tasks == {}
            executor.start()

            with start_worker(app=app, logfile=sys.stdout, loglevel="info"):
                execute_date = datetime.now()

                task_tuples_to_send = [
                    (
                        ("success", "fake_simple_ti", execute_date, 0),
                        success_command,
                        celery_executor_utils.celery_configuration["task_default_queue"],
                        celery_executor_utils.execute_command,
                    ),
                    (
                        ("fail", "fake_simple_ti", execute_date, 0),
                        fail_command,
                        celery_executor_utils.celery_configuration["task_default_queue"],
                        celery_executor_utils.execute_command,
                    ),
                ]

                # "Enqueue" them. We don't have a real SimpleTaskInstance, so directly edit the dict
                for key, command, queue, _ in task_tuples_to_send:
                    executor.queued_tasks[key] = (command, 1, queue, None)
                    executor.task_publish_retries[key] = 1

                executor._process_tasks(task_tuples_to_send)

>               assert list(executor.tasks.keys()) == [
                    ("success", "fake_simple_ti", execute_date, 0),
                    ("fail", "fake_simple_ti", execute_date, 0),
                ]
E               AssertionError: assert equals failed
E                 [                                [
E                   (
E                     'success',
E                     'fake_simple_ti',
E                     datetime.datetime(2024, 4,
E                   29, 13, 8, 45, 559826),
E                     0,
E                   ),
E                   (                                (
E                     'fail',                          'fail',
E                     'fake_simple_ti',                'fake_simple_ti',
E                     datetime.datetime(2024, 4,      datetime.datetime(2024, 4,
E                   29, 13, 8, 45, 559826),          29, 13, 8, 45, 559826),
E                     0,                               0,
E                   ),                               ),
E                 ]                                ]

tests/integration/executors/test_celery_executor.py:149: AssertionError
```