
Watch resources in a specific namespace?

Status: Open. jkupferer opened this issue 4 years ago; 4 comments.

I want to have an operator watch only resources in a specific namespace, something like:

@kopf.on.create('myop.example.com', 'v1', 'widgets', namespace='foo')
def watch_providers(body, **kwargs):
    ...

Is this possible? If not, can this feature be added?

jkupferer, Jul 30 '19 22:07

@jkupferer There is no such feature yet, but there are some equivalents:

You can run the whole operator with the --namespace CLI option, and it will limit itself to that namespace (see the brief doc).


On the per-handler level, this can probably be done soon with the new filtering solution (from #123, available in kopf==0.20), extended with more filtering criteria.
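A per-handler namespace filter would most likely be a predicate evaluated as each event arrives. The sketch below is plain Python, not Kopf's API: `namespace_is()` and `dispatch()` are hypothetical names used only for illustration. Note that filtering on arrival presupposes a watch-stream that already delivers events from every namespace.

```python
from typing import Any, Callable, Iterable, List, Mapping, Optional, Sequence, Tuple

# Hypothetical helper (not part of Kopf): builds a predicate that is called with
# keyword arguments, in the style of a handler-level callback filter.
def namespace_is(*allowed: str) -> Callable[..., bool]:
    allowed_set = frozenset(allowed)

    def predicate(namespace: Optional[str] = None, **_: Any) -> bool:
        return namespace in allowed_set

    return predicate

Handler = Tuple[str, Callable[..., bool]]

def dispatch(events: Iterable[Mapping[str, Any]],
             handlers: Sequence[Handler]) -> List[Tuple[str, str]]:
    """Filter on arrival: every event is seen, but only matching handlers fire."""
    fired = []
    for body in events:
        meta = body["metadata"]
        for handler_name, predicate in handlers:
            if predicate(namespace=meta.get("namespace"), body=body):
                fired.append((handler_name, meta["name"]))
    return fired

events = [
    {"metadata": {"namespace": "foo", "name": "w1"}},
    {"metadata": {"namespace": "bar", "name": "w2"}},
]
print(dispatch(events, [("watch_providers", namespace_is("foo"))]))
```

The event from "bar" is still received and then discarded, which is why this approach needs permission to watch the resource cluster-wide.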

Related: #98 (planned), #32 (done).

nolar, Jul 31 '19 10:07

The --namespace option looks like it will address many of the use cases I'm looking at. I do have one use case, though, where the watched namespace(s) are determined at runtime from namespace information fetched from other custom resources. In the current design, the operator may not have a cluster role binding that allows it to watch all namespaces at the cluster level, so I don't think the filtering solution will cover it if it attempts to watch the resources cluster-wide.
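For this use case, the set of namespaces to watch has to be recomputed whenever those other custom resources change, and then diffed against what is currently watched. A minimal sketch, assuming a hypothetical `spec.namespaces` list field on the custom resources; `desired_namespaces()` and `reconcile()` are illustrative names, not Kopf functions. Watching each namespace separately needs only namespace-scoped RBAC (a Role/RoleBinding per namespace) rather than a cluster role binding.

```python
from typing import Any, Iterable, Mapping, Set, Tuple

def desired_namespaces(custom_resources: Iterable[Mapping[str, Any]]) -> Set[str]:
    # Hypothetical schema: each custom resource lists target namespaces in spec.namespaces.
    result: Set[str] = set()
    for cr in custom_resources:
        result.update(cr.get("spec", {}).get("namespaces", []))
    return result

def reconcile(watched: Set[str], desired: Set[str]) -> Tuple[Set[str], Set[str]]:
    """Return (to_start, to_stop): namespaces to begin watching and to stop watching."""
    return desired - watched, watched - desired

crs = [
    {"spec": {"namespaces": ["a", "b"]}},
    {"spec": {"namespaces": ["b", "c"]}},
    {"spec": {}},  # a resource without the field contributes nothing
]
to_start, to_stop = reconcile({"b", "d"}, desired_namespaces(crs))
print(sorted(to_start), sorted(to_stop))
```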

Thanks for the work on this great project and the quick response!

jkupferer, Jul 31 '19 11:07

Indeed. This is an interesting use case. Some thoughts on this:

1. Kopf makes streaming watch GET requests for the resources, using the global namespace setting. It is unclear how it should behave with two handlers for two different namespaces:

import kopf

@kopf.on.event('zalando.org', 'v1', 'kopfexamples', namespace='ns1')
def fn1(**_): pass

@kopf.on.event('zalando.org', 'v1', 'kopfexamples', namespace='ns2')
def fn2(**_): pass

Should it run 2 watch-streams? Or one cluster-scoped watch-stream with filtering on arrival (which may hit RBAC permission limits)? The K8s API URL is either cluster-wide or scoped to exactly one namespace.
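Question 1 comes down to which URLs the watch requests hit. In the Kubernetes REST API, core resources live under /api/v1 and other groups under /apis/<group>/<version>; a collection is addressed either cluster-wide or under /namespaces/<ns>/, and ?watch=true turns the GET into a stream. The hypothetical `plan_watch_streams()` below (not Kopf internals) sketches the "one stream per namespace" option, falling back to a single cluster-scoped stream when any handler has no namespace.

```python
from typing import Iterable, List, Optional

def resource_path(group: str, version: str, plural: str,
                  namespace: Optional[str] = None) -> str:
    # Core resources ('' group) live under /api/<version>; others under /apis/<group>/<version>.
    prefix = f"/api/{version}" if group == "" else f"/apis/{group}/{version}"
    if namespace is None:
        return f"{prefix}/{plural}"
    return f"{prefix}/namespaces/{namespace}/{plural}"

def plan_watch_streams(group: str, version: str, plural: str,
                       handler_namespaces: Iterable[Optional[str]]) -> List[str]:
    namespaces = set(handler_namespaces)
    if None in namespaces:
        # A cluster-wide handler exists: one cluster-scoped stream covers everything,
        # and namespace filtering happens on arrival (needs cluster-wide watch RBAC).
        return [resource_path(group, version, plural) + "?watch=true"]
    # Otherwise: one namespace-scoped stream per distinct namespace.
    return [resource_path(group, version, plural, ns) + "?watch=true"
            for ns in sorted(namespaces)]

for url in plan_watch_streams("zalando.org", "v1", "kopfexamples", ["ns1", "ns2"]):
    print(url)
```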


2. Another problem is dynamic watch-stream creation. Currently, there is one central place at the very beginning of the process lifecycle (create_tasks()/spawn_tasks()) that takes all registered resources from the registry and starts watching them indefinitely.

Let's assume we decide to use 2 separate watch-streams for question 1. The registered handlers/resources can be manipulated via the global registry (kopf.get_global_registry()).

However, once Kopf has started and created all its asyncio tasks, it cannot start a new watch-stream on demand, nor stop one afterwards without killing the whole operator. Putting this dynamic resource/namespace orchestration into Kopf may not be easy.
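At the asyncio level, starting and stopping watch-streams on demand amounts to keeping one task per namespace and cancelling each one individually. A minimal sketch, assuming a hypothetical `WatchOrchestrator` class and a stand-in `fake_watch` coroutine in place of a real Kubernetes watch; it does not describe how Kopf's create_tasks()/spawn_tasks() actually work.

```python
import asyncio
from typing import Any, Callable, Coroutine, Dict, List

class WatchOrchestrator:
    """Hypothetical: one asyncio task per namespace, started and stopped on demand."""

    def __init__(self, watcher: Callable[[str], Coroutine[Any, Any, None]]) -> None:
        self._watcher = watcher
        self._tasks: Dict[str, asyncio.Task] = {}

    def start(self, namespace: str) -> None:
        # Must be called from inside the running event loop.
        if namespace not in self._tasks:
            self._tasks[namespace] = asyncio.create_task(self._watcher(namespace))

    async def stop(self, namespace: str) -> None:
        task = self._tasks.pop(namespace, None)
        if task is None:
            return
        task.cancel()
        try:
            await task  # wait until the watcher has actually finished
        except asyncio.CancelledError:
            pass

    @property
    def namespaces(self) -> List[str]:
        return sorted(self._tasks)

async def demo():
    seen: List[str] = []

    async def fake_watch(namespace: str) -> None:
        # Stand-in for a watch-stream: records a heartbeat until cancelled.
        while True:
            seen.append(namespace)
            await asyncio.sleep(0.01)

    orch = WatchOrchestrator(fake_watch)
    orch.start("ns1")
    orch.start("ns2")
    await asyncio.sleep(0.05)
    await orch.stop("ns1")            # stop one stream; the other keeps running
    active = orch.namespaces
    ns1_count = seen.count("ns1")
    await asyncio.sleep(0.05)
    stopped_cleanly = seen.count("ns1") == ns1_count
    await orch.stop("ns2")
    return active, stopped_cleanly, orch.namespaces

print(asyncio.run(demo()))
```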

One way to work around this is to start a thread per namespace (when the namespace is created), each running its own operator, and to stop that operator/thread when the namespace is deleted. The threads will share the logging machinery and global variables (if any), but otherwise they will be more or less isolated from each other, at least in theory.

Assuming that #156 is merged:

import asyncio
import threading
from typing import MutableMapping, Tuple

import kopf

# namespace name -> (its event loop, its operator task, the thread running that loop)
operators: MutableMapping[str, Tuple[asyncio.AbstractEventLoop, asyncio.Task, threading.Thread]] = {}
ns_registry = kopf.GlobalRegistry()

@kopf.on.resume('', 'v1', 'namespaces')
@kopf.on.create('', 'v1', 'namespaces')
def ns_created(name, **_):
    if name not in operators:
        # NB: every operator is namespace-limited, similar to --namespace=... CLI.
        # The namespace= argument assumes #156 is merged.
        loop = asyncio.new_event_loop()
        task = loop.create_task(kopf.operator(namespace=name, registry=ns_registry))
        thread = threading.Thread(target=operator_fn, args=(loop, task))
        thread.start()
        operators[name] = (loop, task, thread)

@kopf.on.delete('', 'v1', 'namespaces')
def ns_deleted(name, **_):
    if name in operators:
        loop, task, thread = operators[name]
        # Tasks are not thread-safe: ask the operator's own loop to cancel it.
        loop.call_soon_threadsafe(task.cancel)
        thread.join()  # TODO: add timeout, raise if not stopped
        del operators[name]

def operator_fn(loop, task):
    # Each thread runs its own loop: asyncio.get_event_loop() fails in non-main threads.
    asyncio.set_event_loop(loop)
    try:
        loop.run_until_complete(task)
    except asyncio.CancelledError:
        pass
    finally:
        loop.close()

@kopf.on.event('zalando.org', 'v1', 'kopfexamples', registry=ns_registry)
def something_created(**_):
    pass
# ... and other ns-specific handlers

It will be fun if this construct actually runs :-)
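The thread-per-namespace idea relies on each thread owning its own event loop, with cancellation requested through loop.call_soon_threadsafe(), because asyncio tasks are not thread-safe. That mechanism can be exercised without a cluster; in this sketch `fake_operator()` is a hypothetical stand-in for kopf.operator(), and `start_namespace()`/`stop_namespace()` are illustrative names.

```python
import asyncio
import threading
from typing import Dict, Tuple

# Hypothetical stand-in for kopf.operator(): signals that it started, then runs until cancelled.
async def fake_operator(namespace: str, started: threading.Event) -> None:
    started.set()
    while True:
        await asyncio.sleep(0.01)

def run_loop(loop: asyncio.AbstractEventLoop, task: asyncio.Task) -> None:
    # The worker thread owns this loop; run_until_complete() drives it here.
    try:
        loop.run_until_complete(task)
    except asyncio.CancelledError:
        pass  # normal shutdown path after cancellation
    finally:
        loop.close()

workers: Dict[str, Tuple[asyncio.AbstractEventLoop, asyncio.Task, threading.Thread]] = {}

def start_namespace(namespace: str) -> None:
    if namespace in workers:
        return
    started = threading.Event()
    loop = asyncio.new_event_loop()
    # loop.create_task() does not require the loop to be running yet.
    task = loop.create_task(fake_operator(namespace, started))
    thread = threading.Thread(target=run_loop, args=(loop, task), daemon=True)
    thread.start()
    started.wait(timeout=5)
    workers[namespace] = (loop, task, thread)

def stop_namespace(namespace: str, timeout: float = 5.0) -> bool:
    loop, task, thread = workers.pop(namespace)
    # Schedule the cancellation on the task's own loop, from this (other) thread.
    loop.call_soon_threadsafe(task.cancel)
    thread.join(timeout)
    return not thread.is_alive()

start_namespace("ns1")
print(stop_namespace("ns1"))
```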

nolar, Jul 31 '19 15:07

I think it should have two separate watch streams in this example. Thank you for exploring this.

jkupferer, Aug 14 '19 13:08