smallrye-reactive-messaging
Add support for NATS
Add support for NATS: https://nats.io/
I was able to get NATS working with smallrye reactive messaging by using the camel connector with its NATS integration. It was janky, but it worked for a POC. The experience was subpar, though, and left something to be desired.
That being said, I'd be happy to volunteer to contribute a native NATS connector. I've been following their Java client and it seems they're stable with the V2 rewrite. So now would be a good time to implement it here.
https://github.com/nats-io/nats.java
I think a first cut release goal for me would be configuration, receiving messages, and publishing messages with plain NATS. Then I should hopefully be up to speed enough to also include JetStream support, TLS support, and anything else the client offers.
I took a quick glance at the RabbitMQ implementation and although it looks daunting I think I'm up to the challenge. I'd imagine NATS would be similar.
With some guidance I should be able to get set up, implement some basic functionality and submit a PR for review sometime in the next week or so.
Does that sound good?
Any advice on how to proceed and where to start would be appreciated. 🤙🏽
That would be awesome! I'm not a NATS expert (I'm already struggling with the few protocols that I know).
The main questions are:
- Is there a "good" non-blocking API,
- How back-pressure is handled and how can we adapt it to reactive messaging,
- How is acknowledgment/negative-acknowledgment handled,
- Is the consumption per message or per batch
I would add to the previous list:
- Maybe it goes without saying but having a container util for running the nats server in tests is very important.
- Backpressure needs to be handled both ways: For fast publishers and slow consumers.
- How to check for connections status for health checks?
Okay good questions. I'll do some research and come back with answers
Okay I stood up the module and began with documentation
Copying my notes to this thread (./smallrye-reactive-messaging-nats/notes.md)
@cescoffier:
Is there a "good" non-blocking API
- Requirement: Implementation must be non blocking
- According to the nats.java#connecting:4 the client is capable of asynchronous communication.
Options options = new Options.Builder().server(Options.DEFAULT_URL).connectionListener(handler).build();
Nats.connectAsynchronously(options, true);
- They have marked asynchronous connections as experimental
- According to nats.java#listening there is an async callback thread based way to receive messages
Dispatcher d = nc.createDispatcher((msg) -> {
String response = new String(msg.getData(), StandardCharsets.UTF_8);
...
});
d.subscribe("subject");
How back-pressure is handled and how can we adapt it to reactive messaging
- "NATS automatically handles a slow consumer. If a client is not processing messages quick enough, the NATS server cuts it off" source
- There is an exception thrown client side that serves as a warning that a consumer is slow source
- A consumer can limit the incoming queue size source
- The client throws an exception that can be handled when it is force disconnected by the server source
- After researching back-pressure I found the following options
- Control the Producer
- NATS does not recommend metering the publisher
- One can issue a periodic request/reply to match subscriber rates
- Create a bounded buffer
- NatsConnectionReader and NatsConnectionWriter implement buffering
- I could write a configurable FIFO queue to further buffer messages
- Drop Messages
- This seems like a bad idea since the messages aren't ack'd (see below)
- Naive approach: just yolo it and punt handling back pressure for now
- I'm still not really sure how to handle backpressure, any ideas?
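To make the "bounded buffer" option above a bit more concrete, here is a minimal sketch using only the JDK. `BoundedMessageBuffer` and its methods are hypothetical names, not part of nats.java or this project; the assumption is that a dispatcher callback would call `offer` and the downstream side would call `drain` with however many items it has requested.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical bounded FIFO buffer between a receiving callback and downstream processing. */
final class BoundedMessageBuffer {
    private final BlockingQueue<byte[]> queue;
    private final AtomicLong rejected = new AtomicLong();

    BoundedMessageBuffer(int capacity) {
        this.queue = new ArrayBlockingQueue<>(capacity);
    }

    /** Never blocks the receiving thread; returns false when the buffer is full. */
    boolean offer(byte[] payload) {
        boolean accepted = queue.offer(payload);
        if (!accepted) {
            // The caller decides what to do: drop, log, or flag the channel as unhealthy.
            rejected.incrementAndGet();
        }
        return accepted;
    }

    /** Hands out at most 'demand' buffered payloads in FIFO order. */
    List<byte[]> drain(int demand) {
        List<byte[]> batch = new ArrayList<>();
        queue.drainTo(batch, demand);
        return batch;
    }

    long rejectedCount() {
        return rejected.get();
    }

    int size() {
        return queue.size();
    }
}
```

This only moves the problem: once the buffer is full something still has to give, which is why `offer` reports rejection instead of hiding it.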
How is acknowledgment/negative-acknowledgment handled,
- The nats.io Java library has a method for n/acking
- Unless the connection is a Jetstream connection n/acking is a NO-OP
- Since they didn't implement it
- would it be desirable to override their implementation and implement it?
- or is it fine if left as is
client/impl/NatsMessage.java
...
@Override
public void ack() {
// do nothing. faster. saves checking whether a message is jetstream or not
}
@Override
public void ackSync(Duration d) throws InterruptedException, TimeoutException {
// do nothing. faster. saves checking whether a message is jetstream or not
}
@Override
public void nak() {
// do nothing. faster. saves checking whether a message is jetstream or not
}
...
Is the consumption per message or per batch
- I would want the consumption to be per message
- I think mirroring how the NATS Java library handles it would be nice
Hypothetical Example Usage:
import io.nats.client.Message;
import imaginary.message.format.Deserialize;
...
@Incoming("get.example.model.*") //[1]
public CompletionStage<Void> consume(NatsMessage<Message> message) { //[2]
if (message.hasHeaders()) { //[3]
...
}
String subject = message.getSubject(); //[4]
byte[] data = message.getData(); //[5]
MyDTO dto = Deserialize.fromBytes(data , MyDTO.class); //[6]
...
}
- [1] In nats topics are called subjects
- users should be able to set raw subjects as strings or have them mapped from microprofile config
- [2] Everything else in this library has a Message<T> container by convention
- NatsMessage<T> would implement org.eclipse.microprofile.reactive.messaging.Message<T>
- The Message object from nats.io could be contained by NatsMessage
- Nats Message objects have {subject: String, header: Map<k,v>, data: byte[]}
- Link to Nats Message interface
- idea: the ability to pick a deserializer
- That way one could do NatsMessage<MyDTO> and have it be automatically mapped
- [3] Messages have optional headers
- If you map a DTO then maybe there could be a second parameter that takes the header object
- [4] The subject is the topic name, would be the raw form of [1]
- [5] Nats protocol is binary and serialization format is left to the user
- Can use plain UTF8 Strings, JSON, JSONB, etc
- NATS Streaming uses protobuf
- According to the Streaming Server's repository README it is deprecated
- In favor of Jetstream
- [6] Like the idea above, providing JSON/B deserialization out of the box would be helpful
- Maybe an integration with Jackson or JSONB as other contributions have
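As a rough illustration of the wrapper and pluggable-deserializer idea, here is a JDK-only sketch. Everything in it is hypothetical: `RawNatsMessage` stands in for the client's message type, and `NatsMessageSketch<T>` stands in for a class that, in a real connector, would implement `org.eclipse.microprofile.reactive.messaging.Message<T>`. The immediately completed `ack()` assumes plain NATS, where acking is a no-op as noted earlier.

```java
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.function.Function;

/** Stand-in for the client's message type: subject, optional headers, raw bytes. */
final class RawNatsMessage {
    final String subject;
    final Map<String, List<String>> headers;
    final byte[] data;

    RawNatsMessage(String subject, Map<String, List<String>> headers, byte[] data) {
        this.subject = subject;
        this.headers = headers == null
                ? Collections.<String, List<String>>emptyMap()
                : headers;
        this.data = data;
    }
}

/** Hypothetical wrapper that maps the raw bytes to T with a user-chosen deserializer. */
final class NatsMessageSketch<T> {
    private final RawNatsMessage raw;
    private final Function<byte[], T> deserializer;

    NatsMessageSketch(RawNatsMessage raw, Function<byte[], T> deserializer) {
        this.raw = raw;
        this.deserializer = deserializer;
    }

    T getPayload() {
        return deserializer.apply(raw.data);
    }

    String getSubject() {
        return raw.subject;
    }

    boolean hasHeaders() {
        return !raw.headers.isEmpty();
    }

    Map<String, List<String>> getHeaders() {
        return raw.headers;
    }

    /** Plain NATS has nothing to acknowledge, so the stage is already complete. */
    CompletionStage<Void> ack() {
        return CompletableFuture.completedFuture(null);
    }

    /** Convenience factory for UTF-8 text payloads. */
    static NatsMessageSketch<String> utf8(RawNatsMessage raw) {
        return new NatsMessageSketch<>(raw, bytes -> new String(bytes, StandardCharsets.UTF_8));
    }
}
```

A JSON mapping would plug in the same way, by passing a different `Function<byte[], T>`; which JSON library to use is left open here.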
@ozangunalp
Maybe it goes without saying but having a container util for running the nats server in tests is very important.
- Requirement: set up dev container for nats server in tests.
- Nats Docker
version: "3.5"
services:
  nats:
    image: nats
    ports:
      - "4222:4222" # client connections
      - "8222:8222" # HTTP monitoring
    networks: ["nats"]
networks:
  nats:
    name: nats
- This project uses Test Containers
- There is no prebuilt NATS Test Container interface
- There is a merge-able open PR on the Test Containers Repo 🎉
- I can bug the maintainers to review and merge the PR
- Or I can write a generic one, all it has to do is run the docker image as above.
- There is a Java Nats Server Runner that is used by the library to run NATS
- It relies on having the nats-server binary in the path
- would need to figure out how to dynamically acquire and package nats-server for tests
- I guess there are three options here, any preference?
- Option 1: Get the NATS Test Containers PR merged and use that
- Option 2: Use GenericContainer Test Container
- Option 3: Use the official Java Nats Server Runner
@Container
public GenericContainer<?> nats = new GenericContainer<>(DockerImageName.parse("nats:latest"))
// 4222 is the default client port, 8222 is the HTTP monitoring port
.withExposedPorts(4222, 8222);
Backpressure needs to be handled both ways: For fast publishers and slow consumers.
- Requirement: apply bi directional backpressure
- Research what backpressure is and how to handle it
- See the answer above, I'm not too sure how to handle back pressure, any ideas?
How to check for connections status for health checks?
- Requirement: Contribution must expose health check
- The NatsConnection manages connections and exposes a getStatus() method which returns a Status object representing the current state of the connection.
- NATS Exposes a bunch of Monitoring Endpoints
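One way the connection status could feed a health check, sketched with the JDK only. `ConnectionState` is a local stand-in for whatever `getStatus()` returns; the constant names are my assumption and should be checked against the client. `NatsHealth` and the liveness/readiness split are also hypothetical choices, not something the project has decided.

```java
/** Stand-in for the client's status values; names are assumed, verify against nats.java. */
enum ConnectionState { CONNECTED, CONNECTING, RECONNECTING, DISCONNECTED, CLOSED }

/** Hypothetical mapping from connection state to health-check results. */
final class NatsHealth {
    private NatsHealth() {
    }

    /** Liveness: only a closed connection is treated as unrecoverable. */
    static boolean isAlive(ConnectionState state) {
        return state != ConnectionState.CLOSED;
    }

    /** Readiness: traffic should only flow while the connection is established. */
    static boolean isReady(ConnectionState state) {
        return state == ConnectionState.CONNECTED;
    }
}
```

With this split, a reconnecting client is reported as alive but not ready, so it is taken out of rotation without being restarted.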
That sounds promising.
- Non-Blocking: I'm a bit afraid of the async support that seems to be async but not non-blocking. Basically, it looks like they offload to a thread pool (kind of a bad idea under load). For the Dispatcher we may be able to use a Vert.x context.
- Back-pressure: The consumer side back-pressure mechanism is not great. At least it tells you you got dropped and we could imagine marking the application as unhealthy (hoping to be restarted). I wonder if there is a way to "pause" the reception instead of being kicked.
On the producer side, it does not seem to be handled either. There is some cache/flush mechanism (https://docs.nats.io/developing-with-nats/sending/caches). As for the consumer side, we can use their ping/pong to detect connection issues or too much load and force a restart. It's not elegant, but it looks unsupported (I may be wrong, I just read the doc very quickly).
We can start with a simple implementation not doing much around back-pressure and see how it goes.
Any update or workaround found to integrate nats? I'm looking to integrate nats messaging into a quarkus project.
No news yet; looking for a contributor that knows NATS enough to build a connector. We can help with the integration, but NATS is in uncharted territory for me.
FYI for backpressure support, I believe the standard approach with NATS is to use JetStream with a pull based consumer.
The consumer can then request batches of messages when it finishes processing the previous batch, essentially implementing "back pressure", though this doesn't actually slow down the message producers, and just acts as a buffer.
In that way, the consumer client can pull messages at its leisure. If it's too slow, then the NATS server deals with the queue overflowing as defined in the JetStream configuration, and that isn't the consumer's concern. Some options are to limit the size or number of messages in the stream, with policies to discard the oldest or newest messages when that limit is reached.
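The behaviour described above can be illustrated with a small in-memory simulation. This is not the JetStream API: `BoundedStreamSketch` is a hypothetical JDK-only stand-in for a stream limited to a maximum number of messages with a discard-oldest policy, and `fetch` plays the role of the consumer's batch pull.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** In-memory stand-in for a size-limited stream with a discard-oldest policy. */
final class BoundedStreamSketch {
    private final Deque<String> messages = new ArrayDeque<>();
    private final int maxMessages;

    BoundedStreamSketch(int maxMessages) {
        if (maxMessages <= 0) {
            throw new IllegalArgumentException("maxMessages must be positive");
        }
        this.maxMessages = maxMessages;
    }

    /** The producer is never slowed down; the oldest entry is evicted at the limit. */
    synchronized void publish(String message) {
        if (messages.size() == maxMessages) {
            messages.removeFirst();
        }
        messages.addLast(message);
    }

    /** The consumer pulls at most batchSize messages, and only when it asks for them. */
    synchronized List<String> fetch(int batchSize) {
        List<String> batch = new ArrayList<>();
        while (batch.size() < batchSize && !messages.isEmpty()) {
            batch.add(messages.removeFirst());
        }
        return batch;
    }
}
```

Publishing five messages into a stream limited to three and then fetching in batches of two shows both effects: the two oldest messages are gone, and the consumer sets its own pace.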
If the intent is to implement proper back pressure that slows down the producer, then I believe this needs to be done as a custom application protocol using reply topics. The producer can then listen for acks on the reply topic and continue emitting once the messages have been consumed. This is much more complex to implement correctly, however, as NATS without JetStream isn't really a message queue.
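The producer-side bookkeeping for such a protocol could look roughly like the sketch below. It only models the in-flight window with a JDK `Semaphore`; `AckWindow` is a hypothetical name, and the actual publishing and the subscription on the reply topic are left out and assumed to call `tryReserve` and `onAck` respectively.

```java
import java.util.concurrent.Semaphore;

/** Hypothetical in-flight window: at most 'window' unacknowledged messages at a time. */
final class AckWindow {
    private final Semaphore permits;

    AckWindow(int window) {
        this.permits = new Semaphore(window);
    }

    /** Call before publishing; false means the consumer is behind and the producer should wait. */
    boolean tryReserve() {
        return permits.tryAcquire();
    }

    /** Call once per ack received on the reply topic; duplicate acks would widen the window. */
    void onAck() {
        permits.release();
    }

    int available() {
        return permits.availablePermits();
    }
}
```

The hard parts this leaves out are exactly the ones mentioned above: lost or duplicated acks, timeouts, and multiple consumers on the same subject.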
Any news? Were you guys able to speak to the nats-team?