feat(processors.snmp_lookup): New SNMP lookup processor
Required for all PRs
- [x] Updated associated README.md.
- [x] Wrote appropriate unit tests.
- [x] Pull request title or commits are in conventional commit format
- closes #8011
Add a new, more generic SNMP lookup processor that will deprecate the ifname processor, as the same results can be achieved with it.
As proposed on Slack, this PoC implementation might be an alternative and is, IMO, easier to read:
```go
package main

import (
    "errors"
    "fmt"
    "math/rand"
    "sync"
    "time"

    "github.com/alitto/pond"
    "github.com/hashicorp/golang-lru/v2/expirable"
)

var ErrNotYetAvailable = errors.New("data not yet available")

var minTimeBetweenUpdates = 5 * time.Second

// metric is a stand-in for a telegraf.Metric, carrying only the fields
// relevant for the lookup.
type metric struct {
    agent string
    index string
    value string
    ts    time.Time
}

// hold keeps the metrics that cannot be resolved yet, grouped by agent.
var hold map[string][]metric
var holdLock sync.Mutex

// tagMap is the cached lookup table of a single agent.
type tagMap struct {
    created time.Time
    rows    map[string]string
}

type store struct {
    cache        *expirable.LRU[string, tagMap] // per-agent tag tables with TTL
    pool         *pond.WorkerPool               // bounds the number of concurrent agent updates
    inflight     sync.Map                       // agents currently being updated
    backlog      map[string]time.Time           // agents to update later, keyed by earliest allowed update time
    backlogTimer *time.Timer
    notify       func(string) // called after an agent's table was refreshed
    sync.Mutex
}

func newStore() *store {
    return &store{
        cache:   expirable.NewLRU[string, tagMap](10, nil, 20*time.Second),
        pool:    pond.New(3, 0, pond.MinWorkers(10)),
        backlog: make(map[string]time.Time),
    }
}

func (s *store) addBacklog(agent string, earliest time.Time) {
    s.Lock()
    defer s.Unlock()

    fmt.Printf(" - adding backlog for agent %s\n", agent)
    t, found := s.backlog[agent]
    if !found || t.After(earliest) {
        s.backlog[agent] = earliest
        s.refreshTimer()
    }
}

func (s *store) removeBacklog(agent string) {
    s.Lock()
    defer s.Unlock()

    fmt.Printf(" - removing backlog for agent %s\n", agent)
    delete(s.backlog, agent)
    s.refreshTimer()
}

func (s *store) refreshTimer() {
    fmt.Println(" - refreshing timer")
    if s.backlogTimer != nil {
        s.backlogTimer.Stop()
    }
    if len(s.backlog) == 0 {
        return
    }

    // Schedule the backlog agent with the earliest due time.
    var agent string
    var earliest time.Time
    for k, t := range s.backlog {
        if agent == "" || t.Before(earliest) {
            agent = k
            earliest = t
        }
    }
    s.backlogTimer = time.AfterFunc(time.Until(earliest), func() { s.enqueue(agent) })
}

func (s *store) enqueue(agent string) {
    fmt.Printf(" - enqueuing agent %s\n", agent)
    s.pool.Submit(func() {
        if _, inflight := s.inflight.LoadOrStore(agent, true); inflight {
            fmt.Println("   -> already in-flight...")
            return
        }
        s.cache.Add(agent, updateAgent(agent))
        s.removeBacklog(agent)
        if s.notify != nil {
            s.notify(agent)
        }
        s.inflight.Delete(agent)
    })
}

func (s *store) lookup(agent string, index string) (string, error) {
    entry, cached := s.cache.Peek(agent)
    if !cached {
        fmt.Println(" * not cached")
        // There is no cache at all, so we need to enqueue an update.
        s.enqueue(agent)
        return "", ErrNotYetAvailable
    }

    value, found := entry.rows[index]
    if !found {
        // The index does not exist, therefore we need to update the
        // agent as it may have appeared in the meantime.
        if time.Since(entry.created) > minTimeBetweenUpdates {
            fmt.Println(" * not cached pause passed")
            // The minimum time between updates has passed, so we are good
            // to directly update the cache.
            s.enqueue(agent)
            return "", ErrNotYetAvailable
        }
        fmt.Println(" * not cached deferring")
        // The minimum time between updates has not yet passed, so we
        // need to defer the agent update until later.
        s.addBacklog(agent, entry.created.Add(minTimeBetweenUpdates))
        return "", ErrNotYetAvailable
    }
    return value, nil
}

func (s *store) destroy() {
    s.pool.StopAndWait()
}

// process resolves the metric's value or holds the metric back until the
// agent's table becomes available.
func process(s *store, m metric) {
    v, err := s.lookup(m.agent, m.index)
    if errors.Is(err, ErrNotYetAvailable) {
        fmt.Println("=> holding back metric")
        holdLock.Lock()
        hold[m.agent] = append(hold[m.agent], m)
        holdLock.Unlock()
        return
    }
    m.value = v
    fmt.Printf("=> releasing: %s.%s: %q (%s)\n", m.agent, m.index, m.value, m.ts)
}

func main() {
    hold = make(map[string][]metric)

    s := newStore()
    // Re-process held metrics once their agent's table has been refreshed.
    s.notify = func(agent string) {
        holdLock.Lock()
        heldMetrics, found := hold[agent]
        if !found {
            holdLock.Unlock()
            return
        }
        delete(hold, agent)
        holdLock.Unlock()

        for _, m := range heldMetrics {
            process(s, m)
        }
    }

    // Randomly look up entries every second.
    ticker := time.NewTicker(time.Second)
    for range ticker.C {
        m := metric{
            agent: fmt.Sprintf("agent%d", rand.Intn(3)),
            index: fmt.Sprintf("index%d", rand.Intn(10)),
            ts:    time.Now(),
        }
        fmt.Printf("Looking up metric %s.%s (%s)\n", m.agent, m.index, m.ts)
        process(s, m)
    }
    s.destroy()
}

// updateAgent simulates a (slow) SNMP table walk on the given agent.
func updateAgent(agent string) tagMap {
    fmt.Printf("Updating agent %s\n", agent)
    time.Sleep(5 * time.Second)
    return tagMap{
        created: time.Now(),
        rows: map[string]string{
            "index0": agent + "_0",
            "index1": agent + "_1",
            "index2": agent + "_2",
            "index3": agent + "_3",
            "index4": agent + "_4",
            "index5": agent + "_5",
            "index6": agent + "_6",
            "index7": agent + "_7",
            "index8": agent + "_8",
            "index9": agent + "_9",
        },
    }
}
```
!retry-failed
@powersj Sure, I already did and had a conversation with him about it: https://github.com/Super-Visions/telegraf/compare/feat/processors/snmp_lookup...Super-Visions:telegraf:feat/processors/snmp_lookup_store. But I also had questions and got somewhat "ghosted" on them.
On the other hand, I incorporated the remarks about it being hard to read and improved the code in this PR so that it reads better. So I would really like a full review of the current state of this PR to see if it is still that hard to read.
> But I also had questions and got somewhat "ghosted" on them.
Where are these questions? I do not see any unresolved conversations.
> So I would really like a full review of the current state of this PR to see if it is still that hard to read.
My review yesterday was done with the intention to determine whether we wanted to grab your code as is or continue to push you to refactor this. You have my answer.
> Where are these questions? I do not see any unresolved conversations.
He was smart enough to do that in a private chat on Slack, but you can also see conversations here that I closed because of lack of response.
> My review yesterday was done with the intention to determine whether we wanted to grab your code as is or continue to push you to refactor this. You have my answer.
Sad to hear that. Please comment/identify the parts that are still hard to read/understand. IMHO, there are much more complicated plugins/code sections already included in telegraf that don't get such pushback in their PRs.
!retry-failed
!retry-checks
> He was smart enough to do that in a private chat on Slack, but you can also see conversations here that I closed because of lack of response.
I cannot help if things are done in private.
> Please comment/identify the parts that are still hard to read/understand.
I already did in my original review.
> there are much more complicated plugins/code sections already included in telegraf that don't get such pushback in their PRs.
I understand it can be hard to read feedback on a PR; however, that is an unacceptable justification. You were given the code showing how to resolve the concern. If that will not work, I would ask that you explain why on this PR.
> I already did in my original review.
Okay, I added some extra comments to the mentioned method and others as well. I hope this makes it clearer.
> I understand it can be hard to read feedback on a PR; however, that is an unacceptable justification.
Yes indeed, it was just to show that I don't get why this PR is receiving so much pushback. As shown, I'm willing to adapt.
> You were given the code showing how to resolve the concern. If that will not work, I would ask that you explain why on this PR.
I already expressed that I'm willing to adapt to that as well, see the newly created branch: Super-Visions/telegraf@snmp_lookup_store. Can you have a look at that and see whether it is going in a good direction or not?
The two problems I have with the proposed store solution are:

- If a given index can't be found on the device, the metric is added to a backlog. Some situations with misconfigured/misbehaving systems produce an index value that simply doesn't exist, so in that case the metric is kept forever? The backlog itself is also not something I want to introduce; most other processors I checked that do a lookup just return the metric as-is when the key can't be found (`reverse_dns`, `ifname`, `port_name`, `lookup`, `aws_ec2`).
- Processors that do external or long-running operations in the `Add` method usually use the `plugins/common/parallel` package, where the metrics are processed in parallel but with the option to keep them in the order they were received (FIFO); a sketch of that pattern is shown below. This order is very important for some outputs. The proposed solution does not provide this, partly because of the backlog, but mainly because of the newly introduced `WorkerPool` from `alitto/pond`, which can't guarantee the order of metrics.
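For context on that second point, here is a rough sketch (not code from this PR) of how existing lookup processors such as `reverse_dns` wire up `plugins/common/parallel`. The `Lookup` type, the `ordered`/`max_parallel_lookups` option names and the `resolve` helper are illustrative only; the exact signatures should be checked against the package.

```go
package snmp_lookup

import (
    "github.com/influxdata/telegraf"
    "github.com/influxdata/telegraf/plugins/common/parallel"
)

// Lookup is a hypothetical streaming processor used only to illustrate the pattern.
type Lookup struct {
    Ordered    bool `toml:"ordered"`
    MaxWorkers int  `toml:"max_parallel_lookups"`

    parallel parallel.Parallel
}

func (l *Lookup) Start(acc telegraf.Accumulator) error {
    if l.Ordered {
        // Metrics leave the processor in the order they arrived (FIFO),
        // even though the lookups themselves run in parallel.
        l.parallel = parallel.NewOrdered(acc, l.resolve, 10000, l.MaxWorkers)
    } else {
        l.parallel = parallel.NewUnordered(acc, l.resolve, l.MaxWorkers)
    }
    return nil
}

func (l *Lookup) Add(m telegraf.Metric, _ telegraf.Accumulator) error {
    l.parallel.Enqueue(m)
    return nil
}

func (l *Lookup) Stop() {
    l.parallel.Stop()
}

// resolve performs the (potentially slow) lookup; returning the metric
// unchanged when the key cannot be found mirrors what reverse_dns, ifname,
// port_name, lookup and aws_ec2 do.
func (l *Lookup) resolve(m telegraf.Metric) []telegraf.Metric {
    // ... resolve and tag the metric here ...
    return []telegraf.Metric{m}
}
```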
Can someone please give feedback on my two concerns here: https://github.com/influxdata/telegraf/pull/14223#issuecomment-1851842807
Anyone, please? I would really like to get this landed before the next release, but if I don't get responses for weeks, I'm afraid we will miss the next one as well.
> [...] The two problems I have with the proposed store solution are:
>
> - If a given index can't be found on the device, the metric is added to a backlog. Some situations with misconfigured/misbehaving systems produce an index value that simply doesn't exist, so in that case the metric is kept forever? The backlog itself is also not something I want to introduce; most other processors I checked that do a lookup just return the metric as-is when the key can't be found (`reverse_dns`, `ifname`, `port_name`, `lookup`, `aws_ec2`).
IIRC each of the held metrics gets notified when an agent's table is updated; at this point we can drop the metric or send it unmodified if the index does not exist. If the cache is already up-to-date, we can do the above right away without adding the metric to the backlog...
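As a minimal sketch of that behaviour (an assumption on my side, not code from this PR), building on the PoC types above, the notify path could release a held metric unmodified once the agent's table is fresh but the index is still missing:

```go
// releaseAfterRefresh is a hypothetical variant of process() for held metrics,
// run right after an agent's table was refreshed.
func releaseAfterRefresh(s *store, m metric) {
    entry, cached := s.cache.Peek(m.agent)
    if !cached {
        // Cache expired between refresh and release; fall back to the normal path.
        process(s, m)
        return
    }
    if v, found := entry.rows[m.index]; found {
        m.value = v
        fmt.Printf("=> releasing resolved: %s.%s: %q (%s)\n", m.agent, m.index, m.value, m.ts)
        return
    }
    // The table is fresh but the index really does not exist on the device:
    // pass the metric through unmodified (or drop it) instead of holding it forever.
    fmt.Printf("=> releasing unmodified: %s.%s (%s)\n", m.agent, m.index, m.ts)
}
```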
> - Processors that do external or long-running operations in the `Add` method usually use the `plugins/common/parallel` package, where the metrics are processed in parallel but with the option to keep them in the order they were received (FIFO). This order is very important for some outputs. The proposed solution does not provide this, partly because of the backlog, but mainly because of the newly introduced `WorkerPool` from `alitto/pond`, which can't guarantee the order of metrics.
Hmmm, that's actually an issue, as we would then need to keep all incoming metrics in a queue and release them in order as soon as we can resolve them... Not sure if we can use the parallel thing easily with my suggested implementation...
Let me think a bit about the second point...
@Hipska I set up a proof-of-concept here: https://github.com/srebhan/telegraf/tree/snmp_lookup_processor_poc. It is built on your code but with the cache I proposed... Happy to hear your opinion!
If you like it, I'm happy to push it to your repo if possible...
Yeah, looks like this is a good direction. I think you will need to create a PR at our fork in order to get it added to this branch.
I still don't understand the preference for an external module instead of the internal `parallel` package. Is the intention to deprecate it and also remove it from the other plugins using it?
@Hipska the issue with `parallel` is that it can only take a function with a metric argument returning metrics. What we want here is parallelism in agent connections, and this is only doable with `parallel` if we work around its initial intention of being metric-parallel.
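To illustrate the mismatch being described (my own toy sketch, not code from either branch): `parallel` fans out one unit of work per metric, while the store-based PoC fans out one (slow) table walk per agent that is shared by every metric of that agent.

```go
package main

import "fmt"

// perMetric mimics the shape plugins/common/parallel works with: each metric
// is an independent unit of work, so ten metrics mean ten invocations.
func perMetric(agent, index string) string {
    return fmt.Sprintf("resolved %s.%s", agent, index)
}

// perAgent mimics the shape the store-based PoC works with: one table walk
// per agent, reused by every metric that references that agent.
func perAgent(agent string) map[string]string {
    fmt.Println("walking tables of", agent) // happens once per agent
    return map[string]string{"index0": agent + "_0", "index1": agent + "_1"}
}

func main() {
    for i := 0; i < 3; i++ {
        fmt.Println(perMetric("agent0", fmt.Sprintf("index%d", i)))
    }
    table := perAgent("agent0")
    fmt.Println(table["index1"])
}
```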
For the code, you could also cherry-pick the two commits, as the code is based on your branch; this should be straightforward. If you prefer me to create a PR, please let me know!
!retry-failed
@powersj @srebhan any update? I would rather not miss the next feature release.
My understanding is the next step was the refactoring PR you recently put up. We can take a look shortly.
@srebhan @powersj I think this is ready for a final review. I have it now running fine on DEV with 5 instances of the plugin.
@srebhan @powersj Please, there are only two weeks left until the next feature release.
Done, moved deprecation to #14899.
:relaxed: This pull request doesn't significantly change the Telegraf binary size (less than 1%)