
QPIDJMS-552 JMS 2 Completion threads shouldn't scale with the number of sessions

Open · franz1981 opened this issue 3 years ago • 28 comments

This implements an elastic per-connection thread pool that allows session completions to be consumed in order (as now) while avoiding the creation of a dedicated thread per session.

In a shared-connection use case, multiple sessions can benefit from reusing the same thread to handle their completions, dramatically reducing both wake-up cost and (native) memory usage.

In the case where each session uses its own connection, it would behave like the previous implementation.

franz1981 · Oct 26 '21

@tabish121 @gemmellr I'm going to share 2 flamegraphs to show how this PR changes the way completions are processed with some shared connections and many sessions per connection.

franz1981 · Oct 26 '21

This PR is still in draft because of:

  • no JIRA yet: still waiting for the maintainers' feedback
  • a configuration option is needed to cap the max number of threads used in the elastic pool (maybe even allowing it to be just one, in case users know that the completion handler isn't going to perform any heavy work and can be shared among sessions on the same connection)
  • a configuration option is needed for the keep-alive time of the elastic pool (a plain-JDK sketch of a pool with both knobs follows this list)
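
For illustration only, a minimal plain-JDK sketch of the kind of capped elastic pool these two options describe; maxCompletionThreads and keepAliveSeconds are hypothetical knobs, not options of this PR, and a real implementation would also need a policy for submissions rejected once the cap is reached:

    import java.util.concurrent.SynchronousQueue;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;

    final class ElasticCompletionPoolSketch {
        static ThreadPoolExecutor create(int maxCompletionThreads, long keepAliveSeconds) {
            return new ThreadPoolExecutor(
                    0,                        // no core threads: the pool shrinks to zero when idle
                    maxCompletionThreads,     // cap on the number of completion threads
                    keepAliveSeconds, TimeUnit.SECONDS,
                    // direct handoff: a new worker is created (up to the cap) whenever no idle
                    // worker is available to take the task
                    new SynchronousQueue<>());
        }
    }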

franz1981 · Oct 26 '21

These are the results of this change, running a benchmark with the following setup (a sketch of the kind of producer loop involved follows the list):

  • 10 non-durable producer sessions
  • 5 sessions per connection
  • 2 connections handled by 2 separate threads (i.e. each thread exclusively handles 5 producer sessions using the same connection)
  • 100-byte messages
  • max 10 in-flight completions per producer
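
For context, the completions being measured are JMS 2 asynchronous send callbacks. A hedged sketch of the kind of per-session producer loop such a benchmark would run, with the in-flight cap enforced by a semaphore (all names here are illustrative, not the actual benchmark code):

    import java.util.concurrent.Semaphore;
    import javax.jms.BytesMessage;
    import javax.jms.CompletionListener;
    import javax.jms.Message;
    import javax.jms.MessageProducer;
    import javax.jms.Session;

    final class ProducerLoopSketch {
        static void run(Session session, MessageProducer producer, int messages) throws Exception {
            final Semaphore inFlight = new Semaphore(10);   // max 10 in-flight completions
            final byte[] payload = new byte[100];           // 100-byte messages
            final CompletionListener onComplete = new CompletionListener() {
                @Override
                public void onCompletion(Message message) {
                    inFlight.release();                     // runs on a qpid-jms completion thread
                }

                @Override
                public void onException(Message message, Exception exception) {
                    inFlight.release();
                }
            };
            for (int i = 0; i < messages; i++) {
                inFlight.acquire();                         // wait for a free in-flight slot
                final BytesMessage msg = session.createBytesMessage();
                msg.writeBytes(payload);
                producer.send(msg, onComplete);             // JMS 2 asynchronous send
            }
        }
    }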

With main:

 -> TEST        12,040  msg/sec
 -> TEST        12,340  msg/sec
 -> TEST        12,081  msg/sec
 -> TEST        12,367  msg/sec
 -> TEST        11,971  msg/sec
 -> TEST        12,323  msg/sec
 -> TEST        12,026  msg/sec
 -> TEST        12,267  msg/sec
 -> TEST        12,120  msg/sec
 -> TEST        12,223  msg/sec
 -> *   121,764 msg/sec

With this PR:

 -> TEST        15,695  msg/sec
 -> TEST        15,441  msg/sec
 -> TEST        15,681  msg/sec
 -> TEST        15,455  msg/sec
 -> TEST        15,704  msg/sec
 -> TEST        15,451  msg/sec
 -> TEST        15,683  msg/sec
 -> TEST        15,485  msg/sec
 -> TEST        15,676  msg/sec
 -> TEST        15,436  msg/sec
 -> *   155,713 msg/sec

TLDR

121,764 msg/sec vs 155,713 msg/sec

The behaviour, for main (flamegraph image): in violet, the 10 completion threads (1 per session), consuming 40 * 10 samples = 400 samples -> 40% of a single core

For this PR (flamegraph image): in violet, the 2 completion threads (1 per connection), consuming 71 * 2 samples = 142 samples -> ~14% of a single core

The other threads perform nearly the same amount of work, meaning that sharing connections now saves a considerable amount of CPU time, and memory too.

franz1981 · Oct 26 '21

There's still something to be considered when using this processing model in case JMS Sessions are used on different threads AND share the same connection:

  • on main, the Netty thread handling the shared connection issues completion events to exclusive single-threaded thread pools
  • on this PR, the Netty thread handling the shared connection issues completion events to a shared thread pool

If completion event processing is heavyweight, issuing completion events to a shared thread pool shouldn't be the bottleneck; each completion thread's own processing will be. If completion event processing is lightweight instead, reusing the same completion thread(s), kept as busy as possible, reduces the number of context switches and the native resources used.

franz1981 · Oct 26 '21

So, one 'slight' problem with this hehe. It doesn't do at all what is described, or what I think both of us initially thought it would be doing. Digging into the executor behaviour, rather than it potentially being elastic up to n threads, it's instead binary: it has either 0 or 1 thread.

It only looks to create new threads if there are none, or if it is under the max number when it can't insert into the task queue, which it always can, and so it only creates a first thread and then always uses that afterwards (ignoring idle timeout). Essentially the current impl serializes all completions for all sessions on the connection onto 1 thread, and would halt completions across all sessions if the thread were ever used to do any kind of longer-lasting / blocking work during any given completion callback, which is entirely allowed, and so that isn't acceptable.

A search confirms this is the expected executor behaviour. To have it behave otherwise you have to jump through hoops, like pretending the task queue is full, extending the rejected execution handler, or having it create core threads but allowing core threads to also time out.
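
A minimal standalone demonstration of the 0-or-1-thread behaviour described above (plain JDK, not qpid-jms code; the class name is illustrative):

    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;

    public class UnboundedQueueDemo {
        public static void main(String[] args) throws InterruptedException {
            // 0 core threads, up to 8 max threads, unbounded work queue
            final ThreadPoolExecutor pool = new ThreadPoolExecutor(
                    0, 8, 30, TimeUnit.SECONDS, new LinkedBlockingQueue<>());

            // submit many slow tasks, far more than one thread can keep up with
            for (int i = 0; i < 16; i++) {
                pool.execute(() -> {
                    try {
                        Thread.sleep(100);
                    } catch (InterruptedException ignored) {
                    }
                });
            }

            Thread.sleep(200);
            // the unbounded queue always accepts, so only the very first submission created a
            // worker: the pool size stays at 1 and never approaches the configured max of 8
            System.out.println("pool size = " + pool.getPoolSize()); // prints: pool size = 1
            pool.shutdown();
        }
    }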

gemmellr · Oct 27 '21

Oops my bad, I should have used a synchronous queue instead of a linked blocking queue and it should behave as expected

I'll run the benchmark again to be sure it still reuses the same thread.

franz1981 · Oct 27 '21

Oops my bad, I should have used a synchronous queue instead of a linked blocking queue and it should behave as expected

And I was wrong: although the 0-capacity synchronous queue should behave as expected, it seems to create more threads than expected. I'm going to change the logic a bit to see if it gets better.

franz1981 · Oct 28 '21

I don't believe it will ever work the way you want with a simple queue of any size. You have to add hoops to manipulate it into behaving that way, e.g. by pretending the queue is full when you want it to create threads, and inserting into the queue (directly or via the rejected execution handler) when you don't want more threads.

The docs for ThreadPoolExecutor cover the built-in behaviour: https://docs.oracle.com/en/java/javase/11/docs/api/java.base/java/util/concurrent/ThreadPoolExecutor.html

gemmellr · Oct 28 '21

I don't believe it will ever work the way you want with a simple queue of any size. You have to add hoops to manipulate it into behaving that way

A synchronous queue (which isn't a real queue, actually, but more a rendezvous point for thread handoff) works exactly like that, but I've made a mistake about how completions are submitted... the doc says:

Direct handoffs. A good default choice for a work queue is a SynchronousQueue 
that hands off tasks to threads without otherwise holding them. Here, an attempt 
to queue a task will fail if no threads are immediately available to run it, 
so a new thread will be constructed. 
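
For reference, this direct-handoff configuration is exactly what Executors.newCachedThreadPool() builds internally; a plain-JDK sketch, shown only to make the quoted behaviour concrete:

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.SynchronousQueue;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;

    final class DirectHandoffSketch {
        static ExecutorService newElasticPool() {
            return new ThreadPoolExecutor(
                    0, Integer.MAX_VALUE,      // no core threads, effectively unbounded max
                    60L, TimeUnit.SECONDS,     // idle workers are reclaimed after 60 seconds
                    // offer() only succeeds if a worker is already waiting to take the task,
                    // so a busy pool grows by creating a new thread instead of queueing
                    new SynchronousQueue<>());
        }
    }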

Let me think about it a bit more and I'll see if this PR can be closed or just needs to be refined to achieve the expected behaviour.

but I've made a mistake about how completions are submitted

It means that if the Netty event loop the connection belongs to is going to submit completions for 2 different sessions (e.g. sessions 1 and 2) and session 1 is already processing completions:

  • incoming completions for session 1 are added to the session completion queue without bothering the thread pool, because there's already a running completion thread handling them
  • new session 2 completions won't find any completion thread awake and will submit the request to the thread pool
  • the synchronous queue is built to reject any offer if there is no thread idle/awaiting on take, which means the thread pool creates a new thread to handle session 2 completions or reuses an existing one (without rejecting the task offer)

It means that under heavy load completions can land so fast that we still have 2 separate threads handling the 2 sessions' completions, and that's not exactly what I want.

franz1981 · Oct 28 '21

Sure. However there is a fairly decent window for that to happen even with 1 session, with how it is done just now. Consider that it will often execute a second task for the same session, unless a previous one already began, got back into the session, set the thread ref, and was still inside the inner poll loop before nulling the ref, while the next message drops. Especially since the worker nulls the thread ref before checking whether the tasks are not-empty again and going back around the outer loop, where one of those other executions on other threads may have already got in, I think it likely even a single session will bounce between threads with all that happening. I absolutely expect 2 sessions would often use different threads at times for similar reasons, since there will be multiple threads primed and waiting.

gemmellr · Oct 28 '21

Yep, probably what I really wanted is to avoid creating these threads altogether and let the completions be handled directly on the event loop threads (or on some thread pool provided externally by the user): the JMS 2 API seems to miss that, while it's a key point for saving resources if the user is aware that processing completions is a lightweight op... any idea how it would be possible to achieve something like that?

franz1981 · Oct 28 '21

Using the event loop is out of the question; it runs the inner core, and the callback is allowed to do most stuff on the outer connection that it services. Disaster waiting to happen.

I think optimising it by default may be more trouble than it is worth, the more I think about it. Adding the option to e.g. use a connection-wide pool of a given fixed size (and where the user is aware that blocking the callbacks in any significant way will cause cross-session completion starvation if they have more sessions) might be an... option. Or the factory has an extension mechanism that could be used to supply an actual pool.

gemmellr · Oct 28 '21

Adding the option to e.g. use a connection-wide pool of a given fixed size (and where the user is aware that blocking the callbacks in any significant way will cause cross-session completion starvation if they have more sessions) might be an... option. Or the factory has an extension mechanism that could be used to supply an actual pool.

+100

I like this: it can still be pretty valuable in constrained environments... let's speak about it / let me find an appropriate way to expose it, and I'm open to any suggestion here.

franz1981 · Oct 28 '21

Alternatively, the connection already has an executor of its own (used for the exception listener etc. to avoid blocking the event loop); an option could just be to use that for completions. For the factory extension bits, follow JmsConnectionFactory.setExtension(String, BiFunction<Connection, URI, Object>).
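
A usage sketch of the extension hook mentioned above; the extension name "completionExecutor" is purely hypothetical and only illustrates the mechanism of supplying an object per connection through the factory:

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import org.apache.qpid.jms.JmsConnectionFactory;

    final class ExtensionSketch {
        static JmsConnectionFactory configure() {
            final ExecutorService sharedPool = Executors.newFixedThreadPool(2);
            final JmsConnectionFactory factory = new JmsConnectionFactory("amqp://localhost:5672");
            // the BiFunction<Connection, URI, Object> is invoked per connection; the returned
            // object is whatever the named extension point consumes ("completionExecutor" is made up)
            factory.setExtension("completionExecutor", (connection, uri) -> sharedPool);
            return factory;
        }
    }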

gemmellr · Oct 28 '21

@gemmellr

I've pushed a change that allows using a single connection executor to handle all the sessions' completions, although the processing algorithm has been changed to prevent starvation of completion tasks belonging to different sessions (assuming no blocking user code ;) )

I need to add tests if the idea seems good, let me know

For the factory extension bits follow JmsConnectionFactory.setExtension(String, BiFunction<Connection, URI, Object>).

Let me take a look. I'm still unhappy that the single-session-per-connection use case doesn't yet have a proper solution here, unless I use the JmsConnectionFactory to inject a thread pool that allows handling completions belonging to different sessions/connections...

franz1981 · Oct 28 '21

@gemmellr I'm going to send a separate commit that handles this same issue in a different way, similar to what https://github.com/apache/qpid-jms/pull/45 has done, i.e. completions are handled by a shared fork-join pool (capable of work stealing) that will be shut down deterministically when no JmsSession(s) are open anymore (and re-created on demand); a reference-counting sketch of that lifecycle follows at the end of this comment.

Using a ForkJoin executor:

  • enables work-stealing (and better load balancing, better explained below)
  • better handles contention when submitting from multiple threads

A different way to handle this would be to create a shared (reference-counted) event loop group and assign a different event loop executor in round-robin to each session (that needs completion processing), with some good points:

  • each session would have a single-threaded executor (no need for a separate concurrent queue to handle completions)
  • the completion thread won't change during the session lifecycle

And an important drawback:

  • no runtime load balancing: if an executor is shared by many busy sessions (processing many completions), an unlucky session won't have any other thread helping to process its completions, even though the other event loops are idle
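
A minimal reference-counting sketch of the shared, deterministically disposed pool lifecycle described at the start of this comment (class and method names here are illustrative, not the PR's actual code):

    import java.util.concurrent.ForkJoinPool;

    // Each session acquires the shared pool when it needs completion processing and releases
    // it on close; the pool is created on first acquire, shut down when the last user
    // releases it, and re-created on demand.
    final class SharedCompletionPool {
        private static ForkJoinPool pool;
        private static int refCount;

        static synchronized ForkJoinPool acquire(int parallelism) {
            if (refCount++ == 0) {
                pool = new ForkJoinPool(parallelism);
            }
            return pool;
        }

        static synchronized void release() {
            if (--refCount == 0) {
                pool.shutdown();
                pool = null;
            }
        }
    }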

franz1981 · Oct 28 '21

I'm not sure I like the idea of taking a similar approach to 45, which I hadn't got to look at yet (can only keep up with so many different large and invasive changes while trying to do other stuff). I haven't even looked at this version yet.

I do specifically want to avoid a default case where users need to end up tweaking thread counts because an entirely legitimate blocking workload is drowning out other stuff happening, though. Seems like there are 2 different routes for that to happen there (and that is based on seeing it happen elsewhere this is done).

gemmellr · Oct 28 '21

I do specifically want to avoid a default case where users need to end up tweaking thread counts because an entirely legitimate blocking workload is drowning out other stuff happening, though. Seems like there are 2 different routes for that to happen there (and that is based on seeing it happen elsewhere this is done).

The default use case will remain the same as now, i.e. one thread with its executor service per session, but we can still give the option to users that know that completion processing won't be blocking (or who accept the trade-offs of slow completion processing) to have completions handled by a shared, configurable thread pool that auto-disposes when no JMS sessions require it anymore. This would just keep the number of completion threads under control while fairly load-balancing completion processing (the FJ pool is pretty good at work-stealing), i.e. a blocking completion won't stop other completions from being processed until all completion threads are blocked.

franz1981 · Oct 28 '21

Ok. It seemed like you wanted to make it the default.

gemmellr · Oct 28 '21

Ok. It seemed like you wanted to make it the default.

I didn't say it clearly on the PR, but I don't want to break existing users' code :) https://github.com/apache/qpid-jms/pull/45 is using a ref-counted approach that would GC the shared "resource" after all the JMSSessions that need it have been closed; I'm open to other approaches, but it's not clear to me (yet) which class should decide the lifecycle of the shared completion thread pool. Any idea is welcome, e.g. connection factory finalization looks appealing (on the Artemis JMS impl we do that), but I'm not a big fan of finalizers :P

franz1981 · Oct 28 '21

There will definitely be no finalizers :) I think the connection is the most likely choice for a shared thing enabled by config.

gemmellr · Oct 28 '21

Yep, but if people decide to use 100 connections, why should they have 100 separate completion thread pools? I was thinking about a proper shared approach really, one that spans across connections/sessions - sadly the (JMS) connection factory doesn't have any close method :(

franz1981 · Oct 28 '21

I didn't say it would be per connection. You asked about lifecycle. Factories don't have any 'close'. Connections do. Hence..

gemmellr · Oct 28 '21

The current/second version isn't quite what I had in mind when it occurred to me the connection executor was already there. You made it a sort of hybrid, still mostly like your first version, calling back into the session to process a task list there. I was essentially thinking of more what it already did, but just playing with the executor used when firing things off. I expect you wanted the task list for 'aggregating efficiency' though, albeit that does then introduce fairness issues you had to account for a little, though they are still there. Since you don't like this approach anyway, let's see what your alternative is like.

gemmellr · Oct 29 '21

I've sent a change that introduces HolderSuppliers (bad name, so happy to receive feedback/suggestions) and SharedDisposable as a util that could be used on https://github.com/apache/qpid-jms/pull/45 to handle shared disposable/pre-configured reference-counted resources.

Some points re this change:

  1. thanks to FJ work stealing, having a thread busy/blocked processing completions isn't a big deal as long as other FJ threads can handle other sessions' completions
  2. fairness still isn't "right": a perfectly fair system would process just a single completion and continue on the FJ pool, to guarantee other completions get their chance to be executed; I've chosen to execute bursts of completions to amortize the "virtual context switch" cost, improving locality, but in theory it would be better to put a time-based cap on how long a burst of completions can keep a single thread busy (a good number could be N * the default timeslack_ns of Linux, i.e. N * 50 us); a generic sketch of such a time-budgeted drain follows this list
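
For illustration only, a hedged sketch of the time-budgeted drain idea from point 2; BudgetedDrainSketch and its parameters are hypothetical names, not code from this PR:

    import java.util.Queue;
    import java.util.concurrent.Executor;

    // Run queued completions until a small time budget (e.g. N * 50 us) is used up, then
    // hand the thread back to the shared pool by scheduling a continuation, so completions
    // of other sessions get their turn.
    final class BudgetedDrainSketch {
        static void drain(Queue<Runnable> completionTasks, Executor pool, long budgetNanos) {
            final long deadline = System.nanoTime() + budgetNanos;
            Runnable task;
            while ((task = completionTasks.poll()) != null) {
                try {
                    task.run();
                } catch (Throwable ignored) {
                    // a failing completion callback shouldn't stop the drain
                }
                if (System.nanoTime() - deadline >= 0) {
                    break; // budget exhausted: yield the pooled thread
                }
            }
            if (!completionTasks.isEmpty()) {
                // more work left: re-submit so the pool can interleave other sessions' drains
                pool.execute(() -> drain(completionTasks, pool, budgetNanos));
            }
        }
    }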

It now saves creating a number of completion threads bound to the number of sessions, just by adding completionThreads=N to the qpid URL.

This is one run of the previous benchmark with completionThreads=2 (flamegraph image): in violet, the FJ pool threads in action, with 89 + 88 = 177 samples:

 -> TEST        16,959  msg/sec
 -> TEST        17,391  msg/sec
 -> TEST        16,986  msg/sec
 -> TEST        17,377  msg/sec
 -> TEST        16,997  msg/sec
 -> TEST        17,415  msg/sec
 -> TEST        16,978  msg/sec
 -> TEST        17,379  msg/sec
 -> TEST        16,987  msg/sec
 -> TEST        17,385  msg/sec
 -> *   171,857 msg/sec

franz1981 · Nov 02 '21

I see that a version that just handles one completion task at a time, to let FJ guarantee fairness of completions, isn't working that badly, really, but I need to perform some more tests on it; code-wise it means:

    private void processCompletions() {
        assert processCompletion.get();
        completionThread = Thread.currentThread();
        try {
            // process at most one completion task per run: fairness across sessions is
            // delegated to the shared executor
            final Runnable completionTask = completionTasks.poll();
            if (completionTask != null) {
                try {
                    completionTask.run();
                } catch (Throwable t) {
                    LOG.debug("errored on processCompletions duty cycle", t);
                }
            }
        } finally {
            completionThread = null;
            processCompletion.set(false);
        }
        if (completionTasks.isEmpty()) {
            return;
        }
        // a racing asyncProcessCompletion has won: no need to fire a continuation
        if (!processCompletion.compareAndSet(false, true)) {
            return;
        }
        // there is still work and we re-acquired the guard: schedule the next single-task run
        getCompletionExecutor().execute(this::processCompletions);
    }

franz1981 · Nov 02 '21

I think a single completion at a time is the way to go, at least by default; it just shouldn't really be a concern when using it whether the callback execution is fair or not.

gemmellr · Nov 02 '21

Having just read them all, the existing feedback and questions (e.g. impact on the default case) on the code and in the discussion essentially all still seem to apply now (even some of the now-'outdated' ones), given the changes remain essentially the same as back when they were made, just minus one holder and with some class/method renames.

gemmellr · Mar 01 '22