
Add Support for NonDurable subscriptions

Open frankjkelly opened this issue 3 years ago • 11 comments

Is your feature request related to a problem? Please describe.
The Java API supports non-durable subscriptions via SubscriptionMode: https://github.com/apache/pulsar/blob/master/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java#L243-L256

but it appears that the Go client does not: https://github.com/apache/pulsar-client-go/blob/25f3075176f853f5f09d89b3b625e0646432735f/pulsar/consumer.go#L76

This is useful for our use case: we have lots of topics with short-lived subscriptions that last minutes to hours, and once the data is read, the topic is no longer needed. To ensure that data is compacted in the bookies, we currently have to set the subscription expiration time at the namespace level.

Describe the solution you'd like
Please add support for SubscriptionMode.

Describe alternatives you've considered
Continue to use the subscription expiration time.
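For illustration only, here is a sketch of how the requested option might look in Go if it mirrored the Java SubscriptionMode enum (Durable, NonDurable). The SubscriptionMode type and the consumerOptions struct are hypothetical local stand-ins, not pulsar-client-go's actual API.

```go
package main

import "fmt"

// SubscriptionMode is a hypothetical Go mirror of the Java client's
// SubscriptionMode enum; it is not taken from pulsar-client-go.
type SubscriptionMode int

const (
	// Durable is the zero value, so options that never set a mode keep
	// the current behavior: the broker persists the subscription cursor.
	Durable SubscriptionMode = iota
	// NonDurable requests a subscription without a persisted cursor.
	NonDurable
)

func (m SubscriptionMode) String() string {
	switch m {
	case Durable:
		return "Durable"
	case NonDurable:
		return "NonDurable"
	default:
		return fmt.Sprintf("SubscriptionMode(%d)", int(m))
	}
}

// consumerOptions is a hypothetical, trimmed-down stand-in for
// pulsar.ConsumerOptions with the requested field added.
type consumerOptions struct {
	Topic            string
	SubscriptionName string
	SubscriptionMode SubscriptionMode
}

func main() {
	legacy := consumerOptions{Topic: "my-topic", SubscriptionName: "sub"}
	ephemeral := consumerOptions{Topic: "my-topic", SubscriptionName: "sub", SubscriptionMode: NonDurable}
	fmt.Println(legacy.SubscriptionMode, ephemeral.SubscriptionMode) // Durable NonDurable
}
```

Making Durable the zero value is a deliberate choice in this sketch: existing code that never sets the field would keep its current durable behavior.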


frankjkelly, Feb 16 '21

@frankjkelly The Go client supports the Reader API, which is based on non-durable subscriptions. Does that work for you?

codelipenghui, Feb 18 '21

Thanks, but I don't think that's what we're seeing using the 0.3.0 Go client library with Pulsar 2.6.1.

In the image below, taken from our Pulsar Grafana dashboard, you can hopefully see that our consumers (on the cogito-dialog\wav namespace) are short-lived, but the subscriptions on that namespace last much longer: the subscription count only goes down at the 1-hour mark, because we have set a subscription expiration time on that namespace (largely so that the ledger can roll over and disk space can be reclaimed).

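[Image: Grafana dashboard screenshot of consumer and subscription counts on the cogito-dialog\wav namespace]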

Here is how we have configured our consumer

	consumer, err := p.client.Subscribe(pulsar.ConsumerOptions{
		Topic:                       topic,
		SubscriptionName:            subscriptionName,
		SubscriptionInitialPosition: pulsar.SubscriptionPositionEarliest,
		Type:                        pulsar.Exclusive,
	})

frankjkelly, Feb 18 '21

@codelipenghui @wolfstudy I'm wondering whether I'm misunderstanding something about the Go client. Perhaps my ConsumerOptions are wrong (see above), or there is a bug in the broker-side count of open subscriptions, but it does appear that the Go client uses durable subscriptions. Thanks in advance!

frankjkelly, Mar 05 '21

To add some context, @frankjkelly and I spoke in the Pulsar Slack, and he mentioned that he misunderstood @codelipenghui's recommendation and that a Reader might work for him. Whether there's still a need for a non-durable subscription with a plain consumer, I can't say. If there's effectively no difference between this and a reader, I'd probably say the issue should be closed.

flowchartsman, Apr 14 '21

I think it would be nice to have parity with the Java API, which supports NonDurable subscriptions on both the Reader and Consumer interfaces:

https://github.com/apache/pulsar/blob/6704f12104219611164aa2bb5bbdfc929613f1bf/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/SubscriptionMode.java#L35

frankjkelly, Nov 17 '21

Feature parity alone is not a compelling reason for me, since it duplicates functionality. Re-reading your use case, it seems that you might benefit even more from a traditional consumer with aggressive subscription/topic removal policies. That way, at least, you know that if your consumer has a hiccup and needs to restart, it won't have to start over, but the data will still be deleted in a timely manner once the subscription is caught up.

flowchartsman, Nov 18 '21

@flowchartsman I'm picking this up from @frankjkelly and trying to implement the Reader interface, but I'm unsure how to apply it to our use case. Essentially, we have a worker that wants to read a topic from the beginning to the "end", where the end is indicated by a message with an end-of-stream property. If this worker dies, the "job" is picked up by another worker, which starts reading from the beginning of the stream. We also need to handle reading the stream while a publisher is writing to it in real time, and we may connect to the topic to read before the publisher does.

What I'm unclear on is how to use the reader.HasNext() and reader.Next(ctx) interface. Under what conditions will HasNext() return false? Presumably any time there are no unread messages? If HasNext() returns false, what should the reader routine do? Sleep for some amount of time? We want to read with as little latency as possible, so we'd like to avoid unnecessary sleeps.

With the Consumer interface, we were able to simply call consumer.Receive(ctx) and it would block until the next message was available. Is there a way to achieve that while using the Reader for a non-durable subscription?

Thanks!

apapia, Jan 12 '22

HasNext() gives you the ability to bail when you're caught up. If you just loop on reader.Next(ctx), you should get the behavior you want: the client blocks until it gets new messages, and then you process them. You can then choose to bail on whatever property you want. That said, reading until HasNext() is false will also get you to the "end". It sounds, though, like you aren't looking for the last message but for a "last" message of your own designation, in which case the ending condition is up to you: you can inspect the messages, bail after waiting a certain amount of time (with a ctx timeout), or whatever you like.

flowchartsman, Jan 12 '22

Excellent, thanks for the explanation, @flowchartsman!

apapia, Jan 12 '22

@apapia no problem. If the issue is now resolved, please feel free to close the issue, thanks!

flowchartsman, Jan 13 '22

@flowchartsman Hi, with the Reader interface, we always have to manage discovery of the topic partitions ourselves, and regex consumers won't work either. On top of that, the backlog won't show up in the monitoring stats.

That's why, IMHO, a NonDurable subscription is still a meaningful feature.

yangou, Oct 04 '22

At my company, we use Pulsar in an enterprise application that has requirements my team cannot control. We cannot use "aggressive subscription/topic removal policies", as many other requirements govern those policies. We also cannot easily use the Reader model, as it is very hard to scale a Reader if you want to replicate a Consumer's "shared" subscription model, where multiple consumers on the subscription share the load of messages. We cannot just rewrite our code in Java; we are using Golang, which is why we need the feature in the Golang client.

In our use case, Pulsar processes a very high throughput of data. It is important to us that if something goes wrong with our component so that it can no longer listen/ack on its subscription, the subscription does not persist and build up a backlog of messages that could bring down the enterprise-level application. Making our subscriptions non-durable seems like the perfect solution, but we can't use it because the Golang Pulsar client doesn't support it.
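A toy model of the difference this comment relies on: when the last consumer goes away, a durable subscription remains and keeps accumulating backlog, while a non-durable subscription is removed along with its cursor. The broker type below is a hypothetical in-memory simulation that only counts unacknowledged messages per subscription; it ignores retention, acknowledgments, and everything else a real broker does.

```go
package main

import "fmt"

type mode int

const (
	durable mode = iota
	nonDurable
)

// broker is a hypothetical toy model of one topic: it only tracks, per
// subscription, how many published messages are still unacknowledged.
type broker struct {
	subs  map[string]int  // subscription name -> backlog
	modes map[string]mode // subscription name -> durability
}

func newBroker() *broker {
	return &broker{subs: map[string]int{}, modes: map[string]mode{}}
}

func (b *broker) subscribe(name string, m mode) {
	if _, ok := b.subs[name]; !ok {
		b.subs[name] = 0
	}
	b.modes[name] = m
}

// publish adds one message to every existing subscription's backlog.
func (b *broker) publish() {
	for name := range b.subs {
		b.subs[name]++
	}
}

// disconnect models the last consumer going away: a durable subscription
// stays (and keeps accumulating backlog); a non-durable one is dropped.
func (b *broker) disconnect(name string) {
	if b.modes[name] == nonDurable {
		delete(b.subs, name)
		delete(b.modes, name)
	}
}

func main() {
	b := newBroker()
	b.subscribe("durable-sub", durable)
	b.subscribe("ephemeral-sub", nonDurable)
	// Both components fail and stop consuming.
	b.disconnect("durable-sub")
	b.disconnect("ephemeral-sub")
	for i := 0; i < 1000; i++ {
		b.publish()
	}
	fmt.Println(b.subs) // map[durable-sub:1000]
}
```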

dinghram, Mar 16 '23

@frankjkelly @yangou @apapia @codelipenghui Take a look at the attached PR. It exposes the Reader's non-durable attribute to the Consumer. It works well in all my tests, and my company is now using this implementation in an enterprise application. I do not know how to get the PR some attention from reviewers.

dinghram, Jun 01 '23