feat: Add dynamic parameter config source support for DAG params

Open chen0427ok opened this issue 2 weeks ago • 1 comments

Description

Closes #56632

This PR adds support for loading DAG parameter options dynamically from external configuration files (INI, JSON, YAML) with filtering capabilities.

Changes

New Files

  • airflow-core/src/airflow/utils/param_config_loader.py: Utility functions to load options from config files
    • load_options_from_ini() - Load from INI files with filtering
    • load_options_from_json() - Load from JSON files with filtering
    • load_options_from_yaml() - Load from YAML files with filtering
    • All support filter conditions and custom key fields

Modified Files

  • task-sdk/src/airflow/sdk/definitions/param.py: Extended Param class
    • Added config_source parameter to __init__()
    • Added _load_options_from_config() method to load options from external files
    • Updated __copy__(), serialize(), and deserialize() methods
    • Automatically populates schema["enum"] from config files at DAG parse time

Test Files

  • airflow-core/tests/unit/utils/test_param_config_loader.py: 18 unit tests for config loader
  • task-sdk/tests/task_sdk/definitions/test_param.py: 4 new tests for Param class config_source feature

Documentation

  • airflow-core/newsfragments/56632.feature.rst: Release note for this feature

Usage Example

Config file (/path/to/interfaces.ini):

[InterfaceA]
TYPE = Script
DESCRIPTION = Script based interface A

[InterfaceB]
TYPE = EBICS
DESCRIPTION = Electronic Banking interface

[InterfaceC]
TYPE = Script
DESCRIPTION = Another script interface

DAG definition:

from airflow import DAG
from airflow.sdk import Param
from datetime import datetime

with DAG(
    dag_id="example_dynamic_params",
    start_date=datetime(2024, 1, 1),
    schedule=None,
    params={
        "interface_name": Param(
            default="InterfaceA",
            config_source={
                "file": "/path/to/interfaces.ini",
                "filter": {"TYPE": "Script"},  # Only show Script type interfaces
                "key_field": "section",
            },
            description="Select an interface (dynamically loaded)",
        ),
    },
) as dag:
    ...  # tasks go here

Result in UI:
- Renders a dropdown with only InterfaceA and InterfaceC (filtered by TYPE = Script)
- No frontend code changes needed - uses existing enum schema mechanism
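
The filtering step in this example can be approximated with the standard library alone. The sketch below is not the PR's load_options_from_ini(); the name ini_options and its signature are hypothetical, and it reads from a string rather than a file path so that it stays self-contained. One detail worth noting: configparser lowercases option names by default, so the sketch sets optionxform = str to keep TYPE matchable as written.

```python
import configparser

INI_TEXT = """
[InterfaceA]
TYPE = Script
DESCRIPTION = Script based interface A

[InterfaceB]
TYPE = EBICS
DESCRIPTION = Electronic Banking interface

[InterfaceC]
TYPE = Script
DESCRIPTION = Another script interface
"""


def ini_options(text, filter_by=None, key_field="section"):
    """Hypothetical helper: return option values from INI text.

    key_field="section" yields section names; any other value is
    treated as the name of an option whose value is returned.
    """
    parser = configparser.ConfigParser()
    parser.optionxform = str  # preserve key case (the default lowercases keys)
    parser.read_string(text)

    options = []
    for section in parser.sections():
        values = dict(parser[section])
        # Keep the section only if every filter condition matches.
        if any(values.get(k) != v for k, v in (filter_by or {}).items()):
            continue
        if key_field == "section":
            options.append(section)
        elif key_field in values:
            options.append(values[key_field])
    return options


print(ini_options(INI_TEXT, filter_by={"TYPE": "Script"}))
# → ['InterfaceA', 'InterfaceC']
```

Without the optionxform line, the parsed keys would be type and description, and a filter written as {"TYPE": "Script"} would match nothing.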

Supported Config Formats

INI Files

- Filter by section properties
- Extract section names or specific field values
- Example: filter interfaces by type, environment, etc.

JSON Files

- Support array of objects
- Filter by object properties
- Extract specific field as option value

YAML Files

- Support array of objects
- Filter by object properties
- Extract specific field as option value
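
For the array-of-objects formats, the filter-then-extract logic might look like the sketch below. The helper name record_options and the sample records are hypothetical and are not taken from the PR. Only JSON is shown because it needs no third-party package; a YAML file parsed with PyYAML's yaml.safe_load would presumably yield the same list of dicts and could be passed to the same helper.

```python
import json

# Hypothetical sample data mirroring the INI example.
JSON_TEXT = """
[
  {"name": "InterfaceA", "type": "Script"},
  {"name": "InterfaceB", "type": "EBICS"},
  {"name": "InterfaceC", "type": "Script"}
]
"""


def record_options(records, key_field, filter_by=None):
    """Hypothetical helper: pick key_field from records matching filter_by."""
    conditions = (filter_by or {}).items()
    return [
        record[key_field]
        for record in records
        # Skip records lacking the key field or failing any condition.
        if key_field in record
        and all(record.get(k) == v for k, v in conditions)
    ]


records = json.loads(JSON_TEXT)
print(record_options(records, key_field="name", filter_by={"type": "Script"}))
# → ['InterfaceA', 'InterfaceC']
```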

Testing

✅ All 22 unit tests passing:
- 18 tests for param_config_loader.py covering all formats and edge cases
- 4 tests for Param class config_source integration

✅ Manual testing in Airflow UI:
- Created test DAG with config_source parameter
- Verified dropdown rendering with filtered options
- Verified task execution with selected parameter values
- Tested with all three config formats (INI, JSON, YAML)

✅ Static checks:
- All prek hooks passed
- ruff formatting/linting passed
- mypy type checking passed

Test Commands

# Test config loader
uv run pytest airflow-core/tests/unit/utils/test_param_config_loader.py -v

# Test Param class integration
uv run pytest task-sdk/tests/task_sdk/definitions/test_param.py -k "config_source" -v

Implementation Notes

- Config files are loaded at DAG parse time, not at UI render time
- Options are stored in the enum field of the parameter's JSON schema
- If config loading fails (file not found, parsing error), a warning is logged and the parameter falls back to a regular string field
- No database schema changes required
- No UI code changes required - reuses existing enum dropdown rendering
- Backward compatible - existing DAGs without config_source continue to work unchanged
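
The enum population and the fallback described in these notes can be sketched as follows. This is an illustration under assumptions, not the PR's _load_options_from_config(): the helper build_param_schema, the choice of caught exception types, and the simulated loader are all hypothetical.

```python
import logging

log = logging.getLogger(__name__)


def build_param_schema(load_options):
    """Hypothetical helper: build a JSON schema dict for a string param.

    load_options is any zero-argument callable returning a list of options.
    """
    schema = {"type": "string"}
    try:
        options = load_options()
    except (OSError, ValueError) as exc:
        # File missing or unparsable: warn and keep a plain string field.
        log.warning("Could not load param options: %s", exc)
        return schema
    if options:
        schema["enum"] = list(options)
    return schema


def load_missing_file():
    # Simulates a loader whose config file does not exist.
    raise FileNotFoundError("interfaces.ini")


print(build_param_schema(lambda: ["InterfaceA", "InterfaceC"]))
# → {'type': 'string', 'enum': ['InterfaceA', 'InterfaceC']}
print(build_param_schema(load_missing_file))
# → {'type': 'string'}
```

Because the schema is built once when the DAG file is parsed, edits to the config file would only show up in the dropdown after the DAG is parsed again.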

Follow-up Work

This PR provides the foundation for dynamic parameter options. Potential future enhancements:
- Support for database queries as config sources
- Support for API endpoints as config sources
- Caching mechanism for frequently accessed config files
- UI for managing config files

Checklist

- Feature implementation completed
- Unit tests added and passing (22/22)
- Manual testing in UI completed
- Static checks passing
- Newsfragment added
- Code follows Airflow coding standards
- Documentation in code (docstrings)
- Backward compatible

---

chen0427ok avatar Dec 11 '25 06:12 chen0427ok

Congratulations on your first Pull Request and welcome to the Apache Airflow community! If you have any issues or are unsure about anything, please check our Contributors' Guide (https://github.com/apache/airflow/blob/main/contributing-docs/README.rst). Here are some useful points:

  • Pay attention to the quality of your code (ruff, mypy and type annotations). Our prek-hooks will help you with that.
  • In case of a new feature, add useful documentation (in docstrings or in the docs/ directory). Adding a new operator? Check this short guide. Consider adding an example DAG that shows how users should use it.
  • Consider using the Breeze environment for testing locally. It is a heavy Docker setup, but it ships with a working Airflow and a lot of integrations.
  • Be patient and persistent. It might take some time to get a review or get the final approval from Committers.
  • Please follow ASF Code of Conduct for all communication including (but not limited to) comments on Pull Requests, Mailing list and Slack.
  • Be sure to read the Airflow Coding style.
  • Always keep your Pull Requests rebased, otherwise your build might fail due to changes not related to your commits.

Apache Airflow is a community-driven project and together we are making it better 🚀. In case of doubts contact the developers at: Mailing List: [email protected] Slack: https://s.apache.org/airflow-slack

boring-cyborg[bot] avatar Dec 11 '25 06:12 boring-cyborg[bot]

While the idea is sound - we are in the middle of making task-sdk independent from airflow-core. We should not add anything in task-sdk that uses anything from airflow-core - so please move the code entirely to task-sdk.

potiuk avatar Dec 11 '25 08:12 potiuk

Thanks for the feedback! I've refactored the code to move param_config_loader entirely into the task-sdk as requested.

Summary of changes:

Moved files: Relocated source and test files from airflow-core to their respective paths in task-sdk.

Updated imports: Changed references in param.py to use airflow.sdk.definitions.param_config_loader.

Cleanup: Removed the try-except ImportError block since the dependency is now internal to the SDK.

chen0427ok avatar Dec 11 '25 09:12 chen0427ok