timescaledb-toolkit icon indicating copy to clipboard operation
timescaledb-toolkit copied to clipboard

Expose `TDigest` type so it can be populated on the client and then inserted into a Timescale table

Open oliora opened this issue 2 years ago • 2 comments

Is your feature request related to a problem? Please describe. I'm aggregating metrics on the client into a histogram and I want to store this aggregated data in TimescaleDB and be able to use continuous aggregates on this data to downsample and analyze it with the help of TimescaleDB hyperfunctions. I have a control over how client aggregates the data the only limitation is that it has to be aggregated. Applications that need to store data in such a way are written in C++ and Python.

Describe the solution you'd like I'd like to aggregate data on the client into some structure that I can later insert into TimescaleDB table as a TDigest value. Then I can use all the TimescaleDB functions that work over TDigest type (e.g. rollup, approx_percentile etc). This also unifies the approaches between storing aggregated and non-aggregated data in TimescaleDB.

The histogram collected on the client currently has min, max, sum, count values and a set of buckets with counters and I suspect that it's pretty close to TDigest format already.

It would be great if TimescaleDB library exposes C-API to work with TDigest objects (create, update with new data samples etc) and allows to insert the final value to TimescaleDB.

Describe alternatives you've considered I've considered two alternatives:

  1. Collect metrics via Promscale. This is a viable alternative, but Promscale histogram has a limited use with continuous aggregates and has other limitations coming from the Prometheus metrics format.
  2. Store non-aggregated data in TimescaleDB and attach a materialized view with continuos aggregate. This works for some of my use cases but not for all. In some cases it is not possible to transfer all the non-aggregated data to TimescaleDB due to it's amount and/or limited connectivity.

Update: The part of being able to insert TDigest object from the client is the most important here because I can implement the calculation part myself.

oliora avatar Aug 01 '22 17:08 oliora

Hi, we'd love to get t-digest importable too. We have a data pipeline where we pre-compute t-digests (and other metrics) in several places long before they reach timescale due to the sheer size of the data. Timescale holds the aggregated metrics and then does further re-aggregations (downsampling, etc.). Since we can't ship all the raw data to timescale, we have to have a way to import t-digests into it, otherwise we can't use it.

yalon avatar Nov 08 '22 08:11 yalon

I'm trying to get a tdigest into timescaledb:

DbError { severity: "ERROR", parsed_severity: Some(Error), code: SqlState(E42883), message: "no binary input function available for type tdigest", detail: None, hint: None, position: None, where_: None, schema: None, table: None, column: None, datatype: None, constraint: None, file: Some("lsyscache.c"), line: Some(2913), routine: Some("getTypeBinaryInputInfo") }

I use a binary ingest like this:

copy metrics (latency) from stdin with binary

and I was trying for a hello-world via:

impl ToSql for SqlTdigest {
    fn to_sql(
        &self,
        ty: &Type,
        out: &mut bytes::BytesMut,
    ) -> Result<postgres_types::IsNull, Box<dyn std::error::Error + Sync + Send>>
    where
        Self: Sized,
    {
        // I would expect a deserialization error or something, not an "I don't know how to attempt deserialization"
        postgres_protocol::types::text_to_sql(
            &format!(
                "(version:1,max_buckets:{},count:{},sum:{},min:{},max:{},centroids:[{}])",
                self.max_buckets,
                self.count,
                self.sum,
                self.min,
                self.max,
                self.centroids
                    .iter()
                    .map(|c| format!("(mean:{},weight:{})", c.mean, c.weight))
                    .join(",")
            ),
            out,
        );
        Ok(postgres_types::IsNull::No)
    }

    fn accepts(ty: &Type) -> bool
    where
        Self: Sized,
    {
        ty.name() == "tdigest"
    }

    postgres_types::to_sql_checked!();
}

because I observed that a text literal to tdigest works:

 select '(version:1,max_buckets:10,count:1,sum:42,min:42,max:42,centroids:[(mean:42,weight:1)])'::tdigest;
                                             tdigest
--------------------------------------------------------------------------------------------------
 (version:1,buckets:1,max_buckets:10,count:1,sum:42,min:42,max:42,centroids:[(mean:42,weight:1)])

Is there a binary-friendly ingest pattern that works with:

\dx
                                                    List of installed extensions
        Name         | Version |   Schema   |                                      Description
---------------------+---------+------------+---------------------------------------------------------------------------------------
 timescaledb         | 2.9.0   | public     | Enables scalable inserts and complex queries for time-series data
 timescaledb_toolkit | 1.13.0  | public     | Library of analytical hyperfunctions, time-series pipelining, and other SQL utilities

for posterity, here's the tokio_postgres stuff I'm using:

let sink = connection
    .copy_in::<String, bytes::Bytes>(&format!(
        "copy {table_name} ({all_columns}) from stdin with binary",
        table_name = "test",
        all_columns = "dig",
    ))
    .await?;
let writer = tokio_postgres::binary_copy::BinaryCopyInWriter::new(
    sink,
    &vec![histogram_types.tdigest_type.clone()],
);
futures::pin_mut!(writer);
let a_tdigest = crate::server::postgres_things::statistic_set::SqlTdigest {
    version: 1,
    max_buckets: 10,
    count: 1,
    sum: 42.0,
    min: 42.0,
    max: 42.0,
    centroids: vec![crate::server::postgres_things::statistic_set::SqlCentroid {
        mean: 42.0,
        weight: 1,
    }],
};
writer.as_mut().write(&[&a_tdigest]).await?;
writer.finish().await?;

I see that the tdigest from text path is covered in tests: https://github.com/timescale/timescaledb-toolkit/blob/main/extension/src/tdigest.rs#L614 So I think the only thing I'm missing is how to get it in via a standard bulk copy.

kvc0 avatar Dec 21 '22 20:12 kvc0