
rxnetty proxy in 0.4x with binary data...

Open ArekCzarnik opened this issue 8 years ago • 1 comments

I'm currently working on a simple proxy with RxNetty (the code is below). If the response is text, everything works fine, but if the response is an image, the String-based reduce does not work. If I use only response.writeBytes, I get strange issues. How can I work only with ByteBuf? Or is there a better solution?

public class ProxyHandler implements RequestHandler<ByteBuf, ByteBuf> {

    private final HttpClient<ByteBuf, ByteBuf> client;

    public ProxyHandler(final RuntimeConfiguration runtimeConfiguration) {
        this.client = RxNetty.<ByteBuf, ByteBuf>createHttpClient(runtimeConfiguration.getProxyUri(), runtimeConfiguration.getProxyPort());
    }

    public Observable<Void> handle(final HttpServerRequest<ByteBuf> request, final HttpServerResponse<ByteBuf> response) {
        return request.getContent().compose(new RequestExecutor(client, request.getUri(), request.getHttpMethod(), request.getHeaders()))
                .doOnNext(clientResponse -> response.setStatus(clientResponse.getStatus()))
                .flatMap(HttpClientResponse::getContent)
                .map(byteBuf -> byteBuf.toString(Charset.defaultCharset()))
                .reduce((s1, s2) -> s1 + s2)
                .flatMap(response::writeStringAndFlush);
    }


    class RequestExecutor implements Observable.Transformer<ByteBuf, HttpClientResponse<ByteBuf>> {
        private final HttpClient<ByteBuf, ByteBuf> client;
        private final String uri;
        private final HttpMethod httpMethod;
        private final HttpRequestHeaders headers;

        public RequestExecutor(final HttpClient<ByteBuf, ByteBuf> client,
                               final String uri,
                               final HttpMethod httpMethod, final HttpRequestHeaders headers) {
            this.client = client;
            this.uri = uri;
            this.httpMethod = httpMethod;
            this.headers = headers;
        }

        @Override
        public Observable<HttpClientResponse<ByteBuf>> call(Observable<ByteBuf> byteBufferPayload) {
            return byteBufferPayload.map(buf -> buf.toString(Charset.defaultCharset()))
                    .reduce((s1, s2) -> s1 + s2)
                    .flatMap(payload -> {
                        final HttpClientRequest<ByteBuf> request =
                                HttpClientRequest.<ByteBuf>create(httpMethod, uri)
                                        .withContent(payload);
                        Observable.from(this.headers.entries())
                                .forEach(entry -> request.getHeaders().add(entry.getKey(), entry.getValue()));
                        return client.submit(request);
                    });
        }

    }
}
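
A likely reason the String-based reduce breaks images is that decoding arbitrary bytes into a String and encoding them back is lossy. The sketch below uses only the JDK, with no RxNetty involved, and assumes UTF-8 where the handler uses Charset.defaultCharset(). The class name StringRoundTripDemo and the sample bytes are illustrative only.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

class StringRoundTripDemo {
    // Decode bytes to a String and encode them back, which is roughly what
    // byteBuf.toString(charset) followed by writeString does to a payload.
    // Assumption: UTF-8 stands in for Charset.defaultCharset().
    static byte[] roundTrip(byte[] data) {
        return new String(data, StandardCharsets.UTF_8).getBytes(StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] text = "hello".getBytes(StandardCharsets.UTF_8);
        // First four bytes of the PNG signature; 0x89 is not a valid UTF-8 lead byte.
        byte[] binary = {(byte) 0x89, 0x50, 0x4E, 0x47};

        System.out.println("text survives:   " + Arrays.equals(text, roundTrip(text)));
        System.out.println("binary survives: " + Arrays.equals(binary, roundTrip(binary)));
    }
}
```

The text payload comes back unchanged, while the invalid byte in the binary payload is replaced during decoding, so the re-encoded bytes differ from the input. That would explain why text responses proxy correctly and images do not.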

ArekCzarnik avatar Mar 14 '16 20:03 ArekCzarnik

I have a solution, though it is not really perfect: I copy each ByteBuf into a plain byte[] array and then concatenate the arrays. Is there a better solution?

public Observable<Void> handle(final HttpServerRequest<ByteBuf> request, final HttpServerResponse<ByteBuf> response) {
    return request.getContent().compose(new RequestExecutor(client, request.getUri(), request.getHttpMethod(), request.getHeaders()))
            .doOnNext(clientResponse -> response.setStatus(clientResponse.getStatus()))
            .doOnNext(clientResponse -> clientResponse.getHeaders().names().forEach(name -> response.getHeaders().add(name,clientResponse.getHeaders().get(name))))
            .flatMap(HttpClientResponse::getContent)
            .flatMap(buffer -> Observable.just(bufferToBytes(buffer)))
            .reduce((byte1, byte2) -> Bytes.concat(byte1,byte2))
            .flatMap(response::writeBytesAndFlush);
}

private static byte[] bufferToBytes(ByteBuf buffer) {
    // Copy only the readable region: capacity() can be larger than the number
    // of bytes actually written, and index 0 may lie before readerIndex().
    byte[] plainBytes = new byte[buffer.readableBytes()];
    buffer.getBytes(buffer.readerIndex(), plainBytes);
    return plainBytes;
}
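
The pairwise byte[] concatenation above re-copies everything accumulated so far for each new chunk. A minimal JDK-only sketch of one alternative is to append each chunk to a single growing buffer. The class ChunkAccumulatorDemo and method joinChunks are hypothetical names, and the List<byte[]> stands in for the stream of content chunks.

```java
import java.io.ByteArrayOutputStream;
import java.util.Arrays;
import java.util.List;

class ChunkAccumulatorDemo {
    // Append every chunk to one growing buffer instead of concatenating pairwise.
    static byte[] joinChunks(List<byte[]> chunks) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        for (byte[] chunk : chunks) {
            // write(byte[], int, int) on ByteArrayOutputStream declares no IOException.
            out.write(chunk, 0, chunk.length);
        }
        return out.toByteArray();
    }

    public static void main(String[] args) {
        List<byte[]> chunks = Arrays.asList(
                new byte[] {(byte) 0x89, 0x50},
                new byte[] {0x4E, 0x47},
                new byte[0]);
        System.out.println(Arrays.toString(joinChunks(chunks)));
    }
}
```

In the handler, the same idea could probably be expressed with RxJava's collect operator in place of reduce, with a ByteArrayOutputStream as the accumulator. Either way the whole response is still buffered in memory before it is written, so this only reduces copying and does not turn the proxy into a streaming one.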

ArekCzarnik avatar Mar 15 '16 10:03 ArekCzarnik