vert.x icon indicating copy to clipboard operation
vert.x copied to clipboard

Event-bus streaming WIP

Open vietj opened this issue 5 years ago • 4 comments
trafficstars

Prototype for streaming on top of event-bus.

vietj avatar Apr 23 '20 20:04 vietj

this is expected, on each message on the specified address it creates a stream, this is similar to a server socket that creates socket

On 24 Apr 2020, at 03:53, Lin Gao [email protected] wrote:

@gaol commented on this pull request.

In src/main/java/io/vertx/core/eventbus/impl/ConsumerStream.java https://github.com/eclipse-vertx/vert.x/pull/3388#discussion_r414236383:

+public class ConsumerStream<T> implements ReadStream<T>, Handler<Message<T>> { +

  • public static <T> void bind(Vertx vertx, String address, Handler<ReadStream<T>> handler, Handler<AsyncResult<Void>> completionHandler) {
  • Context context = vertx.getOrCreateContext();
  • Handler<ReadStream<T>> h = handler;
  • EventBus bus = context.owner().eventBus();
  • bus.consumer(address, msg -> {
  •  String remoteAddress = msg.headers().get(STREAM_ADDRESS);
    
  •  if (remoteAddress == null) {
    
  •    // No back pressure (not supported)
    
  •    msg.fail(0, "No credits address");
    
  •    return;
    
  •  }
    
  •  String localAddress = UUID.randomUUID().toString();
    
  •  ConsumerStream<T> stream = new ConsumerStream<>(bus, context);
    

This would create new ConsumerStream on each message, should it be put before bus.consumer() ?

— You are receiving this because you authored the thread. Reply to this email directly, view it on GitHub https://github.com/eclipse-vertx/vert.x/pull/3388#pullrequestreview-399589145, or unsubscribe https://github.com/notifications/unsubscribe-auth/AABXDCTUJXH267RMEQAZVR3RODWHZANCNFSM4MPOUURQ.

vietj avatar Apr 24 '20 06:04 vietj

no particular reason, in practice this will use rather a low water mark and high water mark instead than being 0 - 500 range

On 24 Apr 2020, at 09:38, Julien Ponge [email protected] wrote:

@jponge commented on this pull request.

In src/main/java/io/vertx/core/eventbus/impl/ConsumerStream.java https://github.com/eclipse-vertx/vert.x/pull/3388#discussion_r414362662:

  •  synchronized (this) {
    
  •    handler = this.handler;
    
  •    credits++;
    
  •    if (credits > 500) {
    
  •      credits = 0;
    
  •      refund = true;
    
  •    } else {
    
  •      refund = false;
    
  •    }
    
  •  }
    
  •  if (handler != null) {
    
  •    handler.handle(msg.body());
    
  •  }
    
  •  if (refund) {
    
  •    // Refund 500 credits at once
    
  •    bus.send(remoteAddress, null, new DeliveryOptions().addHeader(STREAM_CREDITS, "500"));
    

Quick question: why 500? (if that's just an early prototyping value I get it)

— You are receiving this because you authored the thread. Reply to this email directly, view it on GitHub https://github.com/eclipse-vertx/vert.x/pull/3388#pullrequestreview-399709501, or unsubscribe https://github.com/notifications/unsubscribe-auth/AABXDCRBXGWPWBHXJPTMWJDROE6ZBANCNFSM4MPOUURQ.

vietj avatar Apr 24 '20 09:04 vietj

@vietj what is the state of this pr?

knotenpunkt avatar Aug 12 '21 14:08 knotenpunkt

it is currently on hold

vietj avatar Aug 16 '21 06:08 vietj