prefect icon indicating copy to clipboard operation
prefect copied to clipboard

DBT integration not working with GCP application default credentials using SA impersonation

Open jmesterh opened this issue 9 months ago • 0 comments

First check

  • [X] I added a descriptive title to this issue.
  • [X] I used the GitHub search to find a similar issue and didn't find it.
  • [X] I searched the Prefect documentation for this issue.
  • [X] I checked that this issue is related to Prefect and not one of its dependencies.

Bug summary

The DBT integration works with standard gcloud application default credentials created with

gcloud auth application-default login

but not credentials created with

gcloud auth application-default login --impersonate-service-account [SERVICE_ACCT_EMAIL]

which return the error:

File "/venv/lib/python3.11/site-packages/google/auth/impersonated_credentials.py", line 100, in _make_iam_token_request
    raise exceptions.RefreshError(_REFRESH_ERROR, response_body)
google.auth.exceptions.RefreshError: ('Unable to acquire impersonated credentials', '
{
  "error": {
      "code": 400,
      "message": "Request contains an invalid argument.", "status": "INVALID_ARGUMENT"
   }
}
)

Reproduction

from prefect_dbt.cli.commands import run_dbt_model
from prefect_dbt.cli.configs import BigQueryTargetConfigs
from prefect_gcp import GcpCredentials
from prefect_dbt import DbtCliProfile

gcp_credentials = GcpCredentials()
target_configs = BigQueryTargetConfigs(
        # also tried below, no effect
	#extras={
	#    "impersonate_service_account": "[email protected]",
	#},

	gcp_credentials=gcp_credentials,
	project="my-project",
	schema="testing",
)
dbt_cli_profile = DbtCliProfile(
	name="dbt",
	target="dev",
	target_configs=target_configs,
)
return run_dbt_model(
	dbt_cli_profile=dbt_cli_profile,
	overwrite_profiles=True,
	project_dir="dbt",
	profiles_dir="dbt",
	create_summary_artifact=True,
)

Error

