zio-amqp
[question] How to maintain a global connection?
I’m using the ZIO-HTTP framework, and I found that a ZIO-AMQP Connection must be in scope to be usable. Suppose I want a single global connection that is created when the program starts, and then use that connection to create and destroy channels in various scopes. What should I do?
Currently, I’m using ZLayer.scoped to get a ZLayer[Any, Throwable, Connection], but this creates a new connection every time I provide this layer.
I may still be a bit confused about the use of scope and layer, so is there any suggestion for the scenario I presented?
Apologies for the late reply. It seems notifications were turned off for me. Are you able to provide a simple example of what you currently have?
From my understanding here, you want to create a single connection and reuse it across your code. This should be as simple as providing the connection layer from your main or highest possible level in your code.
ZLayer lets you provide dependencies to your logic through the ZIO environment. The scoped variant ties the dependency's lifetime to the layer: once the provided layer goes out of scope, the dependency is released.
It seems the issue is that you are providing the rabbit layer multiple times, when you should provide it only once.
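As a rough illustration of "provide it once", here is a minimal sketch that runs without a broker. `FakeConnection`, `worker`, and the `opened` counter are hypothetical stand-ins invented for this example and are not part of zio-amqp; the only point is that workers declare the connection in their environment and the layer is provided a single time at the top level.

```scala
import zio._
import java.util.concurrent.atomic.AtomicInteger

object SingleConnectionSketch extends ZIOAppDefault {
  // Hypothetical stand-in for the AMQP connection.
  final case class FakeConnection(id: Int)

  // Counts how many times the layer's acquire step runs.
  val opened = new AtomicInteger(0)

  val connLayer: ZLayer[Any, Nothing, FakeConnection] =
    ZLayer.scoped {
      ZIO.acquireRelease(
        ZIO.succeed(FakeConnection(opened.incrementAndGet()))
      )(conn => ZIO.debug(s"closing connection ${conn.id}"))
    }

  // A worker requires a connection in its environment but does not build one.
  // A real worker would open and close its own channel here.
  def worker(name: String): ZIO[FakeConnection, Nothing, Unit] =
    ZIO.service[FakeConnection].flatMap { conn =>
      ZIO.debug(s"worker $name uses connection ${conn.id}")
    }

  val program: ZIO[FakeConnection, Nothing, Unit] =
    ZIO.foreachParDiscard(List("a", "b", "c"))(worker)

  // The layer is provided exactly once, at the edge of the application.
  val run = program.provide(connLayer)
}
```

All three workers share the one connection, and it is released only when `run` finishes.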
Thank you for your reply. Currently, my code looks like the following. I have a connLayer:
val connLayer: ZLayer[Any, Throwable, Connection] = ZLayer.scoped {
  for {
    config <- ZIO.config(RabbitMqConfig.config)
    amqpConfig <- ZIO.attempt(
      new AMQPConfig(
        user = config.user,
        password = config.password,
        vhost = config.vhost,
        heartbeatInterval = AMQPConfig.default.heartbeatInterval,
        ssl = AMQPConfig.default.ssl,
        host = config.host,
        port = config.port,
        connectionTimeout = 60.seconds
      )
    )
    connection <- Amqp.connect(amqpConfig)
  } yield connection
}
I have a consume fiber like this:
def consume: ZIO[Any, Throwable, Unit] = ZIO.scoped {
  for {
    channel <- AmqpService.getChannel
    config <- ZIO.config(RabbitMqConfig.config)
    _ <- channel.basicQos(1, global = false)
    _ <- channel
      .consume(queue = QueueName(config.importWave3Queue), consumerTag = ConsumerTag(consumerTag))
      .retry(Schedule.fibonacci(1.second) && Schedule.recurs(10))
      .mapZIO(record => doConsume(record, channel))
      .runDrain
  } yield ()
}.provide(
  Scope.default,
  AmqpConnection.connLayer
)
And when I start my app, I do:
val initJob = for {
  _ <- xxxx.consume.retry(Schedule.forever).forkDaemon
  ....
} yield ()
I think I made a mistake with the connection scope, so it opens a new connection every time it fetches messages.
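This suspicion can be checked without a broker. The sketch below is only an approximation of the code above: `FakeConnection`, `consumeOnce`, and both counters are hypothetical names made up for the test, and the "consumer" simply fails on its first two attempts. It keeps the same shape as `consume`, with the layer provided inside the effect and the retry wrapped around the result.

```scala
import zio._
import java.util.concurrent.atomic.AtomicInteger

object ReconnectSketch {
  // Hypothetical stand-in for the AMQP connection.
  final case class FakeConnection(id: Int)

  val opened   = new AtomicInteger(0) // how many connections were created
  val attempts = new AtomicInteger(0) // how many times the consumer body ran

  val connLayer: ZLayer[Any, Nothing, FakeConnection] =
    ZLayer.scoped {
      ZIO.acquireRelease(
        ZIO.succeed(FakeConnection(opened.incrementAndGet()))
      )(_ => ZIO.unit)
    }

  // Fails on the first two attempts, mimicking a consumer that errors out.
  val consumeOnce: ZIO[FakeConnection, Throwable, Int] =
    for {
      conn <- ZIO.service[FakeConnection]
      n    <- ZIO.succeed(attempts.incrementAndGet())
      _    <- ZIO.fail(new RuntimeException("consumer failed")).when(n < 3)
    } yield conn.id

  // The layer is provided inside the effect, so every retry
  // re-runs the provide step and builds the layer again.
  val consume: ZIO[Any, Throwable, Int] =
    consumeOnce.provide(connLayer).retry(Schedule.recurs(5))
}
```

Under these assumptions, each retry acquires a fresh `FakeConnection`, because the retried effect includes the `provide` call. Moving `provide` outside the retried effect would keep a single connection across attempts.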