vert.x
Event-bus streaming WIP
Prototype for streaming on top of event-bus.
This is expected: each message on the specified address creates a stream, much like a server socket creates a socket for each incoming connection.
On 24 Apr 2020, at 03:53, Lin Gao wrote:
@gaol commented on this pull request.
In src/main/java/io/vertx/core/eventbus/impl/ConsumerStream.java https://github.com/eclipse-vertx/vert.x/pull/3388#discussion_r414236383:
    public class ConsumerStream<T> implements ReadStream<T>, Handler<Message<T>> {

      public static <T> void bind(Vertx vertx, String address, Handler<ReadStream<T>> handler, Handler<AsyncResult<Void>> completionHandler) {
        Context context = vertx.getOrCreateContext();
        Handler<ReadStream<T>> h = handler;
        EventBus bus = context.owner().eventBus();
        bus.consumer(address, msg -> {
          String remoteAddress = msg.headers().get(STREAM_ADDRESS);
          if (remoteAddress == null) {
            // No back pressure (not supported)
            msg.fail(0, "No credits address");
            return;
          }
          String localAddress = UUID.randomUUID().toString();
          ConsumerStream<T> stream = new ConsumerStream<>(bus, context);

This would create a new ConsumerStream on each message; should it be put before bus.consumer()?
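To make the server-socket analogy concrete, here is a minimal, self-contained sketch in plain Java. It deliberately does not use the Vert.x API: `MiniBus`, `AcceptedStream`, and `StreamAcceptor` are hypothetical stand-ins, and the `stream-address` header name is an assumption. It only shows the shape of the idea, which is one consumer on a well-known address and one new stream object per opening message.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import java.util.function.Consumer;

// Hypothetical in-memory stand-in for an event bus: one handler per address.
final class MiniBus {
  private final Map<String, Consumer<Map<String, String>>> handlers = new HashMap<>();

  void consumer(String address, Consumer<Map<String, String>> handler) {
    handlers.put(address, handler);
  }

  // Delivers the headers to the handler at the address; returns false if none is registered.
  boolean send(String address, Map<String, String> headers) {
    Consumer<Map<String, String>> handler = handlers.get(address);
    if (handler == null) {
      return false;
    }
    handler.accept(headers);
    return true;
  }
}

// Hypothetical per-connection stream: knows where to send credits and its own data address.
final class AcceptedStream {
  final String remoteAddress;
  final String localAddress;

  AcceptedStream(String remoteAddress, String localAddress) {
    this.remoteAddress = remoteAddress;
    this.localAddress = localAddress;
  }
}

final class StreamAcceptor {
  static final String STREAM_ADDRESS = "stream-address"; // assumed header name

  // Like a server socket: every opening message on `address` yields a new stream.
  static void bind(MiniBus bus, String address, Consumer<AcceptedStream> handler) {
    bus.consumer(address, headers -> {
      String remoteAddress = headers.get(STREAM_ADDRESS);
      if (remoteAddress == null) {
        return; // no credits address, so no stream is created
      }
      String localAddress = UUID.randomUUID().toString();
      handler.accept(new AcceptedStream(remoteAddress, localAddress));
    });
  }
}
```

In this sketch the stream is created inside the consumer callback on purpose: moving it before `bus.consumer()` would share a single stream between all senders, which is the opposite of the accept-per-connection behavior described above.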
No particular reason. In practice this will use a low-water mark and a high-water mark rather than a fixed 0-500 range.
On 24 Apr 2020, at 09:38, Julien Ponge wrote:
@jponge commented on this pull request.
In src/main/java/io/vertx/core/eventbus/impl/ConsumerStream.java https://github.com/eclipse-vertx/vert.x/pull/3388#discussion_r414362662:
    synchronized (this) {
      handler = this.handler;
      credits++;
      if (credits > 500) {
        credits = 0;
        refund = true;
      } else {
        refund = false;
      }
    }
    if (handler != null) {
      handler.handle(msg.body());
    }
    if (refund) {
      // Refund 500 credits at once
      bus.send(remoteAddress, null, new DeliveryOptions().addHeader(STREAM_CREDITS, "500"));

Quick question: why 500? (if that's just an early prototyping value I get it)
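As an illustration of what a low-water/high-water scheme could look like in place of the fixed 500, here is a small sketch in plain Java. This is one possible reading of the idea, not the PR's implementation: `CreditWindow` and its method names are hypothetical, and it assumes the sender starts with a full window of `highWaterMark` credits and spends one credit per message.

```java
// Hypothetical receiver-side bookkeeping for credit-based back pressure.
final class CreditWindow {
  private final int lowWaterMark;
  private final int highWaterMark;
  private int outstanding; // credits the sender is assumed to still hold

  CreditWindow(int lowWaterMark, int highWaterMark) {
    if (lowWaterMark < 0 || highWaterMark <= lowWaterMark) {
      throw new IllegalArgumentException("require 0 <= low < high");
    }
    this.lowWaterMark = lowWaterMark;
    this.highWaterMark = highWaterMark;
    this.outstanding = highWaterMark; // assumption: sender starts with a full window
  }

  // Call once per received message. Returns the number of credits to refund
  // to the sender, or 0 when no refund is due yet.
  synchronized int onMessage() {
    if (outstanding > 0) {
      outstanding--;
    }
    if (outstanding > lowWaterMark) {
      return 0;
    }
    // At or below the low-water mark: top the sender back up to the high-water mark.
    int refund = highWaterMark - outstanding;
    outstanding = highWaterMark;
    return refund;
  }
}
```

With `new CreditWindow(2, 5)`, the first two calls to `onMessage()` return 0 and the third returns 3, after which the cycle repeats. A caller would send the returned amount back to the sender's credits address whenever it is non-zero.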
@vietj what is the state of this pr?
it is currently on hold