frostdb icon indicating copy to clipboard operation
frostdb copied to clipboard

Can we support percentile statistics as an aggregation function?

Open jicanghaixb opened this issue 1 year ago • 3 comments

FrostDB supports average statistics computed over time windows, but averages can't reveal long-tail effects. Could we follow Prometheus's approach and support percentile statistics as well? That would be even more useful.

// chooseAggregationFunction maps a logical aggregation function and the Arrow
// type of its input column to a concrete AggregationFunction implementation.
//
// Percentile, stddev and stdvar are two-phase aggregations: on FLOAT64 input
// the raw samples are first packed into a binary blob (Float64EncodeToBinary),
// and on BINARY input the blob is decoded and reduced to the final float64.
func chooseAggregationFunction(
	aggFunc logicalplan.AggFunc,
	dataType arrow.DataType,
	percentile float64,
) (AggregationFunction, error) {
	switch aggFunc {
	case logicalplan.AggFuncSum, logicalplan.AggFuncTopK, logicalplan.AggBottomK, logicalplan.AggRatio, logicalplan.AggFuncSumForAvg:
		return &SumAggregation{}, nil
	case logicalplan.AggFuncMin:
		return &MinAggregation{}, nil
	case logicalplan.AggFuncMax:
		return &MaxAggregation{}, nil
	case logicalplan.AggFuncCount, logicalplan.AggFuncCountForAvg:
		return &CountAggregation{}, nil
	case logicalplan.AggFuncPercentile:
		return chooseTwoPhaseAggregation(dataType, "percentile", &PercentileToFloat64{percentile: percentile})
	case logicalplan.AggStdDev:
		return chooseTwoPhaseAggregation(dataType, "stddev", &Float64StddevAggregation{})
	case logicalplan.AggStdVar:
		return chooseTwoPhaseAggregation(dataType, "stdvar", &Float64StdVarAggregation{})
	default:
		return nil, fmt.Errorf("unsupported aggregation function: %s", aggFunc.String())
	}
}

// chooseTwoPhaseAggregation selects the encode phase for FLOAT64 input and the
// supplied finalizer for BINARY input; kind only appears in the error message.
func chooseTwoPhaseAggregation(dataType arrow.DataType, kind string, final AggregationFunction) (AggregationFunction, error) {
	switch dataType.ID() {
	case arrow.FLOAT64:
		return &Float64EncodeToBinary{}, nil
	case arrow.BINARY:
		return final, nil
	default:
		return nil, fmt.Errorf("unsupported %s of type: %s", kind, dataType.Name())
	}
}

// -------------------------------------------------------------------------------------------------------

// Sentinel errors reported by the percentile aggregation when the input
// column has an unexpected Arrow type.
var (
	ErrUnsupportedPercentileFloat64Type = errors.New("unsupported type for percentile aggregation, expected float64")
	ErrUnsupportedPercentileBinaryType  = errors.New("unsupported type for percentile aggregation, expected binary")
)

// PercentileToFloat64 finalizes the two-phase percentile aggregation: it
// decodes binary-packed float64 samples and reduces each input array to the
// requested percentile.
type PercentileToFloat64 struct {
	// percentile is the requested quantile; values outside [0, 1] yield
	// +/-Inf (see quantile).
	percentile float64
}

// Aggregate expects BINARY input produced by Float64EncodeToBinary and
// returns one float64 per input array.
func (a *PercentileToFloat64) Aggregate(pool memory.Allocator, arrs []arrow.Array) (arrow.Array, error) {
	if len(arrs) == 0 {
		// No input: return an empty float64 array rather than an error.
		return array.NewFloat64Builder(pool).NewArray(), nil
	}

	typ := arrs[0].DataType().ID()
	switch typ {
	case arrow.BINARY:
		return percentileFloat64arrays(pool, arrs, a.percentile)
	default:
		// Message previously said "max array of" — a copy-paste from the
		// max aggregation; this is the percentile aggregation.
		return nil, fmt.Errorf("percentile array of %s: %w", typ, ErrUnsupportedPercentileBinaryType)
	}
}

