Vector CPU usage increased in 0.50.0
A note for the community
- Please vote on this issue by adding a 👍 reaction to the original issue to help the community and maintainers prioritize this request
- If you are interested in working on this issue or have submitted a pull request, please leave a comment
Problem
Hello, I recently migrated from Vector 0.49.0 to 0.50.0. After the update I noticed an increase in CPU usage: the average rose by approximately 1.4 CPUs per VM, from 5.4 to 6.8, even though the number of incoming metrics has not changed. Reverting to 0.49.0 seems to fix the issue.
I have updated 2 independent clusters, and both behaved the same way (as you can see in the image below).
Each Vector instance is installed on a separate VM; I have 8 Vector instances (VMs) per cluster. Each VM has:
- 8 CPUs
- 16 GB RAM
- Only metrics are processed in Vector
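To narrow down where the extra CPU goes, the per-component `utilization` gauge emitted by Vector's internal metrics can be compared between 0.49.0 and 0.50.0. A minimal sketch (the component names and exporter port are illustrative, not part of my production config):

sources:
  vector_metrics:
    type: internal_metrics
    scrape_interval_secs: 10
sinks:
  vector_metrics_exporter:
    type: prometheus_exporter
    inputs:
      - vector_metrics
    address: 0.0.0.0:9598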
Configuration
My config is:
data_dir: "{{ vector_data_directory }}"
api:
enabled: true
enrichment_tables:
enrichment:
type: file
file:
path: "{{ vector_data_directory }}/enrichment.csv"
encoding:
type: csv
sources:
prometheusreceiver:
type: prometheus_remote_write
address: 0.0.0.0:9090
tls:
enabled: true
crt_file: crt
key_file: key
internalmetrics:
type: internal_metrics
scrape_interval_secs: 10
transforms:
enrich_internalmetrics:
type: remap
inputs:
- internalmetrics
source: |
.tags.host_name = get_hostname!()
.tags.deployment_environment = "dev"
.tags.service_name = "vector"
.tags.service_namespace = "namespace"
.tags.service_budget_name = "service_budget_name"
enrich_metrics:
type: remap
inputs:
- prometheusreceiver
source: |
# don't enrich counter metrics
if .name == "my_counter" {
return .
}
# Pre-assign Kubernetes namespace and cluster (using Vector or OTel tags)
kubernetes_namespace = compact([.tags.namespace, .tags.k8s_namespace_name, .tags.kubernetes_namespace], string: true, nullish: true)[0]
kubernetes_cluster = compact([.tags.kubernetes_cluster_name, .tags.k8s_cluster_name], string: true, nullish: true)[0]
# Set isKubernetes if both exist (non-empty)
isKubernetes = !is_nullish(kubernetes_namespace) && !is_nullish(kubernetes_cluster)
selector = ""
if isKubernetes {
selector, err = kubernetes_namespace + "." + .tags.kubernetes_cluster_name
if err != null {
log(join!([selector, to_string(err)], " "), level: "error", rate_limit_secs: 600)
return .
}
} else {
# Non-Kubernetes selectors to try
defaultSelectors = [.tags.instance, .tags.host_name, .tags.host, .tags.deployment_host]
# remove the null and the IP selectors
selectors = filter(defaultSelectors) -> |_i, sel| {
selectorString = to_string!(sel)
!is_nullish(selectorString) && match(selectorString, r'my_regex')
}
# if no selectors left, don't enrich the metric
if is_empty(selectors) {
return .
}
selector = selectors[0]
# When it's a url and not an ip, use the hostname as selector
if starts_with(string!(selector), "http") {
url, err = parse_url(string!(selector))
if err == null && !is_ipv4(string!(url.host)) {
selector = url.host
}
}
selector = split(string!(selector), ":")[0]
}
# Add service_name from app, appl or app_kubernetes_io_name, in that order
if !exists(.tags.service_name) {
service_name = compact([.tags.app, .tags.appl, .tags.app_kubernetes_io_name], string: true, nullish: true)[0]
if !is_nullish(service_name) {
.tags.service_name = service_name
}
}
enrichment, err = get_enrichment_table_record("enrichment", {"name": selector })
if err != null {
# if the row is not found, return
log(join!(["Couldn't find in enrichment:",selector], " "), level: "error", rate_limit_secs: 600)
return .
}
# Add an internal label for the budget name, because service_namespace is sometimes predefined by users
.tags.service_budget_name = downcase!(enrichment.service_budget_name)
if !exists(.tags.service_namespace) {
.tags.service_namespace = downcase!(enrichment.service_budget_name)
}
if !exists(.tags.deployment_environment) {
.tags.deployment_environment = enrichment.context
}
if isKubernetes {
if !exists(.tags.host_name) && exists(.tags.kubernetes_pod_name) {
.tags.host_name = .tags.kubernetes_pod_name
}
} else {
if !exists(.tags.host_name) {
.tags.host_name = enrichment.name
}
}
remove_internal_labels:
type: remap
inputs:
- enrich_metrics
- enrich_internalmetrics
source: |
# don't touch counter metrics
if .name == "my_counter" {
return .
}
del(.tags.service_budget_name)
sinks:
prometheus_remote_write1:
type: prometheus_remote_write
inputs: &inputs
- remove_internal_labels
endpoint: "${REMOTEWRITE_ENDPOINT1}"
auth: &promauth
strategy: "basic"
user: "user"
password: "${REMOTEWRITE_PASSWORD}"
buffer: &buffer
type: memory
max_events: 20000
batch: &batch
max_events: 10000
prometheus_remote_write2:
type: prometheus_remote_write
inputs: *inputs
endpoint: "${REMOTEWRITE_ENDPOINT2}"
auth: *promauth
buffer: *buffer
batch: *batch
metrics_counter:
type: prometheus_remote_write
inputs:
- enrich_metrics
- enrich_internalmetrics
endpoint: "http://localhost:{{ port }}/count/metrics/prometheus"
buffer:
type: memory
max_events: 20000
when_full: drop_newest
batch:
max_events: 10000
Version
0.50.0
Debug Output
Example Data
Additional Context
No response
References
No response
We also noticed it after upgrading from v0.48:
Our config:
role: "Agent"
args:
- "--config-dir"
- "/etc/vector/"
- "--allocation-tracing"
image:
repository: timberio/vector
base: "debian"
livenessProbe:
httpGet:
path: /health
port: api
readinessProbe:
httpGet:
path: /health
port: api
resources:
requests:
memory: 256Mi
cpu: 200m
limits:
memory: 1024Mi
cpu: 1000m
tolerations:
- key: "name"
operator: "Equal"
value: "system"
effect: "NoSchedule"
podAnnotations:
prometheus.io/scrape: "true"
prometheus.io/scheme: "http"
prometheus.io/port: "9598"
enable.version-checker.io/vector: "true"
use-metadata.version-checker.io/vector: "true"
extraVolumes:
- name: var-log-journal
hostPath:
path: "/var/log/journal"
extraVolumeMounts:
- name: var-log-journal
mountPath: "/mnt/host/journal"
readOnly: true
customConfig:
data_dir: /vector-data-dir
expire_metrics_secs: 300
api:
enabled: true
address: 0.0.0.0:8686
playground: false
sources:
pods_logs:
type: kubernetes_logs
glob_minimum_cooldown_ms: 2000
max_line_bytes: 131072
use_apiserver_cache: true
internal_logs:
type: internal_logs
journald_logs:
type: journald
journal_directory: /mnt/host/journal
internal_metrics:
type: internal_metrics
transforms:
pods_source_types:
type: remap
inputs:
- pods_logs
source: |-
.source = "pods_logs"
.source_environment = "${environment}"
vector_internal_source_types:
type: remap
inputs:
- internal_logs
source: |-
.source = "vector_agent_internal_logs"
.source_environment = "${environment}"
vector_journald_source_types:
type: remap
inputs:
- journald_logs
source: |-
.source = "journald_logs"
.source_environment = "${environment}"
# also fix the hostname field, as for some services it is defined as "localhost"
.real_hostname = get_env_var("VECTOR_SELF_NODE_NAME") ?? .host
sinks:
to_kafka:
type: kafka
inputs:
- pods_source_types
- vector_internal_source_types
- vector_journald_source_types
bootstrap_servers: ...
compression: zstd
encoding:
codec: "json"
topic: logs-kubernetes-${environment}
batch:
timeout_secs: 1
buffer:
max_size: 5368709120
type: disk
prometheus_exporter:
type: prometheus_exporter
inputs:
- internal_metrics
address: 0.0.0.0:9598
Same here after upgrading from 0.48.0 to 0.50.0. Vector is running as a log-collecting agent. Although the absolute CPU consumption is relatively small, the cluster-wide average increased by about 100%.
Our config:
Additional note: we are building Vector from source as follows:
&& export RUSTFLAGS="-Ctarget-feature=+crt-static -Clto=true -Cembed-bitcode=yes -Clinker=clang-20 -Clinker-plugin-lto=true -Clink-arg=-flto=full -Ccodegen-units=1 -Cstrip=symbols -Cpanic=abort" \
&& . ~/.cargo/env \
&& cargo build \
--no-default-features \
--features api,api-client,sources-file,sources-journald,sources-kubernetes_logs,transforms-filter,transforms-log_to_metric,transforms-reduce,transforms-remap,transforms-route,sinks-aws_s3,sinks-prometheus,sinks-socket,sinks-http \
--target x86_64-unknown-linux-gnu \
--release
These are Kubernetes ConfigMaps mounted as files in the Vector containers.
---
apiVersion: v1
kind: ConfigMap
metadata:
name: vector-cm
data:
vector.yaml: |-
api:
enabled: true
playground: false
expire_metrics_secs: 60
data_dir: /opt/vector/data/
enrichment_tables:
severity_num:
type: file
file:
path: /opt/vector/config/severity_num.csv
encoding:
type: csv
delimiter: "\t"
include_headers: true
schema:
severity: string
severity_num: int
sources:
kubernetes:
type: kubernetes_logs
glob_minimum_cooldown_ms: 1000
self_node_name: ${NODE_NAME}
host_journal:
type: journald
current_boot_only: true
since_now: true
journal_directory: /var/log/journal/
transforms:
vector_in:
type: log_to_metric
inputs:
- kubernetes
- host_journal
metrics:
- type: counter
name: logs_in_total
field: message
tags:
source_type: "{{source_type}}"
node_logs:
type: remap
reroute_dropped: true
inputs: [host_journal]
source: |
.cluster = "${K8S_IDENTIFIER}"
.host = split(get_env_var("NODE_NAME") ?? "no_host", ".")[0]
.parsing_type = "journald"
filtered:
type: filter
inputs: [kubernetes]
condition: |
!is_nullish(.message) && .pod_container != "reverse-proxy"
merge_multiline:
type: reduce
inputs:
- filtered
group_by:
- .kubernetes.container_id
merge_strategies:
message: concat_newline
starts_when: |
msg = string(.message) ?? ""
match(msg, r'^[^\s]') # a new event starts on non-indented lines, so indented lines (e.g. JVM stack traces) get merged
#!starts_with(msg, "[") # match square brace indexed line like graylog
expire_after_ms: 3000
route:
type: route
inputs: [merge_multiline]
route:
json: starts_with(string(.message) ?? "", "{")
ltsv: starts_with(string(.message) ?? "", "ltsv:1")
alertlogger: .kubernetes.container_name == "prometheus-alertlogger"
graylog: .kubernetes.container_name == "graylog"
parsed_json:
type: remap
reroute_dropped: true
inputs: [route.json]
file: /opt/vector/config/vrls/parsed_json.vrl
parsed_ltsv:
type: remap
reroute_dropped: true
inputs: [route.ltsv]
file: /opt/vector/config/vrls/parsed_ltsv.vrl
parsed_alertlogger:
type: remap
reroute_dropped: true
inputs: [route.alertlogger]
file: /opt/vector/config/vrls/parsed_alertlogger.vrl
parsed_graylog:
type: remap
reroute_dropped: true
inputs: [route.graylog]
file: /opt/vector/config/vrls/parsed_graylog.vrl
unparsed:
type: remap
reroute_dropped: true
inputs: [route._unmatched]
source: |
msg, err = string(.message)
if err == null {
.message = strip_ansi_escape_codes(msg)
}
.parsing_type = "unparsed"
dropped:
type: remap
reroute_dropped: true
inputs:
- node_logs.dropped
- unparsed.dropped
- parsed_json.dropped
- parsed_ltsv.dropped
- parsed_alertlogger.dropped
- parsed_graylog.dropped
source: .parsing_type = "dropped"
unified:
type: remap
reroute_dropped: true
inputs:
- parsed_json
- parsed_ltsv
- parsed_alertlogger
- parsed_graylog
- unparsed
- dropped
- dropped.dropped
file: /opt/vector/config/vrls/unified.vrl
trace_filter:
type: filter
inputs: [unified]
condition: |
.trace_id != null &&
.span_id != null &&
.span_name != null &&
.span_start_timestamp_nanos != null &&
.span_end_timestamp_nanos != null
trace_contexts:
type: route
inputs: [trace_filter]
route:
http_server: |
."http.request.method" != null &&
."url.path" != null &&
."url.scheme" != null
http_client: |
."http.request.method" != null &&
."server.address" != null &&
."server.port" != null &&
."url.full" != null
http-client-tracing-context:
type: remap
inputs: [trace_contexts.http_client]
source: |
.span_kind = 3
.attributes = {
"http.request.method": ."http.request.method",
"server.address": ."server.address",
"server.port": ."server.port",
"url.full": ."url.full",
}
http-server-tracing-context:
type: remap
inputs: [trace_contexts.http_server]
source: |
.span_kind = 2
.attributes = {
"http.request.method": ."http.request.method",
"url.path": ."url.path",
"url.scheme": ."url.scheme",
}
quickwit-traces:
type: remap
inputs:
- http-server-tracing-context
- http-client-tracing-context
file: /opt/vector/config/vrls/quickwit-traces.vrl
vector_out:
type: log_to_metric
inputs:
- unified
- node_logs
metrics:
- type: counter
name: logs_out_total
field: message
tags:
parsing_type: "{{parsing_type}}"
source_type: "{{source_type}}"
sinks:
prometheus:
type: prometheus_exporter
inputs:
- vector_in
- vector_out
address: 0.0.0.0:9100
# prometheus is confused by timeseries with timestamps,
# probably occurring when multiple timestamps exist within a single scrape (single pod)
suppress_timestamp: true
graylog:
type: socket
inputs:
- unified
- unified.dropped
- node_logs
address: ${GRAYLOG_ADDRESS}
mode: tcp
encoding:
codec: gelf
buffer:
type: disk
max_size: 536870912 # 512 MB
framing:
method: character_delimited
character_delimited:
delimiter: "\0"
quickwit_traces:
type: http
inputs: [quickwit-traces]
method: post
uri: quickwit
encoding:
codec: json
framing:
method: newline_delimited
buffer:
when_full: drop_newest
# TODO: this could use the built-in function to convert int to syslog severity
# https://vector.dev/docs/reference/vrl/functions/#to_syslog_severity
severity_num.csv: |-
severity_num severity
7 debug
7 debg
7 d
7 7
6 sc_info
6 info
6 i
6 6
5 notice
5 n
4 warning
4 warn
4 w
4 4
3 error
3 err
3 eror
3 e
3 3
2 critical
2 crit
2 2
1 alert
1 1
0 f
0 emergency
0 0
---
apiVersion: v1
kind: ConfigMap
metadata:
name: vector-vrls
data:
parsed_json.vrl: |-
json, err = parse_json(.message)
if err == null {
# merge parsed json into message (overwrites fields with json values)
.parsed_message = json
.parsing_type = "json"
} else {
.parsing_type = "invalid_json"
}
parsed_ltsv.vrl: |-
parsed_kv, err = parse_key_value(
.message,
key_value_delimiter: ":",
field_delimiter: "\t",
accept_standalone_key: false
)
if err == null {
del(parsed_kv.ltsv)
.parsed_message = map_values(parsed_kv) -> |value| {
# order is important
typed_val, err = to_int(value) ?? to_bool(value) ?? to_float(value)
if err == null {
typed_val
} else {
tmp_str, err = string(value)
if err == null {
tmp_str = replace(tmp_str, "\\n", "\n")
tmp_str = replace(tmp_str, "\\t", "\t")
tmp_str = replace(tmp_str, "\\r", "\r")
tmp_str
} else {
value # gets json encoded later
}
}
}
.parsing_type = "ltsv"
} else {
.parsing_type = "invalid_ltsv"
}
parsed_graylog.vrl: |-
parsed_message, err = parse_regex(
.message,
# '(?s)' enables multiline mode
r'(?s)[^ ]+ [^ ]+ (?P<severity>\w+) *: (?P<class>[^ ]+) - (?P<message>.*)'
)
if err == null {
.parsed_message.severity = parsed_message.severity
.parsed_message.message = parsed_message.message
.parsed_message.class = parsed_message.class
.parsing_type = "graylog"
} else {
.parsing_type = "invalid_graylog"
}
parsed_alertlogger.vrl: |-
parsed_kv, err = parse_key_value(
.message,
field_delimiter: "\t",
accept_standalone_key: false
)
if err == null {
parsed_kv = map_values(parsed_kv) -> |value| {
# order is important
to_int(value) ?? to_bool(value) ?? to_float(value) ?? value
}
.parsed_message = parsed_kv
.parsing_type = "alertlogger"
} else {
.parsing_type = "invalid_alertlogger"
}
unified.vrl: |-
# incorporate parsed message
if exists(.parsed_message) {
if length(object!(.parsed_message)) > 30 {
.parsing_type = "too_many_fields"
.message = "No. of fields > 30 is not allowed."
} else {
del(.message)
# if "message" not set try other value
if !exists(.parsed_message.message) {
# use "msg" field
if exists(.parsed_message.msg) {
.parsed_message.message = .parsed_message.msg
del(.parsed_message.msg)
}
# use "summary" field
if exists(.parsed_message.summary) {
.parsed_message.message = .parsed_message.summary
del(.parsed_message.summary)
}
}
# kubernetes info is not allowed to be overwritten
del(.parsed_message.kubernetes)
. |= map_keys(object(.parsed_message) ?? {}) -> |key| {
if key != "message" { # "message" key gets malformed sometimes
replace(key, r'[^a-z0-9.]', "_") # sanitize keys
} else {
key
}
}
del(.parsed_message)
}
}
# set common message needed by gelf (if not exist)
if !exists(.message) {.message = "sc_parsed"}
# set infrastructure labels
.host = split(get_env_var("NODE_NAME") ?? "no_host", ".")[0]
.cluster = "${K8S_IDENTIFIER}"
if exists(.kubernetes) {
.pod = .kubernetes.pod_name
.pod_namespace = .kubernetes.pod_namespace
.pod_container = .kubernetes.container_name
.identifier = .kubernetes.container_name
.container_id = .kubernetes.container_id
} else {
.container_id = "undetermined_due_to_missing_pod_metadata"
# workaround if kubeapi metadata is missing
# /var/log/pods/sxp-test_test-logs_add544da-b3e9-4379-bc2b-dec169758fce/alpine/1.log
file_string, err = string(.file)
if err == null {
file_fields = split(file_string, r'[/_]')
.pod = file_fields[5]
.pod_namespace = file_fields[4]
.pod_container = file_fields[7]
.identifier = file_fields[7]
} else {
.pod = "unknown"
.pod_namespace = "unknown"
.pod_container = "unknown"
.identifier = "unknown"
}
}
# remove unused or redundant info
#del(.file) # keep to allow id of records w/missing pod enrichment metadata
del(.kubernetes)
del(.stream)
del(.timestamp_end)
# make sure that some timestamp field exists and is of type ts
.timestamp = timestamp(.timestamp) ?? {
# parse iso format timestamps ("%+")
parse_timestamp(.timestamp, format:"%+") ?? now()
}
.timestamp_nano = format_timestamp!(.timestamp, format: "%F %T.%f")
.timestamp_now = format_timestamp!(now(), format: "%F %T.%f")
# gelf fails if ".level" is not an integer
# and we want severity instead anyway
if !exists(.severity) && !is_nullish(.level) {
.severity = .level
}
.severity_num = get_enrichment_table_record(
"severity_num", {"severity": .severity}, case_sensitive: false
).severity_num ?? -1
.severity = to_syslog_level(.severity_num) ?? ""
del(.level)
if !exists(.parsing_type) { .parsing_type = "missed" }
# gelf doesn't handle special types -> stringify
. = map_values(., false) -> |value| {
value = if is_object(value) || is_array(value) || is_null(value) || is_boolean(value) {
encode_json(value)
} else {
value
}
# handle fields being too long
str, err = string(value)
if err == null && strlen(str) > 32752 {
slice!(str, start: 0, end: 32752) + " ... truncated"
} else {
value
}
}
# some fields are used by different applications with different types -> coerce them
# because of previous json transform this should never fail
if exists(.id) { .id = to_string(.id) ?? "this value must be string but was not" }
if exists(.status) { .status = to_string(.status) ?? "this value must be string but was not" }
# fail safe if anything went wrong (without graylog would drop the message)
if is_nullish(.message) {.message = "missing message"}
quickwit-traces.vrl: |-
.attributes.pod_namespace = .pod_namespace
_span_status = {
"code": string(."span_status.code") ?? "unset",
"message": string(."span_status.message") ?? "",
}
if _span_status.code != "error" {
if _span_status.message != "" || (.severity_num <= 3 && .severity_num > -1 ?? false) {
_span_status.code = "error"
} else {
del(_span_status.message)
}
}
parent_id = if .message.parent_id == "" {
null
} else {
.message.parent_id
}
. = {
"trace_id": .trace_id,
"parent_span_id": parent_id, # graylog legacy naming
"service_name": .pod_container,
"scope_name": .pod,
"scope_version": .scope_version,
"span_id": .span_id,
"span_kind": .span_kind,
"span_name": .span_name,
"span_start_timestamp_nanos": .span_start_timestamp_nanos,
"span_end_timestamp_nanos": .span_end_timestamp_nanos,
"span_attributes": .attributes,
"span_status": _span_status,
}
Hi all, please share your Vector configurations and any other details and metrics you have available so we can try to reproduce this on our side.
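For example, comparing both versions with a stripped-down config (one source feeding a blackhole sink, then re-adding transforms one at a time) can help isolate which component regressed. A minimal sketch, with illustrative component names:

sources:
  in:
    type: prometheus_remote_write
    address: 0.0.0.0:9090
sinks:
  out:
    type: blackhole
    print_interval_secs: 10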
@pront I have edited my previous comment and added the config.
We've also noticed a significant increase in I/O and rolled back v0.50. We use Vector for forwarding Kubernetes logs (up to around 250 MB/s per region). In two separate incidents our nodes completely locked up under the pressure.
Below is our CloudWatch graph for SELECT MIN(VolumeIdleTime) FROM "AWS/EBS".
We use VRL extensively to enrich logs and sink to Splunk/HTTP.
Hello @pront, any news regarding this bug? If you need more info or help with debugging, please let me know.