pulsar-client-dotnet icon indicating copy to clipboard operation
pulsar-client-dotnet copied to clipboard

Exception thrown if calling Seek earliest multiple times after each other

Open henrikstengaard opened this issue 1 year ago • 2 comments

I'm experiencing issues seeking in a pulsar topic, where I tried to call Seek multiple times for an IConsumer subscribed to the pulsar topic. If I call consumer.SeekAsync(MessageId.Earliest) twice after each other, an NotConnectedException exception with the message Not connected to broker.

Is it the expected behavior or I'm I doing something wrong?

Steps to reproduce:

First I started Pulsar with Docker as described here https://pulsar.apache.org/docs/4.0.x/standalone-docker/:

docker run -it -p 6650:6650  -p 8080:8080 --mount source=pulsardata,target=/pulsar/data --mount source=pulsarconf,target=/pulsar/conf apachepulsar/pulsar:4.0.0 bin/pulsar standalone

Then I ran the following code based on Simple.cs example found in the examples

public static class PulsarClientSeekIssue
{
    public static async Task Reproduce()
    {
        const string serviceUrl = "pulsar://localhost:6650";
        const string subscriptionName = "my-subscription";
        var topicName = $"my-topic-{DateTime.Now.Ticks}";

        var client = await new PulsarClientBuilder()
            .ServiceUrl(serviceUrl)
            .BuildAsync();

        var producer = await client.NewProducer()
            .Topic(topicName)
            .CreateAsync();

        var consumer = await client.NewConsumer()
            .Topic(topicName)
            .SubscriptionName(subscriptionName)
            .SubscribeAsync();

        for (var i = 0; i < 10; i++)
        {
            await Send(producer, $"Sent message {i + 1} from C# at '{DateTime.Now}'");
        }

        await consumer.SeekAsync(MessageId.Earliest);
        
        // Throws exception NotConnectedException "Not connected to broker"
        await consumer.SeekAsync(MessageId.Earliest);

        await Receive(consumer);
    }

    private static async Task<MessageId> Send(IProducer<byte[]> producer, string message)
    {
        var messageId = await producer.SendAsync(Encoding.UTF8.GetBytes(message));

        Console.WriteLine($"Sent message: '{message}'");
        Console.WriteLine($"MessageId: '{messageId}'");
        
        await Task.Delay(500);
        
        return messageId;
    }

    private static async Task<Message<byte[]>> Receive(IConsumer<byte[]> consumer)
    {
        var message = await consumer.ReceiveAsync();
        
        Console.WriteLine($"Received message");
        Console.WriteLine($"PublishTime: {message.PublishTime}");
        Console.WriteLine($"MessageId: '{message.MessageId}'");
        Console.WriteLine($"Data: {Encoding.UTF8.GetString(message.Data)}");
        
        return message;
    }
}

henrikstengaard avatar Nov 22 '24 10:11 henrikstengaard

For comparison, I created a similar test with DotPulsar client and it does not throw any exception:

public static class DotPulsarSeekTest
{
    public static async Task RunSimple()
    {
        const string serviceUrl = "pulsar://localhost:6650";
        const string subscriptionName = "my-subscription";
        var topicName = $"my-topic-{DateTime.Now.Ticks}";

        await using var client = PulsarClient.Builder()
            .Build();                                        // Connecting to pulsar://localhost:6650

        await using var producer = client.NewProducer()
            .Topic($"persistent://public/default/{topicName}")
            .Create();
        
        await using var consumer = client.NewConsumer()
            .SubscriptionName(subscriptionName)
            .Topic($"persistent://public/default/{topicName}")
            .Create();
        
        for (var i = 0; i < 10; i++)
        {
            await Send(producer, $"Sent message {i + 1} from C# at '{DateTime.Now}'");
        }

        await consumer.Seek(MessageId.Earliest);
        
        // no exception thrown
        await consumer.Seek(MessageId.Earliest);
        
        var message = await Receive(consumer);
    }

    private static async Task<MessageId> Send(IProducer<ReadOnlySequence<byte>> producer, string message)
    {
        var messageId = await producer.Send(Encoding.UTF8.GetBytes(message));

        Console.WriteLine($"Sent message: '{message}'");
        Console.WriteLine($"MessageId: '{messageId}'");
        
        await Task.Delay(500);
        
        return messageId;
    }

    private static async Task<IMessage<ReadOnlySequence<byte>>> Receive(IConsumer<ReadOnlySequence<byte>> consumer)
    {
        var message = await consumer.Receive();
        
        Console.WriteLine($"Received message");
        Console.WriteLine($"PublishTime: {message.PublishTime}");
        Console.WriteLine($"MessageId: '{message.MessageId}'");
        Console.WriteLine($"Data: {Encoding.UTF8.GetString(message.Data)}");
        
        return message;
    }
}

henrikstengaard avatar Nov 22 '24 11:11 henrikstengaard

Hi, thank for reporting. Pulsar.Client is a port of Java client library and as far as I can see this feature was implemented relatively recently in PRs one and two, so you can work on implementing this feature and I'll review the PR.

Lanayx avatar Nov 23 '24 21:11 Lanayx