fasthttp icon indicating copy to clipboard operation
fasthttp copied to clipboard

Using atomic instead of mutex and delete scratch slice

Open NikoMalik opened this issue 1 year ago • 7 comments
trafficstars

NikoMalik avatar Aug 16 '24 15:08 NikoMalik

Whether using an array with a lock or a linked list with atomic operations to manage the workerChan resources, the subsequent operations on workerChan are I/O-intensive. Given that the operations are FILO and each element involves significant I/O, I don't think a linked list has any particular advantage.

newacorn avatar Aug 17 '24 00:08 newacorn

Whether using an array with a lock or a linked list with atomic operations to manage the workerChan resources, the subsequent operations on workerChan are I/O-intensive. Given that the operations are FILO and each element involves significant I/O, I don't think a linked list has any particular advantage.

I got a boost in benchmark tests by almost a factor and a half

NikoMalik avatar Aug 17 '24 06:08 NikoMalik

Can you show which benchmarks and their results here?

erikdubbelboer avatar Aug 20 '24 21:08 erikdubbelboer


func BenchmarkWorkerPoolStartStopSerial(b *testing.B) {
	for i := 0; i < b.N; i++ {
		testWorkerPoolStartStopBENCH()
	}
}

func BenchmarkWorkerPoolStartStopConcurrent(b *testing.B) {
	concurrency := 10
	ch := make(chan struct{}, concurrency)
	b.ResetTimer()
	for i := 0; i < b.N; i++ {
		for j := 0; j < concurrency; j++ {
			go func() {
				testWorkerPoolStartStopBENCH()
				ch <- struct{}{}
			}()
		}
		for j := 0; j < concurrency; j++ {
			select {
			case <-ch:
			case <-time.After(time.Second):
				b.Fatalf("timeout")
			}
		}
	}
}

func BenchmarkWorkerPoolMaxWorkersCountSerial(b *testing.B) {
	for i := 0; i < b.N; i++ {
		testWorkerPoolMaxWorkersCountMultiBENCH(b)
	}
}

func BenchmarkWorkerPoolMaxWorkersCountConcurrent(b *testing.B) {
	concurrency := 4
	ch := make(chan struct{}, concurrency)
	b.ResetTimer()
	for i := 0; i < b.N; i++ {
		for j := 0; j < concurrency; j++ {
			go func() {
				testWorkerPoolMaxWorkersCountMultiBENCH(b)
				ch <- struct{}{}
			}()
		}
		for j := 0; j < concurrency; j++ {
			select {
			case <-ch:
			case <-time.After(time.Second * 2):
				b.Fatalf("timeout")
			}
		}
	}
}

func testWorkerPoolStartStopBENCH() {
	wp := &workerPool{
		WorkerFunc:      func(conn net.Conn) error { return nil },
		MaxWorkersCount: 10,
		Logger:          defaultLogger,
	}
	for i := 0; i < 10; i++ {
		wp.Start()
		wp.Stop()
	}
}

func testWorkerPoolMaxWorkersCountMultiBENCH(b *testing.B) {
	for i := 0; i < 5; i++ {
		testWorkerPoolMaxWorkersCountBENCH(b)
	}
}

func testWorkerPoolMaxWorkersCountBENCH(b *testing.B) {
	ready := make(chan struct{})
	wp := &workerPool{
		WorkerFunc: func(conn net.Conn) error {
			buf := make([]byte, 100)
			n, err := conn.Read(buf)
			if err != nil {
				b.Errorf("unexpected error: %v", err)
			}
			buf = buf[:n]
			if string(buf) != "foobar" {
				b.Errorf("unexpected data read: %q. Expecting %q", buf, "foobar")
			}
			if _, err = conn.Write([]byte("baz")); err != nil {
				b.Errorf("unexpected error: %v", err)
			}

			<-ready

			return nil
		},
		MaxWorkersCount: 10,
		Logger:          defaultLogger,
		connState:       func(net.Conn, ConnState) {},
	}
	wp.Start()

	ln := fasthttputil.NewInmemoryListener()

	clientCh := make(chan struct{}, wp.MaxWorkersCount)
	for i := 0; i < wp.MaxWorkersCount; i++ {
		go func() {
			conn, err := ln.Dial()
			if err != nil {
				b.Errorf("unexpected error: %v", err)
			}
			if _, err = conn.Write([]byte("foobar")); err != nil {
				b.Errorf("unexpected error: %v", err)
			}
			data, err := io.ReadAll(conn)
			if err != nil {
				b.Errorf("unexpected error: %v", err)
			}
			if string(data) != "baz" {
				b.Errorf("unexpected value read: %q. Expecting %q", data, "baz")
			}
			if err = conn.Close(); err != nil {
				b.Errorf("unexpected error: %v", err)
			}
			clientCh <- struct{}{}
		}()
	}

	for i := 0; i < wp.MaxWorkersCount; i++ {
		conn, err := ln.Accept()
		if err != nil {
			b.Fatalf("unexpected error: %v", err)
		}
		if !wp.Serve(conn) {
			b.Fatalf("worker pool must have enough workers to serve the conn")
		}
	}

	go func() {
		if _, err := ln.Dial(); err != nil {
			b.Errorf("unexpected error: %v", err)
		}
	}()
	conn, err := ln.Accept()
	if err != nil {
		b.Fatalf("unexpected error: %v", err)
	}
	for i := 0; i < 5; i++ {
		if wp.Serve(conn) {
			b.Fatalf("worker pool must be full")
		}
	}
	if err = conn.Close(); err != nil {
		b.Fatalf("unexpected error: %v", err)
	}

	close(ready)

	for i := 0; i < wp.MaxWorkersCount; i++ {
		select {
		case <-clientCh:
		case <-time.After(time.Second):
			b.Fatalf("timeout")
		}
	}

	if err := ln.Close(); err != nil {
		b.Fatalf("unexpected error: %v", err)
	}
	wp.Stop()
}

