Circuit Breaker hangs on Half Open state

Open arturgalenom opened this issue 3 years ago • 0 comments

Hello there,

I am trying to use the Circuit Breaker with Akka Streams. While the errors persist, the breaker cycles between the Open and Half Open states, but after some time it stops retrying and hangs in the Half Open state forever. I don't know if I am doing something wrong.

The relevant parts of my configuration are below:

static class CBListenerActor extends AbstractActor {

    private final LoggerWrapper log;

    CBListenerActor() {
        log = new LoggerWrapper(LoggerFactory.getLogger(CBListenerActor.class), "SQS Circuit Breaker", getSelf().path().name());
    }

    @Override public Receive createReceive() {
        return receiveBuilder()
                .matchEquals(Open.instance(), msg -> {
                    log.info("OPENED CIRCUIT BREAK");
                })
                .matchEquals(Closed.instance(), msg -> {
                    log.info("CLOSED CIRCUIT BREAK");
                })
                .matchEquals(HalfOpen.instance(), msg -> {
                    log.info("HALF OPENED CIRCUIT BREAK");
                })
                .build();
    }
}

        final CircuitBreakerState state =
                AtomicCircuitBreakerState.create(
                        getClass().getSimpleName(),
                        5,
                        Duration.ofSeconds(10),
                        Duration.ofSeconds(1),
                        Duration.ofSeconds(4),
                        2,
                        system.dispatcher(),
                        system.scheduler());

        final var props = Props.create(CBListenerActor.class);
        final var actorName = getClass().getSimpleName() + CBListenerActor.class.getSimpleName() + "-" + new Random().nextInt();
        final var subscriber = this.system.actorOf(props, actorName);
        state.subscribe(subscriber, Open.instance());
        state.subscribe(subscriber, Closed.instance());
        state.subscribe(subscriber, HalfOpen.instance());

        final CircuitBreakerSettings<ProcessResult, ProcessResult, Message> settings = CircuitBreakerSettings
                .<ProcessResult, ProcessResult, Message>create(state)
                .withUniqueIdMapper(Message::messageId)
                .withFailureDecider(tryResult -> {
                    if (tryResult.isSuccess()) {
                        return !tryResult.get().isSuccess();
                    }
                    return true;
                });

        final var circuitBreaker = CircuitBreaker.create(settings);

        final var flow =
                Flow.<Pair<ProcessResult, Message>>create().map(elem -> elem);


        SqsSourceSettings sqsSourceSettings =
                SqsSourceSettings.create()
                        .withWaitTime(getSqsWaitTime())
                        .withMaxBatchSize(getSqsBatchSize())
                        .withMaxBufferSize(getSqsBufferSize())
                        .withCloseOnEmptyReceive(false);

        final var join = circuitBreaker.join(flow);

        SqsSource.create(getSqsUrl(), sqsSourceSettings, sqsClient)
                .mapAsync(getSqsParallelism(),
                        (msg) -> CompletableFuture.supplyAsync(() -> executeProcess(msg)).thenApply(result -> Pair.create(result, msg)))
                .via(join)
                .map(pair -> {
                    final var processResult = pair.first().getOrElse(() -> new ProcessResult(false, false, null));
                    if (processResult.isSuccess()) {
                        return MessageAction.delete(processResult.getMessage());
                    } else if (processResult.isRetry()) {
                        return MessageAction.ignore(processResult.getMessage());
                    }
                    return MessageAction.delete(processResult.getMessage());
                })
                .runWith(SqsAckSink.create(getSqsUrl(), SqsAckSettings.create(), sqsClient), materializer);

A small piece of the log generated by CBListenerActor (I've removed some parts of the log to avoid exposing too much information):

{"timestamp":"","level":"INFO","message":"OPENED CIRCUIT BREAK","service_name":"service-name","context":{"SQS Circuit Breaker":"CBListenerActor--1740369079"}}
{"timestamp":"","level":"INFO","message":"HALF OPENED CIRCUIT BREAK","service_name":"service-name","context":{"SQS Circuit Breaker":"CBListenerActor--1740369079"}}
{"timestamp":"","level":"INFO","message":"OPENED CIRCUIT BREAK","service_name":"service-name","context":{"SQS Circuit Breaker":"CBListenerActor--1252266457"}}
{"timestamp":"","level":"INFO","message":"HALF OPENED CIRCUIT BREAK","service_name":"service-name","context":{"SQS Circuit Breaker":"CBListenerActor--1252266457"}}
{"timestamp":"","level":"INFO","message":"OPENED CIRCUIT BREAK","service_name":"service-name","context":{"SQS Circuit Breaker":"CBListenerActor-343052081"}}
{"timestamp":"","level":"INFO","message":"HALF OPENED CIRCUIT BREAK","service_name":"service-name","context":{"SQS Circuit Breaker":"CBListenerActor-343052081"}}
{"timestamp":"","level":"INFO","message":"OPENED CIRCUIT BREAK","service_name":"service-name","context":{"SQS Circuit Breaker":"CBListenerActor--983211955"}}
{"timestamp":"","level":"INFO","message":"HALF OPENED CIRCUIT BREAK","service_name":"service-name","context":{"SQS Circuit Breaker":"CBListenerActor--983211955"}}
{"timestamp":"","level":"INFO","message":"OPENED CIRCUIT BREAK","service_name":"service-name","context":{"SQS Circuit Breaker":"CBListenerActor--1740369079"}}
{"timestamp":"","level":"INFO","message":"HALF OPENED CIRCUIT BREAK","service_name":"service-name","context":{"SQS Circuit Breaker":"CBListenerActor--1740369079"}}
{"timestamp":"","level":"INFO","message":"OPENED CIRCUIT BREAK","service_name":"service-name","context":{"SQS Circuit Breaker":"CBListenerActor--1252266457"}}
{"timestamp":"","level":"INFO","message":"HALF OPENED CIRCUIT BREAK","service_name":"service-name","context":{"SQS Circuit Breaker":"CBListenerActor--1252266457"}}

(After this point, no further transitions from the Open state to the Half Open state occur, so no retry is attempted. I waited for many minutes.)
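
To be explicit about what I expected: assuming the arguments passed to `AtomicCircuitBreakerState.create` above are, in order, the name, max failures, call timeout, reset timeout, max reset timeout, and exponential backoff factor (this is my reading of the API and may be wrong), the delay before each Open to Half Open attempt should grow from 1 second up to the 4 second cap and then stay there. The sketch below only illustrates that arithmetic. It is not squbs code, and `ExpectedResetSchedule` and `resetDelays` are hypothetical names.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;

public class ExpectedResetSchedule {

    // Hypothetical helper, not part of squbs: computes the delay I expect
    // before each successive Open -> Half Open attempt, assuming the delay is
    // multiplied by backoffFactor after every failed attempt and capped at
    // maxResetTimeout.
    static List<Duration> resetDelays(Duration resetTimeout,
                                      Duration maxResetTimeout,
                                      double backoffFactor,
                                      int attempts) {
        List<Duration> delays = new ArrayList<>();
        long currentMillis = resetTimeout.toMillis();
        for (int i = 0; i < attempts; i++) {
            delays.add(Duration.ofMillis(currentMillis));
            long next = (long) (currentMillis * backoffFactor);
            currentMillis = Math.min(next, maxResetTimeout.toMillis());
        }
        return delays;
    }

    public static void main(String[] args) {
        // Values taken from my configuration: reset timeout 1s, max reset
        // timeout 4s, backoff factor 2.
        List<Duration> delays =
                resetDelays(Duration.ofSeconds(1), Duration.ofSeconds(4), 2.0, 5);
        System.out.println(delays); // prints [PT1S, PT2S, PT4S, PT4S, PT4S]
    }
}
```

Under that assumption the breaker should keep attempting a retry at least every 4 seconds, which is not what the log above shows.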

So, am I doing something wrong? Is this the normal behavior?

arturgalenom, Mar 02 '21 22:03