cassandra icon indicating copy to clipboard operation
cassandra copied to clipboard

Add support for ordering on non-clustering columns using SAI

Open michaeljmarshall opened this issue 10 months ago • 2 comments

High Level Design

Add support for ORDER BY via SAI. The PR essentially makes the ANN vector ordering logic more generic by adding a few abstractions. This PR adds support for ordering all existing BA and CA indexes without writing any new index files. AA indexes cannot be sorted because the primary key map is based on partitions, not primary keys.

New Abstractions

  • PrimaryKeyWithSortKey - this is a primary key that has some metadata that can be used for sorting. The two implementations at the moment are a float used for vector similarity score and a ByteComparable object used from numeric and text indexes.
  • RowWithSourceTable and CellWithSourceTable - this abstraction gives us a way to validate the source of a given cell. That source is then used to confirm whether an index's ranking of the data is valid.

Notable design details

  • ORDER BY on SAI requires CL=1.
  • Updates on same memtable: unlike the VectorMemtableIndex, the TrieMemoryIndex does not override MemtableIndex#update. Instead, an update is handled as an insertion. That presents two problems. First, the memtable has both values, and second, the resulting sstable index has both values. This PR addresses that edge case by comparing the index's view of the value to memtable/sstable's live view of the value for the row. If implement MemtableIndex#update, we can remove the PrimaryKeyWithSortKey#isIndexDataEqualToLiveData guard.
  • The RowWithSourceTable and CellWithSourceTable are used by vector ordering to prevent re-calculating the score in the TopKProcessor. Once we solve the update problem for numeric and text indexes, we will do the same for those indexes.
  • In order for the "source table" logic to work, we must acquire a single view of the sstables and memtables and their associated indexes. Otherwise, we risk classifying a valid row as "updated". In the StorageAttachedIndexSearcher, we get this view and hold it for the duration of the query. Because hyrbid queries do not need to have the source table validated, they are not included in the view logic, which simplifies determining which indexes need to be locked.
  • Hybrid queries are only weakly index based. Because we do not have an easy way to map from PrimaryKey to value in our numeric and text indexes, the quickest solution to implement was to read the value from the sstable and then sort in the index.

Unsupported data types

Currently, there are 3 types that are not sortable via SAI:

  • InetAddressType because we need to add decoding logic based on SAI's TypeUtil.encode method
  • DecimalType because SAI truncates the value to 24 bytes
  • IntegerType because SAI truncates to 20 bytes

The first one could be fixed based in the current paradigm, and the last two will likely require a change to the index format. Technically, we could optionally pull out all of the values that have more than 20 or 24 bytes, but I think that isn't worth the effort.

Dependencies

The following PRs originally had the following dependencies merged to vsearch and then into this branch:

  • #1090
  • #1079 (aka #1100)

michaeljmarshall avatar Apr 03 '24 16:04 michaeljmarshall

Update: order by on date types does work. I missed the nuance that the byte comparable representation sorts on unsigned ints, so centering on 0 is only for signed integers. By updating the DateDataSet's comparator, the order by assertions pass tests now.

michaeljmarshall avatar May 09 '24 17:05 michaeljmarshall

cqlsh:test> select * from test order by a;
InvalidRequest: Error from server: code=2200 [Invalid query] message="Use of ANN OF in an ORDER BY clause requires a LIMIT that is not greater than 1000. LIMIT was NO LIMIT"

The error message needs to be updated, because there is no ANN OF now.

pkolaczk avatar May 10 '24 12:05 pkolaczk

I squashed it and rebased: #1150

pkolaczk avatar Jun 12 '24 12:06 pkolaczk

@eolivelli @pkolaczk - I think this PR is very close (I know we want it merged as soon as possible), but I reviewed some of the TODOs left in the code, and I think it might be worth either another pass before merging or shortly after merging. All of the tests are passing, which is a great sign, but the TODOs are some open questions I had when writing the PR and I still don't seem to have answers yet.

michaeljmarshall avatar Jul 26 '24 04:07 michaeljmarshall

I found a bug:

ERROR [ReadStage-1] 2024-07-26 16:37:15,253 AbstractLocalAwareExecutorService.java:169 - Uncaught exception on thread Thread[ReadStage-1,5,main]
java.lang.NullPointerException: null
	at org.apache.cassandra.utils.bytecomparable.ByteComparable.compare(ByteComparable.java:138)
	at org.apache.cassandra.index.sai.utils.PrimaryKeyWithByteComparable.compareTo(PrimaryKeyWithByteComparable.java:66)
	at org.apache.cassandra.index.sai.utils.PrimaryKeyWithByteComparable.compareTo(PrimaryKeyWithByteComparable.java:33)
	at java.base/java.util.Comparator.lambda$comparing$77a9974f$1(Comparator.java:469)
	at java.base/java.util.PriorityQueue.siftUpUsingComparator(PriorityQueue.java:675)
	at java.base/java.util.PriorityQueue.siftUp(PriorityQueue.java:652)
	at java.base/java.util.PriorityQueue.offer(PriorityQueue.java:345)
	at java.base/java.util.PriorityQueue.add(PriorityQueue.java:326)
	at org.apache.cassandra.index.sai.utils.MergePrimaryWithSortKeyIterator.computeNext(MergePrimaryWithSortKeyIterator.java:76)
	at org.apache.cassandra.index.sai.utils.MergePrimaryWithSortKeyIterator.computeNext(MergePrimaryWithSortKeyIterator.java:38)
	at org.apache.cassandra.utils.AbstractIterator.hasNext(AbstractIterator.java:47)
	at com.google.common.collect.Iterators$PeekingImpl.hasNext(Iterators.java:1126)
	at org.apache.cassandra.index.sai.utils.MergePrimaryWithSortKeyIterator.computeNext(MergePrimaryWithSortKeyIterator.java:75)
	at org.apache.cassandra.index.sai.utils.MergePrimaryWithSortKeyIterator.computeNext(MergePrimaryWithSortKeyIterator.java:38)
	at org.apache.cassandra.utils.AbstractIterator.hasNext(AbstractIterator.java:47)
	at org.apache.cassandra.index.sai.plan.StorageAttachedIndexSearcher$ScoreOrderedResultRetriever.nextSelectedKeyInRange(StorageAttachedIndexSearcher.java:536)
	at org.apache.cassandra.index.sai.plan.StorageAttachedIndexSearcher$ScoreOrderedResultRetriever.nextRowIterator(StorageAttachedIndexSearcher.java:504)
	at org.apache.cassandra.index.sai.plan.StorageAttachedIndexSearcher$ScoreOrderedResultRetriever.computeNext(StorageAttachedIndexSearcher.java:486)
	at org.apache.cassandra.index.sai.plan.StorageAttachedIndexSearcher$ScoreOrderedResultRetriever.computeNext(StorageAttachedIndexSearcher.java:447)
	at org.apache.cassandra.utils.AbstractIterator.hasNext(AbstractIterator.java:47)
	at org.apache.cassandra.index.sai.plan.TopKProcessor.filterInternal(TopKProcessor.java:222)
	at org.apache.cassandra.index.sai.plan.TopKProcessor.filter(TopKProcessor.java:157)
	at org.apache.cassandra.index.sai.plan.StorageAttachedIndexSearcher.search(StorageAttachedIndexSearcher.java:139)
	at org.apache.cassandra.db.ReadCommand.searchStorage(ReadCommand.java:512)
	at org.apache.cassandra.db.ReadCommand.executeLocally(ReadCommand.java:410)
	at org.apache.cassandra.db.ReadCommandVerbHandler.doVerb(ReadCommandVerbHandler.java:58)
	at org.apache.cassandra.net.InboundSink.lambda$new$0(InboundSink.java:79)
	at org.apache.cassandra.net.InboundSink.accept(InboundSink.java:98)
	at org.apache.cassandra.net.InboundSink.accept(InboundSink.java:46)
	at org.apache.cassandra.net.InboundMessageHandler$ProcessMessage.run(InboundMessageHandler.java:436)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
	at org.apache.cassandra.concurrent.AbstractLocalAwareExecutorService$FutureTask.run(AbstractLocalAwareExecutorService.java:165)
	at org.apache.cassandra.concurrent.AbstractLocalAwareExecutorService$LocalSessionFutureTask.run(AbstractLocalAwareExecutorService.java:137)
	at org.apache.cassandra.concurrent.SEPWorker.run(SEPWorker.java:119)
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	at java.base/java.lang.Thread.run(Thread.java:829)

Reproduction

Load data with latte order.rn workload mentioned here (https://docs.google.com/document/d/11qqu9D1DM8kYTFP3aFhaxFkd4zRnBI6Yh-QPrlnBzfw/edit?usp=sharing)

Do not flush after loading.

Issue the query:

SELECT * FROM orderby WHERE a >= 0.1 AND a < 0.2 ORDER BY a LIMIT 10

It works fine after flushing. I mean, almost - because it is still slow (takes 3 seconds!)

pkolaczk avatar Jul 26 '24 14:07 pkolaczk

@pkolaczk - nice catch. I introduced that bug while resolving merge conflicts.

michaeljmarshall avatar Jul 26 '24 15:07 michaeljmarshall

Retested it and now it works fine.

pkolaczk avatar Jul 26 '24 16:07 pkolaczk

Reviewed the most recent five commits, particularly the CANONICAL filter change. The CANONICAL change resolves all my local reproducers for issues with source Object mismatch when using LIVE sstables containing preemptively opened readers. LGTM!

jkni avatar Jul 30 '24 15:07 jkni

This is overall great new feature!

I'm conditionally approving this, provided the following issues are fixed:

* `SELECT * FROM tab WHERE a > const ORDER BY a` queries currently don't work because of huge performance cost; neither for on-disk or in-memory indexes. Either let's fix it or at least disable it?

* Don't forget to merge the PR for lazy memory index range scans as that PR solves the first performance issues I mentioned in the doc.

Merging this PR with the understanding that we will follow up with individual PRs to address these issues.

michaeljmarshall avatar Jul 31 '24 02:07 michaeljmarshall