rust-prometheus icon indicating copy to clipboard operation
rust-prometheus copied to clipboard

`push_metrics` panics inside tokio

Open yds12 opened this issue 2 years ago • 5 comments

Describe the bug: Calling push_metrics from an async function running under tokio causes this panic:

thread 'main' panicked at 'Cannot drop a runtime in a context where blocking is not allowed. This happens when a runtime is dropped from within an asynchronous context.' .../.cargo/registry/src/github.com-.../tokio-1.20.1/src/runtime/blocking/shutdown.rs:51:21

To Reproduce: Take the push example, annotate main with #[tokio::main], and add tokio = { version = "1.20", features = ["rt", "rt-multi-thread", "macros"] } to Cargo.toml. Running the example will produce the panic above.

Expected behavior: No panic, or an async-compatible alternative.

yds12 avatar Aug 23 '22 09:08 yds12

It's quite simple to make it work: basically, we just need to stop using reqwest::blocking::Client and add async and .await in a few places (see patch below). But of course that would be a breaking change and would force people to use async, so I don't know what you guys would like to do; maybe put the async code behind a feature flag? If you are interested, I can send a PR for this.

diff --git a/examples/example_push.rs b/examples/example_push.rs
index 22d0195..c20f94b 100644
--- a/examples/example_push.rs
+++ b/examples/example_push.rs
@@ -26,7 +26,8 @@ lazy_static! {
 }
 
 #[cfg(feature = "push")]
-fn main() {
+#[tokio::main]
+async fn main() {
     let args: Vec<String> = env::args().collect();
     let program = args[0].clone();
 
@@ -63,6 +64,7 @@ fn main() {
                 password: "pass".to_owned(),
             }),
         )
+        .await
         .unwrap();
     }
 
diff --git a/src/push.rs b/src/push.rs
index 525b342..a3d039a 100644
--- a/src/push.rs
+++ b/src/push.rs
@@ -6,7 +6,7 @@ use std::hash::BuildHasher;
 use std::str::{self, FromStr};
 use std::time::Duration;
 
-use reqwest::blocking::Client;
+use reqwest::Client;
 use reqwest::header::CONTENT_TYPE;
 use reqwest::{Method, StatusCode, Url};
 
