fluent-bit icon indicating copy to clipboard operation
fluent-bit copied to clipboard

Could the `Elasticsearch OUTPUT` Index configuration support dynamic index name (use record accessor) ?

Open 2nfree opened this issue 9 months ago • 6 comments

Hi there, I currently have this issue: I am using a Lua script to dynamically generate the index name for the Index configuration, as shown in the script below:

function label_filter(tag, timestamp, record)
    if record["kubernetes"] == nil then
        return 0, timestamp, record
    end

    local namespace = record["kubernetes"]["namespace_name"]
    local labels = record["kubernetes"]["labels"]
    
    local log_tag = nil
    local index_name = nil

    if labels ~= nil then
        local logging = labels["logging"]
        local component = labels["component"]
        local app = labels and labels["app"]
        
        if logging == "back" then
            log_tag = string.format("java.back.%s.%s", namespace, app)
            if namespace == "backstage" then
                index_name = string.format("k8s-prod-backstage-%s", app)
            elseif namespace == "backstage-test" then
                index_name = string.format("k8s-test-backstage-%s", app)
            end
        elseif logging == "plat" then
            log_tag = string.format("java.%s.%s.%s", logging, namespace, app)
            if namespace == "platform" then
                index_name = string.format("k8s-prod-platform-%s", app)
            elseif namespace == "platform-test" then
                index_name = string.format("k8s-test-platform-%s", app)
            end
        elseif component == "jobmanager" then
            log_tag = string.format("flink.jobmanager.%s.%s", namespace, app)
            index_name = "flink-jobmanager"
        elseif component == "taskmanager" then
            log_tag = string.format("flink.taskmanager.%s.%s", namespace, app)
            index_name = "flink-taskmanager"
        elseif logging == "nginx" then
            log_tag = string.format("nginx.%s", namespace)
            index_name = "kubernetes-ingress-nginx"
        end
    end

    if log_tag then
        record["log_tag"] = log_tag
        record["index_name"] = index_name
        return 2, timestamp, record
    else
        return 0, timestamp, record
    end
end

When I configure Elasticsearch OUTPUT as follows:

[OUTPUT]
      name              es
      match             *
      host              <host>
      port              9200
      index             $index_name
      replace_dots      on
      retry_limit       2
      buffer_size       10mb
      tls               on
      tls.verify        off
      http_user         elastic
      http_passwd       <password>
      suppress_type_name on
      trace_error       on

It doesn't work properly, presumably because the Index configuration doesn't use record accessor But if I use logstash_prefix_key it causes the index name to be $index_name-YYYY.MM.DD which I don't want

logstash_format   on
logstash_prefix_key $index_name

Could the Elasticsearch OUTPUT Index configuration support dynamic index name (use record accessor) ? Or is there any other way ?

2nfree avatar May 27 '24 08:05 2nfree