Question: How to cap all memory usage across all RocksDB instances?
I'm asking here because I have not been able to find an answer that solves an issue I've been struggling with for quite some time. I'm running a Kafka Streams application (which uses RocksDB to store application state). Kafka Streams spawns a RocksDB instance per state store, per partition assigned to the application (so an application instance with 2 state stores and 30 partitions spawns 2 x 30 = 60 RocksDB instances). As the number of partitions assigned to an application instance is dynamic, I'm looking for an overall limit on the memory used by all RocksDB instances that is independent of the total number of partitions. I've been playing a lot with the different configurations discussed here & here, but I'm still not able to limit the memory usage the way I want. The following is the config implementation code:
import java.util
import org.apache.kafka.streams.state.RocksDBConfigSetter
import org.rocksdb.{BlockBasedTableConfig, LRUCache, Options, WriteBufferManager}
// Logging and KV are the project's own logging helpers (not shown here).

class RocksDbConfig extends RocksDBConfigSetter with Logging { // ToDo: Remove 'with Logging'
  import RocksDbConfig._

  override def setConfig(storeName: String,
                         options: Options,
                         configs: util.Map[String, AnyRef]): Unit = {
    // Reuse the table config Kafka Streams already created for this store
    val tableConfig: BlockBasedTableConfig = options
      .tableFormatConfig() match { case config: BlockBasedTableConfig => config }

    // Block cache and memtable settings
    tableConfig.setBlockCache(cache)
    // Increase max block size to make index block smaller
    tableConfig.setBlockSize(MAX_BLOCK_SIZE)
    // Index and filter blocks are allocated off-cache and can grow unbounded
    // setting them in block cache will guarantee bounded memory usage
    tableConfig.setCacheIndexAndFilterBlocks(true)
    tableConfig.setPinTopLevelIndexAndFilter(true)
    tableConfig.setCacheIndexAndFilterBlocksWithHighPriority(true)

    options.setWriteBufferManager(writeBufferManager)
    options.setMaxWriteBufferNumber(NUM_OF_MEM_TABLES)
    options.setWriteBufferSize(MAX_WRITE_BUFFER_SIZE)

    // ToDo: remove me!
    log.info("Setting RocksDB configs",
      KV("cache" -> cache.toString, "writeBufferManager" -> writeBufferManager.toString))

    options.setTableFormatConfig(tableConfig)
  }

  // The cache and write buffer manager are shared, so nothing is closed per store
  override def close(storeName: String, options: Options): Unit = {}
}
object RocksDbConfig {
  def toMB(size: Int) = size * 1024 * 1024L
  def tokB(size: Int) = size * 1024L

  val INDEX_FILTER_BLOCK_RATIO = 0.6
  val NUM_OF_MEM_TABLES = 2
  val TOTAL_BLOCK_CACHE_MEMORY = toMB(700) // Todo: Replace me with a better value
  val MAX_WRITE_BUFFER_SIZE = toMB(64) // Todo: Replace me with a better value
  val TOTAL_MEMTABLE_MEMORY = NUM_OF_MEM_TABLES * MAX_WRITE_BUFFER_SIZE
  val TOTAL_OFF_HEAP_MEMORY = TOTAL_BLOCK_CACHE_MEMORY + TOTAL_MEMTABLE_MEMORY
  val MAX_BLOCK_SIZE = tokB(8)

  // All objects must be created in the companion object,
  // to make sure all RocksDB instances get the same object.
  val cache = new LRUCache(TOTAL_OFF_HEAP_MEMORY, -1, false, INDEX_FILTER_BLOCK_RATIO)
  val writeBufferManager = new WriteBufferManager(TOTAL_MEMTABLE_MEMORY, cache)
}
Per my understanding, the total memory usage (TOTAL_OFF_HEAP_MEMORY) should be 700 + 2 * 64 = 828 MB. However, what I see in practice is very different: memory usage starts at around 1 GB when the pod boots up, and then keeps growing to more than 2 GB within a couple of hours. I've checked the JVM heap allocation, and it's small. I also checked the off-heap allocation, and RocksDB is all over the place. In addition, the RocksDB metrics (generated by the Kafka Streams library) show a block cache usage of around 25 GB (which is far more than the physical memory, so I guess most of it is paged to disk) and a block cache pinned usage of around 3 GB. The memtable size looks good: ~100 MB at its highest peak (see screenshots below).
Is there anything wrong with my code or my understanding of how the RocksDB memory limits work?
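To make the expectation concrete, here is the arithmetic as a small standalone Scala sketch. The object and method names (MemoryBudget, mibToBytes, rocksDbInstances) are illustrative only and are not part of the application; the numbers are the ones from the config above, and the sketch only restates my assumption that the shared cache size is the overall cap.

```scala
object MemoryBudget {
  // Convert a size given in MB (1024 * 1024 bytes) to bytes.
  // Long arithmetic throughout, so large values cannot overflow an Int.
  def mibToBytes(mib: Long): Long = mib * 1024L * 1024L

  val blockCacheBytes: Long  = mibToBytes(700) // TOTAL_BLOCK_CACHE_MEMORY
  val writeBufferBytes: Long = mibToBytes(64)  // MAX_WRITE_BUFFER_SIZE
  val numMemTables: Int      = 2               // NUM_OF_MEM_TABLES

  // TOTAL_MEMTABLE_MEMORY and TOTAL_OFF_HEAP_MEMORY from the config
  val memtableBytes: Long     = numMemTables * writeBufferBytes
  val totalOffHeapBytes: Long = blockCacheBytes + memtableBytes

  // One RocksDB instance per state store, per assigned partition
  def rocksDbInstances(stateStores: Int, partitions: Int): Int =
    stateStores * partitions

  def main(args: Array[String]): Unit = {
    println(s"expected cap: ${totalOffHeapBytes / (1024L * 1024L)} MB") // 828 MB
    println(s"RocksDB instances: ${rocksDbInstances(2, 30)}")           // 60
  }
}
```

The point is that the expected cap (828 MB) does not depend on the instance count (60 here), because every instance is handed the same cache and write buffer manager objects.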
Setup details:
- Scala 2.13/Java 11
- RocksDB 6.29.4.1
- Memory allocator: jemalloc
- Total memory (each pod): 2 GB
- Total CPU (each pod): 2
