Support AWS Kinesis for reactive messaging

Open rverma-nikiai opened this issue 5 years ago • 7 comments

Hi team,

I have started working on Kinesis client support for reactive-messaging: https://github.com/nikiai/smallrye-reactive-messaging/tree/master/smallrye-reactive-messaging-kinesis. I am having some difficulty with the Kinesis source, though: the Kinesis library provides a subscribeToShard method, and I couldn't figure out how to bind the PublisherBuilder<? extends Message<?>> source to that subscriber.

Any leads are warmly appreciated.

rverma-nikiai avatar Jul 25 '19 08:07 rverma-nikiai

Hello @rverma-nikiai

I don't really know the Kinesis API. At first glance, the "outgoing" side sounds rather easy: you get a CompletionStage, so you would use the same kind of approach as the one used for Kafka.

For the "incoming" side, yes it might be a bit more tricky. There are several ways:

  • Use an AsyncProcessor and poll the records from Kinesis in a separate thread. The issue with this approach is the unbounded buffer created by the AsyncProcessor: if consumption is very slow, you may run out of memory.
  • Use a queue of Kinesis messages: retrieve x records and stop until they have been consumed. In this approach you would create the PublisherBuilder using (be aware this is pseudo-code):
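// Still pseudo-code: queue and pollMoreFromKinesis are placeholders described below.
// ReactiveStreams.fromPublisher returns a PublisherBuilder.
ReactiveStreams.fromPublisher(Flowable.generate(emitter -> {
     KinesisMessage msg = queue.get();
     if (msg == null) {
          // Nothing buffered: fetch, and emit the next record directly.
          pollMoreFromKinesis(emitter);
     } else {
         emitter.onNext(msg);
         if (queue.size() < threshold) {
              // Running low: refill the queue ahead of the next request.
              pollMoreFromKinesis(null);
         }
     }
}));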

pollMoreFromKinesis(Emitter) retrieves the records from Kinesis. If the passed emitter is not null the next record needs to be sent (using onNext) instead of queued. The queue is a storage of the records. It is filled by pollMoreFromKinesis and consumed using the get method.
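The queue-and-refill logic can also be sketched on its own, without any reactive library. This is only an illustration under assumptions: PrefetchingBuffer, fetchBatch, batchSize and threshold are hypothetical names, fetchBatch stands in for a real Kinesis poll, and the refill is synchronous here, whereas a real connector would probably fetch asynchronously.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.function.IntFunction;

/** Hands out records one at a time and refills its queue in batches. */
final class PrefetchingBuffer<T> {
    private final Queue<T> queue = new ArrayDeque<>();
    private final IntFunction<List<T>> fetchBatch; // hypothetical stand-in for a Kinesis poll
    private final int batchSize;
    private final int threshold;

    PrefetchingBuffer(IntFunction<List<T>> fetchBatch, int batchSize, int threshold) {
        this.fetchBatch = fetchBatch;
        this.batchSize = batchSize;
        this.threshold = threshold;
    }

    /** Returns the next record, or null if the source has nothing to offer right now. */
    T next() {
        if (queue.isEmpty()) {
            refill(); // nothing buffered: fetch before answering
        }
        T record = queue.poll();
        if (record != null && queue.size() < threshold) {
            refill(); // running low: top up ahead of the next request
        }
        return record;
    }

    /** Number of records currently buffered. */
    int buffered() {
        return queue.size();
    }

    private void refill() {
        queue.addAll(fetchBatch.apply(batchSize));
    }

    public static void main(String[] args) {
        int[] counter = {0};
        // Fake source: every call returns the next n consecutive integers.
        PrefetchingBuffer<Integer> buffer = new PrefetchingBuffer<>(n -> {
            List<Integer> batch = new ArrayList<>();
            for (int i = 0; i < n; i++) {
                batch.add(counter[0]++);
            }
            return batch;
        }, 5, 2);
        for (int i = 0; i < 7; i++) {
            System.out.println(buffer.next() + " (buffered: " + buffer.buffered() + ")");
        }
    }
}
```

Because records are fetched only when the buffer is empty or below the threshold, the buffer stays small as long as fetchBatch honours the requested batch size, which is the point of this approach compared with an unbounded processor.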

cescoffier avatar Jul 25 '19 09:07 cescoffier

BTW, if you want to contribute this connector to the smallrye implementation, you are more than welcome.

cescoffier avatar Jul 25 '19 09:07 cescoffier

@cescoffier I do intend to contribute this to the SmallRye implementation, and thanks for your suggestions; they are very helpful.

I have a question. Since the Kinesis library uses HTTP/2 with RxJava, what advantages would Vert.x provide over that?

Also, on the outgoing side: both Kafka and Kinesis recommend writing messages in batches, but I didn't find anything in kafkaSink that suggests a batch workflow. Do you have a recommended approach for this? I did a crude hack at https://github.com/nikiai/smallrye-reactive-messaging/blob/master/smallrye-reactive-messaging-kinesis/src/main/java/io/smallrye/reactive/messaging/kinesis/KinesisSink.java but I believe the batch process should have a fixed flush interval and a maximum batch size, and should flush when either limit is hit.

e.g. if we have a flush interval of 1 second and a batch size of 25, we should flush either every second or once we have received 25 messages from the stream, whichever happens first.
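A minimal sketch of that size-or-time rule, using only the JDK. The names (SizeOrTimeBatcher, sink) are hypothetical and not part of SmallRye or the AWS SDK; the sink stands in for whatever call sends one batch. It uses a fixed-rate timer that is not reset after a size-triggered flush, and it has no error handling or retry.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

/** Collects items and flushes them when the batch is full or the timer fires. */
final class SizeOrTimeBatcher<T> implements AutoCloseable {
    private final int maxBatchSize;
    private final Consumer<List<T>> sink; // hypothetical: e.g. one batched put request
    private final ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
    private List<T> pending = new ArrayList<>();

    SizeOrTimeBatcher(int maxBatchSize, long flushIntervalMillis, Consumer<List<T>> sink) {
        this.maxBatchSize = maxBatchSize;
        this.sink = sink;
        // Time-based trigger: flush whatever is pending at a fixed rate.
        timer.scheduleAtFixedRate(this::flush, flushIntervalMillis, flushIntervalMillis,
                TimeUnit.MILLISECONDS);
    }

    /** Adds one item; flushes immediately once the batch reaches its maximum size. */
    synchronized void add(T item) {
        pending.add(item);
        if (pending.size() >= maxBatchSize) {
            flush();
        }
    }

    /** Sends whatever is pending; does nothing when the batch is empty. */
    synchronized void flush() {
        if (pending.isEmpty()) {
            return;
        }
        List<T> batch = pending;
        pending = new ArrayList<>();
        sink.accept(batch);
    }

    /** Stops the timer and flushes the remainder. */
    @Override
    public void close() {
        timer.shutdownNow();
        flush();
    }

    public static void main(String[] args) {
        // Flush every second or at 25 items, whichever comes first.
        try (SizeOrTimeBatcher<Integer> batcher = new SizeOrTimeBatcher<>(25, 1_000,
                batch -> System.out.println("flushing " + batch.size() + " items"))) {
            for (int i = 0; i < 60; i++) {
                batcher.add(i);
            }
        } // close() flushes whatever is still pending
    }
}
```

Calling the sink while holding the lock keeps the sketch short, but it blocks producers for the duration of the send; a real sink would more likely hand the batch off to an asynchronous call.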

rverma-nikiai avatar Jul 26 '19 01:07 rverma-nikiai

@rverma-nikiai Vert.x is not used in all the connectors (the camel connector does not use it), so no worries.

You can use the RX Java 2 API provided by the library. BTW, do you have a link to this lib? (I'm a real Kinesis noob.)

About batching: it's something I need to fix in the KafkaConnector. But my idea is very close to yours: keep a number of in-flight messages in a queue and send them "once in a while" (by size or timeout).

cescoffier avatar Jul 26 '19 07:07 cescoffier

The library is

<dependency>
    <groupId>software.amazon.awssdk</groupId>
    <artifactId>kinesis</artifactId>
</dependency>

And some sample code examples are at

https://github.com/awsdocs/aws-doc-sdk-examples/blob/master/javav2/example_code/kinesis/src/main/java/com/example/kinesis/KinesisStreamRxJavaEx.java

https://github.com/awsdocs/aws-doc-sdk-examples/blob/master/javav2/example_code/kinesis/src/main/java/com/example/kinesis/KinesisStreamEx.java

rverma-nikiai avatar Jul 26 '19 12:07 rverma-nikiai

Still stuck, since the Kinesis client expects a Consumer and we are expected to return a PublisherBuilder<Message<?>>. I think we have to pass a consumer that copies the messages to a Flowable. Does this make sense? Any idea how we can do this?

rverma-nikiai avatar Jul 26 '19 15:07 rverma-nikiai

As a first step, you can use an AsyncProcessor, and call onNext with the messages received by the consumer. Then, we can iterate to add proper back pressure.
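The bridging idea (a callback on one side, a publisher on the other) can be illustrated with the JDK alone. Note the swap: this sketch uses java.util.concurrent.SubmissionPublisher and Flow.Publisher rather than an RxJava processor, so it is not what the connector itself would use, and ConsumerBridge is a hypothetical name. A real connector would still have to adapt the result to the Reactive Streams Publisher that PublisherBuilder wraps.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.function.Consumer;

/** Exposes a push-style Consumer on one side and a Publisher on the other. */
final class ConsumerBridge<T> implements AutoCloseable {
    // SubmissionPublisher keeps a bounded buffer per subscriber (256 items by default);
    // submit() blocks the caller while that buffer is full. Items submitted while
    // there is no subscriber are dropped.
    private final SubmissionPublisher<T> publisher = new SubmissionPublisher<>();

    /** The callback to hand to a client that expects a Consumer. */
    Consumer<T> asConsumer() {
        return publisher::submit;
    }

    /** The stream side, to be consumed downstream. */
    Flow.Publisher<T> asPublisher() {
        return publisher;
    }

    /** Signals completion to all subscribers. */
    @Override
    public void close() {
        publisher.close();
    }

    public static void main(String[] args) throws InterruptedException {
        List<String> received = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(1);
        try (ConsumerBridge<String> bridge = new ConsumerBridge<>()) {
            bridge.asPublisher().subscribe(new Flow.Subscriber<String>() {
                public void onSubscribe(Flow.Subscription s) { s.request(Long.MAX_VALUE); }
                public void onNext(String item) { received.add(item); }
                public void onError(Throwable t) { done.countDown(); }
                public void onComplete() { done.countDown(); }
            });
            Consumer<String> callback = bridge.asConsumer(); // what the client would invoke
            callback.accept("record-1");
            callback.accept("record-2");
        }
        done.await(); // delivery is asynchronous, so wait for completion
        System.out.println(received);
    }
}
```

The subscriber above requests everything up front, so it exercises only the bridging. A subscriber that requests a few items at a time would make submit() block once the buffer fills, which is one crude form of the back pressure to add in a later iteration.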

cescoffier avatar Jul 26 '19 17:07 cescoffier