azure-sdk-for-net icon indicating copy to clipboard operation
azure-sdk-for-net copied to clipboard

[BUG] ReceiveAndDelete interrupts PeekLock

Open dzendras opened this issue 2 years ago • 1 comments

Library name and version

Azure.Messaging.ServiceBus 7.12.0

Describe the bug

Consider the following sample. It starts by receiving and deleting all messages from a given topic & subscription. Then it sends a message to the topic. Then it attempts to peek lock the message. Finally it receives and deletes all messages (clean up).

static void Log(string message) => Console.WriteLine($"[{DateTime.Now:T}] {message}");

var administrationClient = new ServiceBusAdministrationClient(administrationConnectionString);

if (!await administrationClient.TopicExistsAsync(topicName))
{
    await administrationClient.CreateTopicAsync(new CreateTopicOptions(topicName)
    {
        AutoDeleteOnIdle = TimeSpan.FromMinutes(10)
    });
    Log($"Topic {topicName} created");
    await administrationClient.CreateSubscriptionAsync(topicName, subscriptionName);
    Log($"Subscription {topicName}/{subscriptionName} created");
}

var serviceBusClient = new ServiceBusClient(sendReceiveConnectionString);

await using ServiceBusSender sender = serviceBusClient.CreateSender(topicName);
await using ServiceBusReceiver receiver = serviceBusClient.CreateReceiver(topicName, subscriptionName, new ServiceBusReceiverOptions
{
    ReceiveMode = ServiceBusReceiveMode.ReceiveAndDelete
});
await using ServiceBusReceiver peekLocker = serviceBusClient.CreateReceiver(topicName, subscriptionName, new ServiceBusReceiverOptions
{
    ReceiveMode = ServiceBusReceiveMode.PeekLock
});

async Task ReceiveAll()
{
    IReadOnlyList<ServiceBusReceivedMessage> messages;
    do
    {
        messages = await receiver.ReceiveMessagesAsync(maxMessages: 10, maxWaitTime: TimeSpan.FromSeconds(3));
        foreach (var serviceBusReceivedMessage in messages)
        {
            Log($"Received: {serviceBusReceivedMessage.Body}");
        }
    } while (messages.Count > 0);
    Log("Receiving completed.");
}

await ReceiveAll(); //comment me out for the PeekLock to succeed

var messageBody = $"Message {DateTime.Now:T}";
await sender.SendMessageAsync(new ServiceBusMessage(messageBody));
Log($"{messageBody} sent");
var message = await peekLocker.ReceiveMessageAsync(TimeSpan.FromSeconds(15));
if (message != null)
{
    Log($"{message.Body} received. MessageId = {message.MessageId}, LockToken = {message.LockToken}, LockedUntil = {message.LockedUntil}");
}
else
{
    Log("No message received using PeekLock");
}

await ReceiveAll();

Expected behavior

[16:32:05] Receiving completed. [16:32:06] Message 16:32:05 sent [16:32:06] Message 16:25:34 received. MessageId = 60217e3b779d400ca7a6aa8199b5615b, LockToken = ea02213a-d761-410b-83c8-78cbfec4b8a5, LockedUntil = 10.02.2023 15:33:07 +00:00 [16:32:07] Received: Message 16:32:05 [16:32:10] Receiving completed.

It can be achieved by commenting out the first call to ReceiveAll. It appears that the fact that receiver has been called, makes it claim the next message sent, even though that I'm done with it for now and I expect the peekLocker to be the one receiving it.

Actual behavior

[16:33:30] Receiving completed. [16:33:30] Message 16:33:30 sent [16:33:46] No message received using PeekLock [16:33:46] Received: Message 16:33:30 [16:33:49] Receiving completed.

Reproduction Steps

Run the sample.

Environment

Sample running against .NET 6, Windows 10 x64. Also observed in Azure App Service hosting a .NET 4.8 app.

dzendras avatar Feb 10 '23 15:02 dzendras

Thank you for your feedback. Tagging and routing to the team member best able to assist.

jsquire avatar Feb 10 '23 16:02 jsquire

This is expected behavior do to the fact that your sample is not closing the ReceiveAndDelete receiver in ReceiveAll. For context, in order to optimize throughput, when a certain number of messages is requested and a smaller number is returned, the "request" for the difference of these messages remains active on the receiving link so long as the link remains open, which is represented in library terms as the ServiceBusReceiver. Once those messages arrive in your queue/topic, they are sent along that active link, even though you may not be actively receiving at that moment in time. Because you are receiving in ReceiveAndDelete mode, the messages are deleted by the service when they are sent. In order to avoid this behavior, you would need to adjust your code as follows:

async Task ReceiveAll()
{
    IReadOnlyList<ServiceBusReceivedMessage> messages;
    do
    {
        messages = await receiver.ReceiveMessagesAsync(maxMessages: 10, maxWaitTime: TimeSpan.FromSeconds(3));
        foreach (var serviceBusReceivedMessage in messages)
        {
            Log($"Received: {serviceBusReceivedMessage.Body}");
        }
    } while (messages.Count > 0);
    Log("Receiving completed.");
    
    // Close the receiver to prevent newly arriving messages to be sent along this link
    await receiver.CloseAsync();
}

JoshLove-msft avatar Feb 16 '23 01:02 JoshLove-msft

Hi @dzendras. Thank you for opening this issue and giving us the opportunity to assist. We believe that this has been addressed. If you feel that further discussion is needed, please add a comment with the text “/unresolve” to remove the “issue-addressed” label and continue the conversation.

ghost avatar Feb 16 '23 01:02 ghost

Thank you for your explanation!

After closing the receiver like you suggested, the second call to ReceiveAll results in ObjectDisposedException: : 'ServiceBusReceiver has already been closed and cannot perform the requested operation. Object name: 'ServiceBusReceiver'.' So I guess, I should create and dispose of the receiver inside ReceiveAll() as follows:

async Task ReceiveAll()
{
    await using ServiceBusReceiver receiver = serviceBusClient.CreateReceiver(topicName, subscriptionName, new ServiceBusReceiverOptions
    {
        ReceiveMode = ServiceBusReceiveMode.ReceiveAndDelete
    });
    IReadOnlyList<ServiceBusReceivedMessage> messages;
    do
    {
        messages = await receiver.ReceiveMessagesAsync(maxMessages: 10, maxWaitTime: TimeSpan.FromSeconds(3));
        foreach (var serviceBusReceivedMessage in messages)
        {
            Log($"Received: {serviceBusReceivedMessage.Body}");
        }
    } while (messages.Count > 0);
    Log("Receiving completed.");
}

dzendras avatar Feb 16 '23 06:02 dzendras