Key Translation Version 2
Overview
Key translation currently supports the mapping of string keys to an autoincrementing integer value that allows us to map onto Roaring bitmaps. The basic functionality works well but there are several limitations in the current implementation:
- Monolithic design—all keys are stored in a single file and not broken up by index or field.
- Slow initialization—initialization of the key file is single threaded.
- Scaling—the key translation file is fully replicated to all nodes in the cluster.
Current Design
The interface to the store currently exists as:
```go
type TranslateStore interface {
	TranslateColumnsToUint64(index string, values []string) ([]uint64, error)
	TranslateColumnToString(index string, values uint64) (string, error)
	TranslateRowsToUint64(index, field string, values []string) ([]uint64, error)
	TranslateRowToString(index, field string, values uint64) (string, error)
	Reader(ctx context.Context, off int64) (io.ReadCloser, error)
}
```
This provides translation calls for both the index (column) and the field (row). Translation from keys to integers can be done in bulk, while translation from integers to keys must be done one at a time. This design was driven by the nature of the callers to the store and the aim of reducing slice allocations to and from the calls.
The `Reader()` method provides a log-based stream of all changes so that replica stores can maintain the same data asynchronously.
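To make the replication shape concrete, here is a minimal sketch of how a replica might consume that stream, assuming it simply appends the raw log to a local file and replays it elsewhere; the `logSource` and `followLog` names are illustrative, not part of the actual implementation:

```go
import (
	"context"
	"io"
	"os"
)

// logSource is the slice of TranslateStore we care about here: the
// log-stream reader starting at a given offset.
type logSource interface {
	Reader(ctx context.Context, off int64) (io.ReadCloser, error)
}

// followLog tails the primary's translation log starting at off and appends
// the raw entries to a local log file. Replaying those entries into the
// replica's in-memory map is left out; this only shows the streaming shape.
func followLog(ctx context.Context, primary logSource, path string, off int64) error {
	rc, err := primary.Reader(ctx, off)
	if err != nil {
		return err
	}
	defer rc.Close()

	f, err := os.OpenFile(path, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0o644)
	if err != nil {
		return err
	}
	defer f.Close()

	// Copy the stream until the primary closes it or the context is canceled
	// (cancellation is assumed to close the underlying reader).
	_, err = io.Copy(f, rc)
	return err
}
```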
Proposed Changes
Per-index, per-field stores
Instead of separate column & row translation calls, the new proposed store would simply have a single pair of translation calls. There would then be one separate store for each index & field.
```go
type TranslateStore interface {
	TranslateToID(key string) (uint64, error)
	TranslateToKey(id uint64) (string, error)
}
```
We originally stored translation data in Badger, so the duplicate `index` and `field` prefixes did not affect storage size because duplicate prefixes were trimmed internally. However, after moving to our current log-based RHH design, the `index` and `field` data causes a lot of duplication. This new design should eliminate that issue.
By creating separate per-index & per-field stores, we can also drop data in bulk when an index or field is deleted. See #1976.
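As a rough sketch of the layout this enables (the `Registry` and `storeKey` names are hypothetical, not proposed API), a node could hold one `TranslateStore` per index & field and drop whole stores when an index is deleted:

```go
import "sync"

// storeKey identifies a single store; an empty field means the index
// (column) store for that index.
type storeKey struct {
	index, field string
}

// Registry holds one TranslateStore (the proposed interface above) for each
// index & field.
type Registry struct {
	mu     sync.RWMutex
	stores map[storeKey]TranslateStore
}

func NewRegistry() *Registry {
	return &Registry{stores: make(map[storeKey]TranslateStore)}
}

// Store returns the store for an index (field == "") or for a field.
func (r *Registry) Store(index, field string) TranslateStore {
	r.mu.RLock()
	defer r.mu.RUnlock()
	return r.stores[storeKey{index: index, field: field}]
}

// DeleteIndex drops every store belonging to an index in one pass, which is
// what makes bulk deletes cheap (see #1976); DeleteField would do the same
// for a single index/field pair.
func (r *Registry) DeleteIndex(index string) {
	r.mu.Lock()
	defer r.mu.Unlock()
	for k := range r.stores {
		if k.index == index {
			delete(r.stores, k)
		}
	}
}
```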
Change underlying storage
The current log-based RHH design has two advantages:
- Logs can easily be replicated to replica nodes as a stream.
- RHH is very fast and lookups are O(1).
However, as seen in #1807, there is an upper limit to the size we can store. Logs are slow to replay since replay is single threaded, and memory usage still grows with data size under our current RHH approach.
I suggest we attempt to use Badger or BoltDB again. We will see a performance decrease compared to the current design but we'll be able to trade off for better scaling and startup time. We may be able to place an LRU cache in front of the store to help alleviate lookup speed issues.
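For instance, a thin wrapper along these lines could cache hot key→ID lookups in front of a Badger/Bolt-backed store. This is a minimal, non-concurrent sketch using only the standard library and the proposed `TranslateStore` interface above; the names are illustrative:

```go
import "container/list"

// cachedStore wraps the proposed TranslateStore with a small key→ID LRU.
// It is not safe for concurrent use; a real version would need locking.
type cachedStore struct {
	store TranslateStore
	max   int
	ll    *list.List               // most recently used entries at the front
	items map[string]*list.Element // key → list element holding an *entry
}

type entry struct {
	key string
	id  uint64
}

func newCachedStore(store TranslateStore, max int) *cachedStore {
	return &cachedStore{
		store: store,
		max:   max,
		ll:    list.New(),
		items: make(map[string]*list.Element),
	}
}

// TranslateToID serves hot keys from the cache and falls through to the
// underlying store on a miss, evicting the least recently used entry when
// the cache is over capacity.
func (c *cachedStore) TranslateToID(key string) (uint64, error) {
	if el, ok := c.items[key]; ok {
		c.ll.MoveToFront(el)
		return el.Value.(*entry).id, nil
	}

	id, err := c.store.TranslateToID(key)
	if err != nil {
		return 0, err
	}

	c.items[key] = c.ll.PushFront(&entry{key: key, id: id})
	if c.ll.Len() > c.max {
		if last := c.ll.Back(); last != nil {
			c.ll.Remove(last)
			delete(c.items, last.Value.(*entry).key)
		}
	}
	return id, nil
}

// TranslateToKey is passed through uncached; it could be cached the same
// way with an id→key map.
func (c *cachedStore) TranslateToKey(id uint64) (string, error) {
	return c.store.TranslateToKey(id)
}
```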
Sharding
Lastly, our current approach replicates all translation data to all nodes. For obvious reasons, this doesn't scale. This was intentionally chosen initially to allow us to get a first version of key translation working, with the expectation that we would need to revisit it.
I believe the easiest approach is to use the same consistent hashing and replica count that is built into `pilosa.Cluster`. That will reduce the amount of configuration required. We can generate a hash key by applying a fast hash function (such as `xxhash`) to the string key. We can then pass this hash key to `Hasher.Hash()` to determine the primary and replicas.
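A small sketch of that key-to-node mapping, assuming a `Hasher` interface shaped roughly like `Hash(key uint64, n int) int` and using `github.com/cespare/xxhash` as one possible xxhash implementation (both are assumptions here, not confirmed API):

```go
import "github.com/cespare/xxhash/v2" // one possible xxhash implementation

// Hasher mirrors the consistent-hash interface assumed to be exposed by
// pilosa.Cluster: map a uint64 key onto one of n nodes.
type Hasher interface {
	Hash(key uint64, n int) int
}

// primaryForKey hashes the string key with xxhash and lets the cluster's
// hasher pick the owning node. Replicas would be chosen the same way the
// cluster already chooses shard replicas.
func primaryForKey(h Hasher, key string, numNodes int) int {
	return h.Hash(xxhash.Sum64String(key), numNodes)
}
```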
By sharding the translation, we'll need to allow nodes to bulk translate keys & IDs. For that, we'll expand our `TranslateStore` to include 2 additional methods:

```go
TranslateToIDs(keys []string) ([]uint64, error)
TranslateToKeys(ids []uint64) ([]string, error)
```
This will allow nodes to send bulk translation requests and minimize RPC calls.
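For illustration, a coordinating node might use these roughly as follows; `bulkTranslator` and `translateKeys` are placeholders, and this builds on the `primaryForKey` sketch above:

```go
// bulkTranslator is a placeholder for whatever client each remote node
// exposes for the proposed bulk call.
type bulkTranslator interface {
	TranslateToIDs(keys []string) ([]uint64, error)
}

// translateKeys groups keys by owning node (using the primaryForKey helper
// sketched above), issues one bulk request per node, and stitches the
// results back into a single key→ID map.
func translateKeys(h Hasher, nodes []bulkTranslator, keys []string) (map[string]uint64, error) {
	byNode := make(map[int][]string)
	for _, k := range keys {
		n := primaryForKey(h, k, len(nodes))
		byNode[n] = append(byNode[n], k)
	}

	out := make(map[string]uint64, len(keys))
	for n, ks := range byNode {
		ids, err := nodes[n].TranslateToIDs(ks)
		if err != nil {
			return nil, err
		}
		for i, k := range ks {
			out[k] = ids[i]
		}
	}
	return out, nil
}
```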
Sharding will only apply to index/column keys. Field/row keys will be fully replicated across the cluster.
Migration
These 3 issues have been combined into a single proposal so that we can minimize migration overhead. We will need to break apart translation files not only into per-index & per-field files but also spread out per-index files across the cluster.
The most practical way to migrate is to have the new implementation stream changes from the current translation file implementation on the primary node and rewrite to separate nodes in the background. When the rewrite has completed, we can lock the server and swap the implementation to the new sharded model. This should minimize downtime and be seamless to the user other than increased CPU usage.
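Very roughly, the cutover could look like the sketch below. Every name in it (`translateSwapper`, `replayInto`, and the reuse of the `logSource`/`Registry` sketches above) is a stand-in rather than existing or proposed API:

```go
import "context"

// translateSwapper is a stand-in for whatever the server exposes to pause
// writes and swap translate-store implementations during the cutover.
type translateSwapper interface {
	Lock()
	Unlock()
	UseTranslateStores(*Registry)
}

// replayInto is a hypothetical helper that decodes the legacy log starting
// at off, writes its entries into the new per-index/per-field stores, and
// returns the offset it reached.
var replayInto func(ctx context.Context, src logSource, dst *Registry, off int64) (int64, error)

// migrate replays the existing monolithic translation log into the new
// stores while the server keeps serving, then briefly locks, catches up on
// the remaining tail, and swaps implementations.
func migrate(ctx context.Context, srv translateSwapper, oldLog logSource, stores *Registry) error {
	// 1. Background pass: replay everything written so far.
	off, err := replayInto(ctx, oldLog, stores, 0)
	if err != nil {
		return err
	}

	// 2. Brief lock: replay the tail written during step 1, then swap.
	srv.Lock()
	defer srv.Unlock()
	if _, err := replayInto(ctx, oldLog, stores, off); err != nil {
		return err
	}
	srv.UseTranslateStores(stores)
	return nil
}
```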
/cc @jaffee
Thanks @benbjohnson, this is awesome. I have a few notes:
Making this a seamless upgrade seems like it will be pretty involved. Based on some of the other work we have going on, I think we are on the brink of a 2.0 release. Let's consider the option of doing this as a breaking change if it will reduce the development burden significantly.
Sharding the column translation feels like it should be pretty straightforward but I don't think we need to shard the field/row key translation (I think it makes sense to replicate it to every node). We may be on the same page here, but I don't think the proposal makes this clear.
Switching to Badger or Bolt seems reasonable; go-pilosa actually does some caching on the client side to speed up imports, so I think bulk translation speed is less of an issue. Are RHH lookups really O(n) or was that a typo? I would have thought O(1)...
> Making this a seamless upgrade seems like it will be pretty involved. Based on some of the other work we have going on, I think we are on the brink of a 2.0 release. Let's consider the option of doing this as a breaking change if it will reduce the development burden significantly.
👌
> Sharding the column translation feels like it should be pretty straightforward but I don't think we need to shard the field/row key translation (I think it makes sense to replicate it to every node). We may be on the same page here, but I don't think the proposal makes this clear.
I was thinking that we'd shard field/row as well. It seems like a lot of people might have one large index and we'll eventually hit an upper limit with that. /cc @travisturner
> Are RHH lookups really O(n) or was that a typo? I would have thought O(1)...
Oops. Yes, that was a typo. I fixed it above.
My thinking was that there is already an inherent upper limit on the number of rows you can have in Pilosa since all the rows of all the fields are stored on every node. Sharding out the rows would add a lot of latency to things like Group By results that might need to look up large numbers of keys across the cluster. Maybe caching solves this though 🤔
If we're moving to an on-disk K/V store, the key translation tables don't seem like they'll be the upper bound for scaling in most use cases.
Another related thing to think about in this proposal is having the ability to support SQL `LIKE` queries (or something like them). Being able to quickly get the set of row IDs for strings matching some basic pattern would be very useful in some instances.
I don't think I have a good enough feel for the tradeoffs between sharding/not sharding rows. Sharding+caching definitely gives us the most flexibility going forward though... just at the cost of complexity. I'll marinate for a while... hopefully others have input.
I understand the argument to replicate field/row translation data to every node, but it might not make sense to do that for the case where the column space for a particular index does not span every node in the cluster. For example, one might have a 10-node cluster containing multiple indexes, one of which is only made up of 3 shards. Even with replication=2, the data for that index would be placed on at most 6 of the 10 nodes. So replicating the translation data for the field/rows of that index to every node would be overkill. At the very least you would want to limit that field/row replication to only those nodes containing shards for that index.
Additionally, @jaffee's argument that we're already bound by an upper limit on rows may not always be the case going forward. If, for example, we are able to implement some kind of cold storage of fragment data, then it may eventually be possible to have more rows associated with an index than actually fit in memory.
With that said, I don't fully understand the performance hit we take by decoupling the field/row translation data from the row data (like the "group by" example).
@jaffee @travisturner I marinated on it as well and I think that there's not enough understanding of the tradeoffs for field/row sharding. We already have a lot of changes going in as-is so I think doing column-only sharding makes the most sense right now. I'll update the issue.
However, I think it still makes sense to split out translation stores per-index & per-field (even if we aren't sharding the field level).
Seems like a good place to start, thanks Ben!