nc.Request speed test
ping to the host: time=1.111 ms, link speed 1000 Mbps
test
node 1
nc.QueueSubscribe("test", "job_workers", func(msg *nats.Msg) {
    msg.Respond([]byte("ok"))
})
node 2

times := time.Now()
res, err := nc.Request("test", []byte("hello"), time.Second*1)
log.Println(string(res.Data), err, times.Sub(time.Now()))
Result: 4.064834ms. I don't think this is very fast.
Test with big data:

times := time.Now()
buf := make([]byte, 100000)
res, err := nc.Request("test", buf, time.Second*1)
log.Println(string(res.Data), err, times.Sub(time.Now()))
100 KiB took 18.530951ms. I don't think that's fast?
After testing a little, I noticed a very strange behavior.
buf := make([]byte, 30000000)
times := time.Now()
nc.Publish("test", buf)
log.Println(times.Sub(time.Now()))

801ns

buf := make([]byte, 3000000)
times := time.Now()
nc.Publish("test", buf)
log.Println(times.Sub(time.Now()))

366ns

buf := make([]byte, 300000)
times := time.Now()
nc.Publish("test", buf)
log.Println(times.Sub(time.Now()))

6.823769ms

buf := make([]byte, 30000)
times := time.Now()
nc.Publish("test", buf)
log.Println(times.Sub(time.Now()))

977ns
send 30000000 bytes = 801ns
send 3000000 bytes = 366ns
send 300000 bytes = 6.823769ms
send 30000 bytes = 977ns
The result was the same every time. At some specific size, NATS starts to work slowly. Try sending a buffer of 300000 bytes.
You should run these in a loop and take the average, at least; running a single one will show variability. For instance, on the very first Request() we spin up the global response subscription.

For the 6ms anomaly, my bet is that it is GC. Run the test with GC off and see if you get it again.
for {
    buf := make([]byte, 3000000)
    times := time.Now()
    nc.Publish("test", buf)
    log.Println(times.Sub(time.Now()))
    time.Sleep(100 * time.Millisecond)
}

2019/09/12 02:52:54 -856ns
2019/09/12 02:52:54 -846ns
2019/09/12 02:52:54 -869ns
2019/09/12 02:52:54 -711ns
2019/09/12 02:52:54 -920ns
2019/09/12 02:52:54 -918ns
2019/09/12 02:52:55 -907ns
2019/09/12 02:52:55 -846ns
2019/09/12 02:52:55 -1.185µs
2019/09/12 02:52:55 -924ns
2019/09/12 02:52:55 -941ns
for {
    buf := make([]byte, 300000)
    times := time.Now()
    nc.Publish("test", buf)
    log.Println(times.Sub(time.Now()))
    time.Sleep(100 * time.Millisecond)
}

2019/09/12 02:53:31 -8.127566ms
2019/09/12 02:53:31 -387.087µs
2019/09/12 02:53:31 -397.892µs
2019/09/12 02:53:31 -397.643µs
2019/09/12 02:53:31 -561.26µs
2019/09/12 02:53:31 -605.109µs
2019/09/12 02:53:31 -276.006µs
2019/09/12 02:53:31 -285.92µs
2019/09/12 02:53:31 -525.268µs
2019/09/12 02:53:31 -7.526305ms
2019/09/12 02:53:32 -7.962263ms
2019/09/12 02:53:32 -9.250239ms
2019/09/12 02:53:32 -8.548932ms
2019/09/12 02:53:32 -6.36826ms
2019/09/12 02:53:32 -6.419914ms
Loop test. OK, I turned off GC:

GOGC=off
for {
    buf := make([]byte, 3000000)
    times := time.Now()
    nc.Publish("test", buf)
    log.Println(times.Sub(time.Now()))
    time.Sleep(100 * time.Millisecond)
}

2019/09/12 02:56:36 -481ns
2019/09/12 02:56:37 -1.015µs
2019/09/12 02:56:37 -1µs
2019/09/12 02:56:37 -887ns
2019/09/12 02:56:37 -964ns
2019/09/12 02:56:37 -1.159µs
2019/09/12 02:56:37 -1.277µs
2019/09/12 02:56:37 -1.105µs
2019/09/12 02:56:37 -953ns
2019/09/12 02:56:37 -1.088µs
2019/09/12 02:56:37 -952ns
2019/09/12 02:56:38 -920ns
2019/09/12 02:56:38 -1.184µs
2019/09/12 02:56:38 -968ns
2019/09/12 02:56:38 -984ns
2019/09/12 02:56:38 -1.243µs
2019/09/12 02:56:38 -944ns
for {
    buf := make([]byte, 300000)
    times := time.Now()
    nc.Publish("test", buf)
    log.Println(times.Sub(time.Now()))
    time.Sleep(100 * time.Millisecond)
}

