retrofit-spring-boot-starter icon indicating copy to clipboard operation
retrofit-spring-boot-starter copied to clipboard

负载均衡方案

Open melin opened this issue 2 months ago • 0 comments

image

Server 注册服务host:port

import com.gitee.melin.bee.util.NetUtils;
import com.gitee.melin.bee.util.ThreadUtils;
import com.google.common.collect.Sets;
import org.apache.commons.lang3.ArrayUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.core.env.Environment;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Service;

import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

/**
 * Registers this server instance in Redis and keeps the registration alive with a heartbeat.
 *
 * <p>On startup the bean writes the key {@code superior-member-<profile>_<host:port>} with the
 * value {@code <host:port>} and a short time-to-live, then starts a single background thread that
 * rewrites the key periodically. Once the heartbeat stops, the key is no longer refreshed and
 * expires after the time-to-live.
 */
@Service
public class RedisDiscoveryClusterMember implements InitializingBean, DisposableBean {

    private static final Logger LOG = LoggerFactory.getLogger(RedisDiscoveryClusterMember.class);

    /** Time-to-live of the membership key, in seconds. */
    private static final long MEMBER_TTL_SECONDS = 2;

    /** Pause between two heartbeat writes, in milliseconds. */
    private static final long HEARTBEAT_INTERVAL_MILLIS = 100;

    private final AtomicBoolean running = new AtomicBoolean(true);

    @Autowired
    private RedisTemplate<String, String> redisTemplate;

    @Autowired
    private Environment environment;

    @Value("${server.port}")
    private int serverPort;

    /** Executor running the heartbeat loop; kept so that {@link #destroy()} can stop it. */
    private volatile ExecutorService heartbeatExecutor;

    /**
     * Stops the heartbeat: clears the running flag and interrupts the heartbeat thread so that it
     * does not sit out the remainder of its sleep.
     */
    @Override
    public void destroy() throws Exception {
        running.set(false);
        ExecutorService executor = heartbeatExecutor;
        if (executor != null) {
            executor.shutdownNow();
        }
    }

    /**
     * Writes the membership key once synchronously and then starts the heartbeat thread.
     *
     * @throws IllegalArgumentException if none of the profiles {@code dev}, {@code test} or
     *     {@code production} is active
     */
    @Override
    public void afterPropertiesSet() throws Exception {
        final String profile = resolveProfile();

        // NOTE(review): NetUtils.getLocalHost() is presumably the address other nodes can reach
        // this instance on - confirm against the helper's implementation.
        final String hostName = NetUtils.getLocalHost() + ":" + serverPort;
        final String key = "superior-member-" + profile + "_" + hostName;
        redisTemplate.opsForValue().set(key, hostName, MEMBER_TTL_SECONDS, TimeUnit.SECONDS);

        heartbeatExecutor = ThreadUtils.newDaemonSingleThreadExecutor("update-jobserver-node");
        heartbeatExecutor.submit(() -> {
            while (running.get()) {
                try {
                    redisTemplate.opsForValue().set(key, hostName, MEMBER_TTL_SECONDS, TimeUnit.SECONDS);
                } catch (Exception e) {
                    // Keep the heartbeat running; a later iteration may succeed again.
                    LOG.error("更新集群成员信息失败: " + e.getMessage(), e);
                }

                // Sleep after failures as well, otherwise a Redis outage turns this loop into a
                // busy spin that floods the log.
                try {
                    TimeUnit.MILLISECONDS.sleep(HEARTBEAT_INTERVAL_MILLIS);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    break;
                }
            }
        });
    }

    /**
     * Returns the first of {@code dev}, {@code test}, {@code production} that is among the active
     * Spring profiles.
     *
     * @throws IllegalArgumentException if none of them is active
     */
    private String resolveProfile() {
        final String[] activeProfiles = environment.getActiveProfiles();
        for (String candidate : new String[] {"dev", "test", "production"}) {
            if (ArrayUtils.contains(activeProfiles, candidate)) {
                return candidate;
            }
        }
        throw new IllegalArgumentException("profile 值不正确");
    }
}

