azure-service-bus-dotnet
azure-service-bus-dotnet copied to clipboard
Use batching in MessageReceiver.RegisterMessageHandler
This is a feature request. Message processing is quite slow using MessageReceiver.RegisterMessageHandler because ReceiveMessagePump deals with one message at a time causing lots of roundtrips completing or abandoning messages, no matter the PreFetchCount.
Concept (BatchingReceiveMessagePump):
var batchSize = Math.Min(this.BatchSize, this.receiver.PrefetchCount);
while (!token.IsCancellationRequested) {
IList<Message> messages;
while (!token.IsCancellationRequested && (messages = await this.receiver.ReceiveAsync(batchSize, this.ReceiveTimeout).ConfigureAwait(false)) != null) {
if (messages.Count == 0) break;
var completed = new HashSet<string>();
var abandoned = new HashSet<string>(messages.Select(m => m.SystemProperties.LockToken));
foreach (var message in messages) {
var lockToken = message.SystemProperties.LockToken;
try {
await this.handler(message, token).ConfigureAwait(false);
completed.Add(lockToken);
abandoned.Remove(lockToken);
}
catch (Exception ex) {
// Exception handler might send to retry queue, deadletter or reschedule
var handled = await this.exceptionHandler(m, ex).ConfigureAwait(false);
if (handled) {
completed.Add(lockToken);
abandoned.Remove(lockToken);
}
}
}
// No batch abandon available, causing slowness when processing fails
foreach (var lockToken in abandoned)
await this.receiver.AbandonAsync(lockToken).ConfigureAwait(false);
if (completed.Any()) // Cannot complete empty list!
await this.receiver.CompleteAsync(completed).ConfigureAwait(false);
}
}
I made something like this and went from processing around 10 messages/second to 500 messages/second with PreFetchCount around 1000 and BatchSize 500, depending on processing time per message, message size and the lock timeout (allowing larger batches), maybe auto-renewing locks could push this even further but I haven't investigated.
This is a big breaking change for us as well when we look at switching clients. In order to achieve the throughput we are looking for we process batches of messages at a time. It would be great to extend the interface to accept a Func<Message[]> message array or collection. I am open to trying a pull request if people think this is a good idea and worth including.
This is a big breaking change
The old client didn't support OnMessage API with multiple message, jus a single message. You can receive batches, just w/o MessageHandler. Exactly as it used to be with the old client.
Are you referring to something else?
Maybe I missed that in the API, where is that method located?
On Wed, Oct 24, 2018, 11:21 AM Sean Feldman [email protected] wrote:
This is a big breaking change
The old client didn't support OnMessage API with multiple message, jus a single message. You can receive batches https://github.com/Azure/azure-service-bus-dotnet/blob/e5552e8cd2cb3072808ab306de164462bfd6c553/src/Microsoft.Azure.ServiceBus/Core/IMessageReceiver.cs#L68, just w/o MessageHandler. Exactly as it used to be with the old client.
Are you referring to something else?
— You are receiving this because you commented. Reply to this email directly, view it on GitHub https://github.com/Azure/azure-service-bus-dotnet/issues/582#issuecomment-432726441, or mute the thread https://github.com/notifications/unsubscribe-auth/AAAj-nwBjRdPBpRdTMjPycZgMTOuuFo7ks5uoJNrgaJpZM4XIcW9 .
Any IMessageReceiver implementator.
Thanks, not sure how I missed that :) Any chance we want a batch version of a MessageHandler. Handlers are the easiest and safest way to do messaging, just super slow for throughput. Its a lot of heavy lifting for a client to roll their own optimized batching receiver.
~~@abombss that is a different issue (feature request) I'd think 🙂.~~ That's what this one is about.