frostdb
frostdb copied to clipboard
Can we support percentile statistics as an aggregation function?
FrostDB supports average statistics computed over time windows, but averages cannot reveal long-tail effects. Can we follow Prometheus's approach and support percentile statistics as well? That would be even more useful.
// chooseAggregationFunction maps a logical aggregation function and the
// Arrow type of its input column to a concrete AggregationFunction
// implementation.
//
// Percentile, stddev and stdvar are two-stage aggregations: on a
// FLOAT64 input the first stage packs raw samples into a binary blob
// (Float64EncodeToBinary); on a BINARY input the second stage decodes
// the blob and computes the final statistic. The percentile parameter
// is only consulted for AggFuncPercentile.
//
// An error is returned for unknown functions or unsupported input types.
func chooseAggregationFunction(
	aggFunc logicalplan.AggFunc,
	dataType arrow.DataType,
	percentile float64,
) (AggregationFunction, error) {
	switch aggFunc {
	case logicalplan.AggFuncSum, logicalplan.AggFuncTopK, logicalplan.AggBottomK, logicalplan.AggRatio, logicalplan.AggFuncSumForAvg:
		return &SumAggregation{}, nil
	case logicalplan.AggFuncMin:
		return &MinAggregation{}, nil
	case logicalplan.AggFuncMax:
		return &MaxAggregation{}, nil
	case logicalplan.AggFuncPercentile:
		switch dataType.ID() {
		case arrow.FLOAT64:
			// First stage: pack raw float64 samples into binary.
			return &Float64EncodeToBinary{}, nil
		case arrow.BINARY:
			// Second stage: decode samples and take the percentile.
			return &PercentileToFloat64{percentile: percentile}, nil
		}
		return nil, fmt.Errorf("unsupported percentile of type: %s", dataType.Name())
	case logicalplan.AggStdDev:
		switch dataType.ID() {
		case arrow.FLOAT64:
			return &Float64EncodeToBinary{}, nil
		case arrow.BINARY:
			return &Float64StddevAggregation{}, nil
		}
		return nil, fmt.Errorf("unsupported stddev of type: %s", dataType.Name())
	case logicalplan.AggStdVar:
		switch dataType.ID() {
		case arrow.FLOAT64:
			return &Float64EncodeToBinary{}, nil
		case arrow.BINARY:
			return &Float64StdVarAggregation{}, nil
		}
		return nil, fmt.Errorf("unsupported stdvar of type: %s", dataType.Name())
	case logicalplan.AggFuncCount, logicalplan.AggFuncCountForAvg:
		return &CountAggregation{}, nil
	}
	return nil, fmt.Errorf("unsupported aggregation function: %s", aggFunc.String())
}
// -------------------------------------------------------------------------------------------------------
// Sentinel errors reported when a percentile aggregation stage receives
// a column of the wrong Arrow type.
var (
	// ErrUnsupportedPercentileFloat64Type: the encoding stage expected FLOAT64 input.
	ErrUnsupportedPercentileFloat64Type = errors.New("unsupported type for percentile aggregation, expected float64")
	// ErrUnsupportedPercentileBinaryType: the final stage expected BINARY (packed samples) input.
	ErrUnsupportedPercentileBinaryType = errors.New("unsupported type for percentile aggregation, expected binary")
)
// PercentileToFloat64 is the final percentile aggregation stage: it
// decodes binary-packed float64 samples and emits the requested
// percentile per group.
type PercentileToFloat64 struct {
	// percentile is the requested quantile. NOTE(review): quantile()
	// treats values outside [0, 1] as -Inf/+Inf, so this appears to be
	// on the 0-1 scale, not 0-100 — confirm what callers pass.
	percentile float64
}
// Aggregate computes the configured percentile for each input array.
// Each element of arrs must be a BINARY array whose values are
// big-endian-packed float64 samples (as produced by
// Float64EncodeToBinary); the result is one float64 per input array.
//
// An empty arrs yields an empty float64 array. A non-BINARY input
// yields ErrUnsupportedPercentileBinaryType.
func (a *PercentileToFloat64) Aggregate(pool memory.Allocator, arrs []arrow.Array) (arrow.Array, error) {
	if len(arrs) == 0 {
		return array.NewFloat64Builder(pool).NewArray(), nil
	}
	typ := arrs[0].DataType().ID()
	switch typ {
	case arrow.BINARY:
		return percentileFloat64arrays(pool, arrs, a.percentile)
	default:
		// Fixed: the message previously said "max array of %s",
		// copy-pasted from the max aggregation.
		return nil, fmt.Errorf("percentile array of %s: %w", typ, ErrUnsupportedPercentileBinaryType)
	}
}
// percentileFloat64arrays computes one percentile value per input
// array. Empty arrays produce a null in the output; all others are
// decoded as binary-packed float64 samples.
func percentileFloat64arrays(pool memory.Allocator, arrs []arrow.Array, percentile float64) (arrow.Array, error) {
	builder := array.NewFloat64Builder(pool)
	defer builder.Release()
	for _, group := range arrs {
		if group.Len() == 0 {
			builder.AppendNull()
			continue
		}
		value, err := percentileArray(group.(*array.Binary), percentile)
		if err != nil {
			return nil, err
		}
		builder.Append(value)
	}
	return builder.NewArray(), nil
}
// valuesFloat64Pool recycles the scratch slices used to decode packed
// float64 samples, avoiding a fresh allocation per group.
// NOTE(review): pooling a slice header boxes it into an interface on
// every Put (staticcheck SA6002); pooling *[]float64 would avoid that,
// but the element type is kept as []float64 in case other code shares
// this pool.
var valuesFloat64Pool = sync.Pool{
	New: func() any {
		return make([]float64, 0, 100)
	},
}

