Update batching mechanisms to allow an upper-bounded policy
Ran into this while testing some of our scenarios. We push a lot of data of various sizes, up to 100k, through Kafka. The data comes in "chunks" which we split into smaller pieces, which are then batched up and sent via http_client. The HTTP endpoint we are sending to has a maximum payload limit of ~500k. We need to be efficient (fewer round trips), so we try to batch up as much data as possible without going over that limit.
Benthos makes this a lot more challenging. Looking at this code:
https://github.com/benthosdev/benthos/blob/main/internal/component/output/batcher/batcher.go#L109
It looks like it should flush multiple times within a single batch, but all it actually does is set a flag to flush at the end of the transaction. This is a problem for us because we keep exceeding our payload limit(s).
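For clarity, the policy we're after can be sketched as a greedy packer: add a message to the current batch only if it still fits under the limit, otherwise close the batch and start a new one, so no batch ever exceeds the cap. This is an illustrative sketch of the desired semantics (function name and shapes are hypothetical, not Benthos internals):

```go
package main

import "fmt"

// upperBoundedBatches greedily packs message sizes into batches whose
// total size never exceeds limit. When the next message would push the
// current batch over the limit, the batch is closed first and a fresh
// one is started. Messages larger than limit still get their own batch.
func upperBoundedBatches(sizes []int, limit int) [][]int {
	var batches [][]int
	var cur []int
	total := 0
	for _, s := range sizes {
		// Flush before adding if this message would overflow the batch.
		if total+s > limit && len(cur) > 0 {
			batches = append(batches, cur)
			cur = nil
			total = 0
		}
		cur = append(cur, s)
		total += s
	}
	if len(cur) > 0 {
		batches = append(batches, cur)
	}
	return batches
}

func main() {
	// Chunks of varying sizes packed under a payload cap of 500.
	fmt.Println(upperBoundedBatches([]int{100, 200, 250, 400, 50}, 500))
	// → [[100 200] [250] [400 50]]
}
```

The current behavior, by contrast, only decides to flush after the limit has already been crossed, which is why our payloads end up over the endpoint's cap.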