Allow jinja templating connection ids for all third party operators
Description
We use private staging and production S3-compatible stores (Ceph clusters, for example) in our office, so there are often cases where DAGs differ only in their connection ids. We prefer to use Param rather than hardcoded connection ids to keep our code reusable. I only gave an example for an Amazon operator, but templating connection ids is needed for other operators too.
Why is it needed? Code reusability
Use case/motivation
```python
from airflow import DAG
from airflow.models.param import Param
from airflow.providers.amazon.aws.operators.s3 import S3CreateObjectOperator

with DAG(
    dag_id="example_s3",
    params={
        "aws_conn_id": Param("", type="string"),
    },
    ...
) as dag:
    create_object = S3CreateObjectOperator(
        task_id="create_object",
        s3_bucket=bucket_name,
        s3_key=key,
        data=DATA,
        replace=True,
        # Params enable us to provide runtime configuration
        aws_conn_id="{{ params.aws_conn_id }}",
    )
```
Related issues
No response
Are you willing to submit a PR?
- [ ] Yes I am willing to submit a PR!
Code of Conduct
- [X] I agree to follow this project's Code of Conduct
This might be blocked by https://github.com/apache/airflow/issues/29069 .
In general, you could extend `S3CreateObjectOperator.template_fields` by creating a custom operator with the required `template_fields`:

```python
from airflow.providers.amazon.aws.operators.s3 import S3CreateObjectOperator


class AwesomeS3CreateObjectOperator(S3CreateObjectOperator):
    template_fields = ("aws_conn_id", *S3CreateObjectOperator.template_fields)
```
I think it might be a good first issue. It also requires collecting all connection IDs from the existing operators and listing them, so anyone could pick this up and make the changes.
Marking as good first issue; maybe someone could volunteer free time to find at least most of the non-templated connection ids.
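Finding the non-templated connection ids could be partly automated. A minimal sketch of the idea, using only the standard library: inspect an operator's constructor for parameters ending in `conn_id` and report those missing from its `template_fields`. The `FakeS3Operator` class here is a hypothetical stand-in for illustration; in a real scan you would iterate over the actual provider operator classes instead.

```python
import inspect


def untemplated_conn_ids(op_cls):
    """Return constructor parameters ending in 'conn_id' that are
    missing from the operator's template_fields."""
    params = inspect.signature(op_cls.__init__).parameters
    conn_params = {name for name in params if name.endswith("conn_id")}
    templated = set(getattr(op_cls, "template_fields", ()))
    return sorted(conn_params - templated)


# Hypothetical stand-in for a provider operator, for illustration only.
class FakeS3Operator:
    template_fields = ("s3_bucket", "s3_key")

    def __init__(self, s3_bucket, s3_key, aws_conn_id="aws_default"):
        ...


print(untemplated_conn_ids(FakeS3Operator))  # ['aws_conn_id']
```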
I collected all third-party operators that have connection IDs.
Third-party operators which have connection IDs
- [ ] Airbyte
- [ ] Alibaba
- [ ] Amazon
- [ ] Apache Beam
- [ ] Apache Cassandra
- [ ] Apache Drill
- [ ] Apache Druid
- [ ] Apache Flink
- [ ] Apache HDFS
- [ ] Apache Hive
- [ ] Apache Impala
- [ ] Apache Kafka
- [ ] Apache Kylin
- [ ] Apache Livy
- [ ] Apache Pig
- [ ] Apache Pinot
- [ ] Apache Spark
- [ ] Apprise
- [ ] ArangoDB
- [ ] Asana
- [ ] Atlassian Jira
- [ ] Cloudant
- [ ] CNCF Kubernetes
- [ ] Cohere
- [ ] Common IO
- [ ] Common SQL
- [ ] Databricks
- [ ] Datadog
- [ ] dbt Cloud
- [ ] Dingding
- [ ] Discord
- [ ] Docker
- [ ] Elasticsearch
- [ ] Exasol
- [ ] Facebook
- [ ] File Transfer Protocol (FTP)
- [ ] GitHub
- [ ] Google
- [ ] gRPC
- [ ] Hashicorp
- [ ] Hypertext Transfer Protocol (HTTP)
- [ ] IBM Cloudant
- [ ] Influx DB
- [ ] Internet Message Access Protocol (IMAP)
- [ ] Java Database Connectivity (JDBC)
- [ ] Jenkins
- [ ] Microsoft Azure
- [ ] Microsoft SQL Server (MSSQL)
- [ ] Microsoft PowerShell Remoting Protocol (PSRP)
- [ ] Microsoft Windows Remote Management (WinRM)
- [ ] MongoDB
- [ ] MySQL
- [ ] Neo4j
- [ ] ODBC
- [ ] OpenAI
- [ ] OpenFaaS
- [ ] OpenLineage
- [ ] Open Search
- [ ] Opsgenie
- [ ] Oracle
- [ ] Pagerduty
- [ ] Papermill
- [ ] PgVector
- [ ] Pinecone
- [ ] PostgreSQL
- [ ] Presto
- [ ] Qdrant
- [ ] Redis
- [ ] Salesforce
- [ ] Samba
- [ ] Segment
- [ ] Sendgrid
- [ ] SFTP
- [ ] Slack
- [ ] SMTP
- [ ] Snowflake
- [ ] SQLite
- [ ] SSH
- [ ] Tableau
- [ ] Tabular
- [ ] Telegram
- [ ] Teradata
- [ ] Trino
- [ ] Vertica
- [ ] Weaviate
- [ ] Yandex
- [ ] Zendesk
I think we would just add the connection id parameter to `template_fields` for all existing provider operators? However, that would be cumbersome, and how would we apply that convention moving forward? I'd be happy to make the updates to the operators, but I'm concerned about inconsistency.
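The convention itself could be as simple as listing every `*_conn_id` constructor argument in `template_fields`. A minimal sketch of the pattern (hypothetical names; `MyServiceOperator` and `my_conn_id` are placeholders, and a real provider operator would subclass `BaseOperator`):

```python
# Hypothetical provider operator illustrating the proposed convention:
# every *_conn_id constructor argument appears in template_fields,
# so "{{ params.my_conn_id }}" would be rendered at runtime.
class MyServiceOperator:
    template_fields = ("endpoint", "my_conn_id")

    def __init__(self, endpoint, my_conn_id="my_service_default"):
        self.endpoint = endpoint
        self.my_conn_id = my_conn_id
```

Enforcing this going forward could be a provider-suite test that fails when an operator has a `*_conn_id` argument missing from `template_fields`, which would address the inconsistency concern.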