kop
Analyze (and remove) the usages of the get() and join() methods of CompletableFuture
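Calling a blocking wait method such as get() or join() on a CompletableFuture is dangerous because it can easily lead to a deadlock, especially when it is called from an I/O thread: if the future can only be completed by a task that runs on that same thread, the blocked thread never gets to run it.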
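The deadlock can be reproduced outside KoP with a minimal sketch, assuming a single-threaded ExecutorService stands in for an I/O (event loop) thread; the class and method names are hypothetical. The blocking handler joins a future whose completion is queued behind it on the same thread, so it can only end by timing out. The non-blocking handler hands the future back instead and lets the thread run the completion.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class IoThreadDeadlockDemo {

    /**
     * Blocking handler: joins a future whose completion is queued on the same
     * single thread. Returns true if the handler had to be abandoned after the timeout.
     */
    static boolean blockingJoinTimesOut(long timeoutMillis) throws InterruptedException {
        // Hypothetical stand-in for an I/O (event loop) thread.
        ExecutorService ioThread = Executors.newSingleThreadExecutor();
        CompletableFuture<String> response = new CompletableFuture<>();
        try {
            Future<String> handler = ioThread.submit(() -> {
                // The completion is queued behind this task on the same thread...
                ioThread.execute(() -> response.complete("done"));
                // ...and this join() occupies that thread, so the completion never runs.
                return response.join();
            });
            handler.get(timeoutMillis, TimeUnit.MILLISECONDS);
            return false;
        } catch (TimeoutException e) {
            return true;
        } catch (ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            // join() does not respond to interruption, so release the worker by hand.
            response.complete("released");
            ioThread.shutdown();
        }
    }

    /** Non-blocking handler: returns the future instead of waiting on it. */
    static String nonBlockingResult(long timeoutMillis) throws Exception {
        ExecutorService ioThread = Executors.newSingleThreadExecutor();
        try {
            CompletableFuture<String> response = new CompletableFuture<>();
            CompletableFuture<String> handled = CompletableFuture.supplyAsync(() -> {
                ioThread.execute(() -> response.complete("done"));
                return response; // hand the future back; the thread stays free
            }, ioThread).thenCompose(f -> f);
            // Waiting here is fine: the caller is not the I/O thread.
            return handled.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } finally {
            ioThread.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("blocking join() timed out: " + blockingJoinTimesOut(500));
        System.out.println("non-blocking result: " + nonBlockingResult(500));
    }
}
```

Running main should report that the blocking join() timed out and that the non-blocking variant produced "done". The finally block completes the future by hand only so the stuck worker can exit; because join() keeps waiting when interrupted, shutdownNow() alone would not release it.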
See the following cases:
- https://github.com/streamnative/kop/pull/931
- https://github.com/streamnative/kop/pull/556
A simple grep shows the following usages of join():
$ find kafka-impl/src/main -name "*.java" | xargs grep -n "\.join()"
kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaTopicManager.java:86: future.join().deleteExpiredCursor(current, expirePeriodMillis);
kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/transaction/TransactionStateManager.java:818: producer.join().getTopic());
kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/transaction/TransactionStateManager.java:826: reader.join().getTopic());
kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/group/GroupMetadataManager.java:820: metadataConsumer.join().getTopic());
kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/group/GroupMetadataManager.java:826: metadataConsumer.join().getTopic(), message.getMessageId());
kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/group/GroupMetadataManager.java:895: bk, metadataConsumer.join().getTopic());
kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/group/GroupMetadataManager.java:971: metadataConsumer.join().getTopic(), completeCause);
kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/group/GroupMetadataManager.java:1174: producer.join().getTopic());
kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/group/GroupMetadataManager.java:1182: reader.join().getTopic());
kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KopEventManager.java:162: getBrokers(metadataStore.getChildren(getBrokersChangePath()).join(),
join() is used frequently in the group coordinator and the transaction coordinator.
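Most of these hits pass future.join().getTopic() to a log statement. A minimal sketch of a non-blocking alternative, assuming a hypothetical TopicHandle interface (standing in for the producer, reader, or consumer) and a list that stands in for the logger: the message is written from a callback once the future completes, and a failed future is logged rather than thrown in the caller.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;

public class NonBlockingTopicLog {

    /** Hypothetical stand-in for the producer/reader/consumer handle; only getTopic() is used. */
    interface TopicHandle {
        String getTopic();
    }

    /** Captured log lines; a real implementation would use its logger instead. */
    static final List<String> LOG = Collections.synchronizedList(new ArrayList<>());

    /**
     * Logs the event once the handle is available instead of calling
     * topicHandle.join().getTopic() on the current thread.
     */
    static CompletableFuture<Void> logWhenReady(CompletableFuture<TopicHandle> topicHandle, String event) {
        return topicHandle.handle((h, ex) -> {
            if (ex != null) {
                LOG.add(event + " (topic unavailable: " + ex + ")");
            } else {
                LOG.add(event + " on " + h.getTopic());
            }
            return null;
        });
    }

    public static void main(String[] args) {
        CompletableFuture<TopicHandle> pending = new CompletableFuture<>();
        CompletableFuture<Void> logged = logWhenReady(pending, "Loaded group metadata");
        System.out.println(LOG.size()); // 0: the caller returned without waiting
        pending.complete(() -> "example-topic");
        logged.join();
        System.out.println(LOG); // [Loaded group metadata on example-topic]
    }
}
```

The trade-off is that the log line is deferred, so it may appear after messages logged later by the same code path.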
Take the usage in KafkaTopicManager as an example:
TCM_CACHE.forEach(future -> {
    if (future != null && future.isDone() && !future.isCompletedExceptionally()) {
        future.join().deleteExpiredCursor(current, expirePeriodMillis);
    }
});
Here join() returns immediately: isDone() has already been checked to be true and isCompletedExceptionally() to be false, so join() can neither block nor throw. It looks like a pending future is simply skipped, so nothing is done for it in this pass.
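The guard's behavior can be checked with a minimal sketch, assuming a hypothetical CursorCleaner interface in place of the real cached type and a plain list in place of TCM_CACHE. The first method mirrors the snippet above; the second is only one possible alternative (not verified against KoP's intended behavior) that uses thenAccept so pending futures are cleaned once they complete.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;

public class ExpiredCursorSweep {

    /** Hypothetical stand-in for the cached type; only deleteExpiredCursor is modeled. */
    interface CursorCleaner {
        void deleteExpiredCursor(long current, long expirePeriodMillis);
    }

    /** Same guard as the KafkaTopicManager snippet: join() can neither block nor throw. */
    static int sweepCompletedOnly(List<CompletableFuture<CursorCleaner>> cache,
                                  long current, long expirePeriodMillis) {
        int cleaned = 0;
        for (CompletableFuture<CursorCleaner> future : cache) {
            if (future != null && future.isDone() && !future.isCompletedExceptionally()) {
                future.join().deleteExpiredCursor(current, expirePeriodMillis);
                cleaned++;
            }
            // A pending or failed future falls through and is skipped in this pass.
        }
        return cleaned;
    }

    /** Alternative: also covers pending futures by running the cleanup when they complete. */
    static void sweepIncludingPending(List<CompletableFuture<CursorCleaner>> cache,
                                      long current, long expirePeriodMillis) {
        for (CompletableFuture<CursorCleaner> future : cache) {
            if (future != null) {
                // Runs now if already completed, later if pending, never if it failed.
                future.thenAccept(c -> c.deleteExpiredCursor(current, expirePeriodMillis));
            }
        }
    }

    public static void main(String[] args) {
        AtomicInteger calls = new AtomicInteger();
        CursorCleaner cleaner = (current, expire) -> calls.incrementAndGet();
        CompletableFuture<CursorCleaner> pending = new CompletableFuture<>();
        CompletableFuture<CursorCleaner> failed = new CompletableFuture<>();
        failed.completeExceptionally(new IllegalStateException("load failed"));
        List<CompletableFuture<CursorCleaner>> cache =
                Arrays.asList(CompletableFuture.completedFuture(cleaner), pending, failed, null);

        System.out.println(sweepCompletedOnly(cache, 0L, 1000L)); // 1: pending and failed are skipped
        sweepIncludingPending(cache, 0L, 1000L); // cleans the completed entry again
        pending.complete(cleaner);               // the deferred cleanup runs now
        System.out.println(calls.get());         // 3
    }
}
```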
Here is the list of get() calls, excluding those used only in tests:
1. KafkaProtocolHandler#initTransactionCoordinator
transactionCoordinator.startup(kafkaConfig.isEnableTransactionalIdExpiration()).get();
2. KafkaRequestHandler#validateTenantAccessForSession
Boolean granted = authorize(AclOperation.ANY,
        Resource.of(ResourceType.TENANT, currentTenant), session)
        .get();
3. KopEventManager#DeleteTopicsEvent#process
List<String> topicsDeletions = metadataStore.getChildren(getDeleteTopicsPath()).get();
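One common way to remove such get() calls is to return the future and chain the next step with thenCompose. A minimal sketch shaped loosely after the validateTenantAccessForSession case, assuming a hypothetical TenantAuthorizer interface and hypothetical string results ("OK", "DENIED", "ERROR"); KoP's real authorize(...) signature and response types are not reproduced here.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

public class AsyncTenantCheck {

    /** Hypothetical stand-in for the asynchronous authorize(...) call. */
    interface TenantAuthorizer {
        CompletableFuture<Boolean> authorize(String tenant);
    }

    /**
     * Non-blocking replacement for "Boolean granted = authorize(...).get()":
     * the caller receives a future immediately and the next step runs only
     * after the authorization result arrives.
     */
    static CompletableFuture<String> handle(TenantAuthorizer authorizer, String tenant,
                                            Supplier<CompletableFuture<String>> nextStep) {
        return authorizer.authorize(tenant)
                .thenCompose(granted -> Boolean.TRUE.equals(granted)
                        ? nextStep.get()
                        : CompletableFuture.completedFuture("DENIED"))
                // Any failure in the chain becomes an error response instead of an exception in the caller.
                .exceptionally(ex -> "ERROR");
    }

    public static void main(String[] args) {
        CompletableFuture<Boolean> decision = new CompletableFuture<>();
        CompletableFuture<String> response = handle(tenant -> decision, "public",
                () -> CompletableFuture.completedFuture("OK"));
        System.out.println(response.isDone()); // false: no thread is parked waiting
        decision.complete(true);
        System.out.println(response.join());   // OK
    }
}
```

The cost of this style is that callers must also accept and propagate a future, so each get() removal tends to ripple up through its call chain.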
The third get() call can be removed in #928.