RxNetty
RxNetty proxy in 0.4.x with binary data...
I'm currently working on a simple proxy with RxNetty (the code is below). If the response is text, everything works fine, but if the response is an image, the reduce over strings does not work. If I use only response.writeBytes, I get strange issues. How can I work only with ByteBuf? Or is there a better solution?
public class ProxyHandler implements RequestHandler<ByteBuf, ByteBuf> {

    private final HttpClient<ByteBuf, ByteBuf> client;

    public ProxyHandler(final RuntimeConfiguration runtimeConfiguration) {
        this.client = RxNetty.<ByteBuf, ByteBuf>createHttpClient(runtimeConfiguration.getProxyUri(), runtimeConfiguration.getProxyPort());
    }

    public Observable<Void> handle(final HttpServerRequest<ByteBuf> request, final HttpServerResponse<ByteBuf> response) {
        return request.getContent().compose(new RequestExecutor(client, request.getUri(), request.getHttpMethod(), request.getHeaders()))
                .doOnNext(clientResponse -> response.setStatus(clientResponse.getStatus()))
                .flatMap(HttpClientResponse::getContent)
                // Each chunk is decoded to text here; this is the step that fails for images.
                .map(byteBuf -> byteBuf.toString(Charset.defaultCharset()))
                .reduce((s1, s2) -> s1 + s2)
                .flatMap(response::writeStringAndFlush);
    }

    class RequestExecutor implements Observable.Transformer<ByteBuf, HttpClientResponse<ByteBuf>> {

        private final HttpClient<ByteBuf, ByteBuf> client;
        private final String uri;
        private final HttpMethod httpMethod;
        private final HttpRequestHeaders headers;

        public RequestExecutor(final HttpClient<ByteBuf, ByteBuf> client,
                               final String uri,
                               final HttpMethod httpMethod, final HttpRequestHeaders headers) {
            this.client = client;
            this.uri = uri;
            this.httpMethod = httpMethod;
            this.headers = headers;
        }

        @Override
        public Observable<HttpClientResponse<ByteBuf>> call(Observable<ByteBuf> byteBufferPayload) {
            // The incoming request body is also collected as one String before forwarding.
            return byteBufferPayload.map(buf -> buf.toString(Charset.defaultCharset()))
                    .reduce((s1, s2) -> s1 + s2)
                    .flatMap(payload -> {
                        final HttpClientRequest<ByteBuf> request =
                                HttpClientRequest.<ByteBuf>create(httpMethod, uri)
                                        .withContent(payload);
                        // Copy the incoming request headers onto the outgoing request.
                        Observable.from(this.headers.entries())
                                .forEach(entry -> request.getHeaders().add(entry.getKey(), entry.getValue()));
                        return client.submit(request);
                    });
        }
    }
}
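One likely reason the String-based reduce in the code above breaks images: decoding arbitrary bytes as text and encoding them back is not lossless in every charset. The sketch below is a standalone illustration using only the JDK, not RxNetty code. It assumes UTF-8, whereas the handler uses Charset.defaultCharset(), which varies by platform; the class name StringRoundTripDemo and the method roundTrip are made up for this example.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

public class StringRoundTripDemo {

    // Decodes the bytes as UTF-8 text and encodes the text back to bytes,
    // which is roughly what a String-based reduce does to every chunk.
    static byte[] roundTrip(byte[] raw) {
        String text = new String(raw, StandardCharsets.UTF_8);
        return text.getBytes(StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // First four bytes of a PNG file; 0x89 alone is not valid UTF-8,
        // so the decoder substitutes the replacement character U+FFFD.
        byte[] pngHeader = {(byte) 0x89, 0x50, 0x4E, 0x47};
        byte[] result = roundTrip(pngHeader);

        System.out.println("unchanged: " + Arrays.equals(pngHeader, result));
        System.out.println("length before=" + pngHeader.length + ", after=" + result.length);
    }
}
```

Plain ASCII text survives this round trip, which would explain why text responses work while images do not.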
I have a solution, though not a perfect one: I copy each ByteBuf into a plain byte[] array and then concatenate the arrays. Is there a better solution?

public Observable<Void> handle(final HttpServerRequest<ByteBuf> request, final HttpServerResponse<ByteBuf> response) {
    return request.getContent().compose(new RequestExecutor(client, request.getUri(), request.getHttpMethod(), request.getHeaders()))
            .doOnNext(clientResponse -> response.setStatus(clientResponse.getStatus()))
            // Copy the upstream response headers to the proxy response.
            .doOnNext(clientResponse -> clientResponse.getHeaders().names().forEach(name -> response.getHeaders().add(name, clientResponse.getHeaders().get(name))))
            .flatMap(HttpClientResponse::getContent)
            .map(buffer -> bufferToBytes(buffer))
            // Bytes.concat is presumably Guava's com.google.common.primitives.Bytes.
            .reduce((bytes1, bytes2) -> Bytes.concat(bytes1, bytes2))
            .flatMap(response::writeBytesAndFlush);
}

private static byte[] bufferToBytes(ByteBuf buffer) {
    // Copy only the readable bytes; capacity() can be larger than the data actually written.
    byte[] plainBytes = new byte[buffer.readableBytes()];
    buffer.getBytes(buffer.readerIndex(), plainBytes);
    return plainBytes;
}
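The copy-and-concatenate idea can also be sketched without repeated array concatenation by appending every chunk to one growing buffer. The example below uses only the JDK: java.nio.ByteBuffer stands in for Netty's ByteBuf so that it runs without RxNetty on the classpath, and the names ChunkAggregator and aggregate are made up for illustration. It is a sketch of the technique under those assumptions, not the RxNetty way of doing it.

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.List;

public class ChunkAggregator {

    // Appends the readable region of each chunk to a single growing buffer.
    // duplicate() is used so the caller's position and limit stay untouched.
    static byte[] aggregate(List<ByteBuffer> chunks) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        for (ByteBuffer chunk : chunks) {
            ByteBuffer view = chunk.duplicate();
            byte[] readable = new byte[view.remaining()]; // remaining(), not capacity()
            view.get(readable);
            out.write(readable, 0, readable.length);
        }
        return out.toByteArray();
    }

    public static void main(String[] args) {
        // A 16-byte buffer that holds only 2 written bytes, like a partly filled chunk.
        ByteBuffer first = ByteBuffer.allocate(16);
        first.put((byte) 0x89).put((byte) 0x50);
        first.flip();
        ByteBuffer second = ByteBuffer.wrap(new byte[] {0x4E, 0x47});

        byte[] body = aggregate(Arrays.asList(first, second));
        System.out.println(body.length); // 4 bytes, not 16 + 2
    }
}
```

With a Netty ByteBuf, the counterpart of remaining() is readableBytes(), and the copy would start at readerIndex(). Sizing the array by capacity() instead would append unwritten padding bytes to the response.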