
feat: added `request_body` support in the `PowerBIDatasetRefreshOperator` (enables support for enhanced dataset refreshes)

Open ramonvermeulen opened this issue 7 months ago • 2 comments

Closes https://github.com/apache/airflow/issues/50529

TL;DR;

Original implementation was done in https://github.com/apache/airflow/pull/40356

  • Enables support for providing data with the PowerBIDatasetRefreshOperator, in the form of a request_body that should map 1:1 with the DatasetRefreshRequest spec.
  • Enables users to refresh PowerBI Datasets on a more granular level, such as specific tables or partitions within a dataset, and provide more arguments such as retryCount, timeout (on the PBI side), maxParallelism, or notifyOption.
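To make the granular options above concrete, here is a minimal sketch of what a request_body could look like. The field names are taken from my reading of the public DatasetRefreshRequest spec, and the values are illustrative assumptions only (the partition name in particular is made up), so check them against the API reference before relying on them.

```python
import json

# Hypothetical request body for an enhanced refresh. Keys follow the public
# DatasetRefreshRequest spec as I understand it; all values are examples only.
REQUEST_BODY = {
    "type": "Full",
    "commitMode": "transactional",
    "maxParallelism": 2,
    "retryCount": 1,
    "timeout": "01:00:00",  # timeout on the PowerBI side, not the Airflow task timeout
    "objects": [
        {"table": "crime"},
        # The partition name below is an assumption for illustration.
        {"table": "waste_and_diversion", "partition": "2025"},
    ],
}

# The operator is expected to forward this dict as the JSON body of the request.
payload = json.dumps(REQUEST_BODY, indent=2)
print(payload)
```

Passing this dict as request_body=REQUEST_BODY to the operator would, under these assumptions, refresh only the listed table and partition instead of the whole dataset.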

Manual testing:

For manual testing I used the Breeze setup described in the contribution quick-start guide. I also registered an Azure application and used option 1 in the Airflow Microsoft Azure connection guide to configure a powerbi_default connection in the local Airflow instance. For PowerBI I used the 60-day trial, which provides a Pro license for 60 days and makes it possible to test this functionality.

For manual testing I used the following sample DAG and varied the original parameters group_id and dataset_id as well as the newly added parameter request_body:
from airflow import DAG
from datetime import timedelta

from airflow.providers.microsoft.azure.operators.powerbi import PowerBIDatasetRefreshOperator
from airflow.providers.standard.operators.bash import BashOperator

default_args = {
    "retries": 0,
    "retry_delay": timedelta(minutes=1),
}

DATASET_ID = ""
GROUP_ID = ""
REQUEST_BODY = None

with DAG(
    dag_id="trigger_powerbi_dataset_refresh",
    default_args=default_args,
    schedule="*/1 0 * * *",
    catchup=False,
    tags=["power_bi"],
) as dag:
    sample_task = BashOperator(
        task_id="sample_task",
        bash_command="echo 'This is a sample task to demonstrate Power BI dataset refresh trigger.'",
    )

    refresh_powerbi_dataset = PowerBIDatasetRefreshOperator(
        conn_id="powerbi_default",
        task_id="refresh_powerbi_dataset",
        dataset_id=DATASET_ID,
        group_id=GROUP_ID,
        check_interval=30,
        timeout=120,
        request_body=REQUEST_BODY,  # newly added argument
    )

    sample_task >> refresh_powerbi_dataset
✅ Test case 1 - invalid dataset_id and group_id should cause 404

input:

DATASET_ID = "foo"
GROUP_ID = "bar"
REQUEST_BODY = None

logs:

[2025-06-05, 20:30:26] ERROR - Trigger failed:
Traceback (most recent call last):
  File "/opt/airflow/providers/microsoft/azure/src/airflow/providers/microsoft/azure/hooks/powerbi.py", line 205, in trigger_dataset_refresh
    response = await self.run(
  File "/opt/airflow/providers/microsoft/azure/src/airflow/providers/microsoft/azure/hooks/msgraph.py", line 402, in run
    response = await self.send_request(
  File "/opt/airflow/providers/microsoft/azure/src/airflow/providers/microsoft/azure/hooks/msgraph.py", line 426, in send_request
    return await self.get_conn().send_no_response_content_async(
  File "/usr/local/lib/python3.9/site-packages/kiota_http/httpx_request_adapter.py", line 387, in send_no_response_content_async
    return await response_handler.handle_response_async(response, error_map)
  File "/opt/airflow/providers/microsoft/azure/src/airflow/providers/microsoft/azure/hooks/msgraph.py", line 93, in handle_response_async
    raise AirflowNotFoundException(message)
airflow.exceptions.AirflowNotFoundException: {'error': {'code': 'ItemNotFound', 'message': 'Dataset foo is not found! please verify datasetId is correct and user have sufficient permissions.'}}
✅ Test case 2 - correct dataset_id and group_id (empty request_body) should refresh dataset as a whole (so all objects within the dataset)

input:

DATASET_ID = "XXXXX" # masked but supposed to be a correct id
GROUP_ID = "XXXXX" # masked but supposed to be a correct id
REQUEST_BODY = None

logs:

[2025-06-05, 21:41:10] INFO - Pausing task as DEFERRED. : dag_id="trigger_powerbi_dataset_refresh": task_id="refresh_powerbi_dataset": run_id="manual__2025-06-05T19:41:05.248590+00:00": source="task"
[2025-06-05, 21:41:12] INFO - trigger trigger_powerbi_dataset_refresh/manual__2025-06-05T19:41:05.248590+00:00/refresh_powerbi_dataset/-1/1 (ID 7) starting
[2025-06-05, 21:41:12] INFO - Executing url 'myorg/groups/{group_id}/datasets/{dataset_id}/refreshes' as 'GET': source="airflow.task.hooks.airflow.providers.microsoft.azure.hooks.powerbi.PowerBIHook"
[2025-06-05, 21:41:12] INFO - ClientSecretCredential.get_token succeeded: source="azure.identity._internal.get_token_mixin"
[2025-06-05, 21:41:12] INFO - Executing url 'myorg/groups/{group_id}/datasets/{dataset_id}/refreshes' as 'GET': source="airflow.task.hooks.airflow.providers.microsoft.azure.hooks.powerbi.PowerBIHook"
[2025-06-05, 21:41:12] INFO - ClientSecretCredential.get_token succeeded: source="azure.identity._internal.get_token_mixin"
[2025-06-05, 21:41:12] INFO - Sleeping for 30. The dataset refresh status is In Progress.: source="airflow.providers.microsoft.azure.triggers.powerbi.PowerBITrigger"
[2025-06-05, 21:41:42] INFO - Executing url 'myorg/groups/{group_id}/datasets/{dataset_id}/refreshes' as 'GET': source="airflow.task.hooks.airflow.providers.microsoft.azure.hooks.powerbi.PowerBIHook"
[2025-06-05, 21:41:42] INFO - ClientSecretCredential.get_token succeeded: source="azure.identity._internal.get_token_mixin"
[2025-06-05, 21:41:42] INFO - Trigger fired event: name="trigger_powerbi_dataset_refresh/manual__2025-06-05T19:41:05.248590+00:00/refresh_powerbi_dataset/-1/1 (ID 7)": result="TriggerEvent<{'status': 'success', 'dataset_refresh_status': 'Completed', 'message': 'The dataset refresh f7ee659e-288e-44fc-ab3f-fd7279346a6f has Completed.', 'dataset_refresh_id': 'f7ee659e-288e-44fc-ab3f-fd7279346a6f'}>"
[2025-06-05, 21:41:42] INFO - trigger completed: name="trigger_powerbi_dataset_refresh/manual__2025-06-05T19:41:05.248590+00:00/refresh_powerbi_dataset/-1/1 (ID 7)"
[2025-06-05, 21:41:43] INFO - DAG bundles loaded: dags-folder: source="airflow.dag_processing.bundles.manager.DagBundlesManager"
[2025-06-05, 21:41:43] INFO - Filling up the DagBag from /files/dags/powerbi_refresh_example_dag.py: source="airflow.models.dagbag.DagBag"
✅ Test case 3 - correct dataset_id and group_id refreshing a specific table via the new request_body param

input:

DATASET_ID = "XXXXX" # masked but supposed to be a correct id
GROUP_ID = "XXXXX" # masked but supposed to be a correct id
REQUEST_BODY = {
    "objects": [
        {
            "table": "crime",
        },
        {
            "table": "waste_and_diversion",
        },
    ]
}

logs:

[2025-06-05, 22:05:17] INFO - Certificate data: False: source="airflow.task.hooks.airflow.providers.microsoft.azure.hooks.powerbi.PowerBIHook"
[2025-06-05, 22:05:17] INFO - Authority: None: source="airflow.task.hooks.airflow.providers.microsoft.azure.hooks.powerbi.PowerBIHook"
[2025-06-05, 22:05:17] INFO - Disable instance discovery: False: source="airflow.task.hooks.airflow.providers.microsoft.azure.hooks.powerbi.PowerBIHook"
[2025-06-05, 22:05:17] INFO - MSAL Proxies: {}: source="airflow.task.hooks.airflow.providers.microsoft.azure.hooks.powerbi.PowerBIHook"
[2025-06-05, 22:05:17] INFO - Pausing task as DEFERRED. : dag_id="trigger_powerbi_dataset_refresh": task_id="refresh_powerbi_dataset": run_id="manual__2025-06-05T20:05:12.464732+00:00": source="task"
[2025-06-05, 22:05:19] INFO - trigger trigger_powerbi_dataset_refresh/manual__2025-06-05T20:05:12.464732+00:00/refresh_powerbi_dataset/-1/1 (ID 9) starting
[2025-06-05, 22:05:19] INFO - Executing url 'myorg/groups/{group_id}/datasets/{dataset_id}/refreshes' as 'GET': source="airflow.task.hooks.airflow.providers.microsoft.azure.hooks.powerbi.PowerBIHook"
[2025-06-05, 22:05:19] INFO - ClientSecretCredential.get_token succeeded: source="azure.identity._internal.get_token_mixin"
[2025-06-05, 22:05:19] INFO - Executing url 'myorg/groups/{group_id}/datasets/{dataset_id}/refreshes' as 'GET': source="airflow.task.hooks.airflow.providers.microsoft.azure.hooks.powerbi.PowerBIHook"
[2025-06-05, 22:05:19] INFO - ClientSecretCredential.get_token succeeded: source="azure.identity._internal.get_token_mixin"
[2025-06-05, 22:05:19] INFO - Sleeping for 30. The dataset refresh status is In Progress.: source="airflow.providers.microsoft.azure.triggers.powerbi.PowerBITrigger"
[2025-06-05, 22:05:49] INFO - Executing url 'myorg/groups/{group_id}/datasets/{dataset_id}/refreshes' as 'GET': source="airflow.task.hooks.airflow.providers.microsoft.azure.hooks.powerbi.PowerBIHook"
[2025-06-05, 22:05:49] INFO - ClientSecretCredential.get_token succeeded: source="azure.identity._internal.get_token_mixin"
[2025-06-05, 22:05:49] INFO - Trigger fired event: name="trigger_powerbi_dataset_refresh/manual__2025-06-05T20:05:12.464732+00:00/refresh_powerbi_dataset/-1/1 (ID 9)": result="TriggerEvent<{'status': 'success', 'dataset_refresh_status': 'Completed', 'message': 'The dataset refresh 80714899-a0a3-4eae-a530-99dbb7217051 has Completed.', 'dataset_refresh_id': '80714899-a0a3-4eae-a530-99dbb7217051'}>"
[2025-06-05, 22:05:49] INFO - trigger completed: name="trigger_powerbi_dataset_refresh/manual__2025-06-05T20:05:12.464732+00:00/refresh_powerbi_dataset/-1/1 (ID 9)"
[2025-06-05, 22:05:51] INFO - DAG bundles loaded: dags-folder: source="airflow.dag_processing.bundles.manager.DagBundlesManager"
[2025-06-05, 22:05:51] INFO - Filling up the DagBag from /files/dags/powerbi_refresh_example_dag.py: source="airflow.models.dagbag.DagBag"

(Screenshot: the refresh history shows this refresh as going through the enhanced API, because the request carried a body targeting specific objects.)

I really tried looking for a place in the PowerBI service app portal where I can show that specifically crime and waste_and_diversion are refreshed, but I couldn't find this anywhere.

✅ Test case 4 - correct dataset_id and group_id with a request_body containing tables that do not exist

input:

DATASET_ID = "XXXXX" # masked but supposed to be a correct id
GROUP_ID = "XXXXX" # masked but supposed to be a correct id
REQUEST_BODY = {
    "objects": [
        {
            "table": "xxx",
        },
        {
            "table": "aaa",
        },
    ]
}

logs:

[2025-06-05, 22:15:57] ERROR - Task failed with exception: source="task"
AirflowException: The dataset refresh d119acd0-58ed-4690-ad06-3a361ab6a393 has Failed. Error: {"errorCode":"ModelRefresh_ShortMessage_ProcessingError","errorDescription":"The specified table 'xxx' not found in the target model."}
File "/opt/airflow/task-sdk/src/airflow/sdk/execution_time/task_runner.py", line 888 in run
File "/opt/airflow/task-sdk/src/airflow/sdk/execution_time/task_runner.py", line 1180 in _execute_task
File "/opt/airflow/task-sdk/src/airflow/sdk/bases/operator.py", line 1608 in resume_execution
File "/opt/airflow/providers/microsoft/azure/src/airflow/providers/microsoft/azure/operators/powerbi.py", line 177 in execute_complete

I also tested putting entries in the request body that are not listed in the API reference. It seems the API just ignores these entries (it does not return a 400 Bad Request, for example).

The PR could be improved by adding a pydantic model, TypedDict, or dataclass for the request body to improve the UX. On the other hand, that would also increase coupling (e.g. it requires "client-side" changes whenever the upstream API changes its spec, for example when fields are added), so I want to leave this decision to a reviewer with more provider experience.
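As a rough illustration of the TypedDict option, the sketch below is hypothetical and not part of this PR. The class names, the unknown_keys helper, and the field list are my assumptions based on the public DatasetRefreshRequest spec, and the list may be incomplete or drift over time, which is exactly the coupling concern mentioned above.

```python
from typing import Any, Dict, List, Set, TypedDict


class RefreshObject(TypedDict, total=False):
    """One refresh target: a table, optionally narrowed to a partition."""

    table: str
    partition: str


class DatasetRefreshRequest(TypedDict, total=False):
    """Hypothetical typed view of the request body; every key is optional."""

    type: str
    commitMode: str
    maxParallelism: int
    retryCount: int
    timeout: str
    objects: List[RefreshObject]
    applyRefreshPolicy: bool
    effectiveDate: str
    notifyOption: str


def unknown_keys(body: Dict[str, Any]) -> Set[str]:
    """Return top-level keys that are not declared on DatasetRefreshRequest."""
    return set(body) - set(DatasetRefreshRequest.__annotations__)


# A body with one declared key and one undeclared key.
body = {"objects": [{"table": "crime"}], "foo": 1}
print(unknown_keys(body))
```

Since the API appears to ignore unsupported entries silently, a check like unknown_keys could log a warning on the Airflow side rather than fail the task, so newly added upstream fields would not break existing DAGs.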



Jun 04 '25 11:06 ramonvermeulen

Some tests are failing

Jun 11 '25 11:06 vatsrahul1001

> Some tests are failing

@vatsrahul1001

I have the feeling this is not related to my changes, because the checks have been failing since I rebased this morning. The logs also do not show failures in unit/system tests, but rather a Docker image that cannot be found:

Executable permissions on entrypoints are OK
The image /mnt/ci-image-save-v3-linux_amd64-3.9.tar does not exist.
Error: Process completed with exit code 1.

Just did another rebase, maybe it will solve the issue.

Jun 11 '25 13:06 ramonvermeulen