SparkSqlOperator and SparkSubmitOperator are using different types for configurations
Apache Airflow Provider(s)
apache-spark
Versions of Apache Airflow Providers
apache-airflow==2.9.2 apache-airflow-providers-apache-spark==4.8.2
Apache Airflow version
2.9.2
Operating System
macOS
Deployment
Docker-Compose
Deployment details
No response
What happened
The SparkSubmitOperator accepts a dictionary for its 'conf' parameter, while the SparkSqlOperator expects a string in the format PARAM=VALUE,PARAM2=VALUE2 for its 'conf' parameter.
The first option allows a config like this to be passed:
conf = {
    'spark.jars.packages': 'org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.5.2,org.projectnessie.nessie-integrations:nessie-spark-extensions-3.5_2.12:0.80.0',
    'spark.driver.extraJavaOptions': '-Divy.cache.dir=/tmp -Divy.home=/tmp',
    'spark.sql.extensions': 'org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions,org.projectnessie.spark.extensions.NessieSparkSessionExtensions'
}
The second option, however, always splits the string on the comma delimiter, so the packages end up as:
--conf spark.jars.packages=org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.5.2 --conf org.projectnessie.nessie-integrations:nessie-spark-extensions-3.5_2.12:0.80.0
where the second --conf is not even a valid key=value pair. This effectively makes it impossible to pass any of Spark's comma-delimited configuration values.
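The splitting behaviour can be illustrated with a minimal sketch of the hook's argument-building logic (paraphrased from the linked spark_sql.py source, not the actual Airflow code):

```python
# Sketch of how the SparkSql hook turns the conf string into CLI flags:
# it splits on ',', so a comma *inside* a single value starts a new entry.
conf = (
    "spark.jars.packages=org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.5.2,"
    "org.projectnessie.nessie-integrations:nessie-spark-extensions-3.5_2.12:0.80.0"
)

cmd = []
for conf_el in conf.split(","):
    cmd += ["--conf", conf_el]

# The fourth element is a bare Maven coordinate with no '=', which
# spark-sql cannot interpret as a configuration setting.
print(cmd)
```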
The SparkSubmitOperator also supports a larger set of properties, including the --packages flag, which the spark/bin/spark-sql script accepts as well.
What you think should happen instead
The first option allows for more flexibility when adding configs, and a dictionary seems like the right way to store them. It would enforce the same behaviour on both Spark operators, making them easier to adjust and maintain. Also less documentation to keep :)
https://github.com/apache/airflow/blob/54dfead4ccfe7a5b41eab3433c5a38159434e014/airflow/providers/apache/spark/hooks/spark_sql.py#L146
This is the place where the config is split on ','
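One possible shape for the fix is to let the hook accept either a dict or the legacy string. A hypothetical sketch (the function name and structure here are illustrative, not the actual Airflow implementation):

```python
def build_conf_args(conf):
    """Build repeated --conf flags from a dict, or from the legacy
    'k=v,k2=v2' string for backwards compatibility.

    Note: the legacy string path still cannot represent commas inside a
    value, which is exactly why a dict is preferable.
    """
    if isinstance(conf, dict):
        items = conf.items()
    else:
        items = (el.split("=", 1) for el in conf.split(","))
    args = []
    for key, value in items:
        args += ["--conf", f"{key}={value}"]
    return args
```

With a dict, a comma-containing value survives intact: `build_conf_args({"spark.jars.packages": "a,b"})` yields a single `--conf spark.jars.packages=a,b` pair.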
How to reproduce
Create a dag and task:
conf = {
    'spark.jars.packages': 'org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.5.2,org.projectnessie.nessie-integrations:nessie-spark-extensions-3.5_2.12:0.80.0',
    'spark.driver.extraJavaOptions': '-Divy.cache.dir=/tmp -Divy.home=/tmp',
    'spark.sql.extensions': 'org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions,org.projectnessie.spark.extensions.NessieSparkSessionExtensions'
}
config_string = ','.join([f"{key}={value}" for key, value in conf.items()])
merge_branch = SparkSqlOperator(
    name="merge_branch",
    task_id="merge_branch",
    conf=config_string,  # requires a string instead of a dict
    conn_id='spark',
    dag=dag,
    sql=f"MERGE BRANCH {ref} INTO main IN nessie",
    retries=0
)
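The round-trip in the DAG above is lossy: joining the dict into a string and then splitting on ',' (as the hook does) produces more entries than the dict contained. A minimal demonstration, using the same conf shape as above:

```python
# One dict entry whose value itself contains a comma.
conf = {
    "spark.jars.packages": (
        "org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.5.2,"
        "org.projectnessie.nessie-integrations:nessie-spark-extensions-3.5_2.12:0.80.0"
    ),
}

# The string form required by SparkSqlOperator...
config_string = ",".join(f"{key}={value}" for key, value in conf.items())

# ...is then split on ',' by the hook, losing the original grouping.
parts = config_string.split(",")
print(len(conf), len(parts))  # 1 entry in, 2 entries out
```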
Anything else
No response
Are you willing to submit PR?
- [X] Yes I am willing to submit a PR!
Code of Conduct
- [X] I agree to follow this project's Code of Conduct
Thanks for opening your first issue here! Be sure to follow the issue template! If you are willing to raise PR to address this issue please do so, no need to wait for approval.
@duhizjame feel free to raise a PR