
[client] Don't close connections when HttpObjectAggregator is on the pipeline


This is a fix to #393 by @rborer. (s/on-closed s #(d/success! c true)) means that the request was completed early?, which leads to the connection being disposed from the pool rather than released back to it (this). I guess this is not intentional, just a copy-and-paste error. We also don't need to create a stream because the aggregator has already waited for the whole response and aggregated it for us.

alexander-yakushev avatar Feb 11 '19 07:02 alexander-yakushev

Is there a reason to change that?

At the moment, if the user places an HttpObjectAggregator in the pipeline and configures the client with :raw-stream? true (motivation notwithstanding, see below), the response body stream would never close and the actual body content would be silently dropped. So I'd say that would be a bug in Aleph, especially given that HttpObjectAggregator is a core Netty feature which is explicitly recommended in the docs.
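For concreteness, here is roughly the configuration in question, as a sketch only: it assumes the client's :pipeline-transform connection option, and the handler name "handler" used as the insertion point is a placeholder, so check (.names pipeline) on your Aleph version and adjust.

```clojure
(require '[aleph.http :as http])
(import '(io.netty.channel ChannelPipeline)
        '(io.netty.handler.codec.http HttpObjectAggregator))

;; Raw client stream plus aggregation: the aggregator has to sit between the
;; HTTP codec and Aleph's own handler; "handler" below is a placeholder name.
(def aggregating-pool
  (http/connection-pool
    {:connection-options
     {:raw-stream? true
      :pipeline-transform
      (fn [^ChannelPipeline pipeline]
        ;; responses larger than 1 MiB will fail instead of being aggregated
        (.addBefore pipeline "handler" "aggregator"
                    (HttpObjectAggregator. (* 1024 1024))))}}))

;; (:body @(http/get url {:pool aggregating-pool})) is then a Manifold stream
;; which, with this PR, yields the single aggregated ByteBuf and then closes.
```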

The primary reasons to use a raw stream are to 1) minimize copying

Indeed, but this still holds even when an HttpObjectAggregator is present: the non-raw (cooked!) client-handler would make yet another copy of the ByteBuf which has already been accumulated by the aggregator.

and 2) process bodies asynchronously, right?

I would argue that this aspect is orthogonal, i.e. you could just as well process the bodies asynchronously (and incrementally) when using the cooked handler. The essential difference is the reduced amount of copying and thus the responsibility for releasing the buffers.

In what scenario would we want to use raw streams and the HttpObjectAggregator?

My use case on the server side is to use the aggregator as a means to enforce max body lengths as early as possible. Of course, this only works for non-streaming uses, i.e. where the max body length is reasonably low (which in my case it is). Doing this properly at the Aleph layer is pretty involved. It also adds some convenience downstream to receive the full body as an aggregated buffer then.
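To make the use case concrete, a rough server-side sketch; the handler name "request-handler" used for the insertion point is an assumption, so inspect (.names pipeline) on your Aleph version and adjust.

```clojure
(require '[aleph.http :as http])
(import '(io.netty.channel ChannelPipeline)
        '(io.netty.handler.codec.http HttpObjectAggregator))

(defn handler [req]
  ;; With the aggregator in front, the request body arrives fully aggregated.
  {:status 200 :body "ok"})

(http/start-server
  handler
  {:port 8080
   :pipeline-transform
   (fn [^ChannelPipeline pipeline]
     ;; Bodies over 64 KiB are rejected by Netty (413) before reaching Aleph;
     ;; "request-handler" is a placeholder for Aleph's own handler name.
     (.addBefore pipeline "request-handler" "aggregator"
                 (HttpObjectAggregator. (* 64 1024))))})
```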


UPDATE: Sorry, I accidentally hit a shortcut which prematurely submitted the comment so I didn't yet proofread everything :grimacing: (OK, done now)

DerGuteMoritz avatar Aug 09 '22 11:08 DerGuteMoritz

@DerGuteMoritz Well, supporting max length correctly alone seems sufficient justification.

this still holds even when an HttpObjectAggregator is present

Yes, that's my point. If you're using raw streams to minimize copying overhead, using an HttpObjectAggregator adds its own copying overhead, working against that goal, right?

I would argue that this aspect is orthogonal, i.e. you could just as well process the bodies asynchronously (and incrementally) when using the cooked handler

Well, this isn't 100% true. The raw handler gives you a manifold stream of byte buffers, but the regular handler gives you a java.io.InputStream. It's awkward, but doable, to read an InputStream that's being written to asynchronously (since you have to call .available() and implement your own backpressure if you've reached the end, but know there's more coming).

It's the "incremental" part that's currently impossible with the regular handler. It gives you a java.io.InputStream wrapping a Manifold stream, but only when the transfer encoding is chunked. When it's not chunked, Aleph just keeps collecting the body data up to the buffer capacity before sending any of it to the user's handler.

So, they're not totally orthogonal. (But maybe they should be? ehh)

Another consideration: if you're desperate for speed, the aggregator forces you to wait until the whole message is ready. Imagine, say, you received 100 streamed JSON objects grouped in 1 HTTP message. Or a huge XML doc in the body that you want to parse incrementally with SAX. You may want to act ASAP on something you find early in the data, and not be forced to wait.
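As a sketch of that kind of early, incremental processing with the raw client (no aggregator in the pipeline; the URL is just an example):

```clojure
(require '[aleph.http :as http]
         '[manifold.stream :as s]
         '[manifold.deferred :as d])
(import '(io.netty.buffer ByteBuf)
        '(io.netty.util ReferenceCountUtil))

(def raw-pool
  (http/connection-pool {:connection-options {:raw-stream? true}}))

;; Act on each ByteBuf as it arrives instead of waiting for the whole message;
;; with raw streams, releasing every buffer is the consumer's responsibility.
(d/chain
  (http/get "http://example.com/big-feed" {:pool raw-pool})
  (fn [{:keys [body]}]
    (s/consume
      (fn [^ByteBuf buf]
        (try
          (println "chunk of" (.readableBytes buf) "bytes")
          (finally
            (ReferenceCountUtil/release buf))))
      body)))
```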

Anyway...

Other than max length restrictions, I can't think of a good reason a user would want to combine the two. We may want to add some docs warning people that setting a max length restriction may change the performance characteristics of raw streams.

KingMob avatar Aug 09 '22 14:08 KingMob

Yes, that's my point. If you're using raw streams to minimize copying overhead, using an HttpObjectAggregator adds its own copying overhead, working against that goal, right?

Are you sure about this? I would expect the HttpObjectAggregator to use CompositeByteBuf under the hood to avoid copying. I haven't gone down the rabbit hole, but I would expect something along those lines [1].

[1] : https://github.com/netty/netty/blob/4.1/codec/src/main/java/io/netty/handler/codec/MessageAggregator.java#L274-L323
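For reference, the no-copy behaviour of a composite buffer looks roughly like this (plain Netty, independent of Aleph):

```clojure
(import '(io.netty.buffer Unpooled CompositeByteBuf ByteBuf))

;; Components are referenced, not copied, when added to a composite buffer.
(let [chunk-1 (Unpooled/wrappedBuffer (.getBytes "hello "))
      chunk-2 (Unpooled/wrappedBuffer (.getBytes "world"))
      ^CompositeByteBuf composite (Unpooled/compositeBuffer)]
  ;; addComponent with increaseWriterIndex=true appends without copying bytes
  (.addComponent composite true ^ByteBuf chunk-1)
  (.addComponent composite true ^ByteBuf chunk-2)
  (println (.numComponents composite) (.readableBytes composite)) ; 2 11
  ;; releasing the composite also releases its components
  (.release composite))
```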

arnaudgeiser avatar Aug 09 '22 20:08 arnaudgeiser

I would argue that this aspect is orthogonal, i.e. you could just as well process the bodies asynchronously (and incrementally) when using the cooked handler

I tend to see it the same way. Using raw-stream? true, you basically end up with a manifold.stream with a single ByteBuf on it, where the responsibility to release it belongs to you (most likely allocated in direct memory unless configured otherwise). In the other case, you end up with an InputStream with the full content on it, allocated on the heap.

To me, that's the significant distinction here.
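In practice that responsibility looks roughly like this; consume-aggregated-body is a hypothetical helper, and it assumes the single aggregated ByteBuf described above.

```clojure
(require '[manifold.stream :as s]
         '[manifold.deferred :as d])
(import '(io.netty.buffer ByteBuf)
        '(io.netty.util ReferenceCountUtil))

(defn consume-aggregated-body
  "Hypothetical helper: copies the aggregated body onto the heap and releases
  the (likely direct-memory) ByteBuf taken from the raw body stream."
  [body-stream]
  (d/chain
    (s/take! body-stream)
    (fn [^ByteBuf buf]
      (when buf
        (try
          (let [bytes (byte-array (.readableBytes buf))]
            (.readBytes buf bytes)
            bytes)
          (finally
            (ReferenceCountUtil/release buf)))))))
```

With the cooked handler, Aleph does an equivalent copy for you and hands back heap-allocated content.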

arnaudgeiser avatar Aug 09 '22 20:08 arnaudgeiser

Are you sure about this? I would expect the HttpObjectAggregator to use CompositeByteBuf under the hood to avoid copying.

@arnaudgeiser I took a closer look. You're right, it will reuse them (up to 1024 ByteBufs), so the only real overhead would be whatever other bookkeeping it's doing.

Using raw-stream? true, you basically end up with a manifold.stream with a single ByteBuf

A single ByteBuf? Won't we get one for each call to channelRead? We might just get one with HttpObjectAggregator, but would it be true in general, especially for large messages and/or slow connections?


It turns out I'm wrong about "It's awkward, but doable, to read an InputStream that's being written to asynchronously (since you have to call .available() and implement your own backpressure if you've reached the end, but know there's more coming)."

While the general contract of InputStream.read() can be ambiguous as to what a -1 return value means (e.g., a FileInputStream.read() == -1 on a growing file can't distinguish between "temporarily caught up to the writer" and "we're done for good"), I took a closer look at byte-streams's pushback byte stream, and its .read() will block until either data is available from the Manifold stream or it's closed, at which point it returns -1.


I'm still not seeing why you guys think the stream type is orthogonal to processing asynchronously/incrementally, though. I guess the cooked handler running in a different thread is a form of async, even with the synchronous java.io.InputStream API, but I was thinking more along the lines of Manifold processing of the body.

And incremental processing is only possible with chunked encoding for the cooked handler; otherwise, Aleph buffers it all up until the message is done. (Hell, since Aleph waits for the full message, theoretically, we could dynamically insert HttpObjectAggregator into the pipeline for non-chunked requests and delete a third of the code in aleph.http.server/ring-handler with identical output).

KingMob avatar Aug 10 '22 09:08 KingMob

Using raw-stream? true, you basically end up with a manifold.stream with a single ByteBuf

A single ByteBuf? Won't we get one for each call to channelRead? We might just get one with HttpObjectAggregator, but would it be true in general, especially for large messages and/or slow connections?

AFAIUI @arnaudgeiser was specifically referring to the case when an HttpObjectAggregator is present in the pipeline. This is also what I was referring to when I wrote: "It also adds some convenience downstream to receive the full body as an aggregated buffer then."

I'm still not seeing why you guys think the stream type is orthogonal to processing asynchronously/incrementally, though. I guess the cooked handler running in a different thread is a form of async, even with the synchronous java.io.InputStream API, but I was thinking more along the lines of Manifold processing of the body.

Note that the client (which this PR is about) returns a Manifold stream in both modes! A server option to make the cooked handler return a Manifold stream would make sense, too, I'd say. That's why I consider it orthogonal; it just happens to be complected right now :smile:

(Hell, since Aleph waits for the full message, theoretically, we could dynamically insert HttpObjectAggregator into the pipeline for non-chunked requests and delete a third of the code in aleph.http.server/ring-handler with identical output).

Heh, interesting idea :smile:

DerGuteMoritz avatar Aug 17 '22 14:08 DerGuteMoritz

Side note: I find the word "complect/decomplect" kind of silly. Plenty of better-known alternatives already exist: intertwined, mingled, mixed, entangled, enmeshed, etc. Likewise for the undoing process: disentangle, separate, untangle, unknot, extract, tease apart, etc. I just don't think it adds enough of anything to justify its use.

If we're going for silliness, I vastly prefer Ted Nelson's prior art term from Project Xanadu: "intertwingle". 😜

KingMob avatar Aug 18 '22 06:08 KingMob