rabbitmq-kotlin
Kotlin coroutine based library for RabbitMQ
The White Rabbit
The White Rabbit is a fast and asynchronous RabbitMQ (AMQP) client library based on Kotlin coroutines. Currently the following features are supported:
- Queue and exchange manipulations
- Message publishing with confirmation
- Message consuming with acknowledgment
- Transactional publishing and consuming
- RPC pattern
Adding to project
Gradle
repositories {
    jcenter()
}
dependencies {
    implementation 'com.viartemev:the-white-rabbit:$version'
}
Maven
<repositories>
    <repository>
        <id>jcenter</id>
        <url>https://jcenter.bintray.com/</url>
    </repository>
</repositories>
<dependency>
    <groupId>com.viartemev</groupId>
    <artifactId>the-white-rabbit</artifactId>
    <version>${version}</version>
</dependency>
Usage notes and examples
Use one of the extension methods on com.rabbitmq.client.Connection to get the kind of channel you need:
connection.channel {
    /*
    The plain channel with consumer acknowledgments, supports:
    -- queue and exchange manipulations
    -- asynchronous consuming
    -- RPC pattern
    */
}
connection.confirmChannel {
    /*
    Channel with publisher confirmations, additionally supports:
    -- asynchronous message publishing
    */
}
connection.txChannel { // transactional support
    /*
    Supports transactional publishing and consuming.
    */
}
Queue and exchange manipulations
Asynchronous exchange declaration
connection.channel.declareExchange(ExchangeSpecification(EXCHANGE_NAME))
Asynchronous queue declaration
connection.channel.declareQueue(QueueSpecification(QUEUE_NAME))
Asynchronous queue binding to an exchange
connection.channel.bindQueue(BindQueueSpecification(EXCHANGE_NAME, QUEUE_NAME))
Asynchronous message publishing with confirmation
connection.confirmChannel {
    publish {
        val messages = (1..n).map { createMessage("Hello #$it") }
        publishWithConfirmAsync(coroutineContext, messages).awaitAll()
    }
}
or
connection.confirmChannel {
    publish {
        coroutineScope {
            val messages = (1..n).map { createMessage("Hello #$it") }
            messages.map { async { publishWithConfirm(it) } }
        }
    }
}
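To make the idea behind suspending on a confirmation more concrete, here is a minimal sketch that uses only the Kotlin standard library. It is not the library's implementation: `FakeConfirmChannel`, `confirm` and `confirmDemo` are hypothetical names, the "broker" is an in-memory map, and the code is single-threaded. It only illustrates how a publisher can suspend until the ack or nack for its own sequence number arrives.

```kotlin
import kotlin.coroutines.Continuation
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.coroutines.resume
import kotlin.coroutines.startCoroutine
import kotlin.coroutines.suspendCoroutine

// Hypothetical in-memory stand-in for a channel in confirm mode (assumption, not library code).
class FakeConfirmChannel {
    private var nextSeqNo = 1L
    private val sent = mutableMapOf<Long, String>()
    private val pending = mutableMapOf<Long, Continuation<Boolean>>()

    // Records the message under the next sequence number and suspends until it is confirmed.
    suspend fun publishWithConfirm(body: String): Boolean {
        val seqNo = nextSeqNo++
        sent[seqNo] = body
        return suspendCoroutine { cont -> pending[seqNo] = cont }
    }

    // Simulates the broker's ack (true) or nack (false) for one sequence number.
    fun confirm(seqNo: Long, ack: Boolean) {
        pending.remove(seqNo)?.resume(ack)
    }
}

// Publishes two messages, then delivers their confirmations out of order.
fun confirmDemo(): Map<String, Boolean> {
    val channel = FakeConfirmChannel()
    val outcomes = mutableMapOf<String, Boolean>()
    for (body in listOf("first", "second")) {
        val publisher: suspend () -> Unit = { outcomes[body] = channel.publishWithConfirm(body) }
        // Runs the publisher until it suspends while waiting for its confirmation.
        publisher.startCoroutine(Continuation<Unit>(EmptyCoroutineContext) { })
    }
    channel.confirm(2, ack = false)
    channel.confirm(1, ack = true)
    return outcomes
}
```

Because each publisher is resumed by its own sequence number, confirmations may arrive in any order without the callers blocking a thread.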
Asynchronous message consuming with acknowledgment
Consume only n messages:
connection.channel {
    consume(QUEUE_NAME, PREFETCH_COUNT) {
        (1..n).map { async { consumeMessageWithConfirm({ println(it) }) } }.awaitAll()
    }
}
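The following sketch shows one way a handler can be tied to an acknowledgment. It is an assumption about the general pattern, not a description of how the library does it. `AckOps` is a hypothetical stand-in that mirrors the `basicAck` and `basicNack` methods of `com.rabbitmq.client.Channel`, declared locally so the example runs without a broker. `handleWithAck` and `RecordingAcks` are likewise invented for illustration.

```kotlin
// Stand-in for the acknowledgment methods of com.rabbitmq.client.Channel (assumption).
interface AckOps {
    fun basicAck(deliveryTag: Long, multiple: Boolean)
    fun basicNack(deliveryTag: Long, multiple: Boolean, requeue: Boolean)
}

// Hypothetical helper: ack only after the handler succeeds; if the handler throws,
// nack without requeueing and rethrow the exception.
fun AckOps.handleWithAck(deliveryTag: Long, body: ByteArray, handler: (ByteArray) -> Unit) {
    try {
        handler(body)
    } catch (e: Exception) {
        basicNack(deliveryTag, false, false)
        throw e
    }
    basicAck(deliveryTag, false)
}

// Fake channel that records which acknowledgment calls were made, for demonstration only.
class RecordingAcks : AckOps {
    val calls = mutableListOf<String>()
    override fun basicAck(deliveryTag: Long, multiple: Boolean) {
        calls += "ack $deliveryTag"
    }
    override fun basicNack(deliveryTag: Long, multiple: Boolean, requeue: Boolean) {
        calls += "nack $deliveryTag"
    }
}
```

Rejecting without requeue is a design choice in this sketch: it avoids an endless redelivery loop for a message that always fails, at the cost of dropping or dead-lettering it.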
Transactional publishing and consuming
RabbitMQ, like AMQP itself, offers only limited support for transactions. When considering transactions, be aware that:
- a transaction can span only one channel and one queue;
- com.rabbitmq.client.Channel is not thread-safe;
- a channel can be in either confirm mode or transaction mode, but not both at once;
- transactions cannot be nested.
The library provides a convenient way to perform transactional publishing and receiving through the transaction extension function. This function commits the transaction when the block completes normally and rolls it back if a RuntimeException occurs. Exceptions are always propagated to the caller. Coroutines are not used for publishing here, since no asynchronous operations are involved.
connection.txChannel {
    transaction {
        val message = createMessage(queue = oneTimeQueue, body = "Hello from tx")
        publish(message)
    }
}
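The commit-or-rollback behavior described above can be sketched as follows. This is not the library's source. `TxOps` is a hypothetical stand-in that mirrors the `txSelect`, `txCommit` and `txRollback` methods of `com.rabbitmq.client.Channel`, and `inTransaction` and `RecordingTx` are names invented for this example so it can run without a broker.

```kotlin
// Stand-in for the transaction methods of com.rabbitmq.client.Channel (assumption).
interface TxOps {
    fun txSelect()
    fun txCommit()
    fun txRollback()
}

// Hypothetical helper: commit when the block returns normally, roll back on
// RuntimeException, and always rethrow so the caller sees the failure.
fun <T> TxOps.inTransaction(block: TxOps.() -> T): T {
    txSelect()
    try {
        val result = block()
        txCommit()
        return result
    } catch (e: RuntimeException) {
        txRollback()
        throw e
    }
}

// Fake channel that records the order of transaction calls, for demonstration only.
class RecordingTx : TxOps {
    val calls = mutableListOf<String>()
    override fun txSelect() { calls += "select" }
    override fun txCommit() { calls += "commit" }
    override fun txRollback() { calls += "rollback" }
}
```

With a `RecordingTx`, a block that returns normally records `select` then `commit`, while a block that throws an `IllegalStateException` records `select` then `rollback` and the exception still reaches the caller.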
RPC pattern
connection.channel {
    val message = RabbitMqMessage(MessageProperties.PERSISTENT_BASIC, "Hello world".toByteArray())
    coroutineScope {
        (1..10).map {
            async {
                rpc {
                    call(requestQueueName = "rpc_request", message = message)
                        .also { println("Reply: ${String(it.body)}") }
                }
            }
        }.awaitAll()
    }
}
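RPC over RabbitMQ is usually built on two message properties: the request carries a correlation ID and a reply-to queue, and the client matches each reply to its pending call by that ID. The sketch below illustrates only the matching step, with JDK classes and no broker. `RpcCorrelator`, `register` and `onReply` are hypothetical names, and nothing here is taken from the library's implementation.

```kotlin
import java.util.UUID
import java.util.concurrent.CompletableFuture
import java.util.concurrent.ConcurrentHashMap

// Hypothetical helper that pairs replies with pending calls by correlation ID.
class RpcCorrelator {
    private val pending = ConcurrentHashMap<String, CompletableFuture<ByteArray>>()

    // Registers a new call; the returned ID would be sent in the request's properties.
    fun register(): Pair<String, CompletableFuture<ByteArray>> {
        val correlationId = UUID.randomUUID().toString()
        val reply = CompletableFuture<ByteArray>()
        pending[correlationId] = reply
        return correlationId to reply
    }

    // Called for each message on the reply queue; returns false for unknown or stale IDs.
    fun onReply(correlationId: String, body: ByteArray): Boolean {
        val reply = pending.remove(correlationId) ?: return false
        return reply.complete(body)
    }
}
```

Keeping the pending calls in a map lets several concurrent calls, such as the ten `async` calls above, share one reply queue without their replies being mixed up.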