
Vector CPU usage increased in 0.50.0

Open omarghader opened this issue 2 months ago • 6 comments

A note for the community

  • Please vote on this issue by adding a 👍 reaction to the original issue to help the community and maintainers prioritize this request
  • If you are interested in working on this issue or have submitted a pull request, please leave a comment

Problem

Hello, I recently migrated from Vector 0.49.0 to 0.50.0. After the update, I noticed an increase in CPU usage: the average went up by roughly 1.4 CPUs per VM, from 5.4 to 6.8, even though the number of incoming metrics has not changed. Reverting to 0.49.0 seems to fix the issue.

I have updated 2 independent clusters, and both behaved the same way (as you can see in the image below).

Each Vector instance is installed on a separate VM, with 8 instances (VMs) per cluster. Each VM has:

  • 8 CPUs
  • 16 GB RAM
  • Only metrics are processed in Vector
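
For reference, a rough way to narrow down which component accounts for the extra CPU is vector top, which talks to the API already enabled in the config below. This is only a diagnostic sketch; it assumes the API listens on its default address (127.0.0.1:8686), so adjust --url if it is bound elsewhere:

# Diagnostic sketch: compare per-component throughput on a 0.49.0 VM and a
# 0.50.0 VM side by side to see where the delta comes from.
vector top --url http://127.0.0.1:8686/graphql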

Configuration

My config is:

data_dir: "{{ vector_data_directory }}"

api:
  enabled: true

enrichment_tables:
  enrichment:
    type: file
    file:
      path: "{{ vector_data_directory }}/enrichment.csv"
      encoding:
        type: csv
sources:
  prometheusreceiver:
    type: prometheus_remote_write
    address: 0.0.0.0:9090
    tls:
      enabled: true
      crt_file: crt
      key_file: key
  internalmetrics:
    type: internal_metrics
    scrape_interval_secs: 10
transforms:
  enrich_internalmetrics:
    type: remap
    inputs:
      - internalmetrics
    source: |
      .tags.host_name = get_hostname!()
      .tags.deployment_environment = "dev"
      .tags.service_name = "vector"
      .tags.service_namespace = "namespace"
      .tags.service_budget_name = "service_budget_name"

  enrich_metrics:
    type: remap
    inputs:
      - prometheusreceiver
    source: |
      # don't enrich counter metrics
      if .name == "my_counter" {
        return .
      }

      # Pre-assign Kubernetes namespace and cluster (using Vector or OTel tags)
      kubernetes_namespace = compact([.tags.namespace, .tags.k8s_namespace_name, .tags.kubernetes_namespace], string: true, nullish: true)[0]

      kubernetes_cluster = compact([.tags.kubernetes_cluster_name, .tags.k8s_cluster_name],  string: true, nullish: true)[0]

      # Set isKubernetes if both exist (non-empty)
      isKubernetes = !is_nullish(kubernetes_namespace) && !is_nullish(kubernetes_cluster)

      selector = ""

      if isKubernetes {
        selector, err = kubernetes_namespace + "." + .tags.kubernetes_cluster_name
        if err != null {
          log(join!([selector, to_string(err)], " "), level: "error", rate_limit_secs: 600)
          return .
        }

      } else {
        # Non-Kubernetes selectors to try
        defaultSelectors = [.tags.instance, .tags.host_name, .tags.host, .tags.deployment_host]

        # remove the null and the IP selectors
        selectors = filter(defaultSelectors) -> |_i, sel| {
          selectorString = to_string!(sel)
          !is_nullish(selectorString) && match(selectorString, r'my_regex')
        }

        # if no selectors left, don't enrich the metric
        if is_empty(selectors) {
          return .
        }

        selector = selectors[0]

        # When it's a url and not an ip, use the hostname as selector
        if starts_with(string!(selector), "http") {
          url, err = parse_url(string!(selector))
          if err == null && !is_ipv4(string!(url.host)) {
            selector = url.host
          }
        }

        selector = split(string!(selector), ":")[0]

      }

      # Add service_name from app, appl or app_kubernetes_io_name by order
      if !exists(.tags.service_name) {
        service_name = compact([.tags.app, .tags.appl, .tags.app_kubernetes_io_name],  string: true, nullish: true)[0]
        if !is_nullish(service_name) {
          .tags.service_name = service_name
        }
      }

      enrichment, err = get_enrichment_table_record("enrichment", {"name": selector })
      if err != null {
        # if the row is not found, return
        log(join!(["Couldn't find in enrichment:",selector], " "), level: "error", rate_limit_secs: 600)
        return .
      }

      # Add an internal label for the budget name, because service_namespace is sometimes predefined by users
      .tags.service_budget_name = downcase!(enrichment.service_budget_name)

      if !exists(.tags.service_namespace) {
        .tags.service_namespace = downcase!(enrichment.service_budget_name)
      }

      if !exists(.tags.deployment_environment) {
        .tags.deployment_environment = enrichment.context
      }

      if isKubernetes {
        if !exists(.tags.host_name) && exists(.tags.kubernetes_pod_name) {
          .tags.host_name = .tags.kubernetes_pod_name
        }
      } else {
        if !exists(.tags.host_name) {
          .tags.host_name = enrichment.name
        }
      }

  remove_internal_labels:
    type: remap
    inputs:
      - enrich_metrics
      - enrich_internalmetrics
    source: |
      # don't strip labels from counter metrics
      if .name == "my_counter" {
        return .
      }
      del(.tags.service_budget_name)

sinks:
  prometheus_remote_write1:
    type: prometheus_remote_write
    inputs: &inputs
      - remove_internal_labels
    endpoint: "${REMOTEWRITE_ENDPOINT1}"
    auth: &promauth
      strategy: "basic"
      user: "user"
      password: "${REMOTEWRITE_PASSWORD}"
    buffer: &buffer
      type: memory
      max_events: 20000
    batch: &batch
      max_events: 10000

  prometheus_remote_write2:
    type: prometheus_remote_write
    inputs: *inputs
    endpoint: "${REMOTEWRITE_ENDPOINT2}"
    auth: *promauth
    buffer: *buffer
    batch: *batch

  metrics_counter:
    type: prometheus_remote_write
    inputs:
      - enrich_metrics
      - enrich_internalmetrics
    endpoint: "http://localhost:{{ port }}/count/metrics/prometheus"
    buffer: 
      type: memory
      max_events: 20000
      when_full: drop_newest
    batch:
      max_events: 10000

Version

0.50.0

Debug Output


Example Data

Image

Additional Context

No response

References

No response

omarghader avatar Oct 23 '25 06:10 omarghader

We also noticed it after upgrading from v0.48: Image

Our config:
role: "Agent"
args:
  - "--config-dir"
  - "/etc/vector/"
  - "--allocation-tracing"
image:
  repository: timberio/vector
  base: "debian"

livenessProbe:
  httpGet:
    path: /health
    port: api

readinessProbe:
  httpGet:
    path: /health
    port: api

resources:
  requests:
    memory: 256Mi
    cpu: 200m
  limits:
    memory: 1024Mi
    cpu: 1000m

tolerations:
  - key: "name"
    operator: "Equal"
    value: "system"
    effect: "NoSchedule"

podAnnotations:
  prometheus.io/scrape: "true"
  prometheus.io/scheme: "http"
  prometheus.io/port: "9598"
  enable.version-checker.io/vector: "true"
  use-metadata.version-checker.io/vector: "true"

extraVolumes:
  - name: var-log-journal
    hostPath:
      path: "/var/log/journal"
extraVolumeMounts:
  - name: var-log-journal
    mountPath: "/mnt/host/journal"
    readOnly: true

customConfig:
  data_dir: /vector-data-dir
  expire_metrics_secs: 300
  api:
    enabled: true
    address: 0.0.0.0:8686
    playground: false

  sources:
    pods_logs:
      type: kubernetes_logs
      glob_minimum_cooldown_ms: 2000
      max_line_bytes: 131072
      use_apiserver_cache: true
    internal_logs:
      type: internal_logs
    journald_logs:
      type: journald
      journal_directory: /mnt/host/journal
    internal_metrics:
      type: internal_metrics

  transforms:
    pods_source_types:
      type: remap
      inputs:
        - pods_logs
      source: |-
        .source = "pods_logs"
        .source_environment  = "${environment}"
    vector_internal_source_types:
      type: remap
      inputs:
        - internal_logs
      source: |-
        .source = "vector_agent_internal_logs"
        .source_environment  = "${environment}"
    vector_journald_source_types:
      type: remap
      inputs:
        - journald_logs
      source: |-
        .source = "journald_logs"
        .source_environment  = "${environment}"
        # also fix the hostname field, as it is sometimes set to "localhost" for some services
        .real_hostname = get_env_var("VECTOR_SELF_NODE_NAME") ?? .host

  sinks:
    to_kafka:
      type: kafka
      inputs:
        - pods_source_types
        - vector_internal_source_types
        - vector_journald_source_types
      bootstrap_servers: ...
      compression: zstd
      encoding:
        codec: "json"
      topic: logs-kubernetes-${environment}
      batch:
        timeout_secs: 1
      buffer:
        max_size: 5368709120
        type: disk

    prometheus_exporter:
      type: prometheus_exporter
      inputs:
        - internal_metrics
      address: 0.0.0.0:9598
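
Since the charts only show pod-level CPU, it may help to compare per-component load from the prometheus_exporter already exposed on :9598. A sketch, assuming the internal_metrics source still exports its utilization gauge as vector_utilization (metric names can shift between versions) and with <vector-pod> as a placeholder:

# Forward the exporter port locally, then compare per-component utilization
# between a pod running the old version and one running 0.50.0.
kubectl port-forward <vector-pod> 9598:9598 &
curl -s http://127.0.0.1:9598/metrics | grep '^vector_utilization'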

pznamensky avatar Oct 23 '25 13:10 pznamensky

Same here after upgrading from 0.48.0 to 0.50.0. Vector is running as a log-collecting agent. Although absolute CPU consumption is relatively low, the cluster-wide average increased by about 100%.

Image
Our config:

Additional note: we are building Vector from source as follows:

  && export RUSTFLAGS="-Ctarget-feature=+crt-static -Clto=true -Cembed-bitcode=yes -Clinker=clang-20 -Clinker-plugin-lto=true -Clink-arg=-flto=full -Ccodegen-units=1 -Cstrip=symbols -Cpanic=abort" \
  && . ~/.cargo/env \
  && cargo build \
  --no-default-features \
  --features api,api-client,sources-file,sources-journald,sources-kubernetes_logs,transforms-filter,transforms-log_to_metric,transforms-reduce,transforms-remap,transforms-route,sinks-aws_s3,sinks-prometheus,sinks-socket,sinks-http \
  --target x86_64-unknown-linux-gnu \
  --release
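
Since the RUSTFLAGS above include -Cstrip=symbols, it may also be worth rebuilding without that flag and profiling both versions, so the regression can be attributed to a specific code path rather than to the build settings. A rough sketch, assuming perf is available on the node and the binary keeps its symbols:

# Sample the running Vector process for 60 seconds, then diff the hot symbols
# between a 0.48/0.49 build and a 0.50 build (requires an unstripped binary).
perf record -F 99 -g -p "$(pidof vector)" -- sleep 60
perf report --sort symbol | head -n 40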

These are Kubernetes ConfigMaps mounted as files in the Vector containers.

---
apiVersion: v1
kind: ConfigMap
metadata:
  name: vector-cm
data:
  vector.yaml: |-
    api:
      enabled: true
      playground: false
    expire_metrics_secs: 60
    data_dir: /opt/vector/data/
    enrichment_tables:
      severity_num:
        type: file
        file:
          path: /opt/vector/config/severity_num.csv
          encoding:
            type: csv
            delimiter: "\t"
            include_headers: true
        schema:
          severity: string
          severity_num: int
    sources:
      kubernetes:
        type: kubernetes_logs
        glob_minimum_cooldown_ms: 1000
        self_node_name: ${NODE_NAME}
      host_journal:
        type: journald
        current_boot_only: true
        since_now: true
        journal_directory: /var/log/journal/
    transforms:
      vector_in:
        type: log_to_metric
        inputs:
          - kubernetes
          - host_journal
        metrics:
          - type: counter
            name: logs_in_total
            field: message
            tags:
              source_type: "{{source_type}}"
      node_logs:
        type: remap
        reroute_dropped: true
        inputs: [host_journal]
        source: |
          .cluster = "${K8S_IDENTIFIER}"
          .host = split(get_env_var("NODE_NAME") ?? "no_host", ".")[0]
          .parsing_type = "journald"
      filtered:
        type: filter
        inputs: [kubernetes]
        condition: |
          !is_nullish(.message) && .pod_container != "reverse-proxy"
      merge_multiline:
        type: reduce
        inputs:
          - filtered
        group_by:
          - .kubernetes.container_id
        merge_strategies:
          message: concat_newline
        starts_when: |
          msg = string(.message) ?? ""

          match(msg, r'^[^\s]') # match indented lines like jvm stack trace
          #!starts_with(msg, "[") # match square brace indexed line like graylog
        expire_after_ms: 3000
      route:
        type: route
        inputs: [merge_multiline]
        route:
          json: starts_with(string(.message) ?? "", "{")
          ltsv: starts_with(string(.message) ?? "", "ltsv:1")
          alertlogger: .kubernetes.container_name == "prometheus-alertlogger"
          graylog: .kubernetes.container_name == "graylog"
      parsed_json:
        type: remap
        reroute_dropped: true
        inputs: [route.json]
        file: /opt/vector/config/vrls/parsed_json.vrl
      parsed_ltsv:
        type: remap
        reroute_dropped: true
        inputs: [route.ltsv]
        file: /opt/vector/config/vrls/parsed_ltsv.vrl
      parsed_alertlogger:
        type: remap
        reroute_dropped: true
        inputs: [route.alertlogger]
        file: /opt/vector/config/vrls/parsed_alertlogger.vrl
      parsed_graylog:
        type: remap
        reroute_dropped: true
        inputs: [route.graylog]
        file: /opt/vector/config/vrls/parsed_graylog.vrl
      unparsed:
        type: remap
        reroute_dropped: true
        inputs: [route._unmatched]
        source: |
          msg, err = string(.message)
          if err == null {
            .message = strip_ansi_escape_codes(msg)
          }

          .parsing_type = "unparsed"
      dropped:
        type: remap
        reroute_dropped: true
        inputs:
          - node_logs.dropped
          - unparsed.dropped
          - parsed_json.dropped
          - parsed_ltsv.dropped
          - parsed_alertlogger.dropped
          - parsed_graylog.dropped
        source: .parsing_type = "dropped"
      unified:
        type: remap
        reroute_dropped: true
        inputs:
          - parsed_json
          - parsed_ltsv
          - parsed_alertlogger
          - parsed_graylog
          - unparsed
          - dropped
          - dropped.dropped
        file: /opt/vector/config/vrls/unified.vrl
      trace_filter:
        type: filter
        inputs: [unified]
        condition: |
          .trace_id != null &&
          .span_id != null &&
          .span_name != null &&
          .span_start_timestamp_nanos != null &&
          .span_end_timestamp_nanos != null
      trace_contexts:
        type: route
        inputs: [trace_filter]
        route:
          http_server: |
            ."http.request.method" != null &&
            ."url.path" != null &&
            ."url.scheme" != null
          http_client: |
            ."http.request.method" != null &&
            ."server.address" != null &&
            ."server.port" != null &&
            ."url.full" != null
      http-client-tracing-context:
        type: remap
        inputs: [trace_contexts.http_client]
        source: |
          .span_kind = 3

          .attributes = {
            "http.request.method": ."http.request.method",
            "server.address": ."server.address",
            "server.port": ."server.port",
            "url.full": ."url.full",
          }
      http-server-tracing-context:
        type: remap
        inputs: [trace_contexts.http_server]
        source: |
          .span_kind = 2

          .attributes = {
            "http.request.method": ."http.request.method",
            "url.path": ."url.path",
            "url.scheme": ."url.scheme",
          }
      quickwit-traces:
        type: remap
        inputs:
          - http-server-tracing-context
          - http-client-tracing-context
        file: /opt/vector/config/vrls/quickwit-traces.vrl
      vector_out:
        type: log_to_metric
        inputs:
          - unified
          - node_logs
        metrics:
          - type: counter
            name: logs_out_total
            field: message
            tags:
              parsing_type: "{{parsing_type}}"
              source_type: "{{source_type}}"
    sinks:
      prometheus:
        type: prometheus_exporter
        inputs:
          - vector_in
          - vector_out
        address: 0.0.0.0:9100
        # Prometheus is confused by time series with timestamps,
        # probably occurring when multiple timestamps exist within a single scrape (single pod)
        suppress_timestamp: true
      graylog:
        type: socket
        inputs:
          - unified
          - unified.dropped
          - node_logs
        address: ${GRAYLOG_ADDRESS}
        mode: tcp
        encoding:
          codec: gelf
        buffer:
          type: disk
          max_size: 536870912 # 512 Mb
        framing:
          method: character_delimited
          character_delimited:
            delimiter: "\0"
      quickwit_traces:
        type: http
        inputs: [quickwit-traces]
        method: post
        uri: quickwit
        encoding:
          codec: json
        framing:
          method: newline_delimited
        buffer:
          when_full: drop_newest

  # TODO: this could use a built-in function to convert int to syslog severity
  # https://vector.dev/docs/reference/vrl/functions/#to_syslog_severity
  severity_num.csv: |-
    severity_num	severity
    7	debug
    7	debg
    7	d
    7	7
    6	sc_info
    6	info
    6	i
    6	6
    5	notice
    5	n
    4	warning
    4	warn
    4	w
    4	4
    3	error
    3	err
    3	eror
    3	e
    3	3
    2	critical
    2	crit
    2	2
    1	alert
    1	1
    0	f
    0	emergency
    0	0
---
apiVersion: v1
kind: ConfigMap
metadata:
  name: vector-vrls
data:
  parsed_json.vrl: |-
    json, err = parse_json(.message)
    if err == null {
      # merge parsed json into message (overwrites fields with json values)
      .parsed_message = json

      .parsing_type = "json"
    } else {
      .parsing_type = "invalid_json"
    }

  parsed_ltsv.vrl: |-
    parsed_kv, err = parse_key_value(
      .message,
      key_value_delimiter: ":",
      field_delimiter: "\t",
      accept_standalone_key: false
    )
    if err == null {
      del(parsed_kv.ltsv)
      .parsed_message = map_values(parsed_kv) -> |value| {
        # order is important
        typed_val, err = to_int(value) ?? to_bool(value) ?? to_float(value)
        if err == null {
          typed_val
        } else {
          tmp_str, err = string(value)
          if err == null {
            tmp_str = replace(tmp_str, "\\n", "\n")
            tmp_str = replace(tmp_str, "\\t", "\t")
            tmp_str = replace(tmp_str, "\\r", "\r")
            tmp_str
          } else {
            value # gets json encoded later
          }
        }
      }

      .parsing_type = "ltsv"
    } else {
      .parsing_type = "invalid_ltsv"
    }

  parsed_graylog.vrl: |-
    parsed_message, err = parse_regex(
      .message,
      # '(?s)' makes '.' match newlines so the regex spans multi-line messages
      r'(?s)[^ ]+ [^ ]+ (?P<severity>\w+) *: (?P<class>[^ ]+) - (?P<message>.*)'
    )
    if err == null {
      .parsed_message.severity = parsed_message.severity
      .parsed_message.message = parsed_message.message
      .parsed_message.class = parsed_message.class
      .parsing_type = "graylog"
    } else {
      .parsing_type = "invalid_graylog"
    }

  parsed_alertlogger.vrl: |-
    parsed_kv, err = parse_key_value(
      .message,
      field_delimiter: "\t",
      accept_standalone_key: false
    )
    if err == null {
      parsed_kv = map_values(parsed_kv) -> |value| {
        # order is important
        to_int(value) ?? to_bool(value) ?? to_float(value) ?? value
      }

      .parsed_message = parsed_kv
      .parsing_type = "alertlogger"
    } else {
      .parsing_type = "invalid_alertlogger"
    }

  unified.vrl: |-
    # incorporate parsed message
    if exists(.parsed_message) {
      if length(object!(.parsed_message)) > 30 {
        .parsing_type = "too_many_fields"
        .message = "No. of fields > 30 is not allowed."
      } else {
        del(.message)

        # if "message" not set try other value
        if !exists(.parsed_message.message) {
          # use "msg" field
          if exists(.parsed_message.msg) {
            .parsed_message.message = .parsed_message.msg
            del(.parsed_message.msg)
          }
          # use "summary" field
          if exists(.parsed_message.summary) {
            .parsed_message.message = .parsed_message.summary
            del(.parsed_message.summary)
          }
        }

        # kubernetes info is not allowed to be overwritten
        del(.parsed_message.kubernetes)

        . |= map_keys(object(.parsed_message) ?? {}) -> |key| {
          if key != "message" { # "message" key gets malformed sometimes
            replace(key, r'[^a-z0-9.]', "_") # sanitize keys
          } else {
            key
          }
        }

        del(.parsed_message)
      }
    }

    # set common message needed by gelf (if not exist)
    if !exists(.message) {.message = "sc_parsed"}

    # set infrastructure labels
    .host = split(get_env_var("NODE_NAME") ?? "no_host", ".")[0]
    .cluster = "${K8S_IDENTIFIER}"

    if exists(.kubernetes) {
      .pod = .kubernetes.pod_name
      .pod_namespace = .kubernetes.pod_namespace
      .pod_container = .kubernetes.container_name
      .identifier = .kubernetes.container_name
      .container_id = .kubernetes.container_id
    } else {
      .container_id = "undetermined_due_to_missing_pod_metadata"
      # workaround if kubeapi metadata is missing
      # /var/log/pods/sxp-test_test-logs_add544da-b3e9-4379-bc2b-dec169758fce/alpine/1.log
      file_string, err = string(.file)
      if err == null {
        file_fields = split(file_string, r'[/_]')
        .pod = file_fields[5]
        .pod_namespace = file_fields[4]
        .pod_container = file_fields[7]
        .identifier = file_fields[7]
      } else {
        .pod = "unknown"
        .pod_namespace = "unknown"
        .pod_container = "unknown"
        .identifier = "unknown"
      }
    }

    # remove unused or redundant info
    #del(.file) # keep to allow id of records w/missing pod enrichment metadata
    del(.kubernetes)
    del(.stream)
    del(.timestamp_end)

    # make sure that some timestamp field exists and is of type ts
    .timestamp = timestamp(.timestamp) ?? {
      # parse iso format timestamps ("%+")
      parse_timestamp(.timestamp, format:"%+") ?? now()
    }
    .timestamp_nano = format_timestamp!(.timestamp, format: "%F %T.%f")
    .timestamp_now = format_timestamp!(now(), format: "%F %T.%f")

    # gelf fails if ".level" is not an integer
    # and we want severity instead anyway
    if !exists(.severity) && !is_nullish(.level) {
      .severity = .level
    }
    .severity_num = get_enrichment_table_record(
      "severity_num", {"severity": .severity}, case_sensitive: false
    ).severity_num ?? -1
    .severity = to_syslog_level(.severity_num) ?? ""
    del(.level)

    if !exists(.parsing_type) { .parsing_type = "missed" }

    # gelf doesn't handle special types -> stringify
    . = map_values(., false) -> |value| {
      value = if is_object(value) || is_array(value) || is_null(value) || is_boolean(value) {
        encode_json(value)
      } else {
        value
      }

      # handle fields being too long
      str, err = string(value)
      if err == null && strlen(str) > 32752 {
        slice!(str, start: 0, end: 32752) + " ... truncated"
      } else {
        value
      }
    }

    # some fields are used by different applications with different types -> coerce them
    # because of previous json transform this should never fail
    if exists(.id) { .id = to_string(.id) ?? "this value must be string but was not" }
    if exists(.status) { .status = to_string(.status) ?? "this value must be string but was not" }

    # fail safe if anything went wrong (without graylog would drop the message)
    if is_nullish(.message) {.message = "missing message"}

  quickwit-traces.vrl: |-
    .attributes.pod_namespace = .pod_namespace

    _span_status = {
      "code": string(."span_status.code") ?? "unset",
      "message": string(."span_status.message") ?? "",
    }

    if _span_status.code != "error" {
      if _span_status.message != "" || (.severity_num <= 3 && .severity_num > -1 ?? false) {
        _span_status.code = "error"
      } else {
        del(_span_status.message)
      }
    }

    parent_id = if .message.parent_id == "" {
      null
    } else {
      .message.parent_id
    }

    . = {
      "trace_id": .trace_id,
      "parent_span_id": parent_id, # graylog legacy naming
      "service_name": .pod_container,
      "scope_name": .pod,
      "scope_version": .scope_version,
      "span_id": .span_id,
      "span_kind": .span_kind,
      "span_name": .span_name,
      "span_start_timestamp_nanos": .span_start_timestamp_nanos,
      "span_end_timestamp_nanos": .span_end_timestamp_nanos,
      "span_attributes": .attributes,
      "span_status": _span_status,
    }

scMarkus avatar Nov 04 '25 20:11 scMarkus

Hi all, please share your Vector configurations and any other details and metrics you have available so we can try to reproduce this on our side.

pront avatar Nov 04 '25 21:11 pront

@pront I have edited my previous comment and added the config.

scMarkus avatar Nov 07 '25 13:11 scMarkus

We've also noticed a significant increase in I/O and rolled back from v0.50. We use Vector for log forwarding of Kubernetes logs (up to around 250 MB/s per region). In two separate incidents our nodes completely locked up under the pressure.

Below is our CloudWatch chart for SELECT MIN(VolumeIdleTime) FROM "AWS/EBS":

Image

We use VRL extensively to enrich logs and sink to Splunk/HTTP.
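
If it helps to narrow this down, the node-level EBS metrics can be correlated with the Vector process itself; a small sketch, assuming sysstat's pidstat is installed on the affected nodes:

# Per-process disk I/O for Vector, sampled every 5 seconds for one minute;
# comparing kB_wr/s between v0.49 and v0.50 should show whether the extra
# write load comes from the Vector process.
pidstat -d -p "$(pidof vector)" 5 12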

matt-simons avatar Nov 20 '25 09:11 matt-simons

Hello @pront, any news regarding this bug? If you need more info or help with debugging, please let me know.

omarghader avatar Dec 10 '25 11:12 omarghader