rust-prometheus
rust-prometheus copied to clipboard
`push_metrics` panics inside tokio
Describe the bug
Calling `push_metrics` from an `async` function running under tokio causes this error:
thread 'main' panicked at 'Cannot drop a runtime in a context where blocking is not allowed. This happens when a runtime is dropped from within an asynchronous context.' .../.cargo/registry/src/github.com-.../tokio-1.20.1/src/runtime/blocking/shutdown.rs:51:21
To Reproduce
Take the push example, annotate `main` with `#[tokio::main]`, and add `tokio = { version = "1.20", features = ["rt", "rt-multi-thread", "macros"] }` to the `Cargo.toml`. Running the example will produce the error above.
Expected behavior
No panic, or an `async`-compatible alternative.
It's quite simple to make it work: basically it just needs to not use `reqwest::blocking::Client`, and to put some `async` and `await` here and there (see patch below). But of course that would be a breaking change and would force people to use async, so I don't know what you guys would like to do; maybe put the async code behind a feature flag? If you are interested, I can send a PR for this.
diff --git a/examples/example_push.rs b/examples/example_push.rs
index 22d0195..c20f94b 100644
--- a/examples/example_push.rs
+++ b/examples/example_push.rs
@@ -26,7 +26,8 @@ lazy_static! {
}
#[cfg(feature = "push")]
-fn main() {
+#[tokio::main]
+async fn main() {
let args: Vec<String> = env::args().collect();
let program = args[0].clone();
@@ -63,6 +64,7 @@ fn main() {
password: "pass".to_owned(),
}),
)
+ .await
.unwrap();
}
diff --git a/src/push.rs b/src/push.rs
index 525b342..a3d039a 100644
--- a/src/push.rs
+++ b/src/push.rs
@@ -6,7 +6,7 @@ use std::hash::BuildHasher;
use std::str::{self, FromStr};
use std::time::Duration;
-use reqwest::blocking::Client;
+use reqwest::Client;
use reqwest::header::CONTENT_TYPE;
use reqwest::{Method, StatusCode, Url};
@@ -52,32 +52,32 @@ pub struct BasicAuthentication {
/// Note that all previously pushed metrics with the same job and other grouping
/// labels will be replaced with the metrics pushed by this call. (It uses HTTP
/// method 'PUT' to push to the Pushgateway.)
-pub fn push_metrics<S: BuildHasher>(
+pub async fn push_metrics<S: BuildHasher>(
job: &str,
grouping: HashMap<String, String, S>,
url: &str,
mfs: Vec<proto::MetricFamily>,
basic_auth: Option<BasicAuthentication>,
) -> Result<()> {
- push(job, grouping, url, mfs, "PUT", basic_auth)
+ push(job, grouping, url, mfs, "PUT", basic_auth).await
}
/// `push_add_metrics` works like `push_metrics`, but only previously pushed
/// metrics with the same name (and the same job and other grouping labels) will
/// be replaced. (It uses HTTP method 'POST' to push to the Pushgateway.)
-pub fn push_add_metrics<S: BuildHasher>(
+pub async fn push_add_metrics<S: BuildHasher>(
job: &str,
grouping: HashMap<String, String, S>,
url: &str,
mfs: Vec<proto::MetricFamily>,
basic_auth: Option<BasicAuthentication>,
) -> Result<()> {
- push(job, grouping, url, mfs, "POST", basic_auth)
+ push(job, grouping, url, mfs, "POST", basic_auth).await
}
const LABEL_NAME_JOB: &str = "job";
-fn push<S: BuildHasher>(
+async fn push<S: BuildHasher>(
job: &str,
grouping: HashMap<String, String, S>,
url: &str,
@@ -160,7 +160,7 @@ fn push<S: BuildHasher>(
builder = builder.basic_auth(username, Some(password));
}
- let response = builder.send().map_err(|e| Error::Msg(format!("{}", e)))?;
+ let response = builder.send().await.map_err(|e| Error::Msg(format!("{}", e)))?;
match response.status() {
StatusCode::ACCEPTED => Ok(()),
@@ -173,7 +173,7 @@ fn push<S: BuildHasher>(
}
}
-fn push_from_collector<S: BuildHasher>(
+async fn push_from_collector<S: BuildHasher>(
job: &str,
grouping: HashMap<String, String, S>,
url: &str,
@@ -187,31 +187,31 @@ fn push_from_collector<S: BuildHasher>(
}
let mfs = registry.gather();
- push(job, grouping, url, mfs, method, basic_auth)
+ push(job, grouping, url, mfs, method, basic_auth).await
}
/// `push_collector` push metrics collected from the provided collectors. It is
/// a convenient way to push only a few metrics.
-pub fn push_collector<S: BuildHasher>(
+pub async fn push_collector<S: BuildHasher>(
job: &str,
grouping: HashMap<String, String, S>,
url: &str,
collectors: Vec<Box<dyn Collector>>,
basic_auth: Option<BasicAuthentication>,
) -> Result<()> {
- push_from_collector(job, grouping, url, collectors, "PUT", basic_auth)
+ push_from_collector(job, grouping, url, collectors, "PUT", basic_auth).await
}
/// `push_add_collector` works like `push_add_metrics`, it collects from the
/// provided collectors. It is a convenient way to push only a few metrics.
-pub fn push_add_collector<S: BuildHasher>(
+pub async fn push_add_collector<S: BuildHasher>(
job: &str,
grouping: HashMap<String, String, S>,
url: &str,
collectors: Vec<Box<dyn Collector>>,
basic_auth: Option<BasicAuthentication>,
) -> Result<()> {
- push_from_collector(job, grouping, url, collectors, "POST", basic_auth)
+ push_from_collector(job, grouping, url, collectors, "POST", basic_auth).await
}
const DEFAULT_GROUP_LABEL_PAIR: (&str, &str) = ("instance", "unknown");
@@ -264,8 +264,8 @@ mod tests {
assert!(!map.is_empty());
}
- #[test]
- fn test_push_bad_label_name() {
+ #[tokio::test]
+ async fn test_push_bad_label_name() {
let table = vec![
// Error message: "pushed metric {} already contains a job label"
(LABEL_NAME_JOB, "job label"),
@@ -280,7 +280,7 @@ mod tests {
m.set_label(from_vec!(vec![l]));
let mut mf = proto::MetricFamily::new();
mf.set_metric(from_vec!(vec![m]));
- let res = push_metrics("test", hostname_grouping_key(), "mockurl", vec![mf], None);
+ let res = push_metrics("test", hostname_grouping_key(), "mockurl", vec![mf], None).await;
assert!(format!("{}", res.unwrap_err()).contains(case.1));
}
}
Thanks for the report. Have you perhaps tried the features-set described in https://github.com/tikv/rust-prometheus/issues/342#issuecomment-783738547 already?
Hello, thanks for the reply. Just tried here, but the same error occurs.
FWIW, the original fix no longer works. It's failing on the latest tokio (1.23.0) as far as I can tell.
I wrote `prometheus-push` as a crate that handles the push functionality, so the `prometheus` crate does not necessarily have to take care of this. `prometheus-push` works blocking or non-blocking, with this crate or with "the other" `prometheus_client` crate. Alternatively, you can implement the traits provided there yourself to use it however you want.
For this crate and non-blocking `reqwest`, it is as easy as this:
[dependencies]
prometheus_push = { version = "<version>", default-features = false, features = ["with_reqwest", "prometheus_crate"] }
use prometheus::labels;
use prometheus_push::prometheus_crate::PrometheusMetricsPusher;
use reqwest::Client;
use url::Url;
// Example entry point: pushes metrics to a Pushgateway from inside a tokio
// runtime via `PrometheusMetricsPusher`, built on a `reqwest::Client`.
// Errors from URL parsing, pusher construction, and the push itself are
// propagated to the caller with `?` as a boxed `std::error::Error`.
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// Placeholder address; replace with the real Pushgateway URL before running.
let push_gateway: Url = Url::parse("<address to pushgateway>")?;
// `Client` is `reqwest::Client` (per the `use` line), not `reqwest::blocking::Client`.
let client = Client::new();
// Build the pusher from the client and the gateway URL; construction is fallible.
let metrics_pusher = PrometheusMetricsPusher::from(client, &push_gateway)?;
// Push under a placeholder job name with a single placeholder label pair.
// The call is awaited, so it runs on the surrounding tokio runtime.
metrics_pusher
.push_all(
"<your push jobs name>",
&labels! { "<label_name>" => "<label_value>" },
// NOTE(review): presumably gathers from the `prometheus` default registry — confirm.
prometheus::gather(),
)
.await?;
Ok(())
}