RxNetty
RxNetty copied to clipboard
RXNetty 0.4 proxy redirect issue
Hi @NiteshKant,
i found a issue in my proxy handler if the backend send redirect 302 to the client.Have you a workaround or fix for me ?
Thanks, Arek
This is my code:
import com.google.common.primitives.Bytes;
import de.rewe.digital.catalan.domain.RuntimeConfiguration;
import io.netty.buffer.ByteBuf;
import io.reactivex.netty.RxNetty;
import io.reactivex.netty.client.RxClient;
import io.reactivex.netty.protocol.http.client.HttpClient;
import io.reactivex.netty.protocol.http.client.HttpClientRequest;
import io.reactivex.netty.protocol.http.client.HttpClientResponse;
import io.reactivex.netty.protocol.http.server.HttpServerRequest;
import io.reactivex.netty.protocol.http.server.HttpServerResponse;
import io.reactivex.netty.protocol.http.server.RequestHandler;
import rx.Observable;
import java.util.concurrent.TimeUnit;
import static io.netty.handler.codec.http.HttpMethod.GET;
public class ProxyHandler implements RequestHandler<ByteBuf, ByteBuf> {
private final HttpClient<ByteBuf, ByteBuf> client;
private final RxClient.ClientConfig clientConfig;
public ProxyHandler(final RuntimeConfiguration runtimeConfiguration) {
this.client = RxNetty.<ByteBuf, ByteBuf>createHttpClient("http://httpbin.org", 80);
this.clientConfig = new HttpClient.HttpClientConfig.Builder().setFollowRedirect(false)
.responseSubscriptionTimeout(1, TimeUnit.SECONDS)
.readTimeout(120, TimeUnit.SECONDS).build();
}
public Observable<Void> handle(final HttpServerRequest<ByteBuf> request, final HttpServerResponse<ByteBuf> response) {
return request.getContent().compose(new RequestExecutor(client, request))
.doOnNext(clientResponse -> response.setStatus(clientResponse.getStatus()))
.doOnNext(clientResponse -> clientResponse.getHeaders().names().forEach(name -> response.getHeaders().add(name, clientResponse.getHeaders().get(name))))
.flatMap(HttpClientResponse::getContent)
.map(ProxyHandler::bufferToBytes)
.reduce((byte1, byte2) -> Bytes.concat(byte1, byte2))
.flatMap(response::writeBytesAndFlush);
}
class RequestExecutor implements Observable.Transformer<ByteBuf, HttpClientResponse<ByteBuf>> {
private final HttpClient<ByteBuf, ByteBuf> client;
private final HttpServerRequest<ByteBuf> request;
RequestExecutor(final HttpClient<ByteBuf, ByteBuf> client,
final HttpServerRequest<ByteBuf> request) {
this.client = client;
this.request = request;
}
@Override
public Observable<HttpClientResponse<ByteBuf>> call(Observable<ByteBuf> byteBufferPayload) {
return byteBufferPayload
.map(buffer -> bufferToBytes(buffer))
.reduce((byte1, byte2) -> Bytes.concat(byte1, byte2))
.flatMap(payload -> {
final HttpClientRequest<ByteBuf> clientRequest = HttpClientRequest.create(request.getHttpMethod(), request.getUri());
if ((payload.length > 0) && (request.getHttpMethod() != GET)) {
clientRequest.withContent(payload);
}
Observable.from(request.getHeaders().entries())
.forEach(entry -> clientRequest.getHeaders().add(entry.getKey(), entry.getValue()));
return client.submit(clientRequest, clientConfig);
});
}
}
private static byte[] bufferToBytes(ByteBuf buffer) {
if (buffer.hasArray()) {
return buffer.array();
}
byte[] plainBytes = new byte[buffer.capacity()];
for (int i = 0; i < buffer.capacity(); i++) {
plainBytes[i] = buffer.getByte(i);
}
return plainBytes;
}
}
And this is the Exception:
java.util.NoSuchElementException: Sequence contains no elements.
.at rx.internal.operators.OperatorSingle$ParentSubscriber.onCompleted(OperatorSingle.java:131).
.at rx.internal.operators.OperatorTakeLastOne$ParentSubscriber.onCompleted(OperatorTakeLastOne.java:107).
.at rx.internal.operators.OperatorScan$2.onCompleted(OperatorScan.java:124)..at rx.internal.operators.OperatorMerge$MergeSubscriber.emitLoop(OperatorMerge.java:635).
.at rx.internal.operators.OperatorMerge$MergeSubscriber.emit(OperatorMerge.java:547)..at rx.internal.operators.OperatorMerge$MergeSubscriber.onCompleted(OperatorMerge.java:268).
.at rx.internal.operators.OperatorMap$1.onCompleted(OperatorMap.java:43).
.at rx.internal.operators.OperatorMerge$MergeSubscriber.emitLoop(OperatorMerge.java:635).
.at rx.internal.operators.OperatorMerge$MergeSubscriber.emit(OperatorMerge.java:547).
.at rx.internal.operators.OperatorMerge$InnerSubscriber.onCompleted(OperatorMerge.java:836).
.at io.reactivex.netty.protocol.http.UnicastContentSubject$AutoReleaseByteBufOperator$1.onCompleted(UnicastContentSubject.java:251).
.at rx.internal.operators.NotificationLite.accept(NotificationLite.java:140).
.at rx.internal.operators.BufferUntilSubscriber.emit(BufferUntilSubscriber.java:145).
.at rx.internal.operators.BufferUntilSubscriber.onCompleted(BufferUntilSubscriber. java:158).
.at io.reactivex.netty.protocol.http.UnicastContentSubject.onCompleted(UnicastContentSubject.java:273).
.at io.reactivex.netty.protocol.http.client.ClientRequestResponseConverter$ResponseState.sendOnComplete(ClientRequestResponseConverter.java:382).
.at io.reactivex.netty.protocol.http.client.ClientRequestResponseConverter$ResponseState.access$500(ClientRequestResponseConverter.java:339).
.at io.reactivex.netty.protocol.http.client.ClientRequestResponseConverter.channelRead(ClientRequestResponseConverter.java:157).
.at io.netty.channel.ChannelHandlerInvokerUtil.invokeChannelReadNow(ChannelHandlerInvokerUtil.java:83).
.at io.netty.channel.DefaultChannelHandlerInvoker.invokeChannelRead(DefaultChannelHandlerInvoker.java:163).
.at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:155).
.at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103).
.at io.netty.channel.ChannelHandlerInvokerUtil.invokeChannelReadNow(ChannelHandlerInvokerUtil.java:83).
.at io.netty.channel.DefaultChannelHandlerInvoker.invokeChannelRead(DefaultChannelHandlerInvoker.java:163).
.at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext. java:155).
.at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:276).
.at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:263).
.at io.netty.channel.ChannelHandlerInvokerUtil.invokeChannelReadNow(ChannelHandlerInvokerUtil.java:83).
.at io.netty.channel.DefaultChannelHandlerInvoker.invokeChannelRead(DefaultChannelHandlerInvoker.java:163).
.at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:155).
.at io.netty.handler.logging.LoggingHandler.channelRead(LoggingHandler.java:233).
.at io.netty.channel.ChannelHandlerInvokerUtil.invokeChannelReadNow(ChannelHandlerInvokerUtil. java:83).
.at io.netty.channel.DefaultChannelHandlerInvoker.invokeChannelRead(DefaultChannelHandlerInvoker.java:163).
.at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:155).
.at io.netty.channel.ChannelInboundHandlerAdapter.channelRead(ChannelInboundHandlerAdapter.java:86).
.at io.reactivex.netty.metrics.BytesInspector.channelRead(BytesInspector.java:59).
.at io.netty.channel.ChannelHandlerInvokerUtil.invokeChannelReadNow(ChannelHandlerInvokerUtil.java:83).
.at io.netty.channel.DefaultChannelHandlerInvoker.invokeChannelRead(DefaultChannelHandlerInvoker.java:163).
.at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:155).
.at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:950).
.at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:125).
.at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:510).
.at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:467).
.at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:381).
.at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:353).
.at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:742).
.at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:137).
.at java.lang.Thread.run(Thread.java:745).
this solved my problem:
public Observable<Void> handle(final HttpServerRequest<ByteBuf> request, final HttpServerResponse<ByteBuf> response) {
return request.getContent().compose(new RequestExecutor(client, request))
.doOnNext(clientResponse -> response.setStatus(clientResponse.getStatus()))
.doOnNext(clientResponse -> clientResponse.getHeaders().names().forEach(name -> response.getHeaders().add(name, clientResponse.getHeaders().get(name))))
.flatMap(ProxyHandler::handleContent)
.flatMap(buffer -> Observable.just(ByteBufUtil.bufferToBytes(buffer)))
.reduce((byte1, byte2) -> Bytes.concat(byte1, byte2))
.flatMap(response::writeBytesAndFlush);
}
private static Observable<? extends ByteBuf> handleContent(final HttpClientResponse<ByteBuf> clientResponse) {
if (clientResponse.getStatus().code() > 300) {
return Observable.just(UnpooledByteBufAllocator.DEFAULT.buffer());
}
return clientResponse.getContent();
}