2019/09/12 02:57:47 -6.761753ms
2019/09/12 02:57:47 -381.104µs
2019/09/12 02:57:47 -722.048µs
2019/09/12 02:57:47 -444.247µs
2019/09/12 02:57:47 -391.912µs
2019/09/12 02:57:47 -543.44µs
2019/09/12 02:57:47 -579.934µs
2019/09/12 02:57:47 -8.716466ms
2019/09/12 02:57:48 -9.052397ms
2019/09/12 02:57:48 -10.38664ms
2019/09/12 02:57:48 -5.444815ms
2019/09/12 02:57:48 -7.04377ms
2019/09/12 02:57:48 -8.384894ms
2019/09/12 02:57:48 -6.333591ms
2019/09/12 02:57:48 -5.64616ms
2019/09/12 02:57:48 -6.314187ms
2019/09/12 02:57:48 -9.340573ms
2019/09/12 02:57:49 -8.683643ms
2019/09/12 02:57:49 -7.88775ms
2019/09/12 02:57:49 -8.662241ms
2019/09/12 02:57:49 -9.346188ms
magic
Can you share the whole program in a gist? Also what version of Go? What server version?
You have a problem: you are doing times.Sub(time.Now()), i.e. start time minus current time, which yields a negative duration (that is why your results above are negative). Usually you should use time.Since(startTime).
go version go1.12.9 darwin/amd64
package main

import (
    "log"
    "time"

    nats "github.com/nats-io/nats.go"
)

func main() {
    var urls = "any"
    opts := []nats.Option{nats.Name("any"), nats.Token("any")}
    nc, err := nats.Connect(urls, opts...)
    if err != nil {
        log.Println(err)
    }
    buf := make([]byte, 300000)
    for {
        times := time.Now()
        nc.Publish("test", buf)
        log.Println(times.Sub(time.Now()))
        time.Sleep(100 * time.Millisecond)
    }
}
With time.Since(startTime) the speed results are the same:

2019/09/12 03:04:20 9.862577ms
2019/09/12 03:04:20 9.725442ms
2019/09/12 03:04:20 8.069759ms
2019/09/12 03:04:20 7.675831ms
2019/09/12 03:04:20 7.815554ms
2019/09/12 03:04:20 7.33363ms
2019/09/12 03:04:21 8.232618ms
2019/09/12 03:04:21 10.043181ms
Sorry, so you compute start time minus current time. It is a bit strange, but OK.

The bug appears only at size 300000, not at 30000 and not at 3000000. It is very strange to me.
NATS bug demo on demo.nats.io:
package main

import (
    "log"
    "time"

    nats "github.com/nats-io/nats.go"
)

func main() {
    var urls = "demo.nats.io"
    opts := []nats.Option{nats.Name("aaa.test")}
    nc, err := nats.Connect(urls, opts...)
    if err != nil {
        log.Println(err)
    }
    buf := make([]byte, 300000)
    for {
        times := time.Now()
        nc.Publish("testers", buf)
        log.Println(time.Since(times))
        time.Sleep(100 * time.Millisecond)
    }
}
2019/09/12 03:12:59 952.344037ms
2019/09/12 03:13:00 89.122275ms
2019/09/12 03:13:00 87.365309ms
2019/09/12 03:13:00 86.130758ms
2019/09/12 03:13:00 276.998µs
2019/09/12 03:13:00 330.673µs
2019/09/12 03:13:00 351.024µs
2019/09/12 03:13:00 327.955µs
2019/09/12 03:13:00 318.397µs
2019/09/12 03:13:01 303.901µs
2019/09/12 03:13:01 295.743µs
2019/09/12 03:13:01 198.36µs
2019/09/12 03:13:01 73.506249ms
2019/09/12 03:13:01 4.702034ms
2019/09/12 03:13:01 287.715µs
2019/09/12 03:13:01 90.059µs
2019/09/12 03:13:01 80.191µs
2019/09/12 03:13:02 170.594002ms
2019/09/12 03:13:02 242.827µs
2019/09/12 03:13:02 243.7µs
2019/09/12 03:13:02 138.109µs
2019/09/12 03:13:02 128.463µs
Are you using master for nats.go or a release version?
master
I re-fetched master and tried again; same bug.

