kotlin-kafka
Kafka bindings for Kotlin `suspend`, and Kafka streaming operators for KotlinX Flow.
I would appreciate the possibility of being able to use the underlying consumer for configuration of, for example, metrics. Something similar to this: https://github.com/reactor/reactor-kafka/blob/main/src/main/java/reactor/kafka/receiver/MicrometerConsumerListener.java
Hi, I'm trying to use KafkaReceiver inside Ktor 2. I'm not quite sure what the best way to start KafkaReceiver with Ktor is. What I did was to create an...
```console
javax.management.InstanceAlreadyExistsException: kafka.admin.client:type=app-info,id=test-kafka-admin-client
	at com.sun.jmx.mbeanserver.Repository.addMBean(Repository.java:437)
	at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerWithRepository(DefaultMBeanServerInterceptor.java:1898)
	at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerDynamicMBean(DefaultMBeanServerInterceptor.java:966)
	at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerObject(DefaultMBeanServerInterceptor.java:900)
	at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerMBean(DefaultMBeanServerInterceptor.java:324)
	at com.sun.jmx.mbeanserver.JmxMBeanServer.registerMBean(JmxMBeanServer.java:522)
	at org.apache.kafka.common.utils.AppInfoParser.registerAppInfo(AppInfoParser.java:64)
	at org.apache.kafka.clients.admin.KafkaAdminClient.<init>(KafkaAdminClient.java:602)
	at org.apache.kafka.clients.admin.KafkaAdminClient.createInternal(KafkaAdminClient.java:544)
	at org.apache.kafka.clients.admin.KafkaAdminClient.createInternal(KafkaAdminClient.java:488)
	at org.apache.kafka.clients.admin.Admin.create(Admin.java:134)
	at io.github.nomisRev.kafka.AdminKt.Admin(Admin.kt:43)
	at io.github.nomisrev.kafka.KafkaSpec.committedCount(KafkaSpec.kt:154)
	at...
```
Users have reported that in some cases you don't want `kotlin-kafka` to automatically pause the partitions in case the downstream cannot process the events fast enough but rather you want...
I thought the problem was in KafkaReceiver, but the default polling timeout was too low (100ms). When I changed it, it worked. So we have extra tests 😄...
This PR contains the following updates:

| Package | Change | Age | Adoption | Passing | Confidence |
|---|---|---|---|---|---|
| org.jetbrains.kotlin.jvm | `1.9.23` -> `2.0.20` | [age](https://docs.renovatebot.com/merge-confidence/) | [adoption](https://docs.renovatebot.com/merge-confidence/) | ...
Hello! First of all, thank you for this library! I tried to update it from 0.3.1 to 0.4.0, but the consumer is now stuck during initialization. After some troubleshooting, it...
Here, when the channel successfully sends the records, the poll() function gets called and the current frame stays on the stack. Therefore, the stack will keep growing infinitely if the...
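The pattern described in this report can be illustrated with a small, library-free sketch. It is not the kotlin-kafka implementation: `RecursivePoller`, `LoopingPoller`, and `onSent` are hypothetical names, and the "send" is simulated by a plain function call. It only shows why re-entering `poll()` from a success callback adds one frame per batch, while a loop keeps the stack depth constant.

```kotlin
// Hypothetical model of the reported pattern; not kotlin-kafka code.
// Each successful "send" re-enters poll(), so frames accumulate.
class RecursivePoller(private val batches: Int) {
    var polled = 0
        private set

    fun poll() {
        if (polled == batches) return
        polled++
        onSent() // success callback: calls poll() again from inside poll()
    }

    // Stand-in for "the channel successfully sent the records".
    private fun onSent() = poll()
}

// Same work driven by a loop: the stack depth does not depend on `batches`.
class LoopingPoller(private val batches: Int) {
    var polled = 0
        private set

    fun run() {
        while (polled < batches) {
            polled++ // poll, send, then return to the loop before polling again
        }
    }
}

fun main() {
    val small = RecursivePoller(100)
    small.poll()
    println("recursive polled ${small.polled} batches")

    val large = LoopingPoller(1_000_000)
    large.run()
    println("looping polled ${large.polled} batches")
}
```

With a large enough number of batches, the recursive variant would be expected to exhaust the thread's stack, whereas the looping variant is bounded only by time.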
Sometimes settings are given as `Properties`. Having to read, decode and pass individual parameters to any Settings class is too much work. My request is to add a function...
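As a rough sketch of what such a conversion could look like, the snippet below maps a `java.util.Properties` instance onto a settings object. `ExampleReceiverSettings` and `toExampleReceiverSettings` are hypothetical names introduced for illustration and are not part of kotlin-kafka; the only assumption taken from Kafka itself is the pair of standard client keys `bootstrap.servers` and `group.id`.

```kotlin
import java.util.Properties

// Hypothetical settings holder, for illustration only; not a kotlin-kafka class.
data class ExampleReceiverSettings(
    val bootstrapServers: String,
    val groupId: String,
    val extra: Map<String, String> = emptyMap(),
)

// Hypothetical helper: reads the two required keys and passes every
// other entry through untouched, so nothing has to be decoded by hand.
fun Properties.toExampleReceiverSettings(): ExampleReceiverSettings {
    val all: Map<String, String> = stringPropertyNames().associateWith { getProperty(it) }
    fun required(key: String): String =
        requireNotNull(all[key]) { "Missing required property: $key" }
    return ExampleReceiverSettings(
        bootstrapServers = required("bootstrap.servers"),
        groupId = required("group.id"),
        extra = all - setOf("bootstrap.servers", "group.id"),
    )
}

fun main() {
    val props = Properties().apply {
        setProperty("bootstrap.servers", "localhost:9092")
        setProperty("group.id", "example-group")
        setProperty("auto.offset.reset", "earliest")
    }
    println(props.toExampleReceiverSettings())
}
```

Keeping unrecognised entries in a pass-through map is one possible design choice; it avoids having to enumerate every client option up front.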
Hello. The `io.github.nomisRev.kafka.publisher.KafkaPublisher` interface in this library exposes a [`metrics()`](https://github.com/nomisRev/kotlin-kafka/blob/0.4.0/src/main/kotlin/io/github/nomisRev/kafka/publisher/KafkaPublisher.kt#L135C3-L135C56) function to get metrics from the underlying `org.apache.kafka.clients.producer.KafkaProducer`. Unfortunately, the `io.github.nomisRev.kafka.receiver.KafkaReceiver` interface does not expose anything similar to obtain metrics. This...