fix(metrics): run a separate task for utilization metric to ensure it is regularly updated
Summary
This adds a separate task that runs periodically to emit utilization metrics and to collect messages from components that need their utilization calculated. This ensures that the utilization metric is published even when no events are running through a component.
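Conceptually, the new task is a periodic loop that both ticks on a timer and drains timing messages from components. A minimal sketch of that shape (the names, message type, and channel layout below are illustrative assumptions, not the exact code in this PR):

use std::time::Duration;
use tokio::sync::mpsc;

// Hypothetical message type; the real emitter collects whatever timing data
// components report while processing events.
enum UtilizationMessage {
    Sample { component_id: String, busy: Duration, window: Duration },
}

async fn run_utilization(mut rx: mpsc::Receiver<UtilizationMessage>) {
    let mut ticker = tokio::time::interval(Duration::from_secs(5));
    loop {
        tokio::select! {
            // Publish gauges on every tick, even if no events flowed through
            // any component since the last tick.
            _ = ticker.tick() => { /* compute and emit per-component utilization */ }
            // Fold in timing samples sent by components as events pass through them.
            Some(_sample) = rx.recv() => { /* update per-component state */ }
        }
    }
}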
Change Type
- [x] Bug fix
- [ ] New feature
- [ ] Non-functional (chore, refactoring, docs)
- [ ] Performance
Is this a breaking change?
- [ ] Yes
- [x] No
How did you test this PR?
Ran Vector with internal metrics and observed that utilization was updated every ~5 seconds, instead of only when events are running.
Does this PR include user facing changes?
- [x] Yes. Please add a changelog fragment based on our guidelines.
- [ ] No. A maintainer will apply the "no-changelog" label to this PR.
Checklist
- [x] Please read our Vector contributor resources.
- [ ] If this PR introduces changes to Vector dependencies (modifies Cargo.lock), please run `dd-rust-license-tool write` to regenerate the license inventory and commit the changes (if any). More details here.
References
- Closes: #20216
Sponsored by Quad9
I have left this as a draft, since I am not sure how to handle shutdown (which shutdown signal to use) and how to name the task (or maybe it should run in a completely different way, so it doesn't get mixed up with components).
Also, the gauge is passed into the timer instead of using the macro inside the timer, to ensure that the correct labels are inherited from the tracing context.
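For illustration, this is roughly what that means in terms of the `metrics` crate (a sketch with assumed names, not the PR's actual types): the `Gauge` handle is created while the component's tracing span is active, so the component labels are attached at that point, and the handle is then handed to the shared timer.

use metrics::{gauge, Gauge};

// Hypothetical timer shape: it holds a pre-registered gauge handle.
struct Timer {
    gauge: Gauge,
}

impl Timer {
    fn report(&self, utilization: f64) {
        // Using the stored handle keeps the labels captured at registration time.
        // Calling the `gauge!` macro here instead would resolve labels from the
        // emitter task's tracing context, which is not the component's.
        self.gauge.set(utilization);
    }
}

// Called while the component's span is entered, so labels are inherited correctly.
fn timer_for_component() -> Timer {
    Timer { gauge: gauge!("utilization") }
}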
@pront Any suggestion for running this separate task? It is currently started as follows:
running_topology.utilization_task =
    // TODO: how to name this custom task?
    Some(tokio::spawn(Task::new("".into(), "", async move {
        utilization_emitter
            .run_utilization(ShutdownSignal::noop())
            .await;
        // TODO: new task output type for this? Or handle this task in a completely
        // different way
        Ok(TaskOutput::Healthcheck)
    })));
I am not sure how to pass the shutdown signal to it (or whether I should do it at all; it made sense to me, but I might have misunderstood some part of the topology). Also, I currently create a task with an empty name, but maybe it would make more sense to run it in a different way than other tasks?
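For reference, one generic way to wire a shutdown signal into such a loop is a plain tokio watch channel (just a sketch under that assumption, not a claim that this is how the topology should handle it):

use std::time::Duration;
use tokio::sync::watch;

async fn run_until_shutdown(mut shutdown: watch::Receiver<bool>) {
    let mut ticker = tokio::time::interval(Duration::from_secs(5));
    loop {
        tokio::select! {
            _ = ticker.tick() => { /* emit utilization */ }
            // Resolves once the sender signals shutdown (or is dropped).
            _ = shutdown.changed() => break,
        }
    }
}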
Hi @esensar,
This is a complex change, so I checked out this PR to do some testing.
config:
api:
  enabled: true
sources:
  internal_metrics_1:
    type: internal_metrics
transforms:
  filter_utilization:
    type: filter
    inputs: ["internal_metrics_1"]
    condition: .name == "utilization"
sinks:
  console:
    inputs: ["filter_utilization"]
    type: console
    encoding:
      codec: json
      json:
        pretty: true
Sample output:
/Users/pavlos.rontidis/.cargo/bin/cargo run --color=always --profile dev -- --config /Users/pavlos.rontidis/CLionProjects/vector/pront/configs/internal_metrics.yaml
Finished `dev` profile [unoptimized + debuginfo] target(s) in 0.70s
Running `target/debug/vector --config /Users/pavlos.rontidis/CLionProjects/vector/pront/configs/internal_metrics.yaml`
2025-01-09T20:46:27.736727Z INFO vector::app: Log level is enabled. level="info"
2025-01-09T20:46:27.741218Z INFO vector::app: Loading configs. paths=["/Users/pavlos.rontidis/CLionProjects/vector/pront/configs/internal_metrics.yaml"]
2025-01-09T20:46:27.766384Z INFO vector::topology::running: Running healthchecks.
2025-01-09T20:46:27.767489Z INFO vector::topology::builder: Healthcheck passed.
2025-01-09T20:46:27.769222Z INFO vector: Vector has started. debug="true" version="0.44.0" arch="aarch64" revision=""
{
  "name": "utilization",
  "namespace": "vector",
  "tags": {
    "component_id": "console",
    "component_kind": "sink",
    "component_type": "console",
    "host": "COMP-LPF0JYPP2Q"
  },
  "timestamp": "2025-01-09T20:46:27.770905Z",
  "kind": "absolute",
  "gauge": {
    "value": 1.0
  }
}
{
  "name": "utilization",
  "namespace": "vector",
  "tags": {
    "component_id": "filter_utilization",
    "component_kind": "transform",
    "component_type": "filter",
    "host": "COMP-LPF0JYPP2Q"
  },
  "timestamp": "2025-01-09T20:46:27.770905Z",
  "kind": "absolute",
  "gauge": {
    "value": 1.0
  }
}
2025-01-09T20:46:27.777873Z INFO vector::internal_events::api: API server running. address=127.0.0.1:8686 playground=http://127.0.0.1:8686/playground graphql=http://127.0.0.1:8686/graphql
{
  "name": "utilization",
  "namespace": "vector",
  "tags": {
    "component_id": "filter_utilization",
    "component_kind": "transform",
    "component_type": "filter",
    "host": "COMP-LPF0JYPP2Q"
  },
  "timestamp": "2025-01-09T20:46:37.771882Z",
  "kind": "absolute",
  "gauge": {
    "value": 0.010011816446046937
  }
}
{
  "name": "utilization",
  "namespace": "vector",
  "tags": {
    "component_id": "console",
    "component_kind": "sink",
    "component_type": "console",
    "host": "COMP-LPF0JYPP2Q"
  },
  "timestamp": "2025-01-09T20:46:37.771882Z",
  "kind": "absolute",
  "gauge": {
    "value": 0.01004418815411736
  }
}
{
  "name": "utilization",
  "namespace": "vector",
  "tags": {
    "component_id": "filter_utilization",
    "component_kind": "transform",
    "component_type": "filter",
    "host": "COMP-LPF0JYPP2Q"
  },
  "timestamp": "2025-01-09T20:46:47.771505Z",
  "kind": "absolute",
  "gauge": {
    "value": 0.0001184493997704478
  }
}
{
  "name": "utilization",
  "namespace": "vector",
  "tags": {
    "component_id": "console",
    "component_kind": "sink",
    "component_type": "console",
    "host": "COMP-LPF0JYPP2Q"
  },
  "timestamp": "2025-01-09T20:46:47.771505Z",
  "kind": "absolute",
  "gauge": {
    "value": 0.00010693227629135064
  }
}
...
Leaving this here as context. Will follow up with more questions.
cc @lukesteensen (just in case you are interested in this one)
I haven't been able to figure out what causes these component validation tests to get stuck when stopping the topology. I can see that the utilization task stops properly, but sink tasks get stuck for some reason :/
I didn't have time to take a look at this yet. But I wouldn't be surprised if the validation framework also needs changes.
This was very weird to fix. I managed to get it to work by returning that original IntervalStream to the Utilization wrapper and just polling it and ignoring its results.
let _ = this.intervals.poll_next_unpin(cx);
I have no idea why this worked, but I hope it helps you understand the issue better, @pront. Sorry about this, I don't really understand Rust streams that well.
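For context, the shape this ends up with is roughly the following (a simplified sketch, not the actual Vector code): the wrapper keeps the IntervalStream and polls it only so its timer keeps waking the task, while the actual reporting now happens in the separate emitter task.

use std::pin::Pin;
use std::task::{Context, Poll};

use futures::{Stream, StreamExt};
use tokio_stream::wrappers::IntervalStream;

struct Utilization<S> {
    intervals: IntervalStream,
    inner: S,
    // timing/gauge state elided
}

impl<S: Stream + Unpin> Stream for Utilization<S> {
    type Item = S::Item;

    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let this = self.get_mut();
        // Poll the interval stream so its ticks keep waking this task, but discard
        // the tick values themselves; the emitter task now does the reporting.
        let _ = this.intervals.poll_next_unpin(cx);
        this.inner.poll_next_unpin(cx)
    }
}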
Hi @pront , does this look alright? Does it need further changes? My latest change has fixed issues with tests, but it is not ideal 😄
Hi @esensar, this is a complex change and I need more time to dive into the details. I plan to review this before the next release. In the meantime, if you had ideas for further improvements, feel free to update the PR.
The only thing left to reconsider is the latest change I made, but I haven't really had time to figure out an alternative. If I get some time, I will try to clean that part up.
Hello Vector Dev folks - can this have a once-over sometime? We're still sometimes very stumped with strange results in our utilization graphs that cause our monitoring system to have severe indigestion.
Hi and apologies for the delay on this PR. Can you please do a rebase and ensure the checks are passing? I will review shortly after.
Checks passed locally. Not sure why the spelling check failed; the failures seem to be in other files.
How did you verify this change locally? I wanted to run my own set of tests as well before we merge this one
Hmm, I can't find the exact configuration I used when I originally tested this. Let me try to set something up again; the general idea was to have a sink connected to a source that I can easily control (send data to it manually, so that I can stop sending and still see the utilization metric get published).
I have tested it with this now:
api:
  enabled: true
sources:
  internal_metrics_1:
    type: internal_metrics
  http:
    type: http_server
    address: 0.0.0.0:59001
    encoding: "text"
    headers:
      - User-Agent
transforms:
  filter_utilization:
    type: filter
    inputs: ["internal_metrics_1"]
    condition: .name == "utilization"
sinks:
  console:
    inputs: ["filter_utilization"]
    type: console
    encoding:
      codec: json
  console_http:
    inputs: ["http"]
    type: console
    encoding:
      codec: json
The console sink fed by the utilization metrics produces a bit too much output, but it can be seen that the value changes every 5 seconds for console_http, regardless of whether data is passing through it.
I sent data to it using:
curl -X POST localhost:59001 -d "test"
I'm trying to run your config from both master and your branch and then checking for changes. I'm seeing utilization metrics coming through for "component_id":"console_http" in both versions and the value is also being changed despite no data being sent to the source. Can you run the same test I'm running and tell me what I should expect to see differently?
Right, I just tested it myself and it worked fine, my bad. I guess it depends on the kind of component used. Here is a configuration that doesn't get updated on master:
api:
  enabled: true
sources:
  internal_metrics_1:
    type: internal_metrics
  http:
    type: http_server
    address: 0.0.0.0:59001
    encoding: "text"
    headers:
      - User-Agent
transforms:
  filter_utilization:
    type: filter
    inputs: ["internal_metrics_1"]
    condition: .name == "utilization" && .tags.component_id == "remap_http" # Just to reduce noise slightly
  remap_http:
    type: remap
    inputs: ["http"]
    source: .test = "test"
sinks:
  console:
    inputs: ["filter_utilization"]
    type: console
    encoding:
      codec: json
  console_http:
    inputs: ["remap_http"]
    type: console
    encoding:
      codec: json
This is interesting. Do we understand the root cause here?
I am not sure why it worked with the console sink. I can't remember which components I tested with originally, but I always had the issue of the utilization not being updated when data is not passing through. Looking at the console sink, it doesn't seem to have any special behavior that would make it act differently.
Hi @esensar, I ran your config:
api:
  enabled: true
sources:
  internal_metrics_1:
    type: internal_metrics
    scrape_interval_secs: 5
  http:
    type: http_server
    address: 0.0.0.0:59001
    encoding: "text"
    headers:
      - User-Agent
transforms:
  filter_utilization:
    type: filter
    inputs: ["internal_metrics_1"]
    condition: .name == "utilization" && .tags.component_id == "remap_http" # Just to reduce noise slightly
  remap_http:
    type: remap
    inputs: ["http"]
    source: .test = "test"
sinks:
  console:
    inputs: ["filter_utilization"]
    type: console
    encoding:
      codec: "json"
      json:
        pretty: true
  console_http:
    inputs: ["remap_http"]
    type: console
    encoding:
      codec: json
and then:
./repeat_command.sh curl -X POST localhost:59001 -d "test"
And I noticed that the master branch and this PR behave differently. On master, the utilization keeps its old value, but on your PR it drops when I stop publishing events, which is the desired behavior.