smallrye-reactive-messaging icon indicating copy to clipboard operation
smallrye-reactive-messaging copied to clipboard

Support intercepting and converting outgoing messages

Open vladykin opened this issue 4 years ago • 12 comments

There a several cross-cutting concerns associated with emitting messages to outgoing channel:

  • serializing domain objects to wire format
  • logging the message that is about to be sent (and the result of sending)
  • updating metrics

Having to write this stuff in all places of application where something is emitted or returned from an @outgoing-annotated method is inconvenient. It would be very handy to have an interceptor that would process all outgoing messages and be able to log/count/convert/etc them.

For incoming messages there is MessageConverter which can be used for this purpose. For outgoing messages, AFAIK, these is nothing like that.

P.S. I'm using smallrye-reactive-messaging-amqp 2.9 with Quarkus 1.13.1.

vladykin avatar Apr 17 '21 13:04 vladykin

That's an interesting idea. One thing is that AMQP is a bit behind in term of feature (in comparison to Kafka which have been a priority for the last few months). I have a few questions (mostly to collect ideas):

serializing domain objects to wire format

That's generally protocol specific? Typically in Kafka you can configure your serializer. The AMQP protocol is strict in terms of type (AMQP Type system), however, for object (where we map the object into JSON at the moment) we can imagine a hook that would be something like:

public <T> byte[] serialize(T instance, OutgoingAmqpMetadata metadata);

Implementations would be able to serialize the instance as they want (producing a byte[]) as well as updating the outgoing message metadata (typically the content-type header).

I guess the HTTP connector can also benefit from such an interceptor (@michalszynkiewicz)

logging the message that is about to be sent (and the result of sending)

That's definitely something we should add. However, the API is going to be protocol-specific. In Kafka, we will call the listeners with a Record (going to be sent) and RecordMetadata (result). HTTP would use something like an HTTP Request and an HTTP Response.

The question is do we need multiple hooks or a single one receiving what has been sent and the result. Typically, what about:

public <I, O> void onSuccess(I sent, O response);
public <I, O> void onFailure(I sent, Exception e);

Note that the onFailure does not allow retry or anything. This is already handled by the connector.

updating metrics

We already expose metrics (at least for Kafka). We should add the AMQP metrics soon.

cescoffier avatar Apr 18 '21 07:04 cescoffier

Is this perhaps related to #1141? (If so, I guess we should keep this one open, as it has more details/ideas. I'd close #1141 as duplicate.)

Ladicek avatar Apr 19 '21 06:04 Ladicek

serializing domain objects to wire format

That's generally protocol specific? Typically in Kafka you can configure your serializer. The AMQP protocol is strict in terms of type (AMQP Type system), however, for object (where we map the object into JSON at the moment) we can imagine a hook that would be something like: public <T> byte[] serialize(T instance, OutgoingAmqpMetadata metadata);

Well, MessageConverter used for incoming messages is not protocol-specific but works just fine. What about OutgoingMessageConverter with a couple of methods like:

    boolean canConvert(Message<?> in);
    Message<?> convert(Message<?> in);

? Similar to MessageConverter, but without target type argument, which is to be decided internally by the converter.

Implementations can access protocol-specific metadata of Message, if they need to.

vladykin avatar Apr 19 '21 10:04 vladykin

logging the message that is about to be sent (and the result of sending)

That's definitely something we should add. However, the API is going to be protocol-specific. In Kafka, we will call the listeners with a Record (going to be sent) and RecordMetadata (result). HTTP would use something like an HTTP Request and an HTTP Response. The question is do we need multiple hooks or a single one receiving what has been sent and the result. Typically, what about: public <I, O> void onSuccess(I sent, O response); public <I, O> void onFailure(I sent, Exception e);

I'd prefer a hook which would allow to intercept the following events: message about to be sent, message successfully sent (ack'ed by broker), message sending failed. And to reliably match all these events corresponding to a single message, e.g. to measure time between sending the message and getting ack for it.

vladykin avatar Apr 19 '21 10:04 vladykin

So, if I understand correctly, your MessageConverter is called before the connector is called. Because the first thing an outgoing connector does is to transform an incoming Message into its own protocol-specific structure.

cescoffier avatar Apr 19 '21 13:04 cescoffier

Yes, OutgoingMessageConverter I described can be called in the message processing pipeline before the message is passed to the outgoing connector. Maybe there are advanced usecases when access to low-level connector-specific stuff is needed. But for my usecase - serializing protobuf object to byte array - such connector-agnostic converter would be enough.

vladykin avatar Apr 20 '21 20:04 vladykin

We have similar needs. We have use cases where we want to add some metadata as cross-cutting concerns (correlationId, authenticated principal name, ...).

scrocquesel avatar Nov 24 '21 23:11 scrocquesel

