zio-amqp

[question] How to maintain a global connection?

Open sxh-lsc opened this issue 1 year ago • 3 comments

I’m using the ZIO-HTTP framework, and I found that the ZIO-AMQP Connection must be in scope to be effective. Suppose I want a unique global connection after the program starts, and then use this connection to create and destroy some channels in some scope. What should I do?

Currently, I’m using ZLayer.scoped to get a ZLayer[Any, Throwable, Connection], but this will create a new connection every time I provide this layer.

I may still be a bit confused about the use of scope and layer, so is there any suggestion for the scenario I presented?

sxh-lsc avatar Jan 31 '24 03:01 sxh-lsc

Apologies for the late reply. It seems notifications were turned off for me. Are you able to provide a simple example of what you currently have?

From my understanding here, you want to create a single connection and reuse it across your code. This should be as simple as providing the connection layer from your main or highest possible level in your code.

ZLayer lets you provide dependencies to your logic through the ZIO environment (the R type parameter). The scoped variant ties the dependency's lifetime to the layer: once the provided layer goes out of scope, the dependency is released.

Adriani277 avatar Mar 29 '24 17:03 Adriani277

It seems the issue is that you are providing the RabbitMQ layer multiple times, when you should provide it only once.
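
A minimal sketch of the difference between providing a scoped layer once and providing it per call. It uses only core ZIO 2; `FakeConnection`, `connLayer`, `providedOnce` and `providedPerCall` are hypothetical names for illustration and are not part of zio-amqp. The `Ref` counts how often the layer's acquire step runs.

```scala
import zio._

// Hypothetical stand-in for an AMQP connection; NOT the zio-amqp type.
final case class FakeConnection(id: Int)

object ProvideOnce extends ZIOAppDefault {

  // A scoped layer: each time it is built, the counter is incremented
  // and a release action is registered for when the layer's scope closes.
  def connLayer(opened: Ref[Int]): ZLayer[Any, Nothing, FakeConnection] =
    ZLayer.scoped {
      ZIO.acquireRelease(
        opened.updateAndGet(_ + 1).map(FakeConnection(_))
      )(conn => ZIO.debug(s"closing connection ${conn.id}"))
    }

  // Any logic that reads the connection from the ZIO environment.
  val useConnection: ZIO[FakeConnection, Nothing, Unit] =
    ZIO.serviceWithZIO[FakeConnection](c => ZIO.debug(s"using connection ${c.id}"))

  // Layer provided once around the whole program: one acquisition.
  def providedOnce(opened: Ref[Int]): UIO[Int] =
    (useConnection *> useConnection *> useConnection)
      .provide(connLayer(opened)) *> opened.get

  // Layer provided on each call: the layer is rebuilt every time.
  def providedPerCall(opened: Ref[Int]): UIO[Int] = {
    val one = useConnection.provide(connLayer(opened))
    (one *> one *> one) *> opened.get
  }

  val run =
    for {
      a <- Ref.make(0).flatMap(providedOnce)
      b <- Ref.make(0).flatMap(providedPerCall)
      _ <- ZIO.debug(s"provided once: $a acquisition(s), provided per call: $b")
    } yield ()
}
```

In this sketch the first variant should acquire the stand-in connection once and release it when the program ends, while the second acquires and releases it three times, because each `provide` builds the layer from scratch.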

Adriani277 avatar Mar 29 '24 17:03 Adriani277

Thank you for your reply. Currently, my code looks like the following. I have a connLayer:

val connLayer: ZLayer[Any, Throwable, Connection] = ZLayer.scoped {
    for {
      config <- ZIO.config(RabbitMqConfig.config)

      amqpConfig <- ZIO.attempt(
        new AMQPConfig(
          user = config.user,
          password = config.password,
          vhost = config.vhost,
          heartbeatInterval = AMQPConfig.default.heartbeatInterval,
          ssl = AMQPConfig.default.ssl,
          host = config.host,
          port = config.port,
          connectionTimeout = 60.seconds
        )
      )
      connection <- Amqp.connect(amqpConfig)
    } yield connection
  }

I have a consumer fiber like this:

  def consume: ZIO[Any, Throwable, Unit] = ZIO.scoped {
    for {
      channel <- AmqpService.getChannel
      config <- ZIO.config(RabbitMqConfig.config)
      _ <- channel.basicQos(1, global = false)
      _ <- channel
        .consume(queue = QueueName(config.importWave3Queue), consumerTag = ConsumerTag(consumerTag))
        .retry(Schedule.fibonacci(1.second) && Schedule.recurs(10))
        .mapZIO(record => doConsume(record, channel))
        .runDrain
    } yield ()
  }.provide(
    Scope.default,
    AmqpConnection.connLayer
  )

And when I start my app, I run:

    val initJob = for {
      _ <- xxxx.consume.retry(Schedule.forever).forkDaemon
     ....
    } yield ()

I think I made a mistake with the connection scope, so it reconnects every time it fetches messages.
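
A minimal sketch of how the position of `.provide` relative to `.retry` affects the number of connections, assuming the reconnects come from the layer being rebuilt on each retry. It uses only core ZIO 2; `FakeConnection`, `connLayer` and the simplified `consume` are hypothetical stand-ins, not the zio-amqp API or the code above.

```scala
import zio._

// Hypothetical stand-in for an AMQP connection; NOT the zio-amqp type.
final case class FakeConnection(id: Int)

object RetryScope extends ZIOAppDefault {

  // Scoped layer that counts how many times it is acquired.
  def connLayer(opened: Ref[Int]): ZLayer[Any, Nothing, FakeConnection] =
    ZLayer.scoped {
      ZIO.acquireRelease(
        opened.updateAndGet(_ + 1).map(FakeConnection(_))
      )(_ => ZIO.unit)
    }

  // Simplified consumer: needs a connection, fails on the first two
  // attempts and succeeds on the third.
  def consume(attempts: Ref[Int]): ZIO[FakeConnection, Throwable, Unit] =
    ZIO.service[FakeConnection] *>
      attempts.updateAndGet(_ + 1).flatMap { n =>
        ZIO.fail(new RuntimeException(s"attempt $n failed")).when(n < 3).unit
      }

  // provide INSIDE retry: every retry rebuilds the layer.
  val layerInsideRetry: Task[Int] =
    for {
      opened   <- Ref.make(0)
      attempts <- Ref.make(0)
      _        <- consume(attempts).provide(connLayer(opened)).retry(Schedule.recurs(5))
      n        <- opened.get
    } yield n

  // provide OUTSIDE retry: all retries share one connection.
  val layerOutsideRetry: Task[Int] =
    for {
      opened   <- Ref.make(0)
      attempts <- Ref.make(0)
      _        <- consume(attempts).retry(Schedule.recurs(5)).provide(connLayer(opened))
      n        <- opened.get
    } yield n

  val run =
    for {
      inside  <- layerInsideRetry
      outside <- layerOutsideRetry
      _       <- ZIO.debug(s"connections opened: inside retry = $inside, outside retry = $outside")
    } yield ()
}
```

In this sketch the first variant opens a new stand-in connection on each of its three attempts, while the second opens one and reuses it across retries. Applied to the code above, that would mean leaving `consume` with a `Connection` requirement and providing the connection layer once, where the application is started.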

sxh-lsc avatar Apr 02 '24 04:04 sxh-lsc