
Expose shard information at session API

Open dkropachev opened this issue 1 year ago • 2 comments

This idea comes from a PR.

Proposal

There are two levels at which we can do this:

  1. Expose shard info by implementing the following API:
func (s *Session) GetShardAwareRoutingInfo(keyspace, table string, columns []string, values ...interface{}) (ShardAwareRoutingInfo, error)

type ShardAwareRoutingInfo struct {
	// RoutingKey is the serialized partition key bytes
	RoutingKey []byte
	// Host is the node to connect to (host-aware routing)
	Host *HostInfo
	// Shard is the shard ID on that node to connect to (shard-aware routing)
	Shard int
}
  2. Have some option to do this optimization automatically when it makes sense and/or is possible.

Implementation details

To make this work properly for tablets we will need to pull tablet info from system.tablets beforehand.
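For illustration, here is a minimal sketch of what pre-loading that metadata could look like, assuming the github.com/gocql/gocql import and assuming system.tablets exposes keyspace_name, table_name, last_token and a replicas column; the exact column names and layout are assumptions, not a verified schema:

// loadTabletTokens is a hypothetical helper that snapshots tablet token
// boundaries per table so routing decisions can later be made locally.
// Decoding the replicas column (host/shard pairs) is omitted to keep the
// sketch short, but a real implementation would need it as well.
func loadTabletTokens(session *gocql.Session) (map[string][]int64, error) {
	tokens := make(map[string][]int64) // "keyspace.table" -> last_token boundaries
	scanner := session.Query(
		`SELECT keyspace_name, table_name, last_token FROM system.tablets`,
	).Iter().Scanner()
	for scanner.Next() {
		var (
			ks, tbl   string
			lastToken int64
		)
		if err := scanner.Scan(&ks, &tbl, &lastToken); err != nil {
			return nil, err
		}
		key := ks + "." + tbl
		tokens[key] = append(tokens[key], lastToken)
	}
	return tokens, scanner.Err()
}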

Pseudo code example

Borrowed from the same PR:

func routeRangeSelectToProperShards() {
	// pks ([]int64), session, keyspaceName and tableName are assumed to be defined in the surrounding scope.
	const shardsAbout = 100 // roughly nodes * (cpus - 1)
	// Split pks into per-shard batches
	var (
		queryBatches = make(map[string][]int64, shardsAbout) // pks grouped by host/shard
		routingKeys  = make(map[string][]byte, shardsAbout)  // routing key per host/shard
	)
	for _, pk := range pks {
		var (
			shardID string
			routingKey []byte
		)
		// We receive information about the routing of our keys.
		// In this example, PRIMARY KEY consists of one column pk_column_name.
		info, err := session.GetShardAwareRoutingInfo(keyspaceName, tableName, []string{"pk_column_name"}, pk)
		if err != nil || info.Host == nil {
			// We may not get routing information for various reasons (schema or topology changes, etc.).
			// It is important to understand the reason when testing (for example, you are not using a token-aware policy).
			log.Printf("can't get shard id of pk '%d': %v", pk, err)
		} else {
			// build key: host + "/" + vShard (127.0.0.1/1)
			shardID = info.Host.Hostname() + "/" + strconv.Itoa(info.Shard)
			routingKey = info.RoutingKey
		}
		// Put key to corresponding batch
		batch := queryBatches[shardID]
		if batch == nil {
			batch = make([]int64, 0, len(pks)/shardsAbout)
		}
		batch = append(batch, pk)
		queryBatches[shardID] = batch
		routingKeys[shardID] = routingKey
	}
	const query = "SELECT * FROM table_name WHERE pk IN ?" // IN ? binds the whole chunk as a list
	var wg sync.WaitGroup
	// we go through all the batches to execute queries in parallel
	for shard, batch := range queryBatches {
		// We divide large batches into smaller chunks, since large IN lists in SELECT queries have a bad effect on Scylla response time (RT)
		for _, chunk := range slices.ChunkSlice(batch, 10) { // slices.ChunkSlice is some function that splits a slice into chunks of at most M elements (M=10 here); see the sketch after this snippet
			wg.Add(1)
			go func(shard string, chunk []int64) {
				defer wg.Done()
				rk := routingKeys[shard] // get our routing key
				scanner := session.Query(query, chunk).RoutingKey(rk).Iter().Scanner() // use RoutingKey
				for scanner.Next() {
					// ...
				}
				if err := scanner.Err(); err != nil {
					// ...
				}
			}(shard, chunk)
		}
	}
	// wait for all answers
	wg.Wait()
	// NOTE: this is not the most optimal strategy because we wait for all queries to finish.
	// If even one query has a long response time, it affects the response time of the whole method (RT of the method = max RT over all queries).
}
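For completeness, slices.ChunkSlice above is not a standard library function; a minimal generic implementation (package and function name are just whatever the snippet assumes) could look like this:

// ChunkSlice splits s into consecutive chunks of at most size elements.
// The last chunk may be shorter.
func ChunkSlice[T any](s []T, size int) [][]T {
	if size <= 0 {
		panic("ChunkSlice: size must be positive")
	}
	chunks := make([][]T, 0, (len(s)+size-1)/size)
	for len(s) > 0 {
		n := size
		if n > len(s) {
			n = len(s)
		}
		chunks = append(chunks, s[:n:n])
		s = s[n:]
	}
	return chunks
}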

dkropachev commented Jun 28 '24 11:06

As it turns out, this approach has a few problems. First, if it's impossible to unambiguously identify a replica for a partition key (for example, there's no local replica, but there are N remote ones), then the keys grouped this way may end up on a different replica (not the one used for grouping) under the RoundRobin and Shuffle replica policies. Second, Query and Batch don't have an API for explicitly specifying the host and shard ID for a query. Third, the batch size threshold for switching from HostAware to ShardAware isn't entirely clear.

For unlogged batch, we're thinking of creating a separate API that would split keys across hosts/shards and execute requests in parallel.

moguchev commented Oct 07 '25 19:10

As it turns out, this approach has a few problems. First, if it's impossible to unambiguously identify a replica for a partition key (for example, there's no local replica, but there are N remote ones), then the keys grouped this way may end up on a different replica (not the one used for grouping) under the RoundRobin and Shuffle replica policies. Second, Query and Batch don't have an API for explicitly specifying the host and shard ID for a query. Third, the batch size threshold for switching from HostAware to ShardAware isn't entirely clear.

You are 100% correct here; the complete scenario is not ready yet. We can start by exposing an API to get the information and by allowing replicas to be enforced on a query. The original idea was to use the load balancing policy from the Cluster for that purpose, in which case you would get replicas according to your LB policy, but now I am thinking that we should probably have an API to override the policy.
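As a purely illustrative strawman of what such an override could look like on top of the proposed GetShardAwareRoutingInfo (the SetHost and SetShard methods below are hypothetical and do not exist in gocql today):

// Hypothetical sketch: pin a query to the replica/shard resolved from the
// routing info instead of going through the cluster-wide load balancing policy.
info, err := session.GetShardAwareRoutingInfo(keyspaceName, tableName, []string{"pk_column_name"}, pk)
if err == nil && info.Host != nil {
	iter := session.Query(query, pk).
		RoutingKey(info.RoutingKey). // existing gocql API
		SetHost(info.Host).          // hypothetical: force the target host
		SetShard(info.Shard).        // hypothetical: force the target shard
		Iter()
	// ... consume iter as usual
}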

For unlogged batch, we're thinking of creating a separate API that would split keys across hosts/shards and execute requests in parallel.

I don't think it is a great idea to have this API; solving the two problems listed above will give enough tooling to solve the problem. Running many queries instead of one brings a ton of complexity that we don't want to maintain unless there is a real need: for example query statistics, retries, and what happens if one query fails but the rest complete. I don't think it is worth it.

dkropachev commented Oct 07 '25 20:10