nats.net
nats.net copied to clipboard
`Observe` and `ToObservable` for `IEncodedConnection`
Feature Request
Use Case:
Same use cases as for IConnection
.
Proposed Change:
Implement extension methods Observe
and ToObservable
for IEncodedConnection
.
Who Benefits From The Change(s)?
Users of Rx
and IEncodedConnection
.
Alternative Approaches
There's none I can see. I'm new to NATS, maybe there's something I'm not understanding.
Maybe I can expand a bit on what I was expecting when using an IEncodedConnection
with Rx
: I would have expected to have a straight forward way of getting an observable of the decoded object directly from the subscription like this
IEncodedConnection connection = this.connectionFactory.CreateEncodedConnection(configuration.Url)
connection.OnDeserialize = data =>
{
// Deserialization logic, e.g. using protocol buffers.
};
IEncodedAsyncSubscription subscription = connection.SubscribeAsync("Subject");
INATSObservable<object> observable = subscription.ToObservable()
.Select(decodedObject => {
// Mapping logic.
});
Note that in the above, the IEncodedAsyncSubscription
type doesn't exist and is what I think would be needed for that to work, as an alternative to IAsyncSubscription
.
@hypdeb Can you please review this unit test https://github.com/nats-io/nats.net/blob/master/src/Tests/IntegrationTests/TestEncoding.cs If looks like you can already received encoded messages using the normal async subscription. If you are already aware of this, maybe can you explain why you need to observe a message that you can already receive async?
@scottf Thanks for coming back to me on this.
I don't think the test addresses my issue. I know I can receive the decoded messages, what I am missing is a handy way of extracting an observable sequence out of those messages, like it exists for the IConnection
using the Observe
and ToObservable
extension methods defined here
https://github.com/nats-io/nats.net/blob/e51e0c940f785ac9105dcb15cf27196f73af3b9d/src/NATS.Client/Rx/RxExtensions.cs#L18
It is possible what I'm trying to do is somehow not smart, therefore let me add a bit more context: I have a messaging service with an interface looking like this:
interface IMessagingService
{
IObservable<MessageType> ReadMessages();
}
that I would like to implement like this:
class NatsMessagingService
{
IObservable<MessageType> ReadMessages()
{
IEncodedConnection connection = this.connectionFactory.CreateEncodedConnection(configuration.Url)
connection.OnDeserialize = data =>
{
// Deserialization logic, e.g. using protocol buffers.
};
IEncodedAsyncSubscription subscription = connection.SubscribeAsync("Subject");
INATSObservable<MessageType> observable = subscription.ToObservable()
.Select(decodedObject => {
// Mapping logic.
});
return observable;
}
}
I cannot do that at this time because there is no ToObservable
or Observe
extension methods for IEncodedConnection
and the ToObservable
method defined on IAsyncSubscription
produces an observable sequence of Msg
, which doesn't contain the decoded message (as far as I can see), but only the byte[]
.
I think you might need some or an entire duplication of the RX package just for encoded connection. Maybe / maybe not, this is an area in the .NET client that I really don't use. You are welcome to fork the repo and propose a solution.
The Rx
stuff is actually okay, the issue is this:
https://github.com/nats-io/nats.net/blob/e51e0c940f785ac9105dcb15cf27196f73af3b9d/src/NATS.Client/AsyncSub.cs#L37
What is missing is a subscription with and EncodedMessageEventArgs
instead of MsgHandlerEventArgs
.
Which part of the code are you referring to when you say that you are not using it ? Encoded connections or Rx ? Would you know of someone who is using that who could have a look ?
I am noticing a further discrepancy between the functionality supported by IConnection
and IEncodedConnection
, in that IEncodedConnection
doesn't support CreateJetStreamContext
. I think I will have to somehow make due with IConnection
as it is too painful to use IEncodedConnection
at this stage.