
SparkSqlOperator and SparkSubmitOperator use different types for configurations

Open duhizjame opened this issue 1 year ago • 1 comment

Apache Airflow Provider(s)

apache-spark

Versions of Apache Airflow Providers

apache-airflow==2.9.2
apache-airflow-providers-apache-spark==4.8.2

Apache Airflow version

2.9.2

Operating System

MacOS

Deployment

Docker-Compose

Deployment details

No response

What happened

The SparkSubmitOperator uses a dictionary for the 'conf' property of the operator, while the SparkSqlOperator expects a string in the format PARAM=VALUE,PARAM2=VALUE2 for its 'conf' property.

The first option allows a config like this to be passed:

conf = {
    'spark.jars.packages': 'org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.5.2,org.projectnessie.nessie-integrations:nessie-spark-extensions-3.5_2.12:0.80.0',
    'spark.driver.extraJavaOptions': '-Divy.cache.dir=/tmp -Divy.home=/tmp',
    'spark.sql.extensions': 'org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions,org.projectnessie.spark.extensions.NessieSparkSessionExtensions'
}

The second option, however, always splits the string on the comma delimiter, so the packages value above is broken up into: --conf spark.jars.packages=org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.5.2 --conf org.projectnessie.nessie-integrations:nessie-spark-extensions-3.5_2.12:0.80.0. This effectively makes it impossible to pass any of Spark's comma-delimited configuration values.
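For illustration, a minimal sketch of the splitting behaviour described above (an assumed simplification, not the hook's exact code; the string below is just the flattened form of the dict example):

# Every comma starts a new --conf pair, including commas inside a value.
conf = "spark.jars.packages=org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.5.2,org.projectnessie.nessie-integrations:nessie-spark-extensions-3.5_2.12:0.80.0"
cmd = []
for conf_el in conf.split(","):
    cmd += ["--conf", conf_el]
print(cmd)
# ['--conf', 'spark.jars.packages=org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.5.2',
#  '--conf', 'org.projectnessie.nessie-integrations:nessie-spark-extensions-3.5_2.12:0.80.0']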

The SparkSubmitOperator also exposes a larger set of properties, including the --packages flag, which the spark/bin/spark-sql script supports as well.

What you think should happen instead

The first option allows for more flexibility when adding configs, and a dictionary seems like the right way to store them. Using a dict in both Spark operators would enforce the same behaviour, making them easier to adjust and maintain. Also less documentation to keep :)

https://github.com/apache/airflow/blob/54dfead4ccfe7a5b41eab3433c5a38159434e014/airflow/providers/apache/spark/hooks/spark_sql.py#L146

This is the place where the config is split on ','
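For comparison, a sketch of the dict-based handling this issue proposes, assuming it mirrors the loop SparkSubmitHook already uses for its conf dict (build_conf_args is a hypothetical helper name, not anything in the provider):

def build_conf_args(conf: dict) -> list[str]:
    """Emit one --conf per dict entry, so commas inside a value survive intact."""
    args = []
    for key, value in conf.items():
        args += ["--conf", f"{key}={value}"]
    return args

# With the dict from the example above, build_conf_args(conf) keeps
# spark.jars.packages as a single --conf argument.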

How to reproduce

Create a dag and task:

from datetime import datetime

from airflow import DAG
from airflow.providers.apache.spark.operators.spark_sql import SparkSqlOperator

conf = {
    'spark.jars.packages': 'org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.5.2,org.projectnessie.nessie-integrations:nessie-spark-extensions-3.5_2.12:0.80.0',
    'spark.driver.extraJavaOptions': '-Divy.cache.dir=/tmp -Divy.home=/tmp',
    'spark.sql.extensions': 'org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions,org.projectnessie.spark.extensions.NessieSparkSessionExtensions'
}

# Workaround attempt: flatten the dict into the PARAM=VALUE,PARAM2=VALUE2 string
# the operator expects; the commas inside spark.jars.packages are then
# indistinguishable from the separators between configs.
config_string = ','.join(f"{key}={value}" for key, value in conf.items())

# dag and ref are assumed here only to make the snippet self-contained
dag = DAG(dag_id="spark_sql_conf_repro", start_date=datetime(2024, 1, 1), schedule=None)
ref = "etl_branch"  # example Nessie branch name

merge_branch = SparkSqlOperator(
    name="merge_branch",
    task_id="merge_branch",
    conf=config_string,  # requires a string instead of a dict
    conn_id='spark',
    dag=dag,
    sql=f"MERGE BRANCH {ref} INTO main IN nessie",
    retries=0
)

Anything else

No response

Are you willing to submit PR?

  • [X] Yes I am willing to submit a PR!

Code of Conduct

duhizjame avatar Jun 30 '24 13:06 duhizjame

Thanks for opening your first issue here! Be sure to follow the issue template! If you are willing to raise a PR to address this issue, please do so; no need to wait for approval.

boring-cyborg[bot] avatar Jun 30 '24 13:06 boring-cyborg[bot]

@duhizjame feel free to raise a PR

aritra24 avatar Jul 01 '24 07:07 aritra24