
Severe lock contention in the CachingConnectionFactory findOpenChannel method

Open missence opened this issue 1 year ago • 3 comments

I get **22,900/second** with this app...
@SpringBootApplication
public class Gh644Application implements CommandLineRunner {

	public static void main(String[] args) {
		SpringApplication.run(Gh644Application.class, args).close();
	}

	@Autowired
	private RabbitTemplate template;

	@Autowired
	private RabbitListenerEndpointRegistry registry;

	private final CountDownLatch latch = new CountDownLatch(1_000_000);

	@Override
	public void run(String... arg0) throws Exception {
		for (int i = 0; i < 1_000_000; i++) {
			template.convertAndSend("perf", "foo");
		}
		StopWatch watch = new StopWatch();
		watch.start();
		this.registry.start();
		this.latch.await();
		watch.stop();
		System.out.println(watch.getTotalTimeMillis() + " rate: " + 1_000_000_000.0 / watch.getTotalTimeMillis());
	}

	@Bean
	public Queue perf() {
		return new Queue("perf", false, false, true);
	}

	@RabbitListener(queues = "perf")
	public void listen(Message message) {
		this.latch.countDown();
	}

}

and

spring.rabbitmq.listener.simple.prefetch=100
spring.rabbitmq.listener.simple.transaction-size=50
spring.rabbitmq.listener.simple.auto-startup=false

When I removed the transaction-size property (so that an ack is sent per message), I got 17,500/sec.

When I used the native consumer, I got very similar results (17k/sec).

...
//		this.registry.start();

		ConnectionFactory cf = new ConnectionFactory();
		Connection conn = cf.newConnection();
		Channel channel = conn.createChannel();
		channel.basicQos(100);
		channel.basicConsume("perf", new DefaultConsumer(channel) {

			@Override
			public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
					throws IOException {
				latch.countDown();
				channel.basicAck(envelope.getDeliveryTag(), false);
			}

		});
		this.latch.await();
		watch.stop();
		System.out.println(watch.getTotalTimeMillis() + " rate: " + 1_000_000_000.0 / watch.getTotalTimeMillis());
		channel.close();
		conn.close();

So there must be something else going on in your code.

Originally posted by @garyrussell in https://github.com/spring-projects/spring-amqp/issues/644#issuecomment-321827812

Your example is single-threaded. If you change it to be multi-threaded, you will see lock contention: threads wait on a lock when executing the CachingConnectionFactory findOpenChannel method, and the greater the concurrency, the longer the wait.
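
To make the multi-threaded case concrete, here is a minimal sketch of a harness that runs the same send action from several threads at once and times it. The class name `ConcurrentSendHarness`, the `run` method and its parameters are hypothetical and appear nowhere in the thread above. The placeholder action in `main` only increments a counter so that the sketch runs without a broker; in the app quoted above it would presumably be replaced with `() -> template.convertAndSend("perf", "foo")`.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

public class ConcurrentSendHarness {

	/**
	 * Runs sendAction messagesPerThread times on each of the given number of
	 * threads and returns the elapsed wall-clock time in milliseconds.
	 * (Hypothetical helper, not part of Spring AMQP.)
	 */
	public static long run(int threads, int messagesPerThread, Runnable sendAction)
			throws InterruptedException {
		ExecutorService pool = Executors.newFixedThreadPool(threads);
		CountDownLatch ready = new CountDownLatch(threads); // all workers are up
		CountDownLatch go = new CountDownLatch(1);           // common start signal
		CountDownLatch done = new CountDownLatch(threads);  // all workers finished
		for (int t = 0; t < threads; t++) {
			pool.execute(() -> {
				ready.countDown();
				try {
					go.await();
					for (int i = 0; i < messagesPerThread; i++) {
						sendAction.run();
					}
				}
				catch (InterruptedException e) {
					Thread.currentThread().interrupt();
				}
				finally {
					done.countDown();
				}
			});
		}
		ready.await();
		long start = System.nanoTime();
		go.countDown(); // release all workers at the same time
		done.await();
		long elapsedMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
		pool.shutdown();
		return elapsedMillis;
	}

	public static void main(String[] args) throws InterruptedException {
		// Placeholder so the sketch runs standalone (assumption); swap in
		// () -> template.convertAndSend("perf", "foo") to exercise the template.
		AtomicLong sent = new AtomicLong();
		int total = 1_000_000;
		for (int threads : new int[] { 1, 4, 16 }) {
			long millis = run(threads, total / threads, sent::incrementAndGet);
			System.out.println(threads + " thread(s): " + millis + " ms");
		}
	}

}
```

Comparing the elapsed time for 1, 4 and 16 threads against a real broker should show whether throughput stops scaling as concurrency grows, which is the behaviour described above.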

missence • Oct 12 '23 10:10