Traceback (most recent call last):
  File "/venv/bin/starr-omop", line 6, in <module>
    sys.exit(main())
             ^^^^^^
  File "/venv/lib/python3.11/site-packages/click/core.py", line 1157, in __call__
    return self.main(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/venv/lib/python3.11/site-packages/click/core.py", line 1078, in main
    rv = self.invoke(ctx)
         ^^^^^^^^^^^^^^^^
  File "/venv/lib/python3.11/site-packages/click/core.py", line 1434, in invoke
    return ctx.invoke(self.callback, **ctx.params)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/venv/lib/python3.11/site-packages/click/core.py", line 783, in invoke
    return __callback(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/workspaces/starr-data/src/starr_data/prefect/flows/omop.py", line 89, in main
    asyncio.run(omop_dev())
                ^^^^^^^^^^
  File "/venv/lib/python3.11/site-packages/prefect/flows.py", line 1229, in __call__
    return enter_flow_run_engine_from_flow_call(
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/venv/lib/python3.11/site-packages/prefect/engine.py", line 293, in enter_flow_run_engine_from_flow_call
    retval = from_sync.wait_for_call_in_loop_thread(
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/venv/lib/python3.11/site-packages/prefect/_internal/concurrency/api.py", line 218, in wait_for_call_in_loop_thread
    return call.result()
           ^^^^^^^^^^^^^
  File "/venv/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 318, in result
    return self.future.result(timeout=timeout)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/venv/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 179, in result
    return self.__get_result()
           ^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/concurrent/futures/_base.py", line 401, in __get_result
    raise self._exception
  File "/venv/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 389, in _run_async
    result = await coro
             ^^^^^^^^^^
  File "/venv/lib/python3.11/site-packages/prefect/client/utilities.py", line 100, in with_injected_client
    return await fn(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/venv/lib/python3.11/site-packages/prefect/engine.py", line 396, in create_then_begin_flow_run
    return await state.result(fetch=True)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/venv/lib/python3.11/site-packages/prefect/states.py", line 91, in _get_state_result
    raise await get_state_exception(state)
  File "/venv/lib/python3.11/site-packages/prefect/engine.py", line 877, in orchestrate_flow_run
    result = await flow_call.aresult()
             ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/venv/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 327, in aresult
    return await asyncio.wrap_future(self.future)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/venv/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 352, in _run_sync
    result = self.fn(*self.args, **self.kwargs)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/workspaces/starr-data/src/starr_data/prefect/flows/omop.py", line 75, in omop_dev
    return run_dbt_model(
           ^^^^^^^^^^^^^^
  File "/venv/lib/python3.11/site-packages/prefect/tasks.py", line 689, in __call__
    return enter_task_run_engine(
           ^^^^^^^^^^^^^^^^^^^^^^
  File "/venv/lib/python3.11/site-packages/prefect/engine.py", line 1421, in enter_task_run_engine
    return from_sync.wait_for_call_in_loop_thread(begin_run)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/venv/lib/python3.11/site-packages/prefect/_internal/concurrency/api.py", line 218, in wait_for_call_in_loop_thread
    return call.result()
           ^^^^^^^^^^^^^
  File "/venv/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 318, in result
    return self.future.result(timeout=timeout)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/venv/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 179, in result
    return self.__get_result()
           ^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/concurrent/futures/_base.py", line 401, in __get_result
    raise self._exception
  File "/venv/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 389, in _run_async
    result = await coro
             ^^^^^^^^^^
  File "/venv/lib/python3.11/site-packages/prefect/engine.py", line 1555, in get_task_call_return_value
    return await future._result()
           ^^^^^^^^^^^^^^^^^^^^^^
  File "/venv/lib/python3.11/site-packages/prefect/futures.py", line 237, in _result
    return await final_state.result(raise_on_failure=raise_on_failure, fetch=True)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/venv/lib/python3.11/site-packages/prefect/states.py", line 91, in _get_state_result
    raise await get_state_exception(state)
  File "/venv/lib/python3.11/site-packages/prefect/engine.py", line 2103, in orchestrate_task_run
    result = await call.aresult()
             ^^^^^^^^^^^^^^^^^^^^
  File "/venv/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 327, in aresult
    return await asyncio.wrap_future(self.future)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/venv/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 389, in _run_async
    result = await coro
             ^^^^^^^^^^
  File "/venv/lib/python3.11/site-packages/prefect_dbt/cli/commands.py", line 527, in run_dbt_model
    results = await trigger_dbt_cli_command.fn(
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/venv/lib/python3.11/site-packages/prefect_dbt/cli/commands.py", line 147, in trigger_dbt_cli_command
    profile = dbt_cli_profile.get_profile()
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/venv/lib/python3.11/site-packages/prefect_dbt/cli/credentials.py", line 157, in get_profile
    "outputs": {self.target: self.target_configs.get_configs()},
                             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/venv/lib/python3.11/site-packages/prefect_dbt/cli/configs/bigquery.py", line 142, in get_configs
    google_credentials.refresh(request)
  File "/venv/lib/python3.11/site-packages/google/auth/impersonated_credentials.py", line 250, in refresh
    self._update_token(request)
  File "/venv/lib/python3.11/site-packages/google/auth/impersonated_credentials.py", line 282, in _update_token
    self.token, self.expiry = _make_iam_token_request(
                              ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/venv/lib/python3.11/site-packages/google/auth/impersonated_credentials.py", line 100, in _make_iam_token_request
    raise exceptions.RefreshError(_REFRESH_ERROR, response_body)
google.auth.exceptions.RefreshError: ('Unable to acquire impersonated credentials', '{\n  "error": {\n    "code": 400,\n    "message": "Request contains an invalid argument.",\n    "status": "INVALID_ARGUMENT"\n  }\n}\n')

Versions

2.19.1

Additional context

We use application-default-credentials with default impersonation so our developer containers access GCP using the same service account without additional configuration. This works fine for all of our other Prefect workflows which retrieve credentials with gcp_credentials = GcpCredentials(), just not the DBT integration.

jmesterh avatar May 21 '24 13:05 jmesterh