vertx-kafka-client

vertx-kafka: the event loop that executes the callback is not the same one that executes producer.write

Open jiangchuan185 opened this issue 6 years ago • 1 comments

When the Kafka producer's write() method is called, the callback runs on a different event loop than the caller. For example, eventloop-1-5 executes write(), but the callback executes on eventloop-3-5. I think write() should add a line like Context context = vertx.getOrCreateContext(); to capture the current event loop context, and then run the callback through context.runOnContext(). Without this line, the handler runs on a thread the caller does not expect, which can cause thread-safety problems.

For reference, the vertx-mongo code:

private <T> SingleResultCallback<T> wrapCallback(Handler<AsyncResult<T>> resultHandler) {
    Context context = vertx.getOrCreateContext();  // captures the current event loop context
    return (result, error) -> {
      context.runOnContext(v -> {
        if (error != null) {
          resultHandler.handle(Future.failedFuture(error));
        } else {
          resultHandler.handle(Future.succeededFuture(result));
        }
      });
    };
  }

KafkaProducer<K, V> write(KafkaProducerRecord<K, V> record, Handler<AsyncResult<RecordMetadata>> handler);
// Constructor: the context is fixed here, when the stream is created
public KafkaWriteStreamImpl(Context context, Producer<K, V> producer) {
    this.producer = producer;
    this.context = context;
  }

Kafka write code:

@Override
  public synchronized KafkaWriteStreamImpl<K, V> write(ProducerRecord<K, V> record, Handler<AsyncResult<RecordMetadata>> handler) {

    int len = this.len(record.value());
    this.pending += len;
    this.context.<RecordMetadata>executeBlocking(fut -> {
      try {
        this.producer.send(record, (metadata, err) -> {

          // callback from IO thread
          this.context.runOnContext(v1 -> {
            synchronized (KafkaWriteStreamImpl.this) {

              // if exception happens, no record written
              if (err != null) {

                if (this.exceptionHandler != null) {
                  Handler<Throwable> exceptionHandler = this.exceptionHandler;
                  this.context.runOnContext(v2 -> exceptionHandler.handle(err));
                }
              }

              long lowWaterMark = this.maxSize / 2;
              this.pending -= len;
              if (this.pending < lowWaterMark && this.drainHandler != null) {
                Handler<Void> drainHandler = this.drainHandler;
                this.drainHandler = null;
                this.context.runOnContext(drainHandler);
              }
            }

            if (handler != null) {
              handler.handle(err != null ? Future.failedFuture(err) : Future.succeededFuture(metadata));
            }
          });
        });
      } catch (Throwable e) {
        exceptionHandler.handle(e);
      }
    }, null);

    return this;
  }
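
To make the difference concrete, here is a minimal sketch that does not use Vert.x at all. It simulates event loops with single-thread executors, and every name in it (Loop, Writer, writeOnCreationContext, writeOnCallerContext, handlerThread) is hypothetical and not part of vertx-kafka-client. It only illustrates the idea of capturing the caller's context at call time, as the vertx-mongo snippet does, instead of reusing the context stored at construction.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

public class CallerContextDemo {

  // Hypothetical stand-in for an event-loop context: one named daemon thread.
  static final class Loop {
    private static final ThreadLocal<Loop> CURRENT = new ThreadLocal<>();
    private final ExecutorService exec;

    Loop(String name) {
      this.exec = Executors.newSingleThreadExecutor(r -> {
        // Register this loop on its own thread so current() can find it.
        Thread t = new Thread(() -> { CURRENT.set(this); r.run(); }, name);
        t.setDaemon(true);
        return t;
      });
    }

    // Rough analogue of "get the current context"; null off any loop thread.
    static Loop current() { return CURRENT.get(); }

    void runOnContext(Runnable task) { exec.execute(task); }
  }

  // Hypothetical writer whose send completes on a separate I/O thread.
  static final class Writer {
    private final Loop creationLoop;
    private final ExecutorService ioThread = Executors.newSingleThreadExecutor(r -> {
      Thread t = new Thread(r, "kafka-io");
      t.setDaemon(true);
      return t;
    });

    Writer(Loop creationLoop) { this.creationLoop = creationLoop; }

    // Behaviour described in the issue: handler runs on the construction-time context.
    void writeOnCreationContext(String record, Consumer<String> handler) {
      ioThread.execute(() -> creationLoop.runOnContext(() -> handler.accept(record)));
    }

    // Proposed behaviour: capture the caller's context when write is invoked.
    void writeOnCallerContext(String record, Consumer<String> handler) {
      Loop caller = Loop.current();
      Loop target = caller != null ? caller : creationLoop;
      ioThread.execute(() -> target.runOnContext(() -> handler.accept(record)));
    }
  }

  // Returns the name of the thread on which the handler actually ran.
  public static String handlerThread(boolean callerAware) throws Exception {
    Loop creation = new Loop("eventloop-3-5");
    Loop caller = new Loop("eventloop-1-5");
    Writer writer = new Writer(creation);
    CompletableFuture<String> seen = new CompletableFuture<>();
    Consumer<String> handler = rec -> seen.complete(Thread.currentThread().getName());
    // Invoke write from the "caller" loop, as in the reported scenario.
    caller.runOnContext(() -> {
      if (callerAware) {
        writer.writeOnCallerContext("record", handler);
      } else {
        writer.writeOnCreationContext("record", handler);
      }
    });
    return seen.get(5, TimeUnit.SECONDS);
  }

  public static void main(String[] args) throws Exception {
    System.out.println("creation context: " + handlerThread(false)); // eventloop-3-5
    System.out.println("caller context:   " + handlerThread(true));  // eventloop-1-5
  }
}
```

In this simulation the first variant reproduces the mismatch reported above (write on eventloop-1-5, handler on eventloop-3-5), while the second delivers the handler on the thread that called write. Whether the real client should behave this way is a design question for the maintainers.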

jiangchuan185 Jan 07 '19 09:01

ping @ppatierno

vietj Jan 09 '19 07:01