vertx-kafka-client
vertx-kafka-client copied to clipboard
vertx-kafka (eventloop execute callback) is not the same one ( eventloop execute producer.write)
when execute kafak producer write() method,eg: eventloop-1-5 execute wirte method,but the callback is executing in eventloop-3-5, in write method ,i think should add a line like Context context = vertx.getOrCreateContext(); -->this line is get current eventloop context, then execute context.runOnContext() method. without this line , it will cause thread safety problems
reference vertx-mongo code :
private <T> SingleResultCallback<T> wrapCallback(Handler<AsyncResult<T>> resultHandler) {
Context context = vertx.getOrCreateContext(); -->this line is get cuttent eventloop context
return (result, error) -> {
context.runOnContext(v -> {
if (error != null) {
resultHandler.handle(Future.failedFuture(error));
} else {
resultHandler.handle(Future.succeededFuture(result));
}
});
};
}
KafkaProducer<K, V> write(KafkaProducerRecord<K, V> record, Handler<AsyncResult<RecordMetadata>> handler);
//Construction method
public KafkaWriteStreamImpl(Context context, Producer<K, V> producer) {
this.producer = producer;
this.context = context;
}
kafka write code:
@Override
public synchronized KafkaWriteStreamImpl<K, V> write(ProducerRecord<K, V> record, Handler<AsyncResult<RecordMetadata>> handler) {
int len = this.len(record.value());
this.pending += len;
this.context.<RecordMetadata>executeBlocking(fut -> {
try {
this.producer.send(record, (metadata, err) -> {
// callback from IO thread
this.context.runOnContext(v1 -> {
synchronized (KafkaWriteStreamImpl.this) {
// if exception happens, no record written
if (err != null) {
if (this.exceptionHandler != null) {
Handler<Throwable> exceptionHandler = this.exceptionHandler;
this.context.runOnContext(v2 -> exceptionHandler.handle(err));
}
}
long lowWaterMark = this.maxSize / 2;
this.pending -= len;
if (this.pending < lowWaterMark && this.drainHandler != null) {
Handler<Void> drainHandler = this.drainHandler;
this.drainHandler = null;
this.context.runOnContext(drainHandler);
}
}
if (handler != null) {
handler.handle(err != null ? Future.failedFuture(err) : Future.succeededFuture(metadata));
}
});
});
} catch (Throwable e) {
exceptionHandler.handle(e);
}
}, null);
return this;
}
ping @ppatierno