// percentileFloat64arrays computes the requested percentile for every binary
// array in arrs, producing one float64 result per input array. Empty inputs
// become null entries so the output length always matches len(arrs).
func percentileFloat64arrays(pool memory.Allocator, arrs []arrow.Array, percentile float64) (arrow.Array, error) {
	out := array.NewFloat64Builder(pool)
	defer out.Release()
	for _, arr := range arrs {
		if arr.Len() == 0 {
			out.AppendNull()
			continue
		}
		v, err := percentileArray(arr.(*array.Binary), percentile)
		if err != nil {
			return nil, err
		}
		out.Append(v)
	}
	return out.NewArray(), nil
}

// valuesFloat64Pool recycles the scratch slices percentileArray uses to
// accumulate decoded samples before sorting.
//
// NOTE(review): the pool stores slice values, so every Put boxes the slice
// header into an interface and allocates; storing *[]float64 would avoid
// that — confirm against all users of the pool before changing.
var valuesFloat64Pool = sync.Pool{
	New: func() any {
		return make([]float64, 0, 100)
	},
}

// percentileArray decodes every big-endian float64 packed into the binary
// rows of arr and returns the requested percentile over all decoded samples.
//
// NOTE(review): a decode error from dec.Err() silently stops the inner loop
// (truncating the sample set) rather than being returned — confirm intended.
func percentileArray(arr *array.Binary, percentile float64) (float64, error) {
	dec := Decbuf{}
	values := valuesFloat64Pool.Get().([]float64)
	// Put inside a closure so it sees the final slice header. The previous
	// `defer valuesFloat64Pool.Put(values[:0])` evaluated its argument at
	// defer time, returning the original pre-append header to the pool and
	// discarding any capacity gained while appending.
	defer func() { valuesFloat64Pool.Put(values[:0]) }()
	for i := 0; i < arr.Len(); i++ {
		dec.B = arr.Value(i)
		dec.E = nil
		for len(dec.B) > 0 && dec.Err() == nil {
			values = append(values, dec.Be64Float64())
		}
	}
	return quantile(percentile, values), nil
}

// quantile returns the q-quantile of values using linear interpolation
// between the two nearest order statistics (the scheme Prometheus uses).
//
// values is sorted in place.
// An empty input or q == NaN yields NaN; q < 0 yields -Inf; q > 1 yields +Inf.
func quantile(q float64, values []float64) float64 {
	switch {
	case len(values) == 0 || math.IsNaN(q):
		return math.NaN()
	case q < 0:
		return math.Inf(-1)
	case q > 1:
		return math.Inf(+1)
	}
	sort.Slice(values, func(i, j int) bool { return values[i] < values[j] })

	// The exact rank is generally fractional; blend the two neighboring
	// samples, weighted by how far the rank sits between them.
	rank := q * float64(len(values)-1)
	lo := int(math.Max(0, math.Floor(rank)))
	hi := int(math.Min(float64(len(values)-1), float64(lo)+1))
	frac := rank - math.Floor(rank)
	return values[lo]*(1-frac) + values[hi]*frac
}


// Sentinel errors shared by the two-phase (encode + finalize) aggregations
// when the input column has an unexpected Arrow type.
var (
	ErrUnsupportedStdFloat64Type = errors.New("unsupported type for stddev or stdvar or percentile aggregation, expected float64")
	ErrUnsupportedStdBinaryType  = errors.New("unsupported type for stddev or stdvar aggregation, expected binary")
)

// Float64EncodeToBinary is the first phase of the two-phase percentile /
// stddev / stdvar aggregations: it packs each float64 array into a single
// binary value holding the big-endian bytes of all its samples.
type Float64EncodeToBinary struct{}

