
Bitmap index should support spilling

Open westonpace opened this issue 5 months ago • 13 comments

Currently the bitmap index just accumulates everything into a hashmap during training. If there are many unique values this can easily lead to OOM. We should spill on occasion to prevent this.

Example RAM-hungry operation:

import pyarrow as pa
import lance
import shutil

shutil.rmtree("/tmp/bitmap.lance", ignore_errors=True)

print("Writing data")
ds = lance.write_dataset(pa.table({"x": range(1_000_000_000)}), "/tmp/bitmap.lance")
print("Creating index")
ds.create_scalar_index("x", "BITMAP")

westonpace avatar Oct 09 '25 16:10 westonpace

Can I take this up as well @westonpace? In my mind, portions of this could likely be generalized into a kind of Spillable trait which different index types can implement and use ad hoc.

jtuglu1 avatar Oct 13 '25 04:10 jtuglu1

Sure. I agree there is a kind of general pattern of "spill and merge at end". I think it's also coupled with the idea of distributed indexing (spilling is just serialized distributed indexing?).

westonpace avatar Oct 14 '25 16:10 westonpace

spilling is just serialized distributed indexing?

Yeah, exactly. Something I've seen in other databases is fairly generic spilling/serialization logic implemented very specifically for particular operations, which not only makes it hard to integrate with other operations that may need it, but is also prone to bugs since everyone implements their own spill/merge logic. Ideally, we can make it abstract enough that there's sufficient overlap for it to be useful for future uses.

jtuglu1 avatar Oct 14 '25 16:10 jtuglu1

Hi @westonpace @jtuglu1 , is this issue still being worked on?

I’ve come up with an approach:

  1. We can request that the data be sorted by value using BitmapIndexPlugin::new_training_request before training the BitmapIndex, which will reuse the scan's spilling policy.
  2. Then we can stream-process the sorted training data, formatted as (value, rowid), into bitmap index files formatted as (value, bitmap).
  3. When updating the bitmap index, the only difference is that we need to merge the sorted old and new data into a single sorted stream.
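Step 2 above could be sketched roughly as follows. This is an illustrative Python sketch, not the actual Lance implementation: plain tuples stand in for Arrow batches and Python sets stand in for roaring bitmaps.

```python
# Sketch of stream-processing sorted (value, rowid) pairs into
# (value, bitmap) entries. Because the input is sorted by value, only one
# value's bitmap is held in memory at a time, so RAM usage is bounded by the
# largest single posting list rather than the total cardinality.
from itertools import groupby

def build_bitmap_entries(sorted_pairs):
    """Yield (value, rowid_set) from (value, rowid) pairs sorted by value."""
    for value, group in groupby(sorted_pairs, key=lambda p: p[0]):
        yield value, {rowid for _, rowid in group}

pairs = [("a", 1), ("a", 3), ("b", 2), ("c", 4)]  # already sorted by value
entries = list(build_bitmap_entries(pairs))
```

Each yielded entry could be appended directly to the index file before the next one is materialized.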

If this approach is acceptable, I’d be happy to submit a PR.

ddupg avatar Nov 06 '25 03:11 ddupg

Hmm, this issue is mainly a problem at large scales. However, at large scales, the out-of-core sort will be expensive.

I wonder if it would be better to implement distributed indexing instead?

Then this could solve both "training large data without too much RAM" and "training large data with multiple machines"?

westonpace avatar Nov 08 '25 12:11 westonpace

Hmm, this issue is mainly a problem at large scales. However, at large scales, the out-of-core sort will be expensive.

I wonder if it would be better to implement distributed indexing instead?

Then this could solve both "training large data without too much RAM" and "training large data with multiple machines"?

@westonpace I agree that we ultimately need to implement distributed training index for large-scale data. However, I still have two concerns:

  • I tested the sample code you provided in the issue description on my 64GB RAM machine and found that 50 million rows were enough to trigger OOM. 50 million rows is perhaps not large enough to convince users that they must use a distributed computing engine for index training. Therefore, I believe we should provide a standalone indexing implementation that, while slower, completes successfully without OOM. If faster performance is required, users can opt for a distributed indexing implementation.
  • In distributed index implementations, we still need to prevent OOM on the compute nodes of the distributed engine. Perhaps we should also consider spilling data to disk when training each shard to reduce memory pressure.

ddupg avatar Nov 10 '25 08:11 ddupg

Hmm, this issue is mainly a problem at large scales. However, at large scales, the out-of-core sort will be expensive.

I implemented spilling in #5201 and ran performance tests. Below are some of the results, so we can evaluate whether the performance loss is acceptable.

data                                                       0.39.0 (s)   bitmap spilling (s)   speedup
range(1_000_000)                                           3.16341      2.13855               32%
range(10_000_000)                                          35.80929     21.42475              40%
range(50_000_000)                                          239.06249    107.34151             55%
range(100_000_000)                                         OOM          223.20483             -
[num for _ in range(10_000) for num in range(1_000)]       1.77947      2.67116               -50%
[num for _ in range(100_000) for num in range(1_000)]      25.19043     29.01521              -15%
[random.randint(0, 1_000) for _ in range(100_000_000)]     24.82726     29.74331              -19%
[random.randint(0, 1_000) for _ in range(1_000_000_000)]   351.38744    419.41885             -19%

test code:

import pyarrow as pa
import lance
import shutil
import time
import random

shutil.rmtree("/tmp/bitmap.lance", ignore_errors=True)
# arr = [num for num in range(1_000) for _ in range(50_000)]
# arr = [num for _ in range(100_000) for num in range(1_000)]
# arr = [random.randint(0, 1_000) for _ in range(100_000_000)]
arr = range(50_000_000)
t0 = time.time()
print("Writing data")
ds = lance.write_dataset(pa.table({"x": arr}), "/tmp/bitmap.lance")
t1 = time.time()
print(f"Write data cost: {t1-t0:.5f}")

print("Creating index")
ds.create_scalar_index("x", "BITMAP")
t2 = time.time()
print(f"Create index cost: {t2-t1:.5f}")

ddupg avatar Nov 10 '25 09:11 ddupg

Instead of hand-writing the concurrency model for each index, maybe we should write them in terms of DataFusion physical plans? Then we could re-use the memory pool / spilling infrastructure, and we might also be able to use more parallelism in index training.

The nice thing about that code is that it only spills if there isn't sufficient memory; as long as there is enough memory, it never touches disk.

wjones127 avatar Dec 18 '25 17:12 wjones127

Instead of hand-writing the concurrency model for each index, maybe we should write them in terms of DataFusion physical plans? Then we could re-use the memory pool / spilling infrastructure, and we might also be able to use more parallelism in index training.

When I was thinking about concurrency I was thinking about multiple machines, not multiple threads.

I'm not entirely sure converting to DataFusion plans would actually save work at the end of the day. The reservation and spilling infrastructure in DataFusion is rather complicated and might be overkill here. Maybe there is some other advantage to using DF?

westonpace avatar Dec 18 '25 20:12 westonpace

The reservation and spilling infrastructure in DataFusion is rather complicated and might be overkill here. Maybe there is some other advantage to using DF?

Maybe DataFusion is overkill. I was thinking it would be preferable to have a general framework that all indices use to handle concurrency and spilling. We could write our own if we think that's simpler than using DataFusion's.

wjones127 avatar Dec 18 '25 21:12 wjones127

Agreed.

I don't know about a general pattern for "all indices" but all inverted indexes (bitmap, ngram, and fts) generally have the same goal which is to build up a map from token to roaring bitmap. Both ngram and fts have already implemented spilling and the implementations today are very similar, but different.

I'm most familiar with the spilling in the ngram index. Once the map gets too large, we decide to spill. That means we serialize an array where one column is tokens and the other is bitmaps. Before we write this spill file, we sort it by token.
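To make the spill trigger concrete, here is a hypothetical Python sketch of that pattern (names and the size budget are invented for illustration; real spill files would be serialized Arrow arrays, not in-memory lists):

```python
# Accumulate token -> rowid set in a dict; once the map exceeds a size
# budget, sort the entries by token and emit them as one spill "file".
def train_with_spilling(pairs, max_entries=2):
    spills = []   # each spill is a token-sorted list of (token, rowid_set)
    current = {}
    for token, rowid in pairs:
        current.setdefault(token, set()).add(rowid)
        if len(current) > max_entries:
            spills.append(sorted(current.items()))  # sort by token first
            current = {}
    if current:
        spills.append(sorted(current.items()))      # flush the remainder
    return spills

spills = train_with_spilling([("b", 1), ("a", 2), ("c", 3), ("a", 4)])
```

A real implementation would measure bytes rather than entry count, but the shape is the same: accumulate, sort, flush, repeat.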

Then, at the end, there is a merge step. In the ngram index we read in 2 spill files at a time and merge them together and write another spill file. We repeat this process until there is only one file. That file is the index.
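The pairwise merge step can be sketched like so. Again this is illustrative Python, with sorted in-memory lists standing in for spill files and sets standing in for roaring bitmaps:

```python
# Two-way merge of token-sorted spill "files": walk both lists in order,
# unioning the bitmaps whenever the same token appears in both.
def merge_spills(left, right):
    out, i, j = [], 0, 0
    while i < len(left) and j < len(right):
        lt, lb = left[i]
        rt, rb = right[j]
        if lt == rt:                  # token in both files: union bitmaps
            out.append((lt, lb | rb))
            i += 1
            j += 1
        elif lt < rt:
            out.append((lt, lb))
            i += 1
        else:
            out.append((rt, rb))
            j += 1
    out.extend(left[i:])              # drain whichever side remains
    out.extend(right[j:])
    return out

a = [("ant", {1}), ("bee", {2, 5})]
b = [("bee", {3}), ("cat", {4})]
merged = merge_spills(a, b)
```

Repeating this until one file remains is a standard external merge; the result is itself token-sorted, so it can feed another round of merging.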

So yes, a generalized spilling mechanism could be invented...HOWEVER...I think it would be a waste of time.

If we zoom out then we can see that these steps are exactly the steps required to implement distributed indexes. Each worker grabs some chunk of the input. That worker then processes its input and creates a "partial index" (i.e. spill file). There is a final merge step where all the partial indexes are merged into a single index.

In other words, instead of implementing spilling, we should be creating an API for distributed indexing. Instead of "accumulate data until we are using X bytes of RAM and then spill" we should split one index job into many index jobs where each index job is only going to need X bytes of RAM. If the user has multiple servers they can spread the jobs out. If they have only one server then they just run the jobs one after the other. This way we bound the RAM usage AND give the user more power (ability to distribute).
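The shape of that proposed API might look something like the following hypothetical sketch (function names and the chunking scheme are invented, not an actual Lance API):

```python
# Split one index job into many bounded jobs, then merge the partial indexes.
# Each index_job call could run on a separate worker, or sequentially on one
# machine; either way its RAM is bounded by its chunk size.
from collections import defaultdict

def index_job(rows):
    """Build a partial index (value -> rowid set) for one chunk of input."""
    partial = defaultdict(set)
    for rowid, value in rows:
        partial[value].add(rowid)
    return partial

def merge_partials(partials):
    """Final merge: union the partial indexes into the full index."""
    merged = defaultdict(set)
    for partial in partials:
        for value, bitmap in partial.items():
            merged[value] |= bitmap
    return merged

rows = list(enumerate([0, 1, 0, 2, 1, 0]))   # (rowid, value) pairs
chunks = [rows[:3], rows[3:]]                # one chunk per job/worker
partials = [index_job(c) for c in chunks]
index = merge_partials(partials)
```

The spill files described earlier play the role of the partial indexes here, which is why the two mechanisms share most of their code.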

westonpace avatar Dec 19 '25 14:12 westonpace

If the user has multiple servers they can spread the jobs out. If they have only one server then they just run the jobs one after the other. This way we bound the RAM usage AND give the user more power (ability to distribute).

In our (LanceDB's) case, we do vertical scaling instead of horizontal scaling for our indexing jobs. The main pain we've had is that it's difficult to tell what resources a given job will need. We make our best guess at allocating a node with enough memory, but if we guess wrong we often get OOM (or sometimes out-of-disk). It would be preferable for us if indexing jobs could scale gracefully vertically as well as horizontally. If we have a 2-core machine with 2GB of memory, I should still be able to index 10GB of data, just slowly. If I have a 64-core machine with 128GB of memory, I should be able to index that same 10GB of data much faster, using all available resources. That's the ideal, IMO, for vertical scaling. I agree that what you propose sounds nice for horizontal scaling.

I think this is still accomplishable with the strategy of splitting into N small batch jobs, though I do have some worries about how good we will be at guessing memory requirements. (The nice thing about the streaming / memory pool / spilling operator approach is that it enforces the memory limit at runtime rather than requiring an up-front guess at how much memory is needed.) Do you think that part will be easy? It's likely easier for indexing jobs than for general queries, but I imagine it could be hard in some cases. For example, if the data to be indexed is string data, you will need good statistics on the size of those strings.

wjones127 avatar Dec 19 '25 16:12 wjones127

I suppose, if it's the same work either way, then the majority of the code can be shared (serialize the map into a spill file, merge spill files) so we can support both. Still, I think some simple heuristics will work for the job approach in the majority of scenarios.

In the cases where it doesn't (extremely long keys) then we might also want to consider an approximate bitmap index anyways (where we just store the hash and not the key itself) to keep the size of the overall index down.

westonpace avatar Dec 19 '25 17:12 westonpace