peerdiscovery

use a deadline to check for exit

Open fzwoch opened this issue 2 years ago • 3 comments

A proposal: use a deadline for the read operation in the goroutine. If the goroutine is supposed to shut down but no traffic arrives on the multicast address, it never gets the chance to exit.

Instead, time out the read operation after 5 seconds without data and check whether a shutdown has been requested.

~~Note that this is done against master branch. In case #29 gets accepted in some form the check for shutdown should be extended here too.~~

fzwoch avatar Dec 20 '21 13:12 fzwoch

Here is another approach to fix it.

Let the main loop handler close the socket of the listen() goroutine. This example lacks proper locking of the connection variable, but it should be fine for demonstration.

diff --git a/peerdiscovery.go b/peerdiscovery.go
index e41526f..ce5c291 100644
--- a/peerdiscovery.go
+++ b/peerdiscovery.go
@@ -81,6 +81,7 @@ type peerDiscovery struct {
        received map[string][]byte
        sync.RWMutex
        exit bool
+       c    net.PacketConn
 }
 
 // initialize returns a new peerDiscovery object which can be used to discover peers.
@@ -259,6 +260,10 @@ func Discover(settings ...Settings) (discoveries []Discovered, err error) {
                }
        }
 
+       if p.c != nil {
+               p.c.Close()
+       }
+
        if !s.DisableBroadcast {
                payload := p.settings.Payload
                if p.settings.PayloadFunc != nil {
@@ -323,7 +328,7 @@ func (p *peerDiscovery) listen() (recievedBytes []byte, err error) {
        if err != nil {
                return
        }
-       defer c.Close()
+       p.c = c
 
        group := p.settings.multicastAddressNumbers
        var p2 NetPacketConn

Here is some example code to trigger the issue (that is, as long as no other process is sending broadcasts, which would let the listen() routine shut down).

package main

import (
	"log"
	"time"

	"github.com/schollz/peerdiscovery"
)

func main() {
	count := 0

	for {
		log.Println("loop:", count)
		count++

		discover := make(chan struct{})

		go peerdiscovery.Discover(peerdiscovery.Settings{
			TimeLimit:        -1,
			StopChan:         discover,
			AllowSelf:        true,
			DisableBroadcast: true,
		})

		time.Sleep(time.Millisecond * 10)
		close(discover)
	}
}

On my Linux test system, socket file handles and memory grow until around the 1000th loop, at which point it silently fails at:

	c, err := net.ListenPacket(fmt.Sprintf("udp%d", p.settings.IPVersion), address)
	if err != nil {
		return
	}

I'm pretty sure this is also very close to #11.

fzwoch avatar Feb 10 '22 11:02 fzwoch

I guess that this block is intended to shut down the routine:

	if !s.DisableBroadcast {
		payload := p.settings.Payload
		if p.settings.PayloadFunc != nil {
			payload = p.settings.PayloadFunc()
		}
		// send out broadcast that is finished
		broadcast(p2, payload, ifaces, &net.UDPAddr{IP: group, Port: portNum})
	}

This obviously won't trigger with DisableBroadcast: true. Since this block seems to send the same data as the regular broadcast (and not a special BYE message?), it seems redundant once the routine's socket is closed manually.

fzwoch avatar Feb 10 '22 11:02 fzwoch

Or, maybe the simplest idea: reuse the already existing connection for listening as well. Not sure whether that has any side effects?

--- peerdiscovery.go	2022-02-17 11:00:41.608374260 +0100
+++ peerdiscovery.go	2022-02-17 11:19:40.510144732 +0100
@@ -227,7 +227,7 @@
 		p2.JoinGroup(&ifaces[i], &net.UDPAddr{IP: group, Port: portNum})
 	}
 
-	go p.listen()
+	go p.listen(c)
 	ticker := time.NewTicker(tickerDuration)
 	defer ticker.Stop()
 	start := time.Now()
@@ -301,9 +301,8 @@
 
 // Listen binds to the UDP address and port given and writes packets received
 // from that address to a buffer which is passed to a hander
-func (p *peerDiscovery) listen() (recievedBytes []byte, err error) {
+func (p *peerDiscovery) listen(c net.PacketConn) (recievedBytes []byte, err error) {
 	p.RLock()
-	address := net.JoinHostPort(p.settings.MulticastAddress, p.settings.Port)
 	portNum := p.settings.portNum
 	allowSelf := p.settings.AllowSelf
 	timeLimit := p.settings.TimeLimit
@@ -318,13 +317,6 @@
 	}
 	// log.Println(ifaces)
 
-	// Open up a connection
-	c, err := net.ListenPacket(fmt.Sprintf("udp%d", p.settings.IPVersion), address)
-	if err != nil {
-		return
-	}
-	defer c.Close()
-
 	group := p.settings.multicastAddressNumbers
 	var p2 NetPacketConn
 	if p.settings.IPVersion == IPv4 {

fzwoch avatar Feb 18 '22 14:02 fzwoch

I have updated the PR to use the latest proposal. It seemed to work fine in my use case - but that may not cover everyone's.

@schollz do you have an opinion on that one?

fzwoch avatar Nov 05 '22 10:11 fzwoch

Yeah it looks okay, I will have to run it against some tests

schollz avatar Nov 06 '22 22:11 schollz

Is there any chance of this getting merged? I've seen the Discover method taking up memory gradually over time, after running for months (looking very similar to #11).

image

Hopefully, this will also resolve this problem.

maddie avatar Jul 05 '23 02:07 maddie

Looks good, thanks!!!

schollz avatar Nov 09 '23 14:11 schollz