airflow
Adding ClickHouse Provider
closes: #10893
Description
Adding a ClickHouse provider based on the clickhouse-driver Python library (https://clickhouse-driver.readthedocs.io/en/latest/).
Users can create custom operators by using ClickHouseHook directly, or build on ClickHouseOperator by supplying a result_processor callable, for example:
operator = ClickHouseOperator(
    task_id='clickhouse_operator',
    sql="SELECT * FROM gettingstarted.clickstream",
    dag=dag,
    result_processor=lambda cursor: print(cursor),
)
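The result_processor mechanism presumably works by executing the query and handing the cursor to the callable, whose return value becomes the task result. Below is a minimal sketch of that pattern, assuming a DB API 2.0 connection. ResultProcessingOperator and make_conn are hypothetical names, sqlite3 stands in for ClickHouse so the example runs without a server, and this is not the PR's implementation.

```python
import sqlite3
from typing import Any, Callable, Optional


class ResultProcessingOperator:
    """Hypothetical operator: runs SQL, then passes the cursor to result_processor."""

    def __init__(self, sql: str, connect: Callable[[], Any],
                 result_processor: Optional[Callable[[Any], Any]] = None) -> None:
        self.sql = sql
        self.connect = connect  # factory returning a DB API 2.0 connection
        self.result_processor = result_processor

    def execute(self, context: Optional[dict] = None) -> Any:
        conn = self.connect()
        try:
            cursor = conn.cursor()
            cursor.execute(self.sql)
            # The caller decides what to do with the rows; in Airflow, a value
            # returned from execute() is pushed to XCom by default.
            if self.result_processor is not None:
                return self.result_processor(cursor)
            return cursor.fetchall()
        finally:
            conn.close()


def make_conn() -> sqlite3.Connection:
    # In-memory stand-in for gettingstarted.clickstream.
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE clickstream (customer_id TEXT)")
    conn.executemany("INSERT INTO clickstream VALUES (?)",
                     [("customer1",), ("customer2",)])
    return conn


op = ResultProcessingOperator(
    sql="SELECT customer_id FROM clickstream ORDER BY customer_id",
    connect=make_conn,
    result_processor=lambda cursor: [row[0] for row in cursor.fetchall()],
)
print(op.execute())  # ['customer1', 'customer2']
```

Because the processor runs before the connection is closed, it must materialize whatever it needs (here, a list) rather than return the cursor itself.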
A sensor can be defined with a SQL query:
sensor = ClickHouseSensor(
    task_id="clickhouse_sensor",
    sql="SELECT * FROM gettingstarted.clickstream WHERE customer_id='customer1'",
    timeout=60,
    poke_interval=10,
    dag=dag,
)
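The description does not state exactly when ClickHouseSensor succeeds. Below is a minimal sketch, assuming it succeeds as soon as the query returns at least one row and otherwise re-polls every poke_interval seconds until timeout. Airflow's generic SqlSensor instead inspects the first cell of the first row by default. sql_poke and wait_for_rows are hypothetical helpers, sqlite3 stands in for ClickHouse, and the injectable sleep/clock let the demo run instantly. Unlike a real Airflow sensor, which raises on timeout, this sketch simply returns False.

```python
import sqlite3
import time
from typing import Any, Callable


def sql_poke(conn: Any, sql: str) -> bool:
    """One poke: True if the query yields at least one row (assumed criterion)."""
    cursor = conn.cursor()
    try:
        cursor.execute(sql)
        return cursor.fetchone() is not None
    finally:
        cursor.close()


def wait_for_rows(conn: Any, sql: str, timeout: float, poke_interval: float,
                  sleep: Callable[[float], None] = time.sleep,
                  clock: Callable[[], float] = time.monotonic) -> bool:
    """Poke until the query returns rows, giving up once `timeout` has elapsed."""
    deadline = clock() + timeout
    while not sql_poke(conn, sql):
        if clock() >= deadline:
            return False
        sleep(poke_interval)
    return True


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE clickstream (customer_id TEXT)")
query = "SELECT * FROM clickstream WHERE customer_id = 'customer1'"

fake_now = [0.0]


def fake_sleep(seconds: float) -> None:
    # Simulated time; the awaited row "arrives" after 20 seconds.
    fake_now[0] += seconds
    if fake_now[0] >= 20:
        conn.execute("INSERT INTO clickstream VALUES ('customer1')")


print(wait_for_rows(conn, query, timeout=60, poke_interval=10,
                    sleep=fake_sleep, clock=lambda: fake_now[0]))  # True
```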
Testing providers with SQL and template files (*_tf)
Hi @pateash
I'd like Airflow to support ClickHouse, but I'm not keen on the implementation as it stands.
- Your implementation is very similar to Anton Bryzgalov's rather old code https://github.com/bryzgaloff/airflow-clickhouse-plugin. That code is fine for a plugin, but it looks out of place in a provider.
- I'm puzzled by the call to self.get_conn() in the constructor, and the connection is never closed.
- clickhouse-driver supports DB API 2.0, so inheriting ClickHouseHook from DbApiHook would give both the hook and ClickHouseOperator many features out of the box. insert_rows() can be overridden to use Cursor.executemany, or simply raise an exception.
cursor.executemany('INSERT INTO test (x) VALUES', [[200]])
- Your ClickHouseHook offers very limited functionality; compare it with, for example, ExasolHook (Exasol is also an analytical database).
- Your ClickHouseOperator lacks capabilities that SQLExecuteQueryOperator provides (returning only the last result, multiple statements in a template, XCom). I would get these features (and more) just by using SQLExecuteQueryOperator.
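The connection-handling and insert_rows points can be illustrated together. Below is a minimal sketch, assuming a DB API 2.0 connect factory such as the one clickhouse-driver provides; LazyClickHouseHook, RecordingConnection and fake_connect are hypothetical names. To stay runnable without Airflow it does not subclass DbApiHook, but get_conn and insert_rows take the places those methods would occupy there, with insert_rows mirroring the table/rows/target_fields arguments of DbApiHook.insert_rows. A recording fake replaces a live server.

```python
from typing import Any, Callable, Iterable, List, Optional, Sequence


class LazyClickHouseHook:
    """Hypothetical hook: connects on first use, never in __init__."""

    def __init__(self, connect: Callable[[], Any]) -> None:
        self._connect = connect           # DB API 2.0 connect factory
        self._conn: Optional[Any] = None  # opened lazily by get_conn()

    def get_conn(self) -> Any:
        if self._conn is None:
            self._conn = self._connect()
        return self._conn

    def close(self) -> None:
        # Release the connection if one was ever opened.
        if self._conn is not None:
            self._conn.close()
            self._conn = None

    def __enter__(self) -> "LazyClickHouseHook":
        return self

    def __exit__(self, *exc_info: Any) -> None:
        self.close()

    def insert_rows(self, table: str, rows: Iterable[Sequence[Any]],
                    target_fields: Optional[Sequence[str]] = None) -> None:
        # ClickHouse-style batch insert: the statement stops at VALUES and the
        # rows are passed separately, as in the executemany example above.
        columns = f" ({', '.join(target_fields)})" if target_fields else ""
        cursor = self.get_conn().cursor()
        try:
            cursor.executemany(f"INSERT INTO {table}{columns} VALUES",
                               [list(row) for row in rows])
        finally:
            cursor.close()


# Recording fakes standing in for a real ClickHouse connection.
class RecordingCursor:
    def __init__(self, log: List[Any]) -> None:
        self.log = log

    def executemany(self, sql: str, rows: List[List[Any]]) -> None:
        self.log.append((sql, rows))

    def close(self) -> None:
        pass


class RecordingConnection:
    def __init__(self) -> None:
        self.log: List[Any] = []
        self.closed = False

    def cursor(self) -> RecordingCursor:
        return RecordingCursor(self.log)

    def close(self) -> None:
        self.closed = True


opened: List[RecordingConnection] = []


def fake_connect() -> RecordingConnection:
    conn = RecordingConnection()
    opened.append(conn)
    return conn


with LazyClickHouseHook(fake_connect) as hook:
    print(len(opened))  # 0: creating the hook did not connect
    hook.insert_rows("test", [(200,)], target_fields=["x"])
print(opened[0].log)     # [('INSERT INTO test (x) VALUES', [[200]])]
print(opened[0].closed)  # True
```

Deferring the connection to get_conn() matters in Airflow because operators, and any hooks they build in __init__, are instantiated whenever the DAG file is parsed, where opening a database connection would be wasteful and would leak if never closed.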
This pull request has been automatically marked as stale because it has not had recent activity. It will be closed in 5 days if no further activity occurs. Thank you for your contributions.