nats.net icon indicating copy to clipboard operation
nats.net copied to clipboard

`Observe` and `ToObservable` for `IEncodedConnection`

Open hypdeb opened this issue 2 years ago • 6 comments

Feature Request

Use Case:

Same use cases as for IConnection.

Proposed Change:

Implement extension methods Observe and ToObservable for IEncodedConnection.

Who Benefits From The Change(s)?

Users of Rx and IEncodedConnection.

Alternative Approaches

There's none I can see. I'm new to NATS, maybe there's something I'm not understanding.

hypdeb avatar Feb 08 '22 21:02 hypdeb

Maybe I can expand a bit on what I was expecting when using an IEncodedConnection with Rx: I would have expected to have a straight forward way of getting an observable of the decoded object directly from the subscription like this

IEncodedConnection connection = this.connectionFactory.CreateEncodedConnection(configuration.Url)
connection.OnDeserialize = data =>
{
  // Deserialization logic, e.g. using protocol buffers.
};
IEncodedAsyncSubscription subscription = connection.SubscribeAsync("Subject");
INATSObservable<object> observable = subscription.ToObservable()
  .Select(decodedObject => {
    // Mapping logic.
  });

Note that in the above, the IEncodedAsyncSubscription type doesn't exist and is what I think would be needed for that to work, as an alternative to IAsyncSubscription.

hypdeb avatar Feb 09 '22 09:02 hypdeb

@hypdeb Can you please review this unit test https://github.com/nats-io/nats.net/blob/master/src/Tests/IntegrationTests/TestEncoding.cs If looks like you can already received encoded messages using the normal async subscription. If you are already aware of this, maybe can you explain why you need to observe a message that you can already receive async?

scottf avatar Feb 09 '22 13:02 scottf

@scottf Thanks for coming back to me on this.

I don't think the test addresses my issue. I know I can receive the decoded messages, what I am missing is a handy way of extracting an observable sequence out of those messages, like it exists for the IConnection using the Observe and ToObservable extension methods defined here https://github.com/nats-io/nats.net/blob/e51e0c940f785ac9105dcb15cf27196f73af3b9d/src/NATS.Client/Rx/RxExtensions.cs#L18

It is possible what I'm trying to do is somehow not smart, therefore let me add a bit more context: I have a messaging service with an interface looking like this:

interface IMessagingService
{
  IObservable<MessageType> ReadMessages();
}

that I would like to implement like this:

class NatsMessagingService
{
  IObservable<MessageType> ReadMessages()
  {
    IEncodedConnection connection = this.connectionFactory.CreateEncodedConnection(configuration.Url)
    connection.OnDeserialize = data =>
    {
      // Deserialization logic, e.g. using protocol buffers.
    };
    IEncodedAsyncSubscription subscription = connection.SubscribeAsync("Subject");
    INATSObservable<MessageType> observable = subscription.ToObservable()
      .Select(decodedObject => {
        // Mapping logic.
      });
    return observable;
  }
}

I cannot do that at this time because there is no ToObservable or Observe extension methods for IEncodedConnection and the ToObservable method defined on IAsyncSubscription produces an observable sequence of Msg, which doesn't contain the decoded message (as far as I can see), but only the byte[].

hypdeb avatar Feb 09 '22 16:02 hypdeb

I think you might need some or an entire duplication of the RX package just for encoded connection. Maybe / maybe not, this is an area in the .NET client that I really don't use. You are welcome to fork the repo and propose a solution.

scottf avatar Feb 09 '22 17:02 scottf

The Rx stuff is actually okay, the issue is this:

https://github.com/nats-io/nats.net/blob/e51e0c940f785ac9105dcb15cf27196f73af3b9d/src/NATS.Client/AsyncSub.cs#L37

What is missing is a subscription with and EncodedMessageEventArgs instead of MsgHandlerEventArgs.

Which part of the code are you referring to when you say that you are not using it ? Encoded connections or Rx ? Would you know of someone who is using that who could have a look ?

hypdeb avatar Feb 10 '22 07:02 hypdeb

I am noticing a further discrepancy between the functionality supported by IConnection and IEncodedConnection, in that IEncodedConnection doesn't support CreateJetStreamContext. I think I will have to somehow make due with IConnection as it is too painful to use IEncodedConnection at this stage.

hypdeb avatar Mar 30 '22 17:03 hypdeb