go get github.com/nats-io/nats.go
stat github.com/nats-io/nats.go: no such file or directory
go run t2.go
2019/09/12 03:21:51 989.007998ms
2019/09/12 03:21:51 129.696299ms
2019/09/12 03:21:51 57.686817ms
2019/09/12 03:21:52 79.717µs
2019/09/12 03:21:52 180.369µs
2019/09/12 03:21:52 93.472µs
2019/09/12 03:21:52 143.104µs
2019/09/12 03:21:52 276.577543ms
2019/09/12 03:21:52 93.960648ms
2019/09/12 03:21:53 95.93479ms
2019/09/12 03:21:53 93.343343ms
2019/09/12 03:21:53 96.256µs
2019/09/12 03:21:53 149.916µs
Publish() is async. There is no guarantee of anything happening after you call Publish(). Also, you should check the error returned by Publish(). There is a maximum payload for the demo server (and all NATS servers default to this) of 1MB, so anything over that is failing. If you want to see how long it takes for the publish to be processed by the server, try this:
nc, err := nats.Connect("demo.nats.io")
...

rttStart := time.Now()
nc.Flush()
rtt := time.Since(rttStart)

start := time.Now()
err = nc.Publish("test", buf)
nc.Flush()
fmt.Printf("time to publish is %v\n", time.Since(start)-rtt)
I did not check the code, but it should be close.
go get github.com/nats-io/nats.go/
I downloaded the release 1.8 zip. The bug is even more stable, with consistently large times:
2019/09/12 03:27:59 959.714995ms
2019/09/12 03:27:59 88.025755ms
2019/09/12 03:27:59 87.146521ms
2019/09/12 03:28:00 278.678185ms
2019/09/12 03:28:00 91.920812ms
2019/09/12 03:28:00 91.650462ms
2019/09/12 03:28:00 281.461477ms
2019/09/12 03:28:01 90.247984ms
2019/09/12 03:28:01 117.519154ms
2019/09/12 03:28:01 249.165835ms
2019/09/12 03:28:01 93.730097ms
2019/09/12 03:28:02 89.551815ms
2019/09/12 03:28:02 116.460083ms
2019/09/12 03:28:02 248.662944ms
2019/09/12 03:28:02 86.849819ms
2019/09/12 03:28:03 88.152979ms
2019/09/12 03:28:03 87.551449ms
2019/09/12 03:28:03 117.076905ms
2019/09/12 03:28:03 93.186587ms
2019/09/12 03:28:04 248.073315ms
2019/09/12 03:28:04 90.923666ms
2019/09/12 03:28:04 88.280471ms
2019/09/12 03:28:04 87.822888ms
2019/09/12 03:28:04 119.988178ms
I deleted the folder and re-downloaded master (go get github.com/nats-io/nats.go); same problem:
2019/09/12 03:30:24 979.584915ms
2019/09/12 03:30:24 130.025865ms
2019/09/12 03:30:24 58.084435ms
2019/09/12 03:30:25 86.934704ms
2019/09/12 03:30:25 233.618µs
2019/09/12 03:30:25 204.391µs
2019/09/12 03:30:25 189.736624ms
2019/09/12 03:30:25 250.559µs
2019/09/12 03:30:25 196.024µs
2019/09/12 03:30:25 266.681µs
2019/09/12 03:30:26 228.82µs
If I change the size to 30000 or 3000000 there is no bug; only size = 300000 triggers it.

I tried a compiled build; same bug:
go build t2.go
./t2

2019/09/12 03:33:10 962.183493ms
2019/09/12 03:33:10 90.493279ms
2019/09/12 03:33:10 87.501768ms
2019/09/12 03:33:10 86.164323ms
2019/09/12 03:33:10 215.895µs
2019/09/12 03:33:11 103.982µs
2019/09/12 03:33:11 462.054625ms
2019/09/12 03:33:11 87.866027ms
2019/09/12 03:33:12 281.999896ms
2019/09/12 03:33:12 88.00666ms
@deepch there is also a latency framework here: https://github.com/nats-io/latency-tests, sharing it in case it helps to test or compare results.
@deepch Thank you. I will investigate this tomorrow and get back to you. I do see it too and there is probably an explanation. Will keep you posted.
Thank you very much for the prompt and quality support.
Just to reiterate what @derekcollison mentioned: the case with 3000000 (3,000,000) bytes is "fast" because the server simply closes the connection, since the payload is above the default 1MB limit, and since you don't check the Publish() error, the loop runs very quickly.

But I will continue looking into this tomorrow to make sure that there is nothing wrong.
Isn't nc.Publish supposed to be non-blocking? Should I really get results that differ so much in time? That is my only question.
200000 bytes, demo.nats.io:

