
Add request-reply operator to Microsoft Azure provider

Open · perry2of5 opened this pull request 1 year ago • 5 comments

This PR adds a request-reply operator implementing the design pattern from Enterprise Integration Patterns (Hohpe & Woolf, Addison-Wesley, 2003): the requestor sends a message carrying a return address and an id, and the replier posts its response to that reply channel, correlated with the request so the two can be matched.

In particular, this means one could:

  • create a Service Bus queue and topic for a batch process,
  • set up an auto-scaling Azure Container App Job listening to the Azure Service Bus queue for messages, and
  • create a DAG using the request-reply operator to start the Azure Container App Job and capture the reply when it finishes.

Potential improvements:

  • Have the operator defer itself while waiting for a reply, so no worker slot is tied up while the remote process runs (see the sketch after this list).
  • Provide more parameters to control the subscription to the reply topic. Right now it deregisters itself if unused for 6 hours and drops messages after 1 hour. That seems reasonable, since the subscription should only exist for the life of the operator, but more configuration might help use cases I haven't thought of.
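
For the first improvement, Airflow's deferrable-operator machinery could hand the wait off to the triggerer instead of blocking a worker slot. A minimal sketch of what that might look like; the class name AzureServiceBusReplyTrigger, its module path, and the connection-string plumbing are all illustrative assumptions, not part of this PR:

from airflow.triggers.base import BaseTrigger, TriggerEvent
from azure.servicebus.aio import ServiceBusClient


class AzureServiceBusReplyTrigger(BaseTrigger):  # hypothetical name
    def __init__(self, connection_string: str, topic_name: str, subscription_name: str):
        super().__init__()
        self.connection_string = connection_string
        self.topic_name = topic_name
        self.subscription_name = subscription_name

    def serialize(self):
        # Triggers must serialize so the triggerer process can re-create them.
        return (
            "my_provider.triggers.AzureServiceBusReplyTrigger",  # hypothetical module path
            {
                "connection_string": self.connection_string,
                "topic_name": self.topic_name,
                "subscription_name": self.subscription_name,
            },
        )

    async def run(self):
        # Poll the reply subscription with the async client; no worker slot is held.
        async with ServiceBusClient.from_connection_string(self.connection_string) as client:
            receiver = client.get_subscription_receiver(self.topic_name, self.subscription_name)
            async with receiver:
                while True:
                    messages = await receiver.receive_messages(max_message_count=1, max_wait_time=30)
                    if messages:
                        await receiver.complete_message(messages[0])
                        yield TriggerEvent({"body": str(messages[0])})
                        return

The operator would then send its request, call self.defer(trigger=AzureServiceBusReplyTrigger(...), method_name="execute_complete"), and handle the reply in an execute_complete(context, event) method once the trigger fires.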

A working Azure Container App Job can be built using the scripts/event-job-aca.zsh script in the repo https://github.com/perry2of5/http-file-rtrvr.
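
For orientation, the replier side of the pattern (what the container job does) reduces to: receive a request from the queue, do the work, and publish the result to the reply topic with the request's message id echoed as the reply's correlation id so the requestor can match it. A rough sketch with the azure-servicebus SDK; the SERVICE_BUS_CONN_STR variable and the do_the_download helper are illustrative stand-ins, not code from the linked repo:

import os

from azure.servicebus import ServiceBusClient, ServiceBusMessage


def do_the_download(request_json: str) -> str:
    # Hypothetical stand-in for the real work: fetch the requested URL, save
    # the file, and return a JSON reply like {"saved_to_fqn": "...", "status": 200}.
    return '{"saved_to_fqn": "example/dag/1/index.html", "status": 200}'


conn_str = os.environ["SERVICE_BUS_CONN_STR"]  # assumed environment variable

with ServiceBusClient.from_connection_string(conn_str) as client:
    receiver = client.get_queue_receiver("file-rtrvr-request")
    sender = client.get_topic_sender("file-rtrvr-complete")
    with receiver, sender:
        for request in receiver:
            reply = ServiceBusMessage(
                do_the_download(str(request)),
                # Request-reply convention: echo the request's message id as
                # the reply's correlation id so the requestor can match it.
                correlation_id=request.message_id,
            )
            sender.send_messages(reply)
            receiver.complete_message(request)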

A working DAG is provided below:

from datetime import datetime
from airflow import DAG
from airflow.utils.context import Context
from airflow.operators.python import PythonOperator
from airflow.providers.microsoft.azure.operators.asb import AzureServiceBusRequestReplyOperator
from azure.servicebus import ServiceBusMessage
import json

dag = DAG(
    'test-http-req-reply-dag',
    description='Test sending message to HTTP download service',
    schedule='0 12 * * *',  # schedule_interval is deprecated as of Airflow 2.4
    start_date=datetime(2017, 3, 20),
    catchup=False,
)


def print_hello():
    return 'Hello world from first Airflow DAG!'


def body_generator(context: Context):
    # Define the request body here
    return '''
        {
            "method": "GET",
            "url": "http://example.com/index.html",
            "save_to": "example/dag/1",
            "timeout_seconds": 5
        }
        '''


def process_reply(message: ServiceBusMessage, context: Context):
    # str(message) decodes the reply body, which is expected to be JSON
    print(f"Received reply: {message}")
    body = json.loads(str(message))
    context['ti'].xcom_push(key='URL', value=body['saved_to_fqn'])
    context['ti'].xcom_push(key='STATUS_CODE', value=body['status'])


def print_url(**context):
    url = context['ti'].xcom_pull(task_ids='send_request', key='URL')
    print('url:', url)


def print_status_code(**context):
    status_code = context['ti'].xcom_pull(task_ids='send_request', key='STATUS_CODE')
    print("status_code", status_code)


hello_operator = PythonOperator(task_id='hello_task', dag=dag, python_callable=print_hello)

send_request = AzureServiceBusRequestReplyOperator(
        task_id='send_request',
        dag=dag,
        request_queue_name="file-rtrvr-request",
        request_body_generator=body_generator,
        reply_topic_name="file-rtrvr-complete",
        max_wait_time=360,  # 6 minutes, since the Azure Container App Job polls for messages every 5 minutes
        reply_message_callback=process_reply,
        azure_service_bus_conn_id="azure_service_bus_default",
)

status_operator = PythonOperator(
    task_id='print_status_task',
    dag=dag,
    python_callable=print_status_code,  # context is passed automatically in Airflow 2+
)

url_operator = PythonOperator(
    task_id='done',
    dag=dag,
    python_callable=print_url,
)


hello_operator >> send_request >> url_operator >> status_operator

perry2of5 · Dec 04 '24

Thank you for the review.

perry2of5 · Dec 10 '24

Just want to check: right now several of the existing operators get a handle to the connection and call the Microsoft Azure library directly. These should really be refactored down into the hook as well, right? For example: https://github.com/apache/airflow/blob/main/providers/src/airflow/providers/microsoft/azure/operators/asb.py#L316-L339

perry2of5 · Dec 10 '24

> Just want to check: right now several of the existing operators get a handle to the connection and call the Microsoft Azure library directly. These should really be refactored down into the hook as well, right? For example: https://github.com/apache/airflow/blob/main/providers/src/airflow/providers/microsoft/azure/operators/asb.py#L316-L339

Good catch. Indeed, it would be better for this code to reside within the hook: the hook should take care of connection handling and expose the logic in a public method, which in turn is called from the operator. That way the same operation can also be executed via the hook from within a PythonOperator.
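
A minimal sketch of that split, with hypothetical names throughout (the real asb hooks are organized differently, and the connection-string extra field is an assumption):

from airflow.hooks.base import BaseHook
from azure.servicebus import ServiceBusClient


class MyServiceBusHook(BaseHook):  # hypothetical hook
    def __init__(self, conn_id: str = "azure_service_bus_default"):
        super().__init__()
        self.conn_id = conn_id

    def get_conn(self) -> ServiceBusClient:
        # Connection handling lives in the hook, not in each operator.
        conn = self.get_connection(self.conn_id)
        return ServiceBusClient.from_connection_string(
            conn.extra_dejson["connection_string"]  # assumed extra field
        )

    def receive_subscription_messages(self, topic_name, subscription_name, max_count=1, wait=5):
        # Public method wrapping the SDK call: an operator delegates to it,
        # and a PythonOperator can call it directly.
        with self.get_conn() as client:
            receiver = client.get_subscription_receiver(topic_name, subscription_name)
            with receiver:
                return receiver.receive_messages(max_message_count=max_count, max_wait_time=wait)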

But don't worry: unfortunately there are still a lot of operators written that way. If we clean up every time we need to modify an operator, we will get there one day :-)

dabla · Dec 11 '24

When I was working on this new operator I thought about moving some of that logic down into the hooks so I could reuse it. I'll go ahead and put in a PR to address that and then update this PR to use those methods. It will reduce duplication and be a good thing. I'll try to do it this afternoon... we'll see, I suddenly have day-job work to do.

perry2of5 · Dec 11 '24

It seems to me that if a message is sent from an Airflow DAG, the DAG author probably wants a message back at some point to confirm completion, check for errors, et cetera. To the best of my knowledge, the logic in this PR implements the standard design pattern for doing that.

Also, after the refactors that dabla requested, this will be much smaller and the hooks will be more useful.

perry2of5 · Dec 11 '24

This pull request has been automatically marked as stale because it has not had recent activity. It will be closed in 5 days if no further activity occurs. Thank you for your contributions.

github-actions[bot] · Feb 04 '25

This needs a rewrite to use a dedicated response queue, because there is a race condition between adding the subscription and modifying its filter. The alternatives are to discard any messages received before sending the request, or to fix the Python SDK for ASB, but it seems simpler to use a queue.
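
For context, the race comes from subscription setup taking two management calls: create_subscription attaches a $Default TrueFilter rule, and until a correlation filter replaces it, every message published to the topic is accepted by the new subscription. Roughly, as a sketch with the management client (the connection string and names are placeholders):

import os

from azure.servicebus.management import CorrelationRuleFilter, ServiceBusAdministrationClient

admin = ServiceBusAdministrationClient.from_connection_string(os.environ["SERVICE_BUS_CONN_STR"])

admin.create_subscription("file-rtrvr-complete", "reply-sub-for-run-123")
# Window: the subscription now exists with its $Default TrueFilter rule, so a
# reply published at this moment is delivered here even if it belongs to
# another run.
admin.create_rule(
    "file-rtrvr-complete", "reply-sub-for-run-123", "only-my-reply",
    filter=CorrelationRuleFilter(correlation_id="my-request-message-id"),
)
admin.delete_rule("file-rtrvr-complete", "reply-sub-for-run-123", "$Default")

A response queue created per request, before the request is sent, has no filter to swap and so avoids the window entirely.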

perry2of5 · Feb 07 '25

I moved the ability to set message headers, reply-to, and message-id into another PR: https://github.com/apache/airflow/pull/47522

perry2of5 · Mar 21 '25

This pull request has been automatically marked as stale because it has not had recent activity. It will be closed in 5 days if no further activity occurs. Thank you for your contributions.

github-actions[bot] · May 13 '25