websocket
Increase in memory usage when compression is on
Hi,
I started using Centrifugo in the past week. I'm using the raw WebSocket endpoint, which uses this library under the hood.
When per-message-deflate is enabled, memory usage grows massively, up to the point where the Docker container crashes for using too much memory.
I'm running inside a Docker container with an average of 150K-200K concurrent users. My average message rate is between 30K and 50K messages per second, with an average message size of 600 bytes.
Without per-message-deflate there is no memory growth at all and performance is awesome, but the data transfer is very high.
Can anyone help me investigate it?
Thank you.
First step would probably be to get a heap and allocation profile of your application using pprof.
Please generate the profiles with the latest version of the package. The recent change to pool flate readers and writers should help.
Thanks guys,
I'm waiting for the next version of Centrifugo, which includes your latest version. I will then profile the application and upload the results here.
You can close this issue until then, but I hope to have the new version today or tomorrow at the latest, so it's up to you.
Here is the pprof dump
This is the heap profile
@kisielk @garyburd I've updated the gist please check now
https://gist.github.com/joshdvir/091229e3d3e4ade8d73b8cffe86c602b
I asked @joshdvir to send me CPU and memory profiles from a production node. Here is what we have:
CPU:
(pprof) top 20 --cum
28.99s of 62.03s total (46.74%)
Dropped 523 nodes (cum <= 0.31s)
Showing top 20 nodes out of 155 (cum >= 8.07s)
flat flat% sum% cum cum%
0 0% 0% 58.68s 94.60% runtime.goexit
0.05s 0.081% 0.081% 45.44s 73.25% github.com/centrifugal/centrifugo/libcentrifugo/conns/clientconn.(*client).sendMessages
0.16s 0.26% 0.34% 44.23s 71.30% github.com/centrifugal/centrifugo/libcentrifugo/conns/clientconn.(*client).sendMessage
0.16s 0.26% 0.6% 44.07s 71.05% github.com/centrifugal/centrifugo/libcentrifugo/server/httpserver.(*wsSession).Send
0.05s 0.081% 0.68% 43.82s 70.64% github.com/centrifugal/centrifugo/vendor/github.com/gorilla/websocket.(*Conn).WriteMessage
0.01s 0.016% 0.69% 21.67s 34.93% github.com/centrifugal/centrifugo/vendor/github.com/gorilla/websocket.(*flateWriteWrapper).Close
0.03s 0.048% 0.74% 20.19s 32.55% github.com/centrifugal/centrifugo/vendor/github.com/gorilla/websocket.(*Conn).NextWriter
0.07s 0.11% 0.85% 19.79s 31.90% github.com/centrifugal/centrifugo/vendor/github.com/gorilla/websocket.compressNoContextTakeover
17.56s 28.31% 29.16% 17.56s 28.31% runtime.memclr
0.04s 0.064% 29.23% 15.46s 24.92% compress/flate.(*Writer).Reset
0.03s 0.048% 29.28% 15.42s 24.86% compress/flate.(*compressor).reset
0 0% 29.28% 14.40s 23.21% compress/flate.(*Writer).Flush
0 0% 29.28% 14.40s 23.21% compress/flate.(*compressor).syncFlush
2.62s 4.22% 33.50% 14.01s 22.59% compress/flate.(*compressor).deflate
0.01s 0.016% 33.52% 11.05s 17.81% compress/flate.(*compressor).writeBlock
0.15s 0.24% 33.76% 11.04s 17.80% compress/flate.(*huffmanBitWriter).writeBlock
0.21s 0.34% 34.10% 9.05s 14.59% runtime.systemstack
0.06s 0.097% 34.19% 8.87s 14.30% github.com/centrifugal/centrifugo/vendor/github.com/gorilla/websocket.(*messageWriter).flushFrame
0.07s 0.11% 34.31% 8.81s 14.20% github.com/centrifugal/centrifugo/vendor/github.com/gorilla/websocket.(*Conn).write
Most of the CPU time is spent in WriteMessage:
(pprof) list WriteMessage
Total: 1.03mins
ROUTINE ======================== github.com/centrifugal/centrifugo/vendor/github.com/gorilla/websocket.(*Conn).WriteMessage in /Users/fz/go/src/github.com/centrifugal/centrifugo/vendor/github.com/gorilla/websocket/conn.go
50ms 43.82s (flat, cum) 70.64% of Total
. . 659:
. . 660:// WriteMessage is a helper method for getting a writer using NextWriter,
. . 661:// writing the message and closing the writer.
. . 662:func (c *Conn) WriteMessage(messageType int, data []byte) error {
. . 663:
50ms 50ms 664: if c.isServer && (c.newCompressionWriter == nil || !c.enableWriteCompression) {
. . 665:
. . 666: // Fast path with no allocations and single frame.
. . 667:
. 20ms 668: if err := c.prepWrite(messageType); err != nil {
. . 669: return err
. . 670: }
. . 671: mw := messageWriter{c: c, frameType: messageType, pos: maxFrameHeaderSize}
. 10ms 672: n := copy(c.writeBuf[mw.pos:], data)
. . 673: mw.pos += n
. . 674: data = data[n:]
. 1.69s 675: return mw.flushFrame(true, data)
. . 676: }
. . 677:
. 20.19s 678: w, err := c.NextWriter(messageType)
. . 679: if err != nil {
. . 680: return err
. . 681: }
. 190ms 682: if _, err = w.Write(data); err != nil {
. . 683: return err
. . 684: }
. 21.67s 685: return w.Close()
. . 686:}
NextWriter:
(pprof) list NextWriter
Total: 1.03mins
ROUTINE ======================== github.com/centrifugal/centrifugo/vendor/github.com/gorilla/websocket.(*Conn).NextWriter in /Users/fz/go/src/github.com/centrifugal/centrifugo/vendor/github.com/gorilla/websocket/conn.go
30ms 20.19s (flat, cum) 32.55% of Total
. . 437:// method flushes the complete message to the network.
. . 438://
. . 439:// There can be at most one open writer on a connection. NextWriter closes the
. . 440:// previous writer if the application has not already done so.
. . 441:func (c *Conn) NextWriter(messageType int) (io.WriteCloser, error) {
. 90ms 442: if err := c.prepWrite(messageType); err != nil {
. . 443: return nil, err
. . 444: }
. . 445:
. . 446: mw := &messageWriter{
. . 447: c: c,
. . 448: frameType: messageType,
10ms 280ms 449: pos: maxFrameHeaderSize,
. . 450: }
. 10ms 451: c.writer = mw
. . 452: if c.newCompressionWriter != nil && c.enableWriteCompression && isData(messageType) {
10ms 19.80s 453: w := c.newCompressionWriter(c.writer)
. . 454: mw.compress = true
10ms 10ms 455: c.writer = w
. . 456: }
. . 457: return c.writer, nil
. . 458:}
. . 459:
. . 460:type messageWriter struct {
compressNoContextTakeover:
(pprof) list compressNoContextTakeover
Total: 1.03mins
ROUTINE ======================== github.com/centrifugal/centrifugo/vendor/github.com/gorilla/websocket.compressNoContextTakeover in /Users/fz/go/src/github.com/centrifugal/centrifugo/vendor/github.com/gorilla/websocket/compression.go
70ms 19.79s (flat, cum) 31.90% of Total
. . 33: fr.(flate.Resetter).Reset(io.MultiReader(r, strings.NewReader(tail)), nil)
. . 34: return &flateReadWrapper{fr}
. . 35:}
. . 36:
. . 37:func compressNoContextTakeover(w io.WriteCloser) io.WriteCloser {
. 130ms 38: tw := &truncWriter{w: w}
40ms 3.93s 39: fw, _ := flateWriterPool.Get().(*flate.Writer)
10ms 15.47s 40: fw.Reset(tw)
20ms 260ms 41: return &flateWriteWrapper{fw: fw, tw: tw}
. . 42:}
And now heap profile:
(pprof) top 30 --cum
4794.23MB of 5414.45MB total (88.55%)
Dropped 238 nodes (cum <= 27.07MB)
Showing top 30 nodes out of 46 (cum >= 113.64MB)
flat flat% sum% cum cum%
0 0% 0% 5385.39MB 99.46% runtime.goexit
0 0% 0% 4277.82MB 79.01% sync.(*Pool).Get
0 0% 0% 4277.82MB 79.01% sync.(*Pool).getSlow
0 0% 0% 4182.80MB 77.25% github.com/centrifugal/centrifugo/libcentrifugo/conns/clientconn.(*client).sendMessages
0 0% 0% 4181.80MB 77.23% github.com/centrifugal/centrifugo/libcentrifugo/conns/clientconn.(*client).sendMessage
0 0% 0% 4181.80MB 77.23% github.com/centrifugal/centrifugo/libcentrifugo/server/httpserver.(*wsSession).Send
0 0% 0% 4181.80MB 77.23% github.com/centrifugal/centrifugo/vendor/github.com/gorilla/websocket.(*Conn).WriteMessage
8MB 0.15% 0.15% 4168.27MB 76.98% github.com/centrifugal/centrifugo/vendor/github.com/gorilla/websocket.(*Conn).NextWriter
12MB 0.22% 0.37% 4160.27MB 76.84% github.com/centrifugal/centrifugo/vendor/github.com/gorilla/websocket.compressNoContextTakeover
3792.80MB 70.05% 70.42% 4148.27MB 76.61% compress/flate.NewWriter
0 0% 70.42% 4148.27MB 76.61% github.com/centrifugal/centrifugo/vendor/github.com/gorilla/websocket.glob..func1
0.50MB 0.0092% 70.43% 1156.29MB 21.36% net/http.(*conn).serve
0 0% 70.43% 873.42MB 16.13% github.com/centrifugal/centrifugo/libcentrifugo/server/httpserver.(*HTTPServer).Logged.func1
0 0% 70.43% 873.42MB 16.13% net/http.HandlerFunc.ServeHTTP
0 0% 70.43% 872.92MB 16.12% github.com/centrifugal/centrifugo/libcentrifugo/server/httpserver.(*HTTPServer).WrapShutdown.func1
0 0% 70.43% 872.92MB 16.12% net/http.(*ServeMux).ServeHTTP
0 0% 70.43% 872.92MB 16.12% net/http.serverHandler.ServeHTTP
0 0% 70.43% 866.91MB 16.01% github.com/centrifugal/centrifugo/libcentrifugo/server/httpserver.(*HTTPServer).RawWebsocketHandler
0 0% 70.43% 866.91MB 16.01% github.com/centrifugal/centrifugo/libcentrifugo/server/httpserver.(*HTTPServer).RawWebsocketHandler-fm
0 0% 70.43% 404.78MB 7.48% github.com/centrifugal/centrifugo/vendor/github.com/gorilla/websocket.(*Conn).ReadMessage
355.47MB 6.57% 76.99% 355.47MB 6.57% compress/flate.(*compressor).init
0 0% 76.99% 320.19MB 5.91% github.com/centrifugal/centrifugo/vendor/github.com/gorilla/websocket.(*Upgrader).Upgrade
0.50MB 0.0092% 77.00% 292.64MB 5.40% github.com/centrifugal/centrifugo/vendor/github.com/gorilla/websocket.(*Conn).NextReader
1.50MB 0.028% 77.03% 291.64MB 5.39% github.com/centrifugal/centrifugo/vendor/github.com/gorilla/websocket.decompressNoContextTakeover
215.85MB 3.99% 81.02% 216.35MB 4.00% github.com/centrifugal/centrifugo/vendor/github.com/gorilla/websocket.newConn
159.10MB 2.94% 83.96% 159.10MB 2.94% compress/flate.(*decompressor).Reset
129.04MB 2.38% 86.34% 129.04MB 2.38% compress/flate.NewReader
0 0% 86.34% 129.04MB 2.38% github.com/centrifugal/centrifugo/vendor/github.com/gorilla/websocket.glob..func2
119.46MB 2.21% 88.55% 119.46MB 2.21% net/http.newBufioWriterSize
0 0% 88.55% 113.64MB 2.10% io/ioutil.ReadAll
WriteMessage:
(pprof) list WriteMessage
Total: 5.29GB
ROUTINE ======================== github.com/centrifugal/centrifugo/vendor/github.com/gorilla/websocket.(*Conn).WriteMessage in /Users/fz/go/src/github.com/centrifugal/centrifugo/vendor/github.com/gorilla/websocket/conn.go
0 4.08GB (flat, cum) 77.23% of Total
. . 673: mw.pos += n
. . 674: data = data[n:]
. . 675: return mw.flushFrame(true, data)
. . 676: }
. . 677:
. 4.07GB 678: w, err := c.NextWriter(messageType)
. . 679: if err != nil {
. . 680: return err
. . 681: }
. . 682: if _, err = w.Write(data); err != nil {
. . 683: return err
. . 684: }
. 13.53MB 685: return w.Close()
. . 686:}
compressNoContextTakeover:
(pprof) list compressNoContextTakeover
Total: 5.29GB
ROUTINE ======================== github.com/centrifugal/centrifugo/vendor/github.com/gorilla/websocket.compressNoContextTakeover in /Users/fz/go/src/github.com/centrifugal/centrifugo/vendor/github.com/gorilla/websocket/compression.go
12MB 4.06GB (flat, cum) 76.84% of Total
. . 33: fr.(flate.Resetter).Reset(io.MultiReader(r, strings.NewReader(tail)), nil)
. . 34: return &flateReadWrapper{fr}
. . 35:}
. . 36:
. . 37:func compressNoContextTakeover(w io.WriteCloser) io.WriteCloser {
10MB 10MB 38: tw := &truncWriter{w: w}
. 4.05GB 39: fw, _ := flateWriterPool.Get().(*flate.Writer)
. . 40: fw.Reset(tw)
2MB 2MB 41: return &flateWriteWrapper{fw: fw, tw: tw}
. . 42:}
@FZambia Thank you for posting the profile information.
Counters should be added here and here to determine how effective the pool is. Perhaps there's a code path where the flate writer is not returned to the pool.
Possibly related: https://github.com/golang/go/issues/18625
@y3llowcake thanks for pointing out this issue.
I've written a test case for Gorilla WebSocket:
type testConn struct {
	conn     *Conn
	messages chan []byte
}

func newTestConn(c *Conn, bufferSize int) *testConn {
	return &testConn{
		conn:     c,
		messages: make(chan []byte, bufferSize),
	}
}

func printss() {
	m := runtime.MemStats{}
	runtime.ReadMemStats(&m)
	fmt.Printf("inuse: %d sys: %d\n", m.StackInuse, m.StackSys)
}

func TestWriteWithCompression(t *testing.T) {
	w := ioutil.Discard
	done := make(chan struct{})
	numConns := 1000
	numMessages := 1000
	conns := make([]*testConn, numConns)
	var wg sync.WaitGroup
	for i := 0; i < numConns; i++ {
		c := newConn(fakeNetConn{Reader: nil, Writer: w}, false, 1024, 1024)
		c.enableWriteCompression = true
		c.newCompressionWriter = compressNoContextTakeover
		conns[i] = newTestConn(c, 256)
		wg.Add(1)
		go func(c *testConn) {
			defer wg.Done()
			i := 0
			for i < numMessages {
				select {
				case <-done:
					return
				case msg := <-c.messages:
					c.conn.WriteMessage(TextMessage, msg)
					i++
				}
			}
		}(conns[i])
	}
	messages := textMessages(100)
	for i := 0; i < numMessages; i++ {
		if i%100 == 0 {
			printss()
		}
		msg := messages[i%len(messages)]
		for _, c := range conns {
			c.messages <- msg
		}
	}
	wg.Wait()
}

func textMessages(num int) [][]byte {
	messages := make([][]byte, num)
	for i := 0; i < num; i++ {
		msg := fmt.Sprintf("planet: %d, country: %d, city: %d, street: %d", i, i, i, i)
		messages[i] = []byte(msg)
	}
	return messages
}
It creates 1000 connections with compression enabled, each with a buffered message channel. Then, in a loop, we write a message into each connection.
Here is how it behaves with go1.7.4:
fz@websocket: go test -test.run=TestWriteWithCompression
inuse: 4259840 sys: 4259840
inuse: 27394048 sys: 27394048
inuse: 246251520 sys: 246251520
inuse: 1048510464 sys: 1048510464
inuse: 1048510464 sys: 1048510464
inuse: 1049034752 sys: 1049034752
inuse: 1049034752 sys: 1049034752
inuse: 1049034752 sys: 1049034752
inuse: 1049034752 sys: 1049034752
inuse: 1049034752 sys: 1049034752
PASS
ok github.com/gorilla/websocket 11.053s
Using Go with commit https://github.com/golang/go/commit/9c3630f578db1d4331b367c3c7d284db299be3a6
fz@websocket: go1.8 test -test.run=TestWriteWithCompression
inuse: 4521984 sys: 4521984
inuse: 4521984 sys: 4521984
inuse: 4521984 sys: 4521984
inuse: 4521984 sys: 4521984
inuse: 4521984 sys: 4521984
inuse: 4521984 sys: 4521984
inuse: 4521984 sys: 4521984
inuse: 4521984 sys: 4521984
inuse: 4521984 sys: 4521984
inuse: 4521984 sys: 4521984
PASS
ok github.com/gorilla/websocket 12.023s
Though it's hard to say at the moment whether this fix will solve the original problem in this issue.
I also tried the same with flate from https://github.com/klauspost/compress by @klauspost which already contains that array copy fix in master:
fz@websocket: go test -test.run=TestWriteWithCompression
inuse: 4358144 sys: 4358144
inuse: 4587520 sys: 4587520
inuse: 4587520 sys: 4587520
inuse: 4587520 sys: 4587520
inuse: 4587520 sys: 4587520
inuse: 4587520 sys: 4587520
inuse: 4587520 sys: 4587520
inuse: 4587520 sys: 4587520
inuse: 4587520 sys: 4587520
inuse: 4587520 sys: 4587520
PASS
ok github.com/gorilla/websocket 3.426s
But actually, even without that fix the https://github.com/klauspost/compress library behaves without memory growth... I can't explain this.
Here is also the benchmark result using the https://github.com/klauspost/compress library:
BenchmarkWriteWithCompression-4 200000 5676 ns/op 149 B/op 3 allocs/op
That's a 4x speedup compared to the standard library compress/flate results:
BenchmarkWriteWithCompression-4 50000 25362 ns/op 128 B/op 3 allocs/op
@garyburd I understand that having a non-standard library package in core could be the wrong step, but maybe we can consider a mechanism that lets user code plug it in?
even without that fix [it] behaves without memory grows... I can't explain this.
AFAICT, this package uses "level 3" compression (which is a good choice). In my package, levels 1-4 are specialized and do not use the "generic" code, which has the issue.
In Go 1.7 level 1 (Best speed) has a similar specialized function. I would think that if you use that, you will not experience the issue. That might be a solution you can use, so you do not have to import a specialized package (even if I wouldn't mind to give users the option). Performance for level 1 should be very close to my package.
@klauspost thanks for explaining, just tried what you said - yes, with compression level 1 performance is comparable to your library and has no memory problems in go1.7 (in test case above)
@garyburd what do you think about this? I see two solutions that can help us: make the compression level an exported variable, or allow plugging in a custom flate implementation. Of course we can also wait for go1.8, but a way to improve compression performance is still very important. Do you want us to try creating a custom build with the advice you gave and the array copy bug fixed, and see how it behaves in production?
@FZambia How about setting compression level to one for now? It's probably the best option for most applications at this time and avoids exposing more API surface area.
@garyburd I agree with @FZambia, having the option to set the compression level would help greatly. In my specific use case I have a fan-out of 80K messages per second for 200K users, which causes a lot of outbound traffic. If I am able to set the compression level, I can find the sweet spot between the number of servers and outbound traffic. Usually servers cost much less than traffic when the traffic is this high, so having this option configurable would be awesome. Thanks
OK, let's add the following:
type CompressionOptions struct {
	// Level specifies the compression level for the flate compressor. Valid levels range from
	// -2 to 9. Level -2 will use Huffman compression only. Level -1 uses the default compression
	// level. Level 0 does not attempt any compression. Levels 1 through 9 range from best
	// speed to best compression.
	//
	// Applications should set this field. The default value of 0 does not attempt any compression.
	Level int
}

type Dialer struct {
	// CompressionOptions specifies options the client should use for
	// per message compression (RFC 7692). If CompressionOptions is nil and
	// EnableCompression is false, then the client does not attempt to negotiate
	// compression with the server.
	CompressionOptions *CompressionOptions

	// EnableCompression specifies if the client should attempt to negotiate
	// per message compression (RFC 7692). Setting this value to true does not
	// guarantee that compression will be supported. Currently only "no context
	// takeover" modes are supported.
	//
	// Deprecated: Set CompressionOptions to non-nil value to enable compression
	// negotiation.
	EnableCompression bool
}
Modify Upgrader to match Dialer.
@garyburd I agree that level 1 is the better default value at the moment, because it fixes the memory growth on Go 1.7 and compression is so costly. But it looks like at the fan-out level of apps as big as @joshdvir's, saving bandwidth saves a lot of money, so having the compression level configurable makes sense.
We made a custom build with compression level 1 and counters you suggested and put it into production. Counter values are:
Node 1:
"gorilla_websocket_flate_writer_from_pool": 1453147,
"gorilla_websocket_new_flate_writer": 6702
Node 2:
"gorilla_websocket_flate_writer_from_pool": 1820919,
"gorilla_websocket_new_flate_writer": 3676,
Node 3:
"gorilla_websocket_flate_writer_from_pool": 574187,
"gorilla_websocket_new_flate_writer": 321
...
These are aggregated over 1 minute. So the pool looks pretty effective, but...
...compression is still the leader in the alloc and CPU profiles, and getting from sync.Pool is the most allocation-expensive operation for some reason.
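The counter values above can be turned into a miss ratio with a few lines of arithmetic. This sketch assumes that the from_pool counter is incremented on every Get call and that new_flate_writer counts only the calls that had to allocate; the missRatio helper is hypothetical.

```go
package main

import "fmt"

// missRatio returns the fraction of pool requests that allocated a new writer.
func missRatio(fromPool, newWriters int64) float64 {
	if fromPool == 0 {
		return 0
	}
	return float64(newWriters) / float64(fromPool)
}

func main() {
	// Counter values copied from the three nodes reported above.
	nodes := []struct {
		name              string
		fromPool, created int64
	}{
		{"Node 1", 1453147, 6702},
		{"Node 2", 1820919, 3676},
		{"Node 3", 574187, 321},
	}
	for _, n := range nodes {
		fmt.Printf("%s: %.3f%% of Get calls allocated a new flate.Writer\n",
			n.name, 100*missRatio(n.fromPool, n.created))
	}
}
```

Under that assumption, fewer than half a percent of requests allocate a new writer on each of the three nodes.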
Here is CPU profile now:
(pprof) top 30 --cum
27.28s of 52.42s total (52.04%)
Dropped 414 nodes (cum <= 0.26s)
Showing top 30 nodes out of 137 (cum >= 1.89s)
flat flat% sum% cum cum%
0 0% 0% 50.21s 95.78% runtime.goexit
0.16s 0.31% 0.31% 43.93s 83.80% github.com/centrifugal/centrifugo/libcentrifugo/conns/clientconn.(*client).sendMessages
0.21s 0.4% 0.71% 42.52s 81.11% github.com/centrifugal/centrifugo/libcentrifugo/conns/clientconn.(*client).sendMessage
0.19s 0.36% 1.07% 42.31s 80.71% github.com/centrifugal/centrifugo/libcentrifugo/server/httpserver.(*wsSession).Send
0.21s 0.4% 1.47% 41.87s 79.87% github.com/centrifugal/centrifugo/vendor/github.com/gorilla/websocket.(*Conn).WriteMessage
0.01s 0.019% 1.49% 35.43s 67.59% github.com/centrifugal/centrifugo/vendor/github.com/gorilla/websocket.(*flateWriteWrapper).Close
0.03s 0.057% 1.55% 24.69s 47.10% compress/flate.(*Writer).Flush
0 0% 1.55% 24.66s 47.04% compress/flate.(*compressor).syncFlush
0.04s 0.076% 1.62% 24.03s 45.84% compress/flate.(*compressor).encSpeed
0.08s 0.15% 1.77% 18.16s 34.64% compress/flate.(*huffmanBitWriter).writeBlockDynamic
0.12s 0.23% 2.00% 15.03s 28.67% github.com/centrifugal/centrifugo/vendor/github.com/gorilla/websocket.(*messageWriter).flushFrame
0.11s 0.21% 2.21% 14.90s 28.42% github.com/centrifugal/centrifugo/vendor/github.com/gorilla/websocket.(*Conn).write
0.91s 1.74% 3.95% 13.21s 25.20% compress/flate.(*huffmanEncoder).generate
0 0% 3.95% 12.72s 24.27% net.(*conn).Write
0.06s 0.11% 4.06% 12.72s 24.27% net.(*netFD).Write
11.78s 22.47% 26.54% 12.16s 23.20% syscall.Syscall
0.05s 0.095% 26.63% 12.09s 23.06% syscall.Write
0.02s 0.038% 26.67% 12.04s 22.97% syscall.write
0.61s 1.16% 27.83% 11.98s 22.85% compress/flate.(*huffmanBitWriter).indexTokens
0 0% 27.83% 10.61s 20.24% github.com/centrifugal/centrifugo/vendor/github.com/gorilla/websocket.(*messageWriter).Close
5.20s 9.92% 37.75% 6.62s 12.63% compress/flate.(*huffmanEncoder).bitCounts
4.77s 9.10% 46.85% 5.44s 10.38% compress/flate.encodeBestSpeed
list WriteMessage
(pprof) list WriteMessage
Total: 52.42s
ROUTINE ======================== github.com/centrifugal/centrifugo/vendor/github.com/gorilla/websocket.(*Conn).WriteMessage in /Users/fz/go/src/github.com/centrifugal/centrifugo/vendor/github.com/gorilla/websocket/conn.go
210ms 41.87s (flat, cum) 79.87% of Total
. . 659:
. . 660:// WriteMessage is a helper method for getting a writer using NextWriter,
. . 661:// writing the message and closing the writer.
. . 662:func (c *Conn) WriteMessage(messageType int, data []byte) error {
. . 663:
160ms 160ms 664: if c.isServer && (c.newCompressionWriter == nil || !c.enableWriteCompression) {
. . 665:
. . 666: // Fast path with no allocations and single frame.
. . 667:
10ms 40ms 668: if err := c.prepWrite(messageType); err != nil {
. . 669: return err
. . 670: }
. . 671: mw := messageWriter{c: c, frameType: messageType, pos: maxFrameHeaderSize}
. 80ms 672: n := copy(c.writeBuf[mw.pos:], data)
. . 673: mw.pos += n
. . 674: data = data[n:]
10ms 4.43s 675: return mw.flushFrame(true, data)
. . 676: }
. . 677:
. 1.63s 678: w, err := c.NextWriter(messageType)
. . 679: if err != nil {
. . 680: return err
. . 681: }
30ms 100ms 682: if _, err = w.Write(data); err != nil {
. . 683: return err
. . 684: }
. 35.43s 685: return w.Close()
list Close
. . 104:func (w *flateWriteWrapper) Close() error {
. . 105: if w.fw == nil {
. . 106: return errWriteClosed
. . 107: }
. 24.69s 108: err1 := w.fw.Flush()
10ms 130ms 109: flateWriterPool.Put(w.fw)
. . 110: w.fw = nil
. . 111: if w.tw.p != [4]byte{0, 0, 0xff, 0xff} {
. . 112: return errors.New("websocket: internal error, unexpected bytes at end of flate stream")
. . 113: }
. 10.61s 114: err2 := w.tw.w.Close()
. . 115: if err1 != nil {
. . 116: return err1
. . 117: }
. . 118: return err2
. . 119:}
list Flush
. . 711:// In the terminology of the zlib library, Flush is equivalent to Z_SYNC_FLUSH.
. . 712:func (w *Writer) Flush() error {
. . 713: // For more about flushing:
. . 714: // http://www.bolet.org/~pornin/deflate-flush.html
30ms 24.69s 715: return w.d.syncFlush()
. . 716:}
. . 717:
. . 718:// Close flushes and closes the writer.
. . 719:func (w *Writer) Close() error {
. . 720: return w.d.close()
ROUTINE ======================== compress/flate.(*compressor).syncFlush in /Users/fz/go1.7/src/compress/flate/deflate.go
0 24.66s (flat, cum) 47.04% of Total
. . 555:func (d *compressor) syncFlush() error {
. . 556: if d.err != nil {
. . 557: return d.err
. . 558: }
. . 559: d.sync = true
. 24.03s 560: d.step(d)
. . 561: if d.err == nil {
. 490ms 562: d.w.writeStoredHeader(0, false)
. 140ms 563: d.w.flush()
. . 564: d.err = d.w.err
. . 565: }
. . 566: d.sync = false
. . 567: return d.err
. . 568:}
Memory usage is much better now, but it is still very high. Compression allocates a lot:
fz@centrifugo: go tool pprof --alloc_space centrifugo heap_profile_extra
Entering interactive mode (type "help" for commands)
(pprof) top 30 --cum
518.97GB of 541.65GB total (95.81%)
Dropped 314 nodes (cum <= 2.71GB)
Showing top 30 nodes out of 35 (cum >= 3.33GB)
flat flat% sum% cum cum%
0 0% 0% 541.53GB 100% runtime.goexit
0 0% 0% 505.54GB 93.33% github.com/centrifugal/centrifugo/libcentrifugo/conns/clientconn.(*client).sendMessages
0 0% 0% 504.45GB 93.13% github.com/centrifugal/centrifugo/libcentrifugo/conns/clientconn.(*client).sendMessage
0 0% 0% 504.45GB 93.13% github.com/centrifugal/centrifugo/libcentrifugo/server/httpserver.(*wsSession).Send
0 0% 0% 504.45GB 93.13% github.com/centrifugal/centrifugo/vendor/github.com/gorilla/websocket.(*Conn).WriteMessage
6.63GB 1.22% 1.22% 501.75GB 92.63% github.com/centrifugal/centrifugo/vendor/github.com/gorilla/websocket.(*Conn).NextWriter
6.56GB 1.21% 2.44% 495.11GB 91.41% github.com/centrifugal/centrifugo/vendor/github.com/gorilla/websocket.compressNoContextTakeover
0 0% 2.44% 491.89GB 90.81% sync.(*Pool).Get
0 0% 2.44% 491.89GB 90.81% sync.(*Pool).getSlow
359.74GB 66.42% 68.85% 488.55GB 90.20% compress/flate.NewWriter
0 0% 68.85% 488.55GB 90.20% github.com/centrifugal/centrifugo/vendor/github.com/gorilla/websocket.glob..func1
128.81GB 23.78% 92.63% 128.81GB 23.78% compress/flate.(*compressor).init
0.01GB 0.0019% 92.64% 28.97GB 5.35% net/http.(*conn).serve
0 0% 92.64% 25.18GB 4.65% github.com/centrifugal/centrifugo/libcentrifugo/server/httpserver.(*HTTPServer).Logged.func1
0 0% 92.64% 25.18GB 4.65% net/http.(*ServeMux).ServeHTTP
0 0% 92.64% 25.18GB 4.65% net/http.HandlerFunc.ServeHTTP
0 0% 92.64% 25.18GB 4.65% net/http.serverHandler.ServeHTTP
list NextWriter
(pprof) list NextWriter
Total: 541.65GB
ROUTINE ======================== github.com/centrifugal/centrifugo/vendor/github.com/gorilla/websocket.(*Conn).NextWriter in /Users/fz/go/src/github.com/centrifugal/centrifugo/vendor/github.com/gorilla/websocket/conn.go
6.63GB 501.75GB (flat, cum) 92.63% of Total
. . 444: }
. . 445:
. . 446: mw := &messageWriter{
. . 447: c: c,
. . 448: frameType: messageType,
6.63GB 6.63GB 449: pos: maxFrameHeaderSize,
. . 450: }
. . 451: c.writer = mw
. . 452: if c.newCompressionWriter != nil && c.enableWriteCompression && isData(messageType) {
. 495.11GB 453: w := c.newCompressionWriter(c.writer)
. . 454: mw.compress = true
. . 455: c.writer = w
. . 456: }
. . 457: return c.writer, nil
. . 458:}
list compressNoContextTakeover
(pprof) list compressNoContextTakeover
Total: 541.65GB
ROUTINE ======================== github.com/centrifugal/centrifugo/vendor/github.com/gorilla/websocket.compressNoContextTakeover in /Users/fz/go/src/github.com/centrifugal/centrifugo/vendor/github.com/gorilla/websocket/compression.go
6.56GB 495.11GB (flat, cum) 91.41% of Total
. . 44: fr.(flate.Resetter).Reset(io.MultiReader(r, strings.NewReader(tail)), nil)
. . 45: return &flateReadWrapper{fr}
. . 46:}
. . 47:
. . 48:func compressNoContextTakeover(w io.WriteCloser) io.WriteCloser {
4.41GB 4.41GB 49: tw := &truncWriter{w: w}
. 488.55GB 50: fw, _ := flateWriterPool.Get().(*flate.Writer)
. . 51: plugin.Metrics.Counters.Inc("gorilla_websocket_flate_writer_from_pool")
. . 52: fw.Reset(tw)
2.15GB 2.15GB 53: return &flateWriteWrapper{fw: fw, tw: tw}
. . 54:}
--inuse_space shows a similar picture, just with values two orders of magnitude lower (2.69GB for the flateWriterPool.Get().(*flate.Writer) line, which is the leader).
It's hard to say what we can do about such a big compression overhead...
@garyburd you suggest adding CompressionOptions. I can do a pull request with it, but maybe just a global exported variable like DefaultFlateCompressionLevel that we can set on application start, or a setter method, will do the work? We don't need a per-connection compression level, and we can eventually do what you suggested if there is a need later. No deprecation will be required this way for now.
@FZambia Thank you for testing the pool effectiveness and reporting the profiles.
I don't want to add an API now that might be replaced with another API later.
As I think about it more, it's better to add one method:
// SetCompressionLevel sets the flate compression level for the next message.
// Valid levels range from -2 to 9. Level -2 will use Huffman compression only.
// Level -1 uses the default compression level. Level 0 does not attempt any
// compression. Levels 1 through 9 range from best speed to best compression.
func (c *Conn) SetCompressionLevel(n int) error {
}
This is more flexible than other options. The implementation will replace flateWriterPool with flateWriterPools[12]. The flateWriteWrapper will need to store the level so the writer can be returned to the correct pool.
I think that having one method that changes default compression level still makes sense:
var defaultCompressionLevel int = 1

// SetDefaultCompressionLevel sets the flate compression level which will be used by
// default to compress messages when compression negotiated. This function must be
// called once before application starts.
//
// Valid levels range from -2 to 9. Level -2 will use Huffman compression only.
// Level -1 uses the default compression level. Level 0 does not attempt any
// compression. Levels 1 through 9 range from best speed to best compression.
func (c *Conn) SetDefaultCompressionLevel(n int) error {
	defaultCompressionLevel = n
	return nil
}
In most situations, I suppose, users that need custom compression only need to set this default value once.
Then, if someone needs a compression level per connection or per message, we can add SetCompressionLevel and a [12]flateWriterPool:
// SetCompressionLevel sets the flate compression level for the next message.
// Valid levels range from -2 to 9. Level -2 will use Huffman compression only.
// Level -1 uses the default compression level. Level 0 does not attempt any
// compression. Levels 1 through 9 range from best speed to best compression.
// If not set default compression level will be used.
func (c *Conn) SetCompressionLevel(n int) error {
}
If it's not called but compression is negotiated, defaultCompressionLevel will be used. The only caveat I see is that SetDefaultCompressionLevel should be called once before the application starts in the current implementation, but it seems pretty opaque.
Just looked at size of each flate.Writer instance:
package main

import (
	"compress/flate"
	"unsafe"
)

func main() {
	var w flate.Writer
	println(unsafe.Sizeof(w))
}
600 KB! This size surprised me; I had not expected it to be that big :)
Yeah, quite a number of tables are needed to maintain state, produce Huffman tables and buffer output. There isn't much that can be done about it, except for level -2 (HuffmanOnly), 0 (No Compression) and 1 (Best Speed) in the standard library.
For my own library I have been looking at reducing the memory requirements for encoding options that do not require as many buffers, see https://github.com/klauspost/compress/pull/70 (it still has issues, as can be seen by the crash I logged in the issue)
@FZambia I do not want to use package level variables for settings. The setting should be included with the other settings in Dialer and Upgrader or it should be part of the connection so it can be changed from message to message.
See b0dc45572b148296ddf01989da00a2923d213a56.
@garyburd many thanks for your time helping us with this. Looking at all these profiles do you see any way to reduce compression memory usage and allocations?
If the application sends the same message to multiple clients, then #182 is the next thing to try.
@garyburd #182 could do magic in my use case, where my fan out is between 50K-90K per sec where all messages are the same.
Thank you for your help.
Is this issue fixed by b0dc45572b148296ddf01989da00a2923d213a56 and 804cb600d06b10672f2fbc0a336a7bee507a428e?
@garyburd heap before changes:
After changes (compression level 1 and using PreparedMessage):

The picture is the same for CPU. So we no longer see compression in first place in the heap and CPU profiles.
Though memory usage is reduced, it's still pretty high when using compression. This could be an application problem, so we will try to investigate.
@garyburd I just analyzed the inuse_space profile @joshdvir gave me.
The leaders are gorilla/websocket.newConnBRW (36%), http.newBufioWriterSize (17%), http.newBufioReader (16%). I thought we used the recent improvement you added in #223, but looking at the profiles I noticed that the read and write buffers from hijack are not reused for some reason. Then I found a bug in the Centrifugo code: ReadBufferSize and WriteBufferSize were set to the SockJS-go default values (4096) when those sizes were set to 0 in the configuration. I will build with the fix and come back with results.
There are some other things in profiles that might interest you - I'll try to upload svg graph visualizations from build with fix.
Here are some graphs using Centrifugo with the latest Gorilla WebSocket. They are from a Centrifugo node run by @joshdvir with many connections (about 100k) and websocket compression enabled (level 1), on go1.7.5.
CPU:
Here we see that most of the CPU time is spent on syscalls (read, write). I don't think we can do a lot here, because we send a lot of fan-out messages to different sockets. So this looks normal.
Alloc space:
Here we see a big impact of compress/flate.(*decompressor).Reset. I thought a bit about how to improve this, but have not found a way.
Inuse space:
@garyburd do you see a way to improve things further? If not then we can live with this I suppose.
P.S. I am going to create many ws connections on local machine with compression enabled/disabled and compare profiles.
I found some tweaks that could be done to the decoder to avoid allocations, and postpone allocation of about 4KB to when it is actually needed.
https://github.com/klauspost/compress/pull/76
I don't see anything obvious to improve in the websocket package.