// percentileArray decodes the big-endian float64 samples packed into
// each binary value of arr and returns the requested percentile over
// all of them.
//
// Decoding stops for a value once Decbuf reports an error; samples
// decoded up to that point still contribute (best-effort, matching the
// encode/decode loop's original behavior).
func percentileArray(arr *array.Binary, percentile float64) (float64, error) {
	dec := Decbuf{}
	values := valuesFloat64Pool.Get().([]float64)
	// Fixed: the original `defer valuesFloat64Pool.Put(values[:0])`
	// evaluated its argument immediately, so whenever append re-grew
	// the slice the enlarged backing array was lost and the stale,
	// pre-growth header was returned to the pool. Deferring a closure
	// Puts the final slice instead.
	defer func() {
		valuesFloat64Pool.Put(values[:0])
	}()
	for i := 0; i < arr.Len(); i++ {
		dec.B = arr.Value(i)
		dec.E = nil
		for len(dec.B) > 0 && dec.Err() == nil {
			values = append(values, dec.Be64Float64())
		}
	}
	return quantile(percentile, values), nil
}
// quantile calculates the given quantile of a vector of samples,
// using linear interpolation between the two nearest ranks.
//
// The values slice is sorted in place.
// If values is empty or q is NaN, NaN is returned.
// If q < 0, -Inf is returned; if q > 1, +Inf is returned.
func quantile(q float64, values []float64) float64 {
	switch {
	case len(values) == 0 || math.IsNaN(q):
		return math.NaN()
	case q < 0:
		return math.Inf(-1)
	case q > 1:
		return math.Inf(+1)
	}
	sort.Slice(values, func(i, j int) bool { return values[i] < values[j] })

	count := float64(len(values))
	// The fractional rank places q between two samples; blend them
	// with a weighted average.
	rank := q * (count - 1)
	lo := math.Max(0, math.Floor(rank))
	hi := math.Min(count-1, lo+1)
	frac := rank - math.Floor(rank)
	return values[int(lo)]*(1-frac) + values[int(hi)]*frac
}
// Sentinel errors reported when a stddev/stdvar aggregation stage
// receives a column of the wrong Arrow type.
var (
	// ErrUnsupportedStdFloat64Type: the encoding stage expected FLOAT64 input.
	ErrUnsupportedStdFloat64Type = errors.New("unsupported type for stddev or stdvar or percentile aggregation, expected float64")
	// ErrUnsupportedStdBinaryType: the final stage expected BINARY (packed samples) input.
	ErrUnsupportedStdBinaryType = errors.New("unsupported type for stddev or stdvar aggregation, expected binary")
)
// Float64EncodeToBinary is the first stage shared by the percentile,
// stddev and stdvar aggregations: it packs raw float64 samples into a
// single binary value per array so they can be combined later.
type Float64EncodeToBinary struct{}
// Aggregate packs each FLOAT64 input array into one binary value of
// big-endian float64 samples. An empty arrs yields an empty binary
// array; a non-FLOAT64 input yields ErrUnsupportedStdFloat64Type.
func (a *Float64EncodeToBinary) Aggregate(pool memory.Allocator, arrs []arrow.Array) (arrow.Array, error) {
	if len(arrs) == 0 {
		return array.NewBinaryBuilder(pool, &arrow.BinaryType{}).NewArray(), nil
	}
	if typ := arrs[0].DataType().ID(); typ != arrow.FLOAT64 {
		return nil, fmt.Errorf("std array of %s: %w", typ, ErrUnsupportedStdFloat64Type)
	}
	return Float64ToBinaryArrays(pool, arrs)
}
// Float64ToBinaryArrays encodes every input float64 array into a single
// binary value of big-endian float64 samples. Empty arrays produce a
// null in the output.
func Float64ToBinaryArrays(pool memory.Allocator, arrs []arrow.Array) (arrow.Array, error) {
	builder := array.NewBinaryBuilder(pool, &arrow.BinaryType{})
	defer builder.Release()

	// One scratch encode buffer, reused (after Reset) for every array.
	enc := Encbuf{B: make([]byte, 0, 1<<22)}
	for _, arr := range arrs {
		enc.Reset()
		if arr.Len() == 0 {
			builder.AppendNull()
			continue
		}
		for _, sample := range arr.(*array.Float64).Float64Values() {
			enc.PutBEFloat64(sample)
		}
		builder.Append(enc.Get())
	}
	return builder.NewArray(), nil
}
// Float64StddevAggregation is the final stddev stage: it decodes
// binary-packed float64 samples and emits the population standard
// deviation per group.
type Float64StddevAggregation struct{}
// Aggregate computes the standard deviation for each BINARY input
// array of packed float64 samples. An empty arrs yields an empty
// float64 array; a non-BINARY input yields ErrUnsupportedStdBinaryType.
func (a *Float64StddevAggregation) Aggregate(pool memory.Allocator, arrs []arrow.Array) (arrow.Array, error) {
	if len(arrs) == 0 {
		return array.NewFloat64Builder(pool).NewArray(), nil
	}
	if typ := arrs[0].DataType().ID(); typ != arrow.BINARY {
		return nil, fmt.Errorf("array of %s: %w", typ, ErrUnsupportedStdBinaryType)
	}
	return stddevFloat64arrays(pool, arrs)
}
// stddevFloat64arrays computes one standard deviation per input array.
// Empty arrays produce a null in the output.
func stddevFloat64arrays(pool memory.Allocator, arrs []arrow.Array) (arrow.Array, error) {
	builder := array.NewFloat64Builder(pool)
	defer builder.Release()
	for _, group := range arrs {
		if group.Len() == 0 {
			builder.AppendNull()
			continue
		}
		value, err := stddevArray(group.(*array.Binary))
		if err != nil {
			return nil, err
		}
		builder.Append(value)
	}
	return builder.NewArray(), nil
}
// stddevArray decodes the big-endian float64 samples packed into each
// binary value of arr and returns their population standard deviation
// (Welford's online algorithm with Kahan-compensated accumulators).
// Zero decoded samples yields 0. Decoding stops for a value once
// Decbuf reports an error; earlier samples still contribute.
func stddevArray(arr *array.Binary) (float64, error) {
	var (
		dec         Decbuf
		n           float64 // sample count
		mean, meanC float64 // running mean + Kahan compensation
		m2, m2C     float64 // running sum of squared deviations + compensation
	)
	for i := 0; i < arr.Len(); i++ {
		dec.B = arr.Value(i)
		dec.E = nil
		for len(dec.B) > 0 && dec.Err() == nil {
			sample := dec.Be64Float64()
			n++
			delta := sample - (mean + meanC)
			mean, meanC = kahanSumInc(delta/n, mean, meanC)
			m2, m2C = kahanSumInc(delta*(sample-(mean+meanC)), m2, m2C)
		}
	}
	if n == 0 {
		return 0, nil
	}
	return gomath.Sqrt((m2 + m2C) / n), nil
}
// kahanSumInc adds inc to the compensated running sum (sum, c) and
// returns the updated pair. This is the Neumaier variant of Kahan
// summation: the compensation term is derived from whichever operand
// has the larger magnitude, so the low-order bits of the smaller one
// are captured rather than lost.
func kahanSumInc(inc, sum, c float64) (newSum, newC float64) {
	total := sum + inc
	if gomath.Abs(inc) > gomath.Abs(sum) {
		c += (inc - total) + sum
	} else {
		c += (sum - total) + inc
	}
	return total, c
}
// Float64StdVarAggregation is the final stdvar stage: it decodes
// binary-packed float64 samples and emits the population variance per
// group.
type Float64StdVarAggregation struct{}
// Aggregate computes the variance for each BINARY input array of
// packed float64 samples. An empty arrs yields an empty float64 array;
// a non-BINARY input yields ErrUnsupportedStdBinaryType.
func (a *Float64StdVarAggregation) Aggregate(pool memory.Allocator, arrs []arrow.Array) (arrow.Array, error) {
	if len(arrs) == 0 {
		return array.NewFloat64Builder(pool).NewArray(), nil
	}
	if typ := arrs[0].DataType().ID(); typ != arrow.BINARY {
		return nil, fmt.Errorf("array of %s: %w", typ, ErrUnsupportedStdBinaryType)
	}
	return stdVarFloat64arrays(pool, arrs)
}
// stdVarFloat64arrays computes one variance per input array. Empty
// arrays produce a null in the output.
func stdVarFloat64arrays(pool memory.Allocator, arrs []arrow.Array) (arrow.Array, error) {
	builder := array.NewFloat64Builder(pool)
	defer builder.Release()
	for _, group := range arrs {
		if group.Len() == 0 {
			builder.AppendNull()
			continue
		}
		value, err := stdVarArray(group.(*array.Binary))
		if err != nil {
			return nil, err
		}
		builder.Append(value)
	}
	return builder.NewArray(), nil
}
// stdVarArray decodes the big-endian float64 samples packed into each
// binary value of arr and returns their population variance (Welford's
// online algorithm with Kahan-compensated accumulators). Zero decoded
// samples yields 0. Decoding stops for a value once Decbuf reports an
// error; earlier samples still contribute.
func stdVarArray(arr *array.Binary) (float64, error) {
	var (
		dec         Decbuf
		n           float64 // sample count
		mean, meanC float64 // running mean + Kahan compensation
		m2, m2C     float64 // running sum of squared deviations + compensation
	)
	for i := 0; i < arr.Len(); i++ {
		dec.B = arr.Value(i)
		dec.E = nil
		for len(dec.B) > 0 && dec.Err() == nil {
			sample := dec.Be64Float64()
			n++
			delta := sample - (mean + meanC)
			mean, meanC = kahanSumInc(delta/n, mean, meanC)
			m2, m2C = kahanSumInc(delta*(sample-(mean+meanC)), m2, m2C)
		}
	}
	if n == 0 {
		return 0, nil
	}
	return (m2 + m2C) / n, nil
}
This issue is stale because it has been open 30 days with no activity. Remove stale label or comment or this will be closed in 5 days.
Definitely something we want!
This issue is stale because it has been open 30 days with no activity. Remove stale label or comment or this will be closed in 5 days.