astronomer-cosmos
AthenaAccessKeyProfileMapping does not work as expected locally
When defining an AWS connection like so in airflow_settings.yaml:
airflow:
  connections:
    - conn_id: aws_default
      conn_type: aws
      conn_host:
      conn_schema:
      conn_login: {aws_access_key_id}
      conn_password: {aws_secret_access_key}
      conn_port:
      conn_extra:
Running a dag with AthenaAccessKeyProfileMapping returns this error:
botocore.exceptions.ClientError: An error occurred (InvalidClientTokenId) when calling the GetCallerIdentity operation: The security token included in the request is invalid
That's because AwsGenericHook returns frozen_credentials exactly as defined above, i.e. with no session token, when get_credentials is called in AthenaAccessKeyProfileMapping.
To make the dag run successfully, I have to use the AWS CLI command aws sts get-session-token and then define the connection like this:
airflow:
  connections:
    - conn_id: aws_default
      conn_type: aws
      conn_host:
      conn_schema:
      conn_login: {session_access_key_id}
      conn_password: {session_secret_access_key}
      conn_port:
      conn_extra:
        aws_session_token: {session_token}
However, this is annoying to do every time a session expires. To fix this issue, I subclassed AthenaAccessKeyProfileMapping to make the call to sts in code:
from botocore.credentials import ReadOnlyCredentials
from cosmos.profiles import (
    AthenaAccessKeyProfileMapping as BaseAthenaAccessKeyProfileMapping,
)


class AthenaAccessKeyProfileMapping(BaseAthenaAccessKeyProfileMapping):
    def _get_temporary_credentials(self):  # type: ignore
        """
        Helper function to retrieve temporary short lived credentials
        Returns an object including access_key, secret_key and token
        """
        # use StsHook to get session
        from airflow.providers.amazon.aws.hooks.sts import StsHook

        hook = StsHook(self.conn_id)  # type: ignore
        duration = self.profile_args.get("duration_seconds", 3600)
        sts_response = hook.conn.get_session_token(DurationSeconds=duration)
        credentials = ReadOnlyCredentials(
            sts_response["Credentials"]["AccessKeyId"],
            sts_response["Credentials"]["SecretAccessKey"],
            sts_response["Credentials"]["SessionToken"],
        )
        return credentials
I'm not sure if this is the best solution. Additionally, I don't think this will work with a deployed Airflow that runs with an IAM role that already has a session. That is, the call to get_session_token will fail due to this session limitation:
The temporary security credentials created by GetSessionToken can be used to make API calls to any Amazon Web Services service with the following exceptions: You cannot call any STS API except AssumeRole or GetCallerIdentity
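A rough, untested sketch of one way to handle both cases: call STS only when the existing credentials carry no session token. Everything named here is an assumption for illustration. Credentials is a stand-in with the same three fields as botocore's ReadOnlyCredentials, and ensure_session_credentials and fetch_session_token are hypothetical names, not part of Cosmos or the Amazon provider.

```python
from typing import Callable, NamedTuple, Optional


class Credentials(NamedTuple):
    """Stand-in with the same three fields as botocore's ReadOnlyCredentials."""

    access_key: str
    secret_key: str
    token: Optional[str]


def ensure_session_credentials(
    current: Credentials,
    fetch_session_token: Callable[[], dict],
) -> Credentials:
    """Return credentials that carry a session token, calling STS only if needed.

    fetch_session_token is a hypothetical callable that returns a response
    shaped like the one from STS get_session_token.
    """
    if current.token:
        # Already temporary credentials (e.g. from an IAM role): reuse them
        # rather than calling get_session_token, which is expected to fail here.
        return current
    # Long-lived access keys: exchange them for temporary credentials.
    response = fetch_session_token()["Credentials"]
    return Credentials(
        response["AccessKeyId"],
        response["SecretAccessKey"],
        response["SessionToken"],
    )
```

In the subclass above, current could presumably come from the hook's get_credentials() and fetch_session_token could wrap hook.conn.get_session_token(DurationSeconds=duration); whether that fits a deployed Airflow running under an IAM role would still need to be verified.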