spring-amqp
spring-amqp copied to clipboard
The CachingConnectionFactory findOpenChannel message lock contention is severe
I get **22,900/second** with this app...
@SpringBootApplication
public class Gh644Application implements CommandLineRunner {
public static void main(String[] args) {
SpringApplication.run(Gh644Application.class, args).close();
}
@Autowired
private RabbitTemplate template;
@Autowired
private RabbitListenerEndpointRegistry registry;
private final CountDownLatch latch = new CountDownLatch(1_000_000);
@Override
public void run(String... arg0) throws Exception {
for (int i = 0; i < 1_000_000; i++) {
template.convertAndSend("perf", "foo");
}
StopWatch watch = new StopWatch();
watch.start();
this.registry.start();
this.latch.await();
watch.stop();
System.out.println(watch.getTotalTimeMillis() + " rate: " + 1_000_000_000.0 / watch.getTotalTimeMillis());
}
@Bean
public Queue perf() {
return new Queue("perf", false, false, true);
}
@RabbitListener(queues = "perf")
public void listen(Message message) {
this.latch.countDown();
}
}
and
spring.rabbitmq.listener.simple.prefetch=100
spring.rabbitmq.listener.simple.transaction-size=50
spring.rabbitmq.listener.simple.auto-startup=false
When I remove the transaction-size
property (so we send an ack per message), I got 17,500/sec.
When I used the native consumer, I got very similar results (17k/sec).
...
// this.registry.start();
ConnectionFactory cf = new ConnectionFactory();
Connection conn = cf.newConnection();
Channel channel = conn.createChannel();
channel.basicQos(100);
channel.basicConsume("perf", new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
throws IOException {
latch.countDown();
channel.basicAck(envelope.getDeliveryTag(), false);
}
});
this.latch.await();
watch.stop();
System.out.println(watch.getTotalTimeMillis() + " rate: " + 1_000_000_000.0 / watch.getTotalTimeMillis());
channel.close();
conn.close();
So there must be something else going on in your code.
Originally posted by @garyrussell in https://github.com/spring-projects/spring-amqp/issues/644#issuecomment-321827812
Your example is single-threaded, and if you change it to multi-threaded, you will have a lock race. When executing the CachingConnectionFactory findOpenChannel method, a lock wait will occur, and the greater the concurrency, the longer the wait