[Enhancement]: Rework background tasks/timers

Open jonbarrow opened this issue 1 year ago • 4 comments

Checked Existing

  • [X] I have checked the repository for duplicate issues.

What enhancement would you like to see?

(Using the new issue templates flow for this one)

Tagging @DaniElectra for thoughts.

@PabloMK7 has reported that there is another memory leak potentially somewhere in here (the CTGP-7 server apparently crashed from an OOM error). While we have not been able to duplicate the issue, it's still something to look into.

The only real place I can think of where this would happen is our usage of timers and tickers in PRUDPConnection and in PendingPacket in the resend scheduler. Timers and tickers can leave resources unfreed and are somewhat annoying to manage between structs and goroutines, making them prone to memory leaks. https://github.com/PretendoNetwork/nex-go/pull/44 was made to address this, and seemed to work, but it's possible it didn't catch everything.

We can remove this reliance on tickers/timers entirely by moving to a context-based model. This would also be the more "Go" way of doing things (the timer/ticker pattern was carried over from v1 of this module). The context package provides ways to handle tasks that can be canceled and timed out.

Some good reading on this pattern:

  • https://medium.com/@alireza.stack/cancellation-pattern-implementation-in-go-179706ae772b
  • https://golangbot.com/context-timeout-cancellation
  • https://ayada.dev/posts/background-tasks-in-go

A basic implementation using this pattern could look something like:

package main

import (
	"context"
	"fmt"
	"time"
)

type Packet struct {
	data string
}

type ResendTask struct {
	ctx            context.Context
	cancel         context.CancelFunc
	resendCount    int
	maxResends     int
	timeoutSeconds time.Duration
	packet         *Packet
}

func (rt *ResendTask) Begin() {
	if rt.resendCount <= rt.maxResends {
		ctx, cancel := context.WithTimeout(context.Background(), rt.timeoutSeconds*time.Second)

		rt.ctx = ctx
		rt.cancel = cancel

		go rt.start()
	} else {
		fmt.Println("Resent too many times")
	}
}

func (rt *ResendTask) Stop() {
	rt.cancel()
}

func (rt *ResendTask) start() {
	// * Block until the context is done
	<-rt.ctx.Done()

	fmt.Printf("Timeout. Resending %+v\n", rt.packet)
	rt.resendCount++
	rt.Begin() // * Start again
}

func NewResendTask() *ResendTask {
	return &ResendTask{
		maxResends:     5,
		timeoutSeconds: 5,
		packet: &Packet{
			data: "Payload",
		},
	}
}

func main() {
	task := NewResendTask()

	task.Begin()

	for {
	}
}

In this example:

  • A ResendTask struct is created to manage the retransmission of a packet.
  • Every time ResendTask.Begin is called, a new timeout context is derived from context.Background() using context.WithTimeout.
  • The ResendTask waits to see if the context's timeout elapses, and if so it calls Begin again to create a new timeout context, repeating until the max amount of resends is reached.

This pattern makes managing these timeouts/resends easier and less error-prone due to 2 key properties of contexts created using WithTimeout:

  1. WithTimeout has a cancel function, but unlike WithCancel the Done channel closes and resources are freed once the timeout elapses.
  2. Calling the cancel function from a WithTimeout context will immediately free the resources associated with the context and stop all operations using the context (stopping the timeout check).

Because of these 2 properties we essentially do not need to worry about memory usage for packet retransmission. Whenever a timeout occurs, the context frees itself. If a packet is acknowledged and the context canceled then the resources are also freed.

Any other details to share? (OPTIONAL)

No response

jonbarrow, May 07 '24 21:05

I agree with using contexts as it would be the more "Go" way of doing things. Though I think most of our issues are related to race conditions, because the logic of resending a packet isn't atomic. See also https://github.com/PretendoNetwork/nex-go/blob/5f38bf34bfabacd03b586226350edc5706394b15/resend_scheduler.go#L99

https://github.com/PretendoNetwork/nex-go/pull/47 could help with this issue

"Calling the cancel function from a WithTimeout context will immediately free the resources associated with the context and stop all operations using the context (stopping the timeout check)"

I'm not sure I understand what this means. In the example you give, the goroutine doesn't stop, even when using Stop. Replacing main with the following:

func main() {
	task := NewResendTask()

	task.Begin()
	fmt.Println("test1")
	time.Sleep(9 * time.Second)
	fmt.Println("test2")
	task.Stop()
	time.Sleep(10 * time.Second)
	fmt.Println("test3")
}

Doesn't stop the task, at least on Go Playground:

test1
Timeout. Resending &{data:Payload}
test2
Timeout. Resending &{data:Payload}
Timeout. Resending &{data:Payload}
test3

DaniElectra, May 08 '24 15:05

