jetty.project icon indicating copy to clipboard operation
jetty.project copied to clipboard

ContentSourceByteBuffer, ContentSourceString, ChunkAccumulator potential resource leaks

Open scrat98 opened this issue 1 year ago • 1 comments

Jetty version(s) 12.0.8

Enhancement Description ContentSourceByteBuffer, ContentSourceString, ChunkAccumulator, ContentSourceConsumer implementations use provided Promise and call promise.failed/succeeded, but don't take into the account that user defined callback may fail and chunk/source won't be released/failed in such case.

if (Content.Chunk.isFailure(chunk))
{
	promise.failed(chunk.getFailure()); // throw exception
	if (!chunk.isLast())
	    source.fail(chunk.getFailure());
	return;
}
// some action that throws exception
chunk.release();

The good reference implementation from my point of view is ContentCopier, that's for succeeded and failed events always release resources

scrat98 avatar May 06 '24 15:05 scrat98

@scrat98 again note #11598 which deprecates ChunkAccumulator. It is still draft status, but we are working on improving these areas of the code.

gregw avatar May 07 '24 03:05 gregw

@gregw I had reviewed that issue briefly, but as far as I understood the main goal to simplify and unify RBB public API. But current issue is intended to show that there are multiple implementations that do the same "chunk iterating loop"(start-demand-read-process chunk-repeat) and have the same issue about potential resource leaks.

This issue relates to the https://github.com/jetty/jetty.project/issues/11758. The suggestion was to use the same "ChunkIterator" for all "asXXX"(asString/asByteArray and so on) functions. ChunkIterator could be current Flow.Publisher implementation for example. To show the basic idea:

fun asString(source: Content.Source): CompletableFuture<String> {
  val buffer = StringBuilder()
  val callback = Promise.Completable<String>()
  iterate(source,
    onChunk = { buffer.append(it.byteBuffer.asCharBuffer()) },
    onComplete = {
      val result = buffer.toString()
      callback.succeeded(result)
    },
    onError = {
      callback.failed(it)
    }
  )
  return callback
}

fun iterate(
  source: Content.Source,
  onChunk: Consumer<Content.Chunk>,
  onComplete: Runnable,
  onError: Consumer<Throwable>,
) {
  ChunkIterator(source, onChunk, onComplete, onError).iterate()
}

private data class ChunkIterator(
  private val source: Content.Source,
  private val onChunk: Consumer<Content.Chunk>,
  private val onComplete: Runnable,
  private val onError: Consumer<Throwable>,
) {
  private val started = AtomicBoolean(false)

  fun iterate() {
    if (!started.compareAndSet(false, true)) {
      throw IllegalStateException("Iterator already started")
    }
    processNextChunks()
  }

  private fun processNextChunks() {
    while (true) {
      val chunk = source.read()
      if (chunk == null) {
        source.demand(this::processNextChunks)
        return
      }
      runCatching {
        processNextChunk(chunk)
      }.onSuccess {
        chunk.release()
      }.onFailure {
        chunk.release()
        source.fail(it)
        onError.accept(it)
      }
    }
  }

  private fun processNextChunk(chunk: Content.Chunk) {
    if (Content.Chunk.isFailure(chunk)) {
      throw chunk.failure
    }
    onChunk.accept(chunk)
    if (chunk.isLast) {
      onComplete.run()
    }
  }
}

scrat98 avatar May 07 '24 22:05 scrat98