micronaut-discovery-client
micronaut-discovery-client copied to clipboard
Support watching Consul for changes
When combining @Value with a Configuration Server like Consul, the changes to the properties in Consul are not propagated automatically to the beans that reference them
We can only force it using the /refresh endpoint that would reload all the beans annotated with Refreshable.
Micronaut would benefit from a mechanism that subscribes to Consul change events and performs the same refresh automatically, without the need to hit the /refresh endpoint.
I haven't used Consul before and this feature sounds cool and all.. but I can't stop wondering: shouldn't it be rather "easy" (I hate this word lol) to either 1) have Consul ping the /refresh
endpoint, or 2) put a class into the Micronaut app that subscribes to events from Consul and then internally publishes a RefreshEvent
?
Regardless, whenever this feature does arrive it might be helpful if we could be selective in terms of what keys we subscribe to (not all configuration is relevant for all beans)?
any update about this feature request?
No update but contributors welcome!
This issue is still very relevant. We are developing our new services with Micronaut. Our Spring services support this useful feature, and it would be great if we could use this option with Micronaut
one for you @pgressa
You can look at https://github.com/rickfast/consul-client as example of subscription implementation
Hello !
I made this quick KV watcher for Micronaut, using Vertx-consul. Can you see anything incorrect, any hidden issues, or possible improvements?
import static io.micronaut.context.env.Environment.DEFAULT_NAME;
import static io.micronaut.discovery.config.ConfigDiscoveryConfiguration.DEFAULT_PATH;
import lombok.extern.slf4j.Slf4j;
import javax.annotation.PreDestroy;
import java.util.HashMap;
import java.util.Map;
import org.yaml.snakeyaml.Yaml;
import com.google.common.collect.Maps;
import io.micronaut.context.env.Environment;
import io.micronaut.context.event.ApplicationEventPublisher;
import io.micronaut.context.event.StartupEvent;
import io.micronaut.discovery.consul.ConsulConfiguration;
import io.micronaut.discovery.consul.condition.RequiresConsul;
import io.micronaut.runtime.context.scope.refresh.RefreshEvent;
import io.micronaut.runtime.event.annotation.EventListener;
import io.vertx.core.Handler;
import io.vertx.core.Vertx;
import io.vertx.ext.consul.ConsulClientOptions;
import io.vertx.ext.consul.KeyValue;
import io.vertx.ext.consul.Watch;
import io.vertx.ext.consul.WatchResult;
import jakarta.inject.Singleton;
/**
 * Watches the Consul KV keys that hold this application's configuration and
 * publishes a {@link RefreshEvent} when the YAML stored under one of them changes.
 *
 * <p>The watched keys are built from the configured configuration path (or
 * {@code DEFAULT_PATH}), the shared {@code DEFAULT_NAME} key, the service id,
 * and one {@code <key>,<env>} variant of each per active environment name.
 */
@Slf4j
@Singleton
@RequiresConsul
public class ConsulKVWatcher {

    private final Environment environment;
    private final ApplicationEventPublisher<RefreshEvent> eventPublisher;
    private final ConsulConfiguration consulConfiguration;
    private final Vertx vertx;
    private final ConsulClientOptions consulClientOptions;

    /** Started watches, indexed by the KV key they observe, so they can be stopped on shutdown. */
    private final Map<String, Watch<KeyValue>> watchers = new HashMap<>();

    // NOTE(review): new Yaml() uses SnakeYAML's default constructor. If the KV store can be
    // written by untrusted parties, consider restricting it with a SafeConstructor.
    private final Yaml yaml = new Yaml();

    ConsulKVWatcher(final Environment environment,
                    final ApplicationEventPublisher<RefreshEvent> eventPublisher,
                    final ConsulConfiguration consulConfiguration,
                    final Vertx vertx,
                    final ConsulClientOptions consulClientOptions) {
        this.environment = environment;
        this.eventPublisher = eventPublisher;
        this.consulConfiguration = consulConfiguration;
        this.vertx = vertx;
        this.consulClientOptions = consulClientOptions;
    }

