vertx-kafka-client
Move blocking send operation to executor thread
Hi! :)
Thank you for this project..
This PR fixes a blocking call in the send method of the KafkaWriteStreamImpl class, which was detected with the help of BlockHound.
We re-ran the test cases and also evaluated performance (in terms of sleep time latency of the SendThread) before and after the fix:
Before: [latency chart attached to the PR]
After: [latency chart attached to the PR]
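For context, BlockHound works by instrumenting the JVM so that a blocking call made on a thread marked as non-blocking fails immediately. The snippet below is only a rough sketch of how such detection can be wired up for Vert.x; the class name and the thread-name predicate are assumptions, not part of this PR or an official integration:

import io.vertx.core.Vertx;
import reactor.blockhound.BlockHound;

public class BlockHoundSketch {
  public static void main(String[] args) {
    // Treat Vert.x event-loop threads as non-blocking so BlockHound errors out
    // when a blocking call runs on one of them (thread-name predicate is an assumption).
    BlockHound.install(builder ->
        builder.nonBlockingThreadPredicate(current ->
            current.or(t -> t.getName().startsWith("vert.x-eventloop-thread"))));

    Vertx vertx = Vertx.vertx();
    vertx.runOnContext(v -> {
      try {
        Thread.sleep(100); // deliberately blocking: BlockHound should flag this call
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    });
  }
}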
The producer.send() operation is already called on a worker thread (ctx.executeBlocking()), so this helps nothing AFAICT.
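For reference, the pattern being referred to looks roughly like the following; this is a simplified sketch of the existing approach rather than the actual KafkaWriteStreamImpl code (class and field names are made up, Vert.x 4 API assumed):

import io.vertx.core.Context;
import io.vertx.core.Future;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

class ExecuteBlockingSendSketch<K, V> {

  private final Context ctx;
  private final Producer<K, V> producer;

  ExecuteBlockingSendSketch(Context ctx, Producer<K, V> producer) {
    this.ctx = ctx;
    this.producer = producer;
  }

  Future<RecordMetadata> send(ProducerRecord<K, V> record) {
    // producer.send(...) can block the calling thread (e.g. while fetching metadata),
    // so it is dispatched to the shared Vert.x worker pool via executeBlocking.
    return ctx.executeBlocking(promise ->
        producer.send(record, (metadata, err) -> {
          if (err != null) {
            promise.fail(err);
          } else {
            promise.complete(metadata);
          }
        }));
  }
}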
@Ladicek Right, I was wondering the same.. :) Then I came across a similar issue from another user, where executeBlocking still triggered blocking-call errors: https://github.com/eclipse-vertx/vert.x/issues/2798
The Vert.x developers then clarified its use in their documentation:
WARNING: Blocking code should block for a reasonable amount of time (i.e no more than a few seconds). Long blocking operations or polling operations (i.e a thread that spin in a loop polling events in a blocking fashion) are precluded. When the blocking operation lasts more than the 10 seconds, a message will be printed on the console by the blocked thread checker. Long blocking operations should use a dedicated thread managed by the application...
https://vertx.io/docs/apidocs/io/vertx/rxjava/core/Context.html
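In other words, an operation that may block for tens of seconds should run on a thread the application manages itself rather than on the shared worker pool. A minimal sketch of that pattern, with made-up class, executor and thread names (not the client's actual code):

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import io.vertx.core.Context;
import io.vertx.core.Future;
import io.vertx.core.Promise;
import io.vertx.core.Vertx;

class DedicatedThreadSketch {

  // One application-managed thread for potentially long blocking work, so the
  // shared Vert.x worker pool is never tied up by it (thread name is made up).
  private final ExecutorService longBlockingExecutor =
      Executors.newSingleThreadExecutor(r -> new Thread(r, "long-blocking-ops"));

  <T> Future<T> runBlocking(Vertx vertx, Callable<T> blockingTask) {
    Context ctx = vertx.getOrCreateContext();
    Promise<T> promise = Promise.promise();
    longBlockingExecutor.execute(() -> {
      try {
        T result = blockingTask.call();            // may block well beyond 10 seconds
        ctx.runOnContext(v -> promise.complete(result));
      } catch (Exception e) {
        ctx.runOnContext(v -> promise.fail(e));
      }
    });
    return promise.future();
  }
}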
I believe the thing that matters here is: is the blocking operation expected to block for that long or not?
@vietj yes, send blocks on metadata updates for up to max.block.ms:
- doSend: https://github.com/apache/kafka/blob/839b886f9b732b151e1faeace7303c80641c08c4/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java#L898-L906
- await metadata: https://github.com/apache/kafka/blob/839b886f9b732b151e1faeace7303c80641c08c4/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java#L1047
This is particularly evident when the Kafka cluster is down or unavailable.
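With the default max.block.ms of 60000 ms, a single send() against an unreachable cluster can therefore block its thread for a full minute, well past the 10-second blocked-thread-checker threshold quoted above. A minimal, self-contained illustration (broker address, topic name and the shortened timeout are made up):

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class MaxBlockMsSketch {
  public static void main(String[] args) {
    Properties props = new Properties();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "unreachable-broker:9092"); // hypothetical address
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    // send() may block the calling thread for up to this long while waiting for
    // metadata (the default is 60000 ms); shortened here to see the effect quickly.
    props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 15000L);

    try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
      long start = System.currentTimeMillis();
      try {
        // With the cluster unreachable, this blocks until max.block.ms elapses
        // and then fails with a TimeoutException.
        producer.send(new ProducerRecord<>("demo-topic", "key", "value")).get();
      } catch (Exception e) {
        System.out.println("send blocked ~" + (System.currentTimeMillis() - start) + " ms: " + e);
      }
    }
  }
}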
So I think we should use this specific executor; however, it should not be nested inside executeBlocking but used directly instead.
@vietj Should I try updating?
yes you should
Apologies for the delayed response..
If I replace executeBlocking() with executor.execute(..), the return type will become void, right? So we would have to update the usage of the send method everywhere.. no? 🤔
Or should we return a promise, like this:
@Override
public Future<RecordMetadata> send(ProducerRecord<K, V> record) {
  ContextInternal ctx = vertx.getOrCreateContext();
  ProducerTracer.StartedSpan startedSpan = this.tracer == null ? null : this.tracer.prepareSendMessage(ctx, record);
  int len = this.len(record.value());
  this.pending += len;
  Promise<RecordMetadata> prom = ctx.promise();
  try {
    // dispatch the potentially long-blocking send to the dedicated executor
    executor.execute(() -> {
      this.producer.send(record, (metadata, err) -> {
        // callback from Kafka IO thread
        ctx.runOnContext(v1 -> {
          synchronized (KafkaWriteStreamImpl.this) {
            // if exception happens, no record written
            if (err != null) {
              if (this.exceptionHandler != null) {
                Handler<Throwable> exceptionHandler = this.exceptionHandler;
                ctx.runOnContext(v2 -> exceptionHandler.handle(err));
              }
            }
            long lowWaterMark = this.maxSize / 2;
            this.pending -= len;
            if (this.pending < lowWaterMark && this.drainHandler != null) {
              Handler<Void> drainHandler = this.drainHandler;
              this.drainHandler = null;
              ctx.runOnContext(drainHandler);
            }
          }
        });
        // complete the returned promise and finish the tracing span
        if (err != null) {
          if (startedSpan != null) {
            startedSpan.fail(ctx, err);
          }
          prom.fail(err);
        } else {
          if (startedSpan != null) {
            startedSpan.finish(ctx);
          }
          prom.complete(metadata);
        }
      });
    });
  } catch (Throwable e) {
    synchronized (KafkaWriteStreamImpl.this) {
      if (this.exceptionHandler != null) {
        Handler<Throwable> exceptionHandler = this.exceptionHandler;
        ctx.runOnContext(v3 -> exceptionHandler.handle(e));
      }
    }
    if (startedSpan != null) {
      startedSpan.fail(ctx, e);
    }
    prom.fail(e);
  }
  return prom.future();
}
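If send keeps returning a Future<RecordMetadata> like this, existing call sites should not need to change. Hypothetical usage, assuming stream is a KafkaWriteStream<String, String>:

// Hypothetical call site: the Future-based contract is unchanged, so callers work
// the same whether executeBlocking or a dedicated executor is used underneath.
stream.send(new ProducerRecord<>("demo-topic", "key", "value"))
  .onSuccess(metadata -> System.out.println(
      "written to " + metadata.topic() + "-" + metadata.partition() + " @ offset " + metadata.offset()))
  .onFailure(err -> System.out.println("send failed: " + err));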
It is not clear what is happening, since we are already in an executeBlocking block: what is the actual issue we are trying to fix? Does it mean we should avoid executeBlocking with Kafka?