spring-cloud-stream

Batch-Capable Producer Bindings

carolmorneau opened this issue 1 year ago

(Current Batching Pattern) Batch-Accumulator Producer Bindings

Currently, the common batching pattern used by producer bindings is to accumulate messages at the producer binding based on batchSize and batchTimeout configuration and then send them in bulk to the target system. In a system where the consumer binding is also operating in batch mode, this could look like the following:

[Image: batch-accumulator producer binding alongside a batch-mode consumer binding]

The main challenges with the above are:

  • batch coordination between consumer and producer bindings is complicated. Both bindings need to operate on the same batch (aka work-unit) for acknowledgments to work. This gets complex on failure paths.
  • configuration for batchSize and batchTimeout exists at both bindings and needs to be kept consistent. It is hard to reason about two timeouts that are independent of each other.
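To make the first two challenges concrete, here is a minimal Java sketch of an accumulator-style producer. The names (AccumulatingProducer, send, poll) are hypothetical and are not Spring Cloud Stream APIs. The clock is injected, and poll() stands in for a binder's timer so the timeout logic is easy to follow. The producer's flush boundaries depend only on its own batchSize and batchTimeout, so nothing ties them to the consumer's batch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.LongSupplier;

// Hypothetical accumulator producer (the current pattern). Not a Spring Cloud Stream API.
final class AccumulatingProducer<T> {
    private final int batchSize;              // flush once this many messages are buffered
    private final long batchTimeoutMillis;    // flush once the oldest buffered message is this old
    private final LongSupplier clockMillis;   // injected clock, so timeouts are deterministic
    private final Consumer<List<T>> bulkSend; // stands in for the target system's bulk API
    private final List<T> buffer = new ArrayList<>();
    private long firstBufferedAt;

    AccumulatingProducer(int batchSize, long batchTimeoutMillis,
                         LongSupplier clockMillis, Consumer<List<T>> bulkSend) {
        this.batchSize = batchSize;
        this.batchTimeoutMillis = batchTimeoutMillis;
        this.clockMillis = clockMillis;
        this.bulkSend = bulkSend;
    }

    // Called once per individual message, with no knowledge of the consumer's batch.
    synchronized void send(T message) {
        if (buffer.isEmpty()) {
            firstBufferedAt = clockMillis.getAsLong();
        }
        buffer.add(message);
        if (buffer.size() >= batchSize) {
            flush();
        }
    }

    // In a real binder a timer would call this; it enforces the producer-side timeout.
    synchronized void poll() {
        if (!buffer.isEmpty()
                && clockMillis.getAsLong() - firstBufferedAt >= batchTimeoutMillis) {
            flush();
        }
    }

    private void flush() {
        List<T> batch = new ArrayList<>(buffer);
        buffer.clear();
        bulkSend.accept(batch);
    }
}
```

Take batchSize=2 as an example. A consumer batch of three messages goes out as one immediate bulk call of two messages. The third message is sent in a second call only after the producer's own timeout expires. This is the mismatch the consumer has to reconcile before it can acknowledge its batch.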

(Proposed Enhancement) Batch-Capable Producer Bindings

What if a batch could be delivered to the producer binding as a whole and be handled all at once?

[Image: batch delivered from the consumer binding to the producer binding as a whole]

In the above:

  1. Consumer binding accumulates a work-unit (a batch) based on its batchSize and batchTimeout configuration. It could be a partial batch.
  2. The batch travels through the functions as a whole.
  3. The batch is served to the producer binding as a whole.
  4. The producer binding detects that the message is a batch and processes it as a whole. Once the whole batch is successfully processed, the producer binding returns successfully. If anything goes wrong, an exception is thrown.
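The four steps above can be sketched in Java as follows. This is an illustrative sketch under stated assumptions, not a binder implementation. BulkTarget, BatchCapableProducer and BatchPipeline are hypothetical names, and the consumer's acknowledgment is reduced to a boolean: true acknowledges the whole batch, false rejects it.

```java
import java.util.List;
import java.util.function.Function;

// Stand-in for a target system client that can publish many records in one call.
interface BulkTarget<T> {
    void publishAll(List<T> records) throws Exception;
}

// Hypothetical batch-capable producer: invoked once per batch, no batchSize/batchTimeout.
final class BatchCapableProducer<T> {
    private final BulkTarget<T> target;

    BatchCapableProducer(BulkTarget<T> target) {
        this.target = target;
    }

    // Step 4: the whole batch is handled in this one invocation; any failure fails it all.
    void handle(List<T> batch) {
        if (batch.isEmpty()) {
            return; // nothing to send for an empty batch
        }
        try {
            target.publishAll(batch);
        } catch (Exception e) {
            throw new IllegalStateException("Failed to publish batch of " + batch.size(), e);
        }
    }
}

// Hypothetical driver for steps 1-3: the consumer's batch flows through the function as a whole.
final class BatchPipeline<I, O> {
    private final Function<List<I>, List<O>> function;
    private final BatchCapableProducer<O> producer;

    BatchPipeline(Function<List<I>, List<O>> function, BatchCapableProducer<O> producer) {
        this.function = function;
        this.producer = producer;
    }

    // Returns true if the consumer should acknowledge the batch, false if it should reject it.
    boolean process(List<I> consumedBatch) {
        try {
            producer.handle(function.apply(consumedBatch));
            return true;
        } catch (RuntimeException e) {
            return false;
        }
    }
}
```

The producer is invoked once per batch and has no batchSize or batchTimeout of its own. A failure anywhere in the batch surfaces as a single exception, so the consumer can treat the batch as one work unit.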

Benefits of this design:

  • no coordination between consumer and producer bindings required. The batch (or partial batch) that arrives at the producer binding is simply what needs to be processed for that invocation.
  • batch-related configuration only exists at the consumer binding. The consumer is the component responsible for creating a batch (work-unit).
  • supporting batching at the producer binding is somewhat simpler with this design:
    • full knowledge of the batch at every invocation.
    • no batchSize and no batchTimeout to implement

It is worth noting that this design does not couple the producer binding to the consumer binding. The producer binding benefits from a batch already accumulated by the consumer binding; however, it remains completely independent of the consumer binding.

Spring Defined Header For Batched Messages

Batched messages already exist in Spring Cloud Stream. Here is a sample batch message which is produced by the Solace consumer binding:

[Image: sample batch message produced by the Solace consumer binding]
  • The payload is of type ArrayList (in this sample, the batch is of size 3)
  • Top level headers are common to the entire batch
  • The header with key solace_scst_batchedHeaders holds actual message headers for every message within the batch.
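A producer that receives such a message can recover the individual messages. It does this by pairing each payload element with the matching entry of the batched-headers list. The following is a minimal sketch. SimpleMessage is a stand-in for Spring's Message<T> so the example runs without Spring on the classpath. The rule that per-message headers override top-level headers on key collisions is an assumption made for this sketch.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Minimal stand-in for Spring's Message<T> so the sketch runs without Spring.
record SimpleMessage<T>(T payload, Map<String, Object> headers) {}

final class BatchedMessages {
    // Header key used by the Solace consumer binding in the sample above.
    static final String SOLACE_BATCHED_HEADERS = "solace_scst_batchedHeaders";

    private BatchedMessages() {}

    // Splits a batched message into individual messages. Top-level headers (common to
    // the whole batch) are copied into each message, then that message's own headers
    // are applied on top (assumption: per-message values win on key collisions).
    @SuppressWarnings("unchecked")
    static <T> List<SimpleMessage<T>> split(SimpleMessage<List<T>> batch, String batchedHeadersKey) {
        List<T> payloads = batch.payload();
        List<Map<String, Object>> perMessage =
                (List<Map<String, Object>>) batch.headers().get(batchedHeadersKey);
        if (perMessage == null || perMessage.size() != payloads.size()) {
            throw new IllegalArgumentException("Missing or mismatched batched headers");
        }
        List<SimpleMessage<T>> messages = new ArrayList<>(payloads.size());
        for (int i = 0; i < payloads.size(); i++) {
            Map<String, Object> headers = new HashMap<>(batch.headers());
            headers.remove(batchedHeadersKey); // the list itself is not a per-message header
            headers.putAll(perMessage.get(i));
            messages.add(new SimpleMessage<>(payloads.get(i), headers));
        }
        return messages;
    }
}
```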

Batch messages would benefit from a Spring defined header that is not binder specific. Instead of solace_scst_batchedHeaders, a batched message could use scst_batchedHeaders which would be defined by Spring. This header would standardize the batch message format and would indicate to producer bindings that the message is a batch message and that it should be processed as such.

Final Note

Note that the proposed design can be implemented today without any issue; however, it is not a design currently endorsed by Spring. The goal of this issue is to bless this design and provide standards so that it can be implemented in any binder in a consistent manner.

carolmorneau, Jul 03 '24

Basically, what you are asking for (and I agree) is that the Consumer batch configuration effectively takes precedence over the Producer batch configuration if both are present. This makes perfect sense to me. In fact, as I think about it... the Producer batch configuration would only make sense for cases where you are the source of the stream (i.e., Supplier, StreamBridge, etc.)... where nothing is really coming in (consumed).

olegz, Oct 08 '24

You are right, the proposed design removes the need for batch configuration at the producer. With the proposal, binders could provide 2 producer implementations:

  • IMPL-1: Producer is an accumulator (existing design where a producer has batch-size and batch-timeout config options).
  • IMPL-2: Producer is batch ready/capable (my apologies, finding a name for it has been challenging :) )

Details regarding IMPL-2:

  • for a batch of size n, the producer is called only once with a single batched message rather than n times for each individual message.
  • producer doesn't need to accumulate because the received batched message is already formed.
  • the received batch could be of various sizes. The producer can inspect the batched message and get the size if it needs to.

Binders are free to provide either implementation, or both. Producer bindings could:

  • provide a config option to toggle between IMPL-1 and IMPL-2 or
  • dynamically detect whether an incoming message is batched by looking for a newly defined Spring header (potentially named scst_batchedHeaders). If the header is present, treat the message as a batched message (IMPL-2); if it is not, treat the message as an individual message (IMPL-1).
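The second option, detection by header, could look roughly like the sketch below. The header name scst_batchedHeaders is only the name proposed in this thread. DualModeProducer and its Target interface are hypothetical. A real binder would work with Spring's Message<?> rather than a separate payload and header map.

```java
import java.util.List;
import java.util.Map;

// Hypothetical producer binding that supports both contracts (IMPL-1 and IMPL-2).
final class DualModeProducer {
    // Name proposed in this thread; the header Spring finally defines may differ.
    static final String BATCHED_HEADERS = "scst_batchedHeaders";

    // Stand-in for the target system client.
    interface Target {
        void sendOne(Object payload, Map<String, Object> headers);

        void sendBatch(List<?> payloads, List<Map<String, Object>> perMessageHeaders,
                       Map<String, Object> rootHeaders);
    }

    private final Target target;

    DualModeProducer(Target target) {
        this.target = target;
    }

    @SuppressWarnings("unchecked")
    void handle(Object payload, Map<String, Object> headers) {
        Object batchedHeaders = headers.get(BATCHED_HEADERS);
        if (batchedHeaders instanceof List && payload instanceof List) {
            // IMPL-2: the header marks an already-formed batch; send it in one call.
            target.sendBatch((List<?>) payload,
                    (List<Map<String, Object>>) batchedHeaders, headers);
        } else {
            // No batch header: an individual message (an IMPL-1 producer would buffer it here).
            target.sendOne(payload, headers);
        }
    }
}
```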

In summary, this is more than giving precedence to consumer binding batch configuration. The proposal attempts to define a new producer binding contract which is complementary to the existing contract. Interestingly, this can technically be implemented by binders today; however, this new contract would need to be defined/documented before implementations can be contributed back.

carolmorneau, Oct 08 '24

@carolmorneau I just pushed a branch, https://github.com/spring-cloud/spring-cloud-stream/tree/GH-2969, with the initial commit of a class that defines scst_batchHeaders as well as a builder for building a batch Message, using the RabbitMQ structure as an example. Please take a look and see what is missing.

olegz, Oct 15 '24

Thank you @olegz. A few suggestions:

  • Would it make sense to define scst_batchHeaders within BinderHeaders?
  • The method public BatchMessageBuilder addHeader(String key, Object value) could potentially be renamed to addRootHeader(...) or something else that differentiates it from the actual message headers.
  • Just an idea, but a utility to iterate through each message within a batch message could also be useful.
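For illustration, the rename and the iteration utility suggested above might take roughly the following shape. This is a hypothetical sketch, not the BatchMessageBuilder on the GH-2969 branch. BatchBuilderSketch, Batch, addMessage and forEachMessage are invented names. The batched-headers key is passed in rather than hard-coded, so the sketch does not assume its final value.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;

// Minimal stand-in for a batched Message: a list payload plus headers.
record Batch<T>(List<T> payload, Map<String, Object> headers) {}

// Hypothetical builder illustrating the suggestions; NOT the class on the GH-2969 branch.
final class BatchBuilderSketch<T> {
    private final String batchedHeadersKey; // passed in rather than assumed
    private final Map<String, Object> rootHeaders = new HashMap<>();
    private final List<T> payloads = new ArrayList<>();
    private final List<Map<String, Object>> perMessageHeaders = new ArrayList<>();

    BatchBuilderSketch(String batchedHeadersKey) {
        this.batchedHeadersKey = batchedHeadersKey;
    }

    // Header common to the whole batch (the suggested rename of addHeader).
    BatchBuilderSketch<T> addRootHeader(String key, Object value) {
        rootHeaders.put(key, value);
        return this;
    }

    // One message of the batch: payload and headers are appended together.
    BatchBuilderSketch<T> addMessage(T payload, Map<String, Object> headers) {
        payloads.add(payload);
        perMessageHeaders.add(new HashMap<>(headers));
        return this;
    }

    Batch<T> build() {
        Map<String, Object> headers = new HashMap<>(rootHeaders);
        headers.put(batchedHeadersKey, new ArrayList<>(perMessageHeaders));
        return new Batch<>(new ArrayList<>(payloads), headers);
    }

    // Iteration utility: visits each (payload, per-message headers) pair in order.
    @SuppressWarnings("unchecked")
    static <T> void forEachMessage(Batch<T> batch, String batchedHeadersKey,
                                   BiConsumer<T, Map<String, Object>> action) {
        List<Map<String, Object>> perMessage =
                (List<Map<String, Object>>) batch.headers().get(batchedHeadersKey);
        for (int i = 0; i < batch.payload().size(); i++) {
            action.accept(batch.payload().get(i), perMessage.get(i));
        }
    }
}
```

Appending the payload and its headers in the same addMessage call keeps the two lists aligned by index. That alignment lets a producer iterate the batch without extra bookkeeping.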

carolmorneau, Oct 22 '24

@carolmorneau Sorry for the late response. Your suggestions seem reasonable, so I should update the branch tomorrow.

olegz, Oct 30 '24

@carolmorneau Please take a look at the updated version and let me know your thoughts.

olegz, Nov 01 '24

@olegz Have you pushed your commit to the GH-2969 branch? I can't seem to see it: https://github.com/spring-cloud/spring-cloud-stream/compare/main...GH-2969

carolmorneau, Nov 01 '24

All done. Sorry, I made a mistake when I originally pushed it: https://github.com/spring-cloud/spring-cloud-stream/tree/GH-2969

olegz, Nov 02 '24

Thank you @olegz, this looks good to us. This will allow us to standardize on scst_batchedHeaders.

Since this issue is also about getting the proposed design Spring approved, will there eventually be some related documentation updates?

For instance, some examples where a producer binding can receive a whole batched message at once and is capable of processing it in a single invocation.

carolmorneau, Nov 04 '24

@carolmorneau Sure, we can eventually update the docs, but we need to reference something that uses it, and for now our binders do not. I don't see them using it in the foreseeable future, given that the corresponding Kafka/Rabbit headers are created upstream by frameworks other than SCST. Perhaps there could be some usage of the utility class even today, but let's see how it goes.

For now, I would like to merge it to main so it can be included in the upcoming RC1. Please let me know if you have any objections.

olegz, Nov 05 '24

Sounds good, this is good to merge. Thank you, @olegz.

carolmorneau, Nov 05 '24

@carolmorneau I am closing it. Feel free to open a new issue if any enhancements are needed.

olegz, Dec 19 '24