stanza icon indicating copy to clipboard operation
stanza copied to clipboard

GoFlow stop fail bug

Open jxplus opened this issue 6 months ago • 0 comments

Describe the bug this goroutine will never stop,because flow.FlowRoutine also has an Infinite loop。 see codes,

  1. operator/builtin/input/goflow/goflow.go:Start() lin109
func (n *GoflowInput) Start() error {
	n.ctx, n.cancel = context.WithCancel(context.Background())

	go func() {
		...
		for {
			n.Infof("Starting Goflow on %s:%d in %s mode", n.address, n.port, n.mode)
			switch n.mode {
			case modeSflow:
				flow := &utils.StateSFlow{Transport: n, Logger: n}
				goflowErr = flow.FlowRoutine(n.workers, n.address, n.port, reuse)
			case modeNetflowV5:
				flow := &utils.StateNFLegacy{Transport: n, Logger: n}
				goflowErr = flow.FlowRoutine(n.workers, n.address, n.port, reuse)
			case modeNetflowIPFIX:
				flow := &utils.StateNetFlow{Transport: n, Logger: n}
				goflowErr = flow.FlowRoutine(n.workers, n.address, n.port, reuse)
			}
                        
                        // never exec
			select {
			case <-n.ctx.Done():
				return
			default:
			}
                       ...
		}
	}()

	return nil
}`

2. C:/Users/Administrator/go/pkg/mod/github.com/observiq/goflow/[email protected]/utils/utils.go:256
```func UDPRoutine(name string, decodeFunc decoder.DecoderFunc, workers int, addr string, port int, sockReuse bool, logger Logger) error {
	ecb := DefaultErrorCallback{
		Logger: logger,
	}
       ...
       // this also is a ifinite loop
	for {
		size, pktAddr, _ := udpconn.ReadFromUDP(payload)
		payloadCut := make([]byte, size)
		copy(payloadCut, payload[0:size])

		baseMessage := BaseMessage{
			Src:     pktAddr.IP,
			Port:    pktAddr.Port,
			Payload: payloadCut,
		}
		processor.ProcessMessage(baseMessage)

		MetricTrafficBytes.With(
			prometheus.Labels{
				"remote_ip":   pktAddr.IP.String(),
				"remote_port": strconv.Itoa(pktAddr.Port),
				"local_ip":    localIP,
				"local_port":  strconv.Itoa(addrUDP.Port),
				"type":        name,
			}).
			Add(float64(size))
		MetricTrafficPackets.With(
			prometheus.Labels{
				"remote_ip":   pktAddr.IP.String(),
				"remote_port": strconv.Itoa(pktAddr.Port),
				"local_ip":    localIP,
				"local_port":  strconv.Itoa(addrUDP.Port),
				"type":        name,
			}).
			Inc()
		MetricPacketSizeSum.With(
			prometheus.Labels{
				"remote_ip":   pktAddr.IP.String(),
				"remote_port": strconv.Itoa(pktAddr.Port),
				"local_ip":    localIP,
				"local_port":  strconv.Itoa(addrUDP.Port),
				"type":        name,
			}).
			Observe(float64(size))
	}
}```

**To Reproduce**
Steps to reproduce the behavior:
1. Go to '...'
5. Click on '....'
6. Scroll down to '....'
7. See error

**Expected behavior**
A clear and concise description of what you expected to happen.

**Screenshots**
If applicable, add screenshots to help explain your problem.

**Environment:**
 - OS
 - Stanza Version or Commit Hash

**Additional context**
Add any other context about the problem here.

jxplus avatar Aug 02 '24 00:08 jxplus