Trino SQL split trailing semi-colon
Apache Airflow version
Other Airflow 2 version (please specify below)
What happened
We're currently using Airflow 2.6.1 but I believe the issue persists in the latest versions.
The SQLExecuteQueryOperator has the functionality to split statements on semi-colons via split_statements. Trino is one of the few execution engines that does not accept a trailing semi-colon, and sqlparse leaves the semi-colon in place after splitting.
The TrinoHook doesn't implement its own run method, so this is handled by the DbApiHook. split_statements on the DbApiHook uses sqlparse to do the splitting.
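To illustrate the behaviour being described, here is a minimal stand-in for sqlparse-style splitting (the function `naive_sqlparse_split` and its simplistic logic are mine, not Airflow's or sqlparse's; it ignores semi-colons inside string literals):

```python
def naive_sqlparse_split(sql: str) -> list[str]:
    # Naive stand-in for sqlparse.split(): split on ';', then re-attach the
    # terminating ';' to every statement that had one, as sqlparse does.
    pieces = [p.strip() for p in sql.split(";") if p.strip()]
    out = [p + ";" for p in pieces]
    if out and not sql.rstrip().endswith(";"):
        out[-1] = out[-1][:-1]  # last statement had no terminator
    return out

# The first statement keeps its ';', which is what Trino rejects:
print(naive_sqlparse_split("SELECT 1; SELECT 2"))  # → ['SELECT 1;', 'SELECT 2']
```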
What you think should happen instead
When passing in multiple SQL statements that are semi-colon separated, they should be split without keeping the semi-colon so that Trino considers them valid.
Because of Trino's current implementation, if we want to make use of this functionality I believe custom splitting logic should be implemented in Trino's hook. If the Trino implementation is revised later on so that it accepts trailing semi-colons, this could be removed again.
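A sketch of what such hook-level logic could look like. `BaseHookStub` stands in for the real DbApiHook, and `split_sql_string` only mirrors the shape of the common-sql splitter; none of this is the actual Airflow implementation:

```python
class BaseHookStub:
    """Hypothetical stand-in for DbApiHook, whose sqlparse-based
    splitter keeps the trailing ';' on each statement."""

    @staticmethod
    def split_sql_string(sql: str) -> list[str]:
        # naive stand-in for the sqlparse-based splitting
        parts = [p.strip() for p in sql.split(";") if p.strip()]
        return [p + ";" for p in parts]


class TrinoHookSketch(BaseHookStub):
    @staticmethod
    def split_sql_string(sql: str) -> list[str]:
        # Trino rejects trailing semi-colons, so strip them after splitting
        return [s.rstrip(";").rstrip()
                for s in BaseHookStub.split_sql_string(sql)]
```

The override keeps the base splitting behaviour and only post-processes its output, so it could be dropped once the terminator is no longer a problem.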
How to reproduce
Run the SQLExecuteQueryOperator using Trino with multiple SQL statements that are semi-colon separated, e.g.
```python
SQLExecuteQueryOperator(
    task_id="semi_colon_test",
    conn_id="trino_default",
    sql="SELECT 1; SELECT 2",
    split_statements=True,
    handler=list,
)
```
Operating System
Debian GNU/Linux 11 (bullseye)
Versions of Apache Airflow Providers
apache-airflow-providers-amazon==8.0.0
apache-airflow-providers-celery==3.1.0
apache-airflow-providers-common-sql==1.4.0
apache-airflow-providers-databricks==4.1.0
apache-airflow-providers-ftp==3.3.1
apache-airflow-providers-http==4.3.0
apache-airflow-providers-imap==3.1.1
apache-airflow-providers-mysql==5.0.0
apache-airflow-providers-postgres==5.4.0
apache-airflow-providers-presto==5.0.0
apache-airflow-providers-redis==3.1.0
apache-airflow-providers-sqlite==3.3.2
apache-airflow-providers-trino==5.0.0
Deployment
Other
Deployment details
Self hosted in AWS using ECS Fargate, RDS (Postgres) and Elasticache (Redis).
Anything else
Error message
trino.exceptions.TrinoUserError: TrinoUserError(type=USER_ERROR, name=SYNTAX_ERROR, message="line 1:84: mismatched input ';'. Expecting: '.', <EOF>", query_id=20231009_081239_17877_i2yca)
Are you willing to submit PR?
- [X] Yes I am willing to submit a PR!
Code of Conduct
- [X] I agree to follow this project's Code of Conduct
@MikeWallis42 Did you try to run the operator with split_statements=True? Could you also check the latest versions of the apache-airflow-providers-common-sql and apache-airflow-providers-trino providers?
@Taragolis apologies, I hadn't added that to my minimal example, but I am using split_statements=True.
I can check the latest versions, but from reading the code I believe nothing has changed.
I've just checked by upgrading to Airflow 2.7.1 with the following provider versions; the issue is still present.
apache-airflow-providers-amazon==8.6.0
apache-airflow-providers-celery==3.3.3
apache-airflow-providers-common-sql==1.7.1
apache-airflow-providers-databricks==4.4.0
apache-airflow-providers-ftp==3.5.1
apache-airflow-providers-http==4.5.1
apache-airflow-providers-imap==3.3.1
apache-airflow-providers-mysql==5.3.0
apache-airflow-providers-postgres==5.6.0
apache-airflow-providers-presto==5.1.3
apache-airflow-providers-redis==3.3.1
apache-airflow-providers-sqlite==3.4.3
apache-airflow-providers-trino==5.3.0
@MikeWallis42 how about passing a list of sql commands instead? Does it address your needs?
```python
SQLExecuteQueryOperator(
    task_id="semi_colon_test",
    conn_id="trino_default",
    sql=["SELECT 1", "SELECT 2"],
    handler=list,
)
```
@Bisk1 I believe that would work if we passed in Python strings, but I was hoping to make use of template_searchpath to read in SQL files so that I didn't need to maintain them in code.
The full pattern that I'm hoping to use is:
- template_searchpath
- SQL files
- dynamic task mapping - there are a variable number of items per day
- Jinja templating - parameterise the SQL
- Multi-line SQL - so that I can set Trino session properties before running the SQL, or make each mapped task idempotent by running DELETE then INSERT.
The workaround would be to read the contents of each file and split them within the DAG definition, which isn't recommended given it would happen on each scheduler heartbeat.
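For reference, that discouraged workaround would look roughly like this (`load_statements` and its naive split are hypothetical illustrations, not anything from Airflow; the point is that this code would run at every DAG parse):

```python
from pathlib import Path


def load_statements(path: str) -> list[str]:
    # Naive split that ignores semi-colons inside string literals; each
    # returned statement has no trailing semi-colon, so Trino accepts it.
    return [s.strip()
            for s in Path(path).read_text().split(";")
            if s.strip()]
```

Called at module level in a DAG file, this re-reads and re-splits every SQL file on each scheduler parse, which is the overhead being avoided.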
@MikeWallis42 Airflow uses an external library to split SQL: https://github.com/andialbrecht/sqlparse, and it doesn't support removing semicolons. I opened an issue there.
@MikeWallis42 looks like the relevant enhancement was merged to library's main branch and will go out in the next release. Unfortunately there is no due date for the next release https://github.com/andialbrecht/sqlparse/milestone/27
It may be worth asking the maintainers about the release date and explaining that it blocks fixing this bug.
> @MikeWallis42 looks like the relevant enhancement was merged to library's main branch and will go out in the next release. Unfortunately there is no due date for the next release https://github.com/andialbrecht/sqlparse/milestone/27

This was released 2 months ago https://github.com/andialbrecht/sqlparse/milestone/27 :), what would be the next steps?
Up to you: you can upgrade the package in your installation, or upgrade to the latest version of Airflow. If you look at the Airflow constraints, you can see which deps come in by "default" (i.e. the golden/tested set of deps); you will find that sqlparse is 0.5.0 in both main and 2.9.2. But if you want, and Airflow in your version allows it, you can manually upgrade it. You are the deployment manager @ariasjose, so it's your decision, but you will find all the information about constraints and upgrade scenarios in the docs: https://airflow.apache.org/docs/apache-airflow/stable/installation/installing-from-pypi.html
Closing as it seems to be solved.
@potiuk @Bisk1 Hello!
Sorry to bother you, but it seems that even after the sqlparse upgrade, some action still needs to be taken on the Airflow providers' side. After sqlparse was upgraded, the strip_semicolon flag was added to the split() function (see here), but it is False by default, so until its usage in DbApiHook (here) is modified accordingly (or until the run() method is overridden in all the Trino/Presto/other affected hooks), nothing will work.
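A sketch of what that provider-side change could look like. `split_for_trino` is a hypothetical helper name, and the fallback branch is my own addition for environments where sqlparse is old or absent; only the `strip_semicolon=True` call reflects the sqlparse >= 0.5.0 API the comment describes:

```python
def split_for_trino(sql: str) -> list[str]:
    try:
        import sqlparse
        # strip_semicolon was added to sqlparse.split() in 0.5.0
        statements = sqlparse.split(sql, strip_semicolon=True)
    except (ImportError, TypeError):
        # TypeError: older sqlparse without the strip_semicolon parameter.
        # Naive fallback; ignores semi-colons inside string literals.
        statements = [p.strip().rstrip(";").rstrip() for p in sql.split(";")]
    return [s for s in statements if s.strip()]
```

Either branch yields statements without the trailing semi-colon, which is the form Trino accepts.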
Feel free to add it @Illumaria - or we can wait until someone does.
Seems pretty feasible, I'll try to find the time.
The fix PR doesn't seem to be included in the 2.10.5 release. Do we have an estimate of when, and in what version, this PR will be released?
@willshen99 Hi! As far as I understand, the fix has nothing to do with the Airflow version; it's all about the provider package version. Did you upgrade your Trino provider package to a version that includes this fix?
I got it working by upgrading the Trino provider package and adding split_statements=True to SQLExecuteQueryOperator. Thank you @Illumaria!