azure-sdk-for-net icon indicating copy to clipboard operation
azure-sdk-for-net copied to clipboard

[BUG] ProcessSessionMessageEventArgs.ReleaseSession does not work correctly

Open n-zyma opened this issue 2 years ago • 6 comments

Library name and version

Azure.Messaging.ServiceBus 7.11.1

Describe the bug

If you call the ReleaseSession() on multiple threads for the same session (using the ServiceBusSessionProcessor with the MaxConcurrentCallsPerSession > 1) the session does not close and the processor continues to process messages in the current session. Is this expected behavior? Adding a lock solves the problem.

Expected behavior

The processor will close the current session after processing all already received messages.

Actual behavior

The processor does not close the session and continues to process messages ignoring the ReleaseSession() call.

Reproduction Steps

var client = new ServiceBusClient(someConnectionString);
var processor = client.CreateSessionProcessor(someQueueName, new ServiceBusSessionProcessorOptions() { MaxConcurrentSessions = 1, MaxConcurrentCallsPerSession = 3, AutoCompleteMessages = false });
processor.ProcessMessageAsync += Processor_ProcessMessageAsync;

...

async Task Processor_ProcessMessageAsync(ProcessSessionMessageEventArgs arg)
{
	// lock (someLock) solves the problem
	arg.ReleaseSession(); // this call has no effect
	await Task.Delay(TimeSpan.FromSeconds(5));
	await arg.CompleteMessageAsync(arg.Message);
}

Environment

No response

n-zyma avatar Dec 22 '22 10:12 n-zyma

I'm not an expert there. Just sharing my observations.

I think the session is attempted to cancel but the release operation doesn't await the outcome of the internal cancellation. This means the method returns even though the actual cancellation is not yet done which means it can still take a while until the actual cancellation is observed.

Furthermore the documentation of ReleaseSession mentions

The session may end up being reopened for processing immediately after closing if there are messages remaining in the session (This depends on what other session messages may be in the queue or subscription).

Synchronizing within the handler adds global synchronization for all the concurrently executed handlers which would explain that the effects of the cancellation are seen sequentially.

There might be a problem though with the implementation

        public override async Task CancelAsync()
        {
            if (_sessionCancellationSource is { IsCancellationRequested: false })
            {
                _sessionCancellationSource.Cancel();
                _sessionCancellationSource.Dispose();
            }

            if (_sessionLockRenewalTask != null)
            {
                await _sessionLockRenewalTask.ConfigureAwait(false);
            }
        }