Hi. I need to work with XML and SOAP messages. At the moment I've written MessageConverter for incoming messages (it uses Apache Axiom). But there is no way to do something similar for outgoing messages. I suppose that OutgoingMessageConverter could be useful in my case. Actually, I think that there is no need in to introduce new interface and MessageConverter may be reused. Connector can specify second parameter (Type type) according to it needs. Or may be target type could be made configurable at channel level (like mp.messaging.outgoing.myqueue.outputtype=fully.qulified.class.Name)

ylepikhov avatar Dec 10 '21 07:12 ylepikhov

It's possible to implement such logic with io.smallrye.reactive.messaging.providers.PublisherDecorator (which is not documented). It has one method PublisherDecorator.decorate taking publisher instance and channel name as parameters and returning new decorated publisher instance. This method gets called once for each channel at startup. I've implemented such decorator to convert all outgoing messages to String (see Kotlin code below).

@ApplicationScoped
class AmqpSoapPublisherDecorator @Inject constructor(
    // inject all converters
    private val converters: Instance<MessageConverter>,
    // and config
    private val config: Config) : PublisherDecorator {

    // called once for each channel at startup
    override fun decorate(publisher: Multi<out Message<*>>, channelName: String): Multi<out Message<*>> {

        // examine outgoing channel configuration
        val connector = config.getOptionalValue("mp.messaging.outgoing.$channelName.connector", String::class.java)
        // if connector is amqp        
        if (connector.isPresent && "smallrye-amqp".equals(connector.get())) {
            // apply converters to convert to String (see also io.smallrye.reactive.messaging.providers.helpers.ConverterUtils) 
            return ConverterUtils.convert(publisher, converters, String::class.java)
        }
        // otherwise return unchanged publisher 
        return publisher
    }
}

So it's possible to decorate publisher based on channel configuration.

ylepikhov avatar Apr 20 '22 20:04 ylepikhov

@ylepikhov I tested your approach in a Quarkus application, where I want to "decorate" outgoing messages as well as extract information from incoming messages (both using the smallrye-kafka connector). I was however only able to get this to work for incoming channels, not outgoing channels.

Stepping through the code the decorate() method gets called for incoming channels from ConfiguredChannelFactory#createPublisher(). The corresponding method ConfiguredChannelFactory#createSubscriber() which gets called for outgoing channels does however not call decorate(), so I am a bit surprised that you were able to get this working for an outgoing channel. Can you maybe tell me what I am missing?

The only other place I see calling decorate() is AbstractMediator#decorate(), but this method never ends up getting called in my application.

Looking at the SmallRye Reactive Messaging 3.16.0 documentation (Quarkus 2.9.0 is on version 3.15.0) I can see documentation for the experimental feature Custom Emitter Implementations. This sounds like it could be another alternative, but I would still be interested in how to use PublisherDecorator for outgoing channels.

knutwannheden avatar May 16 '22 14:05 knutwannheden

@knutwannheden, sorry for delay. You're right. In my case there was message processor with both @Incoming and @Outgoing. In this case there will be implicit incoming channel for @Outgoing. But it does not work for regular emitters annotated with @Channel.

ylepikhov avatar Jul 08 '22 06:07 ylepikhov

Was able to implement such feature with custom emitters. First, I've implemented an annotation to have a way to specify target class (to convert to).

@Qualifier
@Target(AnnotationTarget.FUNCTION)
@Retention(AnnotationRetention.RUNTIME)
annotation class ConvertPayloadFor(
   val connector: String,
   val clazz: KClass<*>)

For example, to make SOAPMessage be converted to String for smallrye-ampq connector you must have this:

@ApplicationScoped
class AmqpConversionConfig {

    // specify connector name and class to be converted
    @ConvertPayloadFor("smallrye-amqp", SOAPMessage::class)
    @Produces
    // function name does not really matter
    fun amqpConvertSOAPMessageTo() = String::class.java // target class
}

Actual conversion will be made by regular MessageConverter which can convert Message<SOAPMessage> to String (I've implemented such converter).

Finally, to send SOAPMessage you may have something like this:

@ApplicationScoped
class Test @Inject constructor(@param:Channel("test") val emitter: EmitterEx<SOAPMessage>) {
    
    suspend fun send(payload: SOAPMessage) {
        
        emitter.sendSuspending(payload)
    } 
}

Where EmitterEx is my custom emitter type (also I've implemented custom emitter implementation class and factory class).

But, it does not work for message processor (having both @Incoming and @Outgoint annotations). Such logic shold be implemented differently to support all scenarios.

ylepikhov avatar Jul 20 '22 15:07 ylepikhov

An implementation is provided by #1856 : http://smallrye.io/smallrye-reactive-messaging/3.22.0/concepts/decorators/#intercepting-outgoing-messages

ozangunalp avatar Nov 09 '22 14:11 ozangunalp