Avoid OOM when running snapshots at higher throughput
Problem statement
Currently, when throughput is quite high relative to the resources available, snapshots can run into OOM even when force snapshotting is set. The problem can be reproduced locally; it happens because the sort/dedupe step requires a significant amount of memory - roughly 40% of the memory occupied by the data being snapshotted (see the flamegraph below, marked as (2)). When forcing a snapshot at 70% (for example), it needs a further 40% of that 70% to run a successful snapshot.
This flamegraph was produced at 1.63 MB/s with the server running at 1 GiB max memory, 2 CPU cores, and all the defaults (70% for forcing a snapshot). It OOM'd after the first forced snapshot; the sort/dedupe step needed ~285 MiB to snapshot ~700 MiB of data.
Proposed Solution
In order to avoid running into OOMs, we could snapshot more often (by lowering the WAL flush interval or the snapshot size), but that would mean evicting data out of the query buffer, which is not ideal. The current view is that, when snapshotting, we should (a rough sketch follows the list):
- check the available memory and set an upper bound that can be used in the sort/dedupe step.
- break down the number of rows such that it can fit within the upper bound.
- run through sort/dedupe and produce smaller parquet files.
- avoid running this step in parallel when forcing a snapshot as we're already short of memory.
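A rough sketch of the batching in the first two bullets (the `Row` type, `estimate_row_size`, and the 40% factor below are placeholders, not the actual influxdb3 code):

```rust
/// Placeholder types for illustration only; not the influxdb3 `Row`.
struct Row {
    approx_size_bytes: usize,
}

fn estimate_row_size(row: &Row) -> usize {
    row.approx_size_bytes
}

/// Split buffered rows into batches whose estimated size stays under an upper
/// bound derived from the memory currently available, so each sort/dedupe pass
/// fits in memory and produces a smaller parquet file.
fn plan_sort_dedupe_batches(rows: &[Row], available_memory_bytes: usize) -> Vec<&[Row]> {
    // Assumption: reserve roughly the ~40% share observed in the flamegraph.
    let upper_bound = (available_memory_bytes as f64 * 0.4) as usize;

    let mut batches = Vec::new();
    let mut start = 0;
    let mut batch_bytes = 0;

    for (i, row) in rows.iter().enumerate() {
        let row_bytes = estimate_row_size(row);
        if batch_bytes + row_bytes > upper_bound && i > start {
            // Close the current batch before it would exceed the bound.
            batches.push(&rows[start..i]);
            start = i;
            batch_bytes = 0;
        }
        batch_bytes += row_bytes;
    }
    if start < rows.len() {
        batches.push(&rows[start..]);
    }
    batches
}
```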
There is still a possibility of overwhelming the system and running into OOM if the memory profile changes while the sort/dedupe step is running, i.e. a big increase in throughput arrives during this step. However, that would impact the system as a whole and is not specific to the snapshotting process. In that case it is up to the user to decide whether to increase the resources to accommodate the throughput.
Alternatives considered
- Running with lower WAL flush intervals and/or snapshot size such that snapshots are taken more often, thus avoiding the buildup to OOM. This would mean evicting data out of the queryable buffer, which is not ideal.
- Triggering force snapshots at a lower percentage of memory (moving the default to 50% or even 30%, for example, for higher throughputs). This again means evicting data out of the queryable buffer earlier.
Both approaches trigger snapshots earlier and evict data out of the buffer, and they could potentially produce a bigger parquet file than the ones produced by the proposed solution. However, these approaches will always produce smaller files, whereas the proposed solution only produces smaller files when forcing a snapshot (or, more precisely, when there isn't enough memory).
Additional context
The perf team has run into this issue, and so have others; the usual workaround for the resource consumption has been to lower the force-snapshot percentage to 50% or 30%.
For the part where it breaks down the data and produces smaller Parquet files: when doing this within a table chunk, it should make sure the resulting Parquet files are non-overlapping in time.
@pauldix - I've looked into the code that creates `MutableTableChunk`s. I don't think producing non-overlapping (in time) chunks for smaller parquet files can be done readily, as all the fine-grained details go into the `MutableBuffer`, the buffer that backs all the `Builder` variants. I can see three options:
- Break the `query_chunk_time` (which is derived from the gen 1 duration atm) down further, maybe to a minute or even finer grained like 30 secs if there's a lot of data coming in. When we filter the chunks for querying it'd have to scan a few more `query_chunk_time`s, which is probably not too taxing. However, this would mean we'd have to hold more of these `MutableBuffer`s. So instead of 1 buffer holding 10 mins worth of data, we may have 10 buffers holding a minute's worth of data. I don't know if that's costly or not in terms of memory.
- (or) Try and work out if `values_slice` on `PrimitiveBuilder` can be used to peek at the values it's holding for the timestamp and filter out the indexes. This assumes that the index from one column for a row will match the index on the other columns held (in `Builder`s). This might take some time too. Maybe this idea can be extended by building an index in memory as data is buffered into these builders, such that it holds the mapping `timestamp -> array index` for the builders to reduce the lookup times.
- (or) Try and filter out from the `RecordBatch`es (generated from calling the snapshot method on `TableBuffer`) the time slice to write to the parquet file (rough sketch below). The lookup times are probably going to be O(n)?
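For illustration, here's roughly what option 3 could look like using Arrow's `filter_record_batch` kernel (the helper name, the `"time"` column name, and the timestamp type are assumptions on my part):

```rust
use arrow::array::{BooleanArray, TimestampNanosecondArray};
use arrow::compute::filter_record_batch;
use arrow::error::Result;
use arrow::record_batch::RecordBatch;

/// Hypothetical helper: keep only the rows whose `time` column falls within
/// [start_ns, end_ns). Assumes a TimestampNanosecond column named "time".
fn slice_batch_by_time(batch: &RecordBatch, start_ns: i64, end_ns: i64) -> Result<RecordBatch> {
    let time_idx = batch.schema().index_of("time")?;
    let times = batch
        .column(time_idx)
        .as_any()
        .downcast_ref::<TimestampNanosecondArray>()
        .expect("time column should be TimestampNanosecond");

    // O(n) scan over the time values, as mentioned above.
    let mask: BooleanArray = times
        .iter()
        .map(|t| t.map(|t| t >= start_ns && t < end_ns))
        .collect();

    filter_record_batch(batch, &mask)
}
```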
I'm not proficient with Arrow arrays, so I could be wrong with options 2 and 3. I can experiment to find out, but it'd be good to see what you think will work - are these approaches first of all feasible, if so which one do you think will work better? If you've any other approach in mind happy to follow that through.
You can break an individual chunk down into multiple parts by looping through the time array and figuring out where you want to do the split. All the other columns have matching indexes so you'd be able to build two separate record batches to split them off. I actually wrote this function quite some time ago in a previous implementation of the write buffer. It's buried somewhere in the long ago commits, but it shouldn't be too hard to create.
That will let you break them into smaller persist jobs, but you still have the memory problem if you allow it all to buffer up before you start breaking it down. The tricky bit is that the WAL snapshot logic is very much tied to the fixed chunk times (the gen1 duration).
One way to do this would be to change the chunk duration inside the queryable buffer to be 1m. And then let the WAL snapshot in 1m intervals. And when the snapshot is run, rather than persisting whatever number of 1m files, you just combine the 1m buffer chunks into a persist job to create a single parquet file.
This would make it so that in low throughput scenarios, the WAL could actually buffer up the entire window of time and the snapshot would produce a single parquet file. In high throughput scenarios, the snapshot would produce parquet files more frequently than every 10m.
One way to do this would be to change the chunk duration inside the queryable buffer to be 1m. And then let the WAL snapshot in 1m intervals.
I think this is what I'm trying to do right now - the WAL is tied to 10m, and when it receives TableChunks to add to the query buffer, instead of using the chunk time (based off the 10m gen 1 duration) I'm going through each Row and partitioning the time by 1m. I've also removed any parallel ops at the moment. I'm still running into OOMs when throughput is much higher (like 4x the previous: what was OOM'ing at 1.63 MB/s is now OOM'ing around 5.43 MB/s). I'll run through the numbers and setup to see if I've got something wrong. I might have to break this down further - it's bucketing by 1m at the moment; maybe I should break it down to 30s, or make it a function of the growth rate of the query buffer itself. I've not checked the parquet file sizes or whether the queries are working yet 😄; when I get to that I can probably group multiple chunks together, based on the memory available, into a single file. I can probably re-enable parallelizing it as well at that point.
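The per-row bucketing I mean is roughly this (a sketch with illustrative names, assuming nanosecond timestamps and a configurable chunk duration):

```rust
use std::collections::BTreeMap;

/// Map a row timestamp to the start of its chunk window.
fn chunk_time_for(row_time_ns: i64, chunk_duration_ns: i64) -> i64 {
    row_time_ns - row_time_ns.rem_euclid(chunk_duration_ns)
}

/// Group row timestamps into per-chunk buckets, e.g. with a 1m (or 30s)
/// duration instead of the 10m gen1 duration.
fn bucket_by_chunk_time(row_times_ns: &[i64], chunk_duration_ns: i64) -> BTreeMap<i64, Vec<i64>> {
    let mut buckets: BTreeMap<i64, Vec<i64>> = BTreeMap::new();
    for &t in row_times_ns {
        buckets
            .entry(chunk_time_for(t, chunk_duration_ns))
            .or_default()
            .push(t);
    }
    buckets
}
```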
As a general question - is there a preference for the size of the parquet files? Are larger files preferred for gen 1? For some reason I assumed that the smaller the file sizes (and hence the more files), the more datafusion can do in parallel (I haven't verified if that's the case).
On a separate note, the default memory pool size we set for datafusion should be relative to what the process is given (the current default is 8G and ignores caps set on the process). That would hopefully turn this into an allocation error at the sort/dedupe step (which would show up in the logs) instead of quietly running into OOMs - I'll raise a PR for that, though I'm not sure what the default should be. I don't have a good handle on how datafusion uses the memory pool to set a sane default for this - do you have any suggestions?
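Something along these lines, roughly (a sketch only - the exact builder names vary across DataFusion versions, and the fraction/limit values are placeholders, not proposed defaults):

```rust
use std::sync::Arc;

use datafusion::error::Result;
use datafusion::execution::runtime_env::{RuntimeConfig, RuntimeEnv};
use datafusion::prelude::{SessionConfig, SessionContext};

/// Size DataFusion's memory pool relative to the memory the process is
/// actually given, instead of a fixed 8G default. `process_memory_limit_bytes`
/// stands in for however the server discovers its cgroup/process cap.
fn session_with_bounded_pool(process_memory_limit_bytes: usize) -> Result<SessionContext> {
    // The 0.2 fraction is an arbitrary placeholder, not a recommended default.
    let runtime = RuntimeEnv::new(
        RuntimeConfig::new().with_memory_limit(process_memory_limit_bytes, 0.2),
    )?;
    Ok(SessionContext::new_with_config_rt(
        SessionConfig::new(),
        Arc::new(runtime),
    ))
}
```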
The problem with changing the chunk time in the queryable buffer and then not changing it in the WAL is that the snapshot logic could be wrong. I'd have to think through your changes to know.
Ideally, you want the WAL to snapshot some of the WAL files, but not all of them. If you have files 1-10 (with 1 being oldest file and 10 being newest), you want to snapshot files 1-5, leaving the leading 6-10 around until you get another five built up.
The reason for this is that arriving data can have lagged timestamps. So when you snapshot files 1-5, what you're actually doing is writing all data with a timestamp <= the max timestamp in those files. This window of time could, and likely does include data from file 6 and maybe beyond. This is the lagged part.
Persisting that data that is in WAL files we're still keeping around means that we have a chance at creating Parquet files that are non-overlapping in time.
Later, when we snapshot files 6-10, we already know that all data in those files with time <= the previous snapshot max in files 1-5 has been persisted. So at this point we're snapshotting all data with time <= max from 6-10, which will likely include data from some of the now newer files 11+.
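To make that concrete, here's a toy sketch of the rule (the types, the oldest-first ordering assumption, and the halving threshold are illustrative only, not the actual snapshot tracker):

```rust
/// Illustrative WAL period: assumes `periods` below is ordered oldest-first.
struct WalPeriod {
    id: u64,
    max_time_ns: i64,
}

/// Returns the ids of the periods to snapshot and the max timestamp that
/// bounds what gets persisted, or None if there aren't enough periods yet.
fn snapshot_plan(periods: &[WalPeriod]) -> Option<(Vec<u64>, i64)> {
    if periods.len() < 2 {
        return None;
    }
    // Oldest half gets snapshotted; the newer half stays around for lagged data.
    let half = periods.len() / 2;
    let to_snapshot = &periods[..half];
    let max_time = to_snapshot.iter().map(|p| p.max_time_ns).max()?;
    Some((to_snapshot.iter().map(|p| p.id).collect(), max_time))
}
```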
Anyway, if you reduce the snapshot tracker to look at 1m intervals in the WAL, that will coincide with how you can break things up in the query buffer. Mainly we don't want to force a Parquet file to be persisted every minute if there isn't much data around.
I thought the flow would still be fine given the filtering at the query buffer level is still the same (i.e. driven by the max of the WAL period), but the number of persist jobs created from the chunks will be higher. Instead of 1 chunk, you may end up with 5 chunks, but the rows within those chunks will not be overlapping in time (I think). I'll run through some examples once this approach looks feasible (hopefully in the morning); it might be easier for you to find the hole in my logic with the examples.
@pauldix - here's what I'm trying to do. In the "existing approach", the gen 1 duration is set to 4 and that's used by the query buffer implicitly (as TableChunk already has the derived chunk time). When adding the WAL write batches into the query buffer, I'm thinking that going through the row times and deriving the chunk time again should leave us with a more fine-grained set of keys in the query buffer. In the example below I'm using 1 as the chunk duration.
Example
wal file 1 contains 1-10 time ranges, and file 2 contains 3-15 time ranges
- 1 [1 - 10] (actual row tstamps: 1, 2, 3, 5, 8, 9, 10)
- 2 [3 - 15] (actual row tstamps: 3, 5, 8, 12, 14, 15)
Existing approach
let's say the query buffer looks like below (query_chunk_time mapping) - 4 is the gen1 duration; read this as "chunk time 0 contains rows from files 1 and 2", with the individual rows coming from those files also called out
- 0 (1, 2) (1 rows: 1, 2, 3 + 2 rows: 3)
- 4 (1, 2) (1 rows: 5 + 2 rows: 5)
- 8 (1, 2) (1 rows: 8, 9, 10 + 2 rows: 8)
- 12 (2) (2 rows: 12, 14, 15)
snapshot 1, max is 10 -> remove 0, 4, 8 keys from query buffer leaving 12
- 12 (2) (2 rows: 12, 14, 15)
Alternate approach (smaller chunk times for query buffer)
wal file 1 contains 1-10 time ranges, and file 2 contains 3-15 time ranges (this is same as above)
- 1 [1 - 10] (actual row tstamps: 1, 2, 3, 5, 8, 9, 10)
- 2 [3 - 15] (actual row tstamps: 3, 5, 8, 12, 14, 15)
query buffer looks like, (query_chunk_time mapping) but now, when adding a table chunk into query buffer, the actual Row held by TableChunk is used for calculating the chunk it should belong to (here using 1 as the duration)
- 1 (1) (1 rows: 1)
- 2 (1) (1 rows: 2)
- 3 (1, 2) (1 rows: 3, 2 rows: 3)
- 5 (1, 2) (1 rows: 5, 2 rows: 5)
- 8 (1, 2) (1 rows: 8, 2 rows: 8)
- 9 (1) (1 rows: 9)
- 10 (1) (1 rows: 10)
- 12 (2) (2 rows: 12)
- 14 (2) (2 rows: 14)
- 15 (2) (2 rows: 15)
snapshot 1, max is 10 -> remove 1, 2, 3, 5, 8, 9, 10 keys, leaving
- 12 (2) (2 rows: 12)
- 14 (2) (2 rows: 14)
- 15 (2) (2 rows: 15)
snapshotting 1 in both approaches leaves the same rows in memory (12, 14, 15)
Could you please see where I'm going wrong with this logic? Each of those is currently written as a separate parquet file, but I think what we'd want is some mechanism to group them together when there aren't many rows in a parquet file (like requiring at least 10kb before writing it out) and equally to keep each file under, say, X MB (where X is based on the memory available and roughly how much is required for sort/dedupe) - roughly like the sketch below.
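The grouping I have in mind is roughly this sketch (the thresholds and names are placeholders, not decided values):

```rust
/// Walk time-ordered chunk sizes and coalesce adjacent chunks into persist
/// jobs: don't close a job before it reaches `min_bytes` (to avoid tiny
/// parquet files), and close it before it would exceed `max_bytes` (bounded
/// by the memory needed for sort/dedupe). Returns chunk indexes per job.
fn group_chunks_for_persist(chunk_sizes: &[usize], min_bytes: usize, max_bytes: usize) -> Vec<Vec<usize>> {
    let mut jobs: Vec<Vec<usize>> = Vec::new();
    let mut current: Vec<usize> = Vec::new();
    let mut current_bytes = 0usize;

    for (idx, &size) in chunk_sizes.iter().enumerate() {
        if current_bytes >= min_bytes && current_bytes + size > max_bytes {
            // Close the current job before it would exceed the memory bound.
            jobs.push(std::mem::take(&mut current));
            current_bytes = 0;
        }
        current.push(idx);
        current_bytes += size;
    }
    if !current.is_empty() {
        // The trailing job may be under min_bytes; it still gets written
        // (or held back) at the end.
        jobs.push(current);
    }
    jobs
}
```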
@praveen-influx With the latest commit as of Feb 26th,
- No more OOMs are observed 🎉
- 2 hours in and going well, whereas with the current code we OOM in about 50 minutes
- CPU is still at 100% when the snapshotting happens
Some log info,
2025-02-27T01:45:57.803897Z DEBUG influxdb3_wal::snapshot_tracker: >>> wal periods and snapshots wal_periods_len=894 num_snapshots_after=900
2025-02-27T01:44:55.877880Z DEBUG influxdb3_write::write_buffer: checking buffer size and snapshotting current_buffer_size_bytes=3055606060 memory_threshold_bytes=4054065152
Note: I will be letting this run overnight to observe how stable it is.
For reference, we are simulating 1K rows per second each across 50 tables, cumulative of 50K rows per second on a small cluster of 2 cores and 8 GiB RAM.
Thanks @MaduMitha-Ravi - this proves that normal snapshotting can hold up fine, which is good. It'd also be good to check:
- whether force snapshotting can cope as well. This requires increasing the throughput such that the query buffer fills up (50% of the total memory) before a normal snapshot gets to run, and hence it'll force a snapshot. I think you had tests with throughputs that trigger that scenario as well.
- that query performance hasn't degraded, by running the query perf related tests against this branch as well.
Point 1 - Complete and verified by Praveen, below are the details.
- Ingestion rate of 120K rows per second across 50 tables
- Approx. 27 GiB of data written
- No OOMs observed, less than 7 GiB on 8 GiB RAM
- CPU usage ~95% on snapshotting process (1 core is spent on the snapshot)
Note: Working on Point 2
Query perf results on the branch (rebased version)
Here is the detailed document on the experiments and results.
Overall, we are good to proceed with the merge and then plan on the next-level of optimizations.
To-Do:
- Execute the query suite on Cold data (10+ hours look back)
- Reduce force snapshot - need to align with @praveen-influx on this
Thanks @MaduMitha-Ravi - I looked into profiling the query path to understand where the difference comes from between main (base as in @MaduMitha-Ravi 's graphs) and the branch.
@pauldix - should we do any more work to see if we can shave off that 10ms? I'm not sure what other approaches I can take, other than making the chunk times dynamic per table based on throughput; perhaps the copies could also be done in parallel, but handing them off to rayon or some other thread pool may be expensive in itself. A more detailed look at this issue is below.
It seems to be the calls to finish_cloned that result in memcpy (which translates to __memmove_avx_unaligned_erms on my x86_64 box, copying 32 bytes at a time). It seems to be more efficient (at least at lower volumes and hence less data) to copy one whole chunk (10m gen 1 duration) compared to copying multiple 1m durations (5 for 5m queries and 10 for 10m queries).
I've run the tests a few different times with different throughputs; on my box with a 25% CPU cap and an 8G memory cap, once throughput moves beyond 3.84 MB/s, the 1m gen1 duration starts to perform better (I've included one such profile below).
Profile 1
- Query used locally: `select count(*) from cpu where time > now() - interval '5 minutes' limit 1`
- Using perf: `perf record -F 99 -p $PID --call-graph dwarf`, visualised using hotspot
Setup
- 0.43 MB/s (3.2k lines)
1m gen1 duration
10m gen1 duration
- 1m gen1 duration reported `finish_cloned` when getting table chunks - 7.53% of CPU time
- 10m gen1 duration also reported `finish_cloned` when getting table chunks - 1.52% of CPU time
Profile 2
Setup
- 5.42 MB/s (40k lines)
1m gen1 duration
10m gen1 duration
- 1m gen1 duration reported `finish_cloned` when getting table chunks - 1.86% of CPU time
- 10m gen1 duration also reported `finish_cloned` when getting table chunks - 1.48% of CPU time
The difference in CPU time has narrowed with the increase in throughput.
I'm not sure it's worth the extra effort to optimize that at this moment. We should come back to it, but I think this can probably be closed.