timescaledb-toolkit
timescaledb-toolkit copied to clipboard
Merge multiple state aggs to one union
Is your feature request related to a problem? Please describe. I would like to merge multiple state timelines depicting when multiple systems are in state A, so that I get an intersection of these timelines which shows when all systems are in state A at the same time.
Describe the solution you'd like
I tried implementing a solution with rollup()
, since that function is described as an aggregator of state aggregates, which is more or less what I am trying to achieve. However, rollup()
seems to make other types of calculations on non-overlapping aggregates, meaning that the GROUP BY
used should be at least on the timestamp.
There might be a need for a new function that merges timelines, either using UNION or INTERSECTION.
My implementation try:
WITH buckets AS (SELECT system_id, state_agg(ts, value) AS sa
FROM system_states
WHERE (system_id = 1 OR system_id = 2)
GROUP BY system_id)
SELECT rollup(buckets.sa)
FROM buckets;
However, I am not sure if the rollup()
approach could support interpolated state aggregates, so that we could get the previous/next one states (gap filling with the previous value).
Describe alternatives you've considered Using SQL I can achieve the same result with unions and window functions:
WITH buckets AS (SELECT system_id, state_agg(ts, value) AS sa
FROM system_states
WHERE (system_id IN (43, 44))
GROUP BY system_id),
merged_timelines AS (SELECT buckets.system_id, (state_periods(buckets.sa, 1)).*, COUNT(*) OVER () AS total_systems
FROM buckets),
periodTimeline AS (SELECT system_id, start_time AS moment, 1 AS priority, total_systems
FROM merged_timelines
UNION ALL
(SELECT system_id, end_time AS moment, -1 AS priority, total_systems
FROM merged_timelines)
ORDER BY time)
SELECT start, stop
FROM (SELECT CASE
WHEN SUM(priority) OVER (
ORDER BY moment ROWS BETWEEN UNBOUNDED PRECEDING AND 1 PRECEDING
) = total_systems
THEN LAG(moment) OVER (ORDER BY moment)
ELSE NULL END AS start,
moment AS stop
FROM periodTimeline) counters
WHERE start IS NOT NULL
AND start <> stop;
This might not be the prettiest implementation but is actually fast and does the merge no matter how many systems I am tracking. I can also use interpolated_state_periods
to get the preceding state in case the initial buckets do not include the entire dataset.