gaio
OOM for 3K websockets
Hi,
I am trying to implement a websocket push-based server using this library and I keep running into OOM at larger socket counts, e.g. 3K. I am wondering why this is happening. The code is below. OOM doesn't happen for a small number of sockets, where memory stays stable; it only happens for a large number of sockets.
clients := map[string]net.Conn{}
go func() {
    for {
        select {
        case res := <-chIO: // receive IO events from watcher
            if res.Error != nil {
                log.Error().Msgf("Error receiving IO event from watcher: %v", res.Error)
                delete(clients, res.Conn.RemoteAddr().String())
                err = w.Free(res.Conn)
                if err != nil {
                    log.Error().Msgf("error freeing connection: %v", err)
                }
                continue
            }
        case feed := <-out:
            f := ws.NewTextFrame(feed)
            bts := CompileHeader(f.Header)
            for index, conn := range clients {
                if conn != nil {
                    err = w.Write(nil, conn, bts)
                    if err != nil {
                        if errors.Is(err, syscall.EPIPE) || errors.Is(err, syscall.ECONNRESET) {
                            delete(clients, index)
                        } else {
                            log.Error().Msgf("unable to write header: %v", err)
                        }
                    }
                    err = w.Write(nil, conn, f.Payload)
                    if err != nil {
                        if errors.Is(err, syscall.EPIPE) || errors.Is(err, syscall.ECONNRESET) {
                            delete(clients, index)
                        } else {
                            log.Error().Msgf("unable to write payload: %v", err)
                        }
                    }
                }
            }
        case conn := <-chConn: // receive new connection events
            clients[conn.RemoteAddr().String()] = conn
        }
    }
}()
I did some profiling with pprof and here is what I have
heap profile: 35098695: 5565869472 [80607043: 16351145656] @ heap/2
14246148: 2507322048 [14882947: 2619398672] @ 0x84d345 0x475a92 0x84b25a 0x8f7a31 0x8f785b 0x46d041
0x84d344 github.com/xtaci/gaio.init.0.func1+0x24 /home/circleci/.go_workspace/pkg/mod/github.com/xtaci/[email protected]/watcher.go:28
0x475a91 sync.(*Pool).Get+0xb1 /usr/local/go/src/sync/pool.go:148
0x84b259 github.com/xtaci/gaio.(*watcher).aioCreate+0x1b9 /home/circleci/.go_workspace/pkg/mod/github.com/xtaci/[email protected]/watcher.go:272
0x8f7a30 github.com/xtaci/gaio.(*watcher).Write+0x7b0 /home/circleci/.go_workspace/pkg/mod/github.com/xtaci/[email protected]/watcher.go:240
14246061: 2507306736 [14882895: 2619389520] @ 0x84d345 0x475a92 0x84b25a 0x8f7845 0x8f75db 0x46d041
0x84d344 github.com/xtaci/gaio.init.0.func1+0x24 /home/circleci/.go_workspace/pkg/mod/github.com/xtaci/[email protected]/watcher.go:28
0x475a91 sync.(*Pool).Get+0xb1 /usr/local/go/src/sync/pool.go:148
0x84b259 github.com/xtaci/gaio.(*watcher).aioCreate+0x1b9 /home/circleci/.go_workspace/pkg/mod/github.com/xtaci/[email protected]/watcher.go:272
0x8f7844 github.com/xtaci/gaio.(*watcher).Write+0x5c4 /home/circleci/.go_workspace/pkg/mod/github.com/xtaci/[email protected]/watcher.go:240
6377990: 306143520 [24276054: 1165250592] @ 0x84ca52 0x84ca3e 0x84cb0f 0x84c0b8 0x46d041
0x84ca51 container/list.(*List).insertValue+0x4d1 /usr/local/go/src/container/list/list.go:104
0x84ca3d container/list.(*List).PushBack+0x4bd /usr/local/go/src/container/list/list.go:155
0x84cb0e github.com/xtaci/gaio.(*watcher).handlePending+0x58e /home/circleci/.go_workspace/pkg/mod/github.com/xtaci/[email protected]/watcher.go:563
0x84c0b7 github.com/xtaci/gaio.(*watcher).loop+0x2d7 /home/circleci/.go_workspace/pkg/mod/github.com/xtaci/[email protected]/watcher.go:437
1: 132481024 [1: 132481024] @ 0x84b425 0x8f7a31 0x8f785b 0x46d041
0x84b424 github.com/xtaci/gaio.(*watcher).aioCreate+0x384 /home/circleci/.go_workspace/pkg/mod/github.com/xtaci/[email protected]/watcher.go:276
0x8f7a30 github.com/xtaci/gaio.(*watcher).Write+0x7b0 /home/circleci/.go_workspace/pkg/mod/github.com/xtaci/[email protected]/watcher.go:240
1: 67821568 [1: 67821568] @ 0x84b425 0x8f7845 0x8f75db 0x46d041
0x84b424 github.com/xtaci/gaio.(*watcher).aioCreate+0x384 /home/circleci/.go_workspace/pkg/mod/github.com/xtaci/[email protected]/watcher.go:276
0x8f7844 github.com/xtaci/gaio.(*watcher).Write+0x5c4 /home/circleci/.go_workspace/pkg/mod/github.com/xtaci/[email protected]/watcher.go:240
159604: 28090304 [159656: 28099456] @ 0x84d345 0x475a92 0x84b25a 0x8f7545 0x8f74e7 0x46d041
0x84d344 github.com/xtaci/gaio.init.0.func1+0x24 /home/circleci/.go_workspace/pkg/mod/github.com/xtaci/[email protected]/watcher.go:28
0x475a91 sync.(*Pool).Get+0xb1 /usr/local/go/src/sync/pool.go:148
0x84b259 github.com/xtaci/gaio.(*watcher).aioCreate+0x1b9 /home/circleci/.go_workspace/pkg/mod/github.com/xtaci/[email protected]/watcher.go:272
0x8f7544 github.com/xtaci/gaio.(*watcher).Free+0x2c4 /home/circleci/.go_workspace/pkg/mod/github.com/xtaci/[email protected]/watcher.go:256
@xtaci any idea why? Looking at the profiler, it shows that this list https://github.com/xtaci/gaio/blob/master/watcher.go#L563 keeps getting larger and larger and eventually exhausts memory, even though I have an 8GB machine.
lemme look into this, somewhat weird.
@xtaci Please let me know if you need any more info; I can provide it pretty quickly since I spent a few days tweaking this. Also, I have a question regarding the async IO callbacks. It appears to me (although I'm not 100% sure) that an aiocb object is created for every write request (ignoring reads for the sake of this discussion). If so, I'm guessing that with slow clients the callbacks pile up in the list pointed out in my previous comment and eventually exhaust the 8GB of memory. I tried changing Write to WriteTimeout with 100ms in the above code, and that still didn't help; it still causes OOM. So the question I wanted to ask is: what happens if my server issues a ton of write requests to all 3K sockets? Would it block at some point? If not, how do I prevent OOM?
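For illustration, here is a rough sketch of the kind of application-level backpressure I have in mind, assuming the gaio API from the README (Watcher.Write, Watcher.Free, OpResult values with Conn/Error/Operation fields, and the gaio.OpWrite constant); the maxPending cap and the channel wiring are made up:

// Rough sketch of per-connection backpressure; not tested.
package pushserver

import (
    "net"

    "github.com/xtaci/gaio"
)

const maxPending = 64 // hypothetical cap on queued writes per client

func pushLoop(w *gaio.Watcher, chIO <-chan gaio.OpResult, chConn <-chan net.Conn, out <-chan []byte) {
    clients := map[string]net.Conn{}
    pending := map[string]int{} // writes submitted but not yet completed, per client

    for {
        select {
        case res := <-chIO: // completion events forwarded from w.WaitIO()
            addr := res.Conn.RemoteAddr().String()
            if res.Error != nil {
                delete(clients, addr)
                delete(pending, addr)
                _ = w.Free(res.Conn)
                continue
            }
            if res.Operation == gaio.OpWrite && pending[addr] > 0 {
                pending[addr]-- // this client drained one queued write
            }

        case msg := <-out: // broadcast a message to every client
            for addr, conn := range clients {
                if pending[addr] >= maxPending {
                    // Slow reader: drop it rather than queueing yet more aiocbs,
                    // so the watcher's pending list cannot grow without bound.
                    delete(clients, addr)
                    delete(pending, addr)
                    _ = w.Free(conn)
                    continue
                }
                if err := w.Write(nil, conn, msg); err != nil {
                    delete(clients, addr)
                    delete(pending, addr)
                    continue
                }
                pending[addr]++
            }

        case conn := <-chConn: // new connections
            clients[conn.RemoteAddr().String()] = conn
        }
    }
}

The idea is simply that a client that never drains its socket stops receiving new Write submissions (and is eventually dropped), instead of accumulating one aiocb per broadcast.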
could you show me a code snippet which can reproduce the OOM?
an aiocb is allocated via a pool and will be recycled:
func init() {
    aiocbPool.New = func() interface{} {
        return new(aiocb)
    }
}
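For reference, sync.Pool by itself doesn't put an upper bound on how many aiocbs exist at once: Get simply calls New whenever the pool is empty, and recycling only kicks in once objects are Put back. A small standalone illustration (plain Go, not gaio code):

// Illustration of sync.Pool behaviour: Get falls back to New whenever the
// pool has nothing to hand out, so the pool never caps live object count.
package main

import (
    "fmt"
    "sync"
)

type aiocb struct{ buf []byte } // stand-in type, just for this illustration

func main() {
    allocs := 0
    pool := sync.Pool{
        New: func() interface{} {
            allocs++
            return new(aiocb)
        },
    }

    // Taking 1000 objects without returning any of them calls New 1000 times.
    live := make([]*aiocb, 0, 1000)
    for i := 0; i < 1000; i++ {
        live = append(live, pool.Get().(*aiocb))
    }
    fmt.Println("allocations before recycling:", allocs) // 1000

    // After Put, subsequent Gets can reuse recycled objects instead of allocating.
    for _, cb := range live {
        pool.Put(cb)
    }
    for i := 0; i < 1000; i++ {
        _ = pool.Get().(*aiocb)
    }
    fmt.Println("allocations after recycling:", allocs) // usually far fewer than 2000
}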
@xtaci It's the same as pasted above; let me know if you need more clarification. I am also pasting it below. This is very similar to your push server example, except I am using a map of connections instead of an array. OOM happens when there are 1K+ connections with the code below. At a high level, the code just broadcasts each message from a message queue to all the connected clients concurrently.
clients := map[string]net.Conn{}
go func() {
    for {
        select {
        case res := <-chIO: // receive IO events from watcher
            if res.Error != nil {
                log.Error().Msgf("Error receiving IO event from watcher: %v", res.Error)
                delete(clients, res.Conn.RemoteAddr().String())
                err = w.Free(res.Conn)
                if err != nil {
                    log.Error().Msgf("error freeing connection: %v", err)
                }
                continue
            }
        case feed := <-out:
            f := ws.NewTextFrame(feed)
            bts := CompileHeader(f.Header)
            for index, conn := range clients {
                if conn != nil {
                    err = w.Write(nil, conn, bts)
                    if err != nil {
                        if errors.Is(err, syscall.EPIPE) || errors.Is(err, syscall.ECONNRESET) {
                            delete(clients, index)
                        } else {
                            log.Error().Msgf("unable to write header: %v", err)
                        }
                    }
                    err = w.Write(nil, conn, f.Payload)
                    if err != nil {
                        if errors.Is(err, syscall.EPIPE) || errors.Is(err, syscall.ECONNRESET) {
                            delete(clients, index)
                        } else {
                            log.Error().Msgf("unable to write payload: %v", err)
                        }
                    }
                }
            }
        case conn := <-chConn: // receive new connection events
            clients[conn.RemoteAddr().String()] = conn
        }
    }
}()
@xtaci Let me know if there are specific parts of this code you want me to explain.
func init() {
    aiocbPool.New = func() interface{} {
        return new(aiocb)
    }
}
@xtaci What is the pool size here, and what happens if the pool is full? Does the event loop suspend its thread?