
A new target allocation algorithm based on the job-average approach.

Open zh-w opened this issue 1 year ago • 19 comments

Component(s)

target allocator

Is your feature request related to a problem? Please describe.

In the scenario where I use the target allocator, there are usually several different jobs, and the number of metric datapoints per target varies significantly between jobs. For example, suppose there are five collection jobs: A, B, C, D, and E, each with the same collection interval and 10 targets. Each target of job A yields 1000 datapoints (e.g., KSM), each target of job B yields 100, each target of jobs C and D yields 50, and each target of job E yields 10.

At the same time, assume I have 5 collector instances deployed in StatefulSets. When using consistent-hashing or least-weighted algorithms, the targets for each job are not evenly distributed across each collector instance. In the assumed collection scenario, it is possible that collector-0 is assigned 3 targets of job A, while collector-4 is assigned 0 targets of job A. This can result in a significant disparity in the number of datapoints collected by each collector, leading to an imbalance in load.

In my actual use case, this situation occurs quite frequently. Below is a diagram showing the load distribution of collectors in a large cluster I deployed (using the consistent-hashing algorithm), illustrating the extreme imbalance in resource utilization across collectors. (screenshot attached)

Describe the solution you'd like

I have implemented a job-based load-balancing algorithm; its design is described in my comment below.

Describe alternatives you've considered

No response

Additional context

No response

zh-w avatar Jul 15 '24 02:07 zh-w

The algorithm is designed as follows:

if the set of discovered targets changes {
    for each deleted target:
        remove it from its current collector without triggering a rebalance
    for each new target:
        among all collectors, find the one with the fewest targets already assigned for that target's job;
        if multiple collectors tie, choose the one with the fewest targets overall
} else if the list of collector instances changes {
    for each added collector:
        add it to the list and reassign all targets
    for each removed collector:
        remove it from the list and reassign all targets
}
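
A rough Go sketch of the selection rule for new targets (the Collector struct and the per-job counters here are illustrative, not the target allocator's actual types):

type Collector struct {
	Name         string
	TotalTargets int
	JobTargets   map[string]int // number of targets assigned per job name
}

// pickCollector returns the collector with the fewest targets for jobName,
// breaking ties by the fewest targets overall.
func pickCollector(collectors []*Collector, jobName string) *Collector {
	var best *Collector
	for _, c := range collectors {
		if best == nil {
			best = c
			continue
		}
		cj, bj := c.JobTargets[jobName], best.JobTargets[jobName]
		if cj < bj || (cj == bj && c.TotalTargets < best.TotalTargets) {
			best = c
		}
	}
	return best
}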

zh-w avatar Jul 15 '24 02:07 zh-w

@zh-w what do the metrics from the target allocator look like for the targets per collector metric? It's possible that the distribution is even but the memory usage is not, in the scenario where some of your scrape targets emit more metrics than others.

jaronoff97 avatar Jul 15 '24 12:07 jaronoff97

I wouldn't really mind if we just made the least-weighted strategy behave this way. @zh-w would that work? Out of the set of collectors with the least amount of targets, we'd pick the one with the least amount of targets from the assigned target's job. Right now we just pick randomly.

Both least-weighted and your proposal have the undesirable property that they're not stable - the actual assignment can depend on the order of target additions and deletions, whereas consistent-hashing and per-node always give the same result. This makes the implementation of the former more complex and requires doing things we'd prefer not to do, like hashing all the target label sets to determine their identities.

swiatekm avatar Jul 18 '24 12:07 swiatekm

I wouldn't really mind if we just made the least-weighted strategy behave this way. @zh-w would that work? Out of the set of collectors with the least amount of targets, we'd pick the one with the least amount of targets from the assigned target's job. Right now we just pick randomly.

Both least-weighted and your proposal have the undesirable property that they're not stable - the actual assignment can depend on the order of target additions and deletions, whereas consistent-hashing and least-weighted always give the same result. This makes the implementation of the former more complex and requires doing things we'd prefer not to do, like hashing all the target label sets to determine their identities.

least-weighted is not stable either; it depends on the order of target additions and deletions.

philchia avatar Aug 01 '24 06:08 philchia

I wouldn't really mind if we just made the least-weighted strategy behave this way. @zh-w would that work? Out of the set of collectors with the least amount of targets, we'd pick the one with the least amount of targets from the assigned target's job. Right now we just pick randomly. Both least-weighted and your proposal have the undesirable property that they're not stable - the actual assignment can depend on the order of target additions and deletions, whereas consistent-hashing and least-weighted always give the same result. This makes the implementation of the former more complex and requires doing things we'd prefer not to do, like hashing all the target label sets to determine their identities.

least-weighted is not stable either; it depends on the order of target additions and deletions.

My bad, I meant per-node there.

swiatekm avatar Aug 06 '24 11:08 swiatekm

@zh-w what do the metrics from the target allocator look like for the targets per collector metric? It's possible that the distribution is even but the memory usage is not, in the scenario where some of your scrape targets emit more metrics than others.

I retested in my k8s cluster, which consists of about 1000 nodes. The configuration of my collector CR is as follows (simplified):

    exporters:
      prometheusremotewrite:xxx
    extensions:
      health_check: {}
      pprof:
        endpoint: :1888
      promhttp: {}
      zpages:xxx
    processors:
      attributes/insertpod:xxx
      batch: {}
      stats: {}
      groupbyattrs/pod:xxx
      k8sattributes:xxx
      memory_limiter:xxx
      resource:
        attributes:
        - action: insert
          from_attribute: k8s.pod.name
          key: pod
        - action: insert
          from_attribute: k8s.namespace.name
          key: namespace
        - action: insert
          from_attribute: k8s_node_name
          key: k8s.node.name
      resourcedetection:
        detectors:
        - env
        override: false
        timeout: 10s
    receivers:
      prometheus:
        config:
          scrape_configs:
          - job_name: cadvisor xxx
          - job_name: coredns xxx
          - job_name: etcd-cluster xxx
          - job_name: etcd-events xxx
          - job_name: kube-apiserver xxx
          - job_name: kube-scheduler xxx
          - job_name: kubelet xxx
          - job_name: kubelet-resource xxx
          - job_name: node-exporter xxx
          - job_name: ingress-nginx xxx
            scrape_interval: 30s
        target_allocator:
          collector_id: ${POD_NAME}
          endpoint: http://otel-targetallocator
          interval: 30s
      prometheus/internal:
        config:
          scrape_configs:
          - job_name: opentelemetry-collector
            scrape_interval: 10s
            static_configs:
            - targets:
              - ${K8S_POD_IP}:8888
              labels:
                collector_name: '${POD_NAME}'
          - job_name: opentelemetry-target-allocator
            scrape_interval: 30s
            static_configs:
            - targets:
              - otel-targetallocator:80
      zipkin:
        endpoint: 0.0.0.0:9411
    service:
      extensions:
      - health_check
      - pprof
      - zpages
      - promhttp
      - agileauth/vmp
      pipelines:
        metrics/resource:
          exporters:
          - prometheusremotewrite
          processors:
          - memory_limiter
          - batch
          - stats
          - resourcedetection
          - attributes/insertpod
          - groupbyattrs/pod
          - resource
          receivers:
          - prometheus
          - prometheus/internal

When I use the consistent-hashing algorithm, the number of targets per collector is as follows: (screenshot)

and the datapoints received by each collector are distributed as follows (the statistics were collected by a 'stats' processor I implemented): (screenshot)

When I use the new job-average algorithm, the distribution of targets per collector is as follows: (screenshot)

and the datapoints received by each collector are distributed as follows: (screenshot)

zh-w avatar Aug 07 '24 08:08 zh-w

I use the metric "opentelemetry_allocator_targets_per_collector" from the target allocator to count the number of targets per collector, and I added a "job_name" label to this metric to more accurately tally the target counts of different jobs.

zh-w avatar Aug 07 '24 08:08 zh-w

The 'stats' processor in the collector is implemented as follows:

// processMetrics counts metrics and datapoints per resource and records them
// labeled by the originating job's service.name.
func (p *processorImp) processMetrics(ctx context.Context, md pmetric.Metrics) (pmetric.Metrics, error) {
	rms := md.ResourceMetrics()
	for i := 0; i < rms.Len(); i++ {
		metricCount := 0
		metricDpCount := 0
		rm := rms.At(i)
		// service.name is set by the prometheus receiver from the scrape job name.
		serviceName, haveServiceName := rm.Resource().Attributes().Get(conventions.AttributeServiceName)
		ilms := rm.ScopeMetrics()
		for j := 0; j < ilms.Len(); j++ {
			ilm := ilms.At(j)
			metricCount += ilm.Metrics().Len()
			ms := ilm.Metrics()
			for k := 0; k < ms.Len(); k++ {
				// Sum datapoints across all metric types.
				m := ms.At(k)
				switch m.Type() {
				case pmetric.MetricTypeGauge:
					metricDpCount += m.Gauge().DataPoints().Len()
				case pmetric.MetricTypeSum:
					metricDpCount += m.Sum().DataPoints().Len()
				case pmetric.MetricTypeHistogram:
					metricDpCount += m.Histogram().DataPoints().Len()
				case pmetric.MetricTypeExponentialHistogram:
					metricDpCount += m.ExponentialHistogram().DataPoints().Len()
				case pmetric.MetricTypeSummary:
					metricDpCount += m.Summary().DataPoints().Len()
				}
			}
		}
		if haveServiceName {
			p.telemetry.recordMetricReceived(ctx, metricCount, serviceName.AsString())
			p.telemetry.recordMetricDpReceived(ctx, metricDpCount, serviceName.AsString())
		} else {
			p.telemetry.recordMetricReceived(ctx, metricCount, "unknown-job")
			p.telemetry.recordMetricDpReceived(ctx, metricDpCount, "unknown-job")
		}
	}
	return md, nil
}

The metric exposed by the 'stats' processor is as follows:

// labels
processorTagKey      = tag.MustNewKey("processor")
jobTagKey            = tag.MustNewKey("job_name")
// metric
statMetricDpReceived = stats.Int64("metric_datapoints_received", "Counter of metric datapoints received", stats.UnitDimensionless)
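
The recording side is not shown above. Assuming OpenCensus (as the tag/stats declarations suggest), the view registration and the recording helper could look roughly like this; a sketch only, since the actual recordMetricDpReceived is a method on p.telemetry:

import (
	"context"

	"go.opencensus.io/stats"
	"go.opencensus.io/stats/view"
	"go.opencensus.io/tag"
)

// Register a sum view so the counter is exported per processor and job_name.
var dpReceivedView = &view.View{
	Name:        statMetricDpReceived.Name(),
	Description: statMetricDpReceived.Description(),
	Measure:     statMetricDpReceived,
	TagKeys:     []tag.Key{processorTagKey, jobTagKey},
	Aggregation: view.Sum(),
}

func init() {
	if err := view.Register(dpReceivedView); err != nil {
		panic(err)
	}
}

// recordMetricDpReceived records the datapoint count tagged with the job name.
func recordMetricDpReceived(ctx context.Context, count int, job string) {
	_ = stats.RecordWithTags(ctx,
		[]tag.Mutator{
			tag.Upsert(processorTagKey, "stats"),
			tag.Upsert(jobTagKey, job),
		},
		statMetricDpReceived.M(int64(count)),
	)
}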

zh-w avatar Aug 07 '24 08:08 zh-w

I wouldn't really mind if we just made the least-weighted strategy behave this way. @zh-w would that work? Out of the set of collectors with the least amount of targets, we'd pick the one with the least amount of targets from the assigned target's job. Right now we just pick randomly.

Both least-weighted and your proposal have the undesirable property that they're not stable - the actual assignment can depend on the order of target additions and deletions, whereas consistent-hashing and per-node always give the same result. This makes the implementation of the former more complex and requires doing things we'd prefer not to do, like hashing all the target label sets to determine their identities.

I think there are different usage situations. One is where targets are relatively stable: for example, a collector deployed to scrape stable jobs such as cadvisor, kube-state-metrics, node-exporter, etc., where the number of targets usually tracks the number of nodes. In that situation targets are not added or deleted frequently, and this new algorithm works well. As for algorithm stability, I think it is hard to balance the effectiveness of load balancing against the stability of the algorithm. Maybe the effectiveness of load balancing is more important, and that is what we need to guarantee.

zh-w avatar Aug 07 '24 09:08 zh-w

I wouldn't really mind if we just made the least-weighted strategy behave this way. @zh-w would that work? Out of the set of collectors with the least amount of targets, we'd pick the one with the least amount of targets from the assigned target's job. Right now we just pick randomly.

Both least-weighted and your proposal have the undesirable property that they're not stable - the actual assignment can depend on the order of target additions and deletions, whereas consistent-hashing and per-node always give the same result. This makes the implementation of the former more complex and requires doing things we'd prefer not to do, like hashing all the target label sets to determine their identities.

I tested the least-weighted algorithm @swiatekm; the number of targets per collector is as follows: (screenshot)

The datapoints received by each collector are distributed as follows: (screenshot)

It seems that a small number of collectors got just one target.

zh-w avatar Aug 07 '24 11:08 zh-w

We hit a similar issue; I think the requirement makes sense.

@zh-w hi, could you please explain how the datapoints per target are taken into account in your algorithm? I didn't catch that.

chenlujjj avatar Dec 30 '24 12:12 chenlujjj

We face exactly the same problem. In our case, simply distributing targets per job equally would already bring relief, because targets of the same job usually produce exactly the same number of samples. Making load balancing sample-count aware would be nice, but that could probably be a separate feature that not everyone would want.

I wonder if it is possible to merge this PR soon, or are there some blockers?

123BLiN avatar Feb 21 '25 13:02 123BLiN

The primary blocker is that we don't want to add more allocation strategies which aren't stable. Stability in this context means that:

  1. Restarting the target allocator should yield the same target distribution.
  2. The target distribution should not depend on the order of target changes.

Both consistent-hashing and per-node have this property, while neither least-weighted nor the strategy proposed in this PR does. The reason we care about this is that it makes the allocation process much easier to understand, and it allows the target allocator to be scaled horizontally.

I would be very open to improving the consistent-hashing strategy or making it more configurable. In principle, making that strategy take the job name into account would be simple. The fact that it appears to result in uneven distributions in @zh-w's testing is worrying and we should probably start by looking at that. It would help a lot if someone could reproduce this in a unit test.
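
To make that concrete, something along the lines of the standalone simulation below (not using the operator's internal packages, and with ring parameters that only approximate the allocator's) should already show how unevenly a single job's targets can land when only the target URL is hashed:

package main

import (
	"fmt"

	"github.com/buraksezer/consistent"
	"github.com/cespare/xxhash/v2"
)

type member string

func (m member) String() string { return string(m) }

type hasher struct{}

func (h hasher) Sum64(data []byte) uint64 { return xxhash.Sum64(data) }

func main() {
	// Five collectors, as in the example from the issue description.
	var members []consistent.Member
	for i := 0; i < 5; i++ {
		members = append(members, member(fmt.Sprintf("collector-%d", i)))
	}
	// Ring parameters chosen for the sketch; the allocator's real values may differ.
	ring := consistent.New(members, consistent.Config{
		PartitionCount:    1061,
		ReplicationFactor: 5,
		Load:              1.1,
		Hasher:            hasher{},
	})

	// Ten targets belonging to one heavy job (e.g. kube-state-metrics),
	// hashed by URL only, as the current strategy does.
	perCollector := map[string]int{}
	for i := 0; i < 10; i++ {
		url := fmt.Sprintf("10.0.0.%d:8080", i)
		perCollector[ring.LocateKey([]byte(url)).String()]++
	}
	fmt.Println(perCollector) // typically far from an even 2/2/2/2/2 split
}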

As it seems like there's a demand for this kind of functionality, I'm also going to tag @open-telemetry/operator-approvers for more opinions. I'll also tag this issue to be discussed during our next SIG meeting on 27.02. Anyone interested in participating in this discussion is welcome to join.

swiatekm avatar Feb 21 '25 15:02 swiatekm

Were you able to discuss this in the SIG?

We just saw the issue this week in a cluster with some pretty busy nginx-ingress pods. Each ingress pod is generating ~250k series, and one of our 10 collector replicas has been assigned to scrape 3 of them, which means we've had to increase the memory limit on all these collectors.

Just for LOLs we spotted the situation after a collector upgrade because apparently the nginx scrapes were broken in the version of the collector we were running previously, so memory usage doubled (on some pods) when we upgraded and the worst affected collector suddenly had 750k more series to handle. 🤦🏻

We’re now in the situation where the largest out of ten collectors uses 10x the memory of the smallest.

swythan avatar Mar 14 '25 07:03 swythan

The outcome of that discussion was roughly along the lines of what I outlined in my previous comment. I'll summarize in bullet points below:

  • We don't want to have another allocation strategy with the same problems as least-weighted.
  • We're not against making least-weighted take job name into account when allocating.
  • We're also not against making consistent-hashing do the above.
  • More broadly, we're not against making strategies more configurable, but that would require its own issue and would probably take more time to hash out what the API should look like.
  • If consistent-hashing gives unacceptably uneven distributions, we should fix it. A reproduction would be helpful here.

Does that help?

swiatekm avatar Mar 14 '25 13:03 swiatekm

Thanks. Taking job name into account would probably be enough to sort out our issue (multiple nginx-ingress jobs landing on the same collector). I've been looking at the code and I'm struggling to think what that would look like for the consistent-hashing algorithm. It seems like we're just getting really unlucky with the hashing?

If we wanted to push this forward, then I guess the first thing to do is to try to extract repro data from our live cluster and see if we can reproduce the behaviour we're seeing "on the bench". We've been gathering the list of jobs allocated to each collector from the TA HTTP interface, but do you know of any way of retrieving the URLs it's using for the hashing?

swythan avatar Mar 14 '25 16:03 swythan

Thanks. Taking job name into account would probably be enough to sort out our issue (multiple nginx-ingress jobs landing on the same collector). I've been looking at the code and I'm struggling to think what that would look like for the consistent-hashing algorithm. It seems like we're just getting really unlucky with the hashing?

Right now, the hashing only takes the target url into account. Adding the job name into the hash should in principle result in a more even distribution across that dimension. But that's just a theory I haven't actually tested.

If we wanted to push this forward, then I guess the first thing to do is to try to extract repro data from our live cluster and see if we can reproduce the behaviour we're seeing "on the bench". We've been gathering the list of jobs allocated to each collector from the TA HTTP interface, but do you know of any way of retrieving the URLs it's using for the hashing?

You can see these via the target allocator's REST API. It has a list of targets for each collector.

swiatekm avatar Mar 14 '25 16:03 swiatekm

The targets list in the return from the REST API lists host & port (e.g. 10.1.4.87:8080). Is that all that is used for the hashing, or do I need the rest of the URL as well?

swythan avatar Mar 18 '25 11:03 swythan

The targets list in the return from the REST API lists host & port (e.g. 10.1.4.87:8080). Is that all that is used for the hashing, or do I need the rest of the URL as well?

Currently, it's just that url: https://github.com/open-telemetry/opentelemetry-operator/blob/3c298b87e466f1438ba2c20baf5d5aab76531adb/cmd/otel-allocator/internal/allocation/consistent_hashing.go#L50.

swiatekm avatar Mar 18 '25 13:03 swiatekm

Any updates on this issue?

The uneven distribution can happen when the metric counts of individual targets differ significantly, as in this example:

count({k8s.cluster.name="clusterA"}) by (job, instance)
(screenshot)

chenlujjj avatar Sep 15 '25 12:09 chenlujjj

@chenlujjj this is more a problem of kube-state-metrics being an outsized contributor to metric cardinality. I would recommend sharding kube-state-metrics; see here for more.

jaronoff97 avatar Sep 15 '25 13:09 jaronoff97

Hi @jaronoff97, thanks for your advice about sharding.

I should have put the problem more generally (it is not related to KSM specifically).

Just as @swythan and @123BLiN mentioned, since the allocator is not aware of the metric count of each target, it's highly possible that some collectors need to collect many more metrics than others, causing imbalanced resource usage.

chenlujjj avatar Sep 15 '25 15:09 chenlujjj

I created a unit test in this PR to demonstrate the uneven distribution. From the test output, the ratio of max to min samples scraped per collector can be up to 10x.

chenlujjj avatar Sep 15 '25 16:09 chenlujjj

Yes, I don't doubt this problem, but as stated above we don't have a way of getting sample counts. The current recommendation is still what @swiatekm said above: https://github.com/open-telemetry/opentelemetry-operator/issues/3128#issuecomment-2724668959

jaronoff97 avatar Sep 15 '25 16:09 jaronoff97

@jaronoff97 If I understand correctly, one improvement we can make to the consistent-hashing allocator is to take the target's job name into account when calculating the hash key.

But I think simply changing hashKey like this would not work:

hashKey := item.TargetURL + item.JobName

Do we need to maintain two-layer hashing, i.e., for each job name, allocate that job's targets across the collectors?
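
For illustration, one literal reading of "two-layer" would be to balance each job's targets across the collectors independently, e.g. by sorting a job's targets by a stable hash and striping them over the sorted collector list. A rough sketch (types and names are mine, not the operator's); note that it is deterministic for a fixed target set, but assignments still shift when targets are added or removed, so it would not satisfy the second stability property mentioned earlier:

import (
	"hash/fnv"
	"sort"
)

// allocatePerJob stripes each job's targets across the sorted collector list,
// so every job is spread evenly (within one target) over all collectors.
func allocatePerJob(targetsByJob map[string][]string, collectors []string) map[string]string {
	cols := append([]string(nil), collectors...)
	sort.Strings(cols)
	assignment := make(map[string]string) // target URL -> collector name
	for _, targets := range targetsByJob {
		sorted := append([]string(nil), targets...)
		// Order by a stable hash of the URL so the result does not depend on discovery order.
		sort.Slice(sorted, func(i, j int) bool { return hash64(sorted[i]) < hash64(sorted[j]) })
		for i, t := range sorted {
			assignment[t] = cols[i%len(cols)]
		}
	}
	return assignment
}

func hash64(s string) uint64 {
	h := fnv.New64a()
	h.Write([]byte(s))
	return h.Sum64()
}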

chenlujjj avatar Sep 16 '25 06:09 chenlujjj

Honestly, if the hashing algorithm itself is halfway decent, then including the job name in the hash input will just yield a different set of hashes. There's no reason to think it would do anything to consistently improve the situation. In some cases the new distribution might be better, but in some it would be worse.

Is that what you mean?

swythan avatar Sep 16 '25 06:09 swythan

@swythan yes, that's what I mean.

chenlujjj avatar Sep 16 '25 07:09 chenlujjj

as stated above we don't have a way of getting sample counts

Hi @jaronoff97, I have a rough idea about this:

  • Add an HTTP API to the target allocator, e.g. /report_samples, which receives POST requests reporting the sample count of each target
  • The prometheus receiver in the otel-collector calls the /report_samples API to report the sample count after scraping metrics
  • The target allocator can then assign targets to collectors based on the map[target]samplecount

Do you think it's a doable way?
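
For illustration, the allocator-side endpoint could be as simple as something like the sketch below (hypothetical names, just to show the shape of the reporting API):

import (
	"encoding/json"
	"net/http"
	"sync"
)

// sampleReport is the payload a collector would POST after each scrape.
type sampleReport struct {
	Target      string `json:"target"`
	JobName     string `json:"job_name"`
	SampleCount int64  `json:"sample_count"`
}

var (
	mu              sync.Mutex
	samplesByTarget = map[string]int64{} // read by the allocation strategy
)

// reportSamplesHandler would be registered at /report_samples on the target allocator.
func reportSamplesHandler(w http.ResponseWriter, r *http.Request) {
	if r.Method != http.MethodPost {
		http.Error(w, "POST only", http.StatusMethodNotAllowed)
		return
	}
	var rep sampleReport
	if err := json.NewDecoder(r.Body).Decode(&rep); err != nil {
		http.Error(w, err.Error(), http.StatusBadRequest)
		return
	}
	mu.Lock()
	samplesByTarget[rep.Target] = rep.SampleCount
	mu.Unlock()
	w.WriteHeader(http.StatusNoContent)
}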

chenlujjj avatar Sep 17 '25 03:09 chenlujjj

as stated above we don't have a way of getting sample counts

Hi @jaronoff97, I have a rough idea about this:

* Add an HTTP API to the target allocator, e.g. `/report_samples`, which receives POST requests reporting the sample count of each target

* The prometheus receiver in the otel-collector calls the `/report_samples` API to report the sample count after scraping metrics

* The target allocator can then assign targets to collectors based on the `map[target]samplecount`

Do you think it's a doable way?

If we're going to do that, it's simpler for the target allocator to hit the collector's metrics endpoint and check the value of otelcol_receiver_accepted_metric_points for the prometheus receiver. This introduces significant additional complexity, but I would be willing to accept it as an experimental feature. It's worth noting that this only allows you to detect which collectors are doing the most work, not which targets are the heaviest. I'd first try to experiment with your implementation in your environment to verify that it actually does what you expect.
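
A rough sketch of what that check might look like from the target allocator's side (illustrative helper, not existing code; depending on the collector version the counter may be exposed with a _total suffix):

import (
	"fmt"
	"net/http"

	"github.com/prometheus/common/expfmt"
)

// acceptedMetricPoints scrapes a collector's own metrics endpoint (e.g.
// http://<pod-ip>:8888/metrics) and sums otelcol_receiver_accepted_metric_points
// for the prometheus receiver.
func acceptedMetricPoints(metricsURL string) (float64, error) {
	resp, err := http.Get(metricsURL)
	if err != nil {
		return 0, err
	}
	defer resp.Body.Close()

	var parser expfmt.TextParser
	families, err := parser.TextToMetricFamilies(resp.Body)
	if err != nil {
		return 0, err
	}
	mf, ok := families["otelcol_receiver_accepted_metric_points"]
	if !ok {
		return 0, fmt.Errorf("metric not found")
	}
	var total float64
	for _, m := range mf.GetMetric() {
		for _, lp := range m.GetLabel() {
			if lp.GetName() == "receiver" && lp.GetValue() == "prometheus" {
				total += m.GetCounter().GetValue()
			}
		}
	}
	return total, nil
}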

swiatekm avatar Sep 17 '25 10:09 swiatekm

It's worth noting that this only allows you to detect which collectors are doing the most work, not which targets are the heaviest.

Would that help us much, then? What could the target allocation algorithm do based on this information?

Do you think there would be any appetite for adding metrics like otelcol_receiver_target_accepted_metric_points to the prometheus receiver that split the metrics by scrape target?

They would be interesting/useful in diagnosing issues like this, as well as potentially useful in driving target allocation. They don't look trivial to add, though, given that the existing metrics seem to be published by a helper shared by all the receivers.

swythan avatar Sep 18 '25 08:09 swythan