
Wait for a JobRemoved signal from systemd after creating a Scope, before the intermediate process ends

CheatCodeSam opened this issue 1 month ago · 16 comments


Feature Request

I mentioned this as a more complete solution to the fix in #3262; the current implementation just polls in a loop, checking whether the cgroup has been created.
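
For context, the stopgap from #3262 is essentially a poll loop shaped like this (illustrative names, not the actual youki code):

use std::path::Path;
use std::thread;
use std::time::{Duration, Instant};

// Illustrative poll loop: spin until the cgroup directory appears or a
// timeout elapses. This busy-wait is what the JobRemoved signal would replace.
fn wait_for_cgroup(cgroup_path: &Path, timeout: Duration) -> Result<(), String> {
    let start = Instant::now();
    while !cgroup_path.exists() {
        if start.elapsed() > timeout {
            return Err(format!("cgroup {} never appeared", cgroup_path.display()));
        }
        thread::sleep(Duration::from_millis(10));
    }
    Ok(())
}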


Proposed Solution

I'm thinking we start waiting for the JobRemoved signal before we even send the request to create the cgroup, then use a mutex to prevent the intermediate process from continuing until the signal has been received and the mutex unlocks. Seems simple enough.



CheatCodeSam avatar Nov 05 '25 20:11 CheatCodeSam

May I give this a try if no one else wants it? I understand if I'm too new, though.

cody-herbst avatar Nov 05 '25 21:11 cody-herbst

May I give this a try if no one else wants it? I understand if I'm too new, though.

@cody-herbst, I would go for it, this one shouldn't be too hard. Let me know if you're having some trouble with it.

CheatCodeSam avatar Nov 06 '25 00:11 CheatCodeSam

Will do. I'll let you know if there's trouble.

cody-herbst avatar Nov 06 '25 00:11 cody-herbst

@CheatCodeSam

So I was taking a peek at the code and wanted to confirm/discuss a few things before I started coding:

  1. From what I've been able to tell, the dbus_native module doesn't support asynchronous receiving of signals from systemd. Is that correct? It looks like we mostly communicate with systemd synchronously: we send a method call and listen for (any) method_return message.
  2. Considering the first point, what I'm planning to do is something like the following: spin up a thread that starts a new dbus session. This thread will pull all messages and ideally filter for a certain signal type.
  3. One thought I had: instead of using a mutex, maybe we can consider returning a channel to the client? There's a little more overhead, but it should grant the calling client more flexibility in what it wants to listen for. With this, all we have to do is modify your wait_for_process_in_cgroup to filter for a JobRemoved message with the correct unit name and status. The timeout logic will remain roughly the same, and I'll make it so the thread shuts down when the channel is dropped (see the sketch after this list).
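
Rough sketch of the channel idea from point 3. The types here are hypothetical stand-ins; the real version would read from a dedicated dbus_native connection:

use std::sync::mpsc::{channel, Receiver};
use std::thread;

/// What a JobRemoved signal carries, per this hypothetical listener.
struct JobRemoved {
    unit: String,
    result: String, // "done", "failed", "canceled", ...
}

/// Spawn a listener thread that forwards JobRemoved signals to the caller.
/// `read_next_signal` stands in for pulling filtered messages off a dedicated
/// dbus connection; returning None means the connection closed.
fn job_removed_events(
    read_next_signal: impl Fn() -> Option<JobRemoved> + Send + 'static,
) -> Receiver<JobRemoved> {
    let (tx, rx) = channel();
    thread::spawn(move || {
        while let Some(event) = read_next_signal() {
            // A send error means the receiver was dropped: shut the thread down.
            if tx.send(event).is_err() {
                break;
            }
        }
    });
    rx
}

The caller would then recv_timeout() on the receiver and discard events whose unit name doesn't match.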

I'll start coding with the above in mind, and if there are any concerns we can discuss/pivot.

This is a side issue, but something else I noticed: I could be wrong, but in DbusConnection::send_message() it looks like we aren't filtering messages by the serial that we pass with the method_call. From reading the docs, there should only be one response message, with a type of either method_return or error. If we filtered on the method serial, we could update the code to return a single message instead of a list. Is this correct, or am I missing something? Possible enhancement we could make.
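
If that's right, the enhancement would look roughly like this (Msg and read_message are stand-ins for whatever dbus_native actually exposes):

/// Stand-in for dbus_native's parsed message type.
struct Msg {
    reply_serial: Option<u32>,
    // header fields and body elided
}

/// Drain incoming messages until the reply whose REPLY_SERIAL matches the
/// serial of the request we sent; skip signals and unrelated replies.
fn wait_for_reply(
    mut read_message: impl FnMut() -> std::io::Result<Msg>,
    sent_serial: u32,
) -> std::io::Result<Msg> {
    loop {
        let msg = read_message()?;
        if msg.reply_serial == Some(sent_serial) {
            return Ok(msg);
        }
    }
}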

cody-herbst avatar Nov 06 '25 16:11 cody-herbst

Yeah, that sounds good. I was going to use a thread just because I'm more familiar with threads than channels, but that works too.

CheatCodeSam avatar Nov 07 '25 02:11 CheatCodeSam

I'd like to move forward with waiting for the JobRemoved signal before the intermediate process continues. Rather than spawning a new thread, the idea is to extend the existing D-Bus receive loop so that it captures JobRemoved signals, stores them in a Mutex<HashMap> guarded by a Condvar, and then have Manager::apply wait on that condition until the corresponding unit reports completion. This avoids extra threads or busy-wait polling. Does this approach sound good to you?


match msg.message_type() {
    MessageType::MethodReturn => { /* As before */ }
    MessageType::Signal if is_job_removed(&msg) => {
        let unit = extract_unit(&msg);
        let result = extract_result(&msg);
        let mut guard = self.job_events.lock().unwrap();
        guard.insert(unit, result);
        self.job_cv.notify_all(); // Condvar shared with Manager.
    }
    _ => continue,
}
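
The waiting side in Manager::apply would then look roughly like this (field names follow the snippet above; the timeout value is illustrative):

use std::collections::HashMap;
use std::sync::{Condvar, Mutex};
use std::time::Duration;

// Sketch of the waiting side: block until the receive loop has recorded a
// JobRemoved result for our unit, or give up after a timeout.
fn wait_for_job(
    job_events: &Mutex<HashMap<String, String>>,
    job_cv: &Condvar,
    unit: &str,
    timeout: Duration,
) -> Result<String, String> {
    let guard = job_events.lock().unwrap();
    let (mut guard, res) = job_cv
        .wait_timeout_while(guard, timeout, |events| !events.contains_key(unit))
        .unwrap();
    if res.timed_out() {
        return Err(format!("timed out waiting for JobRemoved for {unit}"));
    }
    Ok(guard.remove(unit).expect("present: checked by the wait predicate"))
}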

utam0k avatar Nov 09 '25 12:11 utam0k

@utam0k I do have one thought/concern with doing it this way. Please correct me if I'm wrong here.

So from reading the documentation, the only way to receive signals is to call the Subscribe function:

Systemd docs "Subscribe() enables most bus signals to be sent out. Clients which are interested in signals need to call this method. Signals are only sent out if at least one client invoked this method. Note that this behavior is applied to all the signals that the object in /org/freedesktop/systemd1 may emit, including the D-Bus standard org.freedesktop.DBus.Properties.PropertiesChanged signal and others that do not belong to the org.freedesktop.systemd1 interface."

So in order to do what you're suggesting, we'd have to call Subscribe(), then StartTransientUnit() (what we currently call), and only start reading after StartTransientUnit is done. I'm worried about two possibilities in this scenario, both related to back pressure on the dbus:

  1. If StartTransientUnit takes too long
  2. If there are a lot of signals being fired at the time of invocation

In theory, the second we call Subscribe(), our socket will start receiving signal messages for all actions being performed by systemd. If we don't read them fast enough, we run the risk of losing messages to back pressure. In a local environment I don't believe this will be an issue. However, in a server environment with a lot of systemd activity, we could run into a scenario where messages are dropped. Even if we do start reading in time to prevent back pressure, it could add a lot of additional messages we have to read on the same thread as StartTransientUnit().

In a multithreaded approach, I was going to spin up a new DbusConnection to listen for the subscribed signals in parallel and filter out the ones we don't care about. There's also the consideration that the JobId is only given to us as an output of StartTransientUnit() (though we might be able to sidestep this by matching on the unit name).

This is all under the assumption that I understand how our Manager/DbusConnection code works. I could very easily be missing something and be completely wrong lol.

cody-herbst avatar Nov 09 '25 14:11 cody-herbst

Also, out of curiosity, have we considered using existing dbus library crates as opposed to hand-rolling it ourselves? I see there's this: dbus-rs

cody-herbst avatar Nov 09 '25 17:11 cody-herbst

Thanks for flagging the Subscribe concern. We can mitigate the back-pressure risk by issuing AddMatch right after Subscribe, e.g. type='signal',interface='org.freedesktop.systemd1.Manager',member='JobRemoved', so only that signal reaches our connection.
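
AddMatch is a method on the bus driver itself (org.freedesktop.DBus at /org/freedesktop/DBus), not on systemd's manager object, so for our native client it's just one more ordinary method call. A sketch against a hypothetical call surface:

/// Stand-in for whatever method-call surface dbus_native exposes.
trait BusCall {
    fn method_call(
        &mut self,
        destination: &str,
        path: &str,
        interface: &str,
        member: &str,
        body: &[&str],
    ) -> Result<(), String>;
}

/// Narrow what the bus pushes at this connection to systemd's JobRemoved
/// signals. AddMatch is addressed to the bus driver, not to systemd.
fn add_job_removed_match(conn: &mut impl BusCall) -> Result<(), String> {
    let rule = "type='signal',interface='org.freedesktop.systemd1.Manager',\
                member='JobRemoved',path='/org/freedesktop/systemd1'";
    conn.method_call(
        "org.freedesktop.DBus",
        "/org/freedesktop/DBus",
        "org.freedesktop.DBus",
        "AddMatch",
        &[rule],
    )
}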

Also, out of curiosity, have we considered using existing dbus library crates as opposed to hand-rolling it ourselves? I see there's this: dbus-rs

Our native D-Bus client is the best tool for our use case. We avoid relying on libdbus.

utam0k avatar Nov 10 '25 12:11 utam0k

So I was looking at the code yesterday, and I feel like I'm missing something. The core function we are trying to block for is the following:

cmanager.add_task(pid).map_err(|err| {
    tracing::error!(?pid, ?err, ?init, "failed to add task to cgroup");
    IntermediateProcessError::Cgroup(err.to_string())
})?;

if let Some(resources) = resources {
    if init {
        let controller_opt = libcgroups::common::ControllerOpt {
            resources,
            freezer_state: None,
            oom_score_adj: None,
            disable_oom_killer: false,
        };

        cmanager.apply(&controller_opt).map_err(|err| {
            tracing::error!(?pid, ?err, ?init, "failed to apply cgroup");
            IntermediateProcessError::Cgroup(err.to_string())
        })?;
    }
}

We want to make sure we get the JobRemoved signal before cmanager.apply() executes, which makes sense. Where I'm a little confused is the following:

the idea is to extend the existing D-Bus receive loop so that it captures JobRemoved signals, stores them in a Mutex<HashMap> guarded by a Condvar, and then have Manager::apply wait on that condition until the corresponding unit reports completion

From what I can tell, everything in the container_intermediate_process (once it's forked) is single-threaded. This includes creating the cgroup manager and the subsequent calls to add_task() and apply(). It looks like we create a single dbus connection when we create systemd::manager::Manager::new(), and this connection is then used for every dbus method call in add_task() and apply(). The method-call function in this module appears to be synchronous in nature (looking at systemd::dbus_native::proxy::Proxy::method_call()). We write the method call message to the socket, then read from the same socket looking specifically for a dbus response (either Error or MethodReturn), and stop reading once we receive the MethodReturn response. Please see systemd::dbus_native::DbusConnection::send_message().

So from what I'm reading, there doesn't appear to be a separate thread continuously reading from the socket. We write to the socket, read looking for the method return, and move on once we receive the response. So I'm a little confused how adding a mutex guarded by a condvar would help here, as everything is single-threaded. In theory, we could update the method-call function to look for a specific signal as opposed to a MethodReturn dbus message, but I'm not sure that's something we want to do, since Error is also a possible response.

If anything I said is dead wrong, please let me know... I'm still new to Rust and all this lol.

We could make new versions of systemd::dbus_native::proxy::Proxy::method_call() and systemd::dbus_native::DbusConnection::send_message() that subscribe to signals and look for both the MethodReturn/Error and the subsequent JobRemoved signal. We'd call these functions in start_transient_unit(). If we go this route, no additional locks/threads will be necessary, as we would wait for the signal the same way we currently wait for the response in systemd::dbus_native::proxy::Proxy::method_call().
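
Rough shape of that idea: stay on the one connection and loop until we've seen both the method reply and the matching JobRemoved, since they can arrive in either order (Msg and read_message are hypothetical stand-ins):

/// Minimal stand-in for a parsed dbus message.
enum Msg {
    MethodReturn,
    Error(String),
    JobRemoved { unit: String, result: String },
    Other,
}

/// Resolve the method call, then keep reading the same connection until the
/// JobRemoved for our unit shows up. No extra threads or locks.
fn call_then_wait_for_job_removed(
    mut read_message: impl FnMut() -> Result<Msg, String>,
    unit: &str,
) -> Result<String, String> {
    let mut got_reply = false;
    let mut job_result: Option<String> = None;
    // The reply and the signal can arrive in either order once subscribed,
    // so loop until we have seen both.
    while !(got_reply && job_result.is_some()) {
        match read_message()? {
            Msg::MethodReturn => got_reply = true,
            Msg::Error(e) => return Err(e),
            Msg::JobRemoved { unit: u, result } if u == unit => {
                job_result = Some(result);
            }
            _ => {}
        }
    }
    Ok(job_result.expect("set by loop condition"))
}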

Thanks for flagging the Subscribe concern. We can mitigate the back-pressure risk by issuing AddMatch right after Subscribe, e.g. type='signal',interface='org.freedesktop.systemd1.Manager',member='JobRemoved', so only that signal reaches our connection.

Also keep in mind we will have to manage these match rules carefully, because we are using a single dbus client connection. From looking online, it seems we can only add one rule at a time, so we would have to make three AddMatch calls to keep the original functionality working: "Signal", "Error", and "MethodReturn". We can then remove the Signal rule after we are done with start_transient_unit(). This is a little dangerous imo on a single dbus connection, but it's up to you.

cody-herbst avatar Nov 10 '25 16:11 cody-herbst

Hey, apologies I didn't see this issue earlier. I implemented the existing dbus system, so let me give the reasoning for the specific choices we made. See https://github.com/youki-dev/youki/issues/2208 for very detailed reasoning behind the choices, but a short version is:

  1. We do not use any existing libraries because they mostly do not support our use case. zbus is multi-threaded, which simply does not work for us (see the next point), and dbus-rs is a binding over libdbus, which adds another system-level dependency for youki AND still does not allow us to correctly specify the uid of the calling process. Hence, I decided to write our own code for dbus interfacing.
  2. We cannot do anything multi-threaded, especially in the main process where we call dbus. This is because one of the syscalls (unshare) will error if the calling process has multiple threads. There might be ways to work around this, but I'm of the strong opinion that we should not add a bulky runtime just to support async for this case, and then set that runtime to use just one thread. I'm open to discussion, though.
  3. The reason we do not support signals right now is that I didn't feel they were necessary at the time I was implementing this. There are several shortcuts I took just to finish the implementation, and I maybe ignored the "proper" dbus spec where I felt we did not need the feature. For example, if I remember correctly, we simply ignore the message id we send and get in response, because our communication is sync. We could actually receive responses out of order if we were doing parallel dbus calls; but we do not do parallel calls, hence we always have exactly one response we are waiting for.

Please feel free to tag/ping me if anyone needs more explanation/reasoning/help with dbus stuff; you can also ping me on Youki's discord.

Again, apologies; I haven't been keeping up with issues and PRs here, so I didn't mention all this sooner.

YJDoc2 avatar Nov 11 '25 13:11 YJDoc2

@YJDoc2 Thanks for the reply. This definitely clears some things up. One follow-up question:

So if I'm reading the unshare docs correctly, we can technically still use threads; we just have to make sure they are joined before any calls to unshare are made. It's risky but still possible, correct? I agree that we definitely don't need a runtime, but managing one extra thread for signal processing should be workable, at least imo.

If the answer to that question is yes, from where I sit we have two options:

  1. We spin up a thread that lives no longer than the systemd::manager::Manager::add_task() function. In my head this thread would: establish a new dbus connection, call systemd's Subscribe(), AddMatch only for JobRemoved, read until we get the signal message we are looking for, signal the parent thread (either with channels or a condvar), and be joined before cmanager.add_task() returns. This should limit the window in which multiple threads exist to just this function and remove the risk of conflicts with unshare (see the sketch after this list).
  2. We implement new methods that allow receiving signals in a sync fashion. This has the side effect that we will have to manage the dbus match filters carefully. One possible implementation:

We could make new versions of systemd::dbus_native::proxy::Proxy::method_call() and systemd::dbus_native::DbusConnection::send_message() that subscribe to signals and look for both the MethodReturn/Error and the subsequent JobRemoved signal. We'd call these functions in start_transient_unit(). If we go this route, no additional locks/threads will be necessary, as we would wait for the signal the same way we currently wait for the response in systemd::dbus_native::proxy::Proxy::method_call().
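
A minimal sketch of option 1's shape, assuming the listener logic (fresh connection, Subscribe(), AddMatch, read until the unit matches) is packed into the closure:

use std::sync::mpsc::{channel, RecvTimeoutError};
use std::thread;
use std::time::Duration;

/// Short-lived listener thread, joined before this function returns, so the
/// process is single-threaded again long before any unshare() call.
/// `listen_for_job_removed` stands in for: open a fresh dbus connection,
/// call Subscribe(), AddMatch for JobRemoved, read until the unit matches.
fn wait_scoped(
    listen_for_job_removed: impl FnOnce() -> Result<String, String> + Send + 'static,
    timeout: Duration,
) -> Result<String, String> {
    let (tx, rx) = channel();
    let handle = thread::spawn(move || {
        let _ = tx.send(listen_for_job_removed());
    });

    let outcome = match rx.recv_timeout(timeout) {
        Ok(result) => result,
        Err(RecvTimeoutError::Timeout) => Err("timed out waiting for JobRemoved".into()),
        Err(RecvTimeoutError::Disconnected) => Err("listener thread died".into()),
    };

    // On the timeout path a fuller version must also unblock the listener
    // (close its connection or set a socket read timeout), otherwise this
    // join could hang on a blocked read.
    let _ = handle.join();
    outcome
}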

@utam0k / @YJDoc2 Thoughts?

cody-herbst avatar Nov 11 '25 14:11 cody-herbst

Just to confirm, I wrote a little test to be sure:

use std::thread;
use std::time::Duration;

use nix::sched::{unshare, CloneFlags};

fn main() -> Result<(), nix::Error> {
    let test = thread::spawn(|| {
        for _ in 0..10 {
            thread::sleep(Duration::from_secs(1));
            println!("Hello, world!");
        }
    });

    // Join first: the process must be single-threaded again before unshare.
    test.join().unwrap();

    unshare(CloneFlags::CLONE_NEWUSER)?;

    Ok(())
}

The above code completes without error. However, if I move the join after the unshare call, the program fails with Error: EINVAL.

cody-herbst avatar Nov 12 '25 16:11 cody-herbst

Ok, with this info, I'm fine with adding a thread, as long as we make sure to join it before calling unshare. As for the approach, I'll need to think more on the new-method approach you suggested, but overall I'd be on the side of something that is simple and straightforward to understand and debug. Thanks for doing this exploration!

YJDoc2 avatar Nov 14 '25 13:11 YJDoc2

Sweet. For the time being, I'll continue with the threaded approach. The method approach concerns me a bit more, considering we would have to actively manipulate the single dbus connection we make.

I'll do my best to make the threaded approach straightforward and isolated to systemd::manager::Manager::add_task(), considering the thread cannot be allowed to live for long.

cody-herbst avatar Nov 14 '25 13:11 cody-herbst

Please be careful about performance and memory footprint.

utam0k avatar Nov 25 '25 11:11 utam0k