jetty.project
jetty.project copied to clipboard
ContentSourceByteBuffer, ContentSourceString, ChunkAccumulator potential resource leaks
Jetty version(s) 12.0.8
Enhancement Description ContentSourceByteBuffer, ContentSourceString, ChunkAccumulator, ContentSourceConsumer implementations use provided Promise and call promise.failed/succeeded, but don't take into the account that user defined callback may fail and chunk/source won't be released/failed in such case.
if (Content.Chunk.isFailure(chunk))
{
promise.failed(chunk.getFailure()); // throw exception
if (!chunk.isLast())
source.fail(chunk.getFailure());
return;
}
// some action that throws exception
chunk.release();
The good reference implementation from my point of view is ContentCopier, that's for succeeded and failed events always release resources
@scrat98 again note #11598 which deprecates ChunkAccumulator. It is still draft status, but we are working on improving these areas of the code.
@gregw I had reviewed that issue briefly, but as far as I understood the main goal to simplify and unify RBB public API. But current issue is intended to show that there are multiple implementations that do the same "chunk iterating loop"(start-demand-read-process chunk-repeat) and have the same issue about potential resource leaks.
This issue relates to the https://github.com/jetty/jetty.project/issues/11758. The suggestion was to use the same "ChunkIterator" for all "asXXX"(asString/asByteArray and so on) functions. ChunkIterator could be current Flow.Publisher implementation for example. To show the basic idea:
fun asString(source: Content.Source): CompletableFuture<String> {
val buffer = StringBuilder()
val callback = Promise.Completable<String>()
iterate(source,
onChunk = { buffer.append(it.byteBuffer.asCharBuffer()) },
onComplete = {
val result = buffer.toString()
callback.succeeded(result)
},
onError = {
callback.failed(it)
}
)
return callback
}
fun iterate(
source: Content.Source,
onChunk: Consumer<Content.Chunk>,
onComplete: Runnable,
onError: Consumer<Throwable>,
) {
ChunkIterator(source, onChunk, onComplete, onError).iterate()
}
private data class ChunkIterator(
private val source: Content.Source,
private val onChunk: Consumer<Content.Chunk>,
private val onComplete: Runnable,
private val onError: Consumer<Throwable>,
) {
private val started = AtomicBoolean(false)
fun iterate() {
if (!started.compareAndSet(false, true)) {
throw IllegalStateException("Iterator already started")
}
processNextChunks()
}
private fun processNextChunks() {
while (true) {
val chunk = source.read()
if (chunk == null) {
source.demand(this::processNextChunks)
return
}
runCatching {
processNextChunk(chunk)
}.onSuccess {
chunk.release()
}.onFailure {
chunk.release()
source.fail(it)
onError.accept(it)
}
}
}
private fun processNextChunk(chunk: Content.Chunk) {
if (Content.Chunk.isFailure(chunk)) {
throw chunk.failure
}
onChunk.accept(chunk)
if (chunk.isLast) {
onComplete.run()
}
}
}