[metric] How to implement a batch counter

Open nappa85 opened this issue 2 years ago • 16 comments

I'm trying to build a metric that reports the number of hits per page on a webserver. Simply using Meter::u64_counter creates a global counter that doesn't reset at every metrics batch, so I end up with a value that keeps growing towards infinity.

Looking at the Meter APIs, the only way I can think of is to use Meter::batch_observer and, inside it, a BatchObserver::u64_value_observer, but to do that I have to implement the counting myself. If I only wanted a single global counter, an AtomicUsize would be enough, but as soon as I want to distinguish between pages, e.g. by putting the path as an attribute on the observer, it quickly gets complicated: a global HashMap with method and path as the key and the counter as the value, emptied at every batch so all the values can be sent...
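
Something like this is what I mean (just a rough sketch of the bookkeeping half; HitCounters, increment and drain are made-up names, and the observer wiring is only hinted at in the comments because it depends on the exact BatchObserver API):

use std::collections::HashMap;
use std::sync::Mutex;

// Hypothetical per-batch hit store, keyed by (method, path).
#[derive(Default)]
struct HitCounters {
    inner: Mutex<HashMap<(String, String), u64>>,
}

impl HitCounters {
    // Called from the request handler on every hit.
    fn increment(&self, method: &str, path: &str) {
        let mut map = self.inner.lock().unwrap();
        *map.entry((method.to_owned(), path.to_owned())).or_insert(0) += 1;
    }

    // Called from the batch observer callback: take all counts and reset.
    fn drain(&self) -> HashMap<(String, String), u64> {
        std::mem::take(&mut *self.inner.lock().unwrap())
    }
}

// Inside the batch observer callback one would then do, roughly:
//   for ((method, path), count) in counters.drain() {
//       // observe `count` on a u64 value observer with
//       // KeyValue::new("method", method) and KeyValue::new("path", path)
//   }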

Am I right, or is there a simpler way?

nappa85 avatar Dec 10 '21 15:12 nappa85

It only occurred to me now: probably the best solution is to use Meter::u64_value_recorder and always record 1 as the value, letting the consumer sum the values over the time range, grouping by attributes.
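
Roughly like this (a minimal sketch; record_hit is a hypothetical helper of mine, not part of the crate):

use opentelemetry::{metrics::ValueRecorder, KeyValue};

// Record the constant value 1 for every hit, together with identifying
// attributes; the backend can then sum and group over any time window.
fn record_hit(recorder: &ValueRecorder<u64>, method: &str, path: &str) {
    recorder.record(
        1,
        &[
            KeyValue::new("method", method.to_string()),
            KeyValue::new("path", path.to_string()),
        ],
    );
}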

nappa85 avatar Dec 10 '21 17:12 nappa85

I was wrong, the sum is still monotonically growing... I need exact values to get what I want, so I'm back to a global hashmap that gets reset at every batch... It feels overcomplicated for something so simple; isn't there a better solution?

nappa85 avatar Dec 13 '21 14:12 nappa85

Please correct me if I misunderstand, but I think you are looking to count the hits per page in the last metrics batch?

From the metrics SDK side, a counter is monotonically increasing by design. Usually, people send the counter to the backend and use something like an increase function there to get the value over the last period of time.

TommyCpp avatar Dec 15 '21 15:12 TommyCpp

I'm sending metrics via OTLP to an external service (New Relic). At the moment I've solved it with a global hashmap and a value observer inside a batch observer. With a summed value, if the first batch had 10 hits and the second had 6, the second batch would show 16 hits, which is not what I want.

nappa85 avatar Dec 15 '21 15:12 nappa85

With a summed value, if the first batch had 10 hits and the second had 6, the second batch would show 16 hits, which is not what I want.

Feels like something to do with aggregation. Could you share your config for the OTLP exporter/pipeline? With a proper setup you should be able to have a stateless value recorder; if not, we may need to investigate this as a possible bug.

TommyCpp avatar Dec 16 '21 15:12 TommyCpp

I init metrics with a method like this:


use std::time::Duration;

use opentelemetry::{
    metrics,
    sdk::metrics::{selectors, PushController},
    KeyValue,
};
use opentelemetry_otlp::WithExportConfig;

// `spawn` and `delayed_interval` are tokio helpers (the same ones as in the
// full reproduction later in this thread).
fn init_meter(apikey: &str) -> metrics::Result<PushController> {
    // New Relic expects the license key as `api-key` metadata on the gRPC call.
    let mut map = tonic::metadata::MetadataMap::with_capacity(1);
    map.insert("api-key", apikey.parse().unwrap());

    opentelemetry_otlp::new_pipeline()
        .metrics(spawn, delayed_interval)
        .with_exporter(
            opentelemetry_otlp::new_exporter()
                .tonic()
                .with_endpoint("https://otlp.nr-data.net:4317/")
                .with_protocol(opentelemetry_otlp::Protocol::Grpc)
                .with_metadata(map),
        )
        .with_aggregator_selector(selectors::simple::Selector::Exact)
        .with_period(Duration::from_secs(10))
        .with_resource(vec![
            KeyValue::new("service.name", "otlp-test"),
            KeyValue::new("service.namespace", "otlp-test"),
            KeyValue::new("service.instance.id", "test"),
            KeyValue::new("service.version", "0.1.0"),
        ])
        .build()
}

Just a side note: as you can see, I need to use tonic structs to add metadata to the call. It would be nice if opentelemetry-otlp re-exported tonic, since the version it depends on isn't the latest.

nappa85 avatar Dec 16 '21 16:12 nappa85

To give you a visual example, this is a u64_counter metric. I'm calling each endpoint 20 times a minute, so I expect a flat line, not growth (screenshot attached). Also, as you can see, the metrics stop at some point, though not every metric does; it's quite strange and I'm still trying to understand which element is faulty: opentelemetry-rust, opentelemetry-collector, or New Relic.

nappa85 avatar Dec 17 '21 13:12 nappa85

Thanks for the detailed example. It's really helpful. I will try to reproduce this today. I will probably use stdout to observe the result of each batch and see if there is something wrong

TommyCpp avatar Dec 17 '21 14:12 TommyCpp

I've made another test to clarify which element is failing, and it seems to be opentelemetry-rust. I've attached a Prometheus instance to the same collector that sends data to New Relic, and the data stopped for Prometheus too (screenshots attached).

I have no logs on my services, which makes me think of some failure in the OTLP exporter; it always lasts roughly 10 minutes and then stops.

nappa85 avatar Dec 17 '21 14:12 nappa85

Thanks. So in general we have the following issues:

  • The batch seems to be cumulative instead of resetting at each reporting period
  • Metrics stop changing after a while even though they should

TommyCpp avatar Dec 17 '21 15:12 TommyCpp

I've been able to replicate the problem with a "minimal" example:

use futures_util::{Stream, StreamExt, Future};
use hyper::{
    body,
    service::{make_service_fn, service_fn},
    Body, Request, Response, Server,
};
use once_cell::sync::Lazy;
use opentelemetry::{
    global,
    metrics::{self, Meter, ValueRecorder, Counter},
    sdk::{metrics::{selectors, PushController}},
    KeyValue,
};
use opentelemetry_otlp::WithExportConfig;
use tokio::task::JoinHandle;
use std::time::{Duration, SystemTime};

static PUSH_CONTROLLER: Lazy<PushController> = Lazy::new(|| init_meter().unwrap());
static METER: Lazy<Meter> = Lazy::new(|| {
    Lazy::force(&PUSH_CONTROLLER);
    global::meter("abracadabra")
});
static HTTP_COUNTER: Lazy<Counter<u64>> = Lazy::new(|| {
    METER.u64_counter("hits.counter")
        .with_description("hit counter")
        .init()
});
static HTTP_REQ_HISTOGRAM: Lazy<ValueRecorder<f64>> = Lazy::new(|| {
    METER.f64_value_recorder("value.duration")
        .with_description("request latencies")
        .init()
});

// Skip first immediate tick from tokio, not needed for async_std.
fn delayed_interval(duration: Duration) -> impl Stream<Item = tokio::time::Instant> {
    println!("call to delayed_interval with duration {:?}", duration);
    opentelemetry::util::tokio_interval_stream(duration).skip(1).map(|i| {
        println!("Interval passed");
        i
    })
}

fn spawn<T>(f: T) -> JoinHandle<T::Output>
where
    T: Future + Send + 'static,
    T::Output: Send + 'static,
{
    println!("Called spawn");
    tokio::spawn(f)
}

fn init_meter() -> metrics::Result<PushController> {
    opentelemetry_otlp::new_pipeline()
        .metrics(spawn, delayed_interval)
        .with_exporter(
            opentelemetry_otlp::new_exporter()
                .tonic()
                .with_endpoint("https://127.0.0.1:4317/")
                .with_protocol(opentelemetry_otlp::Protocol::Grpc),
        )
        .with_aggregator_selector(selectors::simple::Selector::Exact)
        .with_period(Duration::from_secs(10))
        .with_resource(vec![
            KeyValue::new("service.name", "otlp-test"),
            KeyValue::new("service.namespace", "otlp-test"),
            KeyValue::new("service.instance.id", "test"),
            KeyValue::new("service.version", "0.1.0"),
        ])
        .build()
}

fn start_tracing() {
    tracing_subscriber::fmt::init();
}

fn start_metrics() {
    Lazy::force(&METER);
}

async fn serve_req(req: Request<Body>) -> Result<Response<Body>, hyper::Error> {
    let request_start = SystemTime::now();

    let attributes = &[
        KeyValue::new("method", req.method().to_string()),
        KeyValue::new("path", req.uri().path().to_owned()),
    ];

    let response = match body::to_bytes(req.into_body()).await {
        Ok(bytes) => {
            Response::builder()
                .status(200)
                .body(Body::from(bytes))
                .unwrap()
        }
        Err(e) => Response::builder()
            .status(500)
            .body(Body::from(e.to_string()))
            .unwrap(),
    };

    let duration = request_start.elapsed().unwrap_or_default();
    HTTP_REQ_HISTOGRAM.record(duration.as_secs_f64(), attributes);
    HTTP_COUNTER.add(1, attributes);

    Ok(response)
}

#[tokio::main]
async fn main() {
    start_tracing();
    start_metrics();

    let addr = ([127, 0, 0, 1], 9898).into();
    println!("Listening on http://{}", addr);

    let serve_future = Server::bind(&addr).serve(make_service_fn(|_| async {
        Ok::<_, hyper::Error>(service_fn(serve_req))
    }));

    if let Err(err) = serve_future.await {
        eprintln!("server error: {}", err);
    }
}

It's an echo webserver that records metrics for the called paths. It lasted more or less 50 minutes before stopping. I started with a call every 0.5 seconds, but at some point I bombarded it with 3 different parallel calls without pauses, using:

while [ true ]; do curl -d "kijhfadsohiuagdhoiuagdohisgadhopsigd" http://127.0.0.1:9898/trallalla & curl -d "asf" http://127.0.0.1:9898/trololo & curl -d "srhsdhf" http://127.0.0.1:9898/trelele & echo 1; done

(screenshot attached)

nappa85 avatar Dec 17 '21 20:12 nappa85

  • The batch seems to be cumulative instead of resetting at each reporting period

So I dug into it a bit. I think we can add .with_export_kind(ExportKindSelector::Delta) to the OTLP metrics pipeline; this should force the SDK to reset at every batch.
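
For example, against the reproduction above (just a sketch; I'm assuming ExportKindSelector lives under opentelemetry::sdk::export::metrics in this SDK version, and spawn/delayed_interval are the same helpers defined there):

use std::time::Duration;

use opentelemetry::{
    metrics,
    sdk::{
        export::metrics::ExportKindSelector,
        metrics::{selectors, PushController},
    },
};
use opentelemetry_otlp::WithExportConfig;

fn init_meter_delta() -> metrics::Result<PushController> {
    opentelemetry_otlp::new_pipeline()
        .metrics(spawn, delayed_interval)
        .with_exporter(
            opentelemetry_otlp::new_exporter()
                .tonic()
                .with_endpoint("https://127.0.0.1:4317/"),
        )
        .with_aggregator_selector(selectors::simple::Selector::Exact)
        // Delta export kind: aggregations are reset after every export.
        .with_export_kind(ExportKindSelector::Delta)
        .with_period(Duration::from_secs(10))
        .build()
}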

TommyCpp avatar Dec 19 '21 01:12 TommyCpp

  • Metrics stop changing after a while even though they should

I was able to reproduce the problem fairly quickly using value_recorder and the script you provided. Thanks a lot!

Here is my current theory on the issue:

I notice you are using a value_recorder with Exact aggregation (and also cumulative export). This will consume a lot of memory and also slow down exporting, because with Exact aggregation the value_recorder uses array aggregation, which sends every data point to the backend. That puts a lot of pressure on the export path, since the exporter clones some of the values.

In this case, maybe using ExportKindSelector::Delta will help. Or you can try to compute histograms within the application using the Histogram aggregator selector.
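
For example (a sketch; I'm assuming the Histogram variant of the simple selector takes explicit bucket boundaries, and the boundaries below, in seconds, are just an illustrative choice):

use opentelemetry::sdk::metrics::selectors::simple::Selector;

// Aggregate recorded values into fixed histogram buckets inside the
// application instead of shipping every raw data point (Exact aggregation).
fn histogram_selector() -> Selector {
    Selector::Histogram(vec![0.005, 0.01, 0.05, 0.1, 0.5, 1.0, 5.0])
}

// Then, when building the pipeline, use it in place of Selector::Exact:
//   .with_aggregator_selector(histogram_selector())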

TommyCpp avatar Dec 19 '21 01:12 TommyCpp

Is it possible to output an error log when this happens? It would be very helpful for troubleshooting.

Can I help with patch development? Or just with the tests?

nappa85 avatar Dec 19 '21 13:12 nappa85

Is it possible to output an error log when this happens? It would be very helpful for troubleshooting

Yeah, I think we should probably have some kind of load testing for the OTLP metrics exporter.

It may also help us find the bottleneck.

As for cumulative vs delta: it's more of a feature, and usually users use cumulative. But yeah, we should probably include some kind of warning when users combine cumulative export with Exact aggregation.

Contributions of load testing and documentation are always welcome 😄

TommyCpp avatar Dec 21 '21 14:12 TommyCpp

Sorry for the long silence. I'm doing some more tests: with Selector::Inexpensive, some ValueRecorder<f64> instruments I use to track query times stopped working, both with ExportKindSelector::Delta and with ExportKindSelector::Stateless. Now I'm testing Selector::Exact with ExportKindSelector::Delta; values started flowing again, and I'm waiting a few hours to see if this thread's original problem shows up again.

nappa85 avatar Jan 10 '22 11:01 nappa85

Closing this for now as metrics have been rewritten. Feel free to re-open if the issue occurs again.

jtescher avatar Jul 30 '23 17:07 jtescher