reactor-netty
reactor-netty copied to clipboard
Exception in a Flux created by a subscriber will be ignored by reactor-netty
The following link should be used to report security-related issues: https://pivotal.io/security
Expected behavior
Retry works
Actual behavior
The exception is ignored by FluxReceive.
Steps to reproduce
import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOption;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.handler.timeout.IdleStateHandler;
import java.io.IOException;
import java.time.Duration;
import java.util.logging.Level;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.SignalType;
import reactor.ipc.netty.NettyInbound;
import reactor.ipc.netty.http.client.HttpClient;
import reactor.ipc.netty.resources.LoopResources;
import reactor.retry.Retry;
public class PollingTest {
private static final Logger logger = LoggerFactory.getLogger(PollingTest.class);
private Flux<Integer> createFlux() {
return Flux.just(1, 2, 3, 4, 5, 6, 7);
}
@Test
public void polling() throws InterruptedException {
HttpClient client =
HttpClient.builder()
.options(
builder -> {
LoopResources channelResources = LoopResources.create("my-loop", 2, false);
builder.loopResources(channelResources);
builder.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 2000);
builder.disablePool();
})
.build();
Retry<Object> ioError =
Retry.anyOf(IOException.class)
.retryMax(Integer.MAX_VALUE)
.fixedBackoff(Duration.ofSeconds(1));
client
.get(
"http://releases.ubuntu.com/16.04.4/ubuntu-16.04.4-desktop-amd64.iso",
request -> {
request
.context()
.addHandlerFirst(new IdleStateHandler(1, 0, 0))
.addHandlerLast(
new ChannelDuplexHandler() {
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt)
throws Exception {
if (evt instanceof IdleStateEvent) {
ctx.close();
} else {
super.userEventTriggered(ctx, evt);
}
}
});
return request.send();
})
.doOnError(e -> logger.error("Error!", e))
.flatMapMany(NettyInbound::receive)
.log(logger.getName(), Level.SEVERE, SignalType.ON_ERROR, SignalType.CANCEL)
.retryWhen(ioError)
.subscribe(
byteBuf -> {
logger.info("Msg: {}", byteBuf);
Flux.just(1,2,3)
.onErrorReturn(8)
.subscribe(
i -> {
throw new RuntimeException(i + " error");
});
});
Thread.sleep(Duration.ofMinutes(5).toMillis())
Reactor Netty version
0.7.7.RELEASE
JVM version (e.g. java -version)
java version "1.8.0_144" Java(TM) SE Runtime Environment (build 1.8.0_144-b01) Java HotSpot(TM) 64-Bit Server VM (build 25.144-b01, mixed mode)
OS version (e.g. uname -a)
Darwin 15.6.0 Darwin Kernel Version 15.6.0: Tue Jan 30 11:45:51 PST 2018; root:xnu-3248.73.8~1/RELEASE_X86_64 x86_64
Hey @dantesun, throwing in the subscribe callback might not work here, because there is no further handler downstream to pass the error to (and it will bubble up internally due to CallbackNotImplemented in subscribe). The only thing we can do, at least in 0.8, is to pass that error to a generic error consumer assigned to the client, but it won't be passed back into the pipeline. That would look like this:
// Variant of the reproduction written against the 0.8 API mentioned in the reply:
// the client is configured with named loop resources and a 2000 ms connect timeout.
HttpClient client = HttpClient.newConnection()
.tcpConfiguration(tcp -> tcp.runOn(LoopResources.create("my-loop", 2, false))
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 2000));
// Generic error consumer assigned to the client; per the reply, this is where the error would be
// delivered instead of being passed back into the pipeline.
// NOTE(review): if `logger` is an SLF4J Logger, there is no error(Throwable) overload - confirm
// the intended call (e.g. a message plus the throwable).
client.doOnError(e -> logger.error(e))
// On each request, install an idle-state handler first and a closing handler last.
.doOnRequest((req, c) -> c.addHandlerFirst(new IdleStateHandler(1, 0, 0))
.addHandlerLast(new ChannelDuplexHandler() {
@Override
public void userEventTriggered(
ChannelHandlerContext ctx,
Object evt) throws Exception {
if (evt instanceof IdleStateEvent) {
// Close the channel when an idle event is reported.
ctx.close();
}
else {
super.userEventTriggered(ctx, evt);
}
}
}))
.get()
.uri("http://releases.ubuntu.com/16.04.4/ubuntu-16.04.4-desktop-amd64.iso")
.responseContent()
.log(logger.getName())
// Resubscribe only when the failure is an IOException.
.retry(IOException.class::isInstance)
.subscribe(byteBuf -> {
logger.info("Msg: {}", byteBuf);
// Nested subscribe with only a value consumer; the consumer throws for every element.
Flux.just(1, 2, 3)
.onErrorReturn(8)
.subscribe(i -> {
throw new RuntimeException(i + " error");
});
});