    /**
     * Refreshes the environment once and then starts the Consul watches.
     *
     * @param event the application startup event (unused)
     */
    @EventListener
    public void onStartupEvent(final StartupEvent event) {
        // Refresh once up front. Per the original author, this keeps environment variables and
        // injected properties from being reported as changes by the first refreshAndDiff().
        environment.refresh();
        listenForConsulChanges();
    }

    /**
     * Registers a watch for every configuration key relevant to this application.
     *
     * @throws java.util.NoSuchElementException if no service id is configured
     */
    private void listenForConsulChanges() {
        log.info("Monitoring Consul changes");
        final var applicationName = consulConfiguration.getServiceId().orElseThrow();
        final var configurationPath = consulConfiguration.getConfiguration().getPath()
                .map(path -> path.endsWith("/") ? path : path + "/")
                .orElse(DEFAULT_PATH);

        // Configuration shared by all applications
        final var commonConfigPath = configurationPath + DEFAULT_NAME;
        addKeyWatcher(commonConfigPath);

        // Application-specific configuration
        final var applicationSpecificPath = configurationPath + applicationName;
        addKeyWatcher(applicationSpecificPath);

        for (final var activeName : environment.getActiveNames()) {
            // Configuration shared by all applications, per active environment
            addKeyWatcher(toProfiledPath(commonConfigPath, activeName));
            // Application-specific configuration, per active environment
            addKeyWatcher(toProfiledPath(applicationSpecificPath, activeName));
        }
    }

    /** Builds the environment-specific key name: {@code <resource>,<activeName>}. */
    private static String toProfiledPath(final String resource, final String activeName) {
        return resource + "," + activeName;
    }

    /**
     * Starts watching {@code key} unless a watch for it is already registered.
     *
     * <p>Without this guard, a service id equal to {@code DEFAULT_NAME} would start two watches
     * on the same key and lose the reference to the first one, which would then never be stopped.
     */
    private void addKeyWatcher(final String key) {
        this.watchers.computeIfAbsent(key, watchedKey -> {
            log.debug("Watching key [{}] in the KV store ", watchedKey);
            return Watch.key(watchedKey, vertx, consulClientOptions)
                    .setHandler(handle(watchedKey))
                    .start();
        });
    }

    /**
     * Creates the watch callback for {@code key}: successful results go to
     * {@link #onNext}, failures (including exceptions thrown by {@code onNext}) go to
     * {@link #onError}.
     */
    private Handler<WatchResult<KeyValue>> handle(final String key) {
        return event -> {
            if (event.succeeded()) {
                try {
                    onNext(key, event.prevResult(), event.nextResult());
                } catch (final Exception e) {
                    onError(key, e);
                }
            } else {
                onError(key, event.cause());
            }
        };
    }

    /**
     * Compares the previous and the new YAML content of a key and, when they differ,
     * refreshes the environment and publishes a {@link RefreshEvent} carrying the
     * properties reported by {@code refreshAndDiff()}.
     *
     * @param key  the watched KV key
     * @param pre  the previous result; {@code null} means there is nothing to compare against yet
     * @param next the new result
     */
    void onNext(final String key, final KeyValue pre, final KeyValue next) {
        if (pre == null) {
            // No previous result: nothing to diff.
            return;
        }
        log.debug("Key [{}] updated", key);

        final var difference = Maps.difference(parseYaml(pre), parseYaml(next));
        if (difference.areEqual()) {
            log.debug("No changes");
            return;
        }

        final var previousProperties = environment.refreshAndDiff();
        if (!previousProperties.isEmpty()) {
            log.debug("Configuration has been updated, publishing RefreshEvent.");
            eventPublisher.publishEvent(new RefreshEvent(previousProperties));
        }
    }

    /**
     * Parses the YAML stored in {@code keyValue} into a map.
     *
     * <p>A missing result, a missing value (presumably a key that does not exist or was
     * deleted; confirm against the Vert.x Consul client) and an empty YAML document are all
     * treated as an empty configuration, so that creating, emptying or deleting a key is
     * still diffed instead of failing inside the handler.
     *
     * @return the parsed top-level mapping, never {@code null}
     */
    private Map<String, Object> parseYaml(final KeyValue keyValue) {
        final String content = keyValue == null ? null : keyValue.getValue();
        if (content == null) {
            return new HashMap<>();
        }
        final Map<String, Object> parsed = yaml.load(content);
        return parsed == null ? new HashMap<>() : parsed;
    }

