Expose shard information at session API
This idea comes from a PR discussion.
Proposal
There are two levels of what we can do:
- Expose shard info by implementing the following API:
func (s *Session) GetShardAwareRoutingInfo(keyspace, table string, columns []string, values ...interface{}) (ShardAwareRoutingInfo, error)
type ShardAwareRoutingInfo struct {
	// RoutingKey is the serialized bytes of the partition key
	RoutingKey []byte
	// Host is the node to connect to (host-aware policy)
	Host *HostInfo
	// Shard is the shard ID of the node to connect to (shard-aware policy)
	Shard int
}
- Have an option to apply this optimization automatically when it makes sense and/or is possible.
Implementation details
To make this work properly for tablets, we will need to pull tablet info from system.tablets beforehand.
Pseudo code example
Borrowed from the same PR:
func routeRangeSelectToProperShards() {
	const shardsAbout = 100 // roughly nodes * (cpus - 1)
	// Split pks into per-shard batches
	var (
		queryBatches = make(map[string][]int64, shardsAbout) // pks grouped by host/shard
		routingKeys  = make(map[string][]byte, shardsAbout)  // routing key per host/shard
	)
	for _, pk := range pks {
		var (
			shardID    string
			routingKey []byte
		)
		// Get routing information for the key.
		// In this example, the PRIMARY KEY consists of one column, pk_column_name.
		info, err := session.GetShardAwareRoutingInfo(keyspaceName, tableName, []string{"pk_column_name"}, pk)
		if err != nil || info.Host == nil {
			// We may fail to get routing information for various reasons (schema or topology changes, etc.).
			// It is important to understand the reason when testing (for example, you are not using TokenAwarePolicy).
			log.Printf("can't get shard id of pk '%d': %v", pk, err)
		} else {
			// Build the key: host + "/" + shard (e.g. 127.0.0.1/1)
			shardID = info.Host.Hostname() + "/" + strconv.Itoa(info.Shard)
			routingKey = info.RoutingKey
		}
		// Put the key into the corresponding batch
		batch := queryBatches[shardID]
		if batch == nil {
			batch = make([]int64, 0, len(pks)/shardsAbout)
		}
		batch = append(batch, pk)
		queryBatches[shardID] = batch
		routingKeys[shardID] = routingKey
	}
	const query = "SELECT * FROM table_name WHERE pk IN (?)"
	var wg sync.WaitGroup
	// Go through all the batches to execute queries in parallel
	for shard, batch := range queryBatches {
		// Split large batches into smaller chunks, since large IN lists in SELECT queries hurt Scylla response times
		for _, chunk := range slices.ChunkSlice(batch, 10) { // slices.ChunkSlice splits a slice into chunks of at most M elements (here M=10)
			wg.Add(1)
			go func(shard string, chunk []int64) {
				defer wg.Done()
				rk := routingKeys[shard] // get our routing key
				scanner := session.Query(query, chunk).RoutingKey(rk).Iter().Scanner() // use RoutingKey
				for scanner.Next() {
					// ...
				}
				if err := scanner.Err(); err != nil {
					// ...
				}
			}(shard, chunk)
		}
	}
	// Wait for all answers
	wg.Wait()
	// NOTE: this is not the most optimal strategy, because we wait for all queries to finish.
	// If even one query has a long response time, it affects the response time of the whole method (RT of the method = max RT of the queries).
}
As it turns out, this approach has a few problems:
- First, if it's impossible to unambiguously identify a replica for a partition key (for example, there's no local replica, but there are N remote ones), then the keys grouped this way may end up on a different replica (not the one used for grouping) under the RoundRobin and Shuffle replica policies.
- Second, Query and Batch have no API for explicitly specifying the host and shard ID for a query.
- Third, the batch size threshold for switching from HostAware to ShardAware isn't entirely clear.
For unlogged batch, we're thinking of creating a separate API that would split keys across hosts/shards and execute requests in parallel.
> As it turns out, this approach has a couple of problems: first, if it's impossible to unambiguously identify a replica for a partition key (for example, there's no local replica, but there are N remote ones), then the keys grouped this way may end up on a different replica (not the one used for grouping) according to RoundRobin and Shuffle replicas policies. Second, Query and Batch don't have an API for explicitly specifying the host and shard ID for a query. Third, the batch size threshold for switching from HostAware to ShardAware isn't entirely clear.
You are 100% correct here; the complete scenario is not ready. We can start by exposing an API to get this information and by enforcing replicas on a query.
The original idea was to use the load balancing policy from Cluster for that purpose; in that case you get replicas according to your LB policy. But now I am thinking that we should probably have an API to override the policy.
> For unlogged batch, we're thinking of creating a separate API that would split keys across hosts/shards and execute requests in parallel.
I don't think it is a great idea to have this API; solving the two problems listed above will give enough tooling to solve the problem. Running many queries instead of one brings a ton of complexity that we don't want to maintain unless there is a real need: for example query statistics, retries, and what happens if one query fails but the rest complete. I don't think it's worth it.