[API proposal] Stream interceptors/filters

Open galvesribeiro opened this issue 4 years ago • 11 comments

Hello folks!

When instrumenting Orleans grains, we can easily use IIncomingGrainCallFilter/IOutgoingGrainCallFilter. That allows us to inspect grain calls, instrument them with logs/telemetry, and even modify or block them before they happen.

While instrumenting grains with those filters, I ran into a scenario where I get a lot of calls like this:

[image]

Clearly, DeliverBatch is not helpful when you are trying to get metrics about streams, such as measuring latency between the producer and the consumer or capturing exceptions thrown while delivering messages to the consumer, because we don't have access to the metadata from the stream messages at that level.

In a recent conversation with @ReubenBond, I suggested adding something very similar to the grain call filters, but in the context of streams: the developer can tap into the producer/consumer, inspect some information from the messages, and maybe even modify their contents, just like we do with grain call filters. Here is the very high-level API design I came up with:

    /// <summary>
    /// Represents the context of the Stream message being produced or consumed
    /// </summary>
    public interface IStreamMessageContext
    {
        /// <summary>
        /// The identity of the stream (i.e. id and namespace)
        /// </summary>
        IStreamIdentity Identity { get; }

        /// <summary>
        /// The type information of the message
        /// </summary>
        Type MessageType { get; }

        /// <summary>
        /// The message being sent on the stream
        /// </summary>
        object Message { get; }

        /// <summary>
        /// Push the message to the backing Stream Provider on the producer side
        /// or deliver the message to the consumer
        /// </summary>
        /// <returns>A task that completes once the message has been pushed or delivered</returns>
        Task OnNextAsync();
    }

    /// <summary>
    /// The context of the message being published
    /// </summary>
    public interface IStreamPublisherMessageContext : IStreamMessageContext
    {
    }

    /// <summary>
    /// The context of the message being consumed
    /// </summary>
    public interface IStreamConsumerMessageContext : IStreamMessageContext
    {
        /// <summary>
        /// The stream subscription id
        /// </summary>
        Guid SubscriptionId { get; }

        /// <summary>
        /// The stream sequence token (if any)
        /// </summary>
        StreamSequenceToken SequenceToken { get; }
    }
    
    /// <summary>
    /// Interface for a filter that is invoked right before
    /// a message is published in a stream
    /// </summary>
    public interface IStreamPublisherMessageFilter
    {
        Task OnNextAsync(IStreamPublisherMessageContext context);
    }

    /// <summary>
    /// Interface for a filter that is invoked right before
    /// a message is consumed from a stream and delivered to a grain
    /// </summary>
    public interface IStreamConsumerMessageFilter
    {
        Task OnNextAsync(IStreamConsumerMessageContext context);
    }

Just like the grain call filters, this is a completely opt-in feature and doesn't introduce any breaking changes to the current way we deal with streams.

I'd like to get input from the community before I implement it and submit a PR. What are your thoughts? Any input is really appreciated.
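To make the intended usage concrete, here is a minimal sketch of a publisher-side filter written against the proposed shape. It is only an illustration: the interfaces are simplified stand-ins (the stream identity is reduced to a plain string so the sample has no Orleans dependency), and `TimingPublisherFilter`, `FakePublisherContext`, and `PublisherFilterDemo` are hypothetical names that exist nowhere in Orleans.

```csharp
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading.Tasks;

// Simplified stand-ins for the proposed interfaces (assumption: the stream
// identity is a plain string here instead of IStreamIdentity).
public interface IStreamMessageContext
{
    string StreamId { get; }
    Type MessageType { get; }
    object Message { get; }
    Task OnNextAsync(); // continues to the provider (publish) or the consumer (deliver)
}

public interface IStreamPublisherMessageContext : IStreamMessageContext { }

public interface IStreamPublisherMessageFilter
{
    Task OnNextAsync(IStreamPublisherMessageContext context);
}

// Hypothetical filter: times the publish call and records failures before rethrowing.
public sealed class TimingPublisherFilter : IStreamPublisherMessageFilter
{
    public List<string> Log { get; } = new List<string>();

    public async Task OnNextAsync(IStreamPublisherMessageContext context)
    {
        var stopwatch = Stopwatch.StartNew();
        try
        {
            // Skipping this call would discard the message instead of publishing it.
            await context.OnNextAsync();
            Log.Add($"published {context.MessageType.Name} to {context.StreamId} in {stopwatch.ElapsedMilliseconds} ms");
        }
        catch (Exception ex)
        {
            Log.Add($"publish to {context.StreamId} failed: {ex.Message}");
            throw;
        }
    }
}

// Hypothetical test double standing in for whatever context the runtime would supply.
public sealed class FakePublisherContext : IStreamPublisherMessageContext
{
    private readonly Func<Task> _next;

    public FakePublisherContext(string streamId, object message, Func<Task> next)
    {
        StreamId = streamId;
        Message = message;
        _next = next;
    }

    public string StreamId { get; }
    public object Message { get; }
    public Type MessageType => Message.GetType();
    public Task OnNextAsync() => _next();
}

public static class PublisherFilterDemo
{
    public static async Task Main()
    {
        var filter = new TimingPublisherFilter();
        var context = new FakePublisherContext("orders/42", "hello", () => Task.CompletedTask);
        await filter.OnNextAsync(context);
        Console.WriteLine(filter.Log[0]);
    }
}
```

The filter decides whether and when to call `context.OnNextAsync()`, which mirrors how grain call filters call `Invoke()` on their context to continue the pipeline.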

galvesribeiro • Nov 17 '21 02:11

That sounds like an excellent idea! The streaming filtering right now in 3.x is mostly broken. In main, we have a different but simpler approach. Do you think this new "interceptors/filter" should replace that?

Another question: a stream producer is an entity producing events. For most people, that means it's either a client or a grain. But in Orleans Core, if using persistent streams, a producer is the PullingAgent. Here, where will the IStreamProducerMessageFilter be executed? By the pulling agent, or by the entity pushing events to the queue?

benjaminpetit • Nov 17 '21 14:11

> The streaming filtering right now in 3.x is mostly broken

I wasn't aware we had that on 3.x 😄 Probably because it is broken and nobody told me 😃

> Do you think this new "interceptors/filter" should replace that?

I think those interfaces are simple and try to be similar to what everyone already does for grain calls. So if we could have a similar/coherent approach then yes, we could replace that, IMHO.

> Here, where will the IStreamProducerMessageFilter be executed? By the pulling agent, or by the entity pushing events to the queue?

I think the producer filter should run on the client and the grain pushing the events. It should have similar semantics, from the developer's perspective, to the grain call filters. In other words, whenever someone calls stream.OnNextAsync(event), they should know the filter will run within that call, so, when properly awaited, the filter can even throw an exception or simply discard the message if the filter logic says so.

My current use case is to use Activity and OpenTelemetry to instrument the stream messages and pass along the current Activity.Id from the producer, so the consumer can create another Activity and set the incoming parent id, letting us measure latency and processing time. But nothing stops people from implementing any sort of logic, like authorization, logging, sampling, exception handling, etc. The main goal is to have features similar to the regular grain call filters.

galvesribeiro • Nov 17 '21 15:11

If you trace the processing of stream messages, the trace timeline may look strange, especially when you look at the processing time of the grain call: you will find that the grain call's processing time is shorter than the trace timeline.

buzzers • Nov 18 '21 04:11

@buzzers thanks for the input. But in this case, I'm looking at building a tree like this:

    --> Client/Grain calls OnNextAsync() on the stream to publish a message
    ---> IStreamProducerMessageFilter is called and tracks that message (Activity 1)
    ----> IStreamConsumerMessageFilter is called on the consumer side, whether that is a grain or another client (Activity 2)
    -----> Grain/Client receives the message
    ------> Grain/Client makes another grain call (Activity 3)

The time between Activity 1 and 2 is where we measure the "latency" of the published message arriving at the consumer.

Activity 3 is just another regular grain call, which could be intercepted by a regular grain call filter.

So if we look at a timeline, because each Activity inherits the parent id, they would be chained into a single operation, and we would see it correctly in tools that support OpenTelemetry, such as Azure Application Insights and New Relic.

The correlation between the activities can leverage the current RequestContext, as we do today with grain method calls.

With that, I can have a full trace from an HTTP request, through grain calls and streams, all properly chained.
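The parent/child chaining itself only needs `System.Diagnostics.Activity`. Here is a rough sketch of the idea, assuming the producer's `Activity.Id` travels with the message somehow (for example via RequestContext or the message itself; that transport is not shown). `ActivityChainDemo` and the operation names are made up for illustration.

```csharp
using System;
using System.Diagnostics;

public static class ActivityChainDemo
{
    // Consumer side: start a new Activity whose parent is the id that arrived with the message.
    public static Activity StartConsumerActivity(string parentId)
    {
        var consumer = new Activity("stream.consume"); // hypothetical operation name
        consumer.SetParentId(parentId);                // must be set before Start()
        consumer.Start();
        return consumer;
    }

    public static void Main()
    {
        // Producer side (Activity 1): the id captured here is what would be sent along.
        var producer = new Activity("stream.publish").Start();
        string carriedId = producer.Id;
        producer.Stop();

        // Consumer side (Activity 2): linked to the producer through the carried id.
        var consumer = StartConsumerActivity(carriedId);
        Console.WriteLine($"parent linked: {consumer.ParentId == carriedId}");
        Console.WriteLine($"same root:     {consumer.RootId == producer.RootId}");
        consumer.Stop();
    }
}
```

Because both activities share the same root, a tracing backend can render them as one operation, and the gap between the producer's start time and the consumer's start time approximates the delivery latency.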

galvesribeiro • Nov 18 '21 17:11

> I think those interfaces are simple and try to be similar to what everyone already does for grain calls. So if we could have a similar/coherent approach then yes, we could replace that, IMHO.

Since you want the filter to run in the OnNextAsync call, I don't think they are very comparable. The current filter implementation runs in the pulling agent and filters on a per-subscription basis: for example, you can filter out a given event for subscriber A but not for B.

> I think the producer filter should run on the client and the grain pushing the events

Then I think we should choose a different name than "producer" to avoid more confusion.

benjaminpetit • Nov 24 '21 09:11

Sorry @benjaminpetit, I hadn't seen your reply.

"Producer", in my mind, is whoever puts a message into a stream using stream.OnNextAsync(), whether that is an IClusterClient or a grain. A consumer, on the other hand, is anything that calls stream.SubscribeAsync().

We can certainly change the naming if that makes more sense. What would you suggest?

I really need to get this implemented, since it is becoming a nightmare for us to track down stream messages. Everything works perfectly with grain method calls, but since we rely heavily on streams, a good part of the code path is "in the dark" without metrics :/

galvesribeiro • Dec 02 '21 23:12

Maybe "publisher"?

I don't want people to confuse "Producer filter" with the notion of publisher in Orleans. The other possibility would be to find a better name for the current "publisher" in streaming and replace everything...

benjaminpetit • Dec 03 '21 08:12

OK, makes sense. I'll keep everything as publisher. Thanks!

galvesribeiro • Dec 03 '21 10:12

Hey @galvesribeiro, great idea! I have the exact same problem you have/had. Does anyone know if stream filters/interceptors are planned?

mert-ozturk-lgs • Nov 11 '25 17:11

Hey thanks!

We never got around to implementing it back in the day. The quick workaround we used was to wrap all stream messages in an envelope type that carries some metadata, including the traceId/Activity.Id, so it could be used on the other side. Not ideal, but it worked back then.
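As a rough illustration of that envelope workaround, here is a minimal sketch. The type and member names (`StreamEnvelope<T>`, `Wrap`, `Latency`) are hypothetical and not taken from any actual codebase, and a real Orleans message type would also need whatever serialization support your Orleans version requires.

```csharp
using System;
using System.Diagnostics;

// Hypothetical envelope: the payload plus the metadata needed on the consumer side.
public sealed class StreamEnvelope<T>
{
    public StreamEnvelope(T payload, string parentActivityId, DateTimeOffset publishedAt)
    {
        Payload = payload;
        ParentActivityId = parentActivityId;
        PublishedAt = publishedAt;
    }

    public T Payload { get; }
    public string ParentActivityId { get; } // null when no Activity was running at publish time
    public DateTimeOffset PublishedAt { get; }
}

public static class StreamEnvelope
{
    // Producer side: capture the ambient Activity id and a timestamp alongside the payload.
    public static StreamEnvelope<T> Wrap<T>(T payload) =>
        new StreamEnvelope<T>(payload, Activity.Current?.Id, DateTimeOffset.UtcNow);

    // Consumer side: time spent in transit (ignores clock skew between hosts).
    public static TimeSpan Latency<T>(StreamEnvelope<T> envelope, DateTimeOffset receivedAt) =>
        receivedAt - envelope.PublishedAt;
}

public static class EnvelopeDemo
{
    public static void Main()
    {
        var activity = new Activity("stream.publish").Start();
        var envelope = StreamEnvelope.Wrap("hello"); // would be passed to stream.OnNextAsync(...)
        activity.Stop();

        Console.WriteLine($"carries producer id: {envelope.ParentActivityId == activity.Id}");
        Console.WriteLine($"latency: {StreamEnvelope.Latency(envelope, DateTimeOffset.UtcNow)}");
    }
}
```

The drawback is the one the proposal tries to avoid: every producer and consumer has to know about the envelope, whereas a filter could attach and read this metadata transparently.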

I'll talk to Reuben about where we are on this and see what can be done. I still think it is a useful API to be added.

I'll post an update here later on.

galvesribeiro • Nov 11 '25 17:11

I'll draft something in a branch and show @benjaminpetit and @ReubenBond as soon as I have time and eventually come up with a PR.

galvesribeiro • Nov 11 '25 17:11