// Aggregate encodes every FLOAT64 input array into one binary blob per array.
func (a *Float64EncodeToBinary) Aggregate(pool memory.Allocator, arrs []arrow.Array) (arrow.Array, error) {
	if len(arrs) == 0 {
		return array.NewBinaryBuilder(pool, &arrow.BinaryType{}).NewArray(), nil
	}
	if typ := arrs[0].DataType().ID(); typ != arrow.FLOAT64 {
		return nil, fmt.Errorf("std array of %s: %w", typ, ErrUnsupportedStdFloat64Type)
	}
	return Float64ToBinaryArrays(pool, arrs)
}

// Float64ToBinaryArrays packs each float64 input array into a single binary
// value containing the big-endian encoding of all its samples, reusing one
// scratch buffer across arrays. Empty arrays become null entries.
//
// NOTE(review): this reads Float64Values() directly, so any null slots in
// the input are encoded with whatever bytes back them — confirm upstream
// guarantees non-null input.
func Float64ToBinaryArrays(pool memory.Allocator, arrs []arrow.Array) (arrow.Array, error) {
	out := array.NewBinaryBuilder(pool, &arrow.BinaryType{})
	defer out.Release()
	scratch := Encbuf{B: make([]byte, 0, 1<<22)}
	for _, arr := range arrs {
		if arr.Len() == 0 {
			out.AppendNull()
			continue
		}
		scratch.Reset()
		floats := arr.(*array.Float64)
		for _, v := range floats.Float64Values() {
			scratch.PutBEFloat64(v)
		}
		out.Append(scratch.Get())
	}
	return out.NewArray(), nil
}

// Float64StddevAggregation finalizes the stddev aggregation: it decodes
// binary-packed float64 samples and reduces each input array to its
// population standard deviation.
type Float64StddevAggregation struct{}

// Aggregate expects BINARY input produced by Float64EncodeToBinary.
func (a *Float64StddevAggregation) Aggregate(pool memory.Allocator, arrs []arrow.Array) (arrow.Array, error) {
	if len(arrs) == 0 {
		return array.NewFloat64Builder(pool).NewArray(), nil
	}
	if typ := arrs[0].DataType().ID(); typ != arrow.BINARY {
		return nil, fmt.Errorf("array of %s: %w", typ, ErrUnsupportedStdBinaryType)
	}
	return stddevFloat64arrays(pool, arrs)
}

// stddevFloat64arrays emits one standard-deviation value per input binary
// array; empty arrays yield null so the output aligns with the input.
func stddevFloat64arrays(pool memory.Allocator, arrs []arrow.Array) (arrow.Array, error) {
	out := array.NewFloat64Builder(pool)
	defer out.Release()
	for _, arr := range arrs {
		if arr.Len() == 0 {
			out.AppendNull()
			continue
		}
		v, err := stddevArray(arr.(*array.Binary))
		if err != nil {
			return nil, err
		}
		out.Append(v)
	}
	return out.NewArray(), nil
}

// stddevArray decodes all big-endian float64 samples packed into arr and
// returns their population standard deviation (divides by N, not N-1). The
// mean and squared-deviation sums are maintained with Welford's online
// update plus Kahan-compensated accumulation; the statement order is
// numerically significant, so it must not be rearranged.
// NOTE(review): decode errors from dec.Err() stop the inner loop silently
// rather than being returned — confirm that is intended.
func stddevArray(arr *array.Binary) (float64, error) {
	dec := Decbuf{}
	var count float64
	var mean, cMean float64 // running mean and its Kahan compensation
	var aux, cAux float64   // running sum of squared deviations and its compensation
	for i := 0; i < arr.Len(); i++ {
		dec.B = arr.Value(i)
		dec.E = nil
		for len(dec.B) > 0 && dec.Err() == nil {
			f := dec.Be64Float64()
			count++
			delta := f - (mean + cMean)
			mean, cMean = kahanSumInc(delta/count, mean, cMean)
			aux, cAux = kahanSumInc(delta*(f-(mean+cMean)), aux, cAux)
		}
	}
	// No samples decoded: report 0 rather than NaN.
	if count == 0 {
		return 0, nil
	}
	return gomath.Sqrt((aux + cAux) / count), nil
}

