streamly
Reserve outputQueue buffer before executing an action
Currently we do not reserve a slot in the outputQueue before executing an action. Instead, we execute the action and yield its result to the outputQueue, and only at the time of yielding, if we find that we are above maxBuffer, do we stop the worker. This is not a precise mechanism: by the time the check happens the action has already run, so the queue can end up well above the maxBuffer threshold. To make it precise, we need to reserve the buffer before we execute an action.
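A minimal sketch of reserve-before-execute, assuming a shared counter of free slots initialised to maxBuffer. All names here (`BufferLimit`, `reserveSlot`, `releaseSlot`, `runWorker`, `runWorkers`) are hypothetical and do not reflect streamly's internals. A plain `IORef [a]` stands in for the outputQueue and there is no consumer, so slots only come back when a reservation goes unused. Because a slot is claimed atomically before the action runs, the queue cannot exceed the limit however many workers are forked.

```haskell
import Control.Concurrent (forkIO)
import Control.Concurrent.MVar (newEmptyMVar, putMVar, takeMVar)
import Control.Exception (finally)
import Control.Monad (replicateM)
import Data.IORef

-- Hypothetical sketch, not streamly's internals: a counter of free
-- buffer slots, initialised to maxBuffer.
newtype BufferLimit = BufferLimit (IORef Int)

newBufferLimit :: Int -> IO BufferLimit
newBufferLimit n = BufferLimit <$> newIORef n

-- Atomically claim one slot; returns False (changing nothing) if none is free.
reserveSlot :: BufferLimit -> IO Bool
reserveSlot (BufferLimit ref) =
  atomicModifyIORef' ref $ \free ->
    if free > 0 then (free - 1, True) else (free, False)

-- Hand a slot back: done by the consumer when it dequeues an item, or by a
-- worker whose reservation turned out to be unused.
releaseSlot :: BufferLimit -> IO ()
releaseSlot (BufferLimit ref) = atomicModifyIORef' ref (\free -> (free + 1, ()))

-- Worker loop: the slot is reserved *before* the action runs, so the queue
-- never holds more than maxBuffer results, however many workers run.
runWorker :: BufferLimit -> IORef [a] -> IO (Maybe (IO a)) -> IO ()
runWorker lim queue nextAction = loop
  where
    loop = do
      ok <- reserveSlot lim
      if not ok
        then pure ()                      -- buffer full: stop this worker
        else do
          mact <- nextAction
          case mact of
            Nothing  -> releaseSlot lim   -- no work left: return the slot
            Just act -> do
              x <- act
              atomicModifyIORef' queue (\xs -> (xs ++ [x], ()))
              loop

-- Fork n workers sharing one limit and one queue, then wait for all of them.
runWorkers :: Int -> BufferLimit -> IORef [a] -> IO (Maybe (IO a)) -> IO ()
runWorkers n lim queue nextAction = do
  dones <- replicateM n $ do
    done <- newEmptyMVar
    _ <- forkIO (runWorker lim queue nextAction `finally` putMVar done ())
    pure done
  mapM_ takeMVar dones
```

For example, forking 8 workers over 100 actions with `newBufferLimit 3` leaves exactly 3 results in the queue, whichever workers win the reservations.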
We already have a mechanism of this sort in the maxYields implementation: it reserves a yield in the outputQueue before executing an action. We need to do the same for maxBuffer, and the changes go in exactly the same places. In fact, we can abstract this mechanism so that maxYields and maxBuffer are both implemented under the same abstraction, using common hooks.
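One way the common hooks might look, as a sketch only: both limits are modelled as a counter of permits, and `Unlimited` touches no shared state at all. `Limit`, `Limits`, `reserve`, `release`, `reserveBoth` and `afterAction` are hypothetical names, not streamly's API. The two limits differ only in when a permit comes back: a yield permit is spent for good once an item is yielded, whereas a buffer slot is returned when the consumer dequeues the item.

```haskell
import Data.IORef

-- Hypothetical sketch: maxYields and maxBuffer as two instances of one
-- "permit counter" abstraction. Names are illustrative, not streamly's API.
data Limit = Unlimited | Limited (IORef Int)

newLimit :: Maybe Int -> IO Limit
newLimit Nothing  = pure Unlimited
newLimit (Just n) = Limited <$> newIORef n

-- Pre-execution hook: claim one permit. The unlimited case is free.
reserve :: Limit -> IO Bool
reserve Unlimited     = pure True
reserve (Limited ref) =
  atomicModifyIORef' ref $ \n -> if n > 0 then (n - 1, True) else (n, False)

-- Return a permit: for maxBuffer when the consumer dequeues an item, and for
-- both limits when a reserved action produced no output.
release :: Limit -> IO ()
release Unlimited     = pure ()
release (Limited ref) = atomicModifyIORef' ref (\n -> (n + 1, ()))

data Limits = Limits { yieldLimit :: Limit, bufferLimit :: Limit }

-- Common hook run before an action: take both permits or neither.
reserveBoth :: Limits -> IO Bool
reserveBoth (Limits y b) = do
  okY <- reserve y
  if not okY
    then pure False
    else do
      okB <- reserve b
      if okB then pure True else release y >> pure False

-- Common hook run after an action. If it yielded, the yield permit is spent
-- and the buffer slot stays held until the consumer releases it; if it
-- yielded nothing, both permits go back.
afterAction :: Limits -> Bool -> IO ()
afterAction _ True             = pure ()
afterAction (Limits y b) False = release y >> release b
```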
This covers #98: if we implement buffer reservation, that issue won't arise.
This may add a performance overhead, and the maxBuffer limit is enabled by default. It will not affect the unlimited buffer case, though. One way to mitigate the overhead could be to reserve the buffer in batches sized according to the throughput of the worker; the batch size could also be statically configurable so that we do not incur the overhead of measuring throughput. Maybe we could have a config option to choose between buffer pre-reservation and a post-check, though it may not be worth it.
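A sketch of batched reservation, assuming a static batch size (a throughput-driven variant would compute `batch` from measurements instead). `reserveBatch`, `releaseUnused` and `runBatchedWorker` are hypothetical names. A worker claims up to `batch` slots in one atomic update, runs that many actions without touching the shared counter, and hands back whatever it did not use.

```haskell
import Data.IORef

-- Hypothetical sketch: claim up to `want` slots in one atomic operation.
-- Returns how many slots were actually granted (0 if the buffer is full).
reserveBatch :: IORef Int -> Int -> IO Int
reserveBatch ref want =
  atomicModifyIORef' ref $ \free ->
    let granted = max 0 (min want free)
    in (free - granted, granted)

-- Return slots that were reserved but not used (e.g. the stream ended early).
releaseUnused :: IORef Int -> Int -> IO ()
releaseUnused ref n = atomicModifyIORef' ref (\free -> (free + n, ()))

-- Run up to `batch` actions per reservation. `batch` is a static knob here.
runBatchedWorker :: Int -> IORef Int -> IORef [a] -> IO (Maybe (IO a)) -> IO ()
runBatchedWorker batch free queue nextAction = loop
  where
    loop = do
      granted <- reserveBatch free batch
      if granted == 0
        then pure ()                       -- buffer full: stop this worker
        else go granted
    -- n = slots still held from the current reservation
    go 0 = loop
    go n = do
      mact <- nextAction
      case mact of
        Nothing  -> releaseUnused free n   -- no more work: hand back leftovers
        Just act -> do
          x <- act
          atomicModifyIORef' queue (\xs -> (xs ++ [x], ()))
          go (n - 1)
```

The limit is still never exceeded; the cost is that slots held in one worker's batch are unavailable to other workers, so a large batch can make them stop earlier than strictly necessary.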
If we end up not implementing this due to perf overhead then we would have to fix #98.
I am going to start working towards fixing this, as it could lead to free-running workers hogging memory. The bad part is that the GHC memory allocator never returns memory to the system, so even a temporary spike in memory usage is never given back.
@harendra-kumar Has this been fixed? Can this issue be closed now?
I think this has only been implemented for Parallel streams. We still need to do it for async/wAsync/ahead streams as well. See the usage of decrementBufferLimit and incrementBufferLimit; those are the primitives used for buffer reservation.