[QUERY] Manipulate Prefetch for better processing

Open KondzioSSJ4 opened this issue 3 years ago • 11 comments

Library name and version

Trying to migrate from Microsoft.Azure.WebJobs.Extensions.ServiceBus 4.2.2 to Microsoft.Azure.WebJobs.Extensions.ServiceBus 5.2.0

Query/Question

Hi

I built a feature on top of MessageReceiver from Microsoft.Azure.WebJobs.Extensions.ServiceBus 4.2.2. It adjusted the Prefetch at runtime so that batches were as large as possible while still respecting the state of the application, e.g.:

  • scale the batch size between 1 and X messages, configured by an attribute (to prevent throttling of known functions)
  • scale down if a batch of messages is processed too slowly (otherwise prefetched messages may expire before processing even starts)
  • scale up if processing is fast
  • scale to 1 when errors from a dependent service start to appear (and add a delay before getting the next message)
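
A minimal sketch of the scaling rules listed above, written as plain C# with no Service Bus dependency. Everything in it is an assumption made for illustration: the `AdaptivePrefetchPolicy` type, its member names, the halving/doubling steps, and the 10-second back-off are hypothetical and are not part of any SDK or of the original feature.

```csharp
using System;

// Simulate a few batches and print the prefetch value the policy suggests.
var policy = new AdaptivePrefetchPolicy(maxPrefetch: 32, slowBatchThreshold: TimeSpan.FromSeconds(5));
Console.WriteLine(policy.Current);                                      // 1 (initial value)

policy.ReportBatch(TimeSpan.FromSeconds(1), dependencyFailed: false);   // fast batch
policy.ReportBatch(TimeSpan.FromSeconds(1), dependencyFailed: false);   // fast batch
Console.WriteLine(policy.Current);                                      // 4

policy.ReportBatch(TimeSpan.FromSeconds(9), dependencyFailed: false);   // slow batch
Console.WriteLine(policy.Current);                                      // 2

policy.ReportBatch(TimeSpan.FromSeconds(1), dependencyFailed: true);    // dependency error
Console.WriteLine($"{policy.Current}, wait {policy.NextDelay.TotalSeconds}s"); // 1, wait 10s

// Hypothetical policy type; names and thresholds are illustrative only.
public sealed class AdaptivePrefetchPolicy
{
    private readonly int _maxPrefetch;
    private readonly TimeSpan _slowBatchThreshold;

    public AdaptivePrefetchPolicy(int maxPrefetch, TimeSpan slowBatchThreshold)
    {
        if (maxPrefetch < 1) throw new ArgumentOutOfRangeException(nameof(maxPrefetch));
        _maxPrefetch = maxPrefetch;
        _slowBatchThreshold = slowBatchThreshold;
        Current = 1;
    }

    // Suggested prefetch / batch size, always within 1..maxPrefetch.
    public int Current { get; private set; }

    // Suggested delay before the next receive; non-zero only after a dependency failure.
    public TimeSpan NextDelay { get; private set; } = TimeSpan.Zero;

    public void ReportBatch(TimeSpan elapsed, bool dependencyFailed)
    {
        if (dependencyFailed)
        {
            // Dependent service is failing: drop to 1 and back off (assumed 10 s).
            Current = 1;
            NextDelay = TimeSpan.FromSeconds(10);
        }
        else if (elapsed > _slowBatchThreshold)
        {
            // Batch was slow: halve, but never go below 1.
            Current = Math.Max(1, Current / 2);
            NextDelay = TimeSpan.Zero;
        }
        else
        {
            // Batch was fast: double, capped at the configured maximum.
            Current = Math.Min(_maxPrefetch, Current * 2);
            NextDelay = TimeSpan.Zero;
        }
    }
}
```

The policy only computes a suggested value; applying it still requires a receiver or processor that accepts a new prefetch count at runtime, which is the subject of this issue.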

Now, when I try to use ServiceBusReceiver from Microsoft.Azure.WebJobs.Extensions.ServiceBus 5.2.0, I can't change the ServiceBusReceiver Prefetch or even get it (previously I used IBinding and took "MessageReceiver" from context.BindingData, but now all of them return ServiceBusMessageActions).

Could you tell me whether there is any way to change Prefetch at runtime? Or do you have any plans to support it?

Environment

.NET SDK Version: 6.0.100 Commit: 9e8b04bbff

Runtime environment: OS Name: Windows OS Version: 10.0.19044 OS Platform: Windows RID: win10-x64

Host (useful for support): Version: 6.0.0 Commit: 4822e3c3aa

IDE: Visual Studio 2022 (v. 17.0.2)

KondzioSSJ4 avatar Dec 14 '21 19:12 KondzioSSJ4

Thank you for your feedback. Tagging and routing to the team member best able to assist.

jsquire avatar Dec 14 '21 20:12 jsquire

@jsquire @JoshLove-msft Any update?

KondzioSSJ4 avatar Jan 04 '22 16:01 KondzioSSJ4

I don't think this issue is specific to the extensions, as when using the SDK directly you are not able to modify the Prefetch count for a running receiver.

We haven't heard of this request before - we can take it into consideration.

JoshLove-msft avatar Jan 04 '22 22:01 JoshLove-msft

@JoshLove-msft That would be nice

That would make it easy to implement things like:

  • circuit breaker (without downloading big batches from the Service Bus queue, because we can decrease the number of messages per batch while the "circuit" is broken)
  • "resource governor" (e.g. decrease the size of the processing batch when the function depends on resources that are heavily loaded)
  • (potentially) prevent the situation where prefetch takes too big a batch and messages expire while waiting for the function (I have no idea whether that is still an issue, or whether you already renew messages in the "prefetch" state to prevent expiration)

KondzioSSJ4 avatar Jan 05 '22 08:01 KondzioSSJ4

@JoshLove-msft Any update?

KondzioSSJ4 avatar Apr 26 '22 12:04 KondzioSSJ4

Unfortunately, we don't have an update yet. We are waiting to see if this can be addressed from the service side before considering an improvement in the client library. We appreciate your patience.

JoshLove-msft avatar May 13 '22 02:05 JoshLove-msft

@JoshLove-msft I'm not sure whether this would work the way I think, but looking at the SDK, it seems a small change would let me modify the prefetch. In the Microsoft.Extensions.Hosting.ServiceBusHostBuilderExtensions method:

public static IWebJobsBuilder AddServiceBus(this IWebJobsBuilder builder, Action<ServiceBusOptions> configure)
{
    // ...
    builder.Services.AddAzureClientsCore();
    builder.Services.AddSingleton<MessagingProvider>();
    builder.Services.AddSingleton<ServiceBusClientFactory>();
    return builder;
}

it would be:

public static IWebJobsBuilder AddServiceBus(this IWebJobsBuilder builder, Action<ServiceBusOptions> configure)
{
    // ...
    builder.Services.AddAzureClientsCore();
    builder.Services.TryAddSingleton<MessagingProvider>();
    builder.Services.AddSingleton<ServiceBusClientFactory>();
    return builder;
}

So the change is from AddSingleton<MessagingProvider> to TryAddSingleton<MessagingProvider> in the assembly Microsoft.Azure.WebJobs.Extensions.ServiceBus.

Then I could inject my own version of MessagingProvider to allow manipulation of Prefetch.

Why can't I do this now? Because the dependency is added in ServiceBusWebJobsStartup, which appears to run after my IWebJobsStartup class, so I don't see those services in the DI container.

KondzioSSJ4 avatar May 13 '22 06:05 KondzioSSJ4

You should already be able to inject your own MessagingProvider. Any chance you can include your startup code? However, even if you are able to inject your own provider, I still don't see how you would be able to manipulate the Prefetch, as the property is not settable on the ServiceBusReceiver/ServiceBusProcessor, and you would only be allowed to create one instance of either of these.

JoshLove-msft avatar May 19 '22 02:05 JoshLove-msft

@JoshLove-msft I've provided a sample of how I can override Prefetch. Maybe it's not perfect (because I need to use a proxy library like Castle.Core), but it's possible.

In the tests, I included a short sample that checks whether the value is really overridden. In the Azure Functions project, you can see how I inject my custom MessagingProvider, along with a function that should generate some logs when a new instance is created from CustomMessagingProvider.

Did I inject my CustomMessagingProvider incorrectly?

You can find sample here: https://github.com/KondzioSSJ4/AzureSDK-Issue25929

P.S. I also added an additional startup project to demonstrate how it works with the Service Bus startup: [assembly: WebJobsStartup(typeof(AzureWebJobs_Issue25929.Startup))] is executed first, and then [assembly: WebJobsStartup(typeof(AzureWebJobs_Issue25929.OtherStartupLike_ServiceBusWebJobsStartup))]

When OtherStartupLike_ServiceBusWebJobsStartup is configured to use AddSingleton, I end up with 2 injected instances of InjectedClass, because my startup has already added an instance and the extension then adds another implementation.

But when it uses TryAddSingleton, only 1 instance of InjectedClass is injected.
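
The difference described above can be reproduced with Microsoft.Extensions.DependencyInjection alone. This is a minimal sketch, not code from the linked sample: `InjectedClass` borrows the name used above, `CustomInjectedClass` is a hypothetical stand-in for a user-supplied implementation, and the two registration calls only imitate the order "user startup first, extension startup second".

```csharp
using System;
using System.Linq;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.DependencyInjection.Extensions;

// Case 1: the later startup uses AddSingleton, so a second descriptor is appended.
var withAdd = new ServiceCollection();
withAdd.AddSingleton<InjectedClass, CustomInjectedClass>(); // user's startup (runs first)
withAdd.AddSingleton<InjectedClass>();                      // extension's startup (runs second)
Console.WriteLine(withAdd.Count(d => d.ServiceType == typeof(InjectedClass))); // 2

using var addProvider = withAdd.BuildServiceProvider();
// Resolving a single service returns the last registration, i.e. the extension's.
Console.WriteLine(addProvider.GetRequiredService<InjectedClass>().GetType().Name); // InjectedClass

// Case 2: the later startup uses TryAddSingleton, which is skipped
// because a registration for InjectedClass already exists.
var withTryAdd = new ServiceCollection();
withTryAdd.AddSingleton<InjectedClass, CustomInjectedClass>(); // user's startup (runs first)
withTryAdd.TryAddSingleton<InjectedClass>();                   // extension's startup (runs second)
Console.WriteLine(withTryAdd.Count(d => d.ServiceType == typeof(InjectedClass))); // 1

using var tryAddProvider = withTryAdd.BuildServiceProvider();
Console.WriteLine(tryAddProvider.GetRequiredService<InjectedClass>().GetType().Name); // CustomInjectedClass

// Illustrative types only.
public class InjectedClass { }
public sealed class CustomInjectedClass : InjectedClass { }
```

With AddSingleton in the later startup, the user's registration is still in the container but loses when a single instance is resolved, which matches the behaviour described for MessagingProvider when the extension's startup runs last.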

KondzioSSJ4 avatar May 19 '22 10:05 KondzioSSJ4

I see that you are proxying the PrefetchCount property, but the library only uses the PrefetchCount once when setting up the AMQP link with the service. Therefore any changes to it would not be honored. I think we can consider this as a possible enhancement.

JoshLove-msft avatar Aug 05 '22 02:08 JoshLove-msft

Prefetch can now be adjusted in the processor, but still not in the receiver.
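
A rough sketch of what adjusting prefetch on a running processor could look like when using Azure.Messaging.ServiceBus directly. It assumes a package version that exposes `ServiceBusProcessor.UpdatePrefetchCount`; the environment variable names, the default queue name, the prefetch values, and the delays are placeholders chosen for illustration, and the code needs a real Service Bus namespace to do anything useful.

```csharp
using System;
using System.Threading.Tasks;
using Azure.Messaging.ServiceBus;

// Placeholder configuration: these environment variable names are assumptions.
var connectionString = Environment.GetEnvironmentVariable("SERVICEBUS_CONNECTION");
var queueName = Environment.GetEnvironmentVariable("SERVICEBUS_QUEUE") ?? "my-queue";
if (string.IsNullOrEmpty(connectionString))
{
    Console.WriteLine("Set SERVICEBUS_CONNECTION to run this sketch.");
    return;
}

await using var client = new ServiceBusClient(connectionString);
await using ServiceBusProcessor processor = client.CreateProcessor(
    queueName,
    new ServiceBusProcessorOptions { PrefetchCount = 20, MaxConcurrentCalls = 4 });

processor.ProcessMessageAsync += args =>
{
    // Messages are auto-completed by default when the handler returns without throwing.
    Console.WriteLine($"Processed {args.Message.MessageId}");
    return Task.CompletedTask;
};

processor.ProcessErrorAsync += args =>
{
    Console.WriteLine($"Error: {args.Exception.Message}");
    // Illustrative reaction: shrink prefetch while errors are occurring.
    processor.UpdatePrefetchCount(1);
    return Task.CompletedTask;
};

await processor.StartProcessingAsync();

// Later, while the processor is still running, raise the prefetch again.
await Task.Delay(TimeSpan.FromSeconds(30));
processor.UpdatePrefetchCount(50);

await Task.Delay(TimeSpan.FromSeconds(30));
await processor.StopProcessingAsync();
```

Whether and how the Functions extension surfaces this for trigger bindings is not shown here; the sketch covers only direct SDK usage.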

JoshLove-msft avatar Jan 04 '23 18:01 JoshLove-msft

Hi

It looks like version 5.8.1 of Microsoft.Azure.WebJobs.Extensions.ServiceBus solved all my needs.

I can use IBinding to take a MessageReceiver and then use reflection to change Prefetch; the change is applied once the last batch has been consumed, which is good enough.

Thanks. As far as I'm concerned, you can close the ticket.

KondzioSSJ4 avatar Feb 10 '23 08:02 KondzioSSJ4