smallrye-reactive-messaging
Connect to different brokers per topic - no events are consumed
I'm using SmallRye Kafka in my Quarkus application to connect to Kafka. Until now, the requirement was to have a single bootstrap server that is valid for all channels. The configuration and event consumption worked without problems. This requirement has changed, and I'm now trying to configure the target broker per channel.
I've configured my application.properties and was able to connect to different brokers per topic. The startup log shows successful connections to the different brokers without any errors. The correct topics are also mentioned in the log, so everything looks fine. The problem is that no events are consumed from the topics.
When I introduce an error into the configuration, the connection fails on startup, so I'm sure that my configuration is read and that the connections to my brokers are established correctly.
My application.properties contains some config options that are valid for all channels. Besides that, there are two topics, each of which references the kafka-configuration option mentioned in the official documentation.
I'm using the same consumer group for both topics on purpose. There are multiple cluster nodes of my application, and each node uses a different consumer group.
My configuration and code are shown below:
# Kafka common
kafka.health-enabled=false
kafka.session.timeout.ms=45000
# Common configuration used for all channels
mp.messaging.connector=smallrye-kafka
mp.messaging.connector.smallrye-kafka.value.deserializer=io.confluent.kafka.serializers.KafkaAvroDeserializer
mp.messaging.connector.smallrye-kafka.value-deserialization-failure-handler=kafka-value-failure-handler
mp.messaging.connector.smallrye-kafka.fail-on-deserialization-failure=false
mp.messaging.connector.smallrye-kafka.auto.offset.reset=earliest
mp.messaging.connector.smallrye-kafka.use.latest.version=true
mp.messaging.connector.smallrye-kafka.auto.register.schemas=false
mp.messaging.connector.smallrye-kafka.connections.max.idle.ms=-1
mp.messaging.connector.smallrye-kafka.failure-strategy=ignore
# Topic 1
mp.messaging.incoming.my-channel-1.topic=my-topic-1
mp.messaging.incoming.my-channel-1.group.id=my-consumer-group
mp.messaging.incoming.my-channel-1.kafka-configuration=my-configuration-1
# Topic 2
mp.messaging.incoming.my-channel-2.topic=my-topic-2
mp.messaging.incoming.my-channel-2.group.id=my-consumer-group
mp.messaging.incoming.my-channel-2.kafka-configuration=my-configuration-2
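To reason about which settings each consumer ends up with, the layering of the three configuration sources can be modelled in plain Java. This is only a sketch under an assumption: that channel attributes override connector-level attributes, which in turn override the map referenced by kafka-configuration (this is how I read the documentation, it is not verified against the connector source). The class `ConfigLayeringSketch` and its method `effective` are hypothetical names and not part of SmallRye.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper, not part of SmallRye Reactive Messaging.
// It models the ASSUMED precedence: identified map < connector attributes < channel attributes.
final class ConfigLayeringSketch {

    static Map<String, Object> effective(Map<String, Object> identifiedMap,
                                         Map<String, Object> connectorAttributes,
                                         Map<String, Object> channelAttributes) {
        // Start with the map produced for the @Identifier, treated as the lowest layer.
        Map<String, Object> merged = new LinkedHashMap<>(identifiedMap);
        // Connector-level attributes (mp.messaging.connector.smallrye-kafka.*) come next.
        merged.putAll(connectorAttributes);
        // Channel attributes (mp.messaging.incoming.<channel>.*) are applied last and win.
        merged.putAll(channelAttributes);
        return merged;
    }

    public static void main(String[] args) {
        Map<String, Object> map1 = Map.of(
                "bootstrap.servers", "clusterUrl_1",
                "security.protocol", "SASL_SSL");
        Map<String, Object> connector = Map.of(
                "auto.offset.reset", "earliest",
                "failure-strategy", "ignore");
        Map<String, Object> channel1 = Map.of(
                "topic", "my-topic-1",
                "group.id", "my-consumer-group");

        // Print the merged view for my-channel-1.
        effective(map1, connector, channel1)
                .forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```

Comparing the merged view per channel with the consumer configuration that the Kafka client logs at startup may help to spot a setting that does not arrive as expected.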
import io.smallrye.common.annotation.Identifier;
import jakarta.enterprise.inject.Produces;
import jakarta.inject.Singleton;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.microprofile.config.inject.ConfigProperty;

import java.util.HashMap;
import java.util.Map;

@Singleton
@Slf4j
public class KafkaAuthConfiguration {

    // JAAS template for SASL/PLAIN; the placeholders are filled with the API key and secret.
    private static final String SASL_CONFIGURATION_STRING = "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"%s\" password=\"%s\";";

    // Referenced by mp.messaging.incoming.my-channel-1.kafka-configuration
    @Produces
    @Identifier("my-configuration-1")
    @Singleton
    public Map<String, Object> config1() {
        final Map<String, Object> config = createConfig("clusterUrl_1", "clusterApiKey_1", "clusterApiSecret_1");
        log.info("Created kafka config 1");
        return config;
    }

    // Referenced by mp.messaging.incoming.my-channel-2.kafka-configuration
    @Produces
    @Identifier("my-configuration-2")
    @Singleton
    public Map<String, Object> config2() {
        final Map<String, Object> config = createConfig("clusterUrl_2", "clusterApiKey_2", "clusterApiSecret_2");
        log.info("Created kafka config 2");
        return config;
    }

    // Builds the broker-specific connection and authentication settings.
    private Map<String, Object> createConfig(final String clusterUrl, final String apiKey, final String apiSecret) {
        final HashMap<String, Object> config = new HashMap<>(4);
        config.put("bootstrap.servers", clusterUrl);
        config.put("security.protocol", "SASL_SSL");
        config.put("sasl.mechanism", "PLAIN");
        config.put("ssl.endpoint.identification.algorithm", "https");
        config.put("sasl.jaas.config", String.format(SASL_CONFIGURATION_STRING, apiKey, apiSecret));
        return config;
    }
}
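To double-check what each producer method actually hands over, the returned map could be logged with the secrets masked. The following is a minimal sketch using only the JDK; the class `KafkaConfigDump`, the method `masked`, and the choice of which keys count as secret are my own assumptions and not part of SmallRye or Kafka.

```java
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

// Hypothetical debugging helper, not part of any library.
final class KafkaConfigDump {

    // Keys whose values should never appear in a log (an assumed, non-exhaustive list).
    private static final Set<String> SECRET_KEYS = Set.of(
            "sasl.jaas.config",
            "ssl.key.password",
            "ssl.keystore.password",
            "ssl.truststore.password");

    // Returns a sorted copy of the config with secret values replaced by "****".
    static Map<String, String> masked(Map<String, Object> config) {
        Map<String, String> out = new TreeMap<>();
        config.forEach((key, value) ->
                out.put(key, SECRET_KEYS.contains(key) ? "****" : String.valueOf(value)));
        return out;
    }

    public static void main(String[] args) {
        Map<String, Object> config = Map.of(
                "bootstrap.servers", "clusterUrl_1",
                "security.protocol", "SASL_SSL",
                "sasl.jaas.config", "placeholder-secret");
        masked(config).forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```

In the class above, this could replace the plain log lines, for example `log.info("Created kafka config 1: {}", KafkaConfigDump.masked(config));`, so that the bootstrap servers of each identified configuration become visible in the startup log.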
Does anybody have a hint as to what is missing in the configuration, given that no events are consumed from my brokers?