RxNetty icon indicating copy to clipboard operation
RxNetty copied to clipboard

RXNetty 0.4 proxy redirect issue

Open ArekCzarnik opened this issue 8 years ago • 1 comments

Hi @NiteshKant,

i found a issue in my proxy handler if the backend send redirect 302 to the client.Have you a workaround or fix for me ?

Thanks, Arek

This is my code:


import com.google.common.primitives.Bytes;
import de.rewe.digital.catalan.domain.RuntimeConfiguration;
import io.netty.buffer.ByteBuf;
import io.reactivex.netty.RxNetty;
import io.reactivex.netty.client.RxClient;
import io.reactivex.netty.protocol.http.client.HttpClient;
import io.reactivex.netty.protocol.http.client.HttpClientRequest;
import io.reactivex.netty.protocol.http.client.HttpClientResponse;
import io.reactivex.netty.protocol.http.server.HttpServerRequest;
import io.reactivex.netty.protocol.http.server.HttpServerResponse;
import io.reactivex.netty.protocol.http.server.RequestHandler;
import rx.Observable;
import java.util.concurrent.TimeUnit;

import static io.netty.handler.codec.http.HttpMethod.GET;

public class ProxyHandler implements RequestHandler<ByteBuf, ByteBuf> {

    private final HttpClient<ByteBuf, ByteBuf> client;
    private final RxClient.ClientConfig clientConfig;

    public ProxyHandler(final RuntimeConfiguration runtimeConfiguration) {
        this.client = RxNetty.<ByteBuf, ByteBuf>createHttpClient("http://httpbin.org", 80);
        this.clientConfig = new HttpClient.HttpClientConfig.Builder().setFollowRedirect(false)
                .responseSubscriptionTimeout(1, TimeUnit.SECONDS)
                .readTimeout(120, TimeUnit.SECONDS).build();
    }

    public Observable<Void> handle(final HttpServerRequest<ByteBuf> request, final HttpServerResponse<ByteBuf> response) {
        return request.getContent().compose(new RequestExecutor(client, request))
                .doOnNext(clientResponse -> response.setStatus(clientResponse.getStatus()))
                .doOnNext(clientResponse -> clientResponse.getHeaders().names().forEach(name -> response.getHeaders().add(name, clientResponse.getHeaders().get(name))))
                .flatMap(HttpClientResponse::getContent)
                .map(ProxyHandler::bufferToBytes)
                .reduce((byte1, byte2) -> Bytes.concat(byte1, byte2))
                .flatMap(response::writeBytesAndFlush);
    }


    class RequestExecutor implements Observable.Transformer<ByteBuf, HttpClientResponse<ByteBuf>> {
        private final HttpClient<ByteBuf, ByteBuf> client;
        private final HttpServerRequest<ByteBuf> request;

        RequestExecutor(final HttpClient<ByteBuf, ByteBuf> client,
                               final HttpServerRequest<ByteBuf> request) {
            this.client = client;
            this.request = request;
        }

        @Override
        public Observable<HttpClientResponse<ByteBuf>> call(Observable<ByteBuf> byteBufferPayload) {
            return byteBufferPayload
                    .map(buffer -> bufferToBytes(buffer))
                    .reduce((byte1, byte2) -> Bytes.concat(byte1, byte2))
                    .flatMap(payload -> {
                        final HttpClientRequest<ByteBuf> clientRequest = HttpClientRequest.create(request.getHttpMethod(), request.getUri());
                        if ((payload.length > 0) && (request.getHttpMethod() != GET)) {
                            clientRequest.withContent(payload);
                        }
                        Observable.from(request.getHeaders().entries())
                                .forEach(entry -> clientRequest.getHeaders().add(entry.getKey(), entry.getValue()));
                        return client.submit(clientRequest, clientConfig);
                    });
        }
    }

    private static byte[] bufferToBytes(ByteBuf buffer) {
        if (buffer.hasArray()) {
            return buffer.array();
        }
        byte[] plainBytes = new byte[buffer.capacity()];
        for (int i = 0; i < buffer.capacity(); i++) {
            plainBytes[i] = buffer.getByte(i);
        }
        return plainBytes;
    }

}

And this is the Exception:

