rust-prometheus icon indicating copy to clipboard operation
rust-prometheus copied to clipboard

`push_metrics` panics inside tokio

Open yds12 opened this issue 1 year ago • 5 comments

**Describe the bug**: Calling `push_metrics` from an async function running under tokio causes this panic:

thread 'main' panicked at 'Cannot drop a runtime in a context where blocking is not allowed. This happens when a runtime is dropped from within an asynchronous context.' .../.cargo/registry/src/github.com-.../tokio-1.20.1/src/runtime/blocking/shutdown.rs:51:21

**To Reproduce**: Take the push example, annotate `main` with `#[tokio::main]`, and add `tokio = { version = "1.20", features = ["rt", "rt-multi-thread", "macros"] }` to the Cargo.toml. Running the example will then produce the error above.

**Expected behavior**: No panic, or an async-compatible alternative.

yds12 avatar Aug 23 '22 09:08 yds12

It's quite simple to make this work: basically, stop using `reqwest::blocking::Client` and add `async` and `.await` in a few places (see the patch below). Of course, that would be a breaking change and would force people to use async, so I don't know how you would like to proceed; maybe put the async code behind a feature flag? If you are interested, I can send a PR for this.

diff --git a/examples/example_push.rs b/examples/example_push.rs
index 22d0195..c20f94b 100644
--- a/examples/example_push.rs
+++ b/examples/example_push.rs
@@ -26,7 +26,8 @@ lazy_static! {
 }
 
 #[cfg(feature = "push")]
-fn main() {
+#[tokio::main]
+async fn main() {
     let args: Vec<String> = env::args().collect();
     let program = args[0].clone();
 
@@ -63,6 +64,7 @@ fn main() {
                 password: "pass".to_owned(),
             }),
         )
+        .await
         .unwrap();
     }
 
diff --git a/src/push.rs b/src/push.rs
index 525b342..a3d039a 100644
--- a/src/push.rs
+++ b/src/push.rs
@@ -6,7 +6,7 @@ use std::hash::BuildHasher;
 use std::str::{self, FromStr};
 use std::time::Duration;
 
-use reqwest::blocking::Client;
+use reqwest::Client;
 use reqwest::header::CONTENT_TYPE;
 use reqwest::{Method, StatusCode, Url};
 
