Add request-reply operator to Microsoft Azure provider
This PR adds a request-reply operator implementing the Request-Reply design pattern from *Enterprise Integration Patterns* (Hohpe & Woolf, Addison-Wesley, 2003).
In particular, this means one could:
a) create a Service Bus queue and topic for a batch process,
b) set up an auto-scaling Azure Container App Job listening to the Azure Service Bus queue for messages, and
c) create a DAG that uses the request-reply operator to start the Azure Container App Job and capture the reply when it finishes.
Potential improvements:
- Have the operator background itself while waiting for a reply. There is no need to tie up a worker thread while a remote process runs.
- Provide more parameters to control the subscription on the reply topic. Right now it deregisters itself after six hours of inactivity and drops messages after one hour. This seems reasonable to me, since the subscription should only exist for the life of the operator, but more configuration might help a use case I haven't thought of; a rough sketch of what that could look like follows this list.
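As a rough illustration of the kind of knobs that could be exposed (the parameter names `reply_subscription_auto_delete_on_idle` and `reply_message_ttl` below are hypothetical, not part of this PR), the operator would simply pass them through to the Service Bus administration client when it creates the reply subscription:

```python
from datetime import timedelta

from azure.servicebus.management import ServiceBusAdministrationClient


def create_reply_subscription(
    admin_connection_string: str,
    reply_topic_name: str,
    subscription_name: str,
    # Hypothetical operator parameters, shown with the current hard-coded defaults.
    reply_subscription_auto_delete_on_idle: timedelta = timedelta(hours=6),
    reply_message_ttl: timedelta = timedelta(hours=1),
) -> None:
    admin = ServiceBusAdministrationClient.from_connection_string(admin_connection_string)
    admin.create_subscription(
        reply_topic_name,
        subscription_name,
        # Deregister the subscription if it sits idle for too long...
        auto_delete_on_idle=reply_subscription_auto_delete_on_idle,
        # ...and drop reply messages that were never picked up.
        default_message_time_to_live=reply_message_ttl,
    )
```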
A working Azure Container App Job can be built using the `scripts/event-job-aca.zsh` script in the repo https://github.com/perry2of5/http-file-rtrvr
A working DAG is provided below:
```python
import json
from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.microsoft.azure.operators.asb import AzureServiceBusRequestReplyOperator
from airflow.utils.context import Context
from azure.servicebus import ServiceBusMessage

dag = DAG(
    'test-http-req-reply-dag',
    description='Test sending message to HTTP download service',
    schedule_interval='0 12 * * *',
    start_date=datetime(2017, 3, 20),
    catchup=False,
)


def print_hello():
    return 'Hello world from first Airflow DAG!'


def body_generator(context: Context):
    # Build the request body sent to the Azure Container App Job.
    return '''
    {
        "method": "GET",
        "url": "http://example.com/index.html",
        "save_to": "example/dag/1",
        "timeout_seconds": 5
    }
    '''


def process_reply(message: ServiceBusMessage, context: Context):
    # Parse the reply and push the results to XCom for downstream tasks.
    print(f"Received reply: {message}")
    body = json.loads(str(message))
    context['ti'].xcom_push(key='URL', value=body['saved_to_fqn'])
    context['ti'].xcom_push(key='STATUS_CODE', value=body['status'])


def print_url(**context):
    url = context['ti'].xcom_pull(task_ids='send_request', key='URL')
    print('url:', url)


def print_status_code(**context):
    status_code = context['ti'].xcom_pull(task_ids='send_request', key='STATUS_CODE')
    print('status_code:', status_code)


hello_operator = PythonOperator(task_id='hello_task', dag=dag, python_callable=print_hello)

send_request = AzureServiceBusRequestReplyOperator(
    task_id='send_request',
    dag=dag,
    request_queue_name="file-rtrvr-request",
    request_body_generator=body_generator,
    reply_topic_name="file-rtrvr-complete",
    max_wait_time=360,  # 6 minutes; the Azure Container App Job polls for messages for up to 5 minutes
    reply_message_callback=process_reply,
    azure_service_bus_conn_id="azure_service_bus_default",
)

# In Airflow 2 the context is passed to the callable automatically, so provide_context is not needed.
status_operator = PythonOperator(
    task_id='print_status_task',
    dag=dag,
    python_callable=print_status_code,
)

url_operator = PythonOperator(
    task_id='done',
    dag=dag,
    python_callable=print_url,
)

hello_operator >> send_request >> url_operator >> status_operator
```
Thank you for the review.
Just want to check: right now several of the existing operators get a handle to the connection and call the Microsoft Azure library directly. These should really be refactored down into the hook as well, right? For example: https://github.com/apache/airflow/blob/main/providers/src/airflow/providers/microsoft/azure/operators/asb.py#L316-L339
Good catch, indeed it would be better if this code resided within the hook. The hook should take care of the connection handling and expose the logic through a public method, which in turn is called from the operator. That way the same operation can also be executed through the hook from within a PythonOperator.
But don't worry, there are unfortunately still a lot of operators written that way; if we clean up every time we need to modify an operator, we will get there one day :-)
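To make the suggested refactor concrete, here is a minimal sketch (the class names and the connection-string handling are illustrative assumptions, not the actual provider code): the hook owns the connection and exposes the logic as a public method, and the operator just delegates to it, so the same method can also be called from a PythonOperator.

```python
from azure.servicebus import ServiceBusClient, ServiceBusMessage

from airflow.hooks.base import BaseHook
from airflow.models import BaseOperator


class ServiceBusSendHook(BaseHook):
    """Hypothetical hook: owns the connection and exposes the send logic publicly."""

    def __init__(self, azure_service_bus_conn_id: str = "azure_service_bus_default"):
        super().__init__()
        self.conn_id = azure_service_bus_conn_id

    def get_conn(self) -> ServiceBusClient:
        # Assumes the Airflow connection keeps a Service Bus connection string in its password field.
        conn = self.get_connection(self.conn_id)
        return ServiceBusClient.from_connection_string(conn.password)

    def send_message(self, queue_name: str, body: str) -> None:
        # SDK calls that used to live in the operator now sit behind one public hook method.
        with self.get_conn() as client, client.get_queue_sender(queue_name) as sender:
            sender.send_messages(ServiceBusMessage(body))


class SendMessageOperator(BaseOperator):
    """Hypothetical operator: delegates to the hook instead of calling the Azure SDK directly."""

    def __init__(self, *, queue_name: str, body: str,
                 azure_service_bus_conn_id: str = "azure_service_bus_default", **kwargs):
        super().__init__(**kwargs)
        self.queue_name = queue_name
        self.body = body
        self.conn_id = azure_service_bus_conn_id

    def execute(self, context):
        ServiceBusSendHook(self.conn_id).send_message(self.queue_name, self.body)
```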
When I was working on this new operator, I thought about moving some of those down into the hooks so I could reuse them. I'll go ahead and put in a PR to address that and then update this PR to use those; it will reduce duplication and be a good thing. I'll try to do it this afternoon... we'll see, day-job work suddenly came up.
It seems to me that if a message is sent from an Airflow DAG, the DAG author probably wants a message back at some point to confirm completion, check for errors, and so on. To the best of my knowledge, the logic in this PR implements the standard design pattern for doing that.
Also, after the refactors dabla requested, this will be much smaller and the hooks will be more useful.
This pull request has been automatically marked as stale because it has not had recent activity. It will be closed in 5 days if no further activity occurs. Thank you for your contributions.
I need to rewrite this to use a dedicated reply queue because there is a race condition between adding the subscription and modifying its filter. The alternatives are to discard any messages received before the request is sent, or to fix the Python SDK for Azure Service Bus, but using a queue seems simpler. A rough sketch of the queue-based approach follows.
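A minimal sketch of the queue-based approach, assuming a pre-provisioned reply queue (the names here are illustrative, not the final operator API): stamp the request with a correlation id and a reply-to queue, then receive from the dedicated reply queue and keep only the message whose correlation id matches, so no subscription or filter has to be created on the fly.

```python
import uuid
from typing import Optional

from azure.servicebus import ServiceBusClient, ServiceBusMessage, ServiceBusReceivedMessage


def send_and_wait_for_reply(
    connection_string: str,
    request_queue_name: str,
    reply_queue_name: str,
    body: str,
    max_wait_time: int = 360,
) -> Optional[ServiceBusReceivedMessage]:
    correlation_id = str(uuid.uuid4())
    with ServiceBusClient.from_connection_string(connection_string) as client:
        # Send the request, stamped with the correlation id and the reply-to queue.
        with client.get_queue_sender(request_queue_name) as sender:
            sender.send_messages(
                ServiceBusMessage(body, correlation_id=correlation_id, reply_to=reply_queue_name)
            )
        # Receive from the dedicated reply queue; nothing is created at request time,
        # which avoids the subscription/filter race condition entirely.
        with client.get_queue_receiver(reply_queue_name, max_wait_time=max_wait_time) as receiver:
            for message in receiver:
                if message.correlation_id == correlation_id:
                    receiver.complete_message(message)
                    return message
                # Not our reply; abandon it so another consumer can pick it up.
                receiver.abandon_message(message)
    return None
```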
I moved the ability to set message headers, reply-to and message-id into another PR https://github.com/apache/airflow/pull/47522
This pull request has been automatically marked as stale because it has not had recent activity. It will be closed in 5 days if no further activity occurs. Thank you for your contributions.