feat(`mango`): rolling execution statistics
In case of map-reduce views, the arrival of the complete message is not guaranteed for the view callback (at the shard) when a stop is issued during the aggregation (at the coordinator). Due to that, internally collected shard-level statistics may not be fed back to the coordinator, which can cause data loss and hence inaccuracy in the overall execution statistics.
Address this issue by switching to a "rolling" model where row-level statistics are immediately streamed back to the coordinator. Support mixed-version cluster upgrades by activating this model only if it is requested through the map-reduce arguments and the given shard supports it.
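For illustration, here is a rough sketch of the rolling model (names and message shapes are illustrative only, not necessarily what the patch uses): the worker emits a small per-row stats delta as it goes, and the coordinator folds each delta into its running totals, so nothing depends on a final complete message being delivered.

```erlang
%% Illustrative sketch only; names and message shapes are not necessarily
%% the ones used by the patch.
-module(rolling_stats_sketch).
-export([worker_row/3, coordinator_fold/2]).

%% Worker side: after examining keys/docs for one row, stream the delta
%% immediately to the coordinator (Send is an opaque fun provided by the
%% caller, e.g. something wrapping the rexi stream).
worker_row(Send, KeysExamined, DocsExamined) ->
    Send({execution_stats, #{keys_examined => KeysExamined,
                             docs_examined => DocsExamined}}).

%% Coordinator side: merge an incoming delta into the accumulated totals.
coordinator_fold(Delta, Totals) when is_map(Delta), is_map(Totals) ->
    maps:fold(
        fun(K, V, Acc) -> maps:update_with(K, fun(X) -> X + V end, V, Acc) end,
        Totals,
        Delta).
```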
Note that this change shares some similarities with @chewbranca's work on Couch Stats Resource Tracker, though it stems from a different origin. In the long run, the goal is to have it either merged with or replaced by that approach to facilitate convergence. This change has a smaller scope, hence it is less complicated and ready for inclusion.
Testing recommendations
Running the respective Mango unit and integration test suites should suffice (this is also done by the CI):
make eunit apps=mango
make mango-test MANGO_TEST_OPTS="15-execution-stats-test"
There is also a detailed description in the related ticket (see below) on how to trigger the problem, which might be used to exercise the code change.
Related Issues or Pull Requests
Originates from #4735, forerunner of #4812. Fixes #4560.
Checklist
- [x] Code is written and works correctly
- [x] Changes are covered by tests
Sorry, I did mean to send feedback when I saw this ages ago.
I'm a little concerned at the increase in messages here (every one of the n*q shards for every _find request will send a message for each row it processes?) to work around a problem that, perhaps, we should instead fix directly?
Specifically, would it not be possible to alter how we implement the end of the workers, so they do get an opportunity to send their final stats while exiting (as their exit reason, say, which I believe we can capture).
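For instance (a generic Erlang sketch, not the actual rexi/fabric plumbing), the final stats could ride along in the exit reason and be captured from the monitor's 'DOWN' message:

```erlang
%% Generic Erlang sketch: the worker carries its final stats in its exit
%% reason, and the coordinator captures them from the 'DOWN' message of a
%% monitor, even if the worker never got to send a `complete` message.
-module(exit_reason_stats_sketch).
-export([demo/0]).

demo() ->
    {Pid, Ref} = spawn_monitor(fun() ->
        Stats = #{docs_examined => 42, keys_examined => 42},
        %% normal-ish termination that still conveys the stats
        exit({shutdown, {execution_stats, Stats}})
    end),
    receive
        {'DOWN', Ref, process, Pid, {shutdown, {execution_stats, Stats}}} ->
            {ok, Stats};
        {'DOWN', Ref, process, Pid, _OtherReason} ->
            {error, stats_lost}
    after 5000 ->
        {error, timeout}
    end.
```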
@rnewson that is a fair concern. Even though we have started to explore ways to make sure that the complete message is delivered and processed in a timely manner, note that @chewbranca's recent work on Couch Stats Resource Tracker (in #4812) promotes this streaming approach. Had he not started working on that, and had his preliminary performance benchmarks not supported it, I would not have thought about retrying #4735 again.
Based on the discussion with @chewbranca and others, the main idea is that these rolling execution statistics would be superseded by the CSRT framework at some later point anyway. But it is not clear when that will be finished and merged, and until it happens I would still like to have a simplified version of it added to main, so that we can improve the reliability of these statistics now, along with everything that may build upon them (through the extensibility of couch_stats).
Would some batching work, i.e. accumulate stats for some time (100 msec?) and only send them intermittently? It might miss some stats at the end, but for a long-running request it should capture the stats as it goes along without emitting too many messages.
@nickva currently, the M/R complete message is used as a cue to submit the statistics. If this is not handled properly, it does not matter if there is batching or not, we will miss some data at the end. For the details, please see #4735.
@pgj the idea is that it would be the difference between not sending any data back if complete doesn't run (so for an hour-long request we'd have no stats for the whole hour), or sending most of the data back and only missing the last few msec or so. That is, if finalization turns out to be tricky to do like @rnewson suggested, and sending messages for every row is expensive.
I believe this does not work for short-lived requests. Actually, the real cause of this problem is a race condition: workers often die before they could handle the complete message due to their untimely cancellation. In such cases, zero counts are observed, therefore the _find results are invisible.
But why is the increase in the message count problematic? @chewbranca's upcoming work takes much the same approach. Or did I miss something, perhaps I misunderstood that? Is there not an optimization somewhere down the stack that enables coalescing of messages? The messages would not increase the required bandwidth much due to their small size...?
I was mainly suggesting a way to have a bound on the uncertainty in the stats, if the increase in messages is a problem. It could be a time bound (milliseconds) or a docs-examined count bound (if N > 100, emit ...). It's a half-way hack, and it's better than getting no stats at all, but maybe less expensive than emitting stats for every row. But I don't know if it's a performance issue; I think you and @chewbranca did some measurements?
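A minimal sketch of what I mean (hypothetical names, assuming a simple count threshold of 100): deltas accumulate locally and are flushed once the threshold is crossed, so at most one threshold's worth of stats can be lost at the end.

```erlang
%% Rough sketch of the count-bounded idea; names are made up.
-module(batched_stats_sketch).
-export([bump/2, maybe_flush/2]).

-define(FLUSH_THRESHOLD, 100).

%% Account for one more examined key/doc in the local accumulator.
bump(Key, Pending) ->
    maps:update_with(Key, fun(N) -> N + 1 end, 1, Pending).

%% Flush the accumulated deltas to the coordinator once the threshold is
%% crossed, otherwise keep accumulating.
maybe_flush(Send, Pending) ->
    case lists:sum(maps:values(Pending)) >= ?FLUSH_THRESHOLD of
        true ->
            Send({execution_stats, Pending}),
            #{};                        % reset the local accumulator
        false ->
            Pending
    end.
```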
In such cases, zero counts are observed, therefore the _find results are invisible.
From what I remember, in mango the issue was happening when we have a limit specified? With a limit we spawn all the workers with the same limit value, but we will stop sooner based on when we hit the limit on the coordinator side. So then, some workers won't get a chance to emit a completed message and will keep emitting rows, others will emit a completed message but the coordinator by that time is already stopping and won't care. When the coordinator stops it will immediately issue a kill_all rexi call to terminate all the workers. Is that roughly what was happening?
Is there not an optimization somewhere down the stack that enables coalescing of messages?
I don't know if rexi has message batching. There is buffering if the send channel is blocked, and there is an un-acked message limit. But you're also right that the lower-level dist and TCP channels will probably do some batching, depending on how the environment is set up.
If @chewbranca's work would also significantly increase message count in a way that adversely affected performance (or the latency of other requests trying to make progress in between such messages), I would also want to see that improved before it was merged (and, if it were unavoidably severe, not merged). I'll focus on reviewing the work in hand here and not speculate further about other changes until they are ready for review.
I like nick's middle ground of having the workers send info periodically; we would benefit from seeing those stats rising during the processing of a long request (though I think _find, unlike _view, has a short-ish finite timeout by default).
@pgj's point is also relevant, very short requests won't appear at all, and to my mind that lines up with my original point that we should improve how the end of the request is handled. Ideally we would wait for all workers to return their final message (perhaps as their exit reason as already noted), but without forcing the user to wait for more responses than are needed to form a complete response (i.e, the first response from each shard range).
Put simply, it was not anticipated that we would care about the other workers when this was built, the code fires off the jobs and wants to return as soon as it can, killing the remaining processes. A good fix here should tackle that directly.
I was mainly suggesting a way to have a bound on the uncertainty in the stats, if the increase in messages is a problem.
@nickva I am afraid we cannot allow any uncertainty in the statistics if downstream consumers wanted to build use cases on top of this mechanism that involve billing.
I don't know if it's a performance issue, I think you and @chewbranca did some measurements?
@chewbranca might have some performance results on his side, but I have not collected any myself. Probably I should do some out of curiosity and include them here.
From what I remember, in mango the issue was happening when we have a limit specified?
Yes, but if I recall correctly there is always a limit assigned for _find requests. I believe the emphasis is on the relative distance between the limit and the number of results returned. It is technically the result of a race condition: if the stop signal arrives at the wrong time, either the completed message is not emitted or it is not handled in time.
Ideally we would wait for all workers to return their final message (perhaps as their exit reason as already noted), but without forcing the user to wait for more responses than are needed to form a complete response (i.e, the first response from each shard range).
This can be done easily: replace the stop with an ok in mango_cursor_view:handle_doc/2. Actually, this is also a case that could be tested for performance. Maybe it is not as expensive as sending the stats for every row.
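To make the trade-off concrete, here is a conceptual sketch (hypothetical names, not the actual mango_cursor_view code): once the limit is hit, the cursor can either hard-stop the stream, which is fast but loses late worker stats, or keep acknowledging rows with ok so that workers run to completion and their final stats arrive.

```erlang
%% Conceptual sketch only; hypothetical names, not the actual
%% mango_cursor_view code.
-module(limit_stop_sketch).
-export([handle_row/3]).

%% Limit exhausted: either stop the stream immediately (current behaviour,
%% late worker stats can be lost) or keep draining rows so workers finish
%% cleanly and can report their stats.
handle_row(#{limit := 0} = Cursor, _Row, hard_stop) ->
    {stop, Cursor};
handle_row(#{limit := 0} = Cursor, _Row, run_to_completion) ->
    {ok, Cursor};
%% Limit not yet reached: consume the row and decrement the limit.
handle_row(#{limit := L} = Cursor, _Row, _Mode) when L > 0 ->
    {ok, Cursor#{limit := L - 1}}.
```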
Put simply, it was not anticipated that we would care about the other workers when this was built, the code fires off the jobs and wants to return as soon as it can, killing the remaining processes. A good fix here should tackle that directly.
@rnewson do you have any pointers or ideas where to start with this? In the previous discussions, you had some suggestions, but I could not make them work.
@nickva I am afraid we cannot allow any uncertainty in the statistics if downstream consumers wanted to build use cases on top of this mechanism that involve billing.
Hmm, if billing is involved then even the current implementation of the PR will not produce repeatable results, unless we're talking about Q=1 databases only. I was under the impression that this is mostly about providing slightly more accurate results (fewer zero ones). Maybe this uncertainty is ok, depending on the downstream consumers' requirements, but if billing is involved, it's good to make that clear.
To convince ourselves I quickly patched an old script I had to run a few _find results https://gist.github.com/nickva/31e0f1f1c2a5a651259dc897a1bb5cfa
With 5000 docs we get a wild variety of results:
% make && ./dev/run --admin=adm:pass -n 1
Q=8
(venv3) % ./findlimit.py
* deleting http://127.0.0.1:15984/db
* creating http://127.0.0.1:15984/db {'q': '8'}
* creating 5000 docs with val range 200
* created docs in 3.5 seconds
* calling _find with selector: {'x': 50}
* _find in 0.1 seconds 10 docs {'total_keys_examined': 4253, 'total_docs_examined': 4253, 'total_quorum_docs_examined': 0, 'results_returned': 10, 'execution_time_ms': 66.808}
(venv3) % ./findlimit.py
* deleting http://127.0.0.1:15984/db
* creating http://127.0.0.1:15984/db {'q': '8'}
* creating 5000 docs with val range 200
* created docs in 3.5 seconds
* calling _find with selector: {'x': 50}
* _find in 0.1 seconds 10 docs {'total_keys_examined': 4222, 'total_docs_examined': 4222, 'total_quorum_docs_examined': 0, 'results_returned': 10, 'execution_time_ms': 67.306}
(venv3) % ./findlimit.py
* deleting http://127.0.0.1:15984/db
* creating http://127.0.0.1:15984/db {'q': '8'}
* creating 5000 docs with val range 200
* created docs in 3.6 seconds
* calling _find with selector: {'x': 50}
* _find in 0.1 seconds 10 docs {'total_keys_examined': 4218, 'total_docs_examined': 4218, 'total_quorum_docs_examined': 0, 'results_returned': 10, 'execution_time_ms': 68.228}
Q=16
(venv3) % ./findlimit.py
* deleting http://127.0.0.1:15984/db
* creating http://127.0.0.1:15984/db {'q': '16'}
* creating 5000 docs with val range 200
* created docs in 5.1 seconds
* calling _find with selector: {'x': 50}
* _find in 0.1 seconds 10 docs {'total_keys_examined': 4975, 'total_docs_examined': 4975, 'total_quorum_docs_examined': 0, 'results_returned': 10, 'execution_time_ms': 61.336}
(venv3) % ./findlimit.py
* deleting http://127.0.0.1:15984/db
* creating http://127.0.0.1:15984/db {'q': '16'}
* creating 5000 docs with val range 200
* created docs in 4.8 seconds
* calling _find with selector: {'x': 50}
* _find in 0.1 seconds 10 docs {'total_keys_examined': 4889, 'total_docs_examined': 4889, 'total_quorum_docs_examined': 0, 'results_returned': 10, 'execution_time_ms': 65.843}
(venv3) % ./findlimit.py
* deleting http://127.0.0.1:15984/db
* creating http://127.0.0.1:15984/db {'q': '16'}
* creating 5000 docs with val range 200
* created docs in 5.5 seconds
* calling _find with selector: {'x': 50}
* _find in 0.1 seconds 10 docs {'total_keys_examined': 4931, 'total_docs_examined': 4931, 'total_quorum_docs_examined': 0, 'results_returned': 10, 'execution_time_ms': 68.646}
To be clear, I don't know if there is a performance impact to sending all the stats rows; I was merely mentioning a possible strategy to apply if it is a concern. All this given the assumption there is already some variability in the results anyway.
I am slightly concerned with the synchronous stream finalization strategy: we'd be dealing with a streaming protocol during the active phase, only to switch back to a synchronous termination, which involves extra waiting applied to every request (finalize all Q workers, wait for all Q workers to respond, possibly also handle broken or timed-out cases, etc.). That would add some fixed delay to all the requests (an extra 10-20 msec?). Maybe we could do the finalization after we respond to the user, but we'd have to see how that interacts with the request resource cleanup part.
I have done some preliminary benchmarking with k6.io on my Apple M1 Max, with 3 nodes (r=2, w=2, q=2), 100K documents, /{db}/_find queries of various complexity, a 2-minute warmup and a 5-minute run (where count tells how many queries were completed; the higher, the better).
Here are the results:
- couchdb/main (baseline, as of 1f77e2e3):
  - 1 thread: min=0.60ms max=8676.29ms p(95)=6259.13ms p(98)=6631.37ms p(99)=7139.79ms, count=240
  - 2 threads: min=0.82ms max=17990.32ms p(95)=13364.63ms p(98)=13960.05ms p(99)=14177.66ms, count=206
- pgj/feat/mango/rolling-execution-stats (rebased to couchdb/main):
  - 1 thread: min=0.82ms max=11053.97ms p(95)=6880.65ms p(98)=7219.82ms p(99)=7493.13ms, count=196
  - 2 threads: min=0.73ms max=20467.99ms p(95)=15500.12ms p(98)=17401.71ms p(99)=19277.02ms, count=189
- couchdb/main where stop is replaced with ok in mango_cursor_view:handle_doc/2:
  - 1 thread: min=1.67ms max=15335.05ms p(95)=7943.36ms p(98)=9496.31ms p(99)=12347.95ms, count=88
  - 2 threads: min=0.82ms max=30654.26ms p(95)=16628.83ms p(98)=17278.53ms p(99)=20883.10ms, count=75
There seems to be a 10-20% performance penalty for the streaming statistics, while the synchronous termination causes around 66%.
Hmm, if billing is involved then even the current implementation of the PR will not produce repeatable results, unless we're talking about Q=1 databases only.
I do not think it should produce repeatable results. If a query happened to cause the scanning of 2,000 documents once and then 1,500 on another run, that is acceptable — that is how the system works. But returning zero in either case is definitely a miss on the side of accounting.
To convince ourselves I quickly patched an old script I had to run a few _find results
Great, thanks @nickva!
To be clear, I don't know if there is a performance impact to sending all the stats rows
Unfortunately there is (see my comment above) :-(
I am slightly concerned with the synchronous stream finalization strategy: we'd be dealing with a streaming protocol during the active phase, only to switch back to a synchronous termination, which involves extra waiting applied to every request (finalize all Q workers, wait for all Q workers to respond, possibly also handle broken or timed-out cases, etc.).
If that is what replacing stop with ok in mango_cursor_view:handle_doc/2 means, then the performance is much worse than that of streaming statistics.
I wonder whether we need to decouple the implementation of statistics collection for user-facing results when execution_stats=true is specified vs the chttpd_stats_reporter use case.
For the user-facing case, performance of a few extra 10s of ms seems unlikely to matter so much as it's typically a one-time analysis (e.g. via Fauxton) - a correct answer via a synchronous solution seems appropriate.
For the chttpd_stats_reporter case, we could stream stats directly to chttpd_stats_reporter instead of going through the same framework that execution stats uses, and would be free to complete sending statistics after the response has been returned to the user.
noting that "billing" of requests is not a couchdb concern.
noting that "billing" of requests is not a couchdb concern.
Yes, I agree. But it is CouchDB that could ensure that the data going to chttpd_stats_reporter can be trusted, and thereby provide support for scenarios like billing.
A higher level question occurs. It seems to me you want to account for the cost of the workers that did not contribute to the result (they were killed by the coordinator because a response from another copy was received first), whereas couchdb has an interest in not doing the unnecessary work. Did I misunderstand what the "missing" data is here?
actually, I'm going to assume none of us mean the execution stats of non-contributing workers.
What I think is being proposed here sounds like an attempt to fix the current, and by implication quite weird, execution stats code.
Only a _find request would read a view row or fetch a document and not pass it to the client, so the execution stats only apply here, and only because it is useful to compare the number of rows returned against the number of rows (or docs) examined to find those rows (which could be a much larger number).
By inference, the execution stats have been calculated and transmitted out-of-band relative to the returned rows, and this, if I'm right, is a mistake we should fix.
i.e., I suggest that the workers include the number of rows/docs they examined and discarded with each matching row they return to the coordinator. This is a small amount of extra data added to an existing message. For very efficient query executions, it's the number 0 on every row. The coordinator can obviously count how many rows it passed to the client and can sum these new numbers to calculate how many rows/docs were examined. As it's the same message, we can't get the wrong answer.
Now I'll wait until the morning to learn how completely wrong I am about the problem.
Thanks for sharing your thoughts @rnewson.
Yes, the execution stats are currently passed around in a dedicated execution_stats message, but only at the end of the transmission. The other, separate, generic complete message is the cue for submitting execution_stats, which may go missing if complete is dropped on early termination.
The problem is that this can happen even on a single-node cluster. Although I can imagine the scenario you described, I am still inclined to believe that the use of stop is such a hard stop that it even shuts down a worker that actively contributed to the response. Using stop in the view callback happens on the coordinator — that is, it already has all the data it needs in order to complete serving the request. But the worker does not have the chance to say "okay, please find the execution stats in the next, final message" because it is stopped immediately after stop is received. A remedy could be to let the worker hang around for a while to finalize the transmission that way, but it would also sacrifice some of the performance gain we currently get from stop. This technically leaves us with streaming.
Your suggestion looks similar to the one contained in this PR. However, I notice that you recommended extending the row message (for mango_cursor_view:handle_message/2) directly, rather than emitting fine-grained, per-row execution_stats messages. I can see how that works better performance-wise, and I am happy to do a benchmark with that approach as well.
Essentially that, yes. But go a bit further: the only execution stat we need at the row level is how many rows were examined but discarded prior to the row we are sending.
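A sketch of that shape (illustrative message format, not the actual fabric row): each row carries the number of rows/docs examined and discarded since the previously emitted row, and the coordinator sums that field over the rows it actually uses.

```erlang
%% Illustrative sketch only; the real row message is a different structure.
-module(piggyback_stats_sketch).
-export([emit_row/3, total_examined/1]).

%% Worker side: attach the discarded-since-last-row counter to the row.
emit_row(Send, Row, DiscardedSinceLastRow) when is_map(Row) ->
    Send(Row#{discarded_before => DiscardedSinceLastRow}).

%% Coordinator side: rows used plus their discarded counts give the total
%% number of rows examined; no separate stats message is needed.
total_examined(Rows) ->
    lists:foldl(
        fun(#{discarded_before := D}, Acc) -> Acc + D + 1 end,
        0,
        Rows).
```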
Thanks for running the performance check, @pgj!
I do not think it should produce repeatable results. If a query happened to cause the scanning of 2,000 documents once and then 1,500 on another run, that is acceptable — that is how the system works. But returning zero in either case is definitely a miss on the side of accounting.
It will depend not just on how many were scanned but also in what order they arrived and how they were merged at the coordinator. The idea is there is some fudge factor there as is. I updated the query script to query the API endpoint multiple times in a row for the same docs:
% ./findlimit.py
* deleting http://127.0.0.1:15984/db
* creating http://127.0.0.1:15984/db {'q': '16'}
* creating 10000 docs with val range 100
* created docs in 6.0 seconds
* _find in 0.1 seconds 5 docs {'total_keys_examined': 3382, 'total_docs_examined': 3382, 'total_quorum_docs_examined': 0, 'results_returned': 5, 'execution_time_ms': 50.381}
* _find in 0.0 seconds 5 docs {'total_keys_examined': 3323, 'total_docs_examined': 3323, 'total_quorum_docs_examined': 0, 'results_returned': 5, 'execution_time_ms': 37.183}
* _find in 0.0 seconds 5 docs {'total_keys_examined': 3359, 'total_docs_examined': 3359, 'total_quorum_docs_examined': 0, 'results_returned': 5, 'execution_time_ms': 32.422}
* _find in 0.0 seconds 5 docs {'total_keys_examined': 3347, 'total_docs_examined': 3347, 'total_quorum_docs_examined': 0, 'results_returned': 5, 'execution_time_ms': 31.574}
* _find in 0.0 seconds 5 docs {'total_keys_examined': 3282, 'total_docs_examined': 3282, 'total_quorum_docs_examined': 0, 'results_returned': 5, 'execution_time_ms': 31.719}
* _find in 0.0 seconds 5 docs {'total_keys_examined': 3325, 'total_docs_examined': 3325, 'total_quorum_docs_examined': 0, 'results_returned': 5, 'execution_time_ms': 30.074}
* _find in 0.0 seconds 5 docs {'total_keys_examined': 3276, 'total_docs_examined': 3276, 'total_quorum_docs_examined': 0, 'results_returned': 5, 'execution_time_ms': 29.015}
* _find in 0.0 seconds 5 docs {'total_keys_examined': 3274, 'total_docs_examined': 3274, 'total_quorum_docs_examined': 0, 'results_returned': 5, 'execution_time_ms': 30.524}
* _find in 0.0 seconds 5 docs {'total_keys_examined': 3120, 'total_docs_examined': 3120, 'total_quorum_docs_examined': 0, 'results_returned': 5, 'execution_time_ms': 29.053}
* _find in 0.0 seconds 5 docs {'total_keys_examined': 3368, 'total_docs_examined': 3368, 'total_quorum_docs_examined': 0, 'results_returned': 5, 'execution_time_ms': 31.881}
* docs: 3382 - 3120 = 262
I like @rnewson's idea to add stats to the view row. The view_row is a record, so we'd have to do a bit of tuple wrangling across multiple commits (PRs) to avoid breaking online cluster upgrades, but that's the cleanest solution overall.
As a bonus, we'd also then avoid odd out-of-order processing of stats like we have here: https://github.com/apache/couchdb/blob/2501fe69b06dd88611b4a3e290f080823476af70/src/fabric/src/fabric_view_all_docs.erl#L260-L274, where we may accumulate worker stats even though we might not actually have processed the previously emitted row.
With per-row stats we may miss some stats emitted at the end by workers which had already sent their rows, if those rows just didn't go towards producing the response. So there is some discrepancy between the total work induced in the cluster as a result of the API request and the work which took place on the workers before the emitted rows, if those rows were included in the response.
For the chttpd_stats_reporter case, we could stream stats directly to chttpd_stats_reporter instead of going through the same framework that execution stats uses, and would be free to complete sending statistics after the response has been returned to the user.
Ah, that's interesting to think about chttpd_stats, @willholley. That's in chttpd and it takes in request/response objects. But if we wanted to emit online stats for each request as it's being processed (a long-running request), we could alter the API there such that each worker would get some set of request bits (path, principal, nonce, request id?...) passed to it (in #mrargs extras), and then the worker could report stats independently without having to shuffle them back to the coordinator. It's kind of a larger change, but that way it could account for all the work generated as a side effect of an API call, even if it didn't make it into the response.
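Roughly what that could look like (entirely hypothetical keys; nothing like this exists today): the coordinator stashes a few request identifiers in the extras proplist, and each worker looks them up to report its local counters on its own node.

```erlang
%% Entirely hypothetical sketch; none of these keys exist today.
-module(worker_side_stats_sketch).
-export([tag_extra/2, maybe_report/3]).

%% Coordinator side: ReqBits could be [{path, P}, {nonce, N}, {principal, U}].
tag_extra(Extra, ReqBits) when is_list(Extra) ->
    [{stats_request_ctx, ReqBits} | Extra].

%% Worker side: if the context is present, hand the local counters to a
%% reporter fun; otherwise do nothing (old coordinator).
maybe_report(Report, Extra, Counters) ->
    case lists:keyfind(stats_request_ctx, 1, Extra) of
        {stats_request_ctx, ReqBits} -> Report(ReqBits, Counters);
        false -> skip
    end.
```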
I have implemented half of @rnewson's suggestion: the code now sends execution stats along with the view rows only. That is still a separate message, but it has already improved the performance (same configuration as above):
- 1 thread: min=0.79ms max=7556.06ms p(95)=6461.86ms p(98)=6609.84ms p(99)=6741.12ms, count=209
- 2 threads: min=0.78ms max=31902.38ms p(95)=13449.74ms p(98)=14130.32ms p(99)=14873.91ms, count=191
It seems to be only 5-10% off with this change alone.
There is an updated version now where the stats are sent along with the row data. Instead of creating a new record type, I switched to using maps and added support for coordinators and workers of the old version. The benchmark shows good performance (same configuration as earlier):
- 1 thread: min=0.77ms max=9711.29ms p(95)=5879.81ms p(98)=6864.91ms p(99)=7143.41ms, count=248
- 2 threads: min=0.60ms max=14197.50ms p(95)=13028.72ms p(98)=13755.92ms p(99)=13950.17ms, count=238
I tested this manually with old worker—new coordinator and new worker—old coordinator configurations, with a random set of /{db}/_find queries, and it was working.
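For illustration, a simplified compatibility sketch (the record below is a shortened stand-in, not the actual fabric view_row definition): a new map-based row can carry the stats delta inline, while a record row from a not-yet-upgraded worker simply contributes no delta.

```erlang
%% Simplified compatibility sketch; the record is a stand-in, not the
%% actual fabric view_row definition.
-module(row_compat_sketch).
-export([stats_delta/1]).

-record(view_row, {key, id, value, doc}).

stats_delta(#view_row{}) ->
    #{};                     % old-style worker: no inline stats in the row
stats_delta(#{stats := Delta}) ->
    Delta;                   % new-style worker: stats delta embedded in the map
stats_delta(#{}) ->
    #{}.                     % new-style row without stats (defensive default)
```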
Also, the combined message format revealed that in the case of limits, there is always an extra key and sometimes an extra document examined. Previously this information was lost since the corresponding execution_stats message could not make its way to the coordinator.
The only question that remains (for me) is whether a separate key should be created in #mrargs.extra for the map-based row messages. Currently, this is tied to execution_stats_rolling. It obviously depends on maps, but it might make sense to introduce maps with their own dedicated view_row_map key, possibly even in a separate PR. The execution_stats_rolling feature could then depend on the presence of this one.
@nickva thanks for the confirmation and the suggestions! I will address them in the other PR about introducing view_row_map. I will then rebase this PR onto that one and take care of the remainder, if there is any.
Thank you all for your help too!