
Optimize thread pool implementation

Open nbougalis opened this issue 3 years ago • 11 comments

The existing thread pool code uses several layers of indirection, relies on a custom lock-free stack, and offers functionality that is never used (e.g. the ability to dynamically adjust the number of threads in the pool).

This refactoring aims to simplify the code, making it easier to reason about what is happening (although lock-free multi-threaded code is always tricky), and to reduce the latency of the thread pool internals.

High Level Overview of Change

Context of Change

Type of Change

  • [ ] Bug fix (non-breaking change which fixes an issue)
  • [ ] New feature (non-breaking change which adds functionality)
  • [ ] Breaking change (fix or feature that would cause existing functionality to not work as expected)
  • [ ] Refactor (non-breaking change that only restructures code)
  • [ ] Tests (You added tests for code that already exists, or your new feature included in this PR)
  • [ ] Documentation Updates
  • [ ] Release

nbougalis avatar May 23 '22 01:05 nbougalis

So having looked at your proposed changes, I'm not sure I'm sold. I dislike the need for a vector to hold the thread instances. I suspect that simply changing the std::this_thread::sleep_for(std::chrono::seconds(1)); to something like std::this_thread::yield() will be sufficient to restore the performance.

nbougalis avatar May 27 '22 05:05 nbougalis

One person's cake is another person's poison. Indeed, another approach would be to see if a yield() instead of a sleep_for() returns the performance. But I like the vector of threads because it leaves an artifact behind that future maintainers can easily see. A thread that is detached is virtually invisible to a maintainer. A join() on shutdown is ubiquitous in the code base, effective, and easy for a maintainer to understand.

scottschurr avatar May 27 '22 16:05 scottschurr

One person's cake is another person's poison. Indeed, another approach would be to see if a yield() instead of a sleep_for() returns the performance. But I like the vector of threads because it leaves an artifact behind that future maintainers can easily see. A thread that is detached is virtually invisible to a maintainer. A join() on shutdown is ubiquitous in the code base, effective, and easy for a maintainer to understand.

So, yield does return the performance back to where it was. And while I personally prefer not having the std::vector to be joined (especially since C++20 brings with it the ability to wait on std::atomic) I see your point: the std::vector does expose the "artifact" for future maintainers, which is something I should be mindful of.

I'll think about this some more.

nbougalis avatar May 30 '22 03:05 nbougalis

@seelabs writes:

This is a large behavior change that probably should get its own patch and not be folded into an unrelated change. There are times when we purposely crash the app on unhandled exceptions, because we don't want to continue in an insane state. We've discussed making changes like this in the past, and have always opted to keep the same behavior.

I'm not necessarily against making this change, but at minimum we need to make this change much more visible.

I hear what you're saying; is this behavior change appropriate for this PR?

Let's take a step back. My position is that relying on this "hidden" behavior (i.e. that the job queue doesn't catch exceptions and that to throw signals "ABANDON SHIP!") is anathema to good software engineering practices. We've both encountered core dumps that might be traceable to this behavior.

So at some point we're going to need to fix this. The way I see it is that:

  • Either the job handlers need to be marked as noexcept, so that the compiler knows they won't throw and can warn at compile time if some code path is unprotected and can throw, and if something slips through, the runtime is required by the standard to kill the process through std::terminate; or
  • The job queue itself must replicate these semantics by catching these exceptions, warning loudly, and terminating.

I think that terminating is reasonable because, as you said, it's hard for the job queue to properly handle exceptions. So I would suggest that we leave the new try/catch in place and simply add an explicit call to LogicError (which, honestly, is horribly named and ought to be renamed to ripple::abort(std::string const& msg) and should just call std::abort, but that's another story) in the catch handler.

nbougalis avatar Jun 06 '22 05:06 nbougalis

@nbougalis Re: catching the exception (I wish github would let us respond to top level comments): That's a reasonable position - my real concern was putting this behavior change in without giving it the attention it deserves (because it looks innocuous and is part of an optimization patch). And now it's getting attention, so I'm happy.

We have two behaviors that need to be accounted for:

  1. We threw an exception that was not a big deal and we could have continued just fine, but we didn't catch it and terminated the server. You're right, we've encountered those exceptions in the past and needed to patch.

  2. We threw an exception, and the exception means we're in an insane state and the best thing we can do is terminate the application.