// kahanSumInc adds inc to the compensated sum (sum, c) and returns the
// updated pair, using Neumaier's variant of Kahan summation: the correction
// term is derived from whichever of sum or inc has the larger magnitude.
func kahanSumInc(inc, sum, c float64) (newSum, newC float64) {
	newSum = sum + inc
	if gomath.Abs(sum) >= gomath.Abs(inc) {
		// sum dominates: recover the low-order bits lost from inc.
		newC = c + ((sum - newSum) + inc)
	} else {
		// inc dominates: recover the low-order bits lost from sum.
		newC = c + ((inc - newSum) + sum)
	}
	return newSum, newC
}

// Float64StdVarAggregation finalizes the stdvar aggregation: it decodes
// binary-packed float64 samples and reduces each input array to its
// population variance.
type Float64StdVarAggregation struct{}

// Aggregate expects BINARY input produced by Float64EncodeToBinary.
func (a *Float64StdVarAggregation) Aggregate(pool memory.Allocator, arrs []arrow.Array) (arrow.Array, error) {
	if len(arrs) == 0 {
		return array.NewFloat64Builder(pool).NewArray(), nil
	}
	if typ := arrs[0].DataType().ID(); typ != arrow.BINARY {
		return nil, fmt.Errorf("array of %s: %w", typ, ErrUnsupportedStdBinaryType)
	}
	return stdVarFloat64arrays(pool, arrs)
}

// stdVarFloat64arrays emits one variance value per input binary array; empty
// arrays yield null so the output aligns with the input.
func stdVarFloat64arrays(pool memory.Allocator, arrs []arrow.Array) (arrow.Array, error) {
	out := array.NewFloat64Builder(pool)
	defer out.Release()
	for _, arr := range arrs {
		if arr.Len() == 0 {
			out.AppendNull()
			continue
		}
		v, err := stdVarArray(arr.(*array.Binary))
		if err != nil {
			return nil, err
		}
		out.Append(v)
	}
	return out.NewArray(), nil
}

// stdVarArray decodes all big-endian float64 samples packed into arr and
// returns their population variance (divides by N, not N-1), using Welford's
// online update with Kahan-compensated accumulation; statement order is
// numerically significant, so it must not be rearranged.
// NOTE(review): this duplicates stddevArray except for the final Sqrt —
// the two could share a helper. Decode errors from dec.Err() stop the inner
// loop silently rather than being returned — confirm that is intended.
func stdVarArray(arr *array.Binary) (float64, error) {
	dec := Decbuf{}
	var count float64
	var mean, cMean float64 // running mean and its Kahan compensation
	var aux, cAux float64   // running sum of squared deviations and its compensation
	for i := 0; i < arr.Len(); i++ {
		dec.B = arr.Value(i)
		dec.E = nil
		for len(dec.B) > 0 && dec.Err() == nil {
			f := dec.Be64Float64()
			count++
			delta := f - (mean + cMean)
			mean, cMean = kahanSumInc(delta/count, mean, cMean)
			aux, cAux = kahanSumInc(delta*(f-(mean+cMean)), aux, cAux)
		}
	}
	// No samples decoded: report 0 rather than NaN.
	if count == 0 {
		return 0, nil
	}
	return (aux + cAux) / count, nil
}

jicanghaixb avatar Jan 04 '24 03:01 jicanghaixb

This issue is stale because it has been open 30 days with no activity. Remove stale label or comment or this will be closed in 5 days.

github-actions[bot] avatar Feb 04 '24 01:02 github-actions[bot]

Definitely something we want!

brancz avatar Feb 10 '24 16:02 brancz

This issue is stale because it has been open 30 days with no activity. Remove stale label or comment or this will be closed in 5 days.

github-actions[bot] avatar Mar 13 '24 01:03 github-actions[bot]