
Redis sentinel support

Open CarlosDominguezBecerril opened this issue 1 year ago • 16 comments

Proposed change

Hey, thanks to everyone working on this repo, amazing work!

My understanding is that we want to use traefik + redis to offer high availability on the proxy. Right now Redis is a single point of failure: the current implementation only allows connecting directly to the instance hosting it, so when Redis is down, every component that relies on it, directly or indirectly, stops working.

I would like to see support for Redis Sentinel added for real HA.

Thanks


CarlosDominguezBecerril avatar Aug 28 '24 08:08 CarlosDominguezBecerril

Thanks for the suggestion. Do you know what client-side changes are required to support Redis Sentinel?

manics avatar Aug 28 '24 09:08 manics

I think it's a different constructor, but otherwise the same. But I also think this might not be necessary: both traefik and, I believe, the Redis client should handle disruptions to the redis server with retries, so availability won't actually be affected. Redis being down doesn't disrupt the proxy; it should only disrupt changes to the proxy.

minrk avatar Aug 28 '24 09:08 minrk

It's a different constructor. I'm bringing this up because I had spawns fail during initialization, and I believe the cause was my redis instance restarting due to automatic scale-down in our k8s cluster and leaving some inconsistent state. (I can't confirm any of this yet; still investigating.)

Logs:

Stopping <name> to avoid inconsistent state
<Failed to add  <name> to proxy!>
asyncio.exceptions.TimeoutError: Traefik route for /user/<name>/: not ready
Traceback (most recent call last):
  File "/usr/local/lib/python3.10/dist-packages/jupyterhub/handlers/base.py", line 1142, in finish_user_spawn
    await self.proxy.add_user(user, server_name)
  File "/usr/local/lib/python3.10/dist-packages/jupyterhub/proxy.py", line 343, in add_user
    await self.add_route(
  File "/usr/local/lib/python3.10/dist-packages/jupyterhub_traefik_proxy/proxy.py", line 688, in add_route
    await self._wait_for_route(routespec)
  File "/usr/local/lib/python3.10/dist-packages/jupyterhub_traefik_proxy/proxy.py", line 351, in _wait_for_route
    await exponential_backoff(
  File "/usr/local/lib/python3.10/dist-packages/jupyterhub/utils.py", line 265, in exponential_backoff
    raise asyncio.TimeoutError(fail_message)
asyncio.exceptions.TimeoutError: Traefik route for /user/<name>/: not ready

This log line appears about 20 times:

Traefik route for /user/<name>/: service__2Fuser_2F<name>_2F@redis not yet in service

CarlosDominguezBecerril avatar Aug 28 '24 11:08 CarlosDominguezBecerril

Got it, thanks. Do you happen to know how long it took your redis server to come back, and if the client was able to recover eventually without being restarted?

minrk avatar Aug 28 '24 13:08 minrk

Reproducing this is as easy as deleting redis with kubectl delete pod redis and waiting for a new pod to appear. I checked two things:

  • DNS records: when I delete the pod, a new IP gets assigned, but resolution looks to be working fine as far as I can tell. I was looking for caching in case the client was still resolving the old IP address.
  • My redis instance takes around 40 seconds to start up, so I changed the default Retry value with c.TraefikRedisProxy.redis_client_kwargs = {"retry": Retry(ExponentialBackoff(cap=10), 20), "retry_on_error": [BusyLoadingError, ConnectionError, TimeoutError]} to allow more time (the default is around ~30 seconds), but the client still never recovers

My redis instance has the following liveness and readiness probes:

readinessProbePeriodSeconds: 10
livenessProbePeriodSeconds: 10
readinessProbeInitialDelaySeconds: 30
livenessProbeInitialDelaySeconds: 30
readinessFailureThreshold: 3
livenessFailureThreshold: 3

Regardless of how long I wait, the client never recovers, and I have to restart the hub.

CarlosDominguezBecerril avatar Aug 28 '24 13:08 CarlosDominguezBecerril

Can you set c.TraefikProxy.check_route_timeout = 60? It looks like the Hub isn't having trouble talking to redis; rather, traefik is taking too long to load its config from redis. check_route_timeout governs this wait, and defaults to 30 seconds.

minrk avatar Aug 28 '24 14:08 minrk

I think Sentinel support makes plenty of sense, we just need to figure out what the config options should look like.

minrk avatar Aug 28 '24 14:08 minrk

Unfortunately, using c.TraefikProxy.check_route_timeout = 60 (alongside c.TraefikRedisProxy.redis_client_kwargs = {"retry": Retry(ExponentialBackoff(cap=10), 20), "retry_on_error": [BusyLoadingError, ConnectionError, TimeoutError]}) didn't work. I'm using jupyterhub==5.1.0, jupyterhub_traefik_proxy==2.0.0, the proxy running traefik==2.9.5, and one node running redis==7.0.8.

I looked for more logs and found these after deleting the redis pod. Is it mandatory to have this root key? My redis node doesn't have persistent storage, so after a restart everything is lost.

Aug 29 09:42:09.447
Getting redis keys []

Aug 29 09:42:09.448
Root key 'jupyterhub/' not found in {}

Aug 29 09:42:09.448
Fetching routes to check

Aug 29 09:42:09.449
Getting redis keys []

Aug 29 09:42:09.450
Root key 'jupyterhub/' not found in {}

Aug 29 09:42:09.450
Checking routes

CarlosDominguezBecerril avatar Aug 29 '24 07:08 CarlosDominguezBecerril

I don't think it will work without persistence. It is at least documented as required, along with keyspace notifications. Redis is not a stateless communication medium; it is the state of the proxy, so it should be persisted. Persistence is the main reason to use a KV store backend for the proxy.

We might be able to make it work (I don't think it does now, see #247, #242), but clearing the config stored in redis will definitely always be a massive disruption (more so than redis not running at all), so requiring persistence is the best way to ensure sensible behavior.
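For reference, a minimal redis.conf sketch covering both requirements (appendonly enables AOF persistence; KEA turns on all keyspace notifications, which is a superset of what traefik needs):

```
# Persist the dataset across restarts (AOF)
appendonly yes
# Enable keyspace notifications (KEA = all events)
notify-keyspace-events KEA
```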

minrk avatar Aug 29 '24 09:08 minrk

Persistence would be best, but in cases where the storage options for it aren't good, could a sidecar container service be used to prompt the hub to sync the proxy by doing a POST /proxy when Redis starts up?
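A sketch of that workaround (stdlib only; the hub URL and token below are hypothetical placeholders): the sidecar would essentially issue an authenticated POST /proxy against the Hub API once Redis is reachable again.

```python
# Hypothetical sidecar snippet: ask the Hub to resync the proxy via its REST
# API. The URL and token below are placeholders for illustration.
from urllib.request import Request, urlopen

def build_proxy_sync_request(hub_api_url, api_token):
    """Build an authenticated POST /proxy request for the JupyterHub REST API."""
    return Request(
        f"{hub_api_url}/proxy",
        data=b"{}",
        method="POST",
        headers={"Authorization": f"token {api_token}"},
    )

req = build_proxy_sync_request("http://hub:8081/hub/api", "hypothetical-token")
# In the sidecar, after redis comes back up:
# urlopen(req)
```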

rcthomas avatar Aug 30 '24 02:08 rcthomas

That should be addressable with #247. Right now, it restores the routes but it doesn't restore the initial dynamic config, which is also required.

minrk avatar Aug 30 '24 06:08 minrk

OK. The sidecar hub prompt is a workaround we've used "successfully" with etcd, but we hadn't tried it with Redis, so we hadn't run into #247 in that context. Hoping we don't have to resort to it again...

rcthomas avatar Aug 30 '24 15:08 rcthomas

If you have persistence, it won't be necessary. Persistence greatly improves reliability.

But as part of #247 we could also look at using a watch to notice disruptions rather than needing to wait for a prod.

minrk avatar Sep 01 '24 09:09 minrk

So I have implemented basic sentinel support, but I'm getting the same error as before: spawn failed, plus the same repeating log Traefik route for /user/<name>/: service__2Fuser_2F<name>_2F@redis not yet in service. Previously, adding the PVC worked as you mentioned, but now it fails even with a PVC attached.

Do you know what the issue could be? All key-values are stored in redis except when creating a new server. Here are my code, config, and redis keys:

import asyncio
from urllib.parse import urlparse

from jupyterhub_traefik_proxy.redis import TraefikRedisProxy
from jupyterhub_traefik_proxy.traefik_utils import deep_merge
from redis.exceptions import ConnectionError
from traitlets import Any, Int, List, Tuple, Unicode

try:
    from redis.asyncio.sentinel import Sentinel
except ImportError:
    raise ImportError(
        "Please install `redis` package to use traefik-proxy with redis sentinel"
    )


class TraefikRedisSentinelProxy(TraefikRedisProxy):

    provider_name = "redis_sentinel"

    sentinel_hosts = List(
        trait=Tuple,
        config=True,
        help="Sentinel hosts in the [(address, port), (address, port), ...] format",
    )
    master_name = Unicode(config=True, help="The name of the Sentinel master")
    max_reconnection_retries = Int(
        default_value=3,
        config=True,
        help="Maximum number of retries for reconnection attempts",
    )
    retry_interval = Int(
        default_value=1,
        config=True,
        help="Interval (in seconds) before retrying to get the master connection",
    )

    sentinel = Any()
    redis = Any()

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self.log.info(
            f"sentinel hosts: {[(urlparse(url).netloc, port) for url, port in self.sentinel_hosts]}"
        )
        self.sentinel = Sentinel(
            [(urlparse(url).netloc, port) for url, port in self.sentinel_hosts],
            decode_responses=True,
        )
        self.redis = self.get_master_connection()

    def get_master_connection(self):
        self.log.info("Obtaining master server from sentinel")
        return self.sentinel.master_for(self.master_name)

    def get_slave_connection(self):
        self.log.info("Obtaining slave server from sentinel")
        return self.sentinel.slave_for(self.master_name)

    def _setup_traefik_static_config(self):
        self.log.info(
            "Setting up the redis provider and sentinel in the traefik static config"
        )
        redis_config = {
            "endpoints": [
                f"{urlparse(url).netloc}:{port}" for url, port in self.sentinel_hosts
            ],
            "rootKey": self.kv_traefik_prefix,
            "sentinel": {"masterName": self.master_name},
        }

        self.static_config = deep_merge(
            self.static_config, {"providers": {"redis": redis_config}}
        )

        self.log.info(f"Final static config: {self.static_config}")
        return super(TraefikRedisProxy, self)._setup_traefik_static_config()

    async def execute_with_reconnection(self, func, *args, **kwargs):
        """
        Execute a function with automatic reconnection handling.

        :param func: The Redis command to execute.
        :param args: Arguments for the Redis command.
        :param kwargs: Keyword arguments for the Redis command.
        """
        for attempt in range(self.max_reconnection_retries):
            try:
                return await func(*args, **kwargs)
            except ConnectionError as e:
                self.log.error(
                    f"Error during Redis operation: {e}. Attempt {attempt + 1} of {self.max_reconnection_retries}. Reconnecting to master..."
                )
                self.redis = self.get_master_connection()
                await asyncio.sleep(self.retry_interval)

        raise ConnectionError(
            f"Failed to reconnect after {self.max_reconnection_retries} attempts"
        )

    async def _kv_get_tree(self, prefix):
        async def wrapped():
            return await super(TraefikRedisSentinelProxy, self)._kv_get_tree(prefix)

        return await self.execute_with_reconnection(wrapped)

    async def _kv_atomic_set(self, to_set: dict):
        async def wrapped():
            return await super(TraefikRedisSentinelProxy, self)._kv_atomic_set(to_set)

        return await self.execute_with_reconnection(wrapped)

    async def _kv_atomic_delete(self, *keys):
        async def wrapped():
            return await super(TraefikRedisSentinelProxy, self)._kv_atomic_delete(*keys)

        return await self.execute_with_reconnection(wrapped)

Same redis configuration except redis_url and these two extra variables:

c.TraefikRedisSentinelProxy.master_name = ""  # The name of the Sentinel master
c.TraefikRedisSentinelProxy.sentinel_hosts = [] # Sentinel hosts in the [(address, port), (address, port), ...] format

Keys stored in redis:

127.0.0.1:6379> keys *
1) "traefik/http/routers/router__2Fservices_2Fnotebook-sharing_2F/service"
2) "jupyterhub/routes/router__2Fservices_2Fgithub-proxy_2F/service"
3) "jupyterhub/routes/router__2Fservices_2Fgithub-proxy_2F/routespec"
4) "jupyterhub/routes/router__2Fservices_2Fnotebook-sharing_2F/data/service"
5) "jupyterhub/routes/router__2Fservices_2Fnotebook-sharing_2F/router"
6) "traefik/http/services/service__2Fservices_2Fannouncement_2F/loadBalancer/passHostHeader"
7) "jupyterhub/routes/router__2F/router"
8) "jupyterhub/routes/router__2F/routespec"
9) "traefik/http/routers/router__2Fservices_2Fddmetrics_2F/service"
10) "traefik/http/routers/router__2Fservices_2Fannouncement_2F/service"
11) "traefik/http/routers/router__2Fservices_2Fannouncement_2F/entryPoints/0"
12) "traefik/http/routers/router__2Fservices_2Fgithub-proxy_2F/entryPoints/0"
13) "jupyterhub/routes/router__2F/target"
14) "traefik/http/routers/router__2Fservices_2Fddmetrics_2F/entryPoints/0"
15) "jupyterhub/routes/router__2Fservices_2Fpip-proxy_2F/target"
16) "traefik/http/services/service__2Fservices_2Fddmetrics_2F/loadBalancer/servers/0/url"
17) "traefik/http/services/service__2Fservices_2Fnotebook-sharing_2F/loadBalancer/servers/0/url"
18) "traefik/http/routers/router__2Fservices_2Fpip-proxy_2F/service"
19) "jupyterhub/routes/router__2Fservices_2Fpip-proxy_2F/router"
20) "traefik/http/services/service__2Fservices_2Fnotebook-sharing_2F/loadBalancer/passHostHeader"
21) "jupyterhub/routes/router__2Fservices_2Fpip-proxy_2F/service"
22) "traefik/http/routers/router__2Fservices_2Fannouncement_2F/rule"
23) "jupyterhub/routes/router__2Fservices_2Fddmetrics_2F/routespec"
24) "jupyterhub/routes/router__2Fservices_2Fannouncement_2F/router"
25) "traefik/http/routers/router__2Fservices_2Fgithub-proxy_2F/rule"
26) "jupyterhub/routes/router__2Fservices_2Fannouncement_2F/routespec"
27) "jupyterhub/routes/router__2Fservices_2Fgithub-proxy_2F/target"
28) "jupyterhub/routes/router__2Fservices_2Fddmetrics_2F/data/service"
29) "traefik/http/routers/router__2F/rule"
30) "traefik/http/routers/router__2Fservices_2Fddmetrics_2F/rule"
31) "jupyterhub/routes/router__2Fservices_2Fannouncement_2F/service"
32) "jupyterhub/routes/router__2Fservices_2Fgithub-proxy_2F/data/service"
33) "jupyterhub/routes/router__2Fservices_2Fnotebook-sharing_2F/target"
34) "traefik/http/services/service__2Fservices_2Fpip-proxy_2F/loadBalancer/servers/0/url"
35) "jupyterhub/routes/router__2Fservices_2Fannouncement_2F/data/service"
36) "traefik/http/routers/router__2Fservices_2Fpip-proxy_2F/entryPoints/0"
37) "jupyterhub/routes/router__2Fservices_2Fddmetrics_2F/router"
38) "traefik/http/routers/router__2F/service"
39) "traefik/http/middlewares/auth_api/basicAuth/users/0"
40) "traefik/http/services/service__2Fservices_2Fgithub-proxy_2F/loadBalancer/servers/0/url"
41) "traefik/http/services/service__2Fservices_2Fgithub-proxy_2F/loadBalancer/passHostHeader"
42) "traefik/http/services/service__2F/loadBalancer/passHostHeader"
43) "jupyterhub/routes/router__2Fservices_2Fpip-proxy_2F/data/service"
44) "traefik/http/routers/router__2Fservices_2Fpip-proxy_2F/rule"
45) "jupyterhub/routes/router__2Fservices_2Fddmetrics_2F/target"
46) "traefik/http/services/service__2Fservices_2Fannouncement_2F/loadBalancer/servers/0/url"
47) "traefik/http/routers/router__2Fservices_2Fnotebook-sharing_2F/rule"
48) "traefik/http/routers/route_api/rule"
49) "jupyterhub/routes/router__2F/service"
50) "traefik/http/routers/route_api/middlewares/0"
51) "traefik/http/services/service__2Fservices_2Fpip-proxy_2F/loadBalancer/passHostHeader"
52) "jupyterhub/routes/router__2F/data/hub"
53) "traefik/http/routers/route_api/service"
54) "jupyterhub/routes/router__2Fservices_2Fddmetrics_2F/service"
55) "jupyterhub/routes/router__2Fservices_2Fannouncement_2F/target"
56) "traefik/http/services/service__2Fservices_2Fddmetrics_2F/loadBalancer/passHostHeader"
57) "traefik/http/routers/router__2Fservices_2Fnotebook-sharing_2F/entryPoints/0"
58) "traefik/http/routers/route_api/entryPoints/0"
59) "jupyterhub/routes/router__2Fservices_2Fnotebook-sharing_2F/service"
60) "jupyterhub/routes/router__2Fservices_2Fpip-proxy_2F/routespec"
61) "jupyterhub/routes/router__2Fservices_2Fgithub-proxy_2F/router"
62) "jupyterhub/routes/router__2Fservices_2Fnotebook-sharing_2F/routespec"
63) "traefik/http/routers/router__2F/entryPoints/0"
64) "traefik/http/routers/router__2Fservices_2Fgithub-proxy_2F/service"
65) "traefik/http/services/service__2F/loadBalancer/servers/0/url"

CarlosDominguezBecerril avatar Sep 04 '24 12:09 CarlosDominguezBecerril

Okay, found the issue. I still need to use provider_name redis instead of redis_sentinel.

Fixed code:

import asyncio
from urllib.parse import urlparse

from jupyterhub_traefik_proxy.redis import TraefikRedisProxy
from jupyterhub_traefik_proxy.traefik_utils import deep_merge
from redis.exceptions import ConnectionError
from traitlets import Any, Int, List, Tuple, Unicode

try:
    from redis.asyncio.sentinel import Sentinel
except ImportError:
    raise ImportError(
        "Please install `redis` package to use traefik-proxy with redis sentinel"
    )


class TraefikRedisSentinelProxy(TraefikRedisProxy):

    provider_name = "redis"

    sentinel_hosts = List(
        trait=Tuple,
        config=True,
        help="Sentinel hosts in the [(address, port), (address, port), ...] format",
    )
    master_name = Unicode(config=True, help="The name of the Sentinel master")
    max_reconnection_retries = Int(
        default_value=3,
        config=True,
        help="Maximum number of retries for reconnection attempts",
    )
    retry_interval = Int(
        default_value=1,
        config=True,
        help="Interval (in seconds) before retrying to get the master connection",
    )

    sentinel = Any()
    redis = Any()

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self.log.info(
            f"sentinel hosts: {[(urlparse(url).netloc, port) for url, port in self.sentinel_hosts]}"
        )
        self.sentinel = Sentinel(
            [(urlparse(url).netloc, port) for url, port in self.sentinel_hosts],
            decode_responses=True,
        )
        self.redis = self.get_master_connection()

    def get_master_connection(self):
        self.log.info("Obtaining master server from sentinel")
        return self.sentinel.master_for(self.master_name)

    def get_slave_connection(self):
        self.log.info("Obtaining slave server from sentinel")
        return self.sentinel.slave_for(self.master_name)

    def _setup_traefik_static_config(self):
        self.log.info(
            "Setting up the redis provider and sentinel in the traefik static config"
        )
        redis_config = {
            "endpoints": [
                f"{urlparse(url).netloc}:{port}" for url, port in self.sentinel_hosts
            ],
            "rootKey": self.kv_traefik_prefix,
            "sentinel": {"masterName": self.master_name},
        }

        self.static_config = deep_merge(
            self.static_config, {"providers": {"redis": redis_config}}
        )

        self.log.info(f"Final static config: {self.static_config}")
        return super(TraefikRedisProxy, self)._setup_traefik_static_config()

    async def execute_with_reconnection(self, func, *args, **kwargs):
        """
        Execute a function with automatic reconnection handling.

        :param func: The Redis command to execute.
        :param args: Arguments for the Redis command.
        :param kwargs: Keyword arguments for the Redis command.
        """
        for attempt in range(self.max_reconnection_retries):
            try:
                return await func(*args, **kwargs)
            except ConnectionError as e:
                self.log.error(
                    f"Error during Redis operation: {e}. Attempt {attempt + 1} of {self.max_reconnection_retries}. Reconnecting to master..."
                )
                self.redis = self.get_master_connection()
                await asyncio.sleep(self.retry_interval)

        raise ConnectionError(
            f"Failed to reconnect after {self.max_reconnection_retries} attempts"
        )

    async def _kv_get_tree(self, prefix):
        async def wrapped():
            return await super(TraefikRedisSentinelProxy, self)._kv_get_tree(prefix)

        return await self.execute_with_reconnection(wrapped)

    async def _kv_atomic_set(self, to_set: dict):
        async def wrapped():
            return await super(TraefikRedisSentinelProxy, self)._kv_atomic_set(to_set)

        return await self.execute_with_reconnection(wrapped)

    async def _kv_atomic_delete(self, *keys):
        async def wrapped():
            return await super(TraefikRedisSentinelProxy, self)._kv_atomic_delete(*keys)

        return await self.execute_with_reconnection(wrapped)

CarlosDominguezBecerril avatar Sep 04 '24 13:09 CarlosDominguezBecerril

Thanks! I don't think we need a new class; I think we just need to add the sentinel_hosts option and change how we establish the initial connection.

Sentinel claims it will handle reconnects and should fail over automatically with our existing Retry configuration.

I'm having some difficulty setting up a local sentinel deployment, but I'll test when I get the chance. Knowing that your code works is a big help!

minrk avatar Sep 05 '24 10:09 minrk