pingora

support consul/k8s service discovery

leyou240 opened this issue

What is the problem your feature solves, or the need it fulfills?

Our microservices run on k8s and are dynamically scaled up and down during day-to-day operations. Static configuration is currently very inconvenient, so we would like Pingora to support k8s or Consul service discovery.

leyou240, Aug 23 '24

This would be add-on business logic on top of the existing framework.

Internally, we have systems built on top of Pingora that work that way.

Do you have a reference to a standard way of doing such a thing, or are you looking for guidance on how to implement such custom logic?

eaufavor, Aug 30 '24

Thanks, I'm looking for guidance on how to implement such custom logic.

leyou240, Sep 02 '24
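A minimal sketch of the shape such add-on logic usually takes, assuming the pingora-load-balancing building blocks: its ServiceDiscovery trait has an async discover() that returns the current set of backends plus an optional per-backend enabled flag, and a LoadBalancer re-runs discovery every update_frequency. The code below is only a std-only, synchronous model of that shape, not Pingora's API; Discovery, StaticDiscovery, RegistryDiscovery and refresh are hypothetical names.

```rust
use std::collections::{BTreeSet, HashMap};
use std::net::SocketAddr;
use std::sync::{Arc, RwLock};

/// Hypothetical, synchronous stand-in for a discovery trait: return the current
/// backend set plus an optional "enabled" flag per backend. (Pingora's real
/// ServiceDiscovery trait is async and uses its own Backend type.)
pub trait Discovery {
    fn discover(&self) -> (BTreeSet<SocketAddr>, HashMap<SocketAddr, bool>);
}

/// A fixed list of backends, i.e. today's static configuration.
pub struct StaticDiscovery(pub BTreeSet<SocketAddr>);

impl Discovery for StaticDiscovery {
    fn discover(&self) -> (BTreeSet<SocketAddr>, HashMap<SocketAddr, bool>) {
        (self.0.clone(), HashMap::new())
    }
}

/// Backends kept in a shared registry that some other task (a k8s watcher,
/// a Consul blocking query, ...) keeps up to date; discover() takes a snapshot.
#[derive(Clone, Default)]
pub struct RegistryDiscovery {
    registry: Arc<RwLock<BTreeSet<SocketAddr>>>,
}

impl RegistryDiscovery {
    /// Replace the whole backend set (called by the hypothetical watcher task).
    pub fn set(&self, backends: BTreeSet<SocketAddr>) {
        *self.registry.write().unwrap() = backends;
    }
}

impl Discovery for RegistryDiscovery {
    fn discover(&self) -> (BTreeSet<SocketAddr>, HashMap<SocketAddr, bool>) {
        (self.registry.read().unwrap().clone(), HashMap::new())
    }
}

/// One periodic update step: run discovery and keep only enabled backends
/// (backends missing from the map count as enabled).
pub fn refresh(d: &dyn Discovery) -> Vec<SocketAddr> {
    let (backends, enabled) = d.discover();
    backends
        .into_iter()
        .filter(|b| *enabled.get(b).unwrap_or(&true))
        .collect()
}
```

In a real integration, the writer side of the registry is the k8s watch or Consul query loop, while discover() stays cheap because it only copies a snapshot.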

Take a look at this: https://gist.github.com/Object905/6cafd5e8e56dd60670149296411a407f

MMADUs, Sep 21 '24

Since publishing this gist, I've updated the code to be more self-contained, removed the dependency on crossbeam, and made it work with kube-rs > 0.92.0, which changed the store internals; the old version was deadlocking or not discovering endpoints right away. It should now be copy-pastable, and I've updated the gist accordingly.

It runs in my production environment and also achieves zero-downtime upgrades of services. As a bonus, I included the DNS discovery I wrote as well.

Object905, Sep 21 '24
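The gist is the reference implementation here. As a rough illustration of why the watcher's event model matters for zero downtime: the Init, InitApply and InitDone events (the same variants matched in the code later in this thread) bracket every (re)list, so a store can keep serving the old backend set until the new list is complete, which is roughly what kube's reflector store does in recent versions. The std-only model below assumes one event per backend address (a real EndpointSlice event carries a list of endpoints); Store and this Event enum are hypothetical, not kube's types.

```rust
use std::collections::BTreeSet;

/// Simplified mirror of the watcher event variants; the payload is an address.
pub enum Event {
    Apply(String),
    Delete(String),
    Init,
    InitApply(String),
    InitDone,
}

/// Hypothetical backend store fed by watcher events.
#[derive(Default)]
pub struct Store {
    /// What the load balancer should currently see.
    live: BTreeSet<String>,
    /// Set being rebuilt during a (re)list; None when no list is in progress.
    buffer: Option<BTreeSet<String>>,
}

impl Store {
    pub fn apply(&mut self, event: Event) {
        match event {
            Event::Apply(addr) => {
                self.live.insert(addr);
            }
            Event::Delete(addr) => {
                self.live.remove(&addr);
            }
            // A relist starts: collect into a fresh buffer instead of clearing `live`,
            // so traffic keeps flowing to the previous backends in the meantime.
            Event::Init => self.buffer = Some(BTreeSet::new()),
            Event::InitApply(addr) => {
                if let Some(buf) = self.buffer.as_mut() {
                    buf.insert(addr);
                }
            }
            // Relist finished: swap the complete new set in at once.
            Event::InitDone => {
                if let Some(buf) = self.buffer.take() {
                    self.live = buf;
                }
            }
        }
    }

    /// Sorted copy of the backends currently being served.
    pub fn snapshot(&self) -> Vec<String> {
        self.live.iter().cloned().collect()
    }
}
```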

Interesting! One question: doesn't the DNS discovery need the updater background service?

MMADUs, Sep 22 '24

Yes, my use case for DNS discovery doesn't account for short-lived DNS entries (like CoreDNS records in Kubernetes). Setting LoadBalancer.update_frequency should be enough for most use cases when resolving "real" domains: the hickory_resolver client caches DNS answers internally based on the records' actual TTLs, so they are not re-queried every time the backends are updated.

And it would be hard to achieve zero downtime with DNS anyway. That could be mitigated by retrying on upstream errors, but that approach seems a bit flaky.

Object905, Sep 22 '24
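The update_frequency plus resolver-side caching combination can be pictured as a TTL cache: the load balancer may ask for backends on every update tick, but the DNS server is only queried again once the previous answer's TTL has expired. This is a std-only sketch with an injected resolver closure and an explicit clock, both assumptions for illustration; TtlCache is a hypothetical name and not hickory_resolver's API, which does this caching internally.

```rust
use std::time::{Duration, Instant};

/// Hypothetical TTL-respecting cache in front of a resolver: the resolver
/// (returning addresses plus the answer's TTL) is only called again once the
/// previous answer has expired.
pub struct TtlCache<F: FnMut() -> (Vec<String>, Duration)> {
    resolve: F,
    cached: Option<(Vec<String>, Instant)>, // (addresses, expiry time)
}

impl<F: FnMut() -> (Vec<String>, Duration)> TtlCache<F> {
    pub fn new(resolve: F) -> Self {
        TtlCache { resolve, cached: None }
    }

    /// Return the cached addresses, re-resolving if missing or expired at `now`.
    pub fn get(&mut self, now: Instant) -> Vec<String> {
        let expired = match &self.cached {
            Some((_, expiry)) => now >= *expiry,
            None => true,
        };
        if expired {
            let (addrs, ttl) = (self.resolve)();
            self.cached = Some((addrs, now + ttl));
        }
        self.cached.as_ref().unwrap().0.clone()
    }
}
```

With a 30-second TTL and a 10-second update tick, roughly one tick in three triggers a real lookup; the others are served from the cache.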

@Object905 Thanks, this gist helped me a lot!

leyou240, Sep 26 '24

Interesting approach. I'm working on using the -u upgrade feature to change the config (or the binary), rather than trying to keep a lot of dynamic config around in Pingora.

From k8s, Docker, or whatever, I generate a config file that Pingora reads once, at startup. The config file contains the resolved DNS names, for example.

Any drawbacks to this approach?

pszabop, Oct 23 '24
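A minimal sketch of the read-once side of this approach, assuming the generator writes a plain list of resolved ip:port pairs; the file format and the function names parse_upstreams and load_upstreams are hypothetical, since the actual config format isn't shown. The proxy parses the file once at startup into its static backend list, and a changed file takes effect by starting a new process with the upgrade flag rather than by a live reload.

```rust
use std::net::SocketAddr;

/// Parse a hypothetical generated upstream file: one "ip:port" per line,
/// blank lines and '#' comments ignored.
pub fn parse_upstreams(text: &str) -> Result<Vec<SocketAddr>, String> {
    text.lines()
        .map(|l| l.trim())
        .filter(|l| !l.is_empty() && !l.starts_with('#'))
        .map(|l| {
            l.parse::<SocketAddr>()
                .map_err(|e| format!("bad upstream {:?}: {}", l, e))
        })
        .collect()
}

/// Read and parse the file once at startup (the path is up to the generator).
pub fn load_upstreams(path: &str) -> Result<Vec<SocketAddr>, String> {
    let text = std::fs::read_to_string(path).map_err(|e| e.to_string())?;
    parse_upstreams(&text)
}
```

Failing on any malformed line, instead of skipping it, means a bad generator run aborts the new process during upgrade while the old one keeps serving.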

Are you referring to pod or service IPs? Pod IPs are not stable.

simonhammes, Oct 23 '24

If Pingora upgrades are seamless, then unstable pod IPs are not a problem: just upgrade every few minutes.

But an upgrade may not be as seamless as one would hope (e.g., I think the HTTP cache doesn't carry over across the upgrade, so every pod IP change would effectively flush it). That's why I asked.

pszabop, Oct 24 '24
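One way to limit how often that cache-flush concern applies, sketched under the assumption that the config generator can see both the backend set the running process was started with and the freshly discovered one: trigger an upgrade only when the two sets differ, rather than on a fixed schedule. diff and needs_upgrade are hypothetical helpers, and pod IPs are plain strings here.

```rust
use std::collections::BTreeSet;

/// Compare the pod IPs the running process was started with against the
/// freshly discovered ones; returns (added, removed) in sorted order.
pub fn diff(running: &BTreeSet<String>, discovered: &BTreeSet<String>) -> (Vec<String>, Vec<String>) {
    let added = discovered.difference(running).cloned().collect();
    let removed = running.difference(discovered).cloned().collect();
    (added, removed)
}

/// Only regenerate the config and upgrade when the backend set actually changed.
pub fn needs_upgrade(running: &BTreeSet<String>, discovered: &BTreeSet<String>) -> bool {
    let (added, removed) = diff(running, discovered);
    !added.is_empty() || !removed.is_empty()
}
```

This does not make the upgrade itself cheaper; it only avoids paying for it when nothing has changed.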

@Object905 Following this gist, how do I get kube_client in a non-async function?

shenshouer, Nov 27 '24

You can use block_on() (see the example).

As an example, I have the following code at the start of main() to instantiate a client:

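use kube::Client;
use tokio::runtime::Runtime;
use tracing::info; // assuming the tracing crate for logging

// `runtime` must stay alive for as long as `client` is used.
let runtime = Runtime::new().expect("Could not start runtime");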
let client = runtime.block_on(async {
    let client = Client::try_default()
        .await
        .expect("Could not create client");

    let version = client
        .apiserver_version()
        .await
        .expect("Could not get version")
        .git_version;

    info!("K8S: {version}");

    client
});

simonhammes, Nov 27 '24
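Conceptually, block_on turns a future into a plain return value by polling it on the current thread and parking the thread until a waker signals progress. The std-only toy below shows only that mechanism; block_on and YieldOnce here are hypothetical, written for illustration. It has no I/O or timer driver, so it cannot run kube's futures; that is what the real tokio Runtime in the snippet above provides.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, Thread};

/// Waker that unparks the thread blocked on the future.
struct ThreadWaker(Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

/// Toy block_on: poll on the current thread, parking between polls.
pub fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = Box::pin(fut);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        match fut.as_mut().poll(&mut cx) {
            Poll::Ready(value) => return value,
            // Sleep until some waker calls unpark(); spurious wakeups just re-poll.
            Poll::Pending => thread::park(),
        }
    }
}

/// A future that is pending once (after requesting a wake-up), then ready.
pub struct YieldOnce {
    polled: bool,
}

impl Future for YieldOnce {
    type Output = &'static str;
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
        if self.polled {
            Poll::Ready("done")
        } else {
            self.polled = true;
            cx.waker().wake_by_ref();
            Poll::Pending
        }
    }
}
```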

@simonhammes The following code reports an error: watcher error: failed to perform initial object list: ServiceError: buffer's worker closed unexpectedly

fn main() -> anyhow::Result<()> {
    tracing_subscriber::fmt().init();
    info!("Starting...");

    let client = tokio::runtime::Runtime::new()
        .unwrap()
        .block_on(async { Client::try_default().await })
        .expect("Failed to create kube client");

    tokio::runtime::Runtime::new()
        .unwrap()
        .block_on(async move {
            let api = Api::<EndpointSlice>::namespaced(client, "default");

            let (_reader, writer) = reflector::store();
            let filter = Config::default().labels("app=nginx");
            let watcher = runtime::watcher(api, filter);
            let mut watcher_stream = watcher.default_backoff().reflect(writer).boxed();

            loop {
                info!("Waiting for next event...");
                match watcher_stream.try_next().await {
                    Ok(data) => match data {
                        Some(e) => match e {
                            Event::Apply(k) => info!("Applied {}", k.name_any()),
                            Event::Delete(k) => info!("Deleted {}", k.name_any()),
                            Event::Init => info!("Init"),
                            Event::InitDone => info!("InitDone"),
                            Event::InitApply(k) => info!("InitApply {}", k.name_any()),
                        },
                        None => info!("no data"),
                    },
                    Err(e) => {
                        warn!("watcher error: {e}");
                        break;
                    }
                }
            }
        });

    Ok(())
}

shenshouer, Nov 27 '24

It works when everything runs on a single runtime:

fn main() -> anyhow::Result<()> {
    tracing_subscriber::fmt().init();
    info!("Starting...");

    let rt = tokio::runtime::Runtime::new().unwrap();

    let client = rt
        .block_on(async { Client::try_default().await })
        .expect("Failed to create kube client");

    rt.block_on(async move {
        let api = Api::<EndpointSlice>::namespaced(client, "default");

        let (_reader, writer) = reflector::store();
        let filter = Config::default().labels("app=nginx");
        let watcher = runtime::watcher(api, filter);
        let mut watcher_stream = watcher.default_backoff().reflect(writer).boxed();

        loop {
            info!("Waiting for next event...");
            match watcher_stream.try_next().await {
                Ok(data) => match data {
                    Some(e) => match e {
                        Event::Apply(k) => info!("Applied {}", k.name_any()),
                        Event::Delete(k) => info!("Deleted {}", k.name_any()),
                        Event::Init => info!("Init"),
                        Event::InitDone => info!("InitDone"),
                        Event::InitApply(k) => info!("InitApply {}", k.name_any()),
                    },
                    None => info!("no data"),
                },
                Err(e) => {
                    warn!("watcher error: {e}");
                    break;
                }
            }
        }
    });

    Ok(())
}

shenshouer, Nov 27 '24
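The likely cause of the two-runtime failure: kube's Client routes requests through a tower Buffer whose worker task is spawned on the Tokio runtime that is current when the client is constructed. In the first snippet that runtime is a temporary, dropped as soon as the `let client = ...` statement ends, so the worker is gone and the later list request fails with "buffer's worker closed unexpectedly". The std-only toy below reproduces the same lifetime bug with a thread and a channel; ToyRuntime and ToyClient are hypothetical stand-ins, not tokio or kube types.

```rust
use std::sync::mpsc::{self, Sender};
use std::thread::{self, JoinHandle};

enum Msg {
    Work(u32, Sender<u32>),
    Shutdown,
}

/// Toy "runtime": owns a worker thread and stops it when dropped
/// (a dropped tokio Runtime likewise shuts down the tasks spawned on it).
pub struct ToyRuntime {
    tx: Sender<Msg>,
    worker: Option<JoinHandle<()>>,
}

/// Toy "client": only a handle to the worker owned by the runtime.
#[derive(Clone)]
pub struct ToyClient {
    tx: Sender<Msg>,
}

impl ToyRuntime {
    pub fn new() -> Self {
        let (tx, rx) = mpsc::channel();
        let worker = thread::spawn(move || {
            for msg in rx {
                match msg {
                    Msg::Work(x, reply) => {
                        let _ = reply.send(x * 2);
                    }
                    Msg::Shutdown => break, // the receiver is dropped when the thread exits
                }
            }
        });
        ToyRuntime { tx, worker: Some(worker) }
    }

    pub fn client(&self) -> ToyClient {
        ToyClient { tx: self.tx.clone() }
    }
}

impl Drop for ToyRuntime {
    fn drop(&mut self) {
        let _ = self.tx.send(Msg::Shutdown);
        if let Some(w) = self.worker.take() {
            let _ = w.join();
        }
    }
}

impl ToyClient {
    pub fn call(&self, x: u32) -> Result<u32, &'static str> {
        let (reply_tx, reply_rx) = mpsc::channel();
        self.tx
            .send(Msg::Work(x, reply_tx))
            .map_err(|_| "worker closed unexpectedly")?;
        reply_rx.recv().map_err(|_| "worker closed unexpectedly")
    }
}

/// Broken pattern: the runtime is a temporary, dropped at the end of the statement.
pub fn two_runtimes() -> Result<u32, &'static str> {
    let client = ToyRuntime::new().client();
    client.call(21)
}

/// Working pattern: the runtime outlives every use of the client.
pub fn one_runtime() -> Result<u32, &'static str> {
    let rt = ToyRuntime::new();
    let client = rt.client();
    client.call(21)
}
```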

@shenshouer Sorry, I can't help you with that.

I'm using the code from https://gist.github.com/Object905/6cafd5e8e56dd60670149296411a407f#file-register-rs inside main() and it works without issues.

simonhammes, Nov 27 '24

@simonhammes The issue has been resolved. Thanks to @Object905 for providing the sample code at https://gist.github.com/Object905/6cafd5e8e56dd60670149296411a407f. Here is the modified example based on their code:

use std::sync::LazyLock;

use futures::{StreamExt, TryStreamExt};
use k8s_openapi::api::discovery::v1::EndpointSlice;
use kube::{
    runtime::{
        self,
        reflector::{self},
        watcher::{Config, Event},
        WatchStreamExt,
    },
    Api, Client, ResourceExt,
};
use tracing::{info, warn};

pub static SHARED_RUNTIME: LazyLock<tokio::runtime::Runtime> = LazyLock::new(|| {
    tokio::runtime::Builder::new_multi_thread()
        .enable_all()
        .build()
        .expect("Failed to create tokio shared runtime")
});

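pub static KUBE: LazyLock<Option<Client>> = LazyLock::new(|| {
    // Prefer in-cluster config; only fall back to kubeconfig/env inference if that
    // fails (or_else keeps the fallback lazy, whereas .or() would always run infer()).
    let config = kube::Config::incluster()
        .ok()
        .or_else(|| SHARED_RUNTIME.block_on(kube::Config::infer()).ok())?;
    // Building the client spawns its background worker, so enter the shared runtime.
    let _guard = SHARED_RUNTIME.enter();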
    let client = Client::try_from(config).ok()?;
    Some(client)
});

fn main() -> anyhow::Result<()> {
    tracing_subscriber::fmt().init();
    info!("Starting...");

    let rt = tokio::runtime::Runtime::new().unwrap();

    let client = KUBE.clone().expect("Failed to get kube client");

    let api = Api::<EndpointSlice>::namespaced(client, "default");

    let (_reader, writer) = reflector::store();
    let filter = Config::default().labels("app=nginx");
    let watcher = runtime::watcher(api, filter);
    let mut watcher_stream = watcher.default_backoff().reflect(writer).boxed();

    rt.block_on(async move {
        loop {
            info!("Waiting for next event...");
            match watcher_stream.try_next().await {
                Ok(data) => match data {
                    Some(e) => match e {
                        Event::Apply(k) => info!("Applied {}", k.name_any()),
                        Event::Delete(k) => info!("Deleted {}", k.name_any()),
                        Event::Init => info!("Init"),
                        Event::InitDone => info!("InitDone"),
                        Event::InitApply(k) => info!("InitApply {}", k.name_any()),
                    },
                    None => info!("no data"),
                },
                Err(e) => {
                    warn!("watcher error: {e}");
                    break;
                }
            }
        }
    });

    Ok(())
}

shenshouer, Nov 28 '24
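This version works because SHARED_RUNTIME and KUBE are process-wide statics: the runtime hosting the client's background worker is never dropped, and the client is created lazily on first access from ordinary sync code. Below is a std-only sketch of that lazy-static pattern using OnceLock (available since Rust 1.70; LazyLock, used above, needs 1.80). SHARED, build and the two config sources are hypothetical, with the in-cluster source hard-coded to fail so the fallback path is exercised.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::OnceLock;

/// Counts how many times the initializer actually ran (for the demo only).
static INIT_CALLS: AtomicUsize = AtomicUsize::new(0);

/// Hypothetical stand-in for the KUBE static: built once on first access, then
/// shared. The Option models "no usable config" without panicking.
static SHARED: OnceLock<Option<String>> = OnceLock::new();

/// Pretend we are not running inside a cluster.
fn in_cluster_config() -> Option<String> {
    None
}

/// Pretend a local kubeconfig is available.
fn local_config() -> Option<String> {
    Some("from-kubeconfig".to_string())
}

fn build() -> Option<String> {
    INIT_CALLS.fetch_add(1, Ordering::SeqCst);
    // Fallback chain: the second source is only consulted if the first fails.
    in_cluster_config().or_else(local_config)
}

/// Callable from sync code on any thread; concurrent first callers block until
/// the single initialization finishes.
pub fn shared() -> Option<&'static String> {
    SHARED.get_or_init(build).as_ref()
}
```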

I'm closing the issue here as not planned since this is something done on top of pingora rather than inside the framework itself.

Feel free to keep using this issue to discuss the topic though!

Noah-Kennedy, Dec 14 '24