
[Bug]: Very slow performance of cagg_watermark function, when running multiple queries in parallel on cagg with real time aggregation on

Open DZDomi opened this issue 2 years ago • 6 comments

What type of bug is this?

Performance issue

What subsystems and features are affected?

Continuous aggregate

What happened?

Hello everybody,

We are currently building a solution for candlestick (OHLC) data from different crypto exchanges (Binance/Coinbase etc.), and we noticed very slow planning times (orders of magnitude slower than expected) on the caggs with real-time aggregation enabled, compared to running the time-bucket query on the original hypertable plus the cagg with real-time aggregation off. Our workload is very bursty: we expect around 500-1000 requests within roughly the same 1-10 second window. The performance drop-off becomes especially noticeable at around 100+ parallel queries on these views.

The raw data in the hypertables is around:

  • 300-400 Gigabyte uncompressed
  • 120 Gigabyte compressed

We did a lot of initial investigation into choosing the correct chunk size for each exchange (since they have different numbers of instruments). We currently base our assumptions on a server with 4 CPU cores/16 GB RAM (+1 replica). We looked up best practices for chunk sizing and found the recommendation from the Timescale team that one chunk of each of your hypertables should fit within roughly 25% of available memory (the shared_buffers size). The chunks in each table reflect that (below 400 Megabyte per chunk).
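For reference, chunk sizes like the ones described here can be checked with TimescaleDB's built-in `chunks_detailed_size` function; a minimal sketch (table name taken from the schema below):

```sql
-- Sketch: verify per-chunk sizes against the ~400 MB target.
-- chunks_detailed_size() is a standard TimescaleDB informational function.
SELECT chunk_name,
       pg_size_pretty(total_bytes) AS total_size
FROM chunks_detailed_size('candles_binance_60')
ORDER BY total_bytes DESC;
```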

Our current setup consists of the following example schemas (with binance as an example):

  • Hypertable; we are using an integer-based column for partitioning (Unix timestamp in milliseconds)
CREATE TABLE candles_binance_60
(
    start       BIGINT          NOT NULL,
    instrument  TEXT            NOT NULL,
    open        NUMERIC(30, 15) NOT NULL,
    high        NUMERIC(30, 15) NOT NULL,
    low         NUMERIC(30, 15) NOT NULL,
    close       NUMERIC(30, 15) NOT NULL,
    volume      NUMERIC(30, 15) NOT NULL,
    -- other non relevant fields
);

SELECT create_hypertable('candles_binance_60', 'start', chunk_time_interval => 43200000);
SELECT set_integer_now_func('candles_binance_60', 'unix_milliseconds');
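The `unix_milliseconds` function referenced above is not shown in the issue; for an integer-partitioned hypertable, `set_integer_now_func` needs a function returning "now" in the column's units. A hypothetical definition consistent with millisecond timestamps would be:

```sql
-- Hypothetical integer-now function (the actual definition is not shown
-- in the issue): current time as a Unix timestamp in milliseconds.
CREATE OR REPLACE FUNCTION unix_milliseconds() RETURNS BIGINT
LANGUAGE SQL STABLE AS
$$
    SELECT (extract(epoch FROM now()) * 1000)::BIGINT;
$$;
```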
  • Indices on the hypertable
create unique index on candles_binance_60(instrument, start DESC);
  • Example view (we have different intervals 300, 900, ..., 86400)
CREATE MATERIALIZED VIEW candles_binance_300
WITH (timescaledb.continuous) AS
    SELECT
        time_bucket(300000, start) AS bucket_start,
        instrument,
        FIRST(open, start) AS "open",
        MAX(high) AS high,
        MIN(low) AS low,
        LAST(close, start) AS "close",
        SUM(volume) as volume,
        -- other non relevant fields
    FROM candles_binance_60
    GROUP BY instrument, bucket_start
WITH NO DATA;
  • Compression on hypertable + views (after 2 months)
ALTER TABLE candles_binance_60
SET (
    timescaledb.compress,
    timescaledb.compress_segmentby = 'instrument'
);
SELECT add_compression_policy('candles_binance_60', 5184000000);

-- Views
ALTER MATERIALIZED VIEW candles_binance_300 SET (
    timescaledb.compress = true
);
SELECT add_compression_policy('candles_binance_300', 5184000000);

After we inserted all the raw data into the hypertables, we created and refreshed all the views to make sure the materialised views contained data. After that, we enabled compression and let the background jobs finish.

When we ran our first benchmarks on the caggs with different intervals, we noticed a very large increase in planning time, which became especially noticeable the more queries (100+) were running in parallel on a single cagg. Below is an example query and its explain output showing the very high planning time:

query:

EXPLAIN (ANALYZE, COSTS, VERBOSE, BUFFERS) 
SELECT *
FROM candles_binance_900
WHERE instrument = 'FILUSDT' and bucket_start between 1659783328000 and 1661580254000
ORDER BY bucket_start
LIMIT 30000

explain:

Limit  (cost=1163.26..1242.98 rows=2007 width=92) (actual time=10.353..10.789 rows=1997 loops=1)
  Output: _materialized_hypertable_29.bucket_start, _materialized_hypertable_29.instrument, _materialized_hypertable_29.open, _materialized_hypertable_29.high, _materialized_hypertable_29.low, _materialized_hypertable_29.close, _materialized_hypertable_29.volume, _materialized_hypertable_29.trades, _materialized_hypertable_29.buy_volume, _materialized_hypertable_29.sell_volume, _materialized_hypertable_29.buys, _materialized_hypertable_29.sells, _materialized_hypertable_29.vwap, _materialized_hypertable_29.missed
  Buffers: shared hit=52 read=14
  I/O Timings: read=8.976
  ->  Merge Append  (cost=1163.26..1242.98 rows=2007 width=92) (actual time=10.352..10.662 rows=1997 loops=1)
        Sort Key: _materialized_hypertable_29.bucket_start
        Buffers: shared hit=52 read=14
        I/O Timings: read=8.976
        ->  Sort  (cost=1153.25..1158.21 rows=1985 width=92) (actual time=10.350..10.509 rows=1997 loops=1)
              Output: _materialized_hypertable_29.bucket_start, _materialized_hypertable_29.instrument, _materialized_hypertable_29.open, _materialized_hypertable_29.high, _materialized_hypertable_29.low, _materialized_hypertable_29.close, _materialized_hypertable_29.volume, _materialized_hypertable_29.trades, _materialized_hypertable_29.buy_volume, _materialized_hypertable_29.sell_volume, _materialized_hypertable_29.buys, _materialized_hypertable_29.sells, _materialized_hypertable_29.vwap, _materialized_hypertable_29.missed
              Sort Key: _materialized_hypertable_29.bucket_start
              Sort Method: quicksort  Memory: 500kB
              Buffers: shared hit=52 read=14
              I/O Timings: read=8.976
              ->  Custom Scan (ChunkAppend) on _timescaledb_internal._materialized_hypertable_29  (cost=0.43..1024.67 rows=1985 width=92) (actual time=0.875..9.905 rows=1997 loops=1)
                    Output: _materialized_hypertable_29.bucket_start, _materialized_hypertable_29.instrument, _materialized_hypertable_29.open, _materialized_hypertable_29.high, _materialized_hypertable_29.low, _materialized_hypertable_29.close, _materialized_hypertable_29.volume, _materialized_hypertable_29.trades, _materialized_hypertable_29.buy_volume, _materialized_hypertable_29.sell_volume, _materialized_hypertable_29.buys, _materialized_hypertable_29.sells, _materialized_hypertable_29.vwap, _materialized_hypertable_29.missed
                    Startup Exclusion: true
                    Runtime Exclusion: false
                    Chunks excluded during startup: 0
                    Buffers: shared hit=52 read=14
                    I/O Timings: read=8.976
                    ->  Index Scan using _hyper_29_19961_chunk__materialized_hypertable_29_instrument_bu on _timescaledb_internal._hyper_29_19961_chunk  (cost=0.43..471.21 rows=913 width=92) (actual time=0.874..2.664 rows=916 loops=1)
                          Output: _hyper_29_19961_chunk.bucket_start, _hyper_29_19961_chunk.instrument, _hyper_29_19961_chunk.open, _hyper_29_19961_chunk.high, _hyper_29_19961_chunk.low, _hyper_29_19961_chunk.close, _hyper_29_19961_chunk.volume, _hyper_29_19961_chunk.trades, _hyper_29_19961_chunk.buy_volume, _hyper_29_19961_chunk.sell_volume, _hyper_29_19961_chunk.buys, _hyper_29_19961_chunk.sells, _hyper_29_19961_chunk.vwap, _hyper_29_19961_chunk.missed
                          Index Cond: ((_hyper_29_19961_chunk.instrument = 'FILUSDT'::text) AND (_hyper_29_19961_chunk.bucket_start < COALESCE(_timescaledb_internal.cagg_watermark(29), '-9223372036854775808'::bigint)) AND (_hyper_29_19961_chunk.bucket_start >= '1659783328000'::bigint) AND (_hyper_29_19961_chunk.bucket_start <= '1661580254000'::bigint))
                          Buffers: shared hit=23 read=7
                          I/O Timings: read=2.398
                    ->  Index Scan using _hyper_29_19962_chunk__materialized_hypertable_29_instrument_bu on _timescaledb_internal._hyper_29_19962_chunk  (cost=0.43..489.49 rows=951 width=92) (actual time=0.052..0.762 rows=960 loops=1)
                          Output: _hyper_29_19962_chunk.bucket_start, _hyper_29_19962_chunk.instrument, _hyper_29_19962_chunk.open, _hyper_29_19962_chunk.high, _hyper_29_19962_chunk.low, _hyper_29_19962_chunk.close, _hyper_29_19962_chunk.volume, _hyper_29_19962_chunk.trades, _hyper_29_19962_chunk.buy_volume, _hyper_29_19962_chunk.sell_volume, _hyper_29_19962_chunk.buys, _hyper_29_19962_chunk.sells, _hyper_29_19962_chunk.vwap, _hyper_29_19962_chunk.missed
                          Index Cond: ((_hyper_29_19962_chunk.instrument = 'FILUSDT'::text) AND (_hyper_29_19962_chunk.bucket_start < COALESCE(_timescaledb_internal.cagg_watermark(29), '-9223372036854775808'::bigint)) AND (_hyper_29_19962_chunk.bucket_start >= '1659783328000'::bigint) AND (_hyper_29_19962_chunk.bucket_start <= '1661580254000'::bigint))
                          Buffers: shared hit=28 read=1
                          I/O Timings: read=0.304
                    ->  Index Scan using _hyper_29_19963_chunk__materialized_hypertable_29_instrument_bu on _timescaledb_internal._hyper_29_19963_chunk  (cost=0.43..63.97 rows=121 width=90) (actual time=2.347..6.344 rows=121 loops=1)
                          Output: _hyper_29_19963_chunk.bucket_start, _hyper_29_19963_chunk.instrument, _hyper_29_19963_chunk.open, _hyper_29_19963_chunk.high, _hyper_29_19963_chunk.low, _hyper_29_19963_chunk.close, _hyper_29_19963_chunk.volume, _hyper_29_19963_chunk.trades, _hyper_29_19963_chunk.buy_volume, _hyper_29_19963_chunk.sell_volume, _hyper_29_19963_chunk.buys, _hyper_29_19963_chunk.sells, _hyper_29_19963_chunk.vwap, _hyper_29_19963_chunk.missed
                          Index Cond: ((_hyper_29_19963_chunk.instrument = 'FILUSDT'::text) AND (_hyper_29_19963_chunk.bucket_start < COALESCE(_timescaledb_internal.cagg_watermark(29), '-9223372036854775808'::bigint)) AND (_hyper_29_19963_chunk.bucket_start >= '1659783328000'::bigint) AND (_hyper_29_19963_chunk.bucket_start <= '1661580254000'::bigint))
                          Buffers: shared hit=1 read=6
                          I/O Timings: read=6.275
        ->  GroupAggregate  (cost=10.01..64.47 rows=22 width=298) (actual time=0.001..0.001 rows=0 loops=1)
              Output: (time_bucket('900000'::bigint, candles_binance_60.start)), candles_binance_60.instrument, first(candles_binance_60.open, candles_binance_60.start), max(candles_binance_60.high), min(candles_binance_60.low), last(candles_binance_60.close, candles_binance_60.start), sum(candles_binance_60.volume), sum(candles_binance_60.trades), sum(candles_binance_60.buy_volume), sum(candles_binance_60.sell_volume), sum(candles_binance_60.buys), sum(candles_binance_60.sells), CASE WHEN (sum(candles_binance_60.volume) > '0'::numeric) THEN (sum((candles_binance_60.vwap * candles_binance_60.volume)) / sum(candles_binance_60.volume)) ELSE '0'::numeric END, max(candles_binance_60.missed)
              Group Key: candles_binance_60.instrument, (time_bucket('900000'::bigint, candles_binance_60.start))
              ->  Custom Scan (ConstraintAwareAppend)  (cost=10.01..62.93 rows=22 width=76) (actual time=0.000..0.001 rows=0 loops=1)
                    Output: (time_bucket('900000'::bigint, candles_binance_60.start)), candles_binance_60.instrument, candles_binance_60.open, candles_binance_60.start, candles_binance_60.high, candles_binance_60.low, candles_binance_60.close, candles_binance_60.volume, candles_binance_60.trades, candles_binance_60.buy_volume, candles_binance_60.sell_volume, candles_binance_60.buys, candles_binance_60.sells, candles_binance_60.vwap, candles_binance_60.missed
                    Hypertable: candles_binance_60
                    Chunks excluded during startup: 22
Query Identifier: 2417736485277390874
Planning:
  Buffers: shared hit=84615
Planning Time: 13711.166 ms <-- Extreme planning time
Execution Time: 11.281 ms

This affected all views. After turning off real-time aggregation (timescaledb.materialized_only = true), the high planning times went away. So we dug deeper, looked at the explain output from the view again (with real-time aggregation on), and found the following condition, which we believe caused the slowdown:

_hyper_29_19961_chunk.bucket_start < COALESCE(_timescaledb_internal.cagg_watermark(29), '-9223372036854775808'::bigint))

So we tried to replicate the same benchmark, running only _timescaledb_internal.cagg_watermark(29) to see how this function behaves under load. (Note: ID 29 is the materialised hypertable of one of the Binance views.)

Example query, we ran with 100+ queries in parallel:

SELECT _timescaledb_internal.cagg_watermark(29);

After running it in parallel, we immediately observed the exact same behaviour we see with real-time aggregation on the views. But instead of high planning time, we now see increased execution time, since I assume this query runs during TimescaleDB's planning phase for real-time aggregation.
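A parallel load like this can be reproduced with the standard pgbench tool shipped with PostgreSQL; a sketch (file name and connection string are illustrative):

```shell
# Sketch: hammer the watermark function with 100 concurrent clients
# for 30 seconds; watermark.sql and $DATABASE_URL are assumptions.
echo "SELECT _timescaledb_internal.cagg_watermark(29);" > watermark.sql
pgbench -n -c 100 -j 8 -T 30 -f watermark.sql "$DATABASE_URL"
```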

After looking further into the source code, we noticed that it evaluates the cagg_watermark function (essentially max(bucket) on the underlying hypertable) for each query on a view with real-time aggregation, which causes extreme load on the database and results in very slow response times. It also seems to hold a lock on certain tables, which could block other queries from proceeding.

We also saw that there have already been attempts to cache the watermark, but it seems the cache is invalidated on each new transaction/query: https://github.com/timescale/timescaledb/pull/2828

To confirm our suspicion, we also ran the exact same queries that real-time aggregation would issue on a view (but with real-time aggregation off, to avoid the cagg_watermark function call):

  • select rows on cagg until last 3 elements in time bucket
  • select with group by + all the other aggregate functions (min/first/last/max) on the underlying hypertable for the last 3 elements (which have not been materialised)

We now saw query times improve by up to 5-10x, even when running the queries in parallel on the database.
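The two-query workaround described above can be sketched as a single statement. This is only an illustration, assuming the cagg has materialized_only = true; the literal 1661578200000 stands in for the materialization watermark and would have to be tracked separately:

```sql
-- Sketch of the manual "real-time" workaround: materialized rows from the
-- cagg, plus a live aggregation over the not-yet-materialized tail.
SELECT bucket_start, instrument, open, high, low, close, volume
FROM candles_binance_900               -- materialized_only = true
WHERE instrument = 'FILUSDT'
  AND bucket_start < 1661578200000     -- illustrative watermark value
UNION ALL
SELECT time_bucket(900000, start) AS bucket_start,
       instrument,
       first(open, start), max(high), min(low), last(close, start),
       sum(volume)
FROM candles_binance_60
WHERE instrument = 'FILUSDT'
  AND start >= 1661578200000
GROUP BY instrument, 1
ORDER BY bucket_start;
```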

With real time on:

100 samples of 100 events
Cumulative:	20m39.62135204s
HMean:		12.383585422s
Avg.:		12.39621352s
p50: 		12.435145364s
p75:		12.73965338s
p95:		12.938279098s
p99:		12.979909272s
p999:		13.096055059s
Long 5%:	12.989388337s
Short 5%:	11.597400698s
Max:		13.096055059s
Min:		11.366040485s
Range:		1.730014574s
StdDev:		393.364365ms
Rate/sec.:	0.08

Without real time + query on the underlying hypertable (both in parallel):

100 samples of 100 events
Cumulative:	47.728964203s
HMean:		24.56293ms
Avg.:		238.644821ms
p50: 		138.6285ms
p75:		472.18237ms
p95:		606.845871ms
p99:		673.48164ms
p999:		703.028222ms
Long 5%:	652.111827ms
Short 5%:	3.135312ms
Max:		703.028222ms
Min:		2.445328ms
Range:		700.582894ms
StdDev:		222.952233ms
Rate/sec.:	4.19

The real-time functionality of the views is therefore essentially unusable for us, and we have to fall back to querying the cagg with real-time aggregation off plus the underlying hypertable.
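For completeness, disabling real-time aggregation on an existing cagg (as mentioned earlier in this report) is done per view:

```sql
-- Turn off real-time aggregation so queries read only materialized data
-- and skip the cagg_watermark call during planning.
ALTER MATERIALIZED VIEW candles_binance_900
SET (timescaledb.materialized_only = true);
```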

Is this behaviour intended, or is it a configuration issue? If it is intended, is there any planned improvement to real-time aggregation and the watermark function? Could the watermark (select max(time) from <underlying table>) not be cached until the next insert into that table happens?

Happy to hear any insights/solutions to our problem!

TimescaleDB version affected

2.8.0

PostgreSQL version used

14.5

What operating system did you use?

Timescale Cloud

What installation method did you use?

Other

What platform did you run on?

Timescale Cloud

Relevant log output and stack trace

No response

How can we reproduce the bug?

See schema above

DZDomi avatar Sep 13 '22 18:09 DZDomi

Just set chunk_time_interval to 14 days.

webpashtet avatar Sep 14 '22 06:09 webpashtet

Hey @webpashtet

The recommendation from the Timescale team is to size the chunks of all hypertables so that together they fit within roughly 25% of the instance's main memory (the shared_buffers size). We are planning to run 2 instances (1 primary + 1 replica) with 16 GB of RAM each. Accounting for the roughly 10 hypertables (one per exchange) we want to support, we arrive at around 400 Megabyte per chunk (16 GB / 4 (25%) -> 4 GB / 10 hypertables -> 400 Megabyte). The current chunk sizes for the exchanges reflect this 400 Megabyte limit.

I am not sure how changing the chunk interval to 14 days would help here. We would suddenly have chunks of around 11 Gigabyte (in the example above, Binance fills a chunk every half day, so 400 Megabyte * 2 * 14 days -> 11 Gigabyte), which would not fit in main memory at all anymore. We already experimented with bigger chunk sizes in the beginning and saw even worse performance.

Ref: https://www.timescale.com/forum/t/choosing-the-right-chunk-time-interval-value-for-timescaledb-hypertables/116 https://www.timescale.com/blog/timescale-cloud-tips-testing-your-chunk-size/

DZDomi avatar Sep 14 '22 09:09 DZDomi

The decrease in performance is due to the large number of exchange-transaction records. My chunk interval is 14 days, and each chunk includes records of trades from the exchange on business days (approximately 1-1.5 million trades per day); the chunk size is approximately 1.5 GB. When I made the chunk interval 1 day, the chunks were also somewhere around 300-500 MB.

webpashtet avatar Sep 14 '22 10:09 webpashtet

Yeah, as already mentioned, this is not a solution for us, since we are tracking all symbols from all exchanges every minute. That is a lot of accumulating data, and we want to stick to the recommendation of the Timescale Cloud team (25% of the shared_buffers size). As discussed above, we already tried bigger chunks; this improves performance a bit, but query times still end up significantly higher than just running the two parallel queries (cagg + hypertable aggregation).

DZDomi avatar Sep 14 '22 14:09 DZDomi

@DZDomi,

We really appreciate the detailed posts, both here and in the forum. 👍

However, a few questions come to mind that you haven't addressed and which might be helpful.

  1. How many CAGGs have you created on top of the underlying hypertable?
  2. What are your refresh policy settings for each CAGG? It seems unlikely that there should be lock contention on the watermark value, but if there is it would indicate to me that maybe the refresh for this CAGG and the query are in contention somehow.

Also, I believe our CSM team has tried to reach out to you a few times. As a trial user on Timescale Cloud, you have the ability to talk directly to support, and working with them might help us all understand this problem more quickly. Please consider connecting with them if that's a possibility.

ryanbooz avatar Sep 14 '22 14:09 ryanbooz

Hey @ryanbooz thanks for your answer

We have been in contact with support, but they recommended we ask the question here on GitHub.

Regarding the questions:

  • Currently we support the following intervals for each table

    • 5 min
    • 15 min
    • 30 min
    • 1 hour
    • 90 minutes
    • 2 hours
    • 3 hours
    • 4 hours
    • 6 hours
    • 12 hours
    • 1 day
  • Our current refresh policy setup:

SELECT add_continuous_aggregate_policy('candles_binance_300',
    start_offset => 900000,
    end_offset => 300000,
    schedule_interval => interval '5 minutes'
);

SELECT add_continuous_aggregate_policy('candles_binance_900',
    start_offset => 2700000,
    end_offset => 900000,
    schedule_interval => interval '15 minutes'
);

SELECT add_continuous_aggregate_policy('candles_binance_1800',
    start_offset => 5400000,
    end_offset => 1800000,
    schedule_interval => interval '30 minutes'
);

SELECT add_continuous_aggregate_policy('candles_binance_3600',
    start_offset => 10800000,
    end_offset => 3600000,
    schedule_interval => interval '1 hour'
);

SELECT add_continuous_aggregate_policy('candles_binance_5400',
    start_offset => 16200000,
    end_offset => 5400000,
    schedule_interval => interval '90 minutes'
);

SELECT add_continuous_aggregate_policy('candles_binance_7200',
    start_offset => 21600000,
    end_offset => 7200000,
    schedule_interval => interval '2 hours'
);

SELECT add_continuous_aggregate_policy('candles_binance_10800',
    start_offset => 32400000,
    end_offset => 10800000,
    schedule_interval => interval '3 hours'
);

SELECT add_continuous_aggregate_policy('candles_binance_14400',
    start_offset => 43200000,
    end_offset => 14400000,
    schedule_interval => interval '4 hours'
);

SELECT add_continuous_aggregate_policy('candles_binance_21600',
    start_offset => 64800000,
    end_offset => 21600000,
    schedule_interval => interval '6 hours'
);

SELECT add_continuous_aggregate_policy('candles_binance_43200',
    start_offset => 129600000,
    end_offset => 43200000,
    schedule_interval => interval '12 hours'
);

SELECT add_continuous_aggregate_policy('candles_binance_86400',
    start_offset => 259200000,
    end_offset => 86400000,
    schedule_interval => interval '1 day'
);

These policies exist for each exchange we support, but each exchange uses its own hypertable (candles_coinbase_60, candles_kraken_60, and so on).

DZDomi avatar Sep 14 '22 14:09 DZDomi