**Client 负载均衡相关代码**

import com.netflix.loadbalancer.Server;
import com.netflix.loadbalancer.ServerList;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;

/**
 * Ribbon {@link ServerList} that takes its servers from the URIs reported by
 * {@link RedisDiscoveryClusterMember}.
 *
 * <p>The initial and the updated server list are produced the same way: every call asks the
 * cluster member for its current URIs.
 */
@Service
public class DiscoveryServerList implements ServerList<Server> {

    private static final Logger LOG = LoggerFactory.getLogger(DiscoveryServerList.class);

    @Autowired
    private RedisDiscoveryClusterMember clusterMember;

    /** Returns the servers currently reported by the cluster member. */
    @Override
    public List<Server> getInitialListOfServers() {
        return getServers();
    }

    /** Returns the servers currently reported by the cluster member. */
    @Override
    public List<Server> getUpdatedListOfServers() {
        return getServers();
    }

    /**
     * Converts the discovered URIs into Ribbon {@link Server} instances.
     *
     * @return one server per URI, or an empty list when no URI is available
     */
    private List<Server> getServers() {
        // NOTE(review): getSuperiorUris() must be provided by the client-side
        // RedisDiscoveryClusterMember; presumably each entry is a "host:port" string - confirm.
        List<String> uris = clusterMember.getSuperiorUris();

        if (uris == null || uris.isEmpty()) {
            // Typed empty list instead of the raw Collections.EMPTY_LIST constant.
            return Collections.emptyList();
        }
        return uris.stream().map(Server::new).collect(Collectors.toList());
    }
}
import com.netflix.loadbalancer.IPing;
import com.netflix.loadbalancer.Server;
import org.apache.http.HttpResponse;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpUriRequest;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.http.util.EntityUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.StopWatch;

import java.io.IOException;

/**
 * Ribbon {@link IPing} implementation that probes a server with an HTTP GET request.
 *
 * <p>The probed URL is {@code http(s)://<server id><pingAppendString>}. Without an expected
 * content the server counts as alive when the response status is 200; with an expected content
 * the server counts as alive when the response body equals that content.
 */
public class SuperiorHealthCheck implements IPing {

    private static final Logger LOGGER = LoggerFactory.getLogger(SuperiorHealthCheck.class);

    /** Connect, connection-request and socket timeouts are all 3 seconds. */
    private static final RequestConfig requestConfig = RequestConfig.custom().
            setConnectTimeout(3 * 1000).
            setConnectionRequestTimeout(3 * 1000).
            setSocketTimeout(3 * 1000).build();

    private String pingAppendString = "";

    private boolean isSecure = false;

    private String expectedContent = null;

    public SuperiorHealthCheck() {}

    /**
     * @param isSecure whether to probe with {@code https} instead of {@code http}
     * @param pingAppendString path appended to the server id; {@code null} is treated as empty
     */
    public SuperiorHealthCheck(boolean isSecure, String pingAppendString) {
        this.isSecure = isSecure;
        this.pingAppendString = (pingAppendString != null) ? pingAppendString : "";
    }

    /**
     * @param pingAppendString path appended to the server id; {@code null} is treated as empty
     */
    public void setPingAppendString(String pingAppendString) {
        this.pingAppendString = (pingAppendString != null) ? pingAppendString : "";
    }

    public String getPingAppendString() {
        return pingAppendString;
    }

    /**
     * @return the body a healthy server must answer with, or {@code null} when only the status
     *     code is checked
     */
    public String getExpectedContent() {
        return expectedContent;
    }

    /**
     * @param expectedContent the body a healthy server must answer with, or {@code null} to check
     *     the status code only
     */
    public void setExpectedContent(String expectedContent) {
        this.expectedContent = expectedContent;
    }