I used this benchmark based on tests resultsUsingAtomic:

goarch: amd64
pkg: github.com/valyala/fasthttp
cpu: 11th Gen Intel(R) Core(TM) i5-11400F @ 2.60GHz
=== RUN   BenchmarkWorkerPoolStartStopSerial
BenchmarkWorkerPoolStartStopSerial
BenchmarkWorkerPoolStartStopSerial-12
  216567              5454 ns/op            1494 B/op         21 allocs/op
=== RUN   BenchmarkWorkerPoolStartStopConcurrent
BenchmarkWorkerPoolStartStopConcurrent
BenchmarkWorkerPoolStartStopConcurrent-12
   35728             31353 ns/op           17913 B/op        250 allocs/op
=== RUN   BenchmarkWorkerPoolMaxWorkersCountSerial
BenchmarkWorkerPoolMaxWorkersCountSerial
BenchmarkWorkerPoolMaxWorkersCountSerial-12
    2488            523983 ns/op          253846 B/op       1143 allocs/op
=== RUN   BenchmarkWorkerPoolMaxWorkersCountConcurrent
BenchmarkWorkerPoolMaxWorkersCountConcurrent
BenchmarkWorkerPoolMaxWorkersCountConcurrent-12
    1339            811728 ns/op         1019013 B/op       4624 allocs/op
PASS
ok      github.com/valyala/fasthttp     5.554s

resultsUsingMutexAndSlice:

goarch: amd64
pkg: github.com/valyala/fasthttp
cpu: 11th Gen Intel(R) Core(TM) i5-11400F @ 2.60GHz
=== RUN   BenchmarkWorkerPoolStartStopSerial
BenchmarkWorkerPoolStartStopSerial
BenchmarkWorkerPoolStartStopSerial-12
  211056              5594 ns/op            1508 B/op         21 allocs/op
=== RUN   BenchmarkWorkerPoolStartStopConcurrent
BenchmarkWorkerPoolStartStopConcurrent
BenchmarkWorkerPoolStartStopConcurrent-12
   35869             32778 ns/op           18003 B/op        250 allocs/op
=== RUN   BenchmarkWorkerPoolMaxWorkersCountSerial
BenchmarkWorkerPoolMaxWorkersCountSerial
BenchmarkWorkerPoolMaxWorkersCountSerial-12
    2433            537527 ns/op          256182 B/op       1093 allocs/op
=== RUN   BenchmarkWorkerPoolMaxWorkersCountConcurrent
BenchmarkWorkerPoolMaxWorkersCountConcurrent
BenchmarkWorkerPoolMaxWorkersCountConcurrent-12
    1458            876898 ns/op         1027666 B/op       4420 allocs/op
PASS
ok      github.com/valyala/fasthttp     5.788s

NikoMalik avatar Aug 21 '24 08:08 NikoMalik

Any idea what is causing the extra allocations?

erikdubbelboer avatar Aug 21 '24 20:08 erikdubbelboer

Any idea what is causing the extra allocations?

i fixed

cpu: 11th Gen Intel(R) Core(TM) i5-11400F @ 2.60GHz
=== RUN   BenchmarkWorkerPoolStartStopSerial
BenchmarkWorkerPoolStartStopSerial
BenchmarkWorkerPoolStartStopSerial-12
  205198              5466 ns/op            1494 B/op         21 allocs/op
=== RUN   BenchmarkWorkerPoolStartStopConcurrent
BenchmarkWorkerPoolStartStopConcurrent
BenchmarkWorkerPoolStartStopConcurrent-12
   34980             30404 ns/op           17959 B/op        250 allocs/op
=== RUN   BenchmarkWorkerPoolMaxWorkersCountSerial
BenchmarkWorkerPoolMaxWorkersCountSerial
BenchmarkWorkerPoolMaxWorkersCountSerial-12
    2520            509416 ns/op          251338 B/op       1050 allocs/op
=== RUN   BenchmarkWorkerPoolMaxWorkersCountConcurrent
BenchmarkWorkerPoolMaxWorkersCountConcurrent
BenchmarkWorkerPoolMaxWorkersCountConcurrent-12
    1652            782699 ns/op         1008180 B/op       4218 allocs/op
PASS
ok      github.com/valyala/fasthttp     5.588s 

problem was in getch()

NikoMalik avatar Aug 22 '24 09:08 NikoMalik

Seems like the code isn't completely thread safe, 3 tests failed with the race detector.

I may have ruled out the last possible data races

NikoMalik avatar Aug 24 '24 08:08 NikoMalik

Thanks!

erikdubbelboer avatar Aug 25 '24 15:08 erikdubbelboer