micronaut-discovery-client

Support watching Consul for changes

Open mfarachepiksel opened this issue 6 years ago • 9 comments

When combining @Value with a configuration server like Consul, changes to the properties in Consul are not propagated automatically to the beans that reference them.

We can only force it using the /refresh endpoint, which reloads all the beans annotated with @Refreshable.

Micronaut would benefit from a mechanism for subscribing to Consul change events, so that the same action is performed without having to hit the /refresh endpoint.
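
For context, Consul's HTTP API supports blocking queries: the client passes the last X-Consul-Index value it saw as the index query parameter, and the call returns once the data changes or the wait time elapses. A watch loop built on that could look roughly like the sketch below. The class and method names are hypothetical, and the HTTP call is replaced by a pluggable function so the index handling can be followed without a running Consul agent.

```java
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.function.Consumer;
import java.util.function.LongFunction;

// Hypothetical sketch of a watch loop built on Consul blocking queries.
final class KvWatchLoop {

    // Value of the X-Consul-Index response header plus the decoded KV value (null if the key is absent).
    static final class Response {
        final long index;
        final String value;

        Response(long index, String value) {
            this.index = index;
            this.value = value;
        }
    }

    private final LongFunction<Response> fetch; // stands in for GET /v1/kv/<key>?index=<lastIndex>
    private final Consumer<String> onChange;    // e.g. refresh the environment and publish an event
    private long lastIndex;
    private String lastValue;
    private boolean initialised;

    KvWatchLoop(LongFunction<Response> fetch, Consumer<String> onChange) {
        this.fetch = fetch;
        this.onChange = onChange;
    }

    // Runs one (normally long-polling) query and notifies the listener only on a real change.
    void pollOnce() {
        Response response = fetch.apply(lastIndex);
        // If the index goes backwards, start over from 0 as the Consul documentation recommends.
        lastIndex = response.index < lastIndex ? 0 : response.index;
        boolean changed = !Objects.equals(lastValue, response.value);
        lastValue = response.value;
        if (!initialised) {
            initialised = true; // the first response is the initial load, not a change
        } else if (changed) {
            onChange.accept(response.value);
        }
    }

    long lastIndex() {
        return lastIndex;
    }

    public static void main(String[] args) {
        // Scripted responses instead of a real agent: initial load, timeout (same data), change.
        Iterator<Response> script = List.of(
                new Response(10, "a: 1"),
                new Response(10, "a: 1"),
                new Response(12, "a: 2")).iterator();
        KvWatchLoop loop = new KvWatchLoop(index -> script.next(),
                value -> System.out.println("changed: " + value));
        for (int i = 0; i < 3; i++) {
            loop.pollOnce();
        }
    }
}
```

A query that times out returns the same index and data, which is why the sketch compares values before notifying instead of treating every response as a change.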

mfarachepiksel avatar Jul 04 '18 12:07 mfarachepiksel

I haven't used Consul before and this feature sounds cool and all.. but I can't stop wondering; shouldn't it be rather "easy" (I hate this word lol) to have either 1) Consul ping the /refresh endpoint or 2) put a class into the Micronaut app that subscribes to events from Consul and then internally publishes a RefreshEvent?

Regardless, whenever this feature does arrive it might be helpful if we could be selective in terms of what keys we subscribe to (not all configuration is relevant for all beans)?

martinandersson avatar Oct 04 '19 09:10 martinandersson

any update about this feature request?

eriknyk avatar Apr 24 '20 04:04 eriknyk

No update but contributors welcome!

graemerocher avatar Apr 24 '20 05:04 graemerocher

This issue is still very relevant. We are developing our new services with Micronaut. Our Spring services support this useful feature, and it would be great if we could use it with Micronaut as well.

glockbender avatar Nov 25 '21 18:11 glockbender

one for you @pgressa

graemerocher avatar Nov 25 '21 19:11 graemerocher

You can look at https://github.com/rickfast/consul-client as an example of a subscription implementation.

glockbender avatar Nov 26 '21 08:11 glockbender

Hello!

I made this quick KV watcher for Micronaut, using Vertx-consul. Do you see anything incorrect, any hidden issues, or possible improvements?

import static io.micronaut.context.env.Environment.DEFAULT_NAME;
import static io.micronaut.discovery.config.ConfigDiscoveryConfiguration.DEFAULT_PATH;

import lombok.extern.slf4j.Slf4j;

import javax.annotation.PreDestroy;
import java.util.HashMap;
import java.util.Map;

import org.yaml.snakeyaml.Yaml;

import com.google.common.collect.Maps;
import io.micronaut.context.env.Environment;
import io.micronaut.context.event.ApplicationEventPublisher;
import io.micronaut.context.event.StartupEvent;
import io.micronaut.discovery.consul.ConsulConfiguration;
import io.micronaut.discovery.consul.condition.RequiresConsul;
import io.micronaut.runtime.context.scope.refresh.RefreshEvent;
import io.micronaut.runtime.event.annotation.EventListener;
import io.vertx.core.Handler;
import io.vertx.core.Vertx;
import io.vertx.ext.consul.ConsulClientOptions;
import io.vertx.ext.consul.KeyValue;
import io.vertx.ext.consul.Watch;
import io.vertx.ext.consul.WatchResult;
import jakarta.inject.Singleton;

@Slf4j
@Singleton
@RequiresConsul
public class ConsulKVWatcher {

    private final Environment environment;
    private final ApplicationEventPublisher<RefreshEvent> eventPublisher;
    private final ConsulConfiguration consulConfiguration;
    private final Vertx vertx;
    private final ConsulClientOptions consulClientOptions;

    private final Map<String, Watch<KeyValue>> watchers = new HashMap<>();
    private final Yaml yaml = new Yaml();

    ConsulKVWatcher(final Environment environment,
            final ApplicationEventPublisher<RefreshEvent> eventPublisher,
            final ConsulConfiguration consulConfiguration,
            final Vertx vertx,
            final ConsulClientOptions consulClientOptions) {
        this.environment = environment;
        this.eventPublisher = eventPublisher;
        this.consulConfiguration = consulConfiguration;
        this.vertx = vertx;
        this.consulClientOptions = consulClientOptions;
    }

    @EventListener
    public void onStartupEvent(final StartupEvent event) {
        // force one refresh up front so that environment variables and injected properties are not picked up as changes on the first refreshContext()
        environment.refresh();
        listenForConsulChanges();
    }

    private void listenForConsulChanges() {
        log.info("Monitoring Consul changes");
        final var applicationName = consulConfiguration.getServiceId().orElseThrow();
        final var configurationPath = consulConfiguration.getConfiguration().getPath()
                .map(path -> {
                    if (!path.endsWith("/")) {
                        path += "/";
                    }

                    return path;
                })
                .orElse(DEFAULT_PATH);

        // Configuration shared by all applications
        final var commonConfigPath = configurationPath + DEFAULT_NAME;
        addKeyWatcher(commonConfigPath);

        // Application-specific configuration
        final var applicationSpecificPath = configurationPath + applicationName;
        addKeyWatcher(applicationSpecificPath);

        for (final var activeName : environment.getActiveNames()) {
            // Configuration shared by all applications by active environments
            addKeyWatcher(toProfiledPath(commonConfigPath, activeName));
            // Application-specific configuration by active environments
            addKeyWatcher(toProfiledPath(applicationSpecificPath, activeName));
        }
    }

    private static String toProfiledPath(final String resource, final String activeName) {
        return resource + "," + activeName;
    }

    private void addKeyWatcher(final String key) {
        log.debug("Watching key [{}] in the KV store ", key);
        this.watchers.put(key, Watch.key(key, vertx, consulClientOptions)
                .setHandler(handle(key))
                .start());
    }

    private Handler<WatchResult<KeyValue>> handle(final String key) {
        return event -> {
            if (event.succeeded()) {
                try {
                    onNext(key, event.prevResult(), event.nextResult());
                } catch (final Exception e) {
                    onError(key, e);
                }
            } else {
                onError(key, event.cause());
            }
        };
    }

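    void onNext(final String key, final KeyValue pre, final KeyValue next) {
        // pre is null when there is no previous result to compare against
        if (pre != null) {
            log.debug("Key [{}] updated", key);
            final Map<String, Object> previousYaml = parse(pre);
            final Map<String, Object> nextYaml = parse(next);
            final var difference = Maps.difference(previousYaml, nextYaml);
            if (!difference.areEqual()) {
                final var previousProperties = environment.refreshAndDiff();
                if (!previousProperties.isEmpty()) {
                    log.debug("Configuration has been updated, publishing RefreshEvent.");
                    eventPublisher.publishEvent(new RefreshEvent(previousProperties));
                }
            } else {
                log.debug("No changes");
            }
        }
    }

    // a missing value or an empty document is treated as an empty configuration rather than failing on null
    private Map<String, Object> parse(final KeyValue keyValue) {
        if (keyValue == null || keyValue.getValue() == null) {
            return Map.of();
        }
        final Map<String, Object> parsed = yaml.load(keyValue.getValue());
        return parsed == null ? Map.of() : parsed;
    }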

    void onError(final String key, final Throwable error) {
        log.error("An error occurred while listening to config changes with key: {}", key, error);
    }

    @PreDestroy
    void close() {
        watchers.forEach((key, watcher) -> {
            log.debug("Stop watching key [{}]", key);
            watcher.stop();
        });

    }

}


import lombok.extern.slf4j.Slf4j;

import io.micronaut.context.annotation.Bean;
import io.micronaut.context.annotation.Factory;
import io.micronaut.discovery.consul.ConsulConfiguration;
import io.micronaut.discovery.consul.condition.RequiresConsul;
import io.vertx.core.Vertx;
import io.vertx.ext.consul.ConsulClientOptions;

@Slf4j
@Factory
@RequiresConsul
public class ConsulFactory {

    @Bean
    Vertx vertx() {
        return Vertx.vertx();
    }

    @Bean
    ConsulClientOptions consulClientOptions(final ConsulConfiguration consulConfiguration) {
        final var consulClientOptions = new ConsulClientOptions()
                .setHost(consulConfiguration.getHost())
                .setPort(consulConfiguration.getPort());
        consulConfiguration.getAslToken().ifPresent(consulClientOptions::setAclToken);
        consulConfiguration.getConfiguration().getDatacenter().ifPresent(consulClientOptions::setDc);
        consulConfiguration.getConnectTimeout().ifPresent(connectTimeout -> consulClientOptions.setConnectTimeout(
                Math.toIntExact(connectTimeout.toMillis())));

        return consulClientOptions;
    }

}

FrogDevelopper avatar Feb 15 '22 16:02 FrogDevelopper

An update on my implementation: to avoid fetching the whole configuration from Consul each time, I tried to update only the property source that changed

            ...
            final var difference = Maps.difference(previousYaml, nextYaml);
            if (!difference.areEqual()) {
                updatePropertySource(key, nextYaml);
                publishDifferences(difference);
            } else {
                log.debug("No changes");
            }
        }
    }

with

    private void updatePropertySource(final String key, final Map<String, Object> nextYaml) {
        final var sourceName = resolvePropertySourceName(key);
        final var propertySourceName = ConsulClient.SERVICE_ID + '-' + sourceName;
        final var propertySources = environment.getPropertySources();
        final var priority = propertySources
                .stream()
                .filter(propertySource -> propertySource.getName().equals(propertySourceName))
                .findFirst()
                .map(PropertySource::getOrder)
                .orElse(EnvironmentPropertySource.POSITION);
        final var propertySource = PropertySource.of(propertySourceName, nextYaml, priority);

        environment.addPropertySource(propertySource);
    }

    private String resolvePropertySourceName(final String key) {
        final var configurationPath = getConfigurationPath();
        final var propertySourceName = key.replace(configurationPath, "");

        final var tokens = propertySourceName.split(",");
        if (tokens.length == 1) {
            return propertySourceName;
        }
        final var name = tokens[0];
        final var envName = tokens[1];

        return name + '[' + envName + ']';
    }

and

    private void publishDifferences(final MapDifference<String, Object> difference) {
        log.debug("Configuration has been updated, publishing RefreshEvent.");
        final var changes = difference.entriesDiffering()
                .entrySet()
                .stream()
                .map(entry -> Map.entry(entry.getKey(), entry.getValue().leftValue()))
                .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
        eventPublisher.publishEvent(new RefreshEvent(changes));
    }
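
One thing to keep in mind with publishDifferences: Guava's entriesDiffering() only covers keys that exist on both sides with different values. Keys added to or removed from the Consul document are reported by entriesOnlyOnRight() and entriesOnlyOnLeft() instead, so they would not appear in the published changes. The sketch below shows the same idea in plain Java without Guava. ConfigDiff and its method are hypothetical names, and mapping a newly added key to a null previous value is an assumption about what a listener would want to receive.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Hypothetical helper: collects the previous value of every key that was modified, removed, or added.
final class ConfigDiff {

    static Map<String, Object> previousValuesOfChangedKeys(Map<String, Object> previous,
                                                           Map<String, Object> next) {
        // HashMap rather than Collectors.toMap, because added keys carry a null previous value
        Map<String, Object> changes = new HashMap<>();
        previous.forEach((key, oldValue) -> {
            if (!next.containsKey(key) || !Objects.equals(oldValue, next.get(key))) {
                changes.put(key, oldValue); // modified or removed
            }
        });
        next.keySet().forEach(key -> {
            if (!previous.containsKey(key)) {
                changes.put(key, null); // added: there was no previous value
            }
        });
        return changes;
    }

    public static void main(String[] args) {
        Map<String, Object> previous = Map.of("a", 1, "b", 2, "c", 3);
        Map<String, Object> next = Map.of("a", 1, "b", 20, "d", 4);
        // "a" is unchanged and left out; "b" was modified, "c" removed, "d" added
        System.out.println(previousValuesOfChangedKeys(previous, next));
    }
}
```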

but I have an issue:

GIVEN I have the configurations application.yaml {key.value_1=hello, key.value_2=world} and application,dev.yaml {key.value_2=univers}
WHEN I update key.value_2 in application.yaml
THEN the updated value is taken into account instead of the value from application,dev.yaml being kept

Any idea why, and how to prevent it?

FrogDevelopper avatar Feb 18 '22 14:02 FrogDevelopper

OK, I found the reason

    private void updatePropertySource(final String key, final Map<String, Object> nextYaml) {
        log.debug("Updating context with new configuration from [{}]", key);

        final var sourceName = resolvePropertySourceName(key);
        final var propertySourceName = ConsulClient.SERVICE_ID + '-' + sourceName;
        final var updatedPropertySources = new ArrayList<PropertySource>();
        for (final var propertySource : environment.getPropertySources()) {
            if (propertySource.getName().equals(propertySourceName)) {
                // creating a new PropertySource with new values but keeping the order
                updatedPropertySources.add(PropertySource.of(propertySourceName, nextYaml, propertySource.getOrder()));
            } else {
                updatedPropertySources.add(propertySource);
            }
        }

        updatedPropertySources.stream()
                // /!\ re-setting all the propertySources sorted by Order, to keep precedence
                .sorted(Comparator.comparing(PropertySource::getOrder))
                .forEach(environment::addPropertySource);
    }

    private String resolvePropertySourceName(final String key) {
        final var configurationPath = getConfigurationPath();
        final var propertySourceName = key.replace(configurationPath, "");

        final var tokens = propertySourceName.split(",");
        if (tokens.length == 1) {
            return propertySourceName;
        }
        final var name = tokens[0];
        final var envName = tokens[1];

        return name + '[' + envName + ']';
    }

I have to keep the order of the PropertySources
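
The effect can be reproduced with a toy model. This is not Micronaut's implementation, only an illustration that assumes adding a property source at runtime writes its values on top of whatever is already resolved. PrecedenceModel and Source are hypothetical names, and the order values are arbitrary.

```java
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model of "the last source applied wins"; NOT Micronaut's actual resolver.
final class PrecedenceModel {

    static final class Source {
        final String name;
        final int order;
        final Map<String, String> values;

        Source(String name, int order, Map<String, String> values) {
            this.name = name;
            this.order = order;
            this.values = values;
        }
    }

    private final Map<String, String> resolved = new HashMap<>();

    // Applying a source copies its values over whatever is already resolved.
    void apply(Source source) {
        resolved.putAll(source.values);
    }

    // Re-applying every source in ascending order lets the highest order win again.
    void applyAllInOrder(List<Source> sources) {
        sources.stream()
                .sorted(Comparator.comparingInt((Source source) -> source.order))
                .forEach(this::apply);
    }

    String get(String key) {
        return resolved.get(key);
    }

    public static void main(String[] args) {
        Source dev = new Source("application[dev]", 2, Map.of("key.value_2", "univers"));
        Source base = new Source("application", 1,
                Map.of("key.value_1", "hello", "key.value_2", "world"));
        PrecedenceModel model = new PrecedenceModel();
        model.applyAllInOrder(List.of(base, dev));
        System.out.println(model.get("key.value_2")); // value from application[dev]

        Source updatedBase = new Source("application", 1,
                Map.of("key.value_1", "hello", "key.value_2", "everyone"));
        model.apply(updatedBase);                      // only the updated source is re-added
        System.out.println(model.get("key.value_2")); // the dev override is lost

        model.applyAllInOrder(List.of(updatedBase, dev));
        System.out.println(model.get("key.value_2")); // precedence restored
    }
}
```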

Hope that will help

FrogDevelopper avatar Feb 18 '22 15:02 FrogDevelopper