2019/09/12 03:52:23 962.422377ms
2019/09/12 03:52:23 126.567043ms
2019/09/12 03:52:23 88.082844ms
2019/09/12 03:52:24 50.898884ms
2019/09/12 03:52:24 135.909µs
2019/09/12 03:52:24 262.96µs
2019/09/12 03:52:24 179.119µs
2019/09/12 03:52:24 306.264µs
2019/09/12 03:52:24 330.299µs
2019/09/12 03:52:24 295.582µs
2019/09/12 03:52:24 305.786µs
2019/09/12 03:52:24 347.827µs
2019/09/12 03:52:25 81.885µs
2019/09/12 03:52:26 1.143189432s
2019/09/12 03:52:26 475.863506ms
2019/09/12 03:52:27 128.062146ms
2019/09/12 03:52:27 90.380679ms
2019/09/12 03:52:27 91.81321ms
2019/09/12 03:52:27 132.723807ms
2019/09/12 03:52:27 87.870723ms
2019/09/12 03:52:28 91.629429ms
200000 bytes, local network:

2019/09/12 03:53:15 8.481095ms
2019/09/12 03:53:15 258.731µs
2019/09/12 03:53:15 493.368µs
2019/09/12 03:53:15 438.499µs
2019/09/12 03:53:16 59.272042ms
2019/09/12 03:53:16 8.063727ms
2019/09/12 03:53:16 5.393905ms
2019/09/12 03:53:16 5.002602ms
2019/09/12 03:53:16 6.069069ms
2019/09/12 03:53:16 7.212727ms
2019/09/12 03:53:16 4.592831ms
I think the times should not be this large, especially on the local network. And the larger the message, the more noticeable it is.
Publish() is async, OK. But do I understand correctly that it can block my program? Perhaps this method is not suitable for my task of sending a large number of messages per second. If I just need to push a message into a stream, do I need to use a channel?

Let's say I have a loop in which delay is important; if I call a function that introduces large delays, the loop slows down. Of course I can write a pool and process these tasks myself, but doesn't the library do that internally? I understand that the delivery guarantee is important, but is there no way to send faster? The guarantee is not important to me.
NATS is well suited for high speed messaging. Using our demo server to do performance tests probably not the best idea.
Publish() can flush the buffers if you are overflowing them, but usually it is totally async and lets another goroutine flush to the underlying socket.
Also have you tried the latency framework that @wallyqs pointed you to?
And finally, what is it that you are trying to achieve?
I am testing it now.

I want to send a large number of big messages without blocking. Considering that my messages are very large, I think I found the cause:

defaultBufSize = 32768
2019/09/12 05:02:53 16.897677ms
2019/09/12 05:02:53 15.791674ms
2019/09/12 05:02:53 17.406349ms
2019/09/12 05:02:54 18.225375ms
2019/09/12 05:02:54 13.645058ms
2019/09/12 05:02:54 14.447358ms
2019/09/12 05:02:54 15.467449ms
2019/09/12 05:02:54 19.002983ms
2019/09/12 05:02:54 14.294515ms
2019/09/12 05:02:54 12.386862ms
2019/09/12 05:02:54 10.842388ms
2019/09/12 05:02:55 14.162707ms
2019/09/12 05:02:55 17.366928ms
2019/09/12 05:02:55 13.004184ms
I tried changing it to:

defaultBufSize = 300000 * 2

Result:
2019/09/12 05:00:04 54.762µs
2019/09/12 05:00:04 65.024µs
2019/09/12 05:00:04 101.435µs
2019/09/12 05:00:04 64.964µs
2019/09/12 05:00:04 67.841µs
2019/09/12 05:00:04 63.709µs
2019/09/12 05:00:04 100.792µs
2019/09/12 05:00:05 92.84µs
2019/09/12 05:00:05 122.229µs
2019/09/12 05:00:05 69.867µs
2019/09/12 05:00:05 45.589µs
2019/09/12 05:00:05 53.604µs
The result is fully stable.

I think the buffer is too small for my message size. You are absolutely right: perhaps, due to the size of my data, the flushes were too frequent. This appears to solve my problem, though I do not know if it is the right solution.

Thank you so much for your help.
Interesting, thanks for that info. I believe we depend on a bufio.Writer which, when you send something bigger than the buffer, bypasses it and does the write in place.

Make sure you are measuring not just the Publish() call, which with your larger buffer will now always be async, but the time until all the messages are processed by the server.
There is also a good benchmark program here that you can use..
https://github.com/nats-io/nats.go/tree/master/bench
Thank you, I will investigate this in detail.
Publish() always being async is exactly what I wanted to achieve. It would be cool if there were a buffered sending channel, like BindRequestChan ;)
What I am saying, though, is: if Publish() is always async but your application takes another N ms to actually move the messages out of your app to the server, I am not sure that makes sense, TBH. Eventually you have to get the messages out.
I understand that we are talking about transmission over the network, and I also understand the difference between async and non-blocking. Non-blocking sending is what I need.
My problem, if I use:

for {
    result := Job1()
    nc.Publish("ch", result) // <--- slow, blocks the loop for 5-10-100 ms
    Job2()
}

Better for me:

for {
    result := Job1()
    sendCH <- result // <--- does not block my main work
    Job2()
}