CAP icon indicating copy to clipboard operation
CAP copied to clipboard

NATS feature: add the possibility to disable dynamic consumer subject/topic creation through NatsCapOtpions

Open davte-beijer opened this issue 1 year ago • 2 comments

Currently NATS clients will create any missing topics of they are not configured on a stream through NATSConsumerClient.FetchTopics. This requires extra permissions for the client to be configured in the NATS server configuration. e.g:

"permissions": {
   "publish": {
      "JS.API.STREAM.INFO.VolatileStream"
      "JS.API.STREAM.CREATE.VolatileStream.*"
      "JS.API.STREAM.UPDATE.VolatileStream.*"
   }
}

For use cases when you want to limit the permissions required for a nats client, by having a centralized solution for creating streams & topics, then allowing dynamic creation by clients is not feasible as it currently enforces a client to have the publish permissions specified above.

Therefore I propose to extended the NATSOptions with a new property EnableSubscriberClientStreamAndTopicCreation with default value true in order for the change to be backwards compatible.

Pseudo code:

public class NATSOptions
{
   ...
    /// <summary>
    /// Allows a nats client to dynamically create a stream and configure its expected topics.
    /// </summary>
    public bool EnableSubscriberClientStreamAndTopicCreation {get; set;} = true;
    ...
}

This option would then be used in the NATSConsumerClient.FetchTopics method like the following in order to avoid doing calls requiring described client publish permissions via the the JetsManagementContext class.

public ICollection<string> FetchTopics(IEnumerable<string> topicNames)
{
    if (_natsOptions.EnableSubscriberClientStreamAndTopicCreation)
    {
        Connect();

        var jsm = _consumerClient!.CreateJetStreamManagementContext();

        var streamGroup = topicNames.GroupBy(x => _natsOptions.NormalizeStreamName(x));

        foreach (var subjectStream in streamGroup)
        {
            var builder = StreamConfiguration.Builder()
                .WithName(subjectStream.Key)
                .WithNoAck(false)
                .WithStorageType(StorageType.Memory)
                .WithSubjects(subjectStream.ToList());

            _natsOptions.StreamOptions?.Invoke(builder);

            try
            {
                jsm.GetStreamInfo(subjectStream.Key); // this throws if the stream does not exist

                jsm.UpdateStream(builder.Build());
            }
            catch (NATSJetStreamException)
            {
                try
                {
                    jsm.AddStream(builder.Build());
                }
                catch
                {
                    // ignored
                }
            }
        }
    }

    return topicNames.ToList();
}

davte-beijer avatar Jun 16 '24 20:06 davte-beijer

Hi @davte-beijer,

Looks good, Would you want to submit a PR?

yang-xiaodong avatar Jun 18 '24 02:06 yang-xiaodong

Will do!

davidterins avatar Jun 29 '24 17:06 davidterins

Fixed in version 8.3.0-preview-243613753

yang-xiaodong avatar Sep 20 '24 14:09 yang-xiaodong