java.util.NoSuchElementException: Sequence contains no elements.
    .at rx.internal.operators.OperatorSingle$ParentSubscriber.onCompleted(OperatorSingle.java:131).
    .at rx.internal.operators.OperatorTakeLastOne$ParentSubscriber.onCompleted(OperatorTakeLastOne.java:107).
    .at rx.internal.operators.OperatorScan$2.onCompleted(OperatorScan.java:124)..at rx.internal.operators.OperatorMerge$MergeSubscriber.emitLoop(OperatorMerge.java:635).
    .at rx.internal.operators.OperatorMerge$MergeSubscriber.emit(OperatorMerge.java:547)..at rx.internal.operators.OperatorMerge$MergeSubscriber.onCompleted(OperatorMerge.java:268).
    .at rx.internal.operators.OperatorMap$1.onCompleted(OperatorMap.java:43).
    .at rx.internal.operators.OperatorMerge$MergeSubscriber.emitLoop(OperatorMerge.java:635).
    .at rx.internal.operators.OperatorMerge$MergeSubscriber.emit(OperatorMerge.java:547).
    .at rx.internal.operators.OperatorMerge$InnerSubscriber.onCompleted(OperatorMerge.java:836).
    .at io.reactivex.netty.protocol.http.UnicastContentSubject$AutoReleaseByteBufOperator$1.onCompleted(UnicastContentSubject.java:251).
    .at rx.internal.operators.NotificationLite.accept(NotificationLite.java:140).
    .at rx.internal.operators.BufferUntilSubscriber.emit(BufferUntilSubscriber.java:145).
    .at rx.internal.operators.BufferUntilSubscriber.onCompleted(BufferUntilSubscriber. java:158).
    .at io.reactivex.netty.protocol.http.UnicastContentSubject.onCompleted(UnicastContentSubject.java:273).
    .at io.reactivex.netty.protocol.http.client.ClientRequestResponseConverter$ResponseState.sendOnComplete(ClientRequestResponseConverter.java:382).
    .at io.reactivex.netty.protocol.http.client.ClientRequestResponseConverter$ResponseState.access$500(ClientRequestResponseConverter.java:339).
    .at io.reactivex.netty.protocol.http.client.ClientRequestResponseConverter.channelRead(ClientRequestResponseConverter.java:157).
    .at io.netty.channel.ChannelHandlerInvokerUtil.invokeChannelReadNow(ChannelHandlerInvokerUtil.java:83).
    .at io.netty.channel.DefaultChannelHandlerInvoker.invokeChannelRead(DefaultChannelHandlerInvoker.java:163).
    .at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:155).
    .at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103).
    .at io.netty.channel.ChannelHandlerInvokerUtil.invokeChannelReadNow(ChannelHandlerInvokerUtil.java:83).
    .at io.netty.channel.DefaultChannelHandlerInvoker.invokeChannelRead(DefaultChannelHandlerInvoker.java:163).
    .at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext. java:155).
    .at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:276).
    .at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:263).
    .at io.netty.channel.ChannelHandlerInvokerUtil.invokeChannelReadNow(ChannelHandlerInvokerUtil.java:83).
    .at io.netty.channel.DefaultChannelHandlerInvoker.invokeChannelRead(DefaultChannelHandlerInvoker.java:163).
    .at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:155).
    .at io.netty.handler.logging.LoggingHandler.channelRead(LoggingHandler.java:233).
    .at io.netty.channel.ChannelHandlerInvokerUtil.invokeChannelReadNow(ChannelHandlerInvokerUtil. java:83).
    .at io.netty.channel.DefaultChannelHandlerInvoker.invokeChannelRead(DefaultChannelHandlerInvoker.java:163).
    .at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:155).
    .at io.netty.channel.ChannelInboundHandlerAdapter.channelRead(ChannelInboundHandlerAdapter.java:86).
    .at io.reactivex.netty.metrics.BytesInspector.channelRead(BytesInspector.java:59).
    .at io.netty.channel.ChannelHandlerInvokerUtil.invokeChannelReadNow(ChannelHandlerInvokerUtil.java:83).
    .at io.netty.channel.DefaultChannelHandlerInvoker.invokeChannelRead(DefaultChannelHandlerInvoker.java:163).
    .at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:155).
    .at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:950).
    .at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:125).
    .at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:510).
    .at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:467).
    .at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:381).
    .at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:353).
    .at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:742).
    .at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:137).
    .at java.lang.Thread.run(Thread.java:745).

ArekCzarnik avatar Apr 12 '16 14:04 ArekCzarnik

this solved my problem:


    public Observable<Void> handle(final HttpServerRequest<ByteBuf> request, final HttpServerResponse<ByteBuf> response) {
        return request.getContent().compose(new RequestExecutor(client, request))
                .doOnNext(clientResponse -> response.setStatus(clientResponse.getStatus()))
                .doOnNext(clientResponse -> clientResponse.getHeaders().names().forEach(name -> response.getHeaders().add(name, clientResponse.getHeaders().get(name))))
                .flatMap(ProxyHandler::handleContent)
                .flatMap(buffer -> Observable.just(ByteBufUtil.bufferToBytes(buffer)))
                .reduce((byte1, byte2) -> Bytes.concat(byte1, byte2))
                .flatMap(response::writeBytesAndFlush);
    }

private static Observable<? extends ByteBuf> handleContent(final HttpClientResponse<ByteBuf> clientResponse) {
        if (clientResponse.getStatus().code() > 300) {
            return Observable.just(UnpooledByteBufAllocator.DEFAULT.buffer());
        }
        return clientResponse.getContent();
    }

ArekCzarnik avatar Apr 18 '16 06:04 ArekCzarnik