two concurrent releases would attempt to dispose the same token source which could cause issues. I think the safest approach there would be to acquire the semaphore like CloseReceiverCore does. So basically something like

            bool releaseSemaphore = false;
            try
            {
                // Intentionally not including cancellation token as
                // we need to ensure that we at least attempt to close the receiver if needed.
                await WaitSemaphore(CancellationToken.None).ConfigureAwait(false);

if (_sessionCancellationSource is { IsCancellationRequested: false })
            {
                _sessionCancellationSource.Cancel();
                _sessionCancellationSource.Dispose();
            }

            if (_sessionLockRenewalTask != null)
            {
                await _sessionLockRenewalTask.ConfigureAwait(false);
            }

            finally
            {
                if (releaseSemaphore)
                {
                    _semaphore.Release();
                }
            }

danielmarbach avatar Dec 22 '22 14:12 danielmarbach

Thank you for your feedback. Tagging and routing to the team member best able to assist. Please be aware that responses will be delayed due to the US holidays.

jsquire avatar Dec 22 '22 16:12 jsquire

As @danielmarbach mentioned, the session is not closed immediately, but rather all in-flight processing for the current session is allowed to complete after which point the session is closed. So for messages that have already been dispatched to your handler, they will continue to be processed in your handler. But you shouldn't see new messages coming in for the session you have released (unless the session has already been closed and reopened).

Can you clarify the exact behavior that you are seeing and what you are actually expecting?

JoshLove-msft avatar Jan 05 '23 05:01 JoshLove-msft

There might be a problem though with the implementation

Interesting.. I think you are right and an issue could arise if two threads enter the if block simultaneously (would need to be simultaneous as otherwise the IsCancellationRequested check would prevent the condition from being true), and one thread cancels and disposes before the other thread attempts to cancel. In that case we'd get an ObjectDisposedException, so apparently not what is being seen here. Still it would be good to patch this up. I think we can probably just use a lock within the if block rather than a sempahoreSlim as we have no need for any async operations.

JoshLove-msft avatar Jan 05 '23 05:01 JoshLove-msft

In the first case, the handler continues to process messages and ReleaseSession() has no effect. The session is only closed when the handler has processed all messages in the queue for this session.

var client = new ServiceBusClient(someConnectionString);
var processor = client.CreateSessionProcessor(someQueueName, new ServiceBusSessionProcessorOptions() { MaxConcurrentSessions = 1, MaxConcurrentCallsPerSession = 3, AutoCompleteMessages = false });
processor.ProcessMessageAsync += Processor_ProcessMessageAsync;

...

async Task Processor_ProcessMessageAsync(ProcessSessionMessageEventArgs arg)
{
	await Task.Delay(TimeSpan.FromSeconds(5));
	arg.ReleaseSession();
	await arg.CompleteMessageAsync(arg.Message);
}

In the second case, the session is closed as soon as all received messages have been processed. After that, the next session (or the same session) will be received and so on.

var client = new ServiceBusClient(someConnectionString);
var processor = client.CreateSessionProcessor(someQueueName, new ServiceBusSessionProcessorOptions() { MaxConcurrentSessions = 1, MaxConcurrentCallsPerSession = 3, AutoCompleteMessages = false });
processor.ProcessMessageAsync += Processor_ProcessMessageAsync;

...

async Task Processor_ProcessMessageAsync(ProcessSessionMessageEventArgs arg)
{
	await Task.Delay(TimeSpan.FromSeconds(5));
	lock (someLock)
	{
		arg.ReleaseSession();
	}
	await arg.CompleteMessageAsync(arg.Message);
}

n-zyma avatar Jan 05 '23 12:01 n-zyma

Can you provide an end-to-end repro including sending messages? I have not been able to reproduce what you are seeing.

JoshLove-msft avatar Jan 05 '23 18:01 JoshLove-msft

Here is the link to the repository. If you run it and enter the number 1 then the program will send 100 messages to the queue with random sessionId (from 0 to 9), if you enter any other number the program will start processing messages. The problem doesn't always happen, repeatability is 2/5.

Adding a lock solves the problem.

private async Task ProcessMessageAsync(ProcessSessionMessageEventArgs arg)
		{
			await Task.Delay(TimeSpan.FromSeconds(2));
			lock (_lock)
				arg.ReleaseSession();
			await arg.CompleteMessageAsync(arg.Message);
		}

n-zyma avatar Jan 11 '23 13:01 n-zyma

UP

n-zyma avatar Jan 23 '23 11:01 n-zyma

Hi @n-zyma, I am running your repro code. Can you clarify how I can tell if the issue is occurring? Here is the output I am seeing:

Enter 1 to write or enter any other number to read: 2
Press any key to stop reading
SessionInitializingAsync(1)
SessionClosingAsync(1)
SessionInitializingAsync(4)
SessionClosingAsync(4)
SessionInitializingAsync(0)
SessionClosingAsync(0)
SessionInitializingAsync(2)
SessionClosingAsync(2)
SessionInitializingAsync(6)
SessionClosingAsync(6)
SessionInitializingAsync(3)
SessionClosingAsync(3)
SessionInitializingAsync(7)
SessionClosingAsync(7)
SessionInitializingAsync(9)
SessionClosingAsync(9)
SessionInitializingAsync(5)
SessionClosingAsync(5)
SessionInitializingAsync(8)
SessionClosingAsync(8)
SessionInitializingAsync(1)
SessionClosingAsync(1)
SessionInitializingAsync(4)
SessionClosingAsync(4)
SessionInitializingAsync(0)
SessionClosingAsync(0)
SessionInitializingAsync(2)
SessionClosingAsync(2)
SessionInitializingAsync(6)
SessionClosingAsync(6)
SessionInitializingAsync(3)
SessionClosingAsync(3)
SessionInitializingAsync(7)
SessionClosingAsync(7)
SessionInitializingAsync(9)
SessionClosingAsync(9)
SessionInitializingAsync(5)
SessionClosingAsync(5)
SessionInitializingAsync(8)
SessionClosingAsync(8)
SessionInitializingAsync(1)
SessionClosingAsync(1)
SessionInitializingAsync(4)
SessionClosingAsync(4)
SessionInitializingAsync(2)
SessionClosingAsync(2)
SessionInitializingAsync(6)
SessionClosingAsync(6)
SessionInitializingAsync(3)
SessionClosingAsync(3)
SessionInitializingAsync(7)
SessionClosingAsync(7)
SessionInitializingAsync(9)
SessionClosingAsync(9)
SessionInitializingAsync(5)
SessionClosingAsync(5)
SessionInitializingAsync(8)
SessionClosingAsync(8)
SessionInitializingAsync(1)
SessionClosingAsync(1)
SessionInitializingAsync(4)
SessionClosingAsync(4)
SessionInitializingAsync(2)
SessionClosingAsync(2)
SessionInitializingAsync(6)
SessionClosingAsync(6)
SessionInitializingAsync(9)
SessionClosingAsync(9)
SessionInitializingAsync(2)
SessionClosingAsync(2)
SessionInitializingAsync(2)
SessionClosingAsync(2)

JoshLove-msft avatar Jan 23 '23 19:01 JoshLove-msft

Hi @n-zyma, I am running your repro code. Can you clarify how I can tell if the issue is occurring? Here is the output I am seeing:

Enter 1 to write or enter any other number to read: 2
Press any key to stop reading
SessionInitializingAsync(1)
SessionClosingAsync(1)
SessionInitializingAsync(4)
SessionClosingAsync(4)
SessionInitializingAsync(0)
SessionClosingAsync(0)
SessionInitializingAsync(2)
SessionClosingAsync(2)
SessionInitializingAsync(6)
SessionClosingAsync(6)
SessionInitializingAsync(3)
SessionClosingAsync(3)
SessionInitializingAsync(7)
SessionClosingAsync(7)
SessionInitializingAsync(9)
SessionClosingAsync(9)
SessionInitializingAsync(5)
SessionClosingAsync(5)
SessionInitializingAsync(8)
SessionClosingAsync(8)
SessionInitializingAsync(1)
SessionClosingAsync(1)
SessionInitializingAsync(4)
SessionClosingAsync(4)
SessionInitializingAsync(0)
SessionClosingAsync(0)
SessionInitializingAsync(2)
SessionClosingAsync(2)
SessionInitializingAsync(6)
SessionClosingAsync(6)
SessionInitializingAsync(3)
SessionClosingAsync(3)
SessionInitializingAsync(7)
SessionClosingAsync(7)
SessionInitializingAsync(9)
SessionClosingAsync(9)
SessionInitializingAsync(5)
SessionClosingAsync(5)
SessionInitializingAsync(8)
SessionClosingAsync(8)
SessionInitializingAsync(1)
SessionClosingAsync(1)
SessionInitializingAsync(4)
SessionClosingAsync(4)
SessionInitializingAsync(2)
SessionClosingAsync(2)
SessionInitializingAsync(6)
SessionClosingAsync(6)
SessionInitializingAsync(3)
SessionClosingAsync(3)
SessionInitializingAsync(7)
SessionClosingAsync(7)
SessionInitializingAsync(9)
SessionClosingAsync(9)
SessionInitializingAsync(5)
SessionClosingAsync(5)
SessionInitializingAsync(8)
SessionClosingAsync(8)
SessionInitializingAsync(1)
SessionClosingAsync(1)
SessionInitializingAsync(4)
SessionClosingAsync(4)
SessionInitializingAsync(2)
SessionClosingAsync(2)
SessionInitializingAsync(6)
SessionClosingAsync(6)
SessionInitializingAsync(9)
SessionClosingAsync(9)
SessionInitializingAsync(2)
SessionClosingAsync(2)
SessionInitializingAsync(2)
SessionClosingAsync(2)

Hi, in your output everything works as it should, it seems that the problem occurs only with debugging enabled. When I run this without debugging, everything works fine and the problem does not occur. It's strange but when I add a lock, the problem does not occur even with debugging enabled. Thanks for your help.

n-zyma avatar Jan 24 '23 08:01 n-zyma

I tried running while debugging and see the same output pattern. Given that this isn't occurring with a release build (and I can't repro even in Debug), I'm going to close this out as not being reproducible. Please feel free to reopen if you can provide a repro.

JoshLove-msft avatar Jan 24 '23 18:01 JoshLove-msft

Hi @n-zyma. Thank you for opening this issue and giving us the opportunity to assist. We believe that this has been addressed. If you feel that further discussion is needed, please add a comment with the text “/unresolve” to remove the “issue-addressed” label and continue the conversation.

ghost avatar Jan 24 '23 18:01 ghost