"In the example you give the goroutine doesn't stop at least on Go Playground"

This may be an issue with the playground, then? Run the following normally and you'll see the context cancels all operations and frees itself, removing the goroutine:

package main

import (
	"context"
	"fmt"
	"runtime"
	"time"
)

type Packet struct {
	data string
}

type ResendTask struct {
	ctx            context.Context
	cancel         context.CancelFunc
	resendCount    int
	maxResends     int
	timeoutSeconds time.Duration
	packet         *Packet
}

func (rt *ResendTask) Begin() {
	if rt.resendCount == 3 {
		rt.Stop()
	} else if rt.resendCount <= rt.maxResends {
		ctx, cancel := context.WithTimeout(context.Background(), rt.timeoutSeconds*time.Second)

		rt.ctx = ctx
		rt.cancel = cancel

		go rt.start()
	} else {
		fmt.Println("Resent too many times")
	}
}

func (rt *ResendTask) Stop() {
	rt.cancel()
}

func (rt *ResendTask) start() {
	// * Block until the context is done
	<-rt.ctx.Done()

	fmt.Printf("Timeout. Resending %+v\n", rt.packet)
	rt.resendCount++
	rt.Begin() // * Start again
}

func NewResendTask() *ResendTask {
	return &ResendTask{
		maxResends:     5,
		timeoutSeconds: 5,
		packet: &Packet{
			data: "Payload",
		},
	}
}

func main() {
	task := NewResendTask()

	task.Begin()

	for {
		logGoroutines()
	}
}

func logGoroutines() {
	time.Sleep(1 * time.Second)
	fmt.Printf("goroutines: %d\n", runtime.NumGoroutine())
}

This will print goroutines: 2 up until rt.Stop() is called, at which point it will print goroutines: 1 forever (showing the goroutine has indeed exited) and the inner prints no longer happen.

"Though I think most of our issues are related with race conditions"

It addresses the same issue. The underlying cause of the memory leaks here is likely "bad management of tickers/timers". #47 addresses this as well, but this proposal aims to remove the issue entirely.

jonbarrow, May 08 '24 15:05

Example output from the above example:

goroutines: 2
goroutines: 2
goroutines: 2
goroutines: 2
Timeout. Resending &{data:Payload}
goroutines: 2
goroutines: 2
goroutines: 2
goroutines: 2
goroutines: 2
Timeout. Resending &{data:Payload}
goroutines: 2
goroutines: 2
goroutines: 2
goroutines: 2
goroutines: 2
Timeout. Resending &{data:Payload}
goroutines: 1
goroutines: 1
goroutines: 1
goroutines: 1
goroutines: 1
goroutines: 1
goroutines: 1
goroutines: 1
goroutines: 1
goroutines: 1
goroutines: 1
goroutines: 1
goroutines: 1

jonbarrow, May 08 '24 15:05

@DaniElectra No, you were correct. My results were from a faulty test. Calling cancel here will still close Done, as expected, so the resend branch still runs. However, this can easily be worked around with an acknowledged field on the struct:

type ResendTask struct {
	ctx            context.Context
	cancel         context.CancelFunc
	resendCount    int
	maxResends     int
	timeoutSeconds time.Duration
	packet         *Packet
	acknowledged   bool
}

func (rt *ResendTask) Stop() {
	rt.acknowledged = true
	rt.cancel()
}

func (rt *ResendTask) start() {
	// * Block until the context is done (timeout or Stop)
	<-rt.ctx.Done()

	if !rt.acknowledged {
		fmt.Printf("Timeout. Resending %+v\n", rt.packet)
		rt.resendCount++
		rt.Begin() // * Start again
	}
}

Now if you move the resendCount check and the Stop call out of Begin, say into logGoroutines (with task made a package-level variable so logGoroutines can reach it), it stops as expected:

func logGoroutines() {
	if task.resendCount == 3 {
		task.Stop()
	}

	time.Sleep(1 * time.Second)
	fmt.Printf("goroutines: %d\n", runtime.NumGoroutine())
}

goroutines: 2
goroutines: 2
goroutines: 2
goroutines: 2
Timeout. Resending &{data:Payload}
goroutines: 2
goroutines: 2
goroutines: 2
goroutines: 2
goroutines: 2
Timeout. Resending &{data:Payload}
goroutines: 2
goroutines: 2
goroutines: 2
goroutines: 2
goroutines: 2
Timeout. Resending &{data:Payload}
goroutines: 2
goroutines: 1
goroutines: 1
goroutines: 1
goroutines: 1
goroutines: 1
goroutines: 1
goroutines: 1

jonbarrow, May 08 '24 15:05

@DaniElectra Do you think this kind of task manager could be useful outside of nex-go? I wonder if this is something we should make as its own module.

jonbarrow, May 15 '24 16:05