
Exception in a Flux created by a subscriber will be ignored by reactor-netty

Open · dantesun opened this issue 6 years ago · 1 comment


Expected behavior

Retry works

Actual behavior

Exception ignored by FluxReceive

Steps to reproduce

import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOption;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.handler.timeout.IdleStateHandler;
import java.io.IOException;
import java.time.Duration;
import java.util.logging.Level;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.SignalType;
import reactor.ipc.netty.NettyInbound;
import reactor.ipc.netty.http.client.HttpClient;
import reactor.ipc.netty.resources.LoopResources;
import reactor.retry.Retry;

public class PollingTest {

  private static final Logger logger = LoggerFactory.getLogger(PollingTest.class);

  private Flux<Integer> createFlux() {
    return Flux.just(1, 2, 3, 4, 5, 6, 7);
  }

  @Test
  public void polling() throws InterruptedException {
    HttpClient client =
        HttpClient.builder()
            .options(
                builder -> {
                  LoopResources channelResources = LoopResources.create("my-loop", 2, false);
                  builder.loopResources(channelResources);
                  builder.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 2000);
                  builder.disablePool();
                })
            .build();

    Retry<Object> ioError =
        Retry.anyOf(IOException.class)
            .retryMax(Integer.MAX_VALUE)
            .fixedBackoff(Duration.ofSeconds(1));

    client
        .get(
            "http://releases.ubuntu.com/16.04.4/ubuntu-16.04.4-desktop-amd64.iso",
            request -> {
              request
                  .context()
                  .addHandlerFirst(new IdleStateHandler(1, 0, 0))
                  .addHandlerLast(
                      new ChannelDuplexHandler() {
                        @Override
                        public void userEventTriggered(ChannelHandlerContext ctx, Object evt)
                            throws Exception {
                          if (evt instanceof IdleStateEvent) {
                            ctx.close();
                          } else {
                            super.userEventTriggered(ctx, evt);
                          }
                        }
                      });
              return request.send();
            })
        .doOnError(e -> logger.error("Error!", e))
        .flatMapMany(NettyInbound::receive)
        .log(logger.getName(), Level.SEVERE, SignalType.ON_ERROR, SignalType.CANCEL)
        .retryWhen(ioError)
        .subscribe(
            byteBuf -> {
              logger.info("Msg: {}", byteBuf);
              Flux.just(1,2,3)
                  .onErrorReturn(8)
                  .subscribe(
                      i -> {
                          throw new RuntimeException(i + " error");
                      });
            });
    Thread.sleep(Duration.ofMinutes(5).toMillis());
  }
}

Reactor Netty version

0.7.7.RELEASE

JVM version (e.g. java -version)

java version "1.8.0_144" Java(TM) SE Runtime Environment (build 1.8.0_144-b01) Java HotSpot(TM) 64-Bit Server VM (build 25.144-b01, mixed mode)

OS version (e.g. uname -a)

Darwin 15.6.0 Darwin Kernel Version 15.6.0: Tue Jan 30 11:45:51 PST 2018; root:xnu-3248.73.8~1/RELEASE_X86_64 x86_64

dantesun, May 19 '18 16:05

Hey @dantesun, throwing inside subscribe might not work here because there is no handler further downstream to pass the error to (it bubbles up internally as ErrorCallbackNotImplemented, since subscribe was given no error callback). The only thing we can do, at least in 0.8, is pass that error to a generic error consumer assigned to the client; it won't be passed back into the pipeline. That would look like this:

HttpClient client =
    HttpClient.newConnection()
        .tcpConfiguration(
            tcp ->
                tcp.runOn(LoopResources.create("my-loop", 2, false))
                    .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 2000));

client
    // Generic error consumer assigned to the client
    .doOnError(e -> logger.error("Error!", e))
    .doOnRequest(
        (req, c) ->
            c.addHandlerFirst(new IdleStateHandler(1, 0, 0))
                .addHandlerLast(
                    new ChannelDuplexHandler() {
                      @Override
                      public void userEventTriggered(ChannelHandlerContext ctx, Object evt)
                          throws Exception {
                        if (evt instanceof IdleStateEvent) {
                          // Close the connection once it has been idle
                          ctx.close();
                        } else {
                          super.userEventTriggered(ctx, evt);
                        }
                      }
                    }))
    .get()
    .uri("http://releases.ubuntu.com/16.04.4/ubuntu-16.04.4-desktop-amd64.iso")
    .responseContent()
    .log(logger.getName())
    .retry(IOException.class::isInstance)
    .subscribe(
        byteBuf -> {
          logger.info("Msg: {}", byteBuf);
          Flux.just(1, 2, 3)
              .onErrorReturn(8)
              .subscribe(
                  i -> {
                    throw new RuntimeException(i + " error");
                  });
        });

smaldini, May 20 '18 08:05
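
For readers hitting the same behaviour, here is a minimal sketch of one possible workaround. It assumes only reactor-core on the classpath and leaves reactor-netty out entirely. The class name RetryPlacementSketch, the Flux.defer stand-in for the response body, and the "fail on element 2 during the first two attempts" rule are all hypothetical and exist only for illustration. The idea is that signals only travel downstream, so per-element work that may throw is placed in an operator (doOnNext here) upstream of retry, and subscribe is given an explicit error callback for whatever is left once the retries are used up.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import reactor.core.publisher.Flux;

// Hypothetical demo class; not part of reactor-netty or of the issue's code.
public class RetryPlacementSketch {

  // Returns the events seen by the final subscriber, in order.
  static List<String> run() {
    List<String> events = new ArrayList<>();
    AtomicInteger attempts = new AtomicInteger();

    Flux.defer(() -> {
          // Stand-in for the HTTP response body: every (re)subscription is a new attempt.
          int attempt = attempts.incrementAndGet();
          return Flux.just(1, 2, 3).map(i -> attempt + ":" + i);
        })
        // The work that may throw sits in an operator, so the exception becomes an
        // onError signal that flows downstream to retry.
        .doOnNext(v -> {
          if (attempts.get() < 3 && v.endsWith(":2")) {
            throw new IllegalStateException("processing failed for " + v);
          }
        })
        // retry is downstream of the failing step, so it sees the error and resubscribes.
        .retry(2)
        // Explicit error callback: a terminal error is delivered here instead of
        // surfacing as ErrorCallbackNotImplemented.
        .subscribe(events::add, e -> events.add("error: " + e.getMessage()));

    return events;
  }

  public static void main(String[] args) {
    System.out.println(run());
  }
}
```

In this sketch the subscriber receives the first element of each of the two failed attempts and then the full third attempt. Whether the same arrangement fits the original download pipeline depends on what the per-element work needs to do, so treat it as a starting point and not as the library's recommended pattern.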