@@ -52,32 +52,32 @@ pub struct BasicAuthentication {
 /// Note that all previously pushed metrics with the same job and other grouping
 /// labels will be replaced with the metrics pushed by this call. (It uses HTTP
 /// method 'PUT' to push to the Pushgateway.)
-pub fn push_metrics<S: BuildHasher>(
+pub async fn push_metrics<S: BuildHasher>(
     job: &str,
     grouping: HashMap<String, String, S>,
     url: &str,
     mfs: Vec<proto::MetricFamily>,
     basic_auth: Option<BasicAuthentication>,
 ) -> Result<()> {
-    push(job, grouping, url, mfs, "PUT", basic_auth)
+    push(job, grouping, url, mfs, "PUT", basic_auth).await
 }
 
 /// `push_add_metrics` works like `push_metrics`, but only previously pushed
 /// metrics with the same name (and the same job and other grouping labels) will
 /// be replaced. (It uses HTTP method 'POST' to push to the Pushgateway.)
-pub fn push_add_metrics<S: BuildHasher>(
+pub async fn push_add_metrics<S: BuildHasher>(
     job: &str,
     grouping: HashMap<String, String, S>,
     url: &str,
     mfs: Vec<proto::MetricFamily>,
     basic_auth: Option<BasicAuthentication>,
 ) -> Result<()> {
-    push(job, grouping, url, mfs, "POST", basic_auth)
+    push(job, grouping, url, mfs, "POST", basic_auth).await
 }
 
 const LABEL_NAME_JOB: &str = "job";
 
-fn push<S: BuildHasher>(
+async fn push<S: BuildHasher>(
     job: &str,
     grouping: HashMap<String, String, S>,
     url: &str,
@@ -160,7 +160,7 @@ fn push<S: BuildHasher>(
         builder = builder.basic_auth(username, Some(password));
     }
 
-    let response = builder.send().map_err(|e| Error::Msg(format!("{}", e)))?;
+    let response = builder.send().await.map_err(|e| Error::Msg(format!("{}", e)))?;
 
     match response.status() {
         StatusCode::ACCEPTED => Ok(()),
@@ -173,7 +173,7 @@ fn push<S: BuildHasher>(
     }
 }
 
-fn push_from_collector<S: BuildHasher>(
+async fn push_from_collector<S: BuildHasher>(
     job: &str,
     grouping: HashMap<String, String, S>,
     url: &str,
@@ -187,31 +187,31 @@ fn push_from_collector<S: BuildHasher>(
     }
 
     let mfs = registry.gather();
-    push(job, grouping, url, mfs, method, basic_auth)
+    push(job, grouping, url, mfs, method, basic_auth).await
 }
 
 /// `push_collector` push metrics collected from the provided collectors. It is
 /// a convenient way to push only a few metrics.
-pub fn push_collector<S: BuildHasher>(
+pub async fn push_collector<S: BuildHasher>(
     job: &str,
     grouping: HashMap<String, String, S>,
     url: &str,
     collectors: Vec<Box<dyn Collector>>,
     basic_auth: Option<BasicAuthentication>,
 ) -> Result<()> {
-    push_from_collector(job, grouping, url, collectors, "PUT", basic_auth)
+    push_from_collector(job, grouping, url, collectors, "PUT", basic_auth).await
 }
 
 /// `push_add_collector` works like `push_add_metrics`, it collects from the
 /// provided collectors. It is a convenient way to push only a few metrics.
-pub fn push_add_collector<S: BuildHasher>(
+pub async fn push_add_collector<S: BuildHasher>(
     job: &str,
     grouping: HashMap<String, String, S>,
     url: &str,
     collectors: Vec<Box<dyn Collector>>,
     basic_auth: Option<BasicAuthentication>,
 ) -> Result<()> {
-    push_from_collector(job, grouping, url, collectors, "POST", basic_auth)
+    push_from_collector(job, grouping, url, collectors, "POST", basic_auth).await
 }
 
 const DEFAULT_GROUP_LABEL_PAIR: (&str, &str) = ("instance", "unknown");
@@ -264,8 +264,8 @@ mod tests {
         assert!(!map.is_empty());
     }
 
-    #[test]
-    fn test_push_bad_label_name() {
+    #[tokio::test]
+    async fn test_push_bad_label_name() {
         let table = vec![
             // Error message: "pushed metric {} already contains a job label"
             (LABEL_NAME_JOB, "job label"),
@@ -280,7 +280,7 @@ mod tests {
             m.set_label(from_vec!(vec![l]));
             let mut mf = proto::MetricFamily::new();
             mf.set_metric(from_vec!(vec![m]));
-            let res = push_metrics("test", hostname_grouping_key(), "mockurl", vec![mf], None);
+            let res = push_metrics("test", hostname_grouping_key(), "mockurl", vec![mf], None).await;
             assert!(format!("{}", res.unwrap_err()).contains(case.1));
         }
     }

yds12 avatar Aug 23 '22 14:08 yds12

Thanks for the report. Have you perhaps already tried the feature set described in https://github.com/tikv/rust-prometheus/issues/342#issuecomment-783738547?

lucab avatar Aug 24 '22 09:08 lucab

Hello, thanks for the reply. Just tried here, but the same error occurs.

yds12 avatar Aug 24 '22 14:08 yds12

FWIW, the original fix no longer works. It's failing on the latest tokio (1.23.0) as far as I can tell.

taj-p avatar Dec 25 '22 11:12 taj-p

I wrote prometheus-push as a crate that handles the push functionality, so the prometheus crate does not necessarily have to take care of this. prometheus-push works in both blocking and non-blocking mode, with this crate or with "the other" prometheus_client crate. Alternatively, you can implement the traits it provides yourself and use it however you want.

For this crate and non-blocking reqwest, it is as easy as this:

[dependencies]
prometheus_push = { version = "<version>", default-features = false, features = ["with_reqwest", "prometheus_crate"] }
use prometheus::labels;
use prometheus_push::prometheus_crate::PrometheusMetricsPusher;
use reqwest::Client;
use url::Url;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let push_gateway: Url = Url::parse("<address to pushgateway>")?;
    let client = Client::new();
    let metrics_pusher = PrometheusMetricsPusher::from(client, &push_gateway)?;
    metrics_pusher
        .push_all(
            "<your push jobs name>",
            &labels! { "<label_name>" => "<label_value>" },
            prometheus::gather(),
        )
        .await?;

    Ok(())
}

maoertel avatar Jun 23 '24 16:06 maoertel