
feat(processors.snmp_lookup): New SNMP lookup processor

Hipska opened this pull request 2 years ago • 19 comments

Required for all PRs

  • [x] Updated associated README.md.
  • [x] Wrote appropriate unit tests.
  • [x] Pull request title or commits are in conventional commit format
  • closes #8011

Add a new, more generic SNMP lookup processor that will deprecate the ifname processor, since the same results can be achieved with it.

Hipska · Oct 31 '23 16:10

As proposed on Slack, this PoC implementation might be an alternative that is easier to read, IMO:

package main

import (
	"errors"
	"fmt"
	"math/rand"
	"sync"
	"time"

	"github.com/alitto/pond"
	"github.com/hashicorp/golang-lru/v2/expirable"
)

var ErrNotYetAvailable = errors.New("data not yet available")

var minTimeBetweenUpdates = 5 * time.Second

type metric struct {
	agent string
	index string
	value string
	ts    time.Time
}

// hold buffers metrics that cannot be resolved yet, keyed by agent.
var hold map[string][]metric
var holdLock sync.Mutex

// tagMap holds one agent's cached lookup table and when it was created.
type tagMap struct {
	created time.Time
	rows    map[string]string
}

// store caches per-agent tag tables and schedules their (slow) updates.
type store struct {
	cache        *expirable.LRU[string, tagMap] // per-agent tables with TTL
	pool         *pond.WorkerPool               // workers running the table updates
	inflight     sync.Map                       // agents currently being updated
	backlog      map[string]time.Time           // agents to update later, with the earliest allowed time
	backlogTimer *time.Timer                    // fires when the earliest backlog entry is due
	notify       func(string)                   // called after an agent's table was refreshed
	sync.Mutex
}

func newStore() *store {
	return &store{
		cache:   expirable.NewLRU[string, tagMap](10, nil, 20*time.Second),
		pool:    pond.New(3, 0, pond.MinWorkers(10)),
		backlog: make(map[string]time.Time),
	}
}

func (s *store) addBacklog(agent string, earliest time.Time) {
	s.Lock()
	defer s.Unlock()
	fmt.Printf("  - adding backlog for agent %s\n", agent)
	t, found := s.backlog[agent]
	if !found || t.After(earliest) {
		s.backlog[agent] = earliest
		s.refreshTimer()
	}
}

func (s *store) removeBacklog(agent string) {
	s.Lock()
	defer s.Unlock()
	fmt.Printf("  - removing backlog for agent %s\n", agent)
	delete(s.backlog, agent)
	s.refreshTimer()
}

// refreshTimer (re)arms the backlog timer for the earliest pending agent.
// Callers must hold the store lock.
func (s *store) refreshTimer() {
	fmt.Println("  - refreshing timer")

	if s.backlogTimer != nil {
		s.backlogTimer.Stop()
	}
	if len(s.backlog) == 0 {
		return
	}
	var agent string
	var earliest time.Time
	for k, t := range s.backlog {
		if agent == "" || t.Before(earliest) {
			agent = k
			earliest = t
		}
	}
	s.backlogTimer = time.AfterFunc(time.Until(earliest), func() { s.enqueue(agent) })
}

// enqueue schedules an immediate table update for the given agent, unless
// one is already in flight.
func (s *store) enqueue(agent string) {
	fmt.Printf("  - enqueuing agent %s\n", agent)
	s.pool.Submit(func() {
		if _, inflight := s.inflight.LoadOrStore(agent, true); inflight {
			fmt.Println("  -> already in-flight...")
			return
		}
		s.cache.Add(agent, updateAgent(agent))
		s.removeBacklog(agent)
		if s.notify != nil {
			s.notify(agent)
		}
		s.inflight.Delete(agent)
	})
}

func (s *store) lookup(agent string, index string) (string, error) {
	entry, cached := s.cache.Peek(agent)
	if !cached {
		fmt.Println("  * not cached")
		// There is no cache at all, so we need to enqueue an update.
		s.enqueue(agent)
		return "", ErrNotYetAvailable
	}

	value, found := entry.rows[index]
	if !found {
		// The index does not exist, therefore we need to update the
		// agent as it maybe appeared in the meantime
		if time.Since(entry.created) > minTimeBetweenUpdates {
			fmt.Println("  * not cached pause passed")
			// The minimum time between updates has passed so we are good to
			// directly update the cache.
			s.enqueue(agent)
			return "", ErrNotYetAvailable
		} else {
			fmt.Println("  * not cached deferring")
			// The minimum time between updates has not yet passed so we
			// need to defer the agent update to later.
			s.addBacklog(agent, entry.created.Add(minTimeBetweenUpdates))
			return "", ErrNotYetAvailable
		}
	}

	return value, nil
}

func (s *store) destroy() {
	s.pool.StopAndWait()
}

func process(s *store, m metric) {
	v, err := s.lookup(m.agent, m.index)
	if errors.Is(err, ErrNotYetAvailable) {
		fmt.Println("=> holding back metric")
		holdLock.Lock()
		hold[m.agent] = append(hold[m.agent], m)
		holdLock.Unlock()
		return
	}
	m.value = v
	fmt.Printf("=> releasing: %s.%s: %q (%s)\n", m.agent, m.index, m.value, m.ts)
}

func main() {
	hold = make(map[string][]metric)
	s := newStore()
	s.notify = func(agent string) {
		holdLock.Lock()
		heldMetrics, found := hold[agent]
		if !found {
			holdLock.Unlock()
			return
		}
		delete(hold, agent)
		holdLock.Unlock()

		for _, m := range heldMetrics {
			process(s, m)
		}
	}

	// Randomly look up entries every second
	ticker := time.NewTicker(time.Second)
	for range ticker.C {
		m := metric{
			agent: fmt.Sprintf("agent%d", rand.Intn(3)),
			index: fmt.Sprintf("index%d", rand.Intn(10)),
			ts:    time.Now(),
		}
		fmt.Printf("Looking up metric %s.%s (%s)\n", m.agent, m.index, m.ts)
		process(s, m)
	}
	// Never reached in this example, as the ticker loop above runs forever.
	s.destroy()
}

// updateAgent simulates a slow SNMP table walk for the given agent.
func updateAgent(agent string) tagMap {
	fmt.Printf("Updating agent %s\n", agent)
	time.Sleep(5 * time.Second)
	return tagMap{
		created: time.Now(),
		rows: map[string]string{
			"index0": agent + "_0",
			"index1": agent + "_1",
			"index2": agent + "_2",
			"index3": agent + "_3",
			"index4": agent + "_4",
			"index5": agent + "_5",
			"index6": agent + "_6",
			"index7": agent + "_7",
			"index8": agent + "_8",
			"index9": agent + "_9",
		},
	}
}

srebhan · Nov 10 '23 16:11

!retry-failed

Hipska · Nov 27 '23 14:11

@powersj Sure, I already did and had a conversation with him about it: https://github.com/Super-Visions/telegraf/compare/feat/processors/snmp_lookup...Super-Visions:telegraf:feat/processors/snmp_lookup_store But I also got questions and got somewhat "ghosted" on it..

On the other hand, I incorporated the remarks about it being hard to read and improved the code in this PR so that it reads better. So I would really like a full review of the current state of this PR to see if it is still that hard to read.

Hipska · Dec 08 '23 10:12

But I also got questions and got somewhat "ghosted" on it..

Where are these questions? I do not see any unresolved conversations.

So I would really like a full review of the current state of this PR to see if it is still that hard to read.

My review yesterday was done with the intention to determine whether we wanted to grab your code as is or continue to push you to refactor this. You have my answer.

powersj · Dec 08 '23 16:12

Where are these questions? I do not see any unresolved conversations.

He was smart enough to do that in a private chat on Slack, but you can also see here in the conversations where I closed them due to lack of response.

My review yesterday was done with the intention to determine whether we wanted to grab your code as is or continue to push you to refactor this. You have my answer.

Sad to hear that. Please comment/identify the parts that are still hard to read/understand. IMHO, there are much more complicated plugins/code sections already included in telegraf that don't get such pushback in the PRs.

Hipska · Dec 11 '23 11:12

!retry-failed

Hipska · Dec 11 '23 15:12

!retry-checks

Hipska · Dec 11 '23 15:12

He was smart enough to do that in a private chat on Slack, but you can also see here in the conversations where I closed them due to lack of response.

I cannot help if things are done in private.

please comment/identify the parts that are still hard to read/understand.

I already did in my original review.

there are much more complicated plugins/code sections already included in telegraf that don't get such pushback in the PRs.

I understand it can be hard to read feedback on a PR; however, that is an unacceptable justification. You were given the code for how to resolve the concern. If that will not work, I would ask that you express why on this PR.

powersj · Dec 11 '23 22:12

I already did in my original review.

Okay, I added some extra comments to the mentioned method and to others as well. I hope this makes it clearer.

I understand it can be hard to read feedback on a PR; however, that is an unacceptable justification.

Yes indeed, it was just to show that I don't get why this PR is getting so much pushback. As shown, I'm willing to adapt.

You were given the code for how to resolve the concern. If that will not work, I would ask that you express why on this PR.

I already expressed that I'm willing to adapt to that as well; see the newly created branch: Super-Visions/telegraf@snmp_lookup_store. Can you have a look at it and see whether it is going in a good direction or not?

The two problems I have with the proposed store solution are:

  • If a given index can't be found on the device, the metric is added to a backlog. Some situations with misconfigured/misbehaving systems produce an index value that simply doesn't exist, so would the metric be kept forever in that case? The backlog itself is also not something I want to introduce; most other processors I checked that do a lookup just return the metric as-is when the key can't be found (reverse_dns, ifname, port_name, lookup, aws_ec2).
  • Processors that do external or long-running operations in their Add method usually use the plugins/common/parallel package, which processes metrics in parallel but offers the option to keep them in the order they were received (FIFO). This order is very important for some outputs. The proposed solution does not provide this, partly because of the backlog, but mainly because of the newly introduced WorkerPool from alitto/pond, which can't guarantee the order of metrics (see the sketch after this list).
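
For reference, a minimal sketch of the FIFO pattern mentioned in the second point, assuming the plugins/common/parallel helper as used by processors such as reverse_dns; the type name, option name, and the addTag body are illustrative placeholders, not this PR's actual code:

package snmp_lookup

import (
	"github.com/influxdata/telegraf"
	"github.com/influxdata/telegraf/plugins/common/parallel"
)

type Processor struct {
	MaxParallelLookups int `toml:"max_parallel_lookups"` // illustrative option name

	parallel parallel.Parallel
}

func (p *Processor) Start(acc telegraf.Accumulator) error {
	// NewOrdered releases metrics in the order they were enqueued (FIFO),
	// even though addTag runs on several workers in parallel.
	p.parallel = parallel.NewOrdered(acc, p.addTag, 10000, p.MaxParallelLookups)
	return nil
}

func (p *Processor) Add(m telegraf.Metric, _ telegraf.Accumulator) error {
	p.parallel.Enqueue(m)
	return nil
}

func (p *Processor) Stop() {
	// Waits until all queued metrics have been processed and released in order.
	p.parallel.Stop()
}

// addTag stands in for the (potentially slow) SNMP lookup; it returns the
// metric unchanged when nothing can be resolved.
func (p *Processor) addTag(m telegraf.Metric) []telegraf.Metric {
	return []telegraf.Metric{m}
}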

Hipska · Dec 12 '23 11:12

Can someone please give feedback on my two concerns here: https://github.com/influxdata/telegraf/pull/14223#issuecomment-1851842807

Hipska · Jan 15 '24 10:01

Anyone, please? I would really like to get this landed before the next release, but if I don't get responses until weeks later, I'm afraid we will miss the next one as well.

Hipska · Jan 22 '24 14:01

[...] The two problems I have with the proposed store solution are:

  • If a given index can't be found on the device, the metric is added to a backlog. Some situations with misconfigured/misbehaving systems produce an index value that simply doesn't exist, so would the metric be kept forever in that case? The backlog itself is also not something I want to introduce; most other processors I checked that do a lookup just return the metric as-is when the key can't be found (reverse_dns, ifname, port_name, lookup, aws_ec2).

IIRC, the held metrics get notified when an agent's table is updated; at that point we can drop the metric or send it unmodified if the index does not exist. If the cache is already up to date, we can do the above right away without adding the metric to the backlog...
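
A rough sketch of that idea, reusing the types from the PoC code earlier in this thread; the pass-through-versus-drop choice and the direct cache access here are illustrative assumptions, not the final design:

// Alternative notify callback for the PoC's main(): the agent's table was
// refreshed just before this fires, so any held metric whose index is still
// missing is released unmodified instead of being held back again.
s.notify = func(agent string) {
	holdLock.Lock()
	heldMetrics := hold[agent]
	delete(hold, agent)
	holdLock.Unlock()

	entry, cached := s.cache.Peek(agent)
	for _, m := range heldMetrics {
		if cached {
			if v, found := entry.rows[m.index]; found {
				m.value = v
				fmt.Printf("=> releasing: %s.%s: %q (%s)\n", m.agent, m.index, m.value, m.ts)
				continue
			}
		}
		// Fresh table, but the index does not exist: emit the metric as-is
		// (or drop it, depending on a plugin option).
		fmt.Printf("=> releasing unmodified: %s.%s (%s)\n", m.agent, m.index, m.ts)
	}
}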

  • Processors that do external or long-running operations in their Add method usually use the plugins/common/parallel package, which processes metrics in parallel but offers the option to keep them in the order they were received (FIFO). This order is very important for some outputs. The proposed solution does not provide this, partly because of the backlog, but mainly because of the newly introduced WorkerPool from alitto/pond, which can't guarantee the order of metrics.

Hmmm, that's actually an issue, as we would then need to keep all incoming metrics in a queue and release them in order as soon as we can resolve them... Not sure if we can use the parallel thing easily with my suggested implementation...

Let me think a bit about the second point...

srebhan · Jan 24 '24 20:01

@Hipska I set up a proof-of-concept here: https://github.com/srebhan/telegraf/tree/snmp_lookup_processor_poc. It is built on your code but with the cache I proposed... Happy to hear your opinion!

If you like it, I'm happy to push it to your repo if possible...

srebhan · Jan 29 '24 18:01

Yeah, looks like this is a good direction. I think you will need to create a PR at our fork in order to get it added to this branch.

I still don't understand the preference for an external module instead of the internal parallel package. Is the intention to deprecate it and also remove it from the other plugins using it?

Hipska · Jan 30 '24 11:01

@Hipska the issue with parallel is that it can only take a function with a metric argument returning metrics. What we want here is parallelism across agent connections, and that is only doable with parallel by working around its original intention of being metric-parallel.
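
To illustrate the difference in shape (all names below are illustrative, not actual plugin code): parallel fans out one function call per metric, whereas the store approach fans out at most one table update per agent, which many metrics then wait on:

package example

import (
	"sync"

	"github.com/influxdata/telegraf"
)

// What parallel expects: a per-metric function, called once per metric.
type metricFn func(telegraf.Metric) []telegraf.Metric

// What the store design needs: one de-duplicated update per agent.
type agentUpdater struct {
	inflight sync.Map                             // agents with an update in flight
	walk     func(agent string) map[string]string // slow SNMP table walk
}

func (u *agentUpdater) trigger(agent string) {
	if _, busy := u.inflight.LoadOrStore(agent, struct{}{}); busy {
		return // another goroutine is already updating this agent
	}
	go func() {
		defer u.inflight.Delete(agent)
		_ = u.walk(agent) // the result would be stored in the shared cache
	}()
}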

For the code, you could also cherry-pick the two commits, as the code is based on your branch; this should be straightforward. If you prefer me to create a PR, please let me know!

srebhan · Jan 30 '24 14:01

!retry-failed

Hipska · Feb 02 '24 15:02

@powersj @srebhan any update? I would rather not miss the next feature release.

Hipska · Feb 14 '24 16:02

My understanding is the next step was the refactoring PR you recently put up. We can take a look shortly.

powersj · Feb 15 '24 18:02

@srebhan @powersj I think this is ready for a final review. I have it now running fine on DEV with 5 instances of the plugin.

Hipska · Feb 23 '24 13:02

@srebhan @powersj Please, there are only two weeks left until the next feature release.

Hipska · Feb 26 '24 08:02

Done, moved deprecation to #14899.

Hipska · Feb 27 '24 08:02