Unfortunately, it's really hard to audit the code for this type of behavior. There are no good tools that I know of that let me look at where an exception is thrown and find where it might be caught or find the call paths where it won't be caught (and if anyone knows of such a tool I'd LOVE to hear about it - I made a feature request to rtags a while back, but no love so far: https://github.com/Andersbakken/rtags/issues/1383).

Without an audit we have to decide to either terminate where we shouldn't have, or continue in an insane state where we shouldn't have. Neither choice is good. The advantage of terminating when we shouldn't have is we will be notified and we can patch these issues. If we paper them over we'll never fix them. On the other hand, these issues can be a potential source of DOS attacks on a server.

Bottom line: I vote to terminate, but either choice is reasonable and I'm fine with whatever you decide.

One minor note: I don't think marking a function noexcept will get us compiler warnings if the function actually throws - at least I've never seen this. I could see it doing this one level deep, but it almost certainly won't do this more than one level. Heck, new can throw, so we'd see warnings EVERYWHERE.

seelabs avatar Jun 06 '22 13:06 seelabs

Just FYI, the Refactor JobQueue commit message has some junk tagged onto the end. You may wish to clean that up.

scottschurr avatar Jun 17 '22 19:06 scottschurr

I tried out the most recent version of these changes on macOS. I see two things that need to be addressed:

  1. Clang pointed out that JobTypeData(JobTypeData&& other) = default is invalid. Since LoadMonitor is immovable, JobTypeData must also be immovable. This line could be removed or you could use = delete instead.

  2. While running the unit tests on a debug build, the Coroutine test failed with a segmentation fault. Here is the message I got:

ripple.core.Coroutine thread specific storage
Logic error: Job: LocalValue-Test: Exception caught during task processing: mutex lock failed: Invalid argument
Segmentation fault: 11

The stack trace points at JobQueue.cpp line 265: https://github.com/XRPLF/rippled/pull/4172/files#diff-85e882e68cc55760882821eba100a325cad49b5bce66e21047bd861b3a7e6fedR262-R264

The good news is that the exception message got out. The bad news is that since we caught the exception the trace to the actual source of the exception is lost.

The Coroutine unit test crash is not consistent, but if I loop the unit tests I get a crash every 20 repetitions or so. So it's not hard to reproduce.

I removed the try/catch in Workers.cpp to see what I could find out from the stack trace. Here's that stack trace from the crashed thread:

Thread 6 Crashed:: Job: LocalValue-Test
0   libsystem_kernel.dylib        	0x00007fff6996e33a __pthread_kill + 10
1   libsystem_pthread.dylib       	0x00007fff69a2ae60 pthread_kill + 430
2   libsystem_c.dylib             	0x00007fff698f5808 abort + 120
3   libc++abi.dylib               	0x00007fff66b54458 abort_message + 231
4   libc++abi.dylib               	0x00007fff66b458a7 demangling_terminate_handler() + 238
5   libobjc.A.dylib               	0x00007fff686805b1 _objc_terminate() + 104
6   libc++abi.dylib               	0x00007fff66b53887 std::__terminate(void (*)()) + 8
7   libc++abi.dylib               	0x00007fff66b561a2 __cxxabiv1::failed_throw(__cxxabiv1::__cxa_exception*) + 27
8   libc++abi.dylib               	0x00007fff66b56169 __cxa_throw + 113
9   libc++.1.dylib                	0x00007fff66b3055b std::__1::__throw_system_error(int, char const*) + 77
10  libc++.1.dylib                	0x00007fff66b2754d std::__1::mutex::lock() + 29
11  rippled                       	0x000000010da12e83 std::__1::lock_guard<std::__1::mutex>::lock_guard(std::__1::mutex&) + 35 (__mutex_base:91)
12  rippled                       	0x000000010da1260d std::__1::lock_guard<std::__1::mutex>::lock_guard(std::__1::mutex&) + 29 (__mutex_base:91)
13  rippled                       	0x000000010da2698a ripple::test::Coroutine_test::gate::signal() + 42
14  rippled                       	0x000000010da35bf9 ripple::test::Coroutine_test::thread_specific_storage()::'lambda0'()::operator()() const + 89 (Coroutine_test.cpp:172)
15  rippled                       	0x000000010da35b8e ripple::ClosureCounter<void>::Substitute<ripple::test::Coroutine_test::thread_specific_storage()::'lambda0'()>::operator()() + 30 (ClosureCounter.h:140)
16  rippled                       	0x000000010da35b4d decltype(std::__1::forward<ripple::ClosureCounter<void>::Substitute<ripple::test::Coroutine_test::thread_specific_storage()::'lambda0'()>&>(fp)()) std::__1::__invoke<ripple::ClosureCounter<void>::Substitute<ripple::test::Coroutine_test::thread_specific_storage()::'lambda0'()>&>(ripple::ClosureCounter<void>::Substitute<ripple::test::Coroutine_test::thread_specific_storage()::'lambda0'()>&) + 29 (type_traits:3545)
17  rippled                       	0x000000010da35afd void std::__1::__invoke_void_return_wrapper<void>::__call<ripple::ClosureCounter<void>::Substitute<ripple::test::Coroutine_test::thread_specific_storage()::'lambda0'()>&>(ripple::ClosureCounter<void>::Substitute<ripple::test::Coroutine_test::thread_specific_storage()::'lambda0'()>&) + 29 (__functional_base:349)
18  rippled                       	0x000000010da35acd std::__1::__function::__alloc_func<ripple::ClosureCounter<void>::Substitute<ripple::test::Coroutine_test::thread_specific_storage()::'lambda0'()>, std::__1::allocator<ripple::ClosureCounter<void>::Substitute<ripple::test::Coroutine_test::thread_specific_storage()::'lambda0'()> >, void ()>::operator()() + 29 (functional:1546)
19  rippled                       	0x000000010da3491e std::__1::__function::__func<ripple::ClosureCounter<void>::Substitute<ripple::test::Coroutine_test::thread_specific_storage()::'lambda0'()>, std::__1::allocator<ripple::ClosureCounter<void>::Substitute<ripple::test::Coroutine_test::thread_specific_storage()::'lambda0'()> >, void ()>::operator()() + 30 (functional:1720)
20  rippled                       	0x000000010b3fb1b5 std::__1::__function::__value_func<void ()>::operator()() const + 53 (functional:1873)
21  rippled                       	0x000000010b3fb155 std::__1::function<void ()>::operator()() const + 21 (functional:2548)
22  rippled                       	0x000000010bd19b7d ripple::Job::doJob() + 141 (Job.cpp:68)
23  rippled                       	0x000000010bd20593 ripple::JobQueue::processTask(unsigned int) + 723 (JobQueue.cpp:328)
24  rippled                       	0x000000010bd64341 ripple::Workers::Workers(ripple::Workers::Callback&, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, unsigned int)::$_0::operator()(unsigned int) const + 977 (Workers.cpp:80)
25  rippled                       	0x000000010bd63ee1 decltype(std::__1::forward<ripple::Workers::Workers(ripple::Workers::Callback&, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, unsigned int)::$_0>(fp)(std::__1::forward<unsigned int>(fp0))) std::__1::__invoke<ripple::Workers::Workers(ripple::Workers::Callback&, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, unsigned int)::$_0, unsigned int>(ripple::Workers::Workers(ripple::Workers::Callback&, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, unsigned int)::$_0&&, unsigned int&&) + 49 (type_traits:3545)
26  rippled                       	0x000000010bd63e2e void std::__1::__thread_execute<std::__1::unique_ptr<std::__1::__thread_struct, std::__1::default_delete<std::__1::__thread_struct> >, ripple::Workers::Workers(ripple::Workers::Callback&, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, unsigned int)::$_0, unsigned int, 2ul>(std::__1::tuple<std::__1::unique_ptr<std::__1::__thread_struct, std::__1::default_delete<std::__1::__thread_struct> >, ripple::Workers::Workers(ripple::Workers::Callback&, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, unsigned int)::$_0, unsigned int>&, std::__1::__tuple_indices<2ul>) + 62 (thread:274)
27  rippled                       	0x000000010bd63576 void* std::__1::__thread_proxy<std::__1::tuple<std::__1::unique_ptr<std::__1::__thread_struct, std::__1::default_delete<std::__1::__thread_struct> >, ripple::Workers::Workers(ripple::Workers::Callback&, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, unsigned int)::$_0, unsigned int> >(void*) + 118 (thread:284)
28  libsystem_pthread.dylib       	0x00007fff69a2b109 _pthread_start + 148
29  libsystem_pthread.dylib       	0x00007fff69a26b8b thread_start + 15

scottschurr avatar Aug 09 '22 18:08 scottschurr

The issue that @scottschurr encountered had to do with timing, but it did lead me to discover an implementation issue that could cause a worker thread to sleep despite the fact that work was waiting.

I've reworked the code to address this and now leverage more C++20 functionality, since the PR to enable this was merged into develop.

I know it's annoying to have to do this, but please re-review the first commit (that changes the Workers logic) in its entirety.

nbougalis avatar Aug 29 '22 23:08 nbougalis

With the most recent code I consistently get the following failure on the JobQueue unit test:

$ build/rippled --unittest=JobQueue
ripple.core.JobQueue
Assertion failed: (!stopped_), function postCoro, file JobQueue.h, line 391.
Abort trap: 6

I'm running macOS 12.5.1, Apple clang version 13.1.6 (clang-1316.0.21.2.5).

scottschurr avatar Sep 01 '22 17:09 scottschurr

With the most recent code I consistently get the following failure on the JobQueue unit test:

$ build/rippled --unittest=JobQueue
ripple.core.JobQueue
Assertion failed: (!stopped_), function postCoro, file JobQueue.h, line 391.
Abort trap: 6

I'm running macOS 12.5.1, Apple clang version 13.1.6 (clang-1316.0.21.2.5).

Yeah, that was just sloppy of me. It's been fixed and the ugly-looking code in the unit tests has been removed.

nbougalis avatar Sep 06 '22 16:09 nbougalis

The most recent version of this pull request fails the clang-format check. The top-most commit on this branch will fix that: https://github.com/scottschurr/rippled/commits/nik-jqt-clang-format

Other than the clang-format problems, is this branch ready to be re-reviewed? Or is there more work to be done?

scottschurr avatar Sep 21 '22 22:09 scottschurr

@scottschurr Thanks for the patience, the comments and the clang-format fix. I believe I've addressed everything, rebased the code to the latest and it should be ready for review.

I know there's a couple of clang-format failures still; I'll work around them. The existing formatting rules don't like attributes very much (e.g. [[likely]]) which is unfortunate.

nbougalis avatar Dec 19 '22 05:12 nbougalis

Ping...

nbougalis avatar Jan 12 '23 21:01 nbougalis

@thejohnfreeman @seelabs @scottschurr - looks like this is ready for (re)review.

If you won't be able to have an initial look at this within 2 weeks (and provide some feedback), please comment here to let us know.

intelliot avatar Jan 19 '23 00:01 intelliot

@intelliot, I'm not ignoring you, but I have another higher priority code review I'm working on. I can't predict whether or not I'll get to this one in the coming two weeks.

scottschurr avatar Jan 20 '23 23:01 scottschurr

This isn't a high priority item, imo. I'm sure Scott et al are focused on business priorities and that's fine; this can wait.

nbougalis avatar Jan 23 '23 15:01 nbougalis

@scottschurr, no worries. I know this was (a) a large PR; (b) not a priority; and (c) you're swamped. I appreciate you taking another look.

Re: your comment about an apparently missing lock in this bit of code:

return {jobData_[t].load().sample(), std::move(name), true};

You say:

Pretty much all of the other accesses to jobData_ are done with mutex_ locked. I have the uneasy sensation that there's a lock missing here.

Here's my (preliminary) rationale for why a lock isn't missing:

The jobData_ array is constructed once and each entry in the array contains a LoadMonitor instance load_. The load() member function returns a reference to the given member.

The LoadMonitor instance contains a LoadSampler member variable which is:

using LoadSampler = std::function<void(
    char const*,
    std::chrono::steady_clock::duration,
    std::chrono::steady_clock::duration)>;

And sample() returns a std::reference_wrapper<LoadSampler const> to that member.

Given that, I think that it's fine as is, although it's been a long time since I've even looked at this code and I will have to re-check to be sure.

I'll take a closer look when I rebase this.

nbougalis avatar Apr 29 '23 19:04 nbougalis

@nbougalis if this PR is ready for review and merging (in your opinion), then please avoid force-pushing to the branch. Best practices would require that any changes to the PR after a review should invalidate that review.

Also, when a branch is force-pushed or any changes are made to it, please comment on the PR to explain why the push / change was made.

intelliot avatar May 09 '23 16:05 intelliot

How can I avoid force-pushing if I rebase a branch to resolve merge conflicts with upstream?

nbougalis avatar May 13 '23 04:05 nbougalis

@nbougalis Instead of rebasing, just do a merge commit (git merge). The merge commit resolves merge conflicts with upstream. The merge commit will go away when the PR is merged to develop because we use Squash and merge.

intelliot avatar May 17 '23 22:05 intelliot

@thejohnfreeman what's the status of your review of this?

intelliot avatar Jun 26 '23 16:06 intelliot

@seelabs will you be able to review this?

intelliot avatar Jul 19 '23 01:07 intelliot

@intelliot I won't be able to review this week for sure

seelabs avatar Jul 19 '23 14:07 seelabs

@nbougalis - does this PR still make sense to merge? If so, please consider the above comments and update the branch to be up-to-date with develop. If not, then share a brief explanation and close. Thanks!

intelliot avatar Sep 06 '23 06:09 intelliot