Batch-Capable Producer Bindings
(Current Batching Pattern) Batch-Accumulator Producer Bindings
Currently, the common batching pattern used by producer bindings is to accumulate messages at the producer binding based on batchSize and batchTimeout configuration and then send them in bulk to the target system. In a system where the consumer binding also operates in batch mode, both bindings end up accumulating their own batches.
The main challenges with the above are:
- batch coordination between consumer and producer bindings is complicated. Both bindings need to operate on the same batch (aka work-unit) for acknowledgments to work. This gets complex on failure paths.
- configuration for batchSize and batchTimeout exists at both bindings and needs to be consistent. It is hard to reason about the two timeouts, which are independent of each other.
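To make the accumulator pattern concrete, here is a minimal plain-Java sketch of a producer that buffers individual messages and flushes on batchSize or batchTimeout. It is not the API of any real binder: AccumulatingProducer and the Consumer<List<String>> sink standing in for the target system are hypothetical, and the timeout is only checked when a new message arrives. Its batch boundaries are decided independently of whatever batch the consumer binding formed, which is the coordination problem described above.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical IMPL-1 style producer: accumulates messages, then bulk-sends them.
// The timeout is checked lazily on each send (no background timer), so a trailing
// partial batch is only sent when flush() is called explicitly.
class AccumulatingProducer {
    private final int batchSize;
    private final Duration batchTimeout;
    private final Consumer<List<String>> bulkSend; // stand-in for the target system
    private final List<String> buffer = new ArrayList<>();
    private long firstMessageNanos;

    AccumulatingProducer(int batchSize, Duration batchTimeout, Consumer<List<String>> bulkSend) {
        this.batchSize = batchSize;
        this.batchTimeout = batchTimeout;
        this.bulkSend = bulkSend;
    }

    // Called once per individual message, as in the current producer contract.
    void send(String payload) {
        if (buffer.isEmpty()) {
            firstMessageNanos = System.nanoTime();
        }
        buffer.add(payload);
        boolean full = buffer.size() >= batchSize;
        boolean expired = System.nanoTime() - firstMessageNanos >= batchTimeout.toNanos();
        if (full || expired) {
            flush();
        }
    }

    // Sends whatever is buffered as one bulk operation.
    void flush() {
        if (!buffer.isEmpty()) {
            bulkSend.accept(List.copyOf(buffer));
            buffer.clear();
        }
    }
}
```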
(Proposed Enhancement) Batch-Capable Producer Bindings
What if a batch could be delivered to the producer binding as a whole and be handled all at once?
In this design:
- Consumer binding accumulates a work-unit (a batch) based on its batchSize and batchTimeout configuration. It could be a partial batch.
- The batch travels through the functions as a whole
- The batch is served to the producer binding as a whole
- The producer binding detects that the message is a batch and processes it as a whole. Once the whole batch is successfully processed, the producer binding returns successfully. If anything goes wrong, an exception is thrown.
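The contract described in the list above could look roughly like the following plain-Java sketch. The BatchCapableProducer class, the Target interface standing in for the external system, and the choice of IllegalStateException are illustrative assumptions, not Spring Cloud Stream types. The point is only that the producer receives the whole (possibly partial) batch in a single invocation and either returns normally or throws.

```java
import java.util.List;

// Hypothetical stand-in for the external system the producer binding writes to.
interface Target {
    void publishAll(List<String> payloads);
}

// Hypothetical batch-capable producer: one invocation per batch, no buffering,
// no batchSize/batchTimeout. Returning normally means the whole batch was handled.
class BatchCapableProducer {
    private final Target target;

    BatchCapableProducer(Target target) {
        this.target = target;
    }

    void handle(List<String> batch) {
        if (batch.isEmpty()) {
            return; // nothing to do for an empty work-unit
        }
        try {
            target.publishAll(batch);
        } catch (RuntimeException ex) {
            // Surface the failure so the consumer binding can reject or redeliver
            // the same work-unit it accumulated.
            throw new IllegalStateException("Failed to publish batch of size " + batch.size(), ex);
        }
    }
}
```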
Benefits of this design:
- no coordination between consumer and producer bindings required. The batch (or partial batch) that arrives at the producer binding is simply what needs to be processed for that invocation.
- batch-related configuration only exists at the consumer binding. The consumer is the component responsible for creating a batch (work-unit).
- supporting batching at the producer binding is somewhat simpler with this design:
  - full batch knowledge at every invocation
  - no batchSize and no batchTimeout to implement
It is worth noting that this design does not couple the producer binding to the consumer binding. The producer binding benefits from a batch already accumulated by the consumer binding; however, it remains completely independent of the consumer binding.
Spring Defined Header For Batched Messages
Batched messages already exist in Spring Cloud Stream. For example, in a sample batch message produced by the Solace consumer binding:
- The payload is of type ArrayList (in this sample, the batch is of size 3)
- Top level headers are common to the entire batch
- The header with key solace_scst_batchedHeaders holds the actual message headers for every message within the batch.
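The structure of such a batched message can be mirrored in a few lines of plain Java. This is a simplified model rather than Spring's Message type: BatchedMessage is a hypothetical record, and the per-message headers are assumed to be a List<Map<String, Object>> stored under solace_scst_batchedHeaders and aligned by index with the payload list.

```java
import java.util.List;
import java.util.Map;

// Simplified stand-in for a batched Spring message (not org.springframework.messaging.Message).
// payload: one entry per message in the batch; headers: top-level headers common to the batch.
record BatchedMessage(List<Object> payload, Map<String, Object> headers) {

    // Header key used by the Solace consumer binding in the sample described above.
    static final String BATCHED_HEADERS = "solace_scst_batchedHeaders";

    int size() {
        return payload.size();
    }

    // Per-message headers, assumed to be aligned by index with the payload list.
    @SuppressWarnings("unchecked")
    Map<String, Object> headersAt(int index) {
        List<Map<String, Object>> perMessage =
            (List<Map<String, Object>>) headers.get(BATCHED_HEADERS);
        return perMessage.get(index);
    }
}
```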
Batch messages would benefit from a Spring-defined header that is not binder-specific. Instead of solace_scst_batchedHeaders, a batched message could use scst_batchedHeaders, which would be defined by Spring. This header would standardize the batch message format and would indicate to producer bindings that the message is a batch message and should be processed as such.
Final Note
Note that the proposed design can be implemented today without issue; however, it is not a design currently endorsed by Spring. The goal of this issue is to bless this design and provide standards so that it can be implemented consistently in any binder.
Basically, what you are asking (and I agree) is that consumer batch configuration effectively takes precedence over producer batch configuration if both are present. This makes perfect sense to me. In fact, as I think about it, producer batch configuration would only make sense for cases where you are the source of the stream (i.e., Supplier, StreamBridge, etc.), where nothing is really coming in (being consumed).
You are right, the proposed design removes the need for batch configuration at the producer. With the proposal, binders could provide 2 producer implementations:
IMPL-1: Producer is an accumulator (existing design where a producer has batch-size and batch-timeout config options)
IMPL-2: Producer is batch ready/capable (my apologies, finding a name for it has been challenging :) )
Details regarding IMPL-2:
- for a batch of size n, the producer is called only once with a single batched message rather than n times, once for each individual message.
- the producer doesn't need to accumulate because the received batched message is already formed.
- the received batch could be of various sizes. The producer can inspect the batched message and get the size if it needs to.
Binders are free to implement either implementation or both. Producer bindings could
- provide a config option to toggle between IMPL-1 and IMPL-2 or
- dynamically detect whether an incoming message is batched or not by looking for a newly defined Spring header (potentially named scst_batchedHeaders). If the header is present, treat the message as a batched message (IMPL-2); if the header is not present, treat the message as an individual message (IMPL-1).
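The dynamic detection option could be sketched as follows. scst_batchedHeaders is only the header name proposed above, hard-coded here for illustration, and SimpleMessage, DualModeProducer, sendBatch and sendIndividually are hypothetical names. A binder would plug its own IMPL-1 and IMPL-2 logic into the two abstract methods.

```java
import java.util.List;
import java.util.Map;

// Hypothetical minimal message: payload plus top-level (root) headers.
record SimpleMessage(Object payload, Map<String, Object> headers) { }

// Hypothetical producer binding that supports both contracts and picks one per message.
abstract class DualModeProducer {
    // Header name as proposed in this discussion, hard-coded for illustration.
    static final String BATCHED_HEADERS = "scst_batchedHeaders";

    void handle(SimpleMessage message) {
        if (message.headers().containsKey(BATCHED_HEADERS)) {
            // IMPL-2: the payload is an already-formed batch; handle it in one call.
            sendBatch((List<?>) message.payload(), message.headers());
        } else {
            // IMPL-1: an individual message (which an accumulator may still buffer).
            sendIndividually(message);
        }
    }

    abstract void sendBatch(List<?> batch, Map<String, Object> rootHeaders);

    abstract void sendIndividually(SimpleMessage message);
}
```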
In summary, this is more than giving precedence to consumer binding batch configuration. The proposal attempts to define a new producer binding contract which is complementary to the existing contract. Interestingly, this can technically be implemented by binders today; however, this new contract would need to be defined/documented before implementations can be contributed back.
@carolmorneau I just pushed a branch with the initial commit of a class that defines scst_batchHeaders, as well as a builder to build a batch Message, taking the RabbitMQ structure as an example. - https://github.com/spring-cloud/spring-cloud-stream/tree/GH-2969
Please take a look and see what is missing
Thank you @olegz. A few suggestions:
- Would it make sense to define scst_batchHeaders within BinderHeaders?
- This method, public BatchMessageBuilder addHeader(String key, Object value), could potentially be renamed to addRootHeader(...) or something that differentiates it from the actual message headers.
- Just an idea, but a utility to iterate through each message within a batch message could also be useful.
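The three suggestions could translate into something like the sketch below. It is a hypothetical builder written for illustration only and does not reproduce the class on the GH-2969 branch: BatchMessageSketch, addRootHeader, addMessage and forEachMessage are assumed names, and the header key is hard-coded rather than taken from BinderHeaders.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;

// Hypothetical immutable batch message: payloads plus per-message headers kept under one key.
record BatchMessageSketch(List<Object> payloads, Map<String, Object> rootHeaders) {

    // Key name as mentioned in this discussion; hard-coded for the sketch.
    static final String BATCH_HEADERS = "scst_batchHeaders";

    // Iteration utility: visits each (payload, per-message headers) pair in batch order.
    @SuppressWarnings("unchecked")
    void forEachMessage(BiConsumer<Object, Map<String, Object>> action) {
        List<Map<String, Object>> perMessage = (List<Map<String, Object>>) rootHeaders.get(BATCH_HEADERS);
        for (int i = 0; i < payloads.size(); i++) {
            action.accept(payloads.get(i), perMessage.get(i));
        }
    }

    static Builder builder() {
        return new Builder();
    }

    static final class Builder {
        private final List<Object> payloads = new ArrayList<>();
        private final List<Map<String, Object>> perMessageHeaders = new ArrayList<>();
        private final Map<String, Object> rootHeaders = new HashMap<>();

        // Header shared by the whole batch; the name avoids confusion with per-message headers.
        Builder addRootHeader(String key, Object value) {
            rootHeaders.put(key, value);
            return this;
        }

        // Adds one message of the batch together with its own headers.
        Builder addMessage(Object payload, Map<String, Object> headers) {
            payloads.add(payload);
            perMessageHeaders.add(Map.copyOf(headers));
            return this;
        }

        BatchMessageSketch build() {
            Map<String, Object> headers = new HashMap<>(rootHeaders);
            headers.put(BATCH_HEADERS, List.copyOf(perMessageHeaders));
            return new BatchMessageSketch(List.copyOf(payloads), Map.copyOf(headers));
        }
    }
}
```

Keeping the per-message headers in a list aligned with the payload list mirrors the Solace-style layout described earlier, so the iteration utility only has to walk two lists in step.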
@carolmorneau sorry for the late response. Your suggestions seem reasonable, so I should update the branch tomorrow.
@carolmorneau please take a look at the updated version and let me know your thoughts
@olegz Have you pushed your commit on the GH-2969 branch? I can't seem to see it https://github.com/spring-cloud/spring-cloud-stream/compare/main...GH-2969
All done. Sorry, I made a mistake when I originally pushed it - https://github.com/spring-cloud/spring-cloud-stream/tree/GH-2969
Thank you @olegz, this looks good to us. This will allow us to standardize on scst_batchedHeaders.
Since this issue is also about getting the proposed design Spring approved, will there eventually be some related documentation updates?
For instance, some examples where a producer binding can receive a whole batched message at once and is capable of processing it in a single invocation.
@carolmorneau Sure, we can eventually update the docs, but we need to reference something that is using it. For now our binders do not, and I don't see them using it in the foreseeable future, given that the corresponding Kafka/Rabbit headers are created upstream by frameworks other than SCST. Perhaps there could be some usage of the utility class even today, but let's see how it goes.
For now I would like to merge it to main so it can be included in the upcoming RC1 Please let me know if you have any objections.
Sounds good, this is good to merge. Thank you @olegz
@carolmorneau I am closing it. Feel free to open a new issue if any enhancements are needed