cluster-logging-operator

Log forwarder input rate limits (maxRecordsPerSecond) are not honored by the collector (Vector)

Open rjeczkow opened this issue 5 months ago • 3 comments

Describe the bug

I'm following:

https://docs.openshift.com/container-platform/4.15/observability/logging/performance_reliability/logging-flow-control-mechanisms.html#logging-set-input-rate-limit_logging-flow-control-mechanisms

A maxRecordsPerSecond limit added to the ClusterLogForwarder for a specific namespace is not honored.

spec:
  inputs:
  - application:
      containerLimit:
        maxRecordsPerSecond: 5
      namespaces:
      - test-ns

Checking the vector.toml from the collector-config secret:

oc get secret/collector-config -n openshift-logging

From vector.toml:

(...)
# Logs from containers (including openshift containers)
[sources.input_test_ns_container]
type = "kubernetes_logs"
max_read_bytes = 3145728
glob_minimum_cooldown_ms = 15000
auto_partial_merge = true
include_paths_glob_patterns = ["/var/log/pods/test-ns_*/*/*.log"]
exclude_paths_glob_patterns = ["/var/log/pods/*/*/*.gz", "/var/log/pods/*/*/*.log.*", "/var/log/pods/*/*/*.tmp", "/var/log/pods/default_*/*/*.log", "/var/log/pods/kube*_*/*/*.log", "/var/log/pods/openshift*_*/*/*.log"]
pod_annotation_fields.pod_labels = "kubernetes.labels"
pod_annotation_fields.pod_namespace = "kubernetes.namespace_name"
pod_annotation_fields.pod_annotations = "kubernetes.annotations"
pod_annotation_fields.pod_uid = "kubernetes.pod_id"
pod_annotation_fields.pod_node_name = "hostname"
namespace_annotation_fields.namespace_uid = "kubernetes.namespace_id"
rotate_wait_ms = 5000

[transforms.input_test_ns_container_throttle]
type = "throttle"
inputs = ["input_test_ns_container"]
window_secs = 1
threshold = 5
key_field = "{{ file }}"

[transforms.input_test_ns_container_viaq]
type = "remap"
inputs = ["input_test_ns_container_throttle"]
source = '''
  .openshift.cluster_id = "${OPENSHIFT_CLUSTER_ID:-}"
  if !exists(.level) {
    .level = "default"
    if match!(.message, r'Warning|WARN|^W[0-9]+|level=warn|Value:warn|"level":"warn"|<warn>') {
      .level = "warn"
    } else if match!(.message, r'Error|ERROR|^E[0-9]+|level=error|Value:error|"level":"error"|<error>') {
      .level = "error"
    } else if match!(.message, r'Critical|CRITICAL|^C[0-9]+|level=critical|Value:critical|"level":"critical"|<critical>') {
      .level = "critical"
    } else if match!(.message, r'Debug|DEBUG|^D[0-9]+|level=debug|Value:debug|"level":"debug"|<debug>') {
      .level = "debug"
    } else if match!(.message, r'Notice|NOTICE|^N[0-9]+|level=notice|Value:notice|"level":"notice"|<notice>') {
      .level = "notice"
    } else if match!(.message, r'Alert|ALERT|^A[0-9]+|level=alert|Value:alert|"level":"alert"|<alert>') {
      .level = "alert"
    } else if match!(.message, r'Emergency|EMERGENCY|^EM[0-9]+|level=emergency|Value:emergency|"level":"emergency"|<emergency>') {
      .level = "emergency"
    } else if match!(.message, r'(?i)\b(?:info)\b|^I[0-9]+|level=info|Value:info|"level":"info"|<info>') {
      .level = "info"
    }
  }
  pod_name = string!(.kubernetes.pod_name)
  if starts_with(pod_name, "eventrouter-") {
    parsed, err = parse_json(.message)
    if err != null {
      log("Unable to process EventRouter log: " + err, level: "info")
    } else {
      ., err = merge(.,parsed)
      if err == null && exists(.event) && is_object(.event) {
          if exists(.verb) {
            .event.verb = .verb
            del(.verb)
          }
          .kubernetes.event = del(.event)
          .message = del(.kubernetes.event.message)
          set!(., ["@timestamp"], .kubernetes.event.metadata.creationTimestamp)
          del(.kubernetes.event.metadata.creationTimestamp)
          . = compact(., nullish: true)
      } else {
        log("Unable to merge EventRouter log message into record: " + err, level: "info")
      }
    }
  }
  del(.source_type)
  del(.stream)
  del(.kubernetes.pod_ips)
  del(.kubernetes.node_labels)
  del(.timestamp_end)
  ts = del(.timestamp); if !exists(."@timestamp") {."@timestamp" = ts}
'''

# Set log_type
[transforms.input_test_ns_viaq_logtype]
type = "remap"
inputs = ["input_test_ns_container_viaq"]
source = '''
  .log_type = "application"
'''
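
For reference, the behavior I would expect from the throttle transform above (window_secs = 1, threshold = 5, keyed on the log file) can be sketched roughly as follows. This is a simplified fixed-window model written only to illustrate the expectation, not Vector's actual rate-limiting algorithm; the `throttle` function, the 400 lines per second input, and the log file path with its `<uid>` placeholder are hypothetical.

```python
from collections import defaultdict


def throttle(events, threshold=5, window_secs=1):
    """Keep at most `threshold` events per key in each fixed window.

    `events` is an iterable of (timestamp_seconds, key, message) tuples.
    Simplified model for illustration only, not Vector's implementation.
    """
    counts = defaultdict(int)  # (key, window index) -> events kept so far
    kept = []
    for ts, key, message in events:
        bucket = (key, int(ts // window_secs))
        if counts[bucket] < threshold:
            counts[bucket] += 1
            kept.append((ts, key, message))
    return kept


# Hypothetical input: one container log file emitting 400 lines/s for 3 seconds.
log_file = "/var/log/pods/test-ns_test-pod_<uid>/test-container/0.log"
events = [(i / 400, log_file, f"line {i}") for i in range(1200)]

kept = throttle(events)
print(len(kept))  # 15, i.e. 5 lines for each of the 3 one-second windows
```

Under this model, 1200 input lines from a single file are reduced to 15, which is what "no more than 5 per second" would mean for the test pod.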

Environment

  • OKD 4.15.0-0.okd-2024-03-10-010116
  • Red Hat OpenShift Logging 5.9.5
  • Loki Operator 5.9.5

Logs

Expected behavior

No more than 5 log lines per second should be collected (checked in the console 'Aggregated logs' tab in the pod view).

Actual behavior

The number of log lines per second visible in the 'Aggregated logs' tab is much higher.

To Reproduce

Steps to reproduce the behavior:

  1. Add containerLimit to the ClusterLogForwarder instance:
spec:
  inputs:
  - application:
      containerLimit:
        maxRecordsPerSecond: 5
      namespaces:
      - test-ns
    name: test-ns
  2. Create a pod in the namespace test-ns that writes a few hundred lines per second to stdout, for example with a loop:
apiVersion: v1
kind: Pod
metadata:
  name: test-pod
  namespace: test-ns
spec:
  containers:
  - name: test-container
    image: alpine
    command: [ "/bin/sh", "-c", "--" ]
    args: [ "while true; do echo `date`; done;" ]
  3. Check how many lines per second are visible in the 'Aggregated Logs' tab (the histogram helps): ~400 log lines per second are visible (should be <= 5).
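
As an alternative to reading the histogram, the rate can be checked from exported log lines. The sketch below assumes the collected messages have been saved as plain text and that each line is the raw output of `date` from the test pod, which has one-second resolution, so identical lines fall in the same second. The `lines_per_second` helper and the sample data are hypothetical.

```python
from collections import Counter


def lines_per_second(lines):
    """Count log lines per distinct `date` string (one string per second)."""
    return Counter(line.strip() for line in lines if line.strip())


# Hypothetical sample: 7 lines in one second, 2 in the next.
sample = ["Wed Sep 18 09:09:01 UTC 2024"] * 7 + ["Wed Sep 18 09:09:02 UTC 2024"] * 2

counts = lines_per_second(sample)
# Seconds in which more lines were collected than maxRecordsPerSecond: 5 allows.
over_limit = {ts: n for ts, n in counts.items() if n > 5}
print(over_limit)  # {'Wed Sep 18 09:09:01 UTC 2024': 7}
```

Any non-empty `over_limit` result would indicate that the limit is not being applied for that second.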


rjeczkow, Sep 18 '24 09:09