nats.net
`Observe` and `ToObservable` for `IEncodedConnection`
Feature Request
Use Case:
Same use cases as for IConnection.
Proposed Change:
Implement extension methods Observe and ToObservable for IEncodedConnection.
Who Benefits From The Change(s)?
Users of Rx and IEncodedConnection.
Alternative Approaches
None that I can see. I'm new to NATS, so maybe there's something I'm not understanding.
Let me expand a bit on what I was expecting when using an IEncodedConnection with Rx: a straightforward way of getting an observable of the decoded objects directly from the subscription, like this:
IEncodedConnection connection = this.connectionFactory.CreateEncodedConnection(configuration.Url);
connection.OnDeserialize = data =>
{
    // Deserialization logic, e.g. using protocol buffers.
};
IEncodedAsyncSubscription subscription = connection.SubscribeAsync("Subject");
INATSObservable<object> observable = subscription.ToObservable()
    .Select(decodedObject => {
        // Mapping logic.
    });
Note that the IEncodedAsyncSubscription type used above doesn't exist. It is what I think would be needed for this to work, as an alternative to IAsyncSubscription.
@hypdeb Can you please review this unit test: https://github.com/nats-io/nats.net/blob/master/src/Tests/IntegrationTests/TestEncoding.cs It looks like you can already receive encoded messages using the normal async subscription. If you are already aware of this, can you explain why you need to observe a message that you can already receive asynchronously?
@scottf Thanks for coming back to me on this.
I don't think the test addresses my issue. I know I can receive the decoded messages. What I am missing is a handy way of turning those messages into an observable sequence, like the one that exists for IConnection through the Observe and ToObservable extension methods defined here:
https://github.com/nats-io/nats.net/blob/e51e0c940f785ac9105dcb15cf27196f73af3b9d/src/NATS.Client/Rx/RxExtensions.cs#L18
It is possible that what I'm trying to do is somehow not smart, so let me add a bit more context. I have a messaging service with an interface looking like this:
interface IMessagingService
{
    IObservable<MessageType> ReadMessages();
}
that I would like to implement like this:
class NatsMessagingService : IMessagingService
{
    public IObservable<MessageType> ReadMessages()
    {
        IEncodedConnection connection = this.connectionFactory.CreateEncodedConnection(configuration.Url);
        connection.OnDeserialize = data =>
        {
            // Deserialization logic, e.g. using protocol buffers.
        };
        IEncodedAsyncSubscription subscription = connection.SubscribeAsync("Subject");
        INATSObservable<MessageType> observable = subscription.ToObservable()
            .Select(decodedObject => {
                // Mapping logic.
            });
        return observable;
    }
}
I cannot do that at this time, for two reasons. First, there are no ToObservable or Observe extension methods for IEncodedConnection. Second, the ToObservable method defined on IAsyncSubscription produces an observable sequence of Msg, which (as far as I can see) doesn't contain the decoded message, only the byte[].
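For illustration, here is a minimal sketch of a workaround that wraps the handler-based subscription in an observable. It assumes the System.Reactive package is referenced and that IEncodedConnection exposes a SubscribeAsync(subject, handler) overload taking an EventHandler<EncodedMessageEventArgs>. EncodedRxSketch and ObserveDecoded are hypothetical names, not part of NATS.Client.

```csharp
using System;
using System.Reactive.Linq;
using NATS.Client;

// Hypothetical helper, not part of NATS.Client.
public static class EncodedRxSketch
{
    // Each Rx subscriber gets its own NATS subscription on the subject.
    public static IObservable<object> ObserveDecoded(
        this IEncodedConnection connection, string subject)
    {
        return Observable.Create<object>(observer =>
        {
            // Assumption: ReceivedObject carries the result of OnDeserialize.
            EventHandler<EncodedMessageEventArgs> handler =
                (sender, args) => observer.OnNext(args.ReceivedObject);

            // Disposing the Rx subscription disposes the NATS subscription.
            IDisposable subscription = connection.SubscribeAsync(subject, handler);
            return subscription;
        });
    }
}
```

With something like this, ReadMessages could return connection.ObserveDecoded("Subject").Select(...) with the mapping logic inside the Select. Errors thrown by the deserializer are not forwarded to OnError in this sketch.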
I think you might need to duplicate some or all of the Rx package just for encoded connections. Maybe, maybe not. This is an area of the .NET client that I really don't use. You are welcome to fork the repo and propose a solution.
The Rx stuff is actually okay, the issue is this:
https://github.com/nats-io/nats.net/blob/e51e0c940f785ac9105dcb15cf27196f73af3b9d/src/NATS.Client/AsyncSub.cs#L37
What is missing is a subscription with an EncodedMessageEventArgs instead of MsgHandlerEventArgs.
Which part of the code are you referring to when you say that you are not using it: encoded connections or Rx? Would you know of someone who uses it and could have a look?
I am noticing a further discrepancy between the functionality supported by IConnection and IEncodedConnection: IEncodedConnection doesn't support CreateJetStreamContext. I think I will have to somehow make do with IConnection, as it is too painful to use IEncodedConnection at this stage.
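For reference, making do with IConnection could look roughly like the sketch below: decode the payload in a Select over the existing Observe extension. It assumes that Observe(subject) from NATS.Client.Rx returns an observable of Msg and that System.Reactive is referenced. PlainConnectionRxSketch, ObserveDecoded and the decode delegate are hypothetical names introduced for illustration.

```csharp
using System;
using System.Reactive.Linq;
using NATS.Client;
using NATS.Client.Rx;

// Hypothetical helper, not part of NATS.Client.
public static class PlainConnectionRxSketch
{
    public static IObservable<T> ObserveDecoded<T>(
        this IConnection connection, string subject, Func<byte[], T> decode)
    {
        // Observe is the existing Rx extension for IConnection.
        IObservable<Msg> messages = connection.Observe(subject);

        // Decode the raw payload here instead of in OnDeserialize.
        return messages.Select(msg => decode(msg.Data));
    }
}
```

The deserialization logic that would have gone into OnDeserialize is passed in as decode, so the JetStream support of IConnection stays available.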