@@ -52,32 +52,32 @@ pub struct BasicAuthentication {
 /// Note that all previously pushed metrics with the same job and other grouping
 /// labels will be replaced with the metrics pushed by this call. (It uses HTTP
 /// method 'PUT' to push to the Pushgateway.)
-pub fn push_metrics<S: BuildHasher>(
+pub async fn push_metrics<S: BuildHasher>(
     job: &str,
     grouping: HashMap<String, String, S>,
     url: &str,
     mfs: Vec<proto::MetricFamily>,
     basic_auth: Option<BasicAuthentication>,
 ) -> Result<()> {
-    push(job, grouping, url, mfs, "PUT", basic_auth)
+    push(job, grouping, url, mfs, "PUT", basic_auth).await
 }
 
 /// `push_add_metrics` works like `push_metrics`, but only previously pushed
 /// metrics with the same name (and the same job and other grouping labels) will
 /// be replaced. (It uses HTTP method 'POST' to push to the Pushgateway.)
-pub fn push_add_metrics<S: BuildHasher>(
+pub async fn push_add_metrics<S: BuildHasher>(
     job: &str,
     grouping: HashMap<String, String, S>,
     url: &str,
     mfs: Vec<proto::MetricFamily>,
     basic_auth: Option<BasicAuthentication>,
 ) -> Result<()> {
-    push(job, grouping, url, mfs, "POST", basic_auth)
+    push(job, grouping, url, mfs, "POST", basic_auth).await
 }
 
 const LABEL_NAME_JOB: &str = "job";
 
-fn push<S: BuildHasher>(
+async fn push<S: BuildHasher>(
     job: &str,
     grouping: HashMap<String, String, S>,
     url: &str,
@@ -160,7 +160,7 @@ fn push<S: BuildHasher>(
         builder = builder.basic_auth(username, Some(password));
     }
 
-    let response = builder.send().map_err(|e| Error::Msg(format!("{}", e)))?;
+    let response = builder.send().await.map_err(|e| Error::Msg(format!("{}", e)))?;
 
     match response.status() {
         StatusCode::ACCEPTED => Ok(()),
@@ -173,7 +173,7 @@ fn push<S: BuildHasher>(
     }
 }
 
-fn push_from_collector<S: BuildHasher>(
+async fn push_from_collector<S: BuildHasher>(
     job: &str,
     grouping: HashMap<String, String, S>,
     url: &str,
@@ -187,31 +187,31 @@ fn push_from_collector<S: BuildHasher>(
     }
 
     let mfs = registry.gather();
-    push(job, grouping, url, mfs, method, basic_auth)
+    push(job, grouping, url, mfs, method, basic_auth).await
 }
 
 /// `push_collector` push metrics collected from the provided collectors. It is
 /// a convenient way to push only a few metrics.
-pub fn push_collector<S: BuildHasher>(
+pub async fn push_collector<S: BuildHasher>(
     job: &str,
     grouping: HashMap<String, String, S>,
     url: &str,
     collectors: Vec<Box<dyn Collector>>,
     basic_auth: Option<BasicAuthentication>,
 ) -> Result<()> {
-    push_from_collector(job, grouping, url, collectors, "PUT", basic_auth)
+    push_from_collector(job, grouping, url, collectors, "PUT", basic_auth).await
 }
 
 /// `push_add_collector` works like `push_add_metrics`, it collects from the
 /// provided collectors. It is a convenient way to push only a few metrics.
-pub fn push_add_collector<S: BuildHasher>(
+pub async fn push_add_collector<S: BuildHasher>(
     job: &str,
     grouping: HashMap<String, String, S>,
     url: &str,
     collectors: Vec<Box<dyn Collector>>,
     basic_auth: Option<BasicAuthentication>,
 ) -> Result<()> {
-    push_from_collector(job, grouping, url, collectors, "POST", basic_auth)
+    push_from_collector(job, grouping, url, collectors, "POST", basic_auth).await
 }
 
 const DEFAULT_GROUP_LABEL_PAIR: (&str, &str) = ("instance", "unknown");
@@ -264,8 +264,8 @@ mod tests {
         assert!(!map.is_empty());
     }
 
-    #[test]
-    fn test_push_bad_label_name() {
+    #[tokio::test]
+    async fn test_push_bad_label_name() {
         let table = vec![
             // Error message: "pushed metric {} already contains a job label"
             (LABEL_NAME_JOB, "job label"),
@@ -280,7 +280,7 @@ mod tests {
             m.set_label(from_vec!(vec![l]));
             let mut mf = proto::MetricFamily::new();
             mf.set_metric(from_vec!(vec![m]));
-            let res = push_metrics("test", hostname_grouping_key(), "mockurl", vec![mf], None);
+            let res = push_metrics("test", hostname_grouping_key(), "mockurl", vec![mf], None).await;
             assert!(format!("{}", res.unwrap_err()).contains(case.1));
         }
     }

yds12 avatar Aug 23 '22 14:08 yds12

Thanks for the report. Have you already tried the feature set described in https://github.com/tikv/rust-prometheus/issues/342#issuecomment-783738547?

lucab avatar Aug 24 '22 09:08 lucab

Hello, thanks for the reply. I just tried it, but the same error occurs.

yds12 avatar Aug 24 '22 14:08 yds12

FWIW, the original fix no longer works. It's failing on the latest tokio (1.23.0) as far as I can tell.

taj-p avatar Dec 25 '22 11:12 taj-p