
Data exports can saturate ClickHouse resources


In what situation are you experiencing subpar performance?

When analyzing what our production ClickHouse cluster does, we see that during large exports the export queries end up hogging almost as many resources as normal online workloads, slowing them down.

Analysis: https://metabase.posthog.net/question/528-debug-what-generates-ch-load

Chart by query time: [image]

Chart by data read from disk: [image]

Note we also read a lot more data from ClickHouse than we ever return - an indication that the underlying queries/streaming plan is flawed: https://metabase.posthog.net/question/529-exports-read-data

Potential improvements

Not set in stone, just pointing out first ideas.

On the plugin-server side:

  • Switch to a client library that supports the native protocol, or use HTTP compression - we're streaming a lot of data as uncompressed JSON right now.
  • Benchmark and measure how to make the queries cheaper. Some ideas:
    • Rather than ordering results by raw timestamp as we do currently, make sure the ordering we use matches the table ORDER BY? (A rough sketch follows below.)
    • Create a separate (temporary) table for the events being exported and stream from there, rather than issuing expensive queries against the main events table?

Generally:

  • Separate ClickHouse replicas for online (analytics) and offline (ingestion, exports) workflows.
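
For illustration, here is a rough sketch of what the ordering idea could look like. It is an assumption rather than a settled design: it presumes the events table sort key starts with (team_id, toDate(timestamp)), uses an example team id, and any benefit would need to be benchmarked as noted above.

  -- Hypothetical export page query, ordered to follow the table sort key instead of
  -- raw timestamp so ClickHouse has a chance to read parts roughly in order rather
  -- than re-sorting everything on every page. Needs benchmarking before relying on it.
  SELECT event, uuid, team_id, distinct_id, properties, timestamp, created_at, elements_chain
  FROM events
  WHERE team_id = 2635                              -- example team id
    AND timestamp >= '2022-01-03 04:50:00'
    AND timestamp <  '2022-01-03 05:00:00'
  ORDER BY team_id, toDate(timestamp), timestamp    -- align with the assumed sort key prefix
  LIMIT 500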

Environment

  • [x] PostHog Cloud
  • [ ] self-hosted PostHog, version/commit: please provide

Additional context

cc @xvello @fuziontech @ellie @yakkomajuri @hazzadous

Thank you for your performance issue report – we want PostHog to go supersonic!

macobo avatar Dec 15 '22 10:12 macobo

I've had a quick review of this.

Re. suggestions:

  1. Using compression - would be great for reducing over-the-wire transfer and costs. We'll need to enable https://clickhouse.com/docs/en/operations/settings/settings#settings-enable_http_compression and set the appropriate Accept-Encoding header values (see the sketch right after this list).
  2. Re. timestamp, agree it's not ideal. If I'm not mistaken, the current query reads all timestamps for that team so it can produce the ordered results to be paginated through, on every request. I was hoping that sorting by the sort key and paginating via filtering on it would allow the query to skip some of that reading (a rough sketch is further down this comment). Feels like there should be some easy wins here, so I'll look at this first to see if we can get the rows-read count down (wip implementation here). It's also possible that simply increasing the query limit would reduce the overall data read from disk.
  3. Separate table: I assume INSERT INTO ... SELECT ... is pretty optimized, although it presumably has to do the same kind of querying we'd be doing anyway to pull out of the table directly. I'll give it a go though 👍
  4. Separate ClickHouse replicas sound like a good strategy in general for QoS, although I'll hold off on doing this until after Christmas.
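
To make point 1 concrete, here is a minimal sketch of the ClickHouse side; the exact client wiring depends on which library we end up using, so the header part is only illustrative.

  -- Allow ClickHouse to compress HTTP responses (the setting linked above). Over the
  -- HTTP interface this can also be passed per request as enable_http_compression=1.
  SET enable_http_compression = 1

  -- The client then has to ask for a compressed response, e.g. by sending
  --   Accept-Encoding: gzip
  -- with the request, and decompress the stream as it consumes it.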

I'll also have a look at how much data we actually have to read (i.e. how much data this team has), which should give us a lower bound. It's also possible that we're retrying some parts over and over, so I'll have a look in the logs as well.
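
Going back to point 2, a rough sketch of the sort-key pagination idea (this is not the wip implementation linked above, just the shape of it, with placeholder cursor values):

  -- Resume each page strictly after the last row of the previous page instead of using
  -- OFFSET, so ClickHouse doesn't re-read and re-sort everything before the offset on
  -- every request.
  SELECT event, uuid, team_id, distinct_id, properties, timestamp, created_at, elements_chain
  FROM events
  WHERE team_id = 2635
    AND timestamp >= '2022-01-03 04:50:00'
    AND timestamp <  '2022-01-03 05:00:00'
    -- cursor: the (timestamp, uuid) of the last row returned by the previous page
    AND (timestamp, uuid) > (toDateTime64('2022-01-03 04:52:13.123456', 6, 'UTC'),
                             toUUID('00000000-0000-0000-0000-000000000000'))
  ORDER BY timestamp, uuid
  LIMIT 500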

hazzadous avatar Dec 22 '22 08:12 hazzadous

If I'm not mistaken, the current query reads all timestamps for that team so it can produce the ordered results to be paginated through, on every request.

Luckily it's not quite that bad, since toDate(timestamp) is in the events table ORDER BY, but within a single date you can assume all timestamps will be read to figure out what data to return.

Some quick query analysis:
EXPLAIN indexes=1, header=1
SELECT
    event,
    uuid,
    team_id,
    distinct_id,
    properties,
    timestamp,
    created_at,
    elements_chain
FROM events
WHERE
    team_id = 2635
    AND timestamp >= '2022-01-03 04:50:00'
    AND timestamp < '2022-01-03 05:00:00'
ORDER BY timestamp
LIMIT 500 OFFSET 1000
FORMAT LineAsString

Results of this:

Header: event String
        uuid UUID
        team_id Int64
        distinct_id String
        properties String
        timestamp DateTime64(6, 'UTC')
        created_at DateTime64(6, 'UTC')
        elements_chain String
  Limit (preliminary LIMIT (without OFFSET))
  Header: uuid UUID
          event String
          properties String
          timestamp DateTime64(6, 'UTC')
          team_id Int64
          distinct_id String
          created_at DateTime64(6, 'UTC')
          elements_chain String
    Sorting (Merge sorted streams after aggregation stage for ORDER BY)
    Header: uuid UUID
            event String
            properties String
            timestamp DateTime64(6, 'UTC')
            team_id Int64
            distinct_id String
            created_at DateTime64(6, 'UTC')
            elements_chain String
      SettingQuotaAndLimits (Set limits and quota after reading from storage)
      Header: uuid UUID
              event String
              properties String
              timestamp DateTime64(6, 'UTC')
              team_id Int64
              distinct_id String
              created_at DateTime64(6, 'UTC')
              elements_chain String
        Union
        Header: uuid UUID
                event String
                properties String
                timestamp DateTime64(6, 'UTC')
                team_id Int64
                distinct_id String
                created_at DateTime64(6, 'UTC')
                elements_chain String
          Expression
          Header: uuid UUID
                  event String
                  properties String
                  timestamp DateTime64(6, 'UTC')
                  team_id Int64
                  distinct_id String
                  created_at DateTime64(6, 'UTC')
                  elements_chain String
            Limit (preliminary LIMIT (with OFFSET))
            Header: timestamp DateTime64(6, 'UTC')
                    uuid UUID
                    event String
                    properties String
                    team_id Int64
                    distinct_id String
                    created_at DateTime64(6, 'UTC')
                    elements_chain String
              Sorting (Sorting for ORDER BY)
              Header: timestamp DateTime64(6, 'UTC')
                      uuid UUID
                      event String
                      properties String
                      team_id Int64
                      distinct_id String
                      created_at DateTime64(6, 'UTC')
                      elements_chain String
                Expression (Before ORDER BY)
                Header: timestamp DateTime64(6, 'UTC')
                        uuid UUID
                        event String
                        properties String
                        team_id Int64
                        distinct_id String
                        created_at DateTime64(6, 'UTC')
                        elements_chain String
                  Filter (WHERE)
                  Header: timestamp DateTime64(6, 'UTC')
                          uuid UUID
                          event String
                          properties String
                          team_id Int64
                          distinct_id String
                          created_at DateTime64(6, 'UTC')
                          elements_chain String
                    SettingQuotaAndLimits (Set limits and quota after reading from storage)
                    Header: and(greaterOrEquals(timestamp, '2022-01-03 04:50:00'), less(timestamp, '2022-01-03 05:00:00')) UInt8
                            timestamp DateTime64(6, 'UTC')
                            uuid UUID
                            event String
                            properties String
                            team_id Int64
                            distinct_id String
                            created_at DateTime64(6, 'UTC')
                            elements_chain String
                      ReadFromMergeTree
                      Header: and(greaterOrEquals(timestamp, '2022-01-03 04:50:00'), less(timestamp, '2022-01-03 05:00:00')) UInt8
                              timestamp DateTime64(6, 'UTC')
                              uuid UUID
                              event String
                              properties String
                              team_id Int64
                              distinct_id String
                              created_at DateTime64(6, 'UTC')
                              elements_chain String
                      Indexes:
                        MinMax
                          Keys: 
                            timestamp
                          Condition: and(and((timestamp in (-Inf, '1641186000')), (timestamp in ['1641185400', +Inf))), and((timestamp in (-Inf, '1641186000')), (timestamp in ['1641185400', +Inf))))
                          Parts: 12/871
                          Granules: 457415/9874449
                        Partition
                          Keys: 
                            toYYYYMM(timestamp)
                          Condition: and(and((toYYYYMM(timestamp) in (-Inf, 202201]), (toYYYYMM(timestamp) in [202201, +Inf))), and((toYYYYMM(timestamp) in (-Inf, 202201]), (toYYYYMM(timestamp) in [202201, +Inf))))
                          Parts: 12/12
                          Granules: 457415/457415
                        PrimaryKey
                          Keys: 
                            team_id
                            toDate(timestamp)
                          Condition: and(and((toDate(timestamp) in (-Inf, 18995]), (toDate(timestamp) in [18995, +Inf))), and((team_id in [2635, 2635]), and((toDate(timestamp) in (-Inf, 18995]), (toDate(timestamp) in [18995, +Inf)))))
                          Parts: 10/12
                          Granules: 1098/457415
          ReadFromRemote (Read from remote replica)
          Header: uuid UUID
                  event String
                  properties String
                  timestamp DateTime64(6, 'UTC')
                  team_id Int64
                  distinct_id String
                  created_at DateTime64(6, 'UTC')
                  elements_chain String

You can see the indexes filter out most of the parts and granules, but we still read 1098 granules (~8M rows) on one shard (and the same from the other), only to retain 1500 rows from both shards.

Separate table: I assume INSERT INTO ... SELECT ... is pretty optimized, although it presumably has to do the same kind of querying we'd be doing anyway to pull out of the table directly. I'll give it a go though 👍

Difference is that it will:

  1. Do it all in one go (no more reading the same data many times over due to limit/offset).
  2. Let the new table have a schema that avoids read overhead (e.g. ORDER BY timestamp, offset for efficient pagination; see the sketch after this list).
  3. Do it over the native protocol (and ideally within a single shard), avoiding overheads.
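
As a sketch of point 2, the staging table could look something like this; the name and exact layout are assumptions to illustrate the idea, with the column types taken from the events header above.

  -- Hypothetical per-export staging table, sorted the way the export reads it back,
  -- so each chunk is a cheap range read instead of a sort over the whole set.
  CREATE TABLE export_staging_team_2635
  (
      event String,
      uuid UUID,
      team_id Int64,
      distinct_id String,
      properties String,
      timestamp DateTime64(6, 'UTC'),
      created_at DateTime64(6, 'UTC'),
      elements_chain String
  )
  ENGINE = MergeTree
  ORDER BY (timestamp, uuid)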

Some suggestions before building this (a rough combined sketch follows this list):

  • Remember this magic invocation before you INSERT: set max_block_size=100000, max_insert_block_size=100000, max_threads=20, max_insert_threads=20, optimize_on_insert=0, max_execution_time=0
  • Consider doing an OPTIMIZE TABLE FINAL on the temporary table after the insert :)
  • Consider inserting partition-by-partition for better resumability (using the _partition_id virtual column of the sharded_events table)
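
Putting those together, a rough combined sketch, reusing the hypothetical staging table from above; the partition id is a placeholder.

  -- The "magic invocation" settings, applied before the backfill insert.
  SET max_block_size = 100000, max_insert_block_size = 100000,
      max_threads = 20, max_insert_threads = 20,
      optimize_on_insert = 0, max_execution_time = 0;

  -- Backfill one partition at a time for resumability, using the _partition_id virtual
  -- column of sharded_events ('202201' is a placeholder partition id).
  INSERT INTO export_staging_team_2635
  SELECT event, uuid, team_id, distinct_id, properties, timestamp, created_at, elements_chain
  FROM sharded_events
  WHERE team_id = 2635
    AND _partition_id = '202201';

  -- Collapse parts once the backfill is done so the export reads fewer, larger parts.
  OPTIMIZE TABLE export_staging_team_2635 FINAL;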

macobo avatar Dec 22 '22 09:12 macobo

Preliminary findings, operating on one shard but adjusting to two in calculations:

Looking at using a staging table with an ORDER BY timestamp, reusing the logic we already have for exports, and performing the following:

  1. insert into staging_table select from sharded_events where team_id = x and toDate(timestamp) =
  2. select as the original export query

We get a throughput of around 300k rows a second for the initial insert (this is into an empty table, so it may decrease, and we may need to tweak the ORDER BY accordingly).

For the actual export chunking query, we initially read about 1/30th of the bytes, scaling down linearly to 0 bytes read as we approach the end of the chunk if we include a further filter on timestamp.

So roughly, we should decrease bytes read to about 1/60th of its current level.
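
For reference, the chunk read described above would look roughly like this against the staging table (table name, timestamps and limit are placeholders):

  -- Because the staging table is ordered by timestamp and we also filter on it, the
  -- primary key index skips everything before the lower bound, so fewer granules are
  -- left to read as the bound advances - which is why bytes read shrink towards the
  -- end of the chunk.
  SELECT event, uuid, team_id, distinct_id, properties, timestamp, created_at, elements_chain
  FROM staging_table
  WHERE timestamp >= '2022-01-03 04:52:00'   -- lower bound advanced as the export progresses
  ORDER BY timestamp
  LIMIT 500 OFFSET 0                         -- existing export pagination logic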

I'll continue playing with this to see if there are further improvements to make, but it looks promising.

hazzadous avatar Dec 22 '22 14:12 hazzadous

We increased the export batch size 500 -> 10k on the 23rd, and this had a very positive impact on both the CH load and the export speed, see metabase

The load didn't fall by a factor of 20 because we are also exporting a lot faster, thanks to faster IO to the destination store. If the load is still too high, we could reduce the concurrency, but my guess is that it's better to get this export done ASAP while human traffic is low.

The 10k events batch size was pretty conservative; we could decide to increase it again, assuming this won't put more memory strain on CH.

xvello avatar Dec 27 '22 10:12 xvello

We increased the export batch size 500 -> 10k on the 23rd, and this had a very positive impact on both the CH load and the export speed, see metabase

Thanks for this! 🙏

If the load is still too high, we could reduce the concurrency, but my guess is that it's better to get this export done ASAP while human traffic is low.

For further context, concurrency was reduced from 30 to 5 around the 21st. It wouldn't have taken effect immediately but kicked in as older tasks finished, which may account for some of the further variation in the graph. I'll pull some numbers to see when this export would be expected to complete. If bumping some values would meaningfully improve performance and get this through in a timely manner, it's worth a go.

Separately, if the same changes would help with other export speed issues, we can port them over, but we'll need to review the contract we have on exports to see if we'd be breaking any expectations there.

The 10k events batch size was pretty conservative; we could decide to increase it again, assuming this won't put more memory strain on CH.

We'd have two places to consider, I guess: ClickHouse and the plugin-server pods. FWIW, it looks like ClickHouse would scale linearly with this specific query as it stands.

hazzadous avatar Dec 28 '22 10:12 hazzadous

Fixed by batch exports, which don't hit CH as wastefully (bigger read batches). Leaving this open for now to discuss what polish is needed to ensure we don't regress on that front.

xvello avatar Nov 06 '23 09:11 xvello