astronomer-providers
Implement Async Hive Operators
Implement async versions for the following operators:
- [ ] HiveOperator
Acceptance Criteria:
- [ ] Unit test coverage in the PR (90% code coverage -- we will need to add CodeCov separately to measure code coverage), with all tests passing
- [ ] Example DAG using the async operator that can be used to run integration tests parametrized via environment variables. Example - https://github.com/apache/airflow/blob/8a03a505e1df0f9de276038c5509135ac569a667/airflow/providers/google/cloud/example_dags/example_bigquery_to_gcs.py#L33-L35
- [ ] Add proper docstrings for each of the methods and functions, including an example DAG showing how it should be used (populate
- [ ] Exception Handling in case of errors
- [ ] Improve the OSS docs to make sure they cover the following:
- [ ] Has an example DAG for the sync version
- [ ] How to add a connection via environment variable, explaining each of the fields. Example - https://airflow.apache.org/docs/apache-airflow-providers-postgres/stable/connections/postgres.html
- [ ] A how-to guide for the operator - example: https://airflow.apache.org/docs/apache-airflow-providers-postgres/stable/operators/postgres_operator_howto_guide.html
Please refer to this documentation to try creating a Hive connection from Airflow: https://www.notion.so/astronomerio/HiveOperator-Roblox-work-2dbfbc70418844379d7a71764fd12498
Using PyHive, async query execution (execute with async_=True) has been implemented; reference - https://pypi.org/project/PyHive/. If use_beeline is set to true, we need to execute with Beeline instead, running the query in a subprocess shell and checking its output.
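A rough sketch of the intended branching, assuming a hypothetical helper class (the class name, constructor parameters, and Beeline flags/JDBC URL are illustrative, not the actual implementation):

```python
import subprocess

from pyhive import hive


class HiveQueryRunner:
    """Hypothetical helper illustrating the two execution paths described above."""

    def __init__(self, host: str, use_beeline: bool = False):
        self.host = host
        self.use_beeline = use_beeline

    def run(self, hql: str):
        if self.use_beeline:
            # Beeline path: submit the query through a subprocess
            # (JDBC URL and flags here are assumptions).
            cmd = ["beeline", "-u", f"jdbc:hive2://{self.host}:10000/default", "-e", hql]
            return subprocess.run(cmd, capture_output=True, text=True, check=True)
        # PyHive path: submit the query without blocking using async_=True.
        cursor = hive.connect(self.host).cursor()
        cursor.execute(hql, async_=True)
        return cursor  # caller can poll cursor.poll().operationState
```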
Steps to connect to Amazon Hive from the Airflow worker container:
- Open an SSH tunnel from the Airflow worker node using the command below; the Airflow container will then treat localhost port 8158 as equivalent to port 10000 on the EMR master node, on which Amazon Hive runs.
astro@18f56a4ee86e:/usr/local/airflow$ ssh -i /usr/local/providers_team_keypair.pem -N -L 127.0.0.1:8158:ec2-3-128-76-33.us-east-2.compute.amazonaws.com:10000 [email protected]
- Copy the key pair file and hive-site.xml from the EMR master node to the Airflow worker.
- Create an Airflow connection using the Hive CLI wrapper as given below (see the connection sketch after the task log).
- The task should now be able to run successfully.
[2022-03-25, 12:41:30 UTC] {hive.py:254} INFO - Connected to: Apache Hive (version 2.3.8-amzn-0)
[2022-03-25, 12:41:30 UTC] {hive.py:254} INFO - Driver: Hive JDBC (version 1.1.0)
[2022-03-25, 12:41:30 UTC] {hive.py:254} INFO - Transaction isolation: TRANSACTION_REPEATABLE_READ
[2022-03-25, 12:41:31 UTC] {hive.py:254} INFO - 0: jdbc:hive2://localhost:8158/default> USE default;
[2022-03-25, 12:41:32 UTC] {hive.py:254} INFO - No rows affected (1.315 seconds)
[2022-03-25, 12:41:33 UTC] {hive.py:254} INFO - 0: jdbc:hive2://localhost:8158/default> show databases;
[2022-03-25, 12:41:35 UTC] {hive.py:254} INFO - +----------------+--+
[2022-03-25, 12:41:35 UTC] {hive.py:254} INFO - | database_name |
[2022-03-25, 12:41:35 UTC] {hive.py:254} INFO - +----------------+--+
[2022-03-25, 12:41:35 UTC] {hive.py:254} INFO - | default |
[2022-03-25, 12:41:35 UTC] {hive.py:254} INFO - | providers_db |
[2022-03-25, 12:41:35 UTC] {hive.py:254} INFO - +----------------+--+
[2022-03-25, 12:41:35 UTC] {hive.py:254} INFO - 2 rows selected (2.14 seconds)
[2022-03-25, 12:41:36 UTC] {hive.py:254} INFO - 0: jdbc:hive2://localhost:8158/default>
[2022-03-25, 12:41:36 UTC] {hive.py:254} INFO - Closing: 0: jdbc:hive2://localhost:8158/default;auth=none
[2022-03-25, 12:41:36 UTC] {taskinstance.py:1272} INFO - Marking task as SUCCESS. dag_id=example_hive_dag, task_id=hive_query, execution_date=20220325T124114, start_date=20220325T124117, end_date=20220325T124136
[2022-03-25, 12:41:36 UTC] {local_task_job.py:154} INFO - Task exited with return code 0
[2022-03-25, 12:41:37 UTC] {local_task_job.py:264} INFO - 0 downstream tasks scheduled from follow-on schedule check
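A hedged sketch of how such a Hive CLI connection could be supplied through an environment variable (the connection ID, URI scheme, host/port, and extras below are assumptions for illustration, not the exact connection used above):

```python
import os

# In practice this variable would be exported in the worker's environment or a
# secrets backend; setting it from Python here is only for illustration.
os.environ["AIRFLOW_CONN_HIVE_CLI_DEFAULT"] = (
    # Assumed values: scheme maps to the Hive CLI connection type, host/port
    # point at the SSH tunnel opened above, query params become "extras".
    "hive-cli://localhost:8158/default?use_beeline=true&auth=none"
)
```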
While creating the SSH tunnel from the worker node in Airflow, if ssh isn't present by default, install it using apt install ssh.
If, once the SSH connection to the Hadoop node is made, running the query fails with "channel 1: open failed: administratively prohibited: open failed" and the connection gets refused, then set the options below.
These options can be found in /etc/ssh/sshd_config.
You should ensure that:
- AllowTCPForwarding is either not present, is commented out, or is set to yes
- PermitOpen is either not present, is commented out, or is set to any
Surprising that I didn't have to make the changes you've mentioned, @rajaths010494.
The PyHive library supports executing queries asynchronously:
from pyhive import hive
from TCLIService.ttypes import TOperationState

cursor = hive.connect('localhost').cursor()
# 'async' is a reserved keyword in Python 3.7+, so recent PyHive versions use 'async_'
cursor.execute('SELECT * FROM my_awesome_data LIMIT 10', async_=True)
# Poll the cursor until the query leaves the INITIALIZED/RUNNING states
status = cursor.poll().operationState
while status in (TOperationState.INITIALIZED_STATE, TOperationState.RUNNING_STATE):
    status = cursor.poll().operationState
but it doesn't return any query ID or job ID that we can poll on; we would have to use the cursor object to do the polling in the trigger. We cannot pass the cursor object to a trigger because it is not JSON-serializable, and trigger kwargs are stored in the DB. The same applies to cloudera/impyla.
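For context, a minimal sketch of why this matters, assuming a hypothetical HiveQueryTrigger (the class, module path, and kwargs are illustrative): an Airflow trigger must return JSON-serializable kwargs from serialize(), so only plain values such as a query ID could be handed over to it, never a live cursor.

```python
from typing import Any, AsyncIterator, Dict, Tuple

from airflow.triggers.base import BaseTrigger, TriggerEvent


class HiveQueryTrigger(BaseTrigger):
    """Hypothetical trigger: it can only carry JSON-serializable state."""

    def __init__(self, query_id: str, polling_interval: float = 5.0):
        super().__init__()
        self.query_id = query_id          # a plain string is fine
        self.polling_interval = polling_interval
        # A PyHive cursor could NOT be stored here and round-tripped, because
        # serialize() below is persisted to the metadata DB as JSON.

    def serialize(self) -> Tuple[str, Dict[str, Any]]:
        return (
            "hive_example.triggers.HiveQueryTrigger",  # hypothetical module path
            {"query_id": self.query_id, "polling_interval": self.polling_interval},
        )

    async def run(self) -> AsyncIterator[TriggerEvent]:
        # Real polling logic would go here; omitted because PyHive/Impyla do not
        # expose a query/job ID to poll on (the limitation described above).
        yield TriggerEvent({"status": "success", "query_id": self.query_id})
```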
In the sync version, Beeline uses a subprocess to submit the query and waits for it to end. Similarly, if we use an async subprocess to run the query, we have to wait for that subprocess to finish before we can get the query ID, and by then the query itself has already executed along with the subprocess, so there is nothing left to poll for the query's status. All the Beeline properties can be found here.
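A minimal sketch of the Beeline path using an asyncio subprocess (the JDBC URL, flags, and error handling are assumptions); it illustrates that Beeline's output, including anything resembling a query ID, is only available once the process has already finished:

```python
import asyncio


async def run_beeline(hql: str, jdbc_url: str = "jdbc:hive2://localhost:8158/default;auth=none") -> str:
    """Run a query through Beeline in an asyncio subprocess (illustrative only)."""
    proc = await asyncio.create_subprocess_exec(
        "beeline", "-u", jdbc_url, "-e", hql,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )
    # communicate() only returns after Beeline exits, i.e. after the query has
    # already run, so there is no query ID available earlier to poll on.
    stdout, stderr = await proc.communicate()
    if proc.returncode != 0:
        raise RuntimeError(f"Beeline failed: {stderr.decode()}")
    return stdout.decode()


if __name__ == "__main__":
    print(asyncio.run(run_beeline("SHOW DATABASES;")))
```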
Parked for now due to limitations in the PyHive and Impyla libraries. @rajaths010494, can you update the contributing guide as discussed?