timescaledb-toolkit
Expose `TDigest` type so it can be populated on the client and then inserted into a Timescale table
Is your feature request related to a problem? Please describe.
I'm aggregating metrics on the client into a histogram, and I want to store this aggregated data in TimescaleDB and be able to use continuous aggregates on it to downsample and analyze it with the help of TimescaleDB hyperfunctions. I have control over how the client aggregates the data; the only limitation is that it has to be aggregated. The applications that need to store data this way are written in C++ and Python.
Describe the solution you'd like
I'd like to aggregate data on the client into some structure that I can later insert into a TimescaleDB table as a `TDigest` value. Then I could use all the TimescaleDB functions that work over the `TDigest` type (e.g. `rollup`, `approx_percentile`, etc.). This would also unify the approaches for storing aggregated and non-aggregated data in TimescaleDB.
The histogram collected on the client currently has min, max, sum, and count values plus a set of buckets with counters, so I suspect it's already pretty close to the `TDigest` format.
It would be great if the TimescaleDB library exposed a C API to work with `TDigest` objects (create, update with new data samples, etc.) and allowed inserting the final value into TimescaleDB.
Describe alternatives you've considered
I've considered two alternatives:
- Collect metrics via Promscale. This is viable, but the Promscale histogram has limited use with continuous aggregates and carries other limitations from the Prometheus metrics format.
- Store non-aggregated data in TimescaleDB and attach a materialized view with a continuous aggregate. This works for some of my use cases but not all: in some cases it is not possible to transfer all the non-aggregated data to TimescaleDB due to its volume and/or limited connectivity.
Update: being able to insert a `TDigest` object from the client is the most important part here, because I can implement the calculation part myself.
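Until such an API exists, one way to picture the client side is rendering the aggregated summary into the text literal that the toolkit's `tdigest` type already parses (the same format shown with the `::tdigest` cast later in this thread). This is a rough sketch under my own assumptions: the struct and field names are hypothetical, not part of the toolkit's API.

```rust
// Hypothetical client-side digest summary. The field names mirror the tdigest
// text format, but this struct is an assumption, not toolkit code.
struct ClientDigest {
    max_buckets: u32,
    count: u64,
    sum: f64,
    min: f64,
    max: f64,
    centroids: Vec<(f64, u64)>, // (mean, weight) pairs
}

impl ClientDigest {
    /// Render the literal accepted by the tdigest text input function, e.g.
    /// (version:1,max_buckets:10,count:1,sum:42,min:42,max:42,centroids:[(mean:42,weight:1)])
    fn to_tdigest_literal(&self) -> String {
        let centroids: Vec<String> = self
            .centroids
            .iter()
            .map(|(mean, weight)| format!("(mean:{mean},weight:{weight})"))
            .collect();
        format!(
            "(version:1,max_buckets:{},count:{},sum:{},min:{},max:{},centroids:[{}])",
            self.max_buckets,
            self.count,
            self.sum,
            self.min,
            self.max,
            centroids.join(",")
        )
    }
}
```

The resulting string should be insertable as an ordinary text parameter with an explicit cast, e.g. `INSERT INTO metrics (latency) VALUES ($1::tdigest)`, since the type has a text input function.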
Hi, we'd love to get t-digest importable too. We have a data pipeline where we pre-compute t-digests (and other metrics) in several places long before they reach timescale due to the sheer size of the data. Timescale holds the aggregated metrics and then does further re-aggregations (downsampling, etc.). Since we can't ship all the raw data to timescale, we have to have a way to import t-digests into it, otherwise we can't use it.
I'm trying to get a tdigest into timescaledb:
DbError { severity: "ERROR", parsed_severity: Some(Error), code: SqlState(E42883), message: "no binary input function available for type tdigest", detail: None, hint: None, position: None, where_: None, schema: None, table: None, column: None, datatype: None, constraint: None, file: Some("lsyscache.c"), line: Some(2913), routine: Some("getTypeBinaryInputInfo") }
I use a binary ingest like this:
copy metrics (latency) from stdin with binary
and I was trying for a hello-world via:
use itertools::Itertools; // for .join() on the centroid iterator

impl ToSql for SqlTdigest {
    fn to_sql(
        &self,
        ty: &Type,
        out: &mut bytes::BytesMut,
    ) -> Result<postgres_types::IsNull, Box<dyn std::error::Error + Sync + Send>>
    where
        Self: Sized,
    {
        // I would expect a deserialization error or something, not an
        // "I don't know how to attempt deserialization"
        postgres_protocol::types::text_to_sql(
            &format!(
                "(version:1,max_buckets:{},count:{},sum:{},min:{},max:{},centroids:[{}])",
                self.max_buckets,
                self.count,
                self.sum,
                self.min,
                self.max,
                self.centroids
                    .iter()
                    .map(|c| format!("(mean:{},weight:{})", c.mean, c.weight))
                    .join(",")
            ),
            out,
        );
        Ok(postgres_types::IsNull::No)
    }

    fn accepts(ty: &Type) -> bool
    where
        Self: Sized,
    {
        ty.name() == "tdigest"
    }

    postgres_types::to_sql_checked!();
}
because I observed that a text literal to `tdigest` works:
select '(version:1,max_buckets:10,count:1,sum:42,min:42,max:42,centroids:[(mean:42,weight:1)])'::tdigest;
tdigest
--------------------------------------------------------------------------------------------------
(version:1,buckets:1,max_buckets:10,count:1,sum:42,min:42,max:42,centroids:[(mean:42,weight:1)])
Is there a binary-friendly ingest pattern that works with:
\dx
List of installed extensions
Name | Version | Schema | Description
---------------------+---------+------------+---------------------------------------------------------------------------------------
timescaledb | 2.9.0 | public | Enables scalable inserts and complex queries for time-series data
timescaledb_toolkit | 1.13.0 | public | Library of analytical hyperfunctions, time-series pipelining, and other SQL utilities
for posterity, here's the tokio_postgres stuff I'm using:
let sink = connection
    .copy_in::<String, bytes::Bytes>(&format!(
        "copy {table_name} ({all_columns}) from stdin with binary",
        table_name = "test",
        all_columns = "dig",
    ))
    .await?;
let writer = tokio_postgres::binary_copy::BinaryCopyInWriter::new(
    sink,
    &vec![histogram_types.tdigest_type.clone()],
);
futures::pin_mut!(writer);

let a_tdigest = crate::server::postgres_things::statistic_set::SqlTdigest {
    version: 1,
    max_buckets: 10,
    count: 1,
    sum: 42.0,
    min: 42.0,
    max: 42.0,
    centroids: vec![crate::server::postgres_things::statistic_set::SqlCentroid {
        mean: 42.0,
        weight: 1,
    }],
};
writer.as_mut().write(&[&a_tdigest]).await?;
writer.finish().await?;
I see that the tdigest from text path is covered in tests: https://github.com/timescale/timescaledb-toolkit/blob/main/extension/src/tdigest.rs#L614 So I think the only thing I'm missing is how to get it in via a standard bulk copy.
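For what it's worth, the error above comes from `getTypeBinaryInputInfo`: the `tdigest` type apparently registers no binary input (receive) function, so `COPY ... WITH BINARY` is rejected before any bytes are even parsed. A workaround I'd try (my own assumption, not a toolkit-documented path) is dropping `binary` and using text-format COPY, where each row is a newline-terminated line of tab-separated column values, and the tdigest column carries the same literal the `::tdigest` cast accepts. A minimal sketch of building such a payload for a single-column table:

```rust
/// Build a text-format COPY payload for a single tdigest column.
/// Text-format COPY rows are newline-terminated, and backslashes must be
/// escaped. The tdigest literal itself contains no tabs or newlines, so the
/// backslash is the only escape case that matters here.
fn tdigest_copy_payload(literals: &[&str]) -> String {
    literals
        .iter()
        .map(|lit| lit.replace('\\', "\\\\") + "\n")
        .collect()
}
```

With tokio_postgres this would mean the same `copy_in` call minus the `binary` keyword, writing the payload bytes directly to the sink instead of going through `BinaryCopyInWriter` (which is binary-format only).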