kop

Analyze (and remove) the usages of get() or join() method of a CompletableFuture

Open BewareMyPower opened this issue 4 years ago • 3 comments

Calling a blocking wait method such as get() or join() on a CompletableFuture is dangerous because it can easily lead to a deadlock, especially when it is called from an I/O thread.

See the following cases:

  • https://github.com/streamnative/kop/pull/931
  • https://github.com/streamnative/kop/pull/556
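As a rough illustration of the I/O-thread case, here is a minimal sketch that assumes a single-threaded ExecutorService stands in for one I/O thread (KoP's real I/O threads are not modeled here); the class and method names are hypothetical. A task running on that thread waits for a future that can only be completed by another task queued on the same thread, so the wait can never succeed. A timeout is used so the demo terminates instead of hanging. The second method registers a callback instead of waiting, which frees the thread.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class DeadlockDemo {

    // Returns true if the blocking wait timed out, i.e. without a timeout it would hang forever.
    static boolean blockingWaitTimesOut() throws Exception {
        // One thread stands in for a single I/O thread (hypothetical setup).
        ExecutorService ioThread = Executors.newSingleThreadExecutor();
        try {
            Future<Boolean> task = ioThread.submit(() -> {
                // The inner future is completed by a task queued on the same (busy) thread...
                CompletableFuture<String> inner =
                        CompletableFuture.supplyAsync(() -> "done", ioThread);
                try {
                    // ...so blocking here means that queued task can never run.
                    inner.get(500, TimeUnit.MILLISECONDS);
                    return false;
                } catch (TimeoutException e) {
                    return true;
                }
            });
            return task.get();
        } finally {
            ioThread.shutdownNow();
        }
    }

    // Same setup, but the task on the I/O thread registers a callback instead of waiting.
    static String nonBlockingChain() throws Exception {
        ExecutorService ioThread = Executors.newSingleThreadExecutor();
        try {
            CompletableFuture<String> result = new CompletableFuture<>();
            ioThread.execute(() ->
                    CompletableFuture.supplyAsync(() -> "done", ioThread)
                            // thenAccept only registers the callback; it runs once the supplier finishes.
                            .thenAccept(result::complete));
            // Blocking here is acceptable only because the caller is not the I/O thread.
            return result.get(5, TimeUnit.SECONDS);
        } finally {
            ioThread.shutdownNow();
        }
    }
}
```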

BewareMyPower · Nov 25 '21 03:11

A simple grep shows the following join() calls:

$ find kafka-impl/src/main -name "*.java" | xargs grep -n "\.join()"
kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaTopicManager.java:86:                                future.join().deleteExpiredCursor(current, expirePeriodMillis);
kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/transaction/TransactionStateManager.java:818:                                        producer.join().getTopic());
kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/transaction/TransactionStateManager.java:826:                                        reader.join().getTopic());
kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/group/GroupMetadataManager.java:820:                metadataConsumer.join().getTopic());
kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/group/GroupMetadataManager.java:826:                    metadataConsumer.join().getTopic(), message.getMessageId());
kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/group/GroupMetadataManager.java:895:                                bk, metadataConsumer.join().getTopic());
kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/group/GroupMetadataManager.java:971:                    metadataConsumer.join().getTopic(), completeCause);
kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/group/GroupMetadataManager.java:1174:                                producer.join().getTopic());
kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/group/GroupMetadataManager.java:1182:                                reader.join().getTopic());
kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KopEventManager.java:162:        getBrokers(metadataStore.getChildren(getBrokersChangePath()).join(),

join() is used frequently in the group coordinator and the transaction coordinator.
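Most of the join() calls listed above only read a topic name, e.g. `metadataConsumer.join().getTopic()`. A minimal sketch of the non-blocking alternative, assuming a hypothetical TopicHandle interface in place of the real Pulsar producer/reader/consumer types and a hypothetical LOG list to record output: the blocking variant parks the caller until the handle exists, while the async variant attaches the work with thenApply/thenAccept and returns immediately. Whether a given call site can be rewritten this way depends on its surrounding code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

public class JoinReplacement {

    // Hypothetical stand-in for a producer/reader/consumer; only getTopic() matters here.
    interface TopicHandle {
        String getTopic();
    }

    // Hypothetical output sink used instead of real side effects.
    static final List<String> LOG = new ArrayList<>();

    // Blocking style: join() parks the calling thread until the handle is created.
    static void recordBlocking(CompletableFuture<TopicHandle> handle) {
        LOG.add("loading from " + handle.join().getTopic());
    }

    // Non-blocking style: the entry is recorded whenever the handle becomes available,
    // and the calling thread returns immediately either way.
    static CompletableFuture<Void> recordAsync(CompletableFuture<TopicHandle> handle) {
        return handle.thenApply(TopicHandle::getTopic)
                .thenAccept(topic -> LOG.add("loading from " + topic));
    }
}
```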

Take the usage in KafkaTopicManager as an example:

                        TCM_CACHE.forEach(future -> {
                            if (future != null && future.isDone() && !future.isCompletedExceptionally()) {
                                future.join().deleteExpiredCursor(current, expirePeriodMillis);
                            }
                        });

This join() is guaranteed to return immediately because isDone() has already been checked to return true (and isCompletedExceptionally() to return false). A pending future is simply skipped, so nothing is done for it.
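The same guard can be written as a small generic helper. This is a sketch only: a plain List stands in for TCM_CACHE, whose actual type is not shown here, and the names are hypothetical. Because a cancelled future also reports isCompletedExceptionally() as true, join() here neither blocks nor throws.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

public class CompletedOnlyVisitor {

    // Mirrors the guard in KafkaTopicManager: only successfully completed futures are visited,
    // so join() returns immediately. Null, pending, failed and cancelled futures are skipped.
    static <T> int forEachCompleted(List<CompletableFuture<T>> futures, Consumer<T> action) {
        int visited = 0;
        for (CompletableFuture<T> future : futures) {
            if (future != null && future.isDone() && !future.isCompletedExceptionally()) {
                action.accept(future.join());
                visited++;
            }
        }
        return visited;
    }
}
```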

BewareMyPower · Nov 25 '21 03:11

Here is the list of get() calls, excluding those used only in tests:

1. KafkaProtocolHandler#initTransactionCoordinator

        transactionCoordinator.startup(kafkaConfig.isEnableTransactionalIdExpiration()).get();

2. KafkaRequestHandler#validateTenantAccessForSession

            Boolean granted = authorize(AclOperation.ANY,
                    Resource.of(ResourceType.TENANT, currentTenant), session)
                    .get();

3. KopEventManager#DeleteTopicsEvent#process

                List<String> topicsDeletions = metadataStore.getChildren(getDeleteTopicsPath()).get();
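For call sites like validateTenantAccessForSession, one common way to remove the get() is to return the future and continue the work in a callback. A minimal sketch, assuming a hypothetical Authorizer interface in place of `authorize(AclOperation.ANY, Resource.of(ResourceType.TENANT, currentTenant), session)`; it does not reflect how KoP's request handlers are actually wired. Errors are still surfaced, through the returned future instead of an exception thrown by get().

```java
import java.util.concurrent.CompletableFuture;

public class AsyncAuthorization {

    // Hypothetical async authorizer standing in for the real authorize(...) call.
    interface Authorizer {
        CompletableFuture<Boolean> canAccessTenant(String tenant);
    }

    // Blocking style (what the get() call site does): the calling thread waits for the decision.
    static boolean validateBlocking(Authorizer authorizer, String tenant) throws Exception {
        Boolean granted = authorizer.canAccessTenant(tenant).get();
        return granted != null && granted;
    }

    // Non-blocking style: hand back a future; the caller chains its next step onto it.
    static CompletableFuture<Boolean> validateAsync(Authorizer authorizer, String tenant) {
        return authorizer.canAccessTenant(tenant)
                .thenApply(granted -> granted != null && granted);
    }
}
```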

BewareMyPower · Nov 25 '21 04:11

The third get() call can be removed in #928.

wenbingshen · Nov 25 '21 04:11