Why are all the low-level (Java) Kafka AdminClient calls wrapped in `effectBlocking`?
AFAIK, the Kafka AdminClient uses Java futures (`KafkaFuture` implements `java.util.concurrent.Future` and has been based on `CompletionStage` since Kafka 3.1 or 3.2). Given that, why are all the calls made with this client wrapped in an `effectBlocking`?
Random example taken from the zio-kafka code:
```scala
/**
 * List offset for the specified partitions.
 */
override def listOffsets(
  topicPartitionOffsets: Map[TopicPartition, OffsetSpec],
  options: Option[ListOffsetsOptions] = None
): Task[Map[TopicPartition, ListOffsetsResultInfo]] = {
  val asJava = topicPartitionOffsets.bimap(_.asJava, _.asJava).asJava
  fromKafkaFuture {
    blocking.effectBlocking(
      options
        .fold(adminClient.listOffsets(asJava))(opts => adminClient.listOffsets(asJava, opts.asJava))
        .all()
    )
  }
}.map(_.asScala.toMap.bimap(TopicPartition(_), ListOffsetsResultInfo(_)))
```
Is it because of this `synchronized`:
?? 🤔
@guizmaii What shall we do with this one-year-old issue?
@svroonland I'm still waiting for an answer, i.e. to understand why we need this and whether we should keep these usages of `blocking`. I'd prefer to leave it open, if that's OK with you.
I suppose it was originally written that way as a precaution and probably after some actual experience of blocking in some circumstances.
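To illustrate the kind of situation that may have motivated such a precaution: a method can return a future and still block the calling thread before handing that future back, for instance if it takes a lock first. Below is a minimal sketch using only the JDK. `FakeAdminClient`, `listSomething` and `holdLock` are hypothetical names made up for the illustration; this is not the real AdminClient API and makes no claim about what the real client does internally.

```scala
import java.util.concurrent.{CompletableFuture, CountDownLatch, TimeUnit}

// Hypothetical stand-in for a client whose methods return futures
// but take a lock before doing so. NOT the real Kafka AdminClient.
final class FakeAdminClient {
  private val lock = new Object

  // Returns a future, yet the call itself has to wait for `lock`.
  def listSomething(): CompletableFuture[String] =
    lock.synchronized {
      CompletableFuture.supplyAsync(() => "result")
    }

  // Holds the lock for `millis`, simulating contention from another caller.
  def holdLock(millis: Long, acquired: CountDownLatch): Unit =
    lock.synchronized {
      acquired.countDown()
      Thread.sleep(millis)
    }
}

object BlockingBeforeFuture {
  // Measures how long the *call* takes to hand back the future, in milliseconds.
  def timeToObtainFuture(holdMillis: Long): Long = {
    val client   = new FakeAdminClient
    val acquired = new CountDownLatch(1)
    val holder   = new Thread(() => client.holdLock(holdMillis, acquired))
    holder.start()
    acquired.await() // make sure the other thread owns the lock first

    val start  = System.nanoTime()
    val future = client.listSomething() // the caller is stuck here until the lock is free
    val waited = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start)

    future.get() // the asynchronous part, computed on another thread
    holder.join()
    waited
  }

  def main(args: Array[String]): Unit =
    println(s"caller waited ~${timeToObtainFuture(200)} ms before getting a future")
}
```

If a call can behave like this, running it on the default thread pool would stall a worker thread for the duration of the wait, which is the case `effectBlocking` is meant to cover. Whether the real client does this often enough to matter is exactly the open question in this issue.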