astronomer-cosmos

AthenaAccessKeyProfileMapping does not work as expected locally

Open • tuan-h opened this issue 9 months ago • 4 comments

When I define an AWS connection in airflow_settings.yaml like this:

airflow:
  connections:
    - conn_id: aws_default
      conn_type: aws
      conn_host:
      conn_schema:
      conn_login: {aws_access_key_id}
      conn_password: {aws_secret_access_key} 
      conn_port:
      conn_extra:

Running a DAG that uses AthenaAccessKeyProfileMapping fails with this error:

botocore.exceptions.ClientError: An error occurred (InvalidClientTokenId) when calling the GetCallerIdentity operation: The security token included in the request is invalid

This happens because, when AthenaAccessKeyProfileMapping calls get_credentials, AwsGenericHook returns frozen credentials exactly as they are defined above, i.e. without a session token.
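To make the difference concrete, here is a small sketch that inspects a frozen credentials tuple. It defines a local namedtuple with the same fields as botocore's ReadOnlyCredentials (access_key, secret_key, token) so it runs without botocore installed. The helper describe_credentials is hypothetical, and it relies on AWS's identifier-prefix convention: long-term IAM user access key IDs start with AKIA, temporary STS key IDs start with ASIA.

```python
from collections import namedtuple

# Local stand-in with the same fields as botocore.credentials.ReadOnlyCredentials.
ReadOnlyCredentials = namedtuple("ReadOnlyCredentials", ["access_key", "secret_key", "token"])


def describe_credentials(creds: ReadOnlyCredentials) -> str:
    """Hypothetical helper: classify frozen credentials by key prefix and session token."""
    is_temporary_key = creds.access_key.startswith("ASIA")
    has_token = bool(creds.token)
    if is_temporary_key and not has_token:
        # A temporary key ID sent without its session token; AWS typically rejects this.
        return "temporary key missing session token"
    if is_temporary_key:
        return "temporary credentials"
    # AKIA-style keys are long-term and do not need a session token.
    return "long-term credentials"
```

For example, describe_credentials(ReadOnlyCredentials("ASIA...", "...", None)) reports the "missing session token" case, which is the combination a connection without aws_session_token would produce for temporary keys.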

To make the DAG run successfully, I have to run the AWS CLI command aws sts get-session-token and then define the connection with the resulting temporary credentials:

airflow:
  connections:
    - conn_id: aws_default
      conn_type: aws
      conn_host:
      conn_schema:
      conn_login: {session_access_key_id}
      conn_password: {session_secret_access_key} 
      conn_port:
      conn_extra:
        aws_session_token: {session_token}
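The manual step above amounts to copying three fields of the GetSessionToken response into the connection. A minimal sketch of that mapping, assuming the response has the documented Credentials structure (AccessKeyId, SecretAccessKey, SessionToken) returned by boto3's sts client get_session_token or by the CLI with JSON output. The function name connection_entry_from_sts is hypothetical; it returns a plain dict shaped like the airflow_settings.yaml entry above.

```python
def connection_entry_from_sts(response: dict, conn_id: str = "aws_default") -> dict:
    """Hypothetical helper: map a GetSessionToken response onto an airflow_settings.yaml entry."""
    creds = response["Credentials"]
    return {
        "conn_id": conn_id,
        "conn_type": "aws",
        # Temporary key pair from STS replaces the long-term key pair.
        "conn_login": creds["AccessKeyId"],
        "conn_password": creds["SecretAccessKey"],
        # The session token goes into the connection extras, as in the YAML above.
        "conn_extra": {"aws_session_token": creds["SessionToken"]},
    }
```

The resulting dict could be dumped to YAML or JSON, but it still has to be regenerated whenever the token expires, which is the same annoyance described next.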

However, redoing this every time the session expires is tedious. To work around it, I subclassed AthenaAccessKeyProfileMapping so that it calls STS in code:

from botocore.credentials import ReadOnlyCredentials
from cosmos.profiles import (
    AthenaAccessKeyProfileMapping as BaseAthenaAccessKeyProfileMapping,
)


class AthenaAccessKeyProfileMapping(BaseAthenaAccessKeyProfileMapping):
    def _get_temporary_credentials(self):  # type: ignore
        """
        Helper function to retrieve temporary, short-lived credentials.
        Returns an object with access_key, secret_key and token.
        """
        # Use StsHook (Amazon provider) to get a boto3 STS client for this connection
        from airflow.providers.amazon.aws.hooks.sts import StsHook

        hook = StsHook(aws_conn_id=self.conn_id)  # type: ignore
        duration = self.profile_args.get("duration_seconds", 3600)
        # GetSessionToken returns AccessKeyId, SecretAccessKey and SessionToken
        creds = hook.conn.get_session_token(DurationSeconds=duration)["Credentials"]
        return ReadOnlyCredentials(
            creds["AccessKeyId"],
            creds["SecretAccessKey"],
            creds["SessionToken"],
        )
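One design note on this subclass: it calls STS every time credentials are requested. If that turns out to be too chatty, the temporary credentials could be cached until shortly before the Expiration timestamp that GetSessionToken returns (boto3 parses it into a timezone-aware datetime). The sketch below is a hypothetical, dependency-free helper (CachedSessionCredentials, with an injected fetch function and clock), not something provided by cosmos or the Amazon provider.

```python
from datetime import datetime, timedelta, timezone
from typing import Callable, Optional


class CachedSessionCredentials:
    """Hypothetical cache for GetSessionToken-style responses."""

    def __init__(
        self,
        fetch: Callable[[], dict],
        refresh_margin: timedelta = timedelta(minutes=5),
        now: Callable[[], datetime] = lambda: datetime.now(timezone.utc),
    ) -> None:
        # fetch could wrap e.g. hook.conn.get_session_token(DurationSeconds=3600)
        self._fetch = fetch
        self._margin = refresh_margin
        self._now = now
        self._cached: Optional[dict] = None

    def get(self) -> dict:
        """Return the cached Credentials dict, refetching if missing or about to expire."""
        if self._cached is None or self._now() >= self._cached["Expiration"] - self._margin:
            self._cached = self._fetch()["Credentials"]
        return self._cached
```

The refresh margin avoids handing dbt credentials that expire mid-run; five minutes is an arbitrary default chosen for the sketch.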

I'm not sure this is the best solution. I also don't think it will work on a deployed Airflow instance that runs under an IAM role whose credentials are already a session: the call to get_session_token will fail because of this session limitation:

The temporary security credentials created by GetSessionToken can be used to make API calls to any Amazon Web Services service with the following exceptions: You cannot call any STS API except AssumeRole or GetCallerIdentity
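One possible way around this limitation is to call GetSessionToken only when the hook's frozen credentials have no session token, and otherwise reuse them unchanged, since credentials obtained from an IAM role are already temporary and include a token. A minimal sketch under that assumption: resolve_credentials and the injected get_session_token callable are hypothetical names, and in the subclass above the callable would wrap hook.conn.get_session_token. The namedtuple mirrors botocore's ReadOnlyCredentials fields so the example runs on its own.

```python
from collections import namedtuple
from typing import Callable

# Same fields as botocore.credentials.ReadOnlyCredentials.
ReadOnlyCredentials = namedtuple("ReadOnlyCredentials", ["access_key", "secret_key", "token"])


def resolve_credentials(
    frozen: ReadOnlyCredentials,
    get_session_token: Callable[[], dict],
) -> ReadOnlyCredentials:
    """Hypothetical helper: only request a session token for long-term credentials."""
    if frozen.token:
        # Already temporary (e.g. an IAM role session): reuse as-is instead of calling STS.
        return frozen
    # Long-term key pair without a token: exchange it for temporary credentials.
    creds = get_session_token()["Credentials"]
    return ReadOnlyCredentials(creds["AccessKeyId"], creds["SecretAccessKey"], creds["SessionToken"])
```

With this split, local runs that use long-term keys still get a session token, while a deployed role-based setup never hits the restricted GetSessionToken call.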

tuan-h, May 24 '24 09:05