Kit.ServiceDiscovery
Kit.ServiceDiscovery copied to clipboard
IServiceSubscriber per serviceName and RoundRobin per serviceName
I think it's better to use one IServiceSubscriber
instance per service, and the same as RoundRobinLoadBalancer
,so in my project, I use the ·ServiceSubscriberFactory· to cache the IServiceSubscriber and in RoundRobinLoadBalancer
i cache index, is the usage right?
internal class ServiceSubscriberFactory : IServiceSubscriberFactory
{
private readonly IConsulServiceSubscriberFactory _consulServiceSubscriberFactory;
private readonly ICacheServiceSubscriberFactory _cacheServiceSubscriberFactory;
private readonly ConcurrentDictionary<string, IPollingServiceSubscriber> _subscribersCache;
public ServiceSubscriberFactory(
IConsulServiceSubscriberFactory consulServiceSubscriberFactory,
ICacheServiceSubscriberFactory cacheServiceSubscriberFactory
)
{
_consulServiceSubscriberFactory = consulServiceSubscriberFactory;
_cacheServiceSubscriberFactory = cacheServiceSubscriberFactory;
_subscribersCache = new ConcurrentDictionary<string, IPollingServiceSubscriber>();
}
public IPollingServiceSubscriber CreateSubscriber(string servicName)
{
return CreateSubscriber(servicName, DiscoveryOptions.Default, ThrottleSubscriberOptions.Default);
}
public IPollingServiceSubscriber CreateSubscriber(string serviceName, DiscoveryOptions discoverOptions, ThrottleSubscriberOptions throttleOptions)
{
IPollingServiceSubscriber subscriber;
if (_subscribersCache.TryGetValue(serviceName, out subscriber))
return subscriber;
var consulSubscriberOptions = discoverOptions.ToSubscriberOptions();
var consulSubscriber = _consulServiceSubscriberFactory.CreateSubscriber(serviceName, consulSubscriberOptions, true);
var throttleSubscriber = new ThrottleServiceSubscriber(consulSubscriber, throttleOptions.MaxUpdatesPerPeriod, throttleOptions.MaxUpdatesPeriod);
subscriber = _cacheServiceSubscriberFactory.CreateSubscriber(throttleSubscriber);
_subscribersCache.TryAdd(serviceName, subscriber);
return subscriber;
}
}
and roundrobin
public class RoundRobinLoadBalancerPolicy : ILoadBalancerPolicy
{
private readonly IServiceSubscriber _subscriber;
private readonly SemaphoreSlim _lock = new SemaphoreSlim(1, 1);
private int _index;
private static ConcurrentDictionary<int, int> _indexCache = new ConcurrentDictionary<int, int>();
public RoundRobinLoadBalancerPolicy(IServiceSubscriber subscriber)
{
_subscriber = subscriber;
}
public async Task<ServiceDescriptor> PickEndpoint(CancellationToken ct = default(CancellationToken))
{
var endpoints = await _subscriber.Endpoints(ct).ConfigureAwait(false);
if (endpoints.Count() == 0)
{
return null;
}
var hashCode = _subscriber.GetHashCode();
await _lock.WaitAsync(ct).ConfigureAwait(false);
try
{
_index = _indexCache.GetOrAdd(hashCode, 0);
if (_index >= endpoints.Count())
{
_index = 0;
}
var uri = endpoints.ElementAt(_index);
_index++;
return uri;
}
finally
{
_indexCache[hashCode] = _index;
_lock.Release();
}
}
}