spring-cloud-stream-binder-aws-kinesis
shard-iterator-type=TRIM_HORIZON not working
Spring Boot 3.1.0, spring-cloud-stream-binder-kinesis 4.0.0, Java 17, Localstack 2.1.0
Properties look like:
spring.cloud.stream.bindings.input-in-0.destination=xxx-stream
spring.cloud.stream.bindings.input-in-0.group=xxx-group
spring.cloud.stream.bindings.input-in-0.content-type=text/plain
Tried both of these:
spring.cloud.stream.kinesis.bindings.input-in-0.consumer.shard-iterator-type=TRIM_HORIZON
spring.cloud.stream.kinesis.bindings.input.consumer.shard-iterator-type=TRIM_HORIZON
But my consumer only gets called for new messages. If I run the AWS CLI get-records command against Localstack with TRIM_HORIZON, it returns all the messages, so nothing is broken there.
You are probably missing this condition:
NOTE: When the TRIM_HORIZON shard iterator type is used, we need to take into account the time lag which happens while pointing the ShardIterator to the last untrimmed record in the shard (the oldest data record in the shard). So getRecords() will move from that point to the last point, which takes time. It is by default 1 day, and it can be extended to 7 days. This happens only for new consumer groups. Any subsequent starts of the consumer in the same group are adjusted according to the stored checkpoint via the AFTER_SEQUENCE_NUMBER iterator type.
See docs: https://github.com/spring-cloud/spring-cloud-stream-binder-aws-kinesis/blob/main/spring-cloud-stream-binder-kinesis-docs/src/main/asciidoc/overview.adoc#kinesis-consumer-properties
I wonder now whether this behavior really makes sense when the iterator type is set explicitly.
TRIM_HORIZON is still the default for new groups, and for an existing group the default really should be AFTER_SEQUENCE_NUMBER based on the stored checkpoint. But an explicitly configured TRIM_HORIZON should override that logic, since it is exactly what was asked for.
Not sure though if we can make this breaking change in the current 4.0.1. Perhaps better to defer until 4.1.0...
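To make the difference concrete, here is a minimal sketch of the two decision rules discussed in this thread. It is not the binder's actual code: the class IteratorChoice, the method choose and the explicitWins flag are hypothetical names used only for illustration, and the checkpoint is modelled as an optional sequence number string.

```java
import java.util.Optional;

public class IteratorChoice {

    // Subset of the Kinesis shard iterator types that matter for this discussion.
    enum IteratorType { TRIM_HORIZON, LATEST, AFTER_SEQUENCE_NUMBER }

    /**
     * Hypothetical decision rule, not the binder's real implementation.
     *
     * @param configured   iterator type set via shard-iterator-type, if any
     * @param checkpoint   sequence number stored for the consumer group, if any
     * @param explicitWins false = behavior described in the docs NOTE,
     *                     true  = change proposed in this thread
     */
    static IteratorType choose(Optional<IteratorType> configured,
                               Optional<String> checkpoint,
                               boolean explicitWins) {
        // Proposed rule: an explicitly configured type is honored even for an existing group.
        if (explicitWins && configured.isPresent()) {
            return configured.get();
        }
        // Documented rule: an existing group resumes after its stored checkpoint.
        if (checkpoint.isPresent()) {
            return IteratorType.AFTER_SEQUENCE_NUMBER;
        }
        // New group: use the configured type, falling back to TRIM_HORIZON.
        return configured.orElse(IteratorType.TRIM_HORIZON);
    }

    public static void main(String[] args) {
        Optional<IteratorType> explicit = Optional.of(IteratorType.TRIM_HORIZON);
        Optional<String> stored = Optional.of("some-sequence-number"); // placeholder value

        System.out.println("documented, existing group: " + choose(explicit, stored, false));
        System.out.println("proposed, existing group:   " + choose(explicit, stored, true));
        System.out.println("new group, nothing set:     " + choose(Optional.empty(), Optional.empty(), false));
    }
}
```

Under the documented rule, a group that already has a checkpoint never sees the configured TRIM_HORIZON, which would match the symptom reported above if xxx-group had been started before. Under the proposed rule the explicit setting takes precedence.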