
How do I make sure a daemon is only running in the active peer?

Open jkupferer opened this issue 2 years ago • 7 comments

Keywords

daemon, peering

Problem

I need to run a daemon but only have it run for the active peer. So far I haven't been able to find any way to respond when the active peer switches in order to prevent daemons for the previous active peer from interfering with the new active peer. I had hoped that the on.daemon method would be stopped when the active peer changed but this seems to not be the case.

jkupferer avatar Oct 29 '21 15:10 jkupferer

Hello. The daemons are stopped when the operator's process loses its active role and pauses. The daemons themselves can notice that by checking the stopping flag's reason, e.g. stopped.reason & kopf.DaemonStoppingReason.OPERATOR_PAUSING. They are restarted when the operator's process becomes active again.

Do you observe that they continue running? Can you please add a repro snippet here? And maybe logs. Which version of Kopf do you use?

For example:

import kopf

@kopf.daemon('kopfexamples')
def daemon(stopped, logger, **_):
    while not stopped:
        logger.info('>>> ping')
        stopped.wait(1.0)
    logger.info(f'>>> Exiting reason: {stopped.reason}')

[2021-10-29 18:10:33,002] kopf.objects         [DEBUG   ] [default/kopf-example-1] Daemon 'daemon' is invoked.
[2021-10-29 18:10:33,003] kopf.objects         [INFO    ] [default/kopf-example-1] >>> ping
[2021-10-29 18:10:34,011] kopf.objects         [INFO    ] [default/kopf-example-1] >>> ping

kopf freeze -n default --peering default --priority 100 --lifetime 60

[2021-10-29 18:11:20,372] kopf._core.engines.p [DEBUG   ] Keep-alive in 'default' in 'default': ok.
[2021-10-29 18:11:21,257] kopf.objects         [INFO    ] [default/kopf-example-1] >>> ping
[2021-10-29 18:11:22,260] kopf.objects         [INFO    ] [default/kopf-example-1] >>> ping
[2021-10-29 18:11:22,478] kopf._core.engines.p [INFO    ] Pausing operations in favour of [<Peer nolar@SV-Pro: priority=100, lifetime=60, lastseen='2021-10-29T16:11:22.308659'>].
[2021-10-29 18:11:22,479] kopf.objects         [INFO    ] [default/kopf-example-1] >>> Exiting reason: DaemonStoppingReason.OPERATOR_PAUSING
[2021-10-29 18:11:22,482] kopf.objects         [INFO    ] [default/kopf-example-1] Daemon 'daemon' succeeded.
[2021-10-29 18:11:22,482] kopf.objects         [DEBUG   ] [default/kopf-example-1] Daemon 'daemon' has exited on request and will not be retried or restarted.
[2021-10-29 18:11:22,482] kopf.objects         [DEBUG   ] [default/kopf-example-1] Daemon 'daemon' has exited gracefully.
[2021-10-29 18:11:22,583] kopf._cogs.clients.w [DEBUG   ] Pausing the watch-stream for kopfexamples.v1.kopf.dev in 'default' (blockers: default@default).

Silence for 60 seconds.

[2021-10-29 18:12:22,483] kopf._core.engines.p [INFO    ] Resuming operations after the pause. Conflicting operators with the same priority are gone.
[2021-10-29 18:12:22,484] kopf._cogs.clients.w [DEBUG   ] Resuming the watch-stream for kopfexamples.v1.kopf.dev in 'default'.
[2021-10-29 18:12:22,620] kopf.objects         [DEBUG   ] [default/kopf-example-1] Daemon 'daemon' is invoked.
[2021-10-29 18:12:22,621] kopf.objects         [INFO    ] [default/kopf-example-1] >>> ping
[2021-10-29 18:12:23,623] kopf.objects         [INFO    ] [default/kopf-example-1] >>> ping

nolar avatar Oct 29 '21 16:10 nolar

Ah! I see my confusion. I didn't understand the nature of the stopped parameter... rethinking my approach...

What I am really trying to accomplish is scheduling an action to happen once after a variable time interval. I was trying to use a threading Timer to schedule activity for later execution. I see I can use on.create and on.resume to launch activity like...

https://kopf.readthedocs.io/en/latest/handlers/

import asyncio
import kopf

TASKS = {}

@kopf.on.delete('kopfexamples')
async def my_delete_handler(name, **_):
    if name in TASKS:
        TASKS[name].cancel()

@kopf.on.resume('kopfexamples')
@kopf.on.create('kopfexamples')
async def my_create_handler(spec, name, **_):
    if name not in TASKS:
        TASKS[name] = asyncio.create_task(some_coroutine(spec))

But... in the case of an active peer becoming passive, how would I cancel the task?

jkupferer avatar Oct 29 '21 16:10 jkupferer

I suppose I could use an async daemon that just sleeps? Like...

@kopf.daemon('kopfexamples', cancellation_timeout=1.0)
async def do_timed_thing(spec, **kwargs):
    try:
        await asyncio.sleep(spec.get('delay', 10))
        do_the_thing()
    except asyncio.CancelledError:
        print("We are done. Bye.")

jkupferer avatar Oct 29 '21 16:10 jkupferer

Though... thinking about this more... my use case could also involve the delay changing before execution. Like someone updating the spec.delay in the example above...

jkupferer avatar Oct 29 '21 16:10 jkupferer

But... in the case of an active peer becoming passive, how would I cancel the task?

You cannot. The peering status is not reported to handlers, so they cannot pass it down to their sub-tasks/sub-handlers/sub-timers.

But the handlers can exchange state via the memo object. The daemon can sleep forever, notice the "pausing", and raise a flag in the memo; and lower it back when restarted. The regular on-update handlers can check that flag in the memo. This sounds complicated though.
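For illustration only, that flag exchange could be sketched like this. The Kopf decorators are elided, and a small dict subclass stands in for kopf.Memo so the logic can run standalone; the flag name operator_paused is invented for this sketch:

```python
class Memo(dict):
    # Minimal stand-in for kopf.Memo: a dict that also allows attribute
    # access, which is how handlers share per-resource state in Kopf.
    def __getattr__(self, name):
        try:
            return self[name]
        except KeyError as e:
            raise AttributeError(name) from e
    def __setattr__(self, name, value):
        self[name] = value

def daemon_started(memo):
    # Body of a @kopf.daemon handler on (re)start: lower the flag,
    # then sleep until stopped (the sleep loop is elided here).
    memo.operator_paused = False

def daemon_stopped(memo, paused_due_to_peering):
    # On exit: if the stop reason was OPERATOR_PAUSING, raise the flag.
    if paused_due_to_peering:
        memo.operator_paused = True

def may_act(memo):
    # Update handlers and their timers consult the flag before acting.
    return not memo.get('operator_paused', False)
```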

I suppose I could use an async daemon that just sleeps? Like...

Correct. The good thing is that async daemons can be gracefully cancelled (e.g. when pausing due to peering), so they have no need to check for the stopped flag.

I do not fully get the nature of the delay here: it is a delay of spec.delay seconds relative to which moment? The moment of the operator startup? Or relative to the moment of the operator resume (even if there are several of them, e.g. every 5 mins)? Or the moment of the resource/spec update? Or all of the above combined?

nolar avatar Oct 29 '21 18:10 nolar

The specific delay I am implementing is an auto-delete for a resource. The operator observes the creation of the resource object, which triggers an action. The resource status then records the result of that activity, and the resource should stick around for a configured lifespan after completion.

If one of these objects is complete and pending deletion, it should be possible to change the settings to extend the deletion delay and have the auto-delete logic follow the new value. Using Python threading.Timer objects, I use kopf.on.create, kopf.on.resume, and kopf.on.update to set or reset the Timer each time the object state is reported. This works fairly well, except for the need to clear the timers on the transition from active to passive peer.
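As a side note, the "lifespan after completion" arithmetic can be kept in a small pure function so the timers only deal with a deadline. The field names status.completionTimestamp and spec.lifespanAfterCompletion are invented for this sketch:

```python
from datetime import datetime, timedelta, timezone

def seconds_until_delete(status, spec, now=None):
    # Returns None while the resource is still incomplete, else the
    # seconds remaining before auto-delete (negative means overdue).
    completed_at = status.get('completionTimestamp')  # hypothetical field
    if completed_at is None:
        return None
    completed = datetime.fromisoformat(completed_at)
    lifespan = timedelta(seconds=spec.get('lifespanAfterCompletion', 600))
    now = now or datetime.now(timezone.utc)
    return (completed + lifespan - now).total_seconds()
```

Recomputing this on every create/resume/update event is what lets a changed spec extend (or shorten) the pending deletion.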

If I use on.daemon... I could write a polling loop? The auto-delete doesn't need to have specific resolution... hrm... but how would I go about getting updates to the spec/status from within the daemon function?

jkupferer avatar Oct 29 '21 18:10 jkupferer

Can standard synchronisation primitives work for this task?

import kopf
import time
import threading  # or asyncio, if you wish

@kopf.on.create('kopfexamples')
@kopf.on.resume('kopfexamples')
def init_primitives(memo, **_):
    memo.cond = threading.Condition()
    memo.delete_me_at = None

@kopf.on.update('kopfexamples')
def something_changed(memo, spec, **_):
    if worth_deleting():
        with memo.cond:
            memo.delete_me_at = time.monotonic() + int(spec.get('delay', 3*60))
            memo.cond.notify_all()
    elif worth_keeping():
        with memo.cond:
            memo.delete_me_at = None
            memo.cond.notify_all()

@kopf.daemon('kopfexamples')
def delete_tracker(memo, stopped, namespace, name, **_):
    while not stopped:
        with memo.cond:
            # Cap the wait at 1.0s so the daemon notices the "stopped"
            # flag reasonably fast; tune to how quickly it should exit.
            timeout = min(1.0, memo.delete_me_at - time.monotonic()) if memo.delete_me_at else 1.0
            if timeout <= 0:
                delete_resource(namespace, name)
                break  # deletion is requested; nothing more to track
            else:
                memo.cond.wait(timeout=timeout)
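If the operator is async, the same idea can be sketched with asyncio.Condition. Here delete_resource and the state object are placeholders (in Kopf, the condition and deadline would live on memo as above), and cancellation of the task, which Kopf performs for async daemons on pausing, is what ends the loop:

```python
import asyncio
import time

async def delete_tracker(state, delete_resource):
    # state.cond: asyncio.Condition; state.delete_me_at: monotonic deadline
    # or None. Handlers set the deadline under the condition and call
    # notify_all(); cancelling this task ends the loop via CancelledError.
    async with state.cond:
        while True:
            if state.delete_me_at is None:
                await state.cond.wait()
            elif state.delete_me_at <= time.monotonic():
                await delete_resource()
                state.delete_me_at = None
            else:
                timeout = state.delete_me_at - time.monotonic()
                try:
                    await asyncio.wait_for(state.cond.wait(), timeout)
                except asyncio.TimeoutError:
                    pass  # deadline reached; re-check on the next loop
```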

nolar avatar Oct 29 '21 22:10 nolar