airflow icon indicating copy to clipboard operation
airflow copied to clipboard

Add callback to process Azure Service Bus message contents

Open perry2of5 opened this issue 1 year ago • 4 comments

This PR adds a callback to process messages from an Azure Service Bus.

Right now, on main, the Azure Service Bus message receiver simply logs the messages received and returns. This PR preserves that default, but adds the ability to pass in a callback function with the signature ServiceBusMessage -> None which is applied to each message received.

This change is made to both the general receiver and the subscription receiver.

I'd like someone from the Airflow community to weigh in on if this is a good solution. To me, it seemed like the least intrusive way to add a hook to obtain the message contents. However, I'm not sure if it plays well with the Airflow Operator standards as I've barely been using Airflow for a month.

Please let me know if you see any errors, omissions, or if I'm headed in the wrong direction.

Thank you, Tim

perry2of5 avatar Aug 19 '24 21:08 perry2of5

This addresses https://github.com/apache/airflow/discussions/26446

perry2of5 avatar Aug 20 '24 22:08 perry2of5

Any chance @Lee-W or @Taragolis can review?

perry2of5 avatar Aug 23 '24 22:08 perry2of5

It looks good at the first glance but would like to know what would be the main usage of this functionality

Lee-W avatar Aug 28 '24 09:08 Lee-W

For my particular use case, I want to grab a file-location URL out of the message body and pass it to the next task in the DAG. In general, it provides a way to receive data from the message and react to it. This data may be success or failure information, file locations, etc. Other people have looked for this functionality too: https://github.com/apache/airflow/discussions/26446

In the AWS provider, the SQS sensor provides a similar result by putting the message into XCOM under the key messages. https://github.com/apache/airflow/blob/main/airflow/providers/amazon/aws/sensors/sqs.py#L46 I don't know that I always want the entire message shoved into XCOM though. Often, I might want to branch or just store part of the data and so I provide a callback function so the user of the operator can choose what they want to happen with the data in the message body.

perry2of5 avatar Aug 28 '24 16:08 perry2of5

For my particular use case, I want to grab a file-location URL out of the message body and pass it to the next task in the DAG. In general, it provides a way to receive data from the message and react to it. This data may be success or failure information, file locations, etc. Other people have looked for this functionality too: #26446

In the AWS provider, the SQS sensor provides a similar result by putting the message into XCOM under the key messages. https://github.com/apache/airflow/blob/main/airflow/providers/amazon/aws/sensors/sqs.py#L46 I don't know that I always want the entire message shoved into XCOM though. Often, I might want to branch or just store part of the data and so I provide a callback function so the user of the operator can choose what they want to happen with the data in the message body.

Sounds good 👍

Lee-W avatar Aug 30 '24 10:08 Lee-W

This is ready to be reviewed again.

perry2of5 avatar Sep 04 '24 20:09 perry2of5

Thank you @potiuk and @Lee-W.

perry2of5 avatar Sep 04 '24 22:09 perry2of5

I might have messed this one up from a usability point of view. I was assuming that if the context (to access XComs) was needed in the callback it could be captured in a lambda and passed down. However, I'm having trouble accomplishing that and it look like the old technique of calling an operator from within an "outer" task function is now highly discouraged.

The obvious solution"is to add a context parameter to the callback function. However, this would change the method signature and potentially break code somewhere "in the wild". I highly doubt anyone is using this yet, but that seems like the wrong thing to do.

Another solution would be to have to possible signatures of the callback: one with the context parameter and one without. Then inspect the callback function and only pass the context if the callback takes two arguments. That seems messy. Any thoughts on this?

perry2of5 avatar Oct 14 '24 15:10 perry2of5

I suppose for 1-argument callbacks we could catch the type error, print a warning to update the function, and then call it with just the message. Is generating the TypeError expensive?

perry2of5 avatar Oct 14 '24 16:10 perry2of5

The obvious solution"is to add a context parameter to the callback function. However, this would change the method signature and potentially break code somewhere "in the wild". I highly doubt anyone is using this yet, but that seems like the wrong thing to do.

maybe bump a major version for this?

Another solution would be to have to possible signatures of the callback: one with the context parameter and one without. Then inspect the callback function and only pass the context if the callback takes two arguments. That seems messy. Any thoughts on this?

I personally don't like it 🤔 sounds hacky to me

I suppose for 1-argument callbacks we could catch the type error, print a warning to update the function, and then call it with just the message. Is generating the TypeError expensive?

I guess it just as expensive as a regular exception?

Lee-W avatar Oct 15 '24 01:10 Lee-W

Everything except releasing a major version sounds too hacky to me as well.... I'll open a ticket for this.

perry2of5 avatar Oct 15 '24 15:10 perry2of5

opened 43361

perry2of5 avatar Oct 24 '24 18:10 perry2of5