stanza
stanza copied to clipboard
GoFlow input fails to stop (goroutine never exits)
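**Describe the bug**
The goroutine started by `GoflowInput.Start()` never stops, because `flow.FlowRoutine` also contains an infinite loop, so the `n.ctx.Done()` check that follows it is never reached. See the code below: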
1. operator/builtin/input/goflow/goflow.go: Start(), line 109
```go
func (n *GoflowInput) Start() error {
n.ctx, n.cancel = context.WithCancel(context.Background())
go func() {
...
for {
n.Infof("Starting Goflow on %s:%d in %s mode", n.address, n.port, n.mode)
switch n.mode {
case modeSflow:
flow := &utils.StateSFlow{Transport: n, Logger: n}
goflowErr = flow.FlowRoutine(n.workers, n.address, n.port, reuse)
case modeNetflowV5:
flow := &utils.StateNFLegacy{Transport: n, Logger: n}
goflowErr = flow.FlowRoutine(n.workers, n.address, n.port, reuse)
case modeNetflowIPFIX:
flow := &utils.StateNetFlow{Transport: n, Logger: n}
goflowErr = flow.FlowRoutine(n.workers, n.address, n.port, reuse)
}
// never executed: FlowRoutine above is stuck in its own infinite loop
select {
case <-n.ctx.Done():
return
default:
}
...
}
}()
return nil
}
```
2. C:/Users/Administrator/go/pkg/mod/github.com/observiq/goflow/[email protected]/utils/utils.go:256
```go
func UDPRoutine(name string, decodeFunc decoder.DecoderFunc, workers int, addr string, port int, sockReuse bool, logger Logger) error {
ecb := DefaultErrorCallback{
Logger: logger,
}
...
// this is also an infinite loop
for {
size, pktAddr, _ := udpconn.ReadFromUDP(payload)
payloadCut := make([]byte, size)
copy(payloadCut, payload[0:size])
baseMessage := BaseMessage{
Src: pktAddr.IP,
Port: pktAddr.Port,
Payload: payloadCut,
}
processor.ProcessMessage(baseMessage)
MetricTrafficBytes.With(
prometheus.Labels{
"remote_ip": pktAddr.IP.String(),
"remote_port": strconv.Itoa(pktAddr.Port),
"local_ip": localIP,
"local_port": strconv.Itoa(addrUDP.Port),
"type": name,
}).
Add(float64(size))
MetricTrafficPackets.With(
prometheus.Labels{
"remote_ip": pktAddr.IP.String(),
"remote_port": strconv.Itoa(pktAddr.Port),
"local_ip": localIP,
"local_port": strconv.Itoa(addrUDP.Port),
"type": name,
}).
Inc()
MetricPacketSizeSum.With(
prometheus.Labels{
"remote_ip": pktAddr.IP.String(),
"remote_port": strconv.Itoa(pktAddr.Port),
"local_ip": localIP,
"local_port": strconv.Itoa(addrUDP.Port),
"type": name,
}).
Observe(float64(size))
}
}
```
**To Reproduce**
Steps to reproduce the behavior:
1. Go to '...'
2. Click on '....'
3. Scroll down to '....'
4. See error
**Expected behavior**
A clear and concise description of what you expected to happen.
**Screenshots**
If applicable, add screenshots to help explain your problem.
**Environment:**
- OS
- Stanza Version or Commit Hash
**Additional context**
Add any other context about the problem here.