Add callback to process Azure Service Bus message contents
This PR adds a callback to process messages from an Azure Service Bus.
Right now, on main, the Azure Service Bus message receiver simply logs the messages received and returns. This PR preserves that default, but adds the ability to pass in a callback function with the signature ServiceBusMessage -> None which is applied to each message received.
This change is made to both the general receiver and the subscription receiver.
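A minimal sketch of the callback contract described above, with the SDK left out. `FakeServiceBusMessage`, `receive_messages`, and `message_callback` are hypothetical names for illustration. They are not the provider's actual code or the azure-servicebus API, and the stand-in only models a string body.

```python
import logging
from dataclasses import dataclass
from typing import Callable, Iterable, List, Optional

log = logging.getLogger(__name__)


@dataclass
class FakeServiceBusMessage:
    """Hypothetical stand-in for the ServiceBusMessage the PR mentions (str body only)."""

    body: str


# The callback contract described above: ServiceBusMessage -> None.
MessageCallback = Callable[[FakeServiceBusMessage], None]


def receive_messages(
    messages: Iterable[FakeServiceBusMessage],
    message_callback: Optional[MessageCallback] = None,
) -> None:
    """Apply message_callback to each message; fall back to logging, as on main."""
    for message in messages:
        if message_callback is None:
            log.info("Message received: %s", message.body)
        else:
            message_callback(message)


# Usage: collect the bodies instead of only logging them.
received: List[str] = []
receive_messages(
    [FakeServiceBusMessage("first"), FakeServiceBusMessage("second")],
    message_callback=lambda m: received.append(m.body),
)
print(received)  # ['first', 'second']
```

Because the callback defaults to `None`, existing DAGs that never pass it keep the log-only behaviour.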
I'd like someone from the Airflow community to weigh in on whether this is a good solution. To me, it seemed like the least intrusive way to add a hook for obtaining the message contents. However, I'm not sure it fits the Airflow operator conventions, as I've only been using Airflow for about a month.
Please let me know if you see any errors, omissions, or if I'm headed in the wrong direction.
Thank you, Tim
This addresses https://github.com/apache/airflow/discussions/26446
Any chance @Lee-W or @Taragolis can review?
It looks good at first glance, but I'd like to know what the main use case for this functionality would be.
For my particular use case, I want to grab a file-location URL out of the message body and pass it to the next task in the DAG. In general, it provides a way to receive data from the message and react to it. This data may be success or failure information, file locations, etc. Other people have looked for this functionality too: https://github.com/apache/airflow/discussions/26446
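A rough sketch of that use case as a one-argument callback. It assumes the message body is JSON with a hypothetical `file_url` field, and it uses a stand-in message class rather than the real SDK type. The closure only collects the URL; getting it to the next task (for example via XCom) is a separate concern.

```python
import json
from dataclasses import dataclass
from typing import Callable, List


@dataclass
class FakeServiceBusMessage:
    """Hypothetical stand-in for a Service Bus message with a str body."""

    body: str


def make_url_collector(urls: List[str]) -> Callable[[FakeServiceBusMessage], None]:
    """Build a one-argument callback that appends the body's file_url to urls."""

    def callback(message: FakeServiceBusMessage) -> None:
        payload = json.loads(message.body)
        url = payload.get("file_url")  # hypothetical field name
        if url is not None:
            urls.append(url)

    return callback


urls: List[str] = []
collect = make_url_collector(urls)
collect(FakeServiceBusMessage('{"file_url": "https://example.com/data.csv"}'))
collect(FakeServiceBusMessage('{"status": "failed"}'))  # no URL: ignored
print(urls)  # ['https://example.com/data.csv']
```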
In the AWS provider, the SQS sensor achieves a similar result by pushing the messages into XCom under the key "messages". https://github.com/apache/airflow/blob/main/airflow/providers/amazon/aws/sensors/sqs.py#L46 I don't necessarily want the entire message pushed into XCom, though. Often I might want to branch, or store only part of the data, so I provide a callback function that lets the operator's user decide what happens with the data in the message body.
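To make the contrast concrete, here is a sketch comparing "push everything under one key" with a callback that keeps only one field and groups it by a status flag. `push_everything` only mimics the shape described for the SQS sensor; it is not that sensor's code. The `status`/`file_url` fields and all function names are hypothetical. `functools.partial` turns the two-argument helper into the one-argument callback the PR expects.

```python
import json
from dataclasses import dataclass
from functools import partial
from typing import Dict, List, Optional


@dataclass
class FakeServiceBusMessage:
    """Hypothetical stand-in for a Service Bus message with a JSON str body."""

    body: str


def push_everything(messages: List[FakeServiceBusMessage]) -> Dict[str, List[str]]:
    """SQS-sensor style: every full message body lands under a single key."""
    return {"messages": [m.body for m in messages]}


def route_by_status(
    message: FakeServiceBusMessage, store: Dict[str, List[Optional[str]]]
) -> None:
    """Callback style: keep only file_url, grouped by a hypothetical status field."""
    payload = json.loads(message.body)
    bucket = "succeeded" if payload.get("status") == "success" else "failed"
    store.setdefault(bucket, []).append(payload.get("file_url"))


batch = [
    FakeServiceBusMessage('{"status": "success", "file_url": "https://example.com/a.csv", "rows": 10}'),
    FakeServiceBusMessage('{"status": "error", "file_url": "https://example.com/b.csv", "rows": 0}'),
]

everything = push_everything(batch)  # whole bodies, including fields you may not need

store: Dict[str, List[Optional[str]]] = {}
callback = partial(route_by_status, store=store)  # binds store -> one-argument callback
for msg in batch:
    callback(msg)
print(store)  # {'succeeded': ['https://example.com/a.csv'], 'failed': ['https://example.com/b.csv']}
```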
Sounds good 👍
This is ready to be reviewed again.
Thank you @potiuk and @Lee-W.
I might have messed this one up from a usability point of view. I assumed that if the context (needed to access XComs) was required in the callback, it could be captured in a lambda and passed down. However, I'm having trouble doing that, and it looks like the old technique of calling an operator from within an "outer" task function is now highly discouraged.
The obvious solution is to add a context parameter to the callback function. However, this would change the callback's signature and could break code somewhere "in the wild". I highly doubt anyone is using this yet, but that still seems like the wrong thing to do.
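A small sketch of why the signature change is breaking: if the operator always calls `callback(message, context)`, any callback written against the one-argument contract raises `TypeError`. `invoke`, both callbacks, and the plain-dict context are hypothetical stand-ins, not Airflow's API.

```python
from typing import Any, Callable, Dict, List

calls: List[Any] = []


def existing_callback(message: str) -> None:
    """A callback written against the current one-argument contract."""
    calls.append(message)


def updated_callback(message: str, context: Dict[str, Any]) -> None:
    """A callback written against the proposed two-argument contract."""
    calls.append((message, context["run_id"]))


def invoke(callback: Callable[..., None], message: str, context: Dict[str, Any]) -> None:
    """Hypothetical new calling convention: always pass the context as well."""
    callback(message, context)


ctx = {"run_id": "manual__example"}  # hypothetical stand-in for the Airflow context
invoke(updated_callback, "hello", ctx)  # works
try:
    invoke(existing_callback, "hello", ctx)  # too many positional arguments
except TypeError as exc:
    print("existing callbacks break:", exc)
```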
Another solution would be to allow two possible callback signatures: one with the context parameter and one without. We would then inspect the callback function and pass the context only if the callback takes two arguments. That seems messy. Any thoughts on this?
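A sketch of the signature-inspection option using the standard `inspect` module. `call_callback` is a hypothetical helper, and counting positional parameters is just one possible rule.

```python
import inspect
from typing import Any, Callable, Dict


def call_callback(
    callback: Callable[..., None], message: Any, context: Dict[str, Any]
) -> None:
    """Pass context only if the callback can take a second positional argument."""
    params = list(inspect.signature(callback).parameters.values())
    positional = [
        p
        for p in params
        if p.kind in (inspect.Parameter.POSITIONAL_ONLY, inspect.Parameter.POSITIONAL_OR_KEYWORD)
    ]
    has_varargs = any(p.kind is inspect.Parameter.VAR_POSITIONAL for p in params)
    if has_varargs or len(positional) >= 2:
        callback(message, context)
    else:
        callback(message)


got = []
call_callback(lambda m: got.append(m), "old-style", {"run_id": "r1"})
call_callback(lambda m, ctx: got.append((m, ctx["run_id"])), "new-style", {"run_id": "r1"})
print(got)  # ['old-style', ('new-style', 'r1')]
```

The rule has sharp edges that support the "messy" concern. `inspect.signature` raises `ValueError` for some callables whose signature cannot be determined. A one-argument callback with an unrelated optional parameter, such as `def cb(message, retries=3)`, would silently receive the context as `retries`.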
I suppose for one-argument callbacks we could catch the TypeError, emit a warning asking the user to update the function, and then call it with just the message. Is raising the TypeError expensive?
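A sketch of the catch-and-retry idea, using the standard `warnings` module. `call_with_fallback` is a hypothetical name, and choosing `DeprecationWarning` is an assumption.

```python
import warnings
from typing import Any, Callable, Dict


def call_with_fallback(
    callback: Callable[..., None], message: Any, context: Dict[str, Any]
) -> None:
    """Try the two-argument form first; fall back to the legacy one-argument form."""
    try:
        callback(message, context)
    except TypeError:
        # Caveat: this also catches TypeErrors raised *inside* a two-argument
        # callback, which would then be retried (wrongly) with one argument.
        warnings.warn(
            "Message callbacks without a context parameter are deprecated.",
            DeprecationWarning,
            stacklevel=2,
        )
        callback(message)


got = []
with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always")
    call_with_fallback(lambda m: got.append(m), "legacy", {"run_id": "r1"})
    call_with_fallback(lambda m, ctx: got.append((m, ctx["run_id"])), "updated", {"run_id": "r1"})
print(got)  # ['legacy', ('updated', 'r1')]
print(len(caught))  # 1
```

The caveat in the comment is the main drawback. A two-argument callback that fails partway through with its own `TypeError` may have already run some side effects before it is retried.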
On adding a context parameter: maybe bump a major version for this?
On inspecting the callback signature: I personally don't like it 🤔, it sounds hacky to me.
On catching the TypeError: I guess it's just as expensive as any other exception?
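One way to answer the cost question empirically is a `timeit` micro-benchmark comparing a direct call with the raise-and-retry path. Results depend on the machine and Python version, so no numbers are claimed here. Note that, unless the outcome were cached per callback, a legacy callback would pay this cost on every message rather than once.

```python
import timeit


def legacy_callback(message: str) -> str:
    """One-argument callback, as written against the current contract."""
    return message


def direct_call() -> None:
    legacy_callback("msg")


def call_via_type_error() -> None:
    # The fallback path: try two arguments, catch the TypeError, retry with one.
    try:
        legacy_callback("msg", {"run_id": "r1"})  # type: ignore[call-arg]
    except TypeError:
        legacy_callback("msg")


n = 100_000
direct = timeit.timeit(direct_call, number=n)
fallback = timeit.timeit(call_via_type_error, number=n)
print(f"direct: {direct:.4f}s  via TypeError: {fallback:.4f}s  ({n} calls each)")
```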
Everything except releasing a major version sounds too hacky to me as well. I'll open a ticket for this.
Opened #43361.