rippled
Optimize thread pool implementation:
The existing thread pool code uses several layers of indirection, relies on a custom lock-free stack, and supports features that are never used (e.g. the ability to dynamically adjust the number of threads in the pool).
This refactoring aims to simplify the code, making it easier to reason about what is happening (although lock-free multi-threaded code is always tricky), and to reduce the latency of the thread pool internals.
High Level Overview of Change
Context of Change
Type of Change
- [ ] Bug fix (non-breaking change which fixes an issue)
- [ ] New feature (non-breaking change which adds functionality)
- [ ] Breaking change (fix or feature that would cause existing functionality to not work as expected)
- [ ] Refactor (non-breaking change that only restructures code)
- [ ] Tests (You added tests for code that already exists, or your new feature included in this PR)
- [ ] Documentation Updates
- [ ] Release
So having looked at your proposed changes, I'm not sure I'm sold. I dislike the need for a vector to hold the thread instances. I suspect that simply changing the std::this_thread::sleep_for(std::chrono::seconds(1)); to something like std::this_thread::yield() will be sufficient to restore the performance.
One person's cake is another person's poison. Indeed, another approach would be to see if a yield() instead of a sleep_for() returns the performance. But I like the vector of threads because it leaves an artifact behind that future maintainers can easily see. A thread that is detached is virtually invisible to a maintainer. A join() on shutdown is ubiquitous in the code base, effective, and easy for a maintainer to understand.
So, yield does return the performance back to where it was. And while I personally prefer not having the std::vector to be joined (especially since C++20 brings with it the ability to wait on std::atomic) I see your point: the std::vector does expose the "artifact" for future maintainers, which is something I should be mindful of.
I'll think about this some more.
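For reference, the two ideas under discussion (idle workers calling yield() instead of sleeping, and keeping the threads in a std::vector that is joined on shutdown) can be sketched roughly as follows. This is a minimal illustration only: TinyPool and all of its members are hypothetical names, and it does not reflect the actual Workers code in this PR.

```cpp
#include <atomic>
#include <thread>
#include <vector>

// Hypothetical miniature pool; not rippled's Workers class.
class TinyPool
{
    std::atomic<bool> stop_{false};
    std::atomic<int> pending_{0};  // units of work waiting to be claimed
    std::atomic<int> done_{0};     // units of work completed

    // Declared last so the atomics above exist before any thread starts.
    // This vector is the "artifact" a maintainer can see and join on.
    std::vector<std::thread> threads_;

public:
    explicit TinyPool(unsigned count)
    {
        for (unsigned i = 0; i != count; ++i)
            threads_.emplace_back([this] { run(); });
    }

    ~TinyPool()
    {
        // Work still pending at this point is simply abandoned.
        stop_ = true;
        for (auto& t : threads_)
            t.join();
    }

    void post(int units)
    {
        pending_ += units;
    }

    int done() const
    {
        return done_;
    }

private:
    void run()
    {
        while (!stop_)
        {
            int p = pending_.load();
            if (p > 0 && pending_.compare_exchange_weak(p, p - 1))
                ++done_;  // claimed one unit of work
            else
                std::this_thread::yield();  // idle: yield rather than sleep_for
        }
    }
};
```

The trade-off is the one described above: yield() keeps latency low at the cost of busy idle threads, while the joined vector makes the thread lifetimes explicit.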
@seelabs writes:
This is a large behavior change that probably should get its own patch and not be folded into an unrelated change. There are times where we purposely crash the app on unhandled exceptions - because we don't want to continue in an insane state. We've discussed making changes like this in the past, and have always opted to keep the same behavior.
I'm not necessarily against making this change, but at minimum we need to make this change much more visible.
I hear what you're saying; is this behavior change appropriate for this PR?
Let's take a step back. My position is that relying on this "hidden" behavior (i.e. that the job queue doesn't catch exceptions and that to throw signals "ABANDON SHIP!") is anathema to good software engineering practices. We've both encountered core dumps that might be traceable to this behavior.
So at some point we're going to need to fix this. The way I see it is that:
- Either the job handlers need to be marked as noexcept, so that the compiler knows they won't throw and can warn at compile time if some code path is unprotected and can throw, and if something slips through, the runtime is required by the standard to kill the process through std::terminate; or
- The job queue itself must replicate these semantics by catching these exceptions, warning loudly, and terminating.
I think that terminating is reasonable because, as you said, it's hard for the job queue to properly handle exceptions. So I would suggest that we leave the new try/catch in place and simply add an explicit call to LogicError (which, honestly, is horribly named and ought to be renamed to ripple::abort(std::string const& msg) and should just call std::abort, but that's another story) in the catch handler.
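A rough sketch of the second option (catch, warn loudly, terminate) might look like the following. The names runJob, FatalHandler and abortWithMessage are hypothetical and are not the PR's code; the fatal handler is injectable here only so the behavior can be exercised without killing the process. The message format mirrors the "Exception caught during task processing" text seen later in this thread.

```cpp
#include <cstdlib>
#include <exception>
#include <functional>
#include <iostream>
#include <string>

// Hypothetical hook for the "warn loudly, then terminate" step.
using FatalHandler = std::function<void(std::string const&)>;

// Default handler: print the message and abort the process.
inline void
abortWithMessage(std::string const& msg)
{
    std::cerr << "Logic error: " << msg << std::endl;
    std::abort();
}

// Runs one job. Returns true if the job completed normally. If the job
// throws, the exception is reported through onFatal, which by default
// never returns.
inline bool
runJob(
    std::string const& name,
    std::function<void()> const& job,
    FatalHandler const& onFatal = abortWithMessage)
{
    try
    {
        job();
        return true;
    }
    catch (std::exception const& e)
    {
        onFatal(
            "Job: " + name +
            ": Exception caught during task processing: " + e.what());
    }
    catch (...)
    {
        onFatal(
            "Job: " + name +
            ": Unknown exception caught during task processing");
    }
    return false;
}
```

Note that by the time a catch handler runs, the stack has already been unwound, so a handler like this can report the message but not where the exception originated.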
@nbougalis Re: catching the exception (I wish github would let us respond to top level comments): That's a reasonable position - my real concern was putting this behavior change in without giving it the attention it deserves (because it looks innocuous and is part of an optimization patch). And now it's getting attention, so I'm happy.
We have two behaviors that need to be accounted for:
- We threw an exception, but it's not a big deal, we can continue on just fine, but we didn't catch it and terminated the server. You're right, we've encountered those exceptions in the past and needed to patch.
- We threw an exception, and the exception means we're in an insane state and the best thing we can do is terminate the application.
Unfortunately, it's really hard to audit the code for this type of behavior. There are no good tools that I know of that let me look at where an exception is thrown and find where it might be caught or find the call paths where it won't be caught (and if anyone knows of such a tool I'd LOVE to hear about it - I made a feature request to rtags a while back, but no love so far: https://github.com/Andersbakken/rtags/issues/1383).
Without an audit we have to decide to either terminate where we shouldn't have, or continue in an insane state where we shouldn't have. Neither choice is good. The advantage of terminating when we shouldn't have is we will be notified and we can patch these issues. If we paper them over we'll never fix them. On the other hand, these issues can be a potential source of DOS attacks on a server.
Bottom line: I vote to terminate, but either choice is reasonable and I'm fine with whatever you decide.
One minor note: I don't think marking a function noexcept will get us compiler warnings if the function actually throws - at least I've never seen this. I could see if it did this one level deep, but it almost certainly won't do this more than one level. Heck, new can throw, so we'd see warnings EVERYWHERE.
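To illustrate the point with a small sketch (mayThrow and handler are made-up names): a noexcept function is allowed to call a function that may throw, and the noexcept operator only reflects the declaration, not what the body does. Some compilers can warn about a throw expression written directly inside a noexcept function, but as far as I know that does not extend through calls like the one below.

```cpp
#include <stdexcept>

// Not marked noexcept, so callers must assume it can throw.
inline void
mayThrow(bool fail)
{
    if (fail)
        throw std::runtime_error("boom");
}

// Marked noexcept even though it calls a potentially-throwing function.
// This is well-formed; if mayThrow actually throws, the exception cannot
// leave handler and std::terminate is called at run time.
inline void
handler(bool fail) noexcept
{
    mayThrow(fail);
}

// The noexcept operator looks only at the exception specification.
static_assert(!noexcept(mayThrow(false)), "unmarked: assumed to throw");
static_assert(noexcept(handler(false)), "marked: reported as non-throwing");
```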
Just FYI, the Refactor JobQueue commit message has some junk tagged onto the end. You may wish to clean that up.
I tried out the most recent version of these changes on macOS. I see two things that need to be addressed:
- Clang pointed out that JobTypeData(JobTypeData&& other) = default is invalid. Since LoadMonitor is immovable, then JobTypeData must also be immovable. This line could be removed or you could use = delete instead.
- While running the unit tests on a debug build, the Coroutine test failed with a segmentation fault. Here is the message I got:
ripple.core.Coroutine thread specific storage
Logic error: Job: LocalValue-Test: Exception caught during task processing: mutex lock failed: Invalid argument
Segmentation fault: 11
The stack trace points at JobQueue.cpp line 265: https://github.com/XRPLF/rippled/pull/4172/files#diff-85e882e68cc55760882821eba100a325cad49b5bce66e21047bd861b3a7e6fedR262-R264
The good news is that the exception message got out. The bad news is that since we caught the exception the trace to the actual source of the exception is lost.
The Coroutine unit test crash is not consistent, but if I loop the unit tests I get a crash every 20 repetitions or so. So it's not hard to reproduce.
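On the first item above (the defaulted move constructor), here is a small sketch of why a class holding an immovable member cannot itself be movable. Immovable and Holder are hypothetical stand-ins for LoadMonitor and JobTypeData; the real classes are not reproduced here.

```cpp
#include <type_traits>

// Hypothetical stand-in for an immovable member such as LoadMonitor.
struct Immovable
{
    Immovable() = default;
    Immovable(Immovable const&) = delete;
    Immovable& operator=(Immovable const&) = delete;
};

// Hypothetical stand-in for a class like JobTypeData.
struct Holder
{
    Immovable member;

    Holder() = default;

    // Writing "= default" here would produce a move constructor that is
    // defined as deleted, because `member` can be neither moved nor
    // copied. Spelling it "= delete" states the intent directly.
    Holder(Holder&&) = delete;
};

static_assert(!std::is_move_constructible<Holder>::value, "not movable");
static_assert(!std::is_copy_constructible<Holder>::value, "not copyable");
```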
I removed the try/catch in Workers.cpp to see what I could find out from the stack trace. Here's that stack trace from the crashed thread:
Thread 6 Crashed:: Job: LocalValue-Test
0 libsystem_kernel.dylib 0x00007fff6996e33a __pthread_kill + 10
1 libsystem_pthread.dylib 0x00007fff69a2ae60 pthread_kill + 430
2 libsystem_c.dylib 0x00007fff698f5808 abort + 120
3 libc++abi.dylib 0x00007fff66b54458 abort_message + 231
4 libc++abi.dylib 0x00007fff66b458a7 demangling_terminate_handler() + 238
5 libobjc.A.dylib 0x00007fff686805b1 _objc_terminate() + 104
6 libc++abi.dylib 0x00007fff66b53887 std::__terminate(void (*)()) + 8
7 libc++abi.dylib 0x00007fff66b561a2 __cxxabiv1::failed_throw(__cxxabiv1::__cxa_exception*) + 27
8 libc++abi.dylib 0x00007fff66b56169 __cxa_throw + 113
9 libc++.1.dylib 0x00007fff66b3055b std::__1::__throw_system_error(int, char const*) + 77
10 libc++.1.dylib 0x00007fff66b2754d std::__1::mutex::lock() + 29
11 rippled 0x000000010da12e83 std::__1::lock_guard<std::__1::mutex>::lock_guard(std::__1::mutex&) + 35 (__mutex_base:91)
12 rippled 0x000000010da1260d std::__1::lock_guard<std::__1::mutex>::lock_guard(std::__1::mutex&) + 29 (__mutex_base:91)
13 rippled 0x000000010da2698a ripple::test::Coroutine_test::gate::signal() + 42
14 rippled 0x000000010da35bf9 ripple::test::Coroutine_test::thread_specific_storage()::'lambda0'()::operator()() const + 89 (Coroutine_test.cpp:172)
15 rippled 0x000000010da35b8e ripple::ClosureCounter<void>::Substitute<ripple::test::Coroutine_test::thread_specific_storage()::'lambda0'()>::operator()() + 30 (ClosureCounter.h:140)
16 rippled 0x000000010da35b4d decltype(std::__1::forward<ripple::ClosureCounter<void>::Substitute<ripple::test::Coroutine_test::thread_specific_storage()::'lambda0'()>&>(fp)()) std::__1::__invoke<ripple::ClosureCounter<void>::Substitute<ripple::test::Coroutine_test::thread_specific_storage()::'lambda0'()>&>(ripple::ClosureCounter<void>::Substitute<ripple::test::Coroutine_test::thread_specific_storage()::'lambda0'()>&) + 29 (type_traits:3545)
17 rippled 0x000000010da35afd void std::__1::__invoke_void_return_wrapper<void>::__call<ripple::ClosureCounter<void>::Substitute<ripple::test::Coroutine_test::thread_specific_storage()::'lambda0'()>&>(ripple::ClosureCounter<void>::Substitute<ripple::test::Coroutine_test::thread_specific_storage()::'lambda0'()>&) + 29 (__functional_base:349)
18 rippled 0x000000010da35acd std::__1::__function::__alloc_func<ripple::ClosureCounter<void>::Substitute<ripple::test::Coroutine_test::thread_specific_storage()::'lambda0'()>, std::__1::allocator<ripple::ClosureCounter<void>::Substitute<ripple::test::Coroutine_test::thread_specific_storage()::'lambda0'()> >, void ()>::operator()() + 29 (functional:1546)
19 rippled 0x000000010da3491e std::__1::__function::__func<ripple::ClosureCounter<void>::Substitute<ripple::test::Coroutine_test::thread_specific_storage()::'lambda0'()>, std::__1::allocator<ripple::ClosureCounter<void>::Substitute<ripple::test::Coroutine_test::thread_specific_storage()::'lambda0'()> >, void ()>::operator()() + 30 (functional:1720)
20 rippled 0x000000010b3fb1b5 std::__1::__function::__value_func<void ()>::operator()() const + 53 (functional:1873)
21 rippled 0x000000010b3fb155 std::__1::function<void ()>::operator()() const + 21 (functional:2548)
22 rippled 0x000000010bd19b7d ripple::Job::doJob() + 141 (Job.cpp:68)
23 rippled 0x000000010bd20593 ripple::JobQueue::processTask(unsigned int) + 723 (JobQueue.cpp:328)
24 rippled 0x000000010bd64341 ripple::Workers::Workers(ripple::Workers::Callback&, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, unsigned int)::$_0::operator()(unsigned int) const + 977 (Workers.cpp:80)
25 rippled 0x000000010bd63ee1 decltype(std::__1::forward<ripple::Workers::Workers(ripple::Workers::Callback&, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, unsigned int)::$_0>(fp)(std::__1::forward<unsigned int>(fp0))) std::__1::__invoke<ripple::Workers::Workers(ripple::Workers::Callback&, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, unsigned int)::$_0, unsigned int>(ripple::Workers::Workers(ripple::Workers::Callback&, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, unsigned int)::$_0&&, unsigned int&&) + 49 (type_traits:3545)
26 rippled 0x000000010bd63e2e void std::__1::__thread_execute<std::__1::unique_ptr<std::__1::__thread_struct, std::__1::default_delete<std::__1::__thread_struct> >, ripple::Workers::Workers(ripple::Workers::Callback&, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, unsigned int)::$_0, unsigned int, 2ul>(std::__1::tuple<std::__1::unique_ptr<std::__1::__thread_struct, std::__1::default_delete<std::__1::__thread_struct> >, ripple::Workers::Workers(ripple::Workers::Callback&, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, unsigned int)::$_0, unsigned int>&, std::__1::__tuple_indices<2ul>) + 62 (thread:274)
27 rippled 0x000000010bd63576 void* std::__1::__thread_proxy<std::__1::tuple<std::__1::unique_ptr<std::__1::__thread_struct, std::__1::default_delete<std::__1::__thread_struct> >, ripple::Workers::Workers(ripple::Workers::Callback&, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, unsigned int)::$_0, unsigned int> >(void*) + 118 (thread:284)
28 libsystem_pthread.dylib 0x00007fff69a2b109 _pthread_start + 148
29 libsystem_pthread.dylib 0x00007fff69a26b8b thread_start + 15
The issue that @scottschurr encountered had to do with timing, but it did lead me to discover an implementation issue that could cause a worker thread to sleep despite the fact that work was waiting.
I've reworked the code to address this and now leverage more C++20 functionality, since the PR to enable this was merged into develop.
I know it's annoying to have to do this, but please re-review the first commit (that changes the Workers logic) in its entirety.
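For readers unfamiliar with this class of bug (a worker going to sleep although work is already queued), here is a minimal sketch of one standard way to rule it out. It deliberately uses a plain std::condition_variable with a predicate rather than the C++20 facilities mentioned above, and TaskQueue is a hypothetical name, not the reworked Workers code.

```cpp
#include <condition_variable>
#include <deque>
#include <functional>
#include <mutex>
#include <utility>

// Hypothetical queue; illustrates the predicate-wait pattern only.
class TaskQueue
{
    std::mutex m_;
    std::condition_variable cv_;
    std::deque<std::function<void()>> tasks_;
    bool stopped_ = false;

public:
    void push(std::function<void()> task)
    {
        {
            std::lock_guard<std::mutex> lock(m_);
            tasks_.push_back(std::move(task));
        }
        cv_.notify_one();
    }

    void stop()
    {
        {
            std::lock_guard<std::mutex> lock(m_);
            stopped_ = true;
        }
        cv_.notify_all();
    }

    // Blocks until a task is available, or until the queue has been
    // stopped and drained. Returns false only in the latter case.
    bool pop(std::function<void()>& out)
    {
        std::unique_lock<std::mutex> lock(m_);
        // The predicate is checked under the lock before blocking, so a
        // task pushed before this call is never missed.
        cv_.wait(lock, [this] { return stopped_ || !tasks_.empty(); });
        if (tasks_.empty())
            return false;
        out = std::move(tasks_.front());
        tasks_.pop_front();
        return true;
    }
};
```

The key property is that the "is there work?" check and the decision to sleep happen under the same lock that producers take when adding work.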
With the most recent code I consistently get the following failure on the JobQueue unit test:
$ build/rippled --unittest=JobQueue
ripple.core.JobQueue
Assertion failed: (!stopped_), function postCoro, file JobQueue.h, line 391.
Abort trap: 6
I'm running macOS 12.5.1, Apple clang version 13.1.6 (clang-1316.0.21.2.5).
Yeah, that was just sloppy of me. It's been fixed and the ugly-looking code in the unit tests has been removed.
The most recent version of this pull request fails the clang-format check. The top-most commit on this branch will fix that: https://github.com/scottschurr/rippled/commits/nik-jqt-clang-format
Other than the clang-format problems, is this branch ready to be re-reviewed? Or is there more work to be done?
@scottschurr Thanks for the patience, the comments and the clang-format fix. I believe I've addressed everything, rebased the code to the latest and it should be ready for review.
I know there's a couple of clang-format failures still; I'll work around them. The existing formatting rules don't like attributes very much (e.g. [[likely]]) which is unfortunate.
Ping...
@thejohnfreeman @seelabs @scottschurr - looks like this is ready for (re)review.
If you won't be able to have an initial look at this within 2 weeks (and provide some feedback), please comment here to let us know.
@intelliot, I'm not ignoring you, but I have another higher priority code review I'm working on. I can't predict whether or not I'll get to this one in the coming two weeks.
This isn't a high priority item, imo. I'm sure Scott et al are focused on business priorities and that's fine; this can wait.
@scottschurr, no worries. I know this was (a) a large PR; (b) not a priority; and (c) you're swamped. I appreciate you taking another look.
Re: your comment about an apparently missing lock in this bit of code:
return {jobData_[t].load().sample(), std::move(name), true};
You say:
Pretty much all of the other accesses to jobData_ are done with mutex_ locked. I have the uneasy sensation that there's a lock missing here.
Here's my (preliminary) rationale for why a lock isn't missing:
The jobData_ array is constructed once and each entry in the array contains a LoadMonitor instance load_. The load() member function returns a reference to the given member.
The LoadMonitor instance contains a LoadSampler member variable which is:
using LoadSampler = std::function<void(
char const*,
std::chrono::steady_clock::duration,
std::chrono::steady_clock::duration)>;
And sample() returns a std::reference_wrapper<LoadSampler const> to that member.
Given that, I think that it's fine as is, although it's been a long time since I've even looked at this code and I will have to re-check to be sure.
I'll take a closer look when I rebase this.
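A stripped-down sketch of the shape described above, assuming the sampler really is set once at construction and never reassigned afterwards (that assumption is what the rationale hinges on, and it is what needs re-checking). Monitor is a hypothetical simplification, not the real LoadMonitor.

```cpp
#include <chrono>
#include <functional>
#include <utility>

using LoadSampler = std::function<void(
    char const*,
    std::chrono::steady_clock::duration,
    std::chrono::steady_clock::duration)>;

// Hypothetical simplification; only the parts relevant to the question.
class Monitor
{
    // Const member: initialized in the constructor, never modified
    // afterwards, so obtaining a reference to it involves no mutation
    // of shared state.
    LoadSampler const sampler_;

public:
    explicit Monitor(LoadSampler sampler) : sampler_(std::move(sampler))
    {
    }

    // Hands out a reference to the member; no copy of the callable.
    std::reference_wrapper<LoadSampler const>
    sample() const
    {
        return std::cref(sampler_);
    }
};
```

Whether invoking the returned sampler from several threads at once is safe depends on what the stored callable itself does, which this sketch does not address.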
@nbougalis if this PR is ready for review and merging (in your opinion), then please avoid force-pushing to the branch. Best practices would require that any changes to the PR after a review should invalidate that review.
Also, when a branch is force-pushed or any changes are made to it, please comment on the PR to explain why the push / change was made.
How can I avoid force-pushing if I rebase a branch to resolve merge conflicts with upstream?
@nbougalis Instead of rebasing, just do a merge commit (git merge). The merge commit resolves merge conflicts with upstream. The merge commit will go away when the PR is merged to develop because we use Squash and merge.
@thejohnfreeman what's the status of your review of this?
@seelabs will you be able to review this?
@intelliot I won't be able to review this week for sure
@nbougalis - does this PR still make sense to merge? If so, please consider the above comments and update the branch to be up-to-date with develop. If not, then share a brief explanation and close. Thanks!