    /** Logs a failure of the watch on {@code key}; the watch itself is left untouched. */
    void onError(final String key, final Throwable error) {
        log.error("An error occurred while listening to config changes with key: {}", key, error);
    }

    /** Stops every registered watch when the bean is destroyed. */
    @PreDestroy
    void close() {
        watchers.forEach((key, watcher) -> {
            log.debug("Stop watching key [{}]", key);
            watcher.stop();
        });
        watchers.clear();
    }
}
import lombok.extern.slf4j.Slf4j;
import io.micronaut.context.annotation.Bean;
import io.micronaut.context.annotation.Factory;
import io.micronaut.discovery.consul.ConsulConfiguration;
import io.micronaut.discovery.consul.condition.RequiresConsul;
import io.vertx.core.Vertx;
import io.vertx.ext.consul.ConsulClientOptions;
import jakarta.inject.Singleton;
/**
 * Factory for the Vert.x objects used to talk to Consul: the {@link Vertx} instance and
 * the {@link ConsulClientOptions} derived from the application's {@link ConsulConfiguration}.
 */
@Slf4j
@Factory
@RequiresConsul
public class ConsulFactory {

    /**
     * Creates the shared {@link Vertx} instance.
     *
     * <p>Declared as a singleton with a destroy hook so that only one instance is created
     * and it is closed together with the application context, instead of a new, never-closed
     * instance being created for each injection point.
     *
     * @return a new Vert.x instance
     */
    @Singleton
    @Bean(preDestroy = "close")
    Vertx vertx() {
        return Vertx.vertx();
    }

    /**
     * Builds the Consul client options from the application's Consul configuration.
     *
     * <p>Host and port are always set; ACL token, datacenter and connect timeout are only
     * set when the configuration provides them.
     *
     * @param consulConfiguration the application's Consul configuration
     * @return the client options to use for Consul watches
     * @throws ArithmeticException if the connect timeout in milliseconds does not fit in an {@code int}
     */
    @Bean
    ConsulClientOptions consulClientOptions(final ConsulConfiguration consulConfiguration) {
        final var consulClientOptions = new ConsulClientOptions()
                .setHost(consulConfiguration.getHost())
                .setPort(consulConfiguration.getPort());
        consulConfiguration.getAslToken().ifPresent(consulClientOptions::setAclToken);
        consulConfiguration.getConfiguration().getDatacenter().ifPresent(consulClientOptions::setDc);
        consulConfiguration.getConnectTimeout().ifPresent(connectTimeout ->
                consulClientOptions.setConnectTimeout(Math.toIntExact(connectTimeout.toMillis())));
        return consulClientOptions;
    }
}
An update on my implementation: to avoid fetching the whole configuration from Consul on every change, I tried to update only the affected property source:
...
final var difference = Maps.difference(previousYaml, nextYaml);
if (!difference.areEqual()) {
updatePropertySource(key, nextYaml);
publishDifferences(difference);
} else {
log.debug("No changes");
}
}
}
with
/**
 * Replaces the Consul property source that corresponds to {@code key} with one built
 * from {@code nextYaml}.
 *
 * <p>The new source reuses the order of the existing source with the same name, or
 * {@code EnvironmentPropertySource.POSITION} when no such source is registered yet.
 *
 * <p>NOTE(review): the author reports in this discussion that a value updated this way
 * can take effect even though a more specific source (e.g. {@code application,dev})
 * overrides it.
 *
 * @param key      the Consul KV key whose content changed
 * @param nextYaml the new parsed content of that key
 */
private void updatePropertySource(final String key, final Map<String, Object> nextYaml) {
    final var propertySourceName = ConsulClient.SERVICE_ID + '-' + resolvePropertySourceName(key);

    // Look up the order of the first already-registered source with that name.
    int priority = EnvironmentPropertySource.POSITION;
    for (final var existing : environment.getPropertySources()) {
        if (existing.getName().equals(propertySourceName)) {
            priority = existing.getOrder();
            break;
        }
    }

    environment.addPropertySource(PropertySource.of(propertySourceName, nextYaml, priority));
}
/**
 * Derives the property-source suffix for a Consul KV key.
 *
 * <p>The configuration path is removed from the key; what remains is either a plain name
 * ({@code name}) or a name plus environment separated by a comma ({@code name,env}), the
 * latter being rendered as {@code name[env]}.
 *
 * @param key the Consul KV key
 * @return {@code name} or {@code name[env]}
 */
