Feature: Shutdown

Open AlexandreCassagne opened this issue 1 year ago • 22 comments

Expected Behavior

There should be a mechanism for shutting Rumqttd down.

Current Behavior

No such method (that I could find).

Failure Information (for bugs)

N/A

Context

Could you point me in the right direction so I can implement this? Should this be initiated by the link_tx, or somewhere else?

AlexandreCassagne avatar Dec 15 '23 16:12 AlexandreCassagne

CC @ericschaal

AlexandreCassagne avatar Dec 15 '23 16:12 AlexandreCassagne

There should be a mechanism for shutting Rumqttd down.

Can you elaborate on what the shutdown scenario looks like? And how would shutting down differ from killing the process?

Like, would we store the state and then restart where we left off? If yes, what would be included in the state?

Thanks 😁

swanandx avatar Dec 15 '23 16:12 swanandx

Good questions!

I am running rumqttd alongside other tasks, and we need the flexibility to turn the broker on and off without shutting down the process itself.

I am not sure what that should look like internally, but my understanding is that when we kill the process, nothing special happens in terms of state, etc.

So, I would imagine a "shutdown" command that mirrors that: whatever happens on receiving a kill signal would also happen on shutdown (and indeed, you would be able to replace any kill-signal logic with the newly created shutdown command).

I don't know enough about the internals yet. Is there any such logic on process kill?

AlexandreCassagne avatar Dec 15 '23 16:12 AlexandreCassagne

Regarding your point on restarting where we left off, I see this as outside the scope of this issue (for my use case).

However, I think that if it is part of your plan to have restart logic, it should be taken into consideration!

AlexandreCassagne avatar Dec 15 '23 16:12 AlexandreCassagne

gotcha! so basically we want to return from broker.start() right?

The best way I can think of is using console! Because console is spawned on the same thread as the start fn, if we join() the returned handle of console, like:

let handle = console_thread.spawn(move || {
    let mut runtime = tokio::runtime::Builder::new_current_thread();
    let runtime = runtime.enable_all().build().unwrap();
    runtime.block_on(console::start(console_link))
})?;

// Check the console's return value here: if it returned an error
// (which it would do on a shutdown signal), we can return from start().
if let Ok(Err(e)) = handle.join() {
    // determine what to return
    return _;
};

and add an endpoint to console like /shutdown which would return the error, it should work. (We can use tokio select or something similar to run the axum server while also listening for the shutdown signal.)

only concern is what would happen to other spawned threads ( ideally, they will be detached as their join handles will be dropped, but not so sure about the cleanup )

other than that, also the concern of how we would restart :thinking:

would be more than happy to help you with this :) but will have to first discuss regarding this feature though. cc: @tekjar @henil

swanandx avatar Dec 15 '23 17:12 swanandx

I wonder if it would be possible to have a broker.stop() function? :eyes:

swanandx avatar Dec 15 '23 17:12 swanandx

gotcha! so basically we want to return from broker.start()

Exactly!

I was surprised to find that it is not possible to terminate a thread in std::thread. So, unless you transition to async tasks (whose handles, such as tokio's JoinHandle, do have abort()), the current API doesn't support shutting down.

Alternatively, it could be simpler to return a handle, as in let handle = broker.start(), that allows you to emit commands (e.g. a Sender<HubControlMessage>, used as handle.sender.send(HubControlMessage::Shutdown)).

This approach would be quite versatile but breaking, as developers would have to have an async runtime. Additionally, developers would have to .await the handle.task or something like that.

add an endpoint to console like /shutdown

I'm unfamiliar with console! etc., so I'll need to delve into it a little deeper to comment.

But we could also simply return from start() (or pass as an argument to start()) an mpsc or oneshot channel on broker start, over which a control message (such as shutdown) can be sent. Then, if developers want to wire this to an endpoint elsewhere, they can.

AlexandreCassagne avatar Dec 15 '23 18:12 AlexandreCassagne

returning a handle for sending control messages from start() seems a good idea! ( though will be a breaking change in API ).

The handle can act as console, but without HTTP endpoints, and console can use it internally as well haha

swanandx avatar Dec 15 '23 18:12 swanandx

I think returning handle might get tricky:

  • we can't return it from start(), as we need to keep execution going on in start()

  • we can have a fn like broker.handle(), which will return the Sender, but I'm not sure whether it will work because start() takes &mut self! (Getting the handle before start might work, like we do for link().)

swanandx avatar Dec 15 '23 21:12 swanandx

i like the idea of using a shutdown endpoint in console, but not everyone might want to use everything from console (previously Console was required and someone opened an issue to make it optional), or even start an HTTP endpoint just to have the ability to shut down.

so i think adding a new oneshot channel which users can get handle to before calling broker.start() seems like the best option.
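A minimal sketch of that idea, assuming nothing about rumqttd's internals: `ToyBroker`, `ControlMessage`, and `shutdown_handle()` are hypothetical names, and a std `mpsc` channel stands in for the oneshot channel. The point it illustrates is that the handle is taken before `start()`, so the `&mut self` borrow held by `start()` is not a problem.

```rust
use std::sync::mpsc::{self, Receiver, Sender, TryRecvError};
use std::thread;
use std::time::Duration;

/// Hypothetical control message; only shutdown is discussed in this thread.
#[derive(Debug)]
enum ControlMessage {
    Shutdown,
}

/// Toy stand-in for the broker. This is NOT rumqttd's real `Broker` type.
struct ToyBroker {
    control_tx: Sender<ControlMessage>,
    control_rx: Receiver<ControlMessage>,
}

impl ToyBroker {
    fn new() -> Self {
        let (control_tx, control_rx) = mpsc::channel();
        ToyBroker { control_tx, control_rx }
    }

    /// Call this before `start()`, because `start()` borrows the broker
    /// mutably for as long as it runs.
    fn shutdown_handle(&self) -> Sender<ControlMessage> {
        self.control_tx.clone()
    }

    /// Blocks until a `Shutdown` message arrives, then returns the number
    /// of loop iterations that were completed.
    fn start(&mut self) -> u64 {
        let mut iterations = 0;
        loop {
            match self.control_rx.try_recv() {
                Ok(ControlMessage::Shutdown) | Err(TryRecvError::Disconnected) => {
                    return iterations;
                }
                Err(TryRecvError::Empty) => {}
            }
            // Placeholder for the broker's real work.
            thread::sleep(Duration::from_millis(10));
            iterations += 1;
        }
    }
}

fn main() {
    let mut broker = ToyBroker::new();
    let handle = broker.shutdown_handle();

    // Some other part of the application decides when to stop the broker.
    let stopper = thread::spawn(move || {
        thread::sleep(Duration::from_millis(50));
        handle.send(ControlMessage::Shutdown).unwrap();
    });

    let iterations = broker.start(); // returns once Shutdown is received
    stopper.join().unwrap();
    println!("broker stopped after {} iterations", iterations);
}
```

This only shows how `start()` itself could return; it says nothing about stopping any other threads the broker has spawned.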

only concern is what would happen to other spawned threads ( ideally, they will be detached as their join handles will be dropped, but not so sure about the cleanup )

as @AlexandreCassagne mentioned i think for now we can ignore cleanup and just drop the handles.

h3nill avatar Dec 16 '23 04:12 h3nill

as @AlexandreCassagne mentioned i think for now we can ignore cleanup and just drop the handles

It may be worthwhile to open another issue to track cleanup

AlexandreCassagne avatar Dec 19 '23 14:12 AlexandreCassagne

I was experimenting with this, and I think we can't ignore the cleanup!

even if we drop the handles, threads will continue their execution in background, i.e. everything will still be running, even after user requested to shut down.

There is no way to kill a thread externally, afaik. And sending some rx to each of the threads, so that each thread can listen on it for a kill signal (while executing its main task), is getting kinda complicated haha.
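For illustration, here is one cooperative way this could look with plain std threads. It is a sketch under assumptions, not rumqttd code: `Workers` and `spawn_workers` are hypothetical names, and it uses a shared `AtomicBool` flag rather than a per-thread receiver, which keeps the fan-out simple at the cost of requiring each thread to poll.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread::{self, JoinHandle};
use std::time::Duration;

/// Hypothetical handle that owns the worker threads and the shared stop flag.
struct Workers {
    stop: Arc<AtomicBool>,
    handles: Vec<JoinHandle<u32>>,
}

/// Spawns `count` threads that loop until the shared flag is set.
fn spawn_workers(count: usize) -> Workers {
    let stop = Arc::new(AtomicBool::new(false));
    let handles: Vec<JoinHandle<u32>> = (0..count)
        .map(|_| {
            let stop = Arc::clone(&stop);
            thread::spawn(move || {
                let mut ticks: u32 = 0;
                // Each thread checks the flag between units of work.
                while !stop.load(Ordering::SeqCst) {
                    thread::sleep(Duration::from_millis(5)); // placeholder work
                    ticks += 1;
                }
                ticks
            })
        })
        .collect();
    Workers { stop, handles }
}

impl Workers {
    /// Sets the flag, then waits for every thread to return.
    /// Returns how many threads exited without panicking.
    fn shutdown(self) -> usize {
        self.stop.store(true, Ordering::SeqCst);
        self.handles
            .into_iter()
            .filter_map(|handle| handle.join().ok())
            .count()
    }
}

fn main() {
    let workers = spawn_workers(3);
    thread::sleep(Duration::from_millis(20));
    let stopped = workers.shutdown();
    println!("{} threads stopped", stopped);
}
```

Unlike dropping the handles, `shutdown()` joins each thread, so the caller knows the threads have actually finished when it returns.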

I will try further to see if there is something we can figure out! lmk if you have any suggestions, thanks :)

swanandx avatar Feb 06 '24 14:02 swanandx

Hey @swanandx! have you pushed your code somewhere? I'd love to take a look

AlexandreCassagne avatar Feb 06 '24 15:02 AlexandreCassagne

Is there any way to migrate from a threads implementation to tokio tasks? This would allow us to use the tokio::task::AbortHandle.

AlexandreCassagne avatar Feb 06 '24 15:02 AlexandreCassagne

have you pushed your code somewhere? I'd love to take a look

nope :( I was just playing around locally, I can provide this though:

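use std::thread::{self, sleep};
use std::time::Duration;

fn main() {
    let h = work();

    println!("Done with the stream!!");

    // Dropping the JoinHandle only detaches the thread; it does not stop it.
    drop(h);
    println!("Dropped!!");
    // Keep the process alive so the detached thread's output stays visible.
    loop {
        thread::park();
    }
}

fn work() -> thread::JoinHandle<()> {
    // tokio::spawn(async {
    //     loop {
    //         println!("still alive");
    //         sleep(Duration::from_millis(500)).await;
    //     }
    // })
    thread::spawn(|| loop {
        println!("alive :)");
        sleep(Duration::from_millis(500));
    })
}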

even after we drop the handle, the thread stays alive!

Is there any way to migrate from a threads implementation to tokio tasks? This would allow us to use the tokio::task::AbortHandle.

In order to know when to abort, we would still need to use something like select! blocks everywhere, right? And if we are gonna use select!, we don't need to migrate anything. Just add a branch to every block_on call that checks for the kill signal and returns when we get it. This will return from the spawned thread as well (as there is nothing further to do)!

not sure if this heavy use of select! is bad or fine, which is why I'm kinda not happy with it.

swanandx avatar Feb 06 '24 16:02 swanandx

I will try to push code with select! blocks as poc tomorrow! :smile:

swanandx avatar Feb 06 '24 16:02 swanandx

here is poc - https://github.com/bytebeamio/rumqtt/compare/main...rumqttd-shutdown

we return a BrokerHandle, which can be used to stop the broker! @AlexandreCassagne

swanandx avatar Feb 07 '24 08:02 swanandx

nice! it doesn't look as bad as you suggested yesterday!

AlexandreCassagne avatar Feb 07 '24 09:02 AlexandreCassagne

Hey @AlexandreCassagne @swanandx, I was looking into using rumqttd as part of a loadable/unloadable DLL extension in a Windows process that outlives the extension. Not being able to clean up the resources used by the DLL is a showstopper. Is this going to be merged eventually?

aohotnik avatar Mar 03 '24 12:03 aohotnik

Is this going to be merged eventually?

Hey @aohotnik, yes, the ability to shut down would be required, especially when rumqttd is used as a library (like the case you mentioned).

I got caught up with other work, so this got delayed 😕

btw, would something similar to what is shown in the PoC here resolve your needs? https://github.com/bytebeamio/rumqtt/issues/771#issuecomment-1931514715

Would be cool if you could provide feedback so we can work on getting it from PoC to production haha. We still have to analyse the perf / memory usage impact, if any (likely there won't be much).

Thanks!

swanandx avatar Mar 03 '24 12:03 swanandx

btw, would something similar to what is shown in the PoC here resolve your needs?

@swanandx If it stops all the threads spun up by the broker and frees all resources when the DLL is unloaded by the main process, I think it would be enough. I'm brainstorming right now, so there is no prototype to test on, but I will be able to use your linked code in a fork to try it out when the time comes.

aohotnik avatar Mar 03 '24 13:03 aohotnik