Redacting the sensitive env variables in env_vars for KPO
env_vars in the KPO can sometimes contain secret values, and these are currently rendered as-is in the rendered template.
To fix this, I am passing env_vars through the redact filter in the secrets masker so that the values are hidden from the rendered template as well as from the task logs.
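To illustrate the intended behaviour, here is a minimal standalone sketch of name-based redaction. It is not the Airflow secrets masker itself: `SENSITIVE_NAME_PARTS`, `is_sensitive` and `redact_env_vars` are hypothetical names, the list of sensitive substrings is an assumed subset, and plain dicts stand in for `V1EnvVar` objects so the snippet runs without Airflow or the kubernetes client.

```python
from typing import Any, Dict, List, Union

# Assumption: an illustrative subset of name fragments treated as sensitive.
SENSITIVE_NAME_PARTS = ("password", "secret", "token", "api_key")
MASK = "***"

EnvVars = Union[Dict[str, Any], List[Dict[str, Any]]]


def is_sensitive(name: str) -> bool:
    """Return True if the variable *name* contains a sensitive fragment."""
    lowered = name.lower()
    return any(part in lowered for part in SENSITIVE_NAME_PARTS)


def redact_env_vars(env_vars: EnvVars) -> EnvVars:
    """Return a copy of env_vars with values masked for sensitive names.

    Accepts either a {name: value} dict or a list of
    {"name": ..., "value": ...} dicts (a stand-in for V1EnvVar).
    """
    if isinstance(env_vars, dict):
        return {
            name: MASK if is_sensitive(name) else value
            for name, value in env_vars.items()
        }
    return [
        {**item, "value": MASK} if is_sensitive(item["name"]) else dict(item)
        for item in env_vars
    ]
```

Note that only the variable name decides whether a value is masked in this sketch, so a variable called `thisisok` with the value `password` is left untouched.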
The example DAG I tried and the results are here:
from pendulum import datetime, duration
from airflow import DAG
from airflow.configuration import conf
from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import (
    KubernetesPodOperator,
)
from kubernetes.client import V1EnvVar

default_args = {
    "owner": "airflow",
    "depends_on_past": False,
    "start_date": datetime(2022, 1, 1),
    "email_on_failure": False,
    "email_on_retry": False,
    "retries": 1,
    "retry_delay": duration(minutes=5),
}

with DAG(
    dag_id="example_kubernetes_pod", schedule="@once", default_args=default_args
) as dag:
    t1 = KubernetesPodOperator(
        namespace="default",
        image="hello-world",
        name="airflow-test-pod",
        task_id="task-1",
        config_file="~/Downloads/ECS\ Kubeconfigs/shared-rke-01",
        get_logs=True,
        env_vars=[
            V1EnvVar(
                name="password",
                value="mypassword",
            ),
            V1EnvVar(
                name="normal",
                value="normalstuff",
            ),
            V1EnvVar(
                name="secret",
                value="ok",
            ),
            V1EnvVar(
                name="thisisok",
                value="password",
            ),
        ],
    )
    t2 = KubernetesPodOperator(
        namespace="default",
        image="hello-world",
        name="airflow-test-pod",
        task_id="task-2",
        config_file="~/Downloads/ECS\ Kubeconfigs/shared-rke-01",
        get_logs=True,
        env_vars={
            "password": "password1",
            "good": "thisisgood",
            "secret": "normal_stuff",
            "normal": "secret",
        },
    )
    [t1, t2]
This reminds me of https://github.com/apache/airflow/issues/28086 cc @RoyNoymanW
@hussein-awala yeah, some test coverage would be nice. I just wanted to understand how the part that interacts with the DB can be tested here. I was trying something along these lines:
@pytest.mark.parametrize(
    "input",
    [
        [
            V1EnvVar(
                name="password",
                value="mypassword",
            ),
            V1EnvVar(
                name="normal",
                value="normalstuff",
            ),
            V1EnvVar(
                name="secret",
                value="ok",
            ),
            V1EnvVar(
                name="thisisok",
                value="password",
            ),
        ],
        {
            "password": "password1",
            "good": "thisisgood",
            "secret": "normal_stuff",
            "normal": "secret",
        },
    ],
)
def test_env_vars_redaction(self, input):
    k = KubernetesPodOperator(
        task_id="task-1",
        env_vars=input,
    )
    ctx = create_context(k, persist_to_db=True)
    pod = k.build_pod_request_obj(ctx)
    print(pod)
But I constantly get this:
self = <sqlalchemy.dialects.sqlite.pysqlite.SQLiteDialect_pysqlite object at 0x115594550>
cursor = <sqlite3.Cursor object at 0x12545f030>
statement = 'INSERT INTO dag (dag_id, root_dag_id, is_paused, is_subdag, is_active, last_parsed_time, last_pickled, last_expired, ... next_dagrun_create_after) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)'
parameters = ('dag', None, 0, 0, 0, None, ...)
context = <sqlalchemy.dialects.sqlite.base.SQLiteExecutionContext object at 0x1253d6d90>
def do_execute(self, cursor, statement, parameters, context=None):
> cursor.execute(statement, parameters)
E sqlite3.OperationalError: table dag has no column named dag_display_name
The pod object also contains the unprotected env_vars, so we definitely need a DB interaction to verify the redaction. Any suggestions?
@hussein-awala @potiuk @eladkal I need some help adding the unit tests here. I tried, but it looks like I am unable to store the results in a DB and check them. There are also static check issues.
I think your problem was that the DB had not been recreated. The error indicates that you need to reset the DB to pick up the latest changes: this column was added recently.
@ashb @potiuk @eladkal I need some help fixing the unit tests for this change. I also want to ask what kind of testing needs to be performed here to mark this task complete.
I am trying to write a unit test that calls the KPO init, then checks the RenderedTaskInstanceFields table in the DB and validates the redaction. I didn't get too far with it; so far I have this:
@provide_session
@pytest.mark.parametrize(
    "input",
    [
        [
            V1EnvVar(
                name="password",
                value="mypassword",
            ),
            V1EnvVar(
                name="normal",
                value="normalstuff",
            ),
            V1EnvVar(
                name="secret",
                value="ok",
            ),
            V1EnvVar(
                name="thisisok",
                value="password",
            ),
        ],
        {
            "password": "password1",
            "good": "thisisgood",
            "secret": "normal_stuff",
            "normal": "secret",
        },
    ],
)
def test_env_vars_redaction(self, dag_maker, input, session=None):
    from airflow.models.renderedtifields import RenderedTaskInstanceFields

    # k = KubernetesPodOperator(
    #     task_id="task-1",
    #     env_vars=input,
    # )
    #
    # ctx = create_context(k, persist_to_db=True)
    #
    # pod = k.build_pod_request_obj(ctx)
    # session = Session()
    # all_rows = session.query(RenderedTaskInstanceFields).all()
    #
    # print(all_rows)
    with dag_maker("test-dag", session=session) as dag:
        task = KubernetesPodOperator(
            task_id="task-1",
            env_vars=input,
        )
    ti = dag_maker.create_dagrun().task_instances[0]
    ti.task = task
    session.add(RenderedTaskInstanceFields(ti))
    session.flush()
    y = ti.get_rendered_template_fields(session=session)
    print(y)
Basically, I want to validate this:
{"cmds": [], "image": "hello-world", "labels": {}, "volumes": [], "env_from": [], "env_vars": [{"name": "password", "value": "***", "value_from": null}, {"name": "normal", "value": "normalstuff", "value_from": null}, {"name": "secret", "value": "***", "value_from": null}, {"name": "thisisok", "value": "password", "value_from": null}], "arguments": [], "namespace": "default", "annotations": {}, "config_file": "~/Downloads/ECS\\ Kubeconfigs/shared-rke-01", "volume_mounts": [], "cluster_context": null, "pod_template_dict": null, "pod_template_file": null, "container_resources": null}
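One possible way to express that validation, independent of how the row is fetched from the DB, is a small helper that inspects the rendered fields and reports any sensitive-looking env var that is not masked. This is only a sketch: `unredacted_env_vars` and `SENSITIVE_NAME_PARTS` are hypothetical names, the list of sensitive fragments is an assumption, and the sample below is a trimmed copy of the rendered output above. It covers only the list form of env_vars.

```python
import json
from typing import Any, Dict, List

MASK = "***"
# Assumption: name fragments that should trigger masking in this sketch.
SENSITIVE_NAME_PARTS = ("password", "secret")


def unredacted_env_vars(rendered_fields: Dict[str, Any]) -> List[str]:
    """Return names of sensitive-looking env vars whose value is not masked."""
    return [
        item["name"]
        for item in rendered_fields.get("env_vars", [])
        if any(part in item["name"].lower() for part in SENSITIVE_NAME_PARTS)
        and item["value"] != MASK
    ]


# Trimmed copy of the rendered template fields shown above.
rendered = json.loads(
    """
    {"image": "hello-world",
     "env_vars": [
       {"name": "password", "value": "***", "value_from": null},
       {"name": "normal", "value": "normalstuff", "value_from": null},
       {"name": "secret", "value": "***", "value_from": null},
       {"name": "thisisok", "value": "password", "value_from": null}
     ],
     "namespace": "default"}
    """
)

# An empty list means every sensitive-looking variable is masked.
assert unredacted_env_vars(rendered) == []
```

In a real test, the dict passed to the helper would come from the stored rendered fields rather than a hard-coded string.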
This pull request has been automatically marked as stale because it has not had recent activity. It will be closed in 5 days if no further activity occurs. Thank you for your contributions.