    /**
     * Sends one GET request to the server and evaluates the response.
     *
     * @param server the server to probe
     * @return {@code true} if the server answered as described on the class; {@code false} on a
     *     non-matching answer or on an {@link IOException}
     */
    @Override
    public boolean isAlive(Server server) {
        String uri = (isSecure ? "https://" : "http://") + server.getId() + getPingAppendString();

        boolean isAlive = false;
        HttpUriRequest getRequest = new HttpGet(uri);
        StopWatch watch = new StopWatch();
        watch.start();
        try (CloseableHttpClient httpClient = HttpClientBuilder.create()
                .setDefaultRequestConfig(requestConfig).build()) {

            HttpResponse response = httpClient.execute(getRequest);
            // A response may carry no entity (e.g. 204); EntityUtils.toString rejects null.
            String content = (response.getEntity() != null)
                    ? EntityUtils.toString(response.getEntity())
                    : null;
            isAlive = (response.getStatusLine().getStatusCode() == 200);

            String expected = getExpectedContent();
            if (expected != null) {
                LOGGER.debug("content:" + content);
                // When an expected content is configured it decides alone, overriding the status.
                isAlive = expected.equals(content);
            }
        } catch (IOException e) {
            watch.stop();
            LOGGER.debug("ping server failure: {}, times: {}ms", uri, watch.getTotalTimeMillis());
        } finally {
            getRequest.abort();
        }

        return isAlive;
    }
}
import com.github.lianjiatech.retrofit.spring.boot.core.ServiceInstanceChooser;
import io.github.melin.jobserver.spark.api.JobServerException;
import com.netflix.loadbalancer.*;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.net.URI;

/**
 * {@link ServiceInstanceChooser} backed by a Ribbon load balancer.
 *
 * <p>The load balancer is built once all properties are set. It uses a round-robin rule,
 * {@link SuperiorHealthCheck} against the {@code /ok} path as ping, and
 * {@link DiscoveryServerList} as a dynamic server list that is polled periodically.
 */
@Service
public class SuperiorServiceInstanceChooser implements ServiceInstanceChooser, InitializingBean {

    /** Delay before the first server-list refresh, in milliseconds. */
    private static final long LISTOFSERVERS_CACHE_UPDATE_DELAY = 1000; // msecs;

    /** Interval between two server-list refreshes, in milliseconds. */
    private static final int LISTOFSERVERS_CACHE_REPEAT_INTERVAL = 3 * 1000; // msecs;

    /** Scheme used when the chosen server does not carry one; matches the non-secure ping. */
    private static final String DEFAULT_SCHEME = "http";

    private ILoadBalancer loadBalancer;

    @Autowired
    private DiscoveryServerList discoveryServerList;

    /** Builds the load balancer from the health check, the server list and the polling updater. */
    @Override
    public void afterPropertiesSet() throws Exception {
        SuperiorHealthCheck healthCheck = new SuperiorHealthCheck();
        healthCheck.setPingAppendString("/ok");
        loadBalancer = LoadBalancerBuilder.newBuilder()
                .withRule(new RoundRobinRule())
                .withPing(healthCheck)
                .withDynamicServerList(discoveryServerList)
                .withServerListUpdater(new PollingServerListUpdater(LISTOFSERVERS_CACHE_UPDATE_DELAY,
                        LISTOFSERVERS_CACHE_REPEAT_INTERVAL))
                .buildDynamicServerListLoadBalancerWithUpdater();
    }

    /**
     * Picks a server from the load balancer and returns its base URI.
     *
     * @param serviceId not used; the load balancer is always asked with the key
     *     {@code "sparkjobserver"}
     * @return {@code <scheme>://<host:port>} of the chosen server
     * @throws JobServerException if the load balancer has no server to offer
     */
    @Override
    public URI choose(String serviceId) {
        Server server = loadBalancer.chooseServer("sparkjobserver");
        if (server == null) {
            throw new JobServerException("不能自动探查到 superior server");
        }

        // A server created from a bare "host:port" id may have no scheme; without the fallback
        // the result would be the unusable URI "null://host:port".
        String scheme = (server.getScheme() != null) ? server.getScheme() : DEFAULT_SCHEME;
        return URI.create(scheme + "://" + server.getHostPort());
    }
}

melin avatar Jun 06 '24 14:06 melin