How is ResilientStream supposed to work in the context of RMQ?
I have a test program that consumes from a queue and then throws an exception. It does not Ack/Nack/Reject anything explicitly.
When I wrap it in ResilientStream.run and publish a message to the queue I can see that the program is restarted once. The msg in the queue becomes "unacked". When I terminate the program the message becomes "ready" in the queue and just stays there. The program does not seem to pull it again after the restart delay.
Is this the expected behaviour? My expectation was that with each restart it would also pull a message from RMQ. Otherwise what's the point of restarting it if it won't be doing anything anymore? Can this behaviour be achieved somehow?
This issue has been automatically marked as stale because it has not had recent activity. It will be closed if no further activity occurs. Thank you for your contributions.
It depends on how you construct your program I believe.
Restarting just the consumer stream from createAckerConsumer means re-using the same connection and the same channel. The broker is still awaiting an ack from that channel.
If you what to use ResilientStream your stream should also spawn and release the channel I think.
I believe restarting the channel would be enough, if not include also the creation of the connection under the resilient stream.
Try something like this:
val config: Fs2RabbitConfig = ???
val run: IO[Nothing] = RabbitClient
.default[IO](config)
.resource
.evalMap { connection =>
ResilientStream.run {
for {
given AMQPChannel <- fs2.Stream.resource(connection.createConnectionChannel)
(acker, stream) <- fs2.Stream.eval(connection.createAckerConsumer(QueueName("queue_name")))
program <- stream.evalMap(msg => IO(println(msg)) >> acker(Ack(msg.deliveryTag)))
} yield program
}
}
.useForever
Let me know if this works and how ( just channel ? )