HiGHS
Release GIL and Run in Background Thread
Fixes #1593
This PR makes two changes:
- Release the GIL when calling run/solve so that other threads can run.
- Execute run/solve in a background thread so that keyboard interrupts work.
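A minimal standard-library sketch of the second change. `blocking_solve` and `run_in_background` are hypothetical names for illustration, not highspy functions; `time.sleep` stands in for a native call that releases the GIL. Polling `join` with a timeout keeps the main thread returning to the interpreter, which is where a pending Ctrl+C is raised as KeyboardInterrupt.

```python
import threading
import time


def blocking_solve(result, seconds=0.3):
    """Hypothetical stand-in for a long native call that releases the GIL."""
    time.sleep(seconds)  # time.sleep releases the GIL while it waits
    result["status"] = "ok"


def run_in_background(target, poll_interval=0.05):
    """Run target(result) in a worker thread while the main thread polls.

    Joining with a timeout hands control back to the interpreter regularly,
    so a pending KeyboardInterrupt is raised here, in the main thread.
    """
    result = {}
    worker = threading.Thread(target=target, args=(result,), daemon=True)
    worker.start()
    while worker.is_alive():
        worker.join(poll_interval)
    return result


outcome = run_in_background(blocking_solve)
print(outcome)
```

Note that in this sketch an interrupt only reaches the caller; nothing asks the worker itself to stop.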
I think it should be fine, but it is worth double checking. Let me get back to you.
I tried the branch and I think there is something missing. When I now press CTRL+C, I immediately end up in my "except KeyboardInterrupt:" block as intended, but the solver keeps running in the background as can be seen by the continued production of log output.
That's odd. I didn't see behavior like that. I'll take another look.
Well, it works if the python program immediately terminates on CTRL+C, but not if it keeps running and doing other stuff.
My use case is to collect intermediate solutions with a callback (requires #1882) and when the user hits CTRL+C, I want to continue to work with the best intermediate solution. Having the solver still running and modifying its state is a problem.
Ahh. Got it. Let me make some modifications.
> I tried the branch and I think there is something missing. When I now press CTRL+C, I immediately end up in my "except KeyboardInterrupt:" block as intended, but the solver keeps running in the background as can be seen by the continued production of log output.
~~I'm having trouble reproducing this. What OS are you seeing this on?~~ Scratch that. I am seeing this behavior.
Sanity check - probably me being dumb about Python
When running in background thread (singular), presumably HiGHS can still run using multiple threads.
@jajhall, I confirmed that HiGHS can still use multiple threads.
I should probably make modifications so that a results object is still returned even if there is a keyboard interrupt.
The only way I see to handle the keyboard interrupt is with events, which would have to be set in a callback or something.
This seems to work:
import highspy
import time

def main():
    h = highspy.Highs()
    h.readModel("30n20b8.mps")

    last_intermediate_solution = None
    solver_should_stop = False
    solver_stopped = False

    def callback(callback_type, message, data_out, data, user_callback_data):
        nonlocal last_intermediate_solution, solver_should_stop, solver_stopped
        if callback_type == highspy.cb.HighsCallbackType.kCallbackMipImprovingSolution:
            # remember the most recent improving MIP solution
            last_intermediate_solution = data_out.mip_solution
        elif callback_type in (highspy.cb.HighsCallbackType.kCallbackSimplexInterrupt, highspy.cb.HighsCallbackType.kCallbackIpmInterrupt, highspy.cb.HighsCallbackType.kCallbackMipInterrupt):
            # the solver polls these callbacks; ask it to stop once CTRL+C was seen
            if solver_should_stop:
                data.user_interrupt = True
                solver_stopped = True

    h.setCallback(callback, None)
    h.startCallback(highspy.cb.HighsCallbackType.kCallbackMipImprovingSolution)
    h.startCallback(highspy.cb.HighsCallbackType.kCallbackSimplexInterrupt)
    h.startCallback(highspy.cb.HighsCallbackType.kCallbackIpmInterrupt)
    h.startCallback(highspy.cb.HighsCallbackType.kCallbackMipInterrupt)

    try:
        h.run()
    except KeyboardInterrupt:
        solver_should_stop = True
        print("KeyboardInterrupt: Waiting for highs to finish...")
        # the solver is still running in its background thread; wait for it to stop
        while not solver_stopped:
            time.sleep(0.1)
        print("KeyboardInterrupt: Highs finished")

    h.stopCallback(highspy.cb.HighsCallbackType.kCallbackMipImprovingSolution)
    h.stopCallback(highspy.cb.HighsCallbackType.kCallbackSimplexInterrupt)
    h.stopCallback(highspy.cb.HighsCallbackType.kCallbackIpmInterrupt)
    h.stopCallback(highspy.cb.HighsCallbackType.kCallbackMipInterrupt)

    model_status = h.getModelStatus()
    print(f"model_status: {model_status}")

    solution = None
    if model_status == highspy.HighsModelStatus.kOptimal:
        solution = h.getSolution().col_value
    elif model_status == highspy.HighsModelStatus.kInterrupt and last_intermediate_solution is not None:
        solution = last_intermediate_solution
        model_status = highspy.HighsModelStatus.kOptimal
    elif model_status == highspy.HighsModelStatus.kInfeasible:
        pass
    print(f"solution found: {solution is not None}")

main()
The mps file can be found here: https://miplib.zib.de/instance_details_30n20b8.html
If one waits until the solver reports BestSol != inf, then last_intermediate_solution will contain that solution.
While writing this I came across these issues: #1904, #1905
EDIT: I tried replacing the time.sleep(0.1) with a threading.Condition.wait, but then one always runs into #1905 with model_status == kNotset instead of kInterrupt.
Thanks, @few. I can work something like this into the PR, but I'd like to hear from @jajhall or @galabovaa first. This would involve creating a callback that runs every time HiGHS is run from Python. If we do this, I think the callback should be in C++ (which is not a problem). I'm not sure I see a way around it if we want keyboard interrupts to work properly. However, this might negate the need to run in a background thread. If @jajhall is okay with using a callback for this, I'll go ahead and update the PR.
I'm not sure if fiddling with the signal handling in a library is such a nice thing for the application. I was thinking about this:
a) Merge this PR as is.
b) After #1905 is fixed, add an example based on my code above showcasing how to set up callbacks to properly deal with keyboard interrupts.
For simple scripts, your PR will just work, as those terminate immediately and don't notice that the HiGHS thread keeps running. Larger applications that want to deal with KeyboardInterrupt in their own way then have all the flexibility.
I just tried the branch on Windows... and it does not work. The solver just keeps running. Once it finishes, "KeyboardInterrupt: Waiting for highs to finish..." is printed, which hangs, because highs has already stopped.
I might have found a way to make it work on Windows. It depends on the pywin32 package though. Add these imports at the top of highspy.py and replace the solve() function.
from threading import local, Thread, Event
import sys
if sys.platform == 'win32':
    import win32event
    import win32api

def solve(self):
    """Runs the solver on the current problem.

    Returns:
        A HighsStatus object containing the solve status.
    """
    res = _ThreadingResult()
    if sys.platform == 'win32':
        finished_event = win32event.CreateEvent(None, 0, 0, None)
        def thread_func():
            try:
                self._run(res)
            finally:
                win32event.SetEvent(finished_event)
        t = Thread(target=thread_func)
        t.start()
        stop_event = win32event.CreateEvent(None, 0, 0, None)
        win32api.SetConsoleCtrlHandler(lambda x: win32event.SetEvent(stop_event), True)
        wait_objects = [stop_event, finished_event]
        wait_result = win32event.WaitForMultipleObjects(wait_objects, False, win32event.INFINITE)
        if wait_result == win32event.WAIT_OBJECT_0:
            raise KeyboardInterrupt
    else:
        t = Thread(target=self._run, args=(res,))
        t.start()
        t.join()
    return res.out
In my minimal testing, the solver works as usual if not interrupted, and on CTRL+C it interrupts in the same way as on Linux.
I'm learning more about Python multithreading in a parallelism session at the NumFOCUS Project Summit, and some of the discussion above is making a lot more sense 🙂. If nothing else, I know what "releasing the GIL" refers to!
I found a version that works on Linux and Windows with the branch as it was merged. Is there interest in having that as an example? I would wait for the fix for #1905 though.
import highspy
import threading

class CallbackManager:
    def __init__(self, model):
        self.last_intermediate_solution = None
        self._model = model
        self._solver_should_stop = False
        self._solver_stopped = threading.Condition()

    def _callback(self, callback_type, message, data_out, data_in, user_callback_data):
        if callback_type == highspy.cb.HighsCallbackType.kCallbackMipImprovingSolution:
            self.last_intermediate_solution = data_out.mip_solution
        elif callback_type in (highspy.cb.HighsCallbackType.kCallbackSimplexInterrupt, highspy.cb.HighsCallbackType.kCallbackIpmInterrupt, highspy.cb.HighsCallbackType.kCallbackMipInterrupt):
            if self._solver_should_stop:
                data_in.user_interrupt = True
                with self._solver_stopped:
                    self._solver_stopped.notify()

    def _stop(self):
        self._solver_should_stop = True
        with self._solver_stopped:
            self._solver_stopped.wait()

    def _enable_callbacks(self):
        self._model.setCallback(self._callback, None)
        self._model.startCallback(highspy.cb.HighsCallbackType.kCallbackMipImprovingSolution)
        self._model.startCallback(highspy.cb.HighsCallbackType.kCallbackSimplexInterrupt)
        self._model.startCallback(highspy.cb.HighsCallbackType.kCallbackIpmInterrupt)
        self._model.startCallback(highspy.cb.HighsCallbackType.kCallbackMipInterrupt)

    def _disable_callbacks(self):
        self._model.stopCallback(highspy.cb.HighsCallbackType.kCallbackMipImprovingSolution)
        self._model.stopCallback(highspy.cb.HighsCallbackType.kCallbackSimplexInterrupt)
        self._model.stopCallback(highspy.cb.HighsCallbackType.kCallbackIpmInterrupt)
        self._model.stopCallback(highspy.cb.HighsCallbackType.kCallbackMipInterrupt)

    def start_solver(self, solve_func):
        self._enable_callbacks()
        try:
            t = threading.Thread(target=solve_func)
            t.start()
            while t.is_alive():
                t.join(0.1)
        except KeyboardInterrupt:
            print("KeyboardInterrupt: Waiting for highs to finish...")
            self._stop()
            print("KeyboardInterrupt: Highs finished")
        finally:
            self._disable_callbacks()

h = highspy.Highs()
assert h.readModel("30n20b8.mps") == highspy.HighsStatus.kOk, "model file not found. download it here: https://miplib.zib.de/instance_details_30n20b8.html"

callback_manager = CallbackManager(h)
callback_manager.start_solver(lambda: h.run())

model_status = h.getModelStatus()
print(f"model_status: {model_status}")

solution = None
if model_status == highspy.HighsModelStatus.kOptimal:
    solution = h.getSolution().col_value
elif model_status in (highspy.HighsModelStatus.kInterrupt, highspy.HighsModelStatus.kNotset) and callback_manager.last_intermediate_solution is not None:
    solution = callback_manager.last_intermediate_solution
    model_status = highspy.HighsModelStatus.kOptimal
elif model_status == highspy.HighsModelStatus.kInfeasible:
    pass
print(f"solution found: {solution is not None}")
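One possible variation on the stop handshake, offered as a sketch rather than a tested replacement. `threading.Condition.notify()` only wakes threads that are already waiting, so a notification sent before `wait()` begins would be missed; a `threading.Event` stays set once `set()` has been called. `StopHandshake`, `on_interrupt_check` and `fake_solver` are hypothetical names for illustration and are not part of highspy.

```python
import threading
import time


class StopHandshake:
    """Stop request plus acknowledgement, both as Events (they latch once set)."""

    def __init__(self):
        self.stop_requested = threading.Event()
        self.stop_acknowledged = threading.Event()

    def on_interrupt_check(self):
        """Called from the solver thread; returns True if the solver should stop."""
        if self.stop_requested.is_set():
            self.stop_acknowledged.set()
            return True
        return False

    def request_stop(self, timeout=None):
        """Called from the main thread; returns True once the solver acknowledged."""
        self.stop_requested.set()
        return self.stop_acknowledged.wait(timeout)


def fake_solver(handshake, max_iterations=1000):
    """Hypothetical stand-in for a solver that polls an interrupt callback."""
    for iteration in range(max_iterations):
        if handshake.on_interrupt_check():
            return iteration
        time.sleep(0.01)
    return max_iterations


handshake = StopHandshake()
iterations_done = []
worker = threading.Thread(target=lambda: iterations_done.append(fake_solver(handshake)))
worker.start()
time.sleep(0.05)  # let the stand-in solver make some progress
acknowledged = handshake.request_stop(timeout=5.0)
worker.join()
print(acknowledged, iterations_done[0] < 1000)
```

Passing a timeout to `request_stop` also gives the caller a way out if the solver has already finished and will never acknowledge.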
Hi @michaelbynum, @jajhall, and @few!
After rebasing my branch to include this PR I'm seeing many random deadlocks in my extended highspy unit tests. I've made some fixes, but wanted to confirm some details.
- I don't think highspy.solve() should be called automatically in a separate thread. It seems to be causing the deadlock issues, and it becomes messy when the user actually wants to run multiple HiGHS instances in different threads. If the user wants support for KeyboardInterrupts, then the code by @few works great! I don't think we can integrate this behaviour into highspy by default, since the user might want to use the callbacks for other things, and again, it gets messy if the user has multiple instances of HiGHS running in different threads.
- I believe the GIL release code below can be simplified to .def("run", &Highs::run, py::call_guard<py::gil_scoped_release>()). That is, py::gil_scoped_release releases the GIL in its constructor and re-acquires it in its destructor, whereas py::gil_scoped_acquire() does the opposite and is unnecessary?
HighsStatus highs_run(Highs* h) {
  py::gil_scoped_release release;
  HighsStatus status = h->run();
  py::gil_scoped_acquire();  // unnecessary?
  return status;
}
- Since we're releasing the GIL in run, don't we need to re-acquire the GIL when C++ calls the Python callbacks? E.g., see the code below. Also, this fixes #1904 by interpreting the void* as a Python "object" (using handle to avoid reference counting issues).
// Wrap the setCallback function. Pass a lambda wrapper around the python function
// that acquires the GIL and appropriately handle user data passed to the callback
HighsStatus highs_setCallback(
    Highs* h,
    std::function<void(int, const std::string&, const HighsCallbackDataOut*, HighsCallbackDataIn*, py::handle)> fn,
    py::handle data) {
  return h->setCallback([fn, data](int callbackType, const std::string& msg,
                                   const HighsCallbackDataOut* dataOut,
                                   HighsCallbackDataIn* dataIn, void* d) {
    py::gil_scoped_acquire acquire;
    return fn(callbackType, msg, dataOut, dataIn, py::handle(reinterpret_cast<PyObject*>(d)));
  }, data.ptr());
}
Thanks! BTW: I did some tests with multiple HiGHS instances/threads in python, and releasing the GIL does make a big performance improvement!
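To illustrate why releasing the GIL helps when several solves run in different threads, here is a standard-library sketch. `solve_stub` is a hypothetical stand-in, not a highspy call: `time.sleep` releases the GIL, much as a wrapped native run would, so the four calls overlap instead of running one after another.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def solve_stub(model_id, seconds=0.2):
    """Hypothetical stand-in for a native solve that releases the GIL."""
    time.sleep(seconds)
    return model_id


start = time.perf_counter()
with ThreadPoolExecutor(max_workers=4) as pool:
    results = list(pool.map(solve_stub, range(4)))
elapsed = time.perf_counter() - start

print(results)
print(f"elapsed: {elapsed:.2f}s (running the four calls back to back would take about 0.80s)")
```

A call that held the GIL for its whole duration would serialize the four threads, and the elapsed time would approach the sequential total.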
I think I might agree. I think callbacks may be a better way to handle keyboard interrupts. I think you are also correct about python callbacks, except that the GIL needs to be released again after the callback.
I can create an optional callback that can be enabled with an option to handle keyboard interrupts?
> I think I might agree. I think callbacks may be a better way to handle keyboard interrupts. I think you are also correct about python callbacks, except that the GIL needs to be released again after the callback.
I believe the py::gil_scoped_acquire should release the GIL in its destructor.
> I can create an optional callback that can be enabled with an option to handle keyboard interrupts?
Yeah, I agree. Thinking on this further, I think we can have an additional Python callback proxy that can enable optional keyboard interrupts, while still allowing the user to have custom callback code. I'll have a quick play with the idea I have in mind.
Ahh... That is convenient.
I have an approach that seems reasonable to me. For example:
h = highspy.Highs()
h.readModel('30n20b8.mps')
h.HandleKeyboardInterrupt = True
solutions = []
h.cbMipImprovingSolution.subscribe(lambda e: solutions.append(e.data_out.mip_solution))
h.solve()
print(solutions)
The HandleKeyboardInterrupt = True will add callbacks that check for termination and will launch the solve in a separate thread. I've added support for multiple callbacks on each event, so the user can still capture events - and even specify up-front which event they'd like to handle.
The user can also manually cancel the solve, e.g., the following will cancel when the first MIP solution is found, and then wait until HiGHS finishes up.
h.cbMipImprovingSolution.subscribe(lambda e: h.cancelSolve())
h.solve()
The code seems to be working well; however I will need to add some extra unit tests.
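A rough sketch of how several handlers might share one event. `CallbackEvent` and its `fire` method are hypothetical names chosen for illustration; this is an assumption about the general pattern, not the implementation in the branch.

```python
class CallbackEvent:
    """Hypothetical event that fans one notification out to many subscribers."""

    def __init__(self):
        self._subscribers = []

    def subscribe(self, handler):
        self._subscribers.append(handler)
        return handler

    def unsubscribe(self, handler):
        self._subscribers.remove(handler)

    def fire(self, payload):
        # Iterate over a copy so handlers may unsubscribe while being called.
        for handler in list(self._subscribers):
            handler(payload)


improving_solution = CallbackEvent()
solutions = []
log = []
improving_solution.subscribe(solutions.append)
improving_solution.subscribe(lambda payload: log.append(f"objective {payload['objective']}"))

improving_solution.fire({"objective": 12.5})
improving_solution.fire({"objective": 9.0})
print(len(solutions), log)
```

With this shape, an interrupt check can be one more subscriber on the interrupt events, without displacing handlers the user registered.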
That sounds great.
> h = highspy.Highs()
> h.readModel('30n20b8.mps')
> h.HandleKeyboardInterrupt = True
> solutions = []
> h.cbMipImprovingSolution.subscribe(lambda e: solutions.append(e.data_out.mip_solution))
> h.solve()
> print(solutions)
What happens from the POV of the python script when a KeyboardInterrupt occurs during h.solve()? I'd assume there is no KeyboardInterrupt to capture and h.solve() just finishes with model status kInterrupt.
What happens if a second keyboard interrupt occurs while the solver is still running (and hasn't reached one of the interrupt callbacks yet)?
Great questions!
> What happens from the POV of the python script when a KeyboardInterrupt occurs during h.solve()? I'd assume there is no KeyboardInterrupt to capture and h.solve() just finishes with model status kInterrupt.
KeyboardInterrupts are supported if HandleKeyboardInterrupt = True. I borrowed and adapted your code. See below for a rough outline (I've removed docstrings for brevity).
Threading Code
def solve(self):
    if self.HandleKeyboardInterrupt == False:
        return super().run()
    else:
        return self.joinSolve(self.startSolve())

# Starts the solver in a separate thread. Useful for handling KeyboardInterrupts.
# Do not attempt to modify the model while the solver is running.
def startSolve(self):
    if self.is_solver_running() == False:
        self.__solver_started.acquire()
        self.__solver_should_stop = False
        self.__solver_status = None

        t = Thread(target=self.__solve, daemon=True)
        t.start()

        # wait for solver thread to acquire __solver_stopped to avoid synchronization issues
        with self.__solver_started:
            return t
    else:
        raise Exception("Solver is already running.")

def joinSolve(self, solver_thread=None):
    if solver_thread is not None:
        try:
            while solver_thread.is_alive():
                solver_thread.join(0.1)
        except KeyboardInterrupt:
            print('KeyboardInterrupt: Waiting for HiGHS to finish...')
            self.cancelSolve()

    try:
        # wait for shared lock, i.e., solver to finish
        self.__solver_stopped.acquire(True)
    except KeyboardInterrupt:
        pass  # ignore additional KeyboardInterrupt here
    finally:
        self.__solver_stopped.release()

    return self.__solver_status

def wait(self, timeout=-1.0):
    result = False, None
    try:
        result = self.__solver_stopped.acquire(True, timeout=timeout), self.__solver_status
        return result
    finally:
        if result[0] == True:
            self.__solver_status = None  # reset status
            self.__solver_stopped.release()

# internal solve method for use with threads
# will set the status of the solver when finished and release the shared lock
def __solve(self):
    with self.__solver_stopped:
        self.__solver_started.release()  # allow main thread to continue
        self.__solver_status = super().run()
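The two-lock handshake in the outline can be tried without HiGHS. The sketch below is a toy version under stated assumptions: `ToySolver`, its snake_case method names and the polling loop in `_run` are hypothetical stand-ins for the real solve. It relies on the documented behaviour that a plain `threading.Lock` may be released by a thread other than the one that acquired it.

```python
import threading
import time


class ToySolver:
    """Hypothetical stand-in that mimics the two-lock handshake."""

    def __init__(self):
        self._started = threading.Lock()
        self._stopped = threading.Lock()
        self._should_stop = False
        self._status = None

    def _run(self):
        # Stand-in for the native solve: loop until done or asked to stop.
        for _ in range(200):
            if self._should_stop:
                return "interrupted"
            time.sleep(0.01)
        return "finished"

    def _solve(self):
        with self._stopped:          # held for the whole solve
            self._started.release()  # let start_solve() return
            self._status = self._run()

    def start_solve(self):
        self._started.acquire()
        self._should_stop = False
        self._status = None
        thread = threading.Thread(target=self._solve, daemon=True)
        thread.start()
        with self._started:          # blocks until the worker holds _stopped
            return thread

    def cancel_solve(self):
        self._should_stop = True

    def join_solve(self):
        with self._stopped:          # blocks until the worker has finished
            return self._status


solver = ToySolver()
solver.start_solve()
solver.cancel_solve()
status = solver.join_solve()
print(status)
```

Because `start_solve` returns only after the worker holds `_stopped`, a later `join_solve` cannot slip in before the solve has begun.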
> What happens if a second keyboard interrupt occurs while the solver is still running (and hasn't reached one of the interrupt callbacks yet)?
I'm happy for feedback here. By default, I ignore any future Ctrl-C interrupts. We could instead force terminate after X attempts (see below), or we could just leave it up to the user if they want this behaviour.
def custom_wait(h, limit=5):
    result = (False, None)

    for count in range(limit):
        try:
            while result[0] == False:
                result = h.wait(0.1)
            return result[1]
        except KeyboardInterrupt:
            print(f'Ctrl-C pressed {count+1} times: Waiting for HiGHS to finish. ({limit} times to force termination)')
            h.cancelSolve()

    # if we reach this point, we should force termination
    print("Forcing termination...")
    exit(1)

h = highspy.Highs()
h.readModel("30n20b8.mps")
h.HandleUserInterrupt = True  # add interrupt callbacks, but user will handle KeyboardInterrupts manually
h.startSolve()
custom_wait(h)
Which gives:
Ctrl-C pressed 1 times: Waiting for HiGHS to finish. (5 times to force termination)
Ctrl-C pressed 2 times: Waiting for HiGHS to finish. (5 times to force termination)
Ctrl-C pressed 3 times: Waiting for HiGHS to finish. (5 times to force termination)
Ctrl-C pressed 4 times: Waiting for HiGHS to finish. (5 times to force termination)
Ctrl-C pressed 5 times: Waiting for HiGHS to finish. (5 times to force termination)
Forcing termination...
FYI @michaelbynum, @jajhall, and @few,
I found the cause of the deadlocks when running HiGHS in a temporary thread. For example, the following code will randomly deadlock.
Note: this is not just a python issue. C++ will also deadlock. BTW: I tagged @jajhall since you'll be looking at some parallel code in the near future.
for _ in range(10):
    h = ...  # placeholder: some small highs model, can be same model for each loop or different instance (doesn't matter)
    t = Thread(target=h.run)
    t.start()
    t.join()
TLDR: calling HighsTaskExecutor::shutdown() or Highs::resetGlobalScheduler() within the temporary thread will avoid the issue. Alternatively, we can rewrite the synchronization code in HighsTaskExecutor.
# e.g., this does not deadlock
for i in range(10):
    t = Thread(target=lambda: h.run() and highspy.Highs.resetGlobalScheduler(False))
    t.start()
    t.join()
Details:
- Each time we solve in a new thread, highs::parallel::initialize_scheduler is called, which initializes a new thread_local mainWorkerHandle and spins up new worker threads (that are detached immediately).
- These worker threads can take some time to start, but HiGHS can start processing on the already active ones.
- HiGHS can finish before all the worker threads start, and the main "temporary thread" terminates.
- When the main thread dies, the thread_local mainWorkerHandle destructor calls HighsTaskExecutor::shutdown(), which blocks until all worker threads are initialized and synchronized.
- This is where it's hard to debug, but it seems that the thread scheduler (at least in Windows) waits to start these "un-started" worker threads until the "currently destructing" thread dies, which is blocked, waiting for these "un-started" worker threads.
- Hence deadlock.
If we call HighsTaskExecutor::shutdown() at the end of the temporary thread (instead of in its destructor), the thread scheduler isn't blocked, and it can synchronize as expected.
That said, this synchronization isn't strictly necessary. It should be possible to tell the worker threads to shut down without waiting for them to initialize, since they already share the same this pointer. The last worker thread to die will also delete the shared memory. The required code change is small, but probably needs significant testing.
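As a loose Python analogy for that idea (not HighsTaskExecutor code, and every name here is hypothetical): shutdown only sets a shared flag and returns, workers that start late see the flag straight away, and whichever worker exits last performs the cleanup.

```python
import threading


class SharedState:
    """Hypothetical shared block that the last worker to exit cleans up."""

    def __init__(self, worker_count):
        self.stop = threading.Event()
        self._lock = threading.Lock()
        self._remaining = worker_count
        self.cleaned_up = 0

    def worker_exit(self):
        with self._lock:
            self._remaining -= 1
            last_one_out = self._remaining == 0
        if last_one_out:
            self.cleaned_up += 1  # stand-in for freeing the shared memory


def worker(state):
    # A worker that starts late still sees the flag and exits straight away.
    state.stop.wait()
    state.worker_exit()


def shutdown(state):
    """Signal every worker and return at once, without waiting for them."""
    state.stop.set()


state = SharedState(worker_count=4)
threads = [threading.Thread(target=worker, args=(state,)) for _ in range(4)]
for t in threads[:2]:
    t.start()      # two workers are already running...
shutdown(state)    # ...when shutdown is requested
for t in threads[2:]:
    t.start()      # the late starters observe the flag immediately
for t in threads:
    t.join()       # joined here only so the result can be inspected
print(state.cleaned_up)
```

The counter under a lock plays the role of a reference count: the requesting thread never blocks, and cleanup runs exactly once.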
@jajhall: Do you think this is something worth doing? Should I log an issue / submit a PR?
This is potentially very interesting, as we've got a long-standing issue #1044 that @galabovaa is currently working on.
> Do you think this is something worth doing? Should I log an issue / submit a PR?
Thanks for the offer to help, but let's see what @galabovaa thinks first
Good point! I hadn't made that connection, but after looking at the history, it's very possibly related. I've also noticed the deadlock is more likely to occur if numThreads is larger.
Looks like there's a lot of connected issues and history, so I don't know reproduction steps for all of them (to verify). But I'm happy to work with @galabovaa if she'd like!
Thank you for your observations on the thread scheduler, @mathgeekcoder! I have been trying to work out from where within HiGHS to call shutdown(), and how to restructure the code, in order to resolve this threads issue. It's popped up from different users calling HiGHS from different interfaces, and it is very hard to reproduce or debug. As you say, changes would require significant testing, so we have been cautious about this one.
I will get a chance to dig deeper next week. We have to make a change to the code, for sure. Testing-wise, we get data race warnings from the thread sanitizer, so clearing those would be a great first step. Valgrind pops some warnings too and I suspect they are all related.
If what you have in mind is a small change to the code, it would be great if you could open a PR with your suggested modifications, as long as it is not something you would have to spend a lot of time on. We may get another suggestion for handling it next week, but I think it would be great to explore your idea of telling the worker threads to shut down without waiting for them to initialize.