private String resolvePropertySourceName(final String key) {
    final var relativeKey = key.replace(getConfigurationPath(), "");
    final var parts = relativeKey.split(",");
    return parts.length == 1
            ? relativeKey
            : parts[0] + '[' + parts[1] + ']';
}
and
/**
 * Publishes a {@link RefreshEvent} describing every top-level key that differs between
 * the previous (left) and the new (right) configuration.
 *
 * <p>Each changed key is mapped to its previous value. Besides keys whose value changed,
 * this also covers keys that were removed (mapped to the value they had) and keys that were
 * added (mapped to {@code null}, as they had no previous value). A plain {@link HashMap}
 * is used because previous values may legitimately be {@code null}, which
 * {@code Collectors.toMap} rejects.
 *
 * @param difference the difference between the previous and the new parsed YAML
 */
private void publishDifferences(final MapDifference<String, Object> difference) {
    log.debug("Configuration has been updated, publishing RefreshEvent.");
    final Map<String, Object> changes = new HashMap<>();

    // Keys present on both sides with different values -> previous value.
    difference.entriesDiffering()
            .forEach((name, values) -> changes.put(name, values.leftValue()));
    // Keys that were removed -> the value they used to have.
    changes.putAll(difference.entriesOnlyOnLeft());
    // Keys that were added -> no previous value.
    difference.entriesOnlyOnRight()
            .keySet()
            .forEach(name -> changes.put(name, null));

    eventPublisher.publishEvent(new RefreshEvent(changes));
}
but I have an issue:
GIVEN. I have application.yaml {key.value_1=hello, key.value_2=world}
and application,dev.yaml {key.value_2=univers}
configurations.
WHEN. I update key.value_2
in application.yaml,
THEN. It will be taken into account instead of keeping the value from application,dev.yaml
Any idea why, and how to prevent it ?
OK, I found the reason
/**
 * Replaces the Consul property source that corresponds to {@code key} with one built
 * from {@code nextYaml}, then re-registers every property source in ascending order.
 *
 * <p>Only an already-registered source with the matching name is replaced (keeping its
 * order); if none matches, the existing sources are re-added unchanged.
 *
 * @param key      the Consul KV key whose content changed
 * @param nextYaml the new parsed content of that key
 */
private void updatePropertySource(final String key, final Map<String, Object> nextYaml) {
    log.debug("Updating context with new configuration from [{}]", key);
    final var propertySourceName = ConsulClient.SERVICE_ID + '-' + resolvePropertySourceName(key);

    // Work on a snapshot so the environment is not modified while it is being read.
    final var snapshot = new ArrayList<PropertySource>(environment.getPropertySources());

    // Swap in the new values under the same name and order.
    snapshot.replaceAll(existing -> existing.getName().equals(propertySourceName)
            ? PropertySource.of(propertySourceName, nextYaml, existing.getOrder())
            : existing);

    // /!\ re-add all sources sorted by order, to keep precedence
    snapshot.sort(Comparator.comparing(PropertySource::getOrder));
    snapshot.forEach(environment::addPropertySource);
}
/**
 * Maps a Consul KV key to the suffix used in the matching property-source name.
 *
 * <p>After stripping the configuration path, a key of the form {@code name} is returned
 * as is, while {@code name,env} becomes {@code name[env]}.
 *
 * @param key the Consul KV key
 * @return {@code name} or {@code name[env]}
 */
private String resolvePropertySourceName(final String key) {
    final var withoutPath = key.replace(getConfigurationPath(), "");
    final var segments = withoutPath.split(",");
    if (segments.length != 1) {
        return segments[0] + '[' + segments[1] + ']';
    }
    return withoutPath;
}
I have to keep the order of the PropertySources
Hope that will help