prefect
prefect copied to clipboard
DBT integration not working with GCP application default credentials using SA impersonation
First check
- [X] I added a descriptive title to this issue.
- [X] I used the GitHub search to find a similar issue and didn't find it.
- [X] I searched the Prefect documentation for this issue.
- [X] I checked that this issue is related to Prefect and not one of its dependencies.
Bug summary
The dbt integration works with standard gcloud application default credentials created with
gcloud auth application-default login
but not with impersonated credentials created with
gcloud auth application-default login --impersonate-service-account [SERVICE_ACCT_EMAIL]
With the impersonated credentials, refreshing the token fails with the error:
File "/venv/lib/python3.11/site-packages/google/auth/impersonated_credentials.py", line 100, in _make_iam_token_request
raise exceptions.RefreshError(_REFRESH_ERROR, response_body)
google.auth.exceptions.RefreshError: ('Unable to acquire impersonated credentials', '
{
"error": {
"code": 400,
"message": "Request contains an invalid argument.", "status": "INVALID_ARGUMENT"
}
}
')
Reproduction
from prefect_dbt.cli.commands import run_dbt_model
from prefect_dbt.cli.configs import BigQueryTargetConfigs
from prefect_gcp import GcpCredentials
from prefect_dbt import DbtCliProfile
gcp_credentials = GcpCredentials()
target_configs = BigQueryTargetConfigs(
# also tried below, no effect
#extras={
# "impersonate_service_account": "[SERVICE_ACCT_EMAIL]",
#},
gcp_credentials=gcp_credentials,
project="my-project",
schema="testing",
)
dbt_cli_profile = DbtCliProfile(
name="dbt",
target="dev",
target_configs=target_configs,
)
return run_dbt_model(
dbt_cli_profile=dbt_cli_profile,
overwrite_profiles=True,
project_dir="dbt",
profiles_dir="dbt",
create_summary_artifact=True,
)
Error
Traceback (most recent call last):
File "/venv/bin/starr-omop", line 6, in <module>
sys.exit(main())
^^^^^^
File "/venv/lib/python3.11/site-packages/click/core.py", line 1157, in __call__
return self.main(*args, **kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/venv/lib/python3.11/site-packages/click/core.py", line 1078, in main
rv = self.invoke(ctx)
^^^^^^^^^^^^^^^^
File "/venv/lib/python3.11/site-packages/click/core.py", line 1434, in invoke
return ctx.invoke(self.callback, **ctx.params)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/venv/lib/python3.11/site-packages/click/core.py", line 783, in invoke
return __callback(*args, **kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/workspaces/starr-data/src/starr_data/prefect/flows/omop.py", line 89, in main
asyncio.run(omop_dev())
^^^^^^^^^^
File "/venv/lib/python3.11/site-packages/prefect/flows.py", line 1229, in __call__
return enter_flow_run_engine_from_flow_call(
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/venv/lib/python3.11/site-packages/prefect/engine.py", line 293, in enter_flow_run_engine_from_flow_call
retval = from_sync.wait_for_call_in_loop_thread(
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/venv/lib/python3.11/site-packages/prefect/_internal/concurrency/api.py", line 218, in wait_for_call_in_loop_thread
return call.result()
^^^^^^^^^^^^^
File "/venv/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 318, in result
return self.future.result(timeout=timeout)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/venv/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 179, in result
return self.__get_result()
^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/concurrent/futures/_base.py", line 401, in __get_result
raise self._exception
File "/venv/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 389, in _run_async
result = await coro
^^^^^^^^^^
File "/venv/lib/python3.11/site-packages/prefect/client/utilities.py", line 100, in with_injected_client
return await fn(*args, **kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^
File "/venv/lib/python3.11/site-packages/prefect/engine.py", line 396, in create_then_begin_flow_run
return await state.result(fetch=True)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/venv/lib/python3.11/site-packages/prefect/states.py", line 91, in _get_state_result
raise await get_state_exception(state)
File "/venv/lib/python3.11/site-packages/prefect/engine.py", line 877, in orchestrate_flow_run
result = await flow_call.aresult()
^^^^^^^^^^^^^^^^^^^^^^^^^
File "/venv/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 327, in aresult
return await asyncio.wrap_future(self.future)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/venv/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 352, in _run_sync
result = self.fn(*self.args, **self.kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/workspaces/starr-data/src/starr_data/prefect/flows/omop.py", line 75, in omop_dev
return run_dbt_model(
^^^^^^^^^^^^^^
File "/venv/lib/python3.11/site-packages/prefect/tasks.py", line 689, in __call__
return enter_task_run_engine(
^^^^^^^^^^^^^^^^^^^^^^
File "/venv/lib/python3.11/site-packages/prefect/engine.py", line 1421, in enter_task_run_engine
return from_sync.wait_for_call_in_loop_thread(begin_run)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/venv/lib/python3.11/site-packages/prefect/_internal/concurrency/api.py", line 218, in wait_for_call_in_loop_thread
return call.result()
^^^^^^^^^^^^^
File "/venv/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 318, in result
return self.future.result(timeout=timeout)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/venv/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 179, in result
return self.__get_result()
^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/concurrent/futures/_base.py", line 401, in __get_result
raise self._exception
File "/venv/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 389, in _run_async
result = await coro
^^^^^^^^^^
File "/venv/lib/python3.11/site-packages/prefect/engine.py", line 1555, in get_task_call_return_value
return await future._result()
^^^^^^^^^^^^^^^^^^^^^^
File "/venv/lib/python3.11/site-packages/prefect/futures.py", line 237, in _result
return await final_state.result(raise_on_failure=raise_on_failure, fetch=True)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/venv/lib/python3.11/site-packages/prefect/states.py", line 91, in _get_state_result
raise await get_state_exception(state)
File "/venv/lib/python3.11/site-packages/prefect/engine.py", line 2103, in orchestrate_task_run
result = await call.aresult()
^^^^^^^^^^^^^^^^^^^^
File "/venv/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 327, in aresult
return await asyncio.wrap_future(self.future)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/venv/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 389, in _run_async
result = await coro
^^^^^^^^^^
File "/venv/lib/python3.11/site-packages/prefect_dbt/cli/commands.py", line 527, in run_dbt_model
results = await trigger_dbt_cli_command.fn(
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/venv/lib/python3.11/site-packages/prefect_dbt/cli/commands.py", line 147, in trigger_dbt_cli_command
profile = dbt_cli_profile.get_profile()
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/venv/lib/python3.11/site-packages/prefect_dbt/cli/credentials.py", line 157, in get_profile
"outputs": {self.target: self.target_configs.get_configs()},
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/venv/lib/python3.11/site-packages/prefect_dbt/cli/configs/bigquery.py", line 142, in get_configs
google_credentials.refresh(request)
File "/venv/lib/python3.11/site-packages/google/auth/impersonated_credentials.py", line 250, in refresh
self._update_token(request)
File "/venv/lib/python3.11/site-packages/google/auth/impersonated_credentials.py", line 282, in _update_token
self.token, self.expiry = _make_iam_token_request(
^^^^^^^^^^^^^^^^^^^^^^^^
File "/venv/lib/python3.11/site-packages/google/auth/impersonated_credentials.py", line 100, in _make_iam_token_request
raise exceptions.RefreshError(_REFRESH_ERROR, response_body)
google.auth.exceptions.RefreshError: ('Unable to acquire impersonated credentials', '{\n "error": {\n "code": 400,\n "message": "Request contains an invalid argument.",\n "status": "INVALID_ARGUMENT"\n }\n}\n')
Versions
2.19.1
Additional context
We use application default credentials with service account impersonation enabled by default, so that our developer containers access GCP using the same service account without additional configuration. This works fine for all of our other Prefect workflows, which retrieve credentials with gcp_credentials = GcpCredentials(); only the dbt integration fails.