
Redacting the sensitive env variables in env_vars for KPO

Open • amoghrajesh opened this issue 1 year ago • 5 comments

env_vars passed to the KubernetesPodOperator (KPO) can sometimes contain secret values, and these are shown as-is in the rendered template.

To fix this, I pass env_vars through the secrets masker's redact function so that sensitive values are hidden from the rendered template as well as from the task logs.
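A minimal, self-contained sketch of the name-based rule this relies on. It is not Airflow's secrets_masker.redact; EnvVar is a hypothetical stand-in for kubernetes.client.V1EnvVar, and SENSITIVE_FRAGMENTS is an assumed, illustrative subset of sensitive names. It reproduces the behaviour in the expected rendered output later in this thread: variables named password and secret are masked, while a variable named thisisok keeps its value password.

```python
from dataclasses import dataclass, replace

# Assumption: an illustrative subset of sensitive name fragments. Airflow's
# secrets masker uses its own, configurable list.
SENSITIVE_FRAGMENTS = ("password", "secret", "token", "api_key")
MASK = "***"


@dataclass(frozen=True)
class EnvVar:
    """Hypothetical stand-in for kubernetes.client.V1EnvVar (name and value only)."""

    name: str
    value: str


def is_sensitive(name: str) -> bool:
    """A name is sensitive if it contains any sensitive fragment (case-insensitive)."""
    lowered = name.lower()
    return any(fragment in lowered for fragment in SENSITIVE_FRAGMENTS)


def redact_env_vars(env_vars):
    """Mask values whose *name* looks sensitive; the values themselves are not inspected.

    Accepts both shapes used in the example DAG: a dict of name -> value,
    or a list of EnvVar objects.
    """
    if isinstance(env_vars, dict):
        return {name: MASK if is_sensitive(name) else value for name, value in env_vars.items()}
    return [replace(var, value=MASK) if is_sensitive(var.name) else var for var in env_vars]


print(redact_env_vars({"password": "password1", "normal": "secret"}))
# {'password': '***', 'normal': 'secret'}
print([v.value for v in redact_env_vars([EnvVar("secret", "ok"), EnvVar("thisisok", "password")])])
# ['***', 'password']
```

In this sketch, keying on the name is why "normal": "secret" stays visible: the value is never compared against anything.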

The example DAG I tried, and the results:

from pendulum import datetime, duration
from airflow import DAG
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator
from kubernetes.client import V1EnvVar


default_args = {
    "owner": "airflow",
    "depends_on_past": False,
    "start_date": datetime(2022, 1, 1),
    "email_on_failure": False,
    "email_on_retry": False,
    "retries": 1,
    "retry_delay": duration(minutes=5),
}
with DAG(
    dag_id="example_kubernetes_pod", schedule="@once", default_args=default_args
) as dag:
    t1 = KubernetesPodOperator(
        namespace="default",
        image="hello-world",
        name="airflow-test-pod",
        task_id="task-1",
        config_file=r"~/Downloads/ECS\ Kubeconfigs/shared-rke-01",  # raw string: same value, no invalid-escape warning
        get_logs=True,
        env_vars=[
            V1EnvVar(
                name="password",
                value="mypassword",
            ),
            V1EnvVar(
                name="normal",
                value="normalstuff",
            ),
            V1EnvVar(
                name="secret",
                value="ok",
            ),
            V1EnvVar(
                name="thisisok",
                value="password",
            ),
        ],
    )

    t2 = KubernetesPodOperator(
        namespace="default",
        image="hello-world",
        name="airflow-test-pod",
        task_id="task-2",
        config_file=r"~/Downloads/ECS\ Kubeconfigs/shared-rke-01",  # raw string: same value, no invalid-escape warning
        get_logs=True,
        env_vars={
            "password": "password1",
            "good": "thisisgood",
            "secret": "normal_stuff",
            "normal": "secret",
        },
    )

    [t1, t2]

amoghrajesh • Apr 14 '24 06:04

This reminds me of https://github.com/apache/airflow/issues/28086. cc @RoyNoymanW

eladkal • Apr 14 '24 08:04

@hussein-awala yes, some test coverage would be nice. I wanted to understand how the part that interacts with the DB can be tested here. I was trying something along these lines:

    @pytest.mark.parametrize(
        "env_vars",
        [
            [
                V1EnvVar(
                    name="password",
                    value="mypassword",
                ),
                V1EnvVar(
                    name="normal",
                    value="normalstuff",
                ),
                V1EnvVar(
                    name="secret",
                    value="ok",
                ),
                V1EnvVar(
                    name="thisisok",
                    value="password",
                ),
            ],
            {
                "password": "password1",
                "good": "thisisgood",
                "secret": "normal_stuff",
                "normal": "secret",
            },
        ],
        ids=["v1envvar_list", "dict"],
    )
    def test_env_vars_redaction(self, env_vars):
        k = KubernetesPodOperator(
            task_id="task-1",
            env_vars=env_vars,
        )

        ctx = create_context(k, persist_to_db=True)
        pod = k.build_pod_request_obj(ctx)
        print(pod)

But I keep getting this error:

self = <sqlalchemy.dialects.sqlite.pysqlite.SQLiteDialect_pysqlite object at 0x115594550>
cursor = <sqlite3.Cursor object at 0x12545f030>
statement = 'INSERT INTO dag (dag_id, root_dag_id, is_paused, is_subdag, is_active, last_parsed_time, last_pickled, last_expired, ... next_dagrun_create_after) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)'
parameters = ('dag', None, 0, 0, 0, None, ...)
context = <sqlalchemy.dialects.sqlite.base.SQLiteExecutionContext object at 0x1253d6d90>

    def do_execute(self, cursor, statement, parameters, context=None):
>       cursor.execute(statement, parameters)
E       sqlite3.OperationalError: table dag has no column named dag_display_name

The pod object also contains the unredacted env_vars, so the test definitely needs to interact with the DB. Any suggestions?

amoghrajesh • Apr 14 '24 09:04

@hussein-awala @potiuk @eladkal I need some help adding the unit tests here. I tried, but it looks like I am unable to store the results in the DB and check them. There are also static check failures.

amoghrajesh • Apr 16 '24 04:04

I think your problem is that the DB was not recreated: the error indicates you need to reset the DB with the latest changes, because this column (dag_display_name) was added recently.
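To make the schema-mismatch explanation concrete, here is a small illustration using Python's built-in sqlite3, not Airflow's migration tooling. The two-column dag table is a hypothetical reduction of the real one: a table created from an older schema rejects an INSERT naming a newer column with the same OperationalError as in the traceback, and recreating the table from the current definition clears it.

```python
import sqlite3

# In-memory DB whose "dag" table was created before the new column existed.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE dag (dag_id TEXT PRIMARY KEY, is_paused INTEGER)")


def insert_dag(connection):
    # Newer code writes a column that the stale schema does not have.
    connection.execute(
        "INSERT INTO dag (dag_id, is_paused, dag_display_name) VALUES (?, ?, ?)",
        ("dag", 0, "dag"),
    )


error = None
try:
    insert_dag(conn)
except sqlite3.OperationalError as exc:
    error = str(exc)
print(error)  # table dag has no column named dag_display_name

# "Resetting" here means recreating the table from the current definition.
conn.execute("DROP TABLE dag")
conn.execute(
    "CREATE TABLE dag (dag_id TEXT PRIMARY KEY, is_paused INTEGER, dag_display_name TEXT)"
)
insert_dag(conn)
print(conn.execute("SELECT COUNT(*) FROM dag").fetchone()[0])  # 1
```

In the Airflow test suite, the equivalent step is resetting the test database so it is rebuilt with the latest migrations.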

potiuk • Apr 18 '24 14:04

@ashb @potiuk @eladkal I need some help fixing the unit tests for this change. I also want to ask what kind of testing is needed before this can be considered complete.

I am trying to write a unit test that instantiates the KPO, then reads the RenderedTaskInstanceFields table from the DB and validates the redaction. I haven't got far with it; so far I have this:

    @provide_session
    @pytest.mark.parametrize(
        "env_vars",
        [
            [
                V1EnvVar(
                    name="password",
                    value="mypassword",
                ),
                V1EnvVar(
                    name="normal",
                    value="normalstuff",
                ),
                V1EnvVar(
                    name="secret",
                    value="ok",
                ),
                V1EnvVar(
                    name="thisisok",
                    value="password",
                ),
            ],
            {
                "password": "password1",
                "good": "thisisgood",
                "secret": "normal_stuff",
                "normal": "secret",
            },
        ],
    )
    def test_env_vars_redaction(self, dag_maker, env_vars, session=None):
        from airflow.models.renderedtifields import RenderedTaskInstanceFields
        # k = KubernetesPodOperator(
        #     task_id="task-1",
        #     env_vars=env_vars,
        # )
        #
        # ctx = create_context(k, persist_to_db=True)
        #
        # pod = k.build_pod_request_obj(ctx)

        # session = Session()
        # all_rows = session.query(RenderedTaskInstanceFields).all()
        #
        # print(all_rows)

        with dag_maker("test-dag", session=session) as dag:
            task = KubernetesPodOperator(
                task_id="task-1",
                env_vars=env_vars,
            )
        ti = dag_maker.create_dagrun().task_instances[0]
        ti.task = task

        session.add(RenderedTaskInstanceFields(ti))
        session.flush()

        # get_rendered_template_fields() updates ti.task in place and returns None,
        # so read the stored rendered fields back from the DB instead
        rendered = RenderedTaskInstanceFields.get_templated_fields(ti, session=session)
        print(rendered["env_vars"])

Basically, I want to validate that the stored rendered fields look like this:

{"cmds": [], "image": "hello-world", "labels": {}, "volumes": [], "env_from": [], "env_vars": [{"name": "password", "value": "***", "value_from": null}, {"name": "normal", "value": "normalstuff", "value_from": null}, {"name": "secret", "value": "***", "value_from": null}, {"name": "thisisok", "value": "password", "value_from": null}], "arguments": [], "namespace": "default", "annotations": {}, "config_file": "~/Downloads/ECS\\ Kubeconfigs/shared-rke-01", "volume_mounts": [], "cluster_context": null, "pod_template_dict": null, "pod_template_file": null, "container_resources": null}
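The validation above could be written as plain assertions on the rendered fields. This is a sketch that assumes the test has already loaded the stored rendered fields as a dict (for example from the RenderedTaskInstanceFields row). Here the JSON above is inlined and trimmed to its env_vars entry so the snippet runs on its own; env_values and check_redaction are hypothetical helpers.

```python
import json

# Trimmed copy of the expected rendered fields above (env_vars only).
RENDERED_FIELDS = json.loads(
    """
    {"env_vars": [
      {"name": "password", "value": "***", "value_from": null},
      {"name": "normal", "value": "normalstuff", "value_from": null},
      {"name": "secret", "value": "***", "value_from": null},
      {"name": "thisisok", "value": "password", "value_from": null}
    ]}
    """
)


def env_values(rendered_fields):
    """Map each env var name to its rendered value (hypothetical helper)."""
    return {entry["name"]: entry["value"] for entry in rendered_fields["env_vars"]}


def check_redaction(rendered_fields, sensitive, plain):
    """Assert sensitive names are masked and all other names are untouched."""
    values = env_values(rendered_fields)
    for name in sensitive:
        assert values[name] == "***", f"{name} was not redacted"
    for name, expected in plain.items():
        assert values[name] == expected, f"{name} was unexpectedly changed"


check_redaction(
    RENDERED_FIELDS,
    sensitive=["password", "secret"],
    plain={"normal": "normalstuff", "thisisok": "password"},
)
```

Checking both lists matters: a redaction that masked everything would pass the sensitive-name assertions alone.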

amoghrajesh • Apr 29 '24 04:04

This pull request has been automatically marked as stale because it has not had recent activity. It will be closed in 5 days if no further activity occurs. Thank you for your contributions.

github-actions[bot] • Jun 14 '24 00:06