azure-sdk-for-net
azure-sdk-for-net copied to clipboard
[BUG] ProcessSessionMessageEventArgs.ReleaseSession does not work correctly
Library name and version
Azure.Messaging.ServiceBus 7.11.1
Describe the bug
If you call the ReleaseSession() on multiple threads for the same session (using the ServiceBusSessionProcessor with the MaxConcurrentCallsPerSession > 1) the session does not close and the processor continues to process messages in the current session. Is this expected behavior? Adding a lock solves the problem.
Expected behavior
The processor will close the current session after processing all already received messages.
Actual behavior
The processor does not close the session and continues to process messages ignoring the ReleaseSession() call.
Reproduction Steps
var client = new ServiceBusClient(someConnectionString);
var processor = client.CreateSessionProcessor(someQueueName, new ServiceBusSessionProcessorOptions() { MaxConcurrentSessions = 1, MaxConcurrentCallsPerSession = 3, AutoCompleteMessages = false });
processor.ProcessMessageAsync += Processor_ProcessMessageAsync;
...
async Task Processor_ProcessMessageAsync(ProcessSessionMessageEventArgs arg)
{
// lock (someLock) solves the problem
arg.ReleaseSession(); // this call has no effect
await Task.Delay(TimeSpan.FromSeconds(5));
await arg.CompleteMessageAsync(arg.Message);
}
Environment
No response
I'm not an expert there. Just sharing my observations.
I think the session is attempted to cancel but the release operation doesn't await the outcome of the internal cancellation. This means the method returns even though the actual cancellation is not yet done which means it can still take a while until the actual cancellation is observed.
Furthermore the documentation of ReleaseSession mentions
The session may end up being reopened for processing immediately after closing if there are messages remaining in the session (This depends on what other session messages may be in the queue or subscription).
Synchronizing within the handler adds global synchronization for all the concurrently executed handlers which would explain that the effects of the cancellation are seen sequentially.
There might be a problem though with the implementation
public override async Task CancelAsync()
{
if (_sessionCancellationSource is { IsCancellationRequested: false })
{
_sessionCancellationSource.Cancel();
_sessionCancellationSource.Dispose();
}
if (_sessionLockRenewalTask != null)
{
await _sessionLockRenewalTask.ConfigureAwait(false);
}
}
two concurrent releases would attempt to dispose the same token source which could cause issues. I think the safest approach there would be to acquire the semaphore like CloseReceiverCore does. So basically something like
bool releaseSemaphore = false;
try
{
// Intentionally not including cancellation token as
// we need to ensure that we at least attempt to close the receiver if needed.
await WaitSemaphore(CancellationToken.None).ConfigureAwait(false);
if (_sessionCancellationSource is { IsCancellationRequested: false })
{
_sessionCancellationSource.Cancel();
_sessionCancellationSource.Dispose();
}
if (_sessionLockRenewalTask != null)
{
await _sessionLockRenewalTask.ConfigureAwait(false);
}
finally
{
if (releaseSemaphore)
{
_semaphore.Release();
}
}
Thank you for your feedback. Tagging and routing to the team member best able to assist. Please be aware that responses will be delayed due to the US holidays.
As @danielmarbach mentioned, the session is not closed immediately, but rather all in-flight processing for the current session is allowed to complete after which point the session is closed. So for messages that have already been dispatched to your handler, they will continue to be processed in your handler. But you shouldn't see new messages coming in for the session you have released (unless the session has already been closed and reopened).
Can you clarify the exact behavior that you are seeing and what you are actually expecting?
There might be a problem though with the implementation
Interesting.. I think you are right and an issue could arise if two threads enter the if block simultaneously (would need to be simultaneous as otherwise the IsCancellationRequested check would prevent the condition from being true), and one thread cancels and disposes before the other thread attempts to cancel. In that case we'd get an ObjectDisposedException, so apparently not what is being seen here. Still it would be good to patch this up. I think we can probably just use a lock within the if block rather than a sempahoreSlim as we have no need for any async operations.
In the first case, the handler continues to process messages and ReleaseSession() has no effect. The session is only closed when the handler has processed all messages in the queue for this session.
var client = new ServiceBusClient(someConnectionString);
var processor = client.CreateSessionProcessor(someQueueName, new ServiceBusSessionProcessorOptions() { MaxConcurrentSessions = 1, MaxConcurrentCallsPerSession = 3, AutoCompleteMessages = false });
processor.ProcessMessageAsync += Processor_ProcessMessageAsync;
...
async Task Processor_ProcessMessageAsync(ProcessSessionMessageEventArgs arg)
{
await Task.Delay(TimeSpan.FromSeconds(5));
arg.ReleaseSession();
await arg.CompleteMessageAsync(arg.Message);
}
In the second case, the session is closed as soon as all received messages have been processed. After that, the next session (or the same session) will be received and so on.
var client = new ServiceBusClient(someConnectionString);
var processor = client.CreateSessionProcessor(someQueueName, new ServiceBusSessionProcessorOptions() { MaxConcurrentSessions = 1, MaxConcurrentCallsPerSession = 3, AutoCompleteMessages = false });
processor.ProcessMessageAsync += Processor_ProcessMessageAsync;
...
async Task Processor_ProcessMessageAsync(ProcessSessionMessageEventArgs arg)
{
await Task.Delay(TimeSpan.FromSeconds(5));
lock (someLock)
{
arg.ReleaseSession();
}
await arg.CompleteMessageAsync(arg.Message);
}
Can you provide an end-to-end repro including sending messages? I have not been able to reproduce what you are seeing.
Here is the link to the repository. If you run it and enter the number 1 then the program will send 100 messages to the queue with random sessionId (from 0 to 9), if you enter any other number the program will start processing messages. The problem doesn't always happen, repeatability is 2/5.
Adding a lock solves the problem.
private async Task ProcessMessageAsync(ProcessSessionMessageEventArgs arg)
{
await Task.Delay(TimeSpan.FromSeconds(2));
lock (_lock)
arg.ReleaseSession();
await arg.CompleteMessageAsync(arg.Message);
}
UP
Hi @n-zyma, I am running your repro code. Can you clarify how I can tell if the issue is occurring? Here is the output I am seeing:
Enter 1 to write or enter any other number to read: 2
Press any key to stop reading
SessionInitializingAsync(1)
SessionClosingAsync(1)
SessionInitializingAsync(4)
SessionClosingAsync(4)
SessionInitializingAsync(0)
SessionClosingAsync(0)
SessionInitializingAsync(2)
SessionClosingAsync(2)
SessionInitializingAsync(6)
SessionClosingAsync(6)
SessionInitializingAsync(3)
SessionClosingAsync(3)
SessionInitializingAsync(7)
SessionClosingAsync(7)
SessionInitializingAsync(9)
SessionClosingAsync(9)
SessionInitializingAsync(5)
SessionClosingAsync(5)
SessionInitializingAsync(8)
SessionClosingAsync(8)
SessionInitializingAsync(1)
SessionClosingAsync(1)
SessionInitializingAsync(4)
SessionClosingAsync(4)
SessionInitializingAsync(0)
SessionClosingAsync(0)
SessionInitializingAsync(2)
SessionClosingAsync(2)
SessionInitializingAsync(6)
SessionClosingAsync(6)
SessionInitializingAsync(3)
SessionClosingAsync(3)
SessionInitializingAsync(7)
SessionClosingAsync(7)
SessionInitializingAsync(9)
SessionClosingAsync(9)
SessionInitializingAsync(5)
SessionClosingAsync(5)
SessionInitializingAsync(8)
SessionClosingAsync(8)
SessionInitializingAsync(1)
SessionClosingAsync(1)
SessionInitializingAsync(4)
SessionClosingAsync(4)
SessionInitializingAsync(2)
SessionClosingAsync(2)
SessionInitializingAsync(6)
SessionClosingAsync(6)
SessionInitializingAsync(3)
SessionClosingAsync(3)
SessionInitializingAsync(7)
SessionClosingAsync(7)
SessionInitializingAsync(9)
SessionClosingAsync(9)
SessionInitializingAsync(5)
SessionClosingAsync(5)
SessionInitializingAsync(8)
SessionClosingAsync(8)
SessionInitializingAsync(1)
SessionClosingAsync(1)
SessionInitializingAsync(4)
SessionClosingAsync(4)
SessionInitializingAsync(2)
SessionClosingAsync(2)
SessionInitializingAsync(6)
SessionClosingAsync(6)
SessionInitializingAsync(9)
SessionClosingAsync(9)
SessionInitializingAsync(2)
SessionClosingAsync(2)
SessionInitializingAsync(2)
SessionClosingAsync(2)
Hi @n-zyma, I am running your repro code. Can you clarify how I can tell if the issue is occurring? Here is the output I am seeing:
Enter 1 to write or enter any other number to read: 2 Press any key to stop reading SessionInitializingAsync(1) SessionClosingAsync(1) SessionInitializingAsync(4) SessionClosingAsync(4) SessionInitializingAsync(0) SessionClosingAsync(0) SessionInitializingAsync(2) SessionClosingAsync(2) SessionInitializingAsync(6) SessionClosingAsync(6) SessionInitializingAsync(3) SessionClosingAsync(3) SessionInitializingAsync(7) SessionClosingAsync(7) SessionInitializingAsync(9) SessionClosingAsync(9) SessionInitializingAsync(5) SessionClosingAsync(5) SessionInitializingAsync(8) SessionClosingAsync(8) SessionInitializingAsync(1) SessionClosingAsync(1) SessionInitializingAsync(4) SessionClosingAsync(4) SessionInitializingAsync(0) SessionClosingAsync(0) SessionInitializingAsync(2) SessionClosingAsync(2) SessionInitializingAsync(6) SessionClosingAsync(6) SessionInitializingAsync(3) SessionClosingAsync(3) SessionInitializingAsync(7) SessionClosingAsync(7) SessionInitializingAsync(9) SessionClosingAsync(9) SessionInitializingAsync(5) SessionClosingAsync(5) SessionInitializingAsync(8) SessionClosingAsync(8) SessionInitializingAsync(1) SessionClosingAsync(1) SessionInitializingAsync(4) SessionClosingAsync(4) SessionInitializingAsync(2) SessionClosingAsync(2) SessionInitializingAsync(6) SessionClosingAsync(6) SessionInitializingAsync(3) SessionClosingAsync(3) SessionInitializingAsync(7) SessionClosingAsync(7) SessionInitializingAsync(9) SessionClosingAsync(9) SessionInitializingAsync(5) SessionClosingAsync(5) SessionInitializingAsync(8) SessionClosingAsync(8) SessionInitializingAsync(1) SessionClosingAsync(1) SessionInitializingAsync(4) SessionClosingAsync(4) SessionInitializingAsync(2) SessionClosingAsync(2) SessionInitializingAsync(6) SessionClosingAsync(6) SessionInitializingAsync(9) SessionClosingAsync(9) SessionInitializingAsync(2) SessionClosingAsync(2) SessionInitializingAsync(2) SessionClosingAsync(2)
Hi, in your output everything works as it should, it seems that the problem occurs only with debugging enabled. When I run this without debugging, everything works fine and the problem does not occur. It's strange but when I add a lock, the problem does not occur even with debugging enabled. Thanks for your help.
I tried running while debugging and see the same output pattern. Given that this isn't occurring with a release build (and I can't repro even in Debug), I'm going to close this out as not being reproducible. Please feel free to reopen if you can provide a repro.
Hi @n-zyma. Thank you for opening this issue and giving us the opportunity to assist. We believe that this has been addressed. If you feel that further discussion is needed, please add a comment with the text “/unresolve” to remove the “issue-addressed” label and continue the conversation.