Data exports can saturate ClickHouse resources
In what situation are you experiencing subpar performance?
When analyzing what our production ClickHouse cluster does, we see that during large exports the exports end up hogging almost as many resources as normal online workflows and slowing them down.
Analysis: https://metabase.posthog.net/question/528-debug-what-generates-ch-load
Chart by query time:
Chart by data read from disk:
Note we also read a lot more data from ClickHouse than we ever return - an indication that the underlying queries/streaming plan is flawed: https://metabase.posthog.net/question/529-exports-read-data
Potential improvements
Not set in stone, just pointing out first ideas.
On the plugin-server side:
- Switch to a client library that supports the native protocol, or use HTTP compression - we're streaming a lot of data as uncompressed JSON right now.
- Benchmark and measure how to make the queries cheaper. Some ideas:
  - Rather than ordering things by timestamp as we do currently, make sure how we order things matches the table ORDER BY? (See the sketch at the end of this section.)
  - Create a separate (temporary) table for the events being exported and stream from there rather than issuing expensive queries?
Generally:
- Separate ClickHouse replicas for online (analytics) and offline (ingestion, exports) workflows.
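As a quick starting point for the ORDER BY idea above, this is one way to check what the table is actually sorted and partitioned by (a sketch only; the sharded_events table name is assumed here):

-- Inspect the sort key and partition key of the underlying events table
SELECT sorting_key, partition_key
FROM system.tables
WHERE database = currentDatabase() AND name = 'sharded_events'

Whatever paginated export query we end up with should filter and sort on a prefix of that sorting_key so the primary index can skip granules.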
Environment
- [x] PostHog Cloud
- [ ] self-hosted PostHog, version/commit: please provide
Additional context
cc @xvello @fuziontech @ellie @yakkomajuri @hazzadous
Thank you for your performance issue report – we want PostHog to go supersonic!
I've had a quick review of this.
Re. suggestions:
- Using compression would be great for reducing over-the-wire transfer and costs. We will need to enable
https://clickhouse.com/docs/en/operations/settings/settings#settings-enable_http_compression
and set the appropriate Accept-Encoding header values.
- Re. timestamp, agree it's not ideal. This will read all timestamps for that team so that it can produce the ordered results to then be paginated through, on every request if I'm not mistaken. I was hoping that sorting by key and paginating through via filtering on the sort key would let the query skip some of that reading (see the sketch at the end of this comment). Feels like there should be some easy wins here, so I'll look at this first to see if we can get the rows-read count down (WIP implementation here). It's possible that simply increasing the query limit might reduce the overall data read from disk as well.
- Separate table: I assume INSERT INTO ... SELECT ... is pretty optimized, although it presumably will have to do the same kind of querying we'd be doing anyway to pull out of the table directly. I'll give it a go though 👍
- Separate ClickHouse replicas sounds like a good strategy in general for QoS, although I'll hold off on doing this before Christmas.
I'll also have a look at how much data we actually have to read, i.e. how much data this team has; that should give us a lower bound. It's also possible that we're retrying some parts over and over again, so I'll have a look in the logs as well.
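For illustration only (not the actual WIP implementation), keyset pagination on the sort key could look roughly like this, assuming the primary key starts with (team_id, toDate(timestamp)) and with cursor values standing in for the last row of the previous page:

SELECT
    event, uuid, team_id, distinct_id, properties, timestamp, created_at, elements_chain
FROM events
WHERE team_id = 2635
  AND toDate(timestamp) = toDate('2022-01-03')  -- stay on the primary-key prefix
  AND timestamp < '2022-01-03 05:00:00'
  -- cursor: (timestamp, uuid) of the last row already returned, instead of an OFFSET
  AND (timestamp, uuid) > (toDateTime64('2022-01-03 04:50:00', 6, 'UTC'), toUUID('00000000-0000-0000-0000-000000000000'))
ORDER BY timestamp, uuid
LIMIT 500

Each page then filters out everything it has already returned instead of re-reading and re-sorting the same data just to apply an ever-growing OFFSET.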
This will read all timestamps for that team such that it can produce the ordered results to then be paginated through, on every request if I'm not mistaken.
Luckily it's not quite as bad, since toDate(timestamp) is in the events table ORDER BY, but within a date you can assume all timestamps will be read to figure out what data to read.
Some quick query analysis:
EXPLAIN indexes=1, header=1
SELECT
event,
uuid,
team_id,
distinct_id,
properties,
timestamp,
created_at,
elements_chain
FROM
events
WHERE
team_id = 2635
AND timestamp >= '2022-01-03 04:50:00'
AND timestamp < '2022-01-03 05:00:00'
ORDER BY
timestamp
LIMIT
500 OFFSET 1000 FORMAT LineAsString
Results of this:
Header: event String
uuid UUID
team_id Int64
distinct_id String
properties String
timestamp DateTime64(6, 'UTC')
created_at DateTime64(6, 'UTC')
elements_chain String
Limit (preliminary LIMIT (without OFFSET))
Header: uuid UUID
event String
properties String
timestamp DateTime64(6, 'UTC')
team_id Int64
distinct_id String
created_at DateTime64(6, 'UTC')
elements_chain String
Sorting (Merge sorted streams after aggregation stage for ORDER BY)
Header: uuid UUID
event String
properties String
timestamp DateTime64(6, 'UTC')
team_id Int64
distinct_id String
created_at DateTime64(6, 'UTC')
elements_chain String
SettingQuotaAndLimits (Set limits and quota after reading from storage)
Header: uuid UUID
event String
properties String
timestamp DateTime64(6, 'UTC')
team_id Int64
distinct_id String
created_at DateTime64(6, 'UTC')
elements_chain String
Union
Header: uuid UUID
event String
properties String
timestamp DateTime64(6, 'UTC')
team_id Int64
distinct_id String
created_at DateTime64(6, 'UTC')
elements_chain String
Expression
Header: uuid UUID
event String
properties String
timestamp DateTime64(6, 'UTC')
team_id Int64
distinct_id String
created_at DateTime64(6, 'UTC')
elements_chain String
Limit (preliminary LIMIT (with OFFSET))
Header: timestamp DateTime64(6, 'UTC')
uuid UUID
event String
properties String
team_id Int64
distinct_id String
created_at DateTime64(6, 'UTC')
elements_chain String
Sorting (Sorting for ORDER BY)
Header: timestamp DateTime64(6, 'UTC')
uuid UUID
event String
properties String
team_id Int64
distinct_id String
created_at DateTime64(6, 'UTC')
elements_chain String
Expression (Before ORDER BY)
Header: timestamp DateTime64(6, 'UTC')
uuid UUID
event String
properties String
team_id Int64
distinct_id String
created_at DateTime64(6, 'UTC')
elements_chain String
Filter (WHERE)
Header: timestamp DateTime64(6, 'UTC')
uuid UUID
event String
properties String
team_id Int64
distinct_id String
created_at DateTime64(6, 'UTC')
elements_chain String
SettingQuotaAndLimits (Set limits and quota after reading from storage)
Header: and(greaterOrEquals(timestamp, '2022-01-03 04:50:00'), less(timestamp, '2022-01-03 05:00:00')) UInt8
timestamp DateTime64(6, 'UTC')
uuid UUID
event String
properties String
team_id Int64
distinct_id String
created_at DateTime64(6, 'UTC')
elements_chain String
ReadFromMergeTree
Header: and(greaterOrEquals(timestamp, '2022-01-03 04:50:00'), less(timestamp, '2022-01-03 05:00:00')) UInt8
timestamp DateTime64(6, 'UTC')
uuid UUID
event String
properties String
team_id Int64
distinct_id String
created_at DateTime64(6, 'UTC')
elements_chain String
Indexes:
MinMax
Keys:
timestamp
Condition: and(and((timestamp in (-Inf, '1641186000')), (timestamp in ['1641185400', +Inf))), and((timestamp in (-Inf, '1641186000')), (timestamp in ['1641185400', +Inf))))
Parts: 12/871
Granules: 457415/9874449
Partition
Keys:
toYYYYMM(timestamp)
Condition: and(and((toYYYYMM(timestamp) in (-Inf, 202201]), (toYYYYMM(timestamp) in [202201, +Inf))), and((toYYYYMM(timestamp) in (-Inf, 202201]), (toYYYYMM(timestamp) in [202201, +Inf))))
Parts: 12/12
Granules: 457415/457415
PrimaryKey
Keys:
team_id
toDate(timestamp)
Condition: and(and((toDate(timestamp) in (-Inf, 18995]), (toDate(timestamp) in [18995, +Inf))), and((team_id in [2635, 2635]), and((toDate(timestamp) in (-Inf, 18995]), (toDate(timestamp) in [18995, +Inf)))))
Parts: 10/12
Granules: 1098/457415
ReadFromRemote (Read from remote replica)
Header: uuid UUID
event String
properties String
timestamp DateTime64(6, 'UTC')
team_id Int64
distinct_id String
created_at DateTime64(6, 'UTC')
elements_chain String
You can see it filters out most of the data, but we still read 1098 granules (~8M rows) on one shard (and the same from the other) and retain only 1500 rows from both shards.
separate table: I assume INSERT INTO ... SELECT ... is pretty optimized, although it presumably will have to do the same kind of querying we'd be doing anyway to pull out of the table directly. I'll give it a go though +1
The difference is that it will:
- Do it all in one go (no more reading the same data many, many times due to limit/offset)
- Use a new table whose schema avoids read overhead (e.g. ORDER BY timestamp, offset for efficient pagination)
- Do the work over the native protocol (and ideally within a single shard), avoiding overheads.
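To make that concrete, a staging table along these lines might look roughly like the sketch below; the table name and exact sort key are assumptions, not a settled design:

-- Hypothetical per-export staging table, sorted the way the export reads
-- so each chunk is a contiguous range on disk
CREATE TABLE IF NOT EXISTS exported_events_team_2635
(
    event          String,
    uuid           UUID,
    team_id        Int64,
    distinct_id    String,
    properties     String,
    timestamp      DateTime64(6, 'UTC'),
    created_at     DateTime64(6, 'UTC'),
    elements_chain String
)
ENGINE = MergeTree
ORDER BY (timestamp, uuid)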
Some suggestions before building this:
- Remember this magic invocation before you INSERT:
set max_block_size=100000, max_insert_block_size=100000, max_threads=20, max_insert_threads=20, optimize_on_insert=0, max_execution_time=0
- Consider doing an OPTIMIZE TABLE FINAL on the temporary table after insert :)
- Consider doing insert partition-by-partition for better resumability (_partition_id virtual column in sharded_events table)
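Putting those suggestions together, the insert step might look roughly like this; the staging table name and the partition id are placeholders, and the settings are the ones quoted above:

-- Session settings from the "magic invocation" above
SET max_block_size = 100000,
    max_insert_block_size = 100000,
    max_threads = 20,
    max_insert_threads = 20,
    optimize_on_insert = 0,
    max_execution_time = 0;

-- One partition at a time, for resumability, via the _partition_id virtual column
-- (run against sharded_events on each shard)
INSERT INTO exported_events_team_2635
SELECT event, uuid, team_id, distinct_id, properties, timestamp, created_at, elements_chain
FROM sharded_events
WHERE team_id = 2635
  AND _partition_id = '202201';

-- Merge the staging table down to a few parts so the export reads stay cheap
OPTIMIZE TABLE exported_events_team_2635 FINAL;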
Preliminary findings, operating on one shard but adjusting to two in calculations:
Looking at using a staging table with ORDER BY timestamp, reusing the logic we already have for exports, and performing the following:
- insert into staging_table select from sharded_events where team_id = x and toDate(timestamp) =
- select as the original export query
We get a throughput of around 300k rows a second for the initial insert (this is into an empty table, so it may decrease, and we may need to tweak the ORDER BY accordingly).
For the actual export chunking query we initially read about 1/30th of the bytes, scaling down linearly to 0 bytes read as we approach the end of the chunk, if we include a further filter on timestamp.
So roughly we should decrease bytes read to about 1/60th of its current state.
I'll continue playing with this to see if there are further improvements to make, but it looks promising.
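To illustrate the further filter on timestamp, the chunking query against the staging table could look something like the sketch below; the table name and the cursor value are placeholders:

-- Because the staging table is ORDER BY timestamp, the lower-bound filter lets
-- ClickHouse skip everything already exported, which is why bytes read shrink
-- as the export advances through the chunk.
SELECT
    event, uuid, team_id, distinct_id, properties, timestamp, created_at, elements_chain
FROM exported_events_team_2635
WHERE timestamp > toDateTime64('2022-01-03 04:50:00', 6, 'UTC')  -- cursor: last timestamp already exported
ORDER BY timestamp
LIMIT 10000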
We increased the export batch size from 500 to 10k on the 23rd, and this had a very positive impact on both the CH load and the export speed, see metabase
The load didn't fall by a factor of 20 because we are also exporting a lot faster, thanks to faster IO to the destination store. If the load is still too high, we could reduce the concurrency, but my guess is that it's better to get this export done ASAP while the human traffic is low.
The 10k events batch size was pretty conservative, we could decide to increase it again too, assuming this won't put more memory strain on CH.
We increased the export batch size from 500 to 10k on the 23rd, and this had a very positive impact on both the CH load and the export speed, see metabase
Thanks for this! 🙏
If the load is still too high, we could reduce the concurrency, but my guess is that it's better to get this export done ASAP while the human traffic is low.
For further context, concurrency was reduced from 30 to 5 around the 21st. It wouldn't have taken effect immediately but kicked in as older tasks finished, which may account for some of the further variations in the graph. I'll pull some numbers to see when this export would be expected to complete. If we can meaningfully improve performance and get the time down by bumping some values to get this through in a timely manner, then it's worth a go.
Separately, if the same changes would help with other export speed issues, we can port them over too, but we will need to review the contract we have on exports to see if we'd be breaking any expectations there.
The 10k events batch size was pretty conservative, we could decide to increase it again too, assuming this won't put more memory strain on CH.
We'd have two places to consider, I guess: ClickHouse and the plugin-server pods. FWIW, it looks like ClickHouse would scale linearly with this specific query as it stands.
Fixed by batch exports, which don't hit CH as wastefully (bigger read batches). Leaving this open for now to discuss what polish is needed to ensure we don't regress on that front.