logicapps
logicapps copied to clipboard
Limit concurrency for built-in Service Bus Trigger (Logic App Standard)
Hi.
It does not seem to be a way to limit the number of concurrent workflow runs the Service Bus Trigger may start. During peek load the Service Bus queue receive thousand messages a minute and the result is that the Logic App starts thousands of concurrent workflow runs (I use the split on option in the Trigger to get one workflow run per message).
This results in massive slowdown of workflow execution. The workflow usually take less than a second to run but the high number of parallel workflows pegs the Logic Apps CPU to 100% and workflows complete in upwards of 4 minutes if they complete at all.
I would like to limit the number of parallel workflows that run but have not found a way to do so. I have tried editing the host.json file by following: https://docs.microsoft.com/EN-US/azure/logic-apps/edit-app-settings-host-settings?tabs=visual-studio-code but the settings does not seem to make a difference. I can see that some settings are respected since the below settings limited the batch size of received messages to 10. However the trigger still continually poll Service Bus for messages and starts new workflows until the queue is empty.
{
"version": "2.0",
"extensions": {
"workflow": {
"settings": {
"Runtime.Trigger.MaximumWaitingRuns": "1",
"Runtime.Trigger.MaximumRunConcurrency": "10",
"Jobs.BackgroundJobs.NumWorkersPerProcessorCount": 2,
"Runtime.Backend.ForeachDefaultDegreeOfParallelism": "1",
"Runtime.Backend.FlowDefaultSplitOnItemsLimit": "20",
"Jobs.BackgroundJobs.DispatchingWorkersPulseInterval": "00:00:30"
}
},
"serviceBus": {
"maxMessageBatchSize": 10,
"maxConcurrentCalls": 1,
"sessionIdleTimeout": "00:01:00",
"autoCompleteMessages": true,
"maxAutoLockRenewalDuration": "00:00:30"
}
}
}
Am I missing something or is there as of today no way to limit the number of concurrent workflows the bult-in Service Bus Trigger can start?
TLDR: I would like to limit the number of parallel workflows that the built-in Service Bus Trigger can start but have not found a way to do so.
That's not good, we're having the same setting however I've not tested that the intended behavior works as it should.
"Runtime.Trigger.MaximumRunConcurrency: 10"
Doesn't that setting allow up to 10 simultaneously running instances?
I'm having the same issue for the built-in EventHub trigger. Can this be related to #455 ?
I think Niels is correct - it's not supported for built-in connectors. I had a go testing this, last weekend, and when it didn't work, I thought I'd run the Microsoft.Azure.Functions.ExtensionBundle.Workflows bundle through ILSpy and see what was in the source code (since the documentation indicates that this setting is available locally in Azure Functions, I figured it must be in that bundle somewhere).
What I found is that the 2 values I was interested in aren't wired up to host settings like some of the other values:
Runtime.Trigger.MaximumRunConcurrency
Runtime.Trigger.MaximumWaitingRuns
For other values (mentioned in the documentation), I can see that they are linked to a Host Setting value (which appears to be done via a custom config file):
e..g. FlowDefaultSplitOnItemsLimit is mapped to a HostSetting via this key:
<add key="Microsoft.Azure.Workflows.SkuExtensions.FlowDefaultSplitOnItemsLimit.ConsumptionSku" value="100000" hostSettingName="Runtime.Backend.FlowDefaultSplitOnItemsLimit" />
But I couldn't see anything linking MaximumRunConcurrency or MaximumWaitingRuns to any configuration entries, unless it's not in the bundle, or I missed it.
You can see where MaximumRunConcurrency gets loaded in from config:
public static long MaximumRunConcurrency => CloudConfigurationManager.GetConfigurationNumber("Microsoft.Azure.Workflows.Trigger.MaximumRunConcurrency", 100);
Although there's no indication that this value is bubbled up in a HostFile (the CloudConfiguratioNManager is handled by another component when run from Azure, which is not included in the bundle).
And you can see how it is used, e.g. to check if a trigger's concurrency is set higher than the global max, in this error message in a resource file:
The concurrency configuration of workflow trigger '{0}' of type '{1}' at line '{2}' and column '{3}' is not valid. The value specified for property 'runs' exceeds the maximum allowed. Actual: '{4}'. Maximum: '{5}'.
However, in my testing, I don't think this value is used for Managed API connections either. It's possible that a later version of the bundle uses it - the latest version for VSCode is 1.2.11, whereas in Azure (as of today's date) the version is 1.2.12.
In any case, I think we might be missing something - and that's how the built-in Service Bus connector works (or Event Hub) vs how we expect it to work.
In my case, I have a subscription with 37,000 messages in it. I need to process all of those messages, in no particular order, and there's no particular time frame to process them.
Ideally, what I want is the ability to turn on a workflow, have it grab x messages and spin up x workflows to process them and then, as each workflow completes, grab another message from the queue i.e. limit it to x running triggers at any given time for this workflow.
I don't believe MaximumRunConcurrency would achieve this, as this is for all workflows, not just the one I'm thinking of. My workflow makes calls to about 7 other workflows (i.e. other triggers that need to execute), and takes about 90 secs to complete (mainly due to delays in calling Dynamics).
Using a WS1 SKU, I have worked out that I can process about 25 messages in parallel before I start getting timeouts.
The thing is, the Service Bus connector is designed to drain a queue or topic, getting a batch of messages at a time. I can see that the property maxMessageBatchSize (in the service bus section) is used by the built-in connector (it's not used by the Managed API connector).
If I use splitOn, then what happens is we get triggered, we get x messages from the subscription, spin up x workflows, then get another x messages from the subscription, spin up another x workflows, etc. until we either have one workflow for each message OR we hit the default concurrent limit for the Logic App (at which point the messages start being moved to the dead-letter queue, with an error saying they couldn't be delivered).
This isn't too far from how the ManagedAPI connector works for ServiceBus:
- If you use splitOn, you'll get one workflow for each message in the queue/subscription;
- If you don't use splitOn, you'll get x workflows, each with an array of y messages (where y is <= maxMessageBatchSize).
This is by design - the purpose of the trigger is to drain the queue/subscription as quickly as possible: although there's a polling cycle, that is only used to check for new messages once the queue/subscription has been drained.
So for me, the solution is this:
- Implement a workflow using a recurrence trigger
- Use the Managed API Service Bus connector to get x messages from a queue/topic
- (this uses Peek Lock, and you can specify the max number of messages to get, you'll get between 1 and x messages in an array)
- Use the CallWorkflow shape and a splitOn setting to start x workflow instances
The above is not ideal, as this is the pattern I need to use to drain a queue/subscription - once it's empty, I don't want to be polling, I want to be executing as new messages arrive (i.e. implement a real-time workflow using the built-in connector). I could implement a service that monitors the queue/subscription and once it gets over a certain size, stops the "real time" workflow and starts the "polling recurrence workflow".
So yes, MaximumRinConcurrency doesn't work, but even if it did, I'd want it to be specific to a workflow. As an aside, this value (under the serviceBus Section) also doesn't seem to be used: MaxConcurrentCalls. It didn't make any difference to my runs during testing.
I'm not sure if Logic Apps overrides the default values for the underlying ServiceBus WebJob used for the built-in connector?
@axellundh the concurrency support has been added for service bus built-in trigger only as of now. Its currently behind a feature flag, please use below setting on the workflow extension to enable it. Please ensure the concurrency definition on the workflow definition still remains the same under "runtimeConfiguration" section. I will also recommend adjusting the maximum message batch size (maxMessageBatchSize) accordingly.
Please note, we will have it published in a blog post sometime next week.
"extensions": { "workflow":{ "settings": { "Runtime.ServiceProviders.FunctionTriggers.ConcurrencySupported": "true" } } }
@axellundh the concurrency support has been added for service bus built-in trigger only as of now. Its currently behind a feature flag, please use below setting on the workflow extension to enable it. Please ensure the concurrency definition on the workflow definition still remains the same under "runtimeConfiguration" section. I will also recommend adjusting the maximum message batch size (maxMessageBatchSize) accordingly.
Please note, we will have it published in a blog post sometime next week.
"extensions": { "workflow":{ "settings": { "Runtime.ServiceProviders.FunctionTriggers.ConcurrencySupported": "true" } } }
What do you mean with "please ensure the concurrency definition on the workflow...."? If I just want 10 concurrent calls at the same time, should I add "something" in my workflow or can I just adjust the host.json as defined below (snippet) where my max is 10?
{ "version": "2.0", "extensions": { "workflow": { "settings": { "Runtime.ServiceProviders.FunctionTriggers.ConcurrencySupported": "true", "Runtime.Trigger.MaximumWaitingRuns": "1", "Runtime.Trigger.MaximumRunConcurrency": "10", "Jobs.BackgroundJobs.NumWorkersPerProcessorCount": 2, "Runtime.Backend.ForeachDefaultDegreeOfParallelism": "1", "Runtime.Backend.FlowDefaultSplitOnItemsLimit": "20", "Jobs.BackgroundJobs.DispatchingWorkersPulseInterval": "00:00:30" }
And additionally, is this applicable for stateful or stateless workflows? Because looking at this Microsoft Blog I'm getting the feeling that it only applies for stateless.
https://techcommunity.microsoft.com/t5/azure-integration-services-blog/optimizing-service-bus-message-processing-concurrency-using/ba-p/4003303
I think that a couple of configuration examples would be very helpful here. While I have managed to set the concurrency on the workflow, that alone is not enough.
- maxMessageBatchSize must be changed, as the default value is too big for even the splitOn feature. This means that if the queue length exceeds 100 messages, only the first 100 messages will be processed while the rest is immediately abandoned. However, it's not very clear exactly which setting this refers to.
- messages should not be retrieved from the queue until there is a free workflow "slot" to process it. The message lock timer starts when the message leaves the service bus queue. If there is also a queuing situation on the logic app side, there is a high risk that the timer expires and message is reset on the queue.
Hi - please bear with us as we fix the documents to reflect the preview changes here
As Aprana mentioned (as of today) you need to opt into the Logic App Standard "built in Service Bus auto-complete trigger" concurrency setting via the feature flag in the host file as per comment above.
When the feature is "GA" then the flag wont be required. please also note that the designer is running behind the feature, it does not yet support the additional configuration,
Once you've opted into the feature via host json then you need to set concurrency on the trigger via the workflow definition json and example of which is here , it's the "runtimeConfiguration" section.
{
"definition": {
"$schema": "https://schema.management.azure.com/providers/Microsoft.Logic/schemas/2016-06-01/workflowdefinition.json#",
"actions": {},
"contentVersion": "1.0.0.0",
"outputs": {},
"triggers": {
"When_messages_are_available_in_a_queue": {
"inputs": {
"parameters": {
"isSessionsEnabled": false,
"queueName": "queue1"
},
"serviceProviderConfiguration": {
"connectionName": "serviceBus-2",
"operationId": "receiveQueueMessages",
"serviceProviderId": "/serviceProviders/serviceBus"
}
},
"runtimeConfiguration": {
"concurrency": {
"runs": "100"
}
},
"splitOn": "@triggerOutputs()?['body']",
"type": "ServiceProvider"
}
}
},
"kind": "Stateful"
}
Please note that this example is for a stateful logic app standard. We'll provide some additional clarity about stateless workflows ASAP.
Also, the value here "runs": "100"
cannot be parameterized in the workflow definition json as of today , you need to enter a static value as above.
The other host setting mentioned is "Runtime.Trigger.MaximumRunConcurrency" Example
{
"version": "2.0",
"extensionBundle": {
"id": "Microsoft.Azure.Functions.ExtensionBundle.Workflows",
"version": "[1.*, 2.0.0)"
},
"extensions": {
"workflow":{
"settings": {
"Runtime.ServiceProviders.FunctionTriggers.ConcurrencySupported": "true",
"Runtime.Trigger.MaximumRunConcurrency": 100
}
},
"serviceBus": {
"prefetchCount": 25,
"maxMessageBatchSize": 25
}
}
}
To illustrate in the SB trigger example: By default the trigger run concurrency is 100 (it can be between 1 and 100) what the above value does is allow you to override the maximum ... So for example, If I add this setting to the host.json
"Runtime.Trigger.MaximumRunConcurrency":125
Then, in my workflow trigger definition I should be able to say
"runtimeConfiguration": { "concurrency": { "runs": 125 }} //without the host setting override to the maximum this would need to be 100 or less
There's also something else to keep in mind. The split-on functionality (which applies to stateful workflows only) is capped at 100 when trigger concurrency is applied. What this means is you need to be careful that the batch you set (in my example above its 25) is always less than 100 otherwise you'll see an error. Batching is more efficient (the runtime has optimizations for this) but obviously you need to "tune" the exact batch value you use to fit your particular scenario.
I think that a couple of configuration examples would be very helpful here. While I have managed to set the concurrency on the workflow, that alone is not enough.
- maxMessageBatchSize must be changed, as the default value is too big for even the splitOn feature. This means that if the queue length exceeds 100 messages, only the first 100 messages will be processed while the rest is immediately abandoned. However, it's not very clear exactly which setting this refers to.
- messages should not be retrieved from the queue until there is a free workflow "slot" to process it. The message lock timer starts when the message leaves the service bus queue. If there is also a queuing situation on the logic app side, there is a high risk that the timer expires and message is reset on the queue.
@johan-burman - yes to the batch/split-on as per my comment above
The second point I will look to get some clarity on - the trigger should be abstracting this detail.
maxMessageBatchSize
In addition to what Ben mentioned above, please ensure that the service bus extension setting "maxMessageBatchSize" is lesser than or equal to the the maximum concurrent runs specified in the trigger runtime configuration. Like in his example its 25 although the concurrent runs defined for the trigger is 100 in the workflow definition.
I tried to work with all of the settings above and I am getting at least somthing working. However, the behavior is still very strange. When using for example 20 maxConcurrenCalls with a batchSize of one, i do see that 20 executions are starting. However, the next messages will only be picked up once the first 20 executions have been finished. That is still unworkable at has a lot of impact on performance. Is this as designed?
Thanks @bengimblett for the additional info!
I've done a few tests to try and figure out how these parameters (concurrency.runs and maxMessageBatchSize) affect the behavior of the workflow, and I made some observations:
To start with, it seems that the concurrency control is indeed working. If I set concurrency.runs in my trigger configuration, there will not be any more simultaneously running workflows than that number.
Also, the maxMessageBatchSize does affect the number of messages fetched from Service Bus. However, it seems that there is a polling behavior to this message fetching and it's very unclear how the maxMessageBatchSize is applied. If I set it to 1 and monitor the queue, the message count will decrease by 1 at regular intervals, but these intervals does not at all correlate with the message processing in the workflow.
As an example: If I set maxMessageBatchSize to 1, and concurrency.runs also to 1 for a workflow that has a 40 second delay in it, I can see that 2 (!) new runs get initialized approximately every 30 seconds and put into "waiting" status. As the workflow takes at least 40 seconds to complete, this list of waiting runs just grows longer (as long as there are messages on the queue). And as the message lock timer starts when the message gets "polled" from the queue, the locks will start to time out and messages are either DLQ'ed or put back onto the queue...
But if I increase the maxMessageBatchSize to 10, it will now grab 20 messages initially (and put 19 directly into "waiting" status). But with this setting it waits until 10 messages has been processed, and then it fetches 10 more messages from the service bus and adds to the waiting list... So a bit different from what @romlodihc observed above.
(prefetchCount is set to 0 in both examples above)
I not sure what to make of the above, except that it's not what I expected this to be. We need a way to limit how much resources a workflow consumes and to make sure it doesn't DoS-attack anything downstream. This mechanism seems much too crude for that.
Are there maybe any other configuration parameters that I'm missing?
@romlodihc what kind of workflow are you using? Are you using stateless workflows? The stateless workflow would wait for runs to complete before trigger can get new messages.
@johan-burman when maxMessageBatchSize is set to 10 are you using the concurrent runs still as 1? If so the batch read would continuously keep on getting more messages from service bus. Every trigger run might get a batch of 10 but Logic Apps would process only 1 and rest all will be in waiting state. Please ensure "maxMessageBatchSize" is lesser than or equal to the the maximum concurrent runs specified in the trigger runtime configuration.
@romlodihc what kind of workflow are you using? Are you using stateless workflows? The stateless workflow would wait for runs to complete before trigger can get new messages.
@johan-burman when maxMessageBatchSize is set to 10 are you using the concurrent runs still as 1? If so the batch read would continuously keep on getting more messages from service bus. Every trigger run might get a batch of 10 but Logic Apps would process only 1 and rest all will be in waiting state. Please ensure "maxMessageBatchSize" is lesser than or equal to the the maximum concurrent runs specified in the trigger runtime configuration.
We are using stateful workflows.
@johan-burman when maxMessageBatchSize is set to 10 are you using the concurrent runs still as 1? If so the batch read would continuously keep on getting more messages from service bus. Every trigger run might get a batch of 10 but Logic Apps would process only 1 and rest all will be in waiting state. Please ensure "maxMessageBatchSize" is lesser than or equal to the the maximum concurrent runs specified in the trigger runtime configuration.
Yes, maxMessageBatchSize is 10 and concurrent runs is 1. But what I see is that the trigger will fetch 20 messages, put 19 as waiting and one as running. Then after 9 more runs finished, it will fetch an additional 10 messages which are all put in waiting along with the other 10 remaining from the first fetch.
And like I also describe above, when batchSize and concurrent.runs are both 1, it keeps fetching messages and adding waiting runs despite the first one not being finished. This means that the queue of waiting messages is growing faster than they are processed by the workflow.
Every trigger run might get a batch of 10 but Logic Apps would process only 1 and rest all will be in waiting state.
What condition is it that makes the trigger fire and get another batch? I can see that in the first case, it is holding off for 10 runs to complete which correlates with the concurrency.runs setting, but in the second case it seems to run on a timer.
Hello,
while the concurrent runs seems to be working as expected, the trigger behaviour seems to work on a timer which is not very perfomance friendly when you have a queue for example with a few thousand records.
Using a stateful workflow, having for example a batch of 100 messages and max concurrency of 100, the next batch is only retrieved after all 100 previous runs are completed plus what seems to be a ~30 second delay. Shouldn't the batch be optimized to keep fetching messages while there is capacity, why is there a need to wait for the entire batch to finish ?
The following example depicts the performance issue on the trigger with a batch of 1 and max concurrency of 1 also. The Queue had 4 messages on it. It took +- 90s to process these 4 messages overall (because the trigger is slow to poll messages)
Trigger History
Run History (you can see the runs are fast)
So most of the time the logic app is idle until something is triggered.
We also configured this, on the hope it would help the trigger firing faster, but it didn't AzureFunctionsJobHost__extensions__serviceBus__maxBatchWaitTime -> 00:00:01
Regards, Tiago
@johan-burman when maxMessageBatchSize is set to 10 are you using the concurrent runs still as 1? If so the batch read would continuously keep on getting more messages from service bus. Every trigger run might get a batch of 10 but Logic Apps would process only 1 and rest all will be in waiting state. Please ensure "maxMessageBatchSize" is lesser than or equal to the the maximum concurrent runs specified in the trigger runtime configuration.
Yes, maxMessageBatchSize is 10 and concurrent runs is 1. But what I see is that the trigger will fetch 20 messages, put 19 as waiting and one as running. Then after 9 more runs finished, it will fetch an additional 10 messages which are all put in waiting along with the other 10 remaining from the first fetch.
And like I also describe above, when batchSize and concurrent.runs are both 1, it keeps fetching messages and adding waiting runs despite the first one not being finished. This means that the queue of waiting messages is growing faster than they are processed by the workflow.
Every trigger run might get a batch of 10 but Logic Apps would process only 1 and rest all will be in waiting state.
What condition is it that makes the trigger fire and get another batch? I can see that in the first case, it is holding off for 10 runs to complete which correlates with the concurrency.runs setting, but in the second case it seems to run on a timer.
@johan-burman on this " maxMessageBatchSize is 10 and concurrent runs is 1." my understanding here is that the concurrent runs value should always be larger than the batch size specified to avoid the "waiting" state. This makes logical sense too because the concurrency setting is a throttling mechanism.
@johan-burman on this " maxMessageBatchSize is 10 and concurrent runs is 1." my understanding here is that the concurrent runs value should always be larger than the batch size specified to avoid the "waiting" state. This makes logical sense too because the concurrency setting is a throttling mechanism.
Agreed, but using maxMessageBatchSize = 1 and concurrency.runs = 1 doesn't work as expected either.
@johan-burman on this " maxMessageBatchSize is 10 and concurrent runs is 1." my understanding here is that the concurrent runs value should always be larger than the batch size specified to avoid the "waiting" state. This makes logical sense too because the concurrency setting is a throttling mechanism.
Agreed, but using maxMessageBatchSize = 1 and concurrency.runs = 1 doesn't work as expected either.
@johan-burman - can you mail me (my email is linked to my profile) with your logic app name (which is unique) and the workflow name where you had the behaviour you describe also a date time when you ran the test would be great and Aprana / colleagues will look into this specifically. I'll ask the same of Tiago (I have his details)
@johan-burman - can you mail me (my email is linked to my profile) with your logic app name (which is unique) and the workflow name where you had the behaviour you describe also a date time when you ran the test would be great and Aprana / colleagues will look into this specifically. I'll ask the same of Tiago (I have his details)
Mail sent!
I don't know why this is closed.
I can see that a new option is added to the 'When messages are available in a topic' called limit. When enabled, to can set it to 1 to enable sequentially. However, when trying to save the workflow, you will get the error 'The runtimeConfiguration.concurrency' is not allowed. Why is this happening?
I can see that a new option is added to the 'When messages are available in a topic' called limit. When enabled, to can set it to 1 to enable sequentially. However, when trying to save the workflow, you will get the error 'The runtimeConfiguration.concurrency' is not allowed. Why is this happening?
I think you still need to enable the whole concurrency control thing in host.json (see full examples earlier in the thread):
"Runtime.ServiceProviders.FunctionTriggers.ConcurrencySupported": "true"
Using Split on with this method does not seem to work. When sending through a sample payload through the service bus like the below:
[
{
"test": "1"
},
{
"test2": "2"
}
]
I would expect "test" and "test2" to split into two different workflows. I've set up the concurrency, split on, and other settings as advised above but having no luck.
As per above PR the learn document covering the Service Bus Trigger for Logic Apps has been updated to cover the built in connector concurrency https://learn.microsoft.com/en-us/azure/connectors/connectors-create-api-servicebus?tabs=consumption#service-bus-built-in-connector-triggers
Note: The feature flag mentioned in this thread is no longer required, opt in through the designer UI or by adding the correct fields in Json. The Logic Apps designer now supports the workflow trigger fields required.
@bengimblett
Looking in the documentation here:
https://learn.microsoft.com/en-us/azure/connectors/connectors-create-api-servicebus?tabs=consumption#service-bus-built-in-connector-triggers
It looks like
Runtime.ServiceProviders.FunctionTriggers.DynamicListenerEnableDisableInterval
is not accepting values smaller than 10 seconds. Tried "00:00:05" to no avail still looks like the trigger runs every 10 seconds. It that the minimum possible value?