pulsar-client-dotnet
pulsar-client-dotnet copied to clipboard
Exception thrown if calling Seek earliest multiple times after each other
I'm experiencing issues seeking in a Pulsar topic when I call Seek multiple times on an IConsumer subscribed to that topic. If I call consumer.SeekAsync(MessageId.Earliest) twice in a row, the second call throws a NotConnectedException with the message "Not connected to broker".
Is this the expected behavior, or am I doing something wrong?
Steps to reproduce:
First I started Pulsar with Docker as described here https://pulsar.apache.org/docs/4.0.x/standalone-docker/:
docker run -it -p 6650:6650 -p 8080:8080 --mount source=pulsardata,target=/pulsar/data --mount source=pulsarconf,target=/pulsar/conf apachepulsar/pulsar:4.0.0 bin/pulsar standalone
Then I ran the following code, based on the Simple.cs example found in the examples:
/// <summary>
/// Reproduction for the reported seek problem: a consumer is asked to seek to
/// <c>MessageId.Earliest</c> twice in a row after ten messages have been published.
/// </summary>
public static class PulsarClientSeekIssue
{
    /// <summary>
    /// Builds a client, creates a producer and a consumer on a freshly named topic,
    /// publishes ten messages, seeks to the earliest message twice and then receives
    /// a single message.
    /// </summary>
    public static async Task Reproduce()
    {
        const string serviceUrl = "pulsar://localhost:6650";
        const string subscriptionName = "my-subscription";

        // The tick count makes the topic name differ between runs.
        var topicName = $"my-topic-{DateTime.Now.Ticks}";

        var clientBuilder = new PulsarClientBuilder().ServiceUrl(serviceUrl);
        var client = await clientBuilder.BuildAsync();

        var producer = await client
            .NewProducer()
            .Topic(topicName)
            .CreateAsync();

        var consumer = await client
            .NewConsumer()
            .Topic(topicName)
            .SubscriptionName(subscriptionName)
            .SubscribeAsync();

        // Publish messages numbered 1 through 10.
        for (var sequence = 1; sequence <= 10; sequence++)
        {
            var text = $"Sent message {sequence} from C# at '{DateTime.Now}'";
            await Send(producer, text);
        }

        // First seek.
        await consumer.SeekAsync(MessageId.Earliest);

        // Throws exception NotConnectedException "Not connected to broker"
        await consumer.SeekAsync(MessageId.Earliest);

        await Receive(consumer);
    }

    /// <summary>
    /// Sends <paramref name="message"/> as UTF-8 bytes, logs the text and the resulting
    /// message id to the console, then waits 500 ms.
    /// </summary>
    /// <param name="producer">Producer used to publish the message.</param>
    /// <param name="message">Text to publish.</param>
    /// <returns>The message id returned by the producer.</returns>
    private static async Task<MessageId> Send(IProducer<byte[]> producer, string message)
    {
        var payload = Encoding.UTF8.GetBytes(message);
        var messageId = await producer.SendAsync(payload);

        Console.WriteLine($"Sent message: '{message}'");
        Console.WriteLine($"MessageId: '{messageId}'");

        await Task.Delay(500);
        return messageId;
    }

    /// <summary>
    /// Receives one message from <paramref name="consumer"/> and prints its publish time,
    /// message id and UTF-8 decoded payload to the console.
    /// </summary>
    /// <param name="consumer">Consumer to receive from.</param>
    /// <returns>The received message.</returns>
    private static async Task<Message<byte[]>> Receive(IConsumer<byte[]> consumer)
    {
        var received = await consumer.ReceiveAsync();
        var body = Encoding.UTF8.GetString(received.Data);

        Console.WriteLine("Received message");
        Console.WriteLine($"PublishTime: {received.PublishTime}");
        Console.WriteLine($"MessageId: '{received.MessageId}'");
        Console.WriteLine($"Data: {body}");

        return received;
    }
}
For comparison, I created a similar test with DotPulsar client and it does not throw any exception:
/// <summary>
/// Comparison scenario using the DotPulsar client: publishes ten messages, seeks the
/// consumer to <c>MessageId.Earliest</c> twice in a row and then receives one message.
/// </summary>
public static class DotPulsarSeekTest
{
    /// <summary>
    /// Creates a client, producer and consumer on a freshly named persistent topic,
    /// publishes ten messages, performs two consecutive seeks to the earliest message
    /// and receives a single message. All three resources are disposed asynchronously
    /// when the method exits.
    /// </summary>
    public static async Task RunSimple()
    {
        const string serviceUrl = "pulsar://localhost:6650";
        const string subscriptionName = "my-subscription";

        // The tick count makes the topic name differ between runs.
        var topicName = $"my-topic-{DateTime.Now.Ticks}";
        var fullTopicName = $"persistent://public/default/{topicName}";

        // Pass the service URL explicitly instead of relying on the builder's default,
        // so the declared constant is actually the endpoint being used.
        await using var client = PulsarClient.Builder()
            .ServiceUrl(new Uri(serviceUrl))
            .Build();

        await using var producer = client.NewProducer()
            .Topic(fullTopicName)
            .Create();

        await using var consumer = client.NewConsumer()
            .SubscriptionName(subscriptionName)
            .Topic(fullTopicName)
            .Create();

        for (var i = 0; i < 10; i++)
        {
            await Send(producer, $"Sent message {i + 1} from C# at '{DateTime.Now}'");
        }

        await consumer.Seek(MessageId.Earliest);

        // no exception thrown
        await consumer.Seek(MessageId.Earliest);

        // The returned message is not needed here; Receive already logs its details.
        await Receive(consumer);
    }

    /// <summary>
    /// Sends <paramref name="message"/> as UTF-8 bytes, logs the text and the resulting
    /// message id to the console, then waits 500 ms.
    /// </summary>
    /// <param name="producer">Producer used to publish the message.</param>
    /// <param name="message">Text to publish.</param>
    /// <returns>The message id returned by the producer.</returns>
    private static async Task<MessageId> Send(IProducer<ReadOnlySequence<byte>> producer, string message)
    {
        var messageId = await producer.Send(Encoding.UTF8.GetBytes(message));
        Console.WriteLine($"Sent message: '{message}'");
        Console.WriteLine($"MessageId: '{messageId}'");
        await Task.Delay(500);
        return messageId;
    }

    /// <summary>
    /// Receives one message from <paramref name="consumer"/> and prints its publish time,
    /// message id and UTF-8 decoded payload to the console.
    /// </summary>
    /// <param name="consumer">Consumer to receive from.</param>
    /// <returns>The received message.</returns>
    private static async Task<IMessage<ReadOnlySequence<byte>>> Receive(IConsumer<ReadOnlySequence<byte>> consumer)
    {
        var message = await consumer.Receive();
        Console.WriteLine($"Received message");
        Console.WriteLine($"PublishTime: {message.PublishTime}");
        Console.WriteLine($"MessageId: '{message.MessageId}'");
        Console.WriteLine($"Data: {Encoding.UTF8.GetString(message.Data)}");
        return message;
    }
}