JavaNSQClient
JavaNSQClient copied to clipboard
Consuming with snappy only receives one message
//curl -d 'message 1' 'http://localhost:4151/put?topic=test_snappy'
//curl -d 'message 2' 'http://localhost:4151/put?topic=test_snappy'
//curl -d 'message 3' 'http://localhost:4151/put?topic=test_snappy'
NSQLookup lookup = new DefaultNSQLookup();
lookup.addLookupAddress("localhost", 4161);
NSQConfig config = new NSQConfig();
config.setCompression(NSQConfig.Compression.SNAPPY);
NSQConsumer consumer = new NSQConsumer(lookup, "test_snappy", "test_consume",
(msg) -> {
System.out.println(new String(msg.getMessage()));
msg.finished();
}, config);
consumer.start();
Thread.sleep(20000);
System.out.println("done");
10:10:01.997 INFO Created connection: Rob-Seeds-MacBook-Pro.local:4150 - Connection.<init>
10:10:02.021 INFO IdentifyResponse: {"max_rdy_count":2500,"version":"0.3.2","max_msg_timeout":900000,"msg_timeout":60000,"tls_v1":false,"deflate":false,"deflate_level":0,"max_deflate_level":6,"snappy":true,"sample_rate":0,"auth_required":false,"output_buffer_size":16384,"output_buffer_timeout":250} - NSQFeatureDetectionHandler.channelRead0
10:10:02.025 INFO Adding snappy to pipline - NSQFeatureDetectionHandler.installSnappyDecoder
10:10:02.027 INFO IdentifyResponse: OK - NSQFeatureDetectionHandler.channelRead0
10:10:02.029 INFO Server identification: {"max_rdy_count":2500,"version":"0.3.2","max_msg_timeout":900000,"msg_timeout":60000,"tls_v1":false,"deflate":false,"deflate_level":0,"max_deflate_level":6,"snappy":true,"sample_rate":0,"auth_required":false,"output_buffer_size":16384,"output_buffer_timeout":250} - Connection.<init>
message 1
done
Works as expected when not using snappy compression. Publishing with snappy enabled works fine.
I can reproduce this, looking into it.
After playing around a bit the problem seems to be that the messages are too small. So the server basically waits to get enough data that compress actually has an effect.
There is also this bug: https://github.com/nsqio/nsq/issues/532