RxNetty icon indicating copy to clipboard operation
RxNetty copied to clipboard

What could cause the RxNetty client to return an empty stream ?

Open Crystark opened this issue 9 years ago • 8 comments

I'm using a fairly simple RxNetty client:

    public Service(String host, int port, int maxConnection) {
        endpoint = RxNetty
            .<ByteBuf, ByteBuf> newHttpClientBuilder(host, port)
            .withMaxConnections(maxConnection)
            .build();
    }

    public Observable<Response> submit(Observable<Request> o) {
        return o
            .flatMap(br -> endpoint.submit(HttpClientRequest.createPost(br.uri()).withContent(br.bytes())))
            .flatMap(r -> r.getContent()
                .doOnNext(bb -> bb.retain().touch("ByteBuf from proxy response"))
                .switchIfEmpty(Observable.just(null))
                .map(c -> new Response(r.getStatus(), c, r.getHeaders().entries()))
            );
    }

The thing is, I often get an empty observable from endpoint.submit and i'm not sure what could be causing that. I expect it to always return a HttpClientResponse<ByteBuf>.

What may cause it to return an empty stream ?

Note: I checked and my input Observable (o) is never empty.

Crystark avatar Jun 01 '16 16:06 Crystark

I expect it to always return a HttpClientResponse<ByteBuf>

Yes, HttpClient.submit() should either give a response or error, but it should not be empty. Is this reproducible?

NiteshKant avatar Jun 01 '16 20:06 NiteshKant

No, I haven't been able to reproduce it yet, only observe it in production. It isn't recent. I've tried switching to previous RxNetty version and still have the same behavior.

Crystark avatar Jun 02 '16 07:06 Crystark

I currently fixed it using switchIfEmpty(DEFAULT_RESPONSE.doOnNext(r -> metricFallback.increment())) so I can track how often it happens. Visually, we oberved approximatly 5 of those per second for 1500QPS so that seems rare.

Just so you know, I've changed a bit the code to make sure:

    public Observable<Response> submit(Observable<Request> o) {
        return o
            .switchIfEmpty(Observable.<Request> empty().doOnCompleted(() -> L.warn("BEFORE")))
            .flatMap(br -> endpoint.submit(HttpClientRequest.createPost(br.uri()).withContent(br.bytes()))
                .switchIfEmpty(Observable.<HttpClientResponse<ByteBuf>> empty().doOnCompleted(() -> L.warn("INSIDE"))))
            .switchIfEmpty(Observable.<HttpClientResponse<ByteBuf>> empty().doOnCompleted(() -> L.warn("AFTER")))
            .flatMap(r -> r.getContent()
                .doOnNext(bb -> bb.retain().touch("ByteBuf from proxy response"))
                .switchIfEmpty(Observable.just(null))
                .map(c -> new Response(r.getStatus(), c, r.getHeaders().entries()))
            );
    }

This always shows

INSIDE
AFTER
INSIDE
AFTER
INSIDE
AFTER

Crystark avatar Jun 02 '16 08:06 Crystark

Also FYI, this is the same RxNetty client that has the pool exhausted problem (#503)

Crystark avatar Jun 02 '16 08:06 Crystark

I haven't had time to spend on this matter lately, but i just thought i'd update this thread with my observed metrics. I get those empty observable about 0.1% of the time (50 for 50k QPS).

Crystark avatar Jul 20 '16 15:07 Crystark

We have changed our http client to use rxnetty. Intermittently we get Observable.empty response. Do you still see this?

guptaabhiishek avatar Dec 12 '18 16:12 guptaabhiishek

Hi Abhishek, two questions :

Do you have confirmation the server is sending bytes? Do you have reproduction code?

jamesgorman2 avatar Dec 12 '18 22:12 jamesgorman2

Hi Abhishek, two questions : Do you have confirmation the server is sending bytes? Do you have reproduction code? I am afraid I cann't share code As this is against or company policy. This behavior is only when I run tests in CI(jenkins) then randomly client Observable returns empty however server is still processing the request which i could see in the server log. I cann't reproduce on my local as its totally random on any of our REST api call.

guptaabhiishek avatar Dec 13 '18 00:12 guptaabhiishek