
Shard aware batching - add Session::shard_for_statement & Batch::enforce_target_node

Open Ten0 opened this issue 2 years ago • 37 comments

Resolves #468 Resolves #974 Closes #975 (since that is just an alternate way to fix #974 and this implementation is probably more direct)

This is a follow-up on #508 and #658:

  • To minimize CPU usage related to network operations when inserting a very large number of lines, batching is relevant.
  • To batch in the most efficient manner, these batches have to be shard-aware. Since #508, a batch will pick the shard of its first statement to send the query to. However, it is left to the user to build the batches in such a way that the target shard is the same for all the elements of the batch.
  • This was made possible by #658, but it was still very boilerplate-heavy. I was waiting for #612 to be merged (amazing work btw! 😃) to implement a more direct and factored API (as it would build on it).
  • This new Session::first_shard_for_statement(self, &PreparedStatement, &SerializedValues) -> Option<(Node, Shard)> makes shard-aware batching easy for users by providing access to the first node and shard of the query plan.
  • As pointed out in the first review, this was not enough considering the load balancer (now?) picks a random replica at each call. To make that deterministic when constituting batches "by hand", a new enforce_target_node method is added on Batch, allowing the user to choose the target node (it makes use of the load balancing API via an EnforceTargetNodePolicy struct, so no significant change). A usage sketch is shown below.
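For context, here is a rough sketch of how the two proposed calls are meant to be combined. The signatures follow what this PR proposes (shard_for_statement to compute the grouping key, enforce_target_node to pin the batch) and may change before merge; the bind-value types, the Unlogged batch type and the error handling are simplified placeholders.

```rust
use std::sync::Arc;

use scylla::batch::{Batch, BatchType};
use scylla::prepared_statement::PreparedStatement;
use scylla::transport::node::Node;
use scylla::Session;

// Sketch only: `enforce_target_node` is the API proposed in this PR and its
// exact shape may differ. `rows` is assumed to have been grouped beforehand
// so that every row targets the same (node, shard), e.g. by keying on the
// result of the proposed `Session::shard_for_statement`.
async fn flush_to_shard(
    session: &Session,
    target_node: &Arc<Node>,
    prepared: &PreparedStatement,
    rows: Vec<(i64, String)>, // placeholder bind values for one shard
) -> Result<(), Box<dyn std::error::Error>> {
    let mut batch = Batch::new(BatchType::Unlogged);
    for _ in &rows {
        // One occurrence of the prepared statement per row of values.
        batch.append_statement(prepared.clone());
    }
    // Pin the whole batch to the node the grouping key was derived from, so
    // the load balancer cannot re-pick a different replica per execution.
    batch.enforce_target_node(target_node, session);
    session.batch(&batch, rows).await?;
    Ok(())
}
```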

Pre-review checklist

  • [ ] I have split my patch into logically separate commits. (I propose we squash-and-merge)
  • [x] All commit messages clearly explain what they change and why.
  • [x] I added relevant tests for new features and bug fixes.
  • [x] All commits compile, pass static checks and pass test.
  • [x] PR description sums up the changes and reasons why they should be introduced.
  • [x] I have provided docstrings for the public items that I want to introduce.
  • [ ] I have adjusted the documentation in ./docs/source/.
  • [x] I added appropriate Fixes: annotations to PR description.

Ten0 avatar Jun 03 '23 18:06 Ten0

One more clippy lint I don't agree with -.- The type seems clear enough, and I feel like adding a new one would instead increase overall complexity. Is it fine if I silence that one?

Ten0 avatar Jun 03 '23 22:06 Ten0

Have you tested your solution? How? Could you write some unit tests?

wprzytula avatar Jul 03 '23 07:07 wprzytula

I can see that you merged main into your branch. As our policy is to rebase, could you instead rebase your branch on main, please?

wprzytula avatar Jul 03 '23 08:07 wprzytula

The approach with enforce_target_node() is very interesting and it seems to be working, at least conceptually. I'm a bit worried, however, that it may turn out to be somewhat fragile. For instance, consider a situation when an enforced node is removed from the cluster. Not only will the correct shard not be targeted, but also each query will involve an expensive call to fallback(). Even worse, in order to repair this, the application will have to be restarted.

wprzytula avatar Jul 03 '23 08:07 wprzytula

For instance, consider a situation when an enforced node is removed from the cluster. Not only will the correct shard not be targeted

If the node is removed from the cluster, new lines on which one would call shard_for_statement wouldn't be associated with this shard anymore, so the task dedicated to sending batches to this shard wouldn't receive any more messages to send; it would just be a dangling, sleeping Tokio task until app restart, which is reasonably inexpensive (one could even clean that up, but it seems overkill).

each query will involve an expensive call to fallback()

I'm making the call to the wrapped policy's pick (the non-fallback function of self.fallback) inside the pick function, so that should be pretty inexpensive for each message whose shard/batching task has already been picked before the node was removed.
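For reference, here is an approximate sketch of what that policy looks like. This is not the exact code from this PR: the struct fields are illustrative, and the LoadBalancingPolicy trait shape shown is the pick/fallback API as it existed at the time of this discussion (pre-#944), so signatures may differ in other driver versions.

```rust
use std::sync::Arc;

use scylla::load_balancing::{FallbackPlan, LoadBalancingPolicy, RoutingInfo};
use scylla::transport::node::Node;
use scylla::transport::ClusterData;
use uuid::Uuid;

// Approximate sketch: `pick` targets the enforced node while it is still in
// the cluster, and otherwise delegates to the wrapped policy's own (cheap)
// `pick` rather than to its `fallback`.
#[derive(Debug)]
struct EnforceTargetNodePolicy {
    target_host_id: Uuid,
    fallback: Arc<dyn LoadBalancingPolicy>,
}

impl LoadBalancingPolicy for EnforceTargetNodePolicy {
    fn pick<'a>(
        &'a self,
        query: &'a RoutingInfo,
        cluster: &'a ClusterData,
    ) -> Option<&'a Arc<Node>> {
        cluster
            .get_nodes_info()
            .iter()
            .find(|node| node.host_id == self.target_host_id)
            .or_else(|| self.fallback.pick(query, cluster))
    }

    fn fallback<'a>(
        &'a self,
        query: &'a RoutingInfo,
        cluster: &'a ClusterData,
    ) -> FallbackPlan<'a> {
        // Only reached when `pick` yields nothing: delegate the full plan to
        // the wrapped policy.
        self.fallback.fallback(query, cluster)
    }

    fn name(&self) -> String {
        "EnforceTargetNodePolicy".to_owned()
    }
}
```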

Ten0 avatar Jul 03 '23 08:07 Ten0

I'm making the call to the wrapped policy's pick (the non-fallback function of self.fallback) inside the pick function, so that should be pretty inexpensive for each message whose shard/batching task has already been picked before the node was removed.

Correct, I missed it.

If the node is removed from the cluster, new lines on which one would call shard_for_statement wouldn't be associated with this shard anymore, so the task dedicated to sending batches to this shard wouldn't receive any more messages to send [...]

It seems that I don't quite understand your use case. As I imagine it, one would once (upon application initialisation) constitute/associate a batch with a specific shard using shard_for_statement() and enforce_target_node(), and then keep calling Session::batch() with that batch for the whole lifetime of the application. That would mean an incorrect node being enforced forever if the target node is removed. How does your use case prevent that?

wprzytula avatar Jul 03 '23 08:07 wprzytula

As I imagine it, one would once (upon application initialisation) constitute/associate a batch with a specific shard

No, rather I have an app that receives millions of lines per second to insert. These lines are processed, spread over multiple threads, in a manner that involves just identifying the shard and then looking up, in a HashMap<(NodeUuid, ShardIdxOnNode), Sender>, the lazily instantiated channel that sends to a task which constitutes and sends the batches for a given target shard. This removes ~90% of the scylla driver's CPU consumption (according to perf).

Now if a node is removed, the task will be left dangling, and a few messages will be sent to a default-policy-chosen shard until its channel is drained, but new messages will be assigned to an alive shard (and its corresponding channel).
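For illustration, here is a minimal sketch of that dispatch layer. The Line type, channel size and key type are hypothetical, and computing the key for each line is assumed to be done with the shard_for_statement call proposed in this PR.

```rust
use std::collections::HashMap;

use tokio::sync::mpsc;
use uuid::Uuid;

// Hypothetical row type: one line of bind values to insert.
struct Line;

// Grouping key: (node host id, shard index on that node), e.g. as derived
// from the proposed Session::shard_for_statement.
type ShardKey = (Uuid, u32);

struct Dispatcher {
    senders: HashMap<ShardKey, mpsc::Sender<Line>>,
}

impl Dispatcher {
    /// Route one line to the batching task owning its target shard,
    /// lazily spawning that task (and its channel) on first use.
    async fn dispatch(&mut self, key: ShardKey, line: Line) {
        let sender = self.senders.entry(key).or_insert_with(|| {
            let (tx, rx) = mpsc::channel(1024);
            tokio::spawn(run_batching_task(key, rx));
            tx
        });
        // If the node behind `key` is later removed, new lines simply stop
        // being keyed to it and the corresponding task is left idle.
        let _ = sender.send(line).await;
    }
}

async fn run_batching_task(_key: ShardKey, mut rx: mpsc::Receiver<Line>) {
    while let Some(_line) = rx.recv().await {
        // Accumulate lines, then build and send a Batch pinned to this
        // (node, shard), as sketched in the PR description.
    }
}
```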

Ten0 avatar Jul 03 '23 08:07 Ten0

No, rather I have an app that receives millions of lines per second to insert. These lines are processed, spread over multiple threads, in a manner that involves just identifying the shard and then looking up, in a HashMap<(NodeUuid, ShardIdxOnNode), Sender>, the lazily instantiated channel that sends to a task which constitutes and sends the batches for a given target shard.

Let me dig deeper. What is the lifetime of your Batch structs? Do they contain PreparedStatements, and what is their lifetime? Are the lines that you get every second merely bind values to be sent with prepared statements with fixed query strings?

wprzytula avatar Jul 03 '23 09:07 wprzytula

Are the lines that you get every second merely bind values to be sent with prepared statements with fixed query strings?

Basically yes. There's a bunch of different prepared statements, but those are prepared at application startup. A Batch object is initialized once in each task dedicated to a shard (with its overridden execution policy from this PR). When the task has constituted a set of lines to insert, it sets the corresponding prepared statements in the Batch, then sends it.
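To make the workflow concrete, here is a sketch of that per-shard loop, under the same assumptions as above (hypothetical Line type and tuning constants, driver calls elided): accumulate lines until the batch is full or no new line has arrived for a short while, then flush.

```rust
use std::time::Duration;

use tokio::sync::mpsc;
use tokio::time::timeout;

struct Line; // hypothetical row of bind values

// Hypothetical tuning knobs.
const MAX_BATCH_SIZE: usize = 512;
const MAX_BATCH_DELAY: Duration = Duration::from_millis(5);

/// Accumulate lines for one target shard and flush them in batches, either
/// when the batch is full or when no new line arrived for a short while.
async fn batching_loop(mut rx: mpsc::Receiver<Line>) {
    let mut pending: Vec<Line> = Vec::with_capacity(MAX_BATCH_SIZE);
    loop {
        match timeout(MAX_BATCH_DELAY, rx.recv()).await {
            Ok(Some(line)) => {
                pending.push(line);
                if pending.len() >= MAX_BATCH_SIZE {
                    flush(&mut pending).await;
                }
            }
            // Timed out with some lines pending: flush what we have.
            Err(_) if !pending.is_empty() => flush(&mut pending).await,
            // Timed out with nothing pending: keep waiting.
            Err(_) => {}
            // Channel closed: stop the task.
            Ok(None) => break,
        }
    }
}

async fn flush(pending: &mut Vec<Line>) {
    // Here the real code appends one prepared statement per pending line to
    // the task's Batch (pinned to its target node via the policy from this
    // PR) and calls Session::batch with the lines' bind values.
    pending.clear();
}
```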

Ten0 avatar Jul 03 '23 09:07 Ten0

Are the lines that you get every second merely bind values to be sent with prepared statements with fixed query strings?

Basically yes. There's a bunch of different prepared statements, but those are prepared at application startup. A Batch object is initialized once in each task dedicated to a shard (with its overridden execution policy from this PR). When the task has constituted a set of lines to insert, it sets the corresponding prepared statements in the Batch, then sends it.

I think I've got it, many thanks. I'm convinced now that what you propose in this PR does work in your case. I still wonder, however, if we can make it more universal. So far, though, I have no good ideas on how to do that.

wprzytula avatar Jul 03 '23 09:07 wprzytula

Maybe you could write an example called, say, shard_aware_batching.rs, and present a complete minimal way to utilise shard_for_statement() and enforce_target_node() - similar to what you put in the docstring, but complete? This would let others follow your design more easily and adapt it to their workloads.

wprzytula avatar Jul 03 '23 09:07 wprzytula

Have you tested your solution? How? Could you write some unit tests?

Maybe you could write an example

I have added a test that may also serve as an example in 59ced8f72420c0b2dae61f18ff83b969f7cdf2d7. However, I'm not sure exactly how to verify that the queries are indeed directed to the proper shard.

I have also tested the solution on a multi-node cluster at 1M lines/s and could see a much lower metric of non-shard-aware queries in the monitoring (that metric is documented in Grafana as being unreliable when batches are involved, but the fact that it went from 60% to 10% probably still indicates that queries were better targeted, i.e. properly targeted).

As our policy is to rebase, could you instead rebase your branch on main, please?

I like to be able to keep track of the previous commits and not lose history until this is fully closed. I propose we squash-and-merge at the end :)

Ten0 avatar Aug 06 '23 12:08 Ten0

@Ten0 We are currently working on a load balancing refactor such that, instead of returning an iterator of nodes, an LB policy would return (node, shard number) pairs. How would this affect your idea of shard-aware batching? Does that help you, or rather obstruct it?

wprzytula avatar Aug 09 '23 08:08 wprzytula

I guess that would simplify the code. 🙂👍

Ten0 avatar Aug 09 '23 09:08 Ten0

I guess that would simplify the code. 🙂👍

Do you mind waiting with this PR until we merge the mentioned refactor? This way we would avoid merging this and then throwing parts of it away when the refactor is introduced.

wprzytula avatar Aug 09 '23 10:08 wprzytula

This way we would avoid merging this and then throwing parts of it away when the refactor is introduced.

Hmm AFAICT this is the only line that would change: https://github.com/Ten0/scylla-rust-driver/blob/59ced8f72420c0b2dae61f18ff83b969f7cdf2d7/scylla/src/transport/session.rs#L1934-L1936 (besides the signatures in https://github.com/scylladb/scylla-rust-driver/pull/738/files#diff-f6639af23649651f9041afec7f5914e45c0821ab7950c9fc429515b983470a15R24) so it doesn't seem that there's anything significant to gain by doing that. :/

Ten0 avatar Aug 09 '23 12:08 Ten0

@Ten0 https://github.com/scylladb/scylla-rust-driver/pull/944 has been merged. If you are still interested in bringing shard-aware batching to the driver, now it should be much easier.

wprzytula avatar Mar 19 '24 08:03 wprzytula

Is this PR still relevant? This is also related to #974 where a batch that is token-aware would be much more efficient.

lvboudre avatar Apr 03 '24 17:04 lvboudre

I don't see why it wouldn't be relevant. I may still work on this if there's a chance of getting it merged.

Ten0 avatar Apr 03 '24 20:04 Ten0

I don't see why it wouldn't be relevant. I may still work on this if there's a chance of getting it merged.

Sorry, "relevant" was not the right word.

I meant to ask whether the PR is still "active", since there have not been any commits in a long time. I do think the issue is still relevant.

lvboudre avatar Apr 04 '24 21:04 lvboudre

cargo semver-checks found no API-breaking changes in this PR! 🎉🥳 Checked commit: d5f71a93ef73cac47c4d513ca4f2d231c168f7e5

github-actions[bot] avatar May 25 '24 17:05 github-actions[bot]

@wprzytula

This is also related to #974 where a batch that is token-aware would be much more efficient.

Updated the PR. It looks like it indeed does solve #974. 😊 Also #788 did indeed solve this issue, thanks! 😊

(NB: I know this will need a squash because the oldest commits are very old and we don't want spaghetti on main - can we merge this by using Github's squash-and-merge feature rather than force-pushing here and losing history?)

Ten0 avatar May 25 '24 20:05 Ten0

After discussion with @Lorak-mmk, our impression is that we still don't understand the approach taken by this PR. Why would we want to hardwire the destination node and shard manually, instead of relying on the default load balancer? There is a heuristic that chooses the target node and shard based on the token of the first statement in a batch. Why is it not enough? As long as the other statements in a batch are destined for the same node and shard, performance will be optimal.

Hardwiring the target node and shard has significant drawbacks. RetryPolicy and SpeculativeExecutionPolicy are going to malfunction on single-target query plans. It's the responsibility of the user to select the correct node and shard.

Put simply, it's unclear to us why the problem of shard-aware batching is approached this way.

wprzytula avatar Jun 20 '24 11:06 wprzytula

Why is it not enough?

Thanks for the review.

I initially thought it was enough as well, however

As pointed out in the first review, this was not enough considering the load balancer (now?) picks a random replica at each call.

This means that if we check which node a statement would target in order to put it in a batch, and then send the batch, the batch may randomly be sent to a different replica/shard: one that is appropriate for that statement, but not necessarily for all the other statements of the batch, leaving them with one chance out of nb_replicas of actually being shard-aware.

Following this change:

I have also tested the solution on a multi-node cluster at 1M lines/s and could see a much lower metric of non-shard-aware queries in the monitoring (that metric is documented in Grafana as being unreliable when batches are involved, but the fact that it went from 60% to 10% probably still indicates that queries were better targeted, i.e. properly targeted).

Ten0 avatar Jun 20 '24 12:06 Ten0

Why is it not enough?

Thanks for the review.

I initially thought it was enough as well, however

As pointed out in the first review, this was not enough considering the load balancer (now?) picks a random replica at each call.

This means that if we check which node a statement would target in order to put it in a batch, and then send the batch, the batch may randomly be sent to a different replica/shard: one that is appropriate for that statement, but not necessarily for all the other statements of the batch, leaving them with one chance out of nb_replicas of actually being shard-aware.

The solution is to batch by token, not by "first" replica.

Following this change:

I have also tested the solution on a multi-node cluster at 1M lines/s and could see a much lower metric of non-shard-aware queries in the monitoring (that metric is documented in Grafana as being unreliable when batches are involved, but the fact that it went from 60% to 10% probably still indicates that queries were better targeted, i.e. properly targeted).

Lorak-mmk avatar Jun 20 '24 12:06 Lorak-mmk

The solution is to batch by token, not by "first" replica.

Unless I'm mistaken, because the space of tokens is large, this would imply that we would very rarely be able to batch, whereas batching by replica allows grouping by "one random acceptable replica" for each statement.

Ten0 avatar Jun 20 '24 12:06 Ten0

The solution is to batch by token, not by "first" replica.

Unless I'm mistaken, because the space of tokens is large, this would imply that we would very rarely be able to batch, whereas batching by replica allows grouping by "one random acceptable replica" for each statement.

Afaik it doesn't really make sense to perform multi partition batches. This incurs additional work for the coordinator node and you would be better off just using single, non-batched writes. So if your data to insert is spread across partitions such that there is no significant amount of data to update in any single partition - just use single non-batched writes. If your data is more grouped by partitions (meaning you have many updates to a partition), then it may make sense to use batches (but you should benchmark whether it gives you a performance advantage over single writes), and you can group them by Token.

Lorak-mmk avatar Jun 20 '24 13:06 Lorak-mmk

Afaik it doesn't really make sense to perform multi partition batches. This incurs additional work for the coordinator node and you would be better off just using single, non-batched writes.

The main issue with not batching is that going through the heavy Tokio & networking machinery for every statement puts 5x the CPU load on the server that makes the scylla calls using the Rust driver, whereas with batching that load is shared across the coordinators (and is probably less significant than managing Tokio futures, even ignoring the fact that the work is shared, because Tokio turned out to be heavy in my benchmarks). I'm aware that the driver currently attempts to batch networking at a low level to some extent (and I have read the relevant blog post), but that 1. doesn't cut down the Tokio overhead and 2. only waits for a single Tokio yield to try to append more, whereas I significantly benefit from waiting longer to build larger network batches.

Due to this, using batches enabled me to multiply my insertion speed by 5.

Ten0 avatar Jun 20 '24 14:06 Ten0

Afaik it doesn't really make sense to perform multi partition batches. This incurs additional work for the coordinator node and you would be better off just using single, non-batched writes.

I forge my own batches and I do see an increase in performance and lower latency. I did this a couple of months ago and I don't have the numbers with me, but it was a no-brainer to write more code in order to do batching locally before sending it off to the scylla driver.

lvboudre avatar Jun 20 '24 18:06 lvboudre

Afaik it doesn't really make sense to perform multi partition batches. This incurs additional work for the coordinator node and you would be better off just using single, non-batched writes.

The main issue with not batching is that going through the heavy Tokio & networking machinery for every statement puts 5x the CPU load on the server that makes the scylla calls using the Rust driver, whereas with batching that load is shared across the coordinators (and is probably less significant than managing Tokio futures, even ignoring the fact that the work is shared, because Tokio turned out to be heavy in my benchmarks). I'm aware that the driver currently attempts to batch networking at a low level to some extent (and I have read the relevant blog post), but that 1. doesn't cut down the Tokio overhead and 2. only waits for a single Tokio yield to try to append more, whereas I significantly benefit from waiting longer to build larger network batches.

Would you be able to share some benchmark we could reproduce? If the problem is the driver's performance, then my preferred solution would be to improve the driver's performance instead of adding hacky workarounds.

As for a solution for you for now: shard_for_token makes no sense because of tablets and mixed-shards clusters. If I understand correctly, you want to batch together statements that have one common replica, and send them to this replica. I think you can do this today. You can use ClusterData::replica_locator to get a ReplicaLocator struct, which has a replicas_for_token method you can call. This method returns a ReplicaSet, which implements IntoIterator over the replicas for a given token. You can then use whatever heuristic you like to batch statements (IIUC you will want to use the first element of this iterator as the grouping key), and create your own LoadBalancingPolicy to use on such a batch. This LBP would return a plan consisting of a single element. If you have trouble implementing this, let me know and I'll try to prepare some PoC.
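For what it's worth, here is a minimal sketch of that grouping heuristic. The driver-specific replica lookup is abstracted into a caller-supplied closure, since the exact ReplicaLocator / ReplicaSet signatures vary between driver versions; only the grouping logic itself is shown.

```rust
use std::collections::HashMap;
use std::hash::Hash;

/// Group statements by one common replica: the first element of the replica
/// list computed for each statement's token. Statements sharing that key can
/// go into one batch whose custom LoadBalancingPolicy returns a plan
/// consisting of just that replica.
fn group_by_first_replica<S, R, F>(
    statements: Vec<(S, i64)>, // (statement + values, token) pairs
    mut replicas_for_token: F, // e.g. built on ReplicaLocator::replicas_for_token
) -> HashMap<R, Vec<S>>
where
    R: Hash + Eq,
    F: FnMut(i64) -> Vec<R>,
{
    let mut groups: HashMap<R, Vec<S>> = HashMap::new();
    for (stmt, token) in statements {
        if let Some(first_replica) = replicas_for_token(token).into_iter().next() {
            groups.entry(first_replica).or_default().push(stmt);
        }
    }
    groups
}
```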

Due to this, using batches enabled me to multiply my insertion speed by 5.

I'd really like to see and reproduce the benchmark to investigate if we can improve this.

Lorak-mmk avatar Jun 21 '24 10:06 Lorak-mmk