spring-cloud-stream
Kafka stream state stores cannot be found after upgrade to 3.2.5
Describe the issue
The following code works with SCS 3.2.4:
@Service
class TechnischerplatzAdresseHandler(private val interactiveQueryService: InteractiveQueryService) {
    private val technischerplatzAdresseStore: ReadOnlyKeyValueStore<String, TechnischerplatzAdresse>
        get() = interactiveQueryService.getStore("technischerplatzadresse-store")

    suspend fun get(request: ServerRequest) = withContext(Dispatchers.IO) {
        technischerplatzAdresseStore["1234"]?.let {
            ServerResponse.ok().build().awaitSingle()
        } ?: ServerResponse.notFound().buildAndAwait()
    }
}
But it raises an exception with SCS 3.2.5:
Caused by: org.apache.kafka.streams.errors.UnknownStateStoreException: Cannot get state store technischerplatzadresse-store because no such store is registered in the topology.
    at org.apache.kafka.streams.KafkaStreams.store(KafkaStreams.java:1603)
    at org.springframework.cloud.stream.binder.kafka.streams.InteractiveQueryService.getStateStoreFromKafkaStreams(InteractiveQueryService.java:121)
    at org.springframework.cloud.stream.binder.kafka.streams.InteractiveQueryService.lambda$getQueryableStore$0(InteractiveQueryService.java:109)
    at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:329)
    at org.springframework.retry.support.RetryTemplate.execute(RetryTemplate.java:209)
    at org.springframework.cloud.stream.binder.kafka.streams.InteractiveQueryService.getQueryableStore(InteractiveQueryService.java:88)
    at at.wrwks.bef.projekt.domain.technischerplatzadresse.TechnischerplatzAdresseHandler$get$2.invokeSuspend(TechnischerplatzAdresseHandler.kt:39)
To Reproduce
Try to load a state store as described above.
Version of the framework
Spring Cloud ~~2020.0.4~~ 2021.0.4 with Spring Cloud Stream 3.2.5
Additional context
While debugging, I traced the problem down to StreamsMetadataState#isInitialized(), where clusterMetadata.topics().isEmpty() == true, which it shouldn't be. The method is located in kafka-streams:3.1.2 on line 321.
The topology hasn't changed and the store is there.
@andrashatvani Do you mean Spring Cloud version 2021.0.4? I ask because 2020.0.x is based on Boot 2.4 and Spring Cloud Stream 3.1.x, and we do not see the issue there. We were able to reproduce what you see on 3.2.5, which looks like a regression. We will investigate and get back to you on this. Thanks for reporting.
@sobychacko Yes, I meant 2021.0.4 - corrected in the original description too.
Hi @andrashatvani ,
We have identified the issue and are working on a fix.
In summary, we rely on StreamsMetadataState to actually have the metadata. However, as you pointed out above, it thinks it is not initialized, which forces a short-circuit empty return. StreamsMetadataState.topologyMetadata does in fact have the source topics for the store. This could be a bug in Kafka Streams (KS), but even if it is, we would not want to wait on that fix. The plan is to adjust the code here and then determine whether there is a bug to file in KS. If there is, we would consider re-adjusting this code to use StreamsMetadataState once it is fixed.
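To picture the short circuit described here, the following is a deliberately simplified model, not the actual Kafka Streams source. The class `MetadataStateModel`, its members, and the topic name `source-topic` are all hypothetical; the sketch only assumes what this thread reports, namely that an empty cluster topic set makes the state count as uninitialized even though the topology still knows the store.

```kotlin
// Hypothetical, simplified model of the reported behavior.
// This is NOT the real StreamsMetadataState implementation.
class MetadataStateModel(
    // Stands in for clusterMetadata.topics()
    private val clusterTopics: Set<String>,
    // Stands in for the topology metadata: store name -> source topics
    private val storeToSourceTopics: Map<String, Set<String>>,
) {
    // Mirrors the reported condition: no cluster topics means "not initialized".
    fun isInitialized(): Boolean = clusterTopics.isNotEmpty()

    // Short-circuits to an empty result when the state is not initialized.
    fun storesVisibleViaMetadata(): Set<String> =
        if (!isInitialized()) emptySet() else storeToSourceTopics.keys

    // The topology still knows the store, regardless of cluster metadata.
    fun storesKnownToTopology(): Set<String> = storeToSourceTopics.keys
}

fun main() {
    val model = MetadataStateModel(
        clusterTopics = emptySet(),
        storeToSourceTopics = mapOf("technischerplatzadresse-store" to setOf("source-topic")),
    )
    println("initialized: ${model.isInitialized()}")
    println("visible via metadata: ${model.storesVisibleViaMetadata()}")
    println("known to topology: ${model.storesKnownToTopology()}")
}
```

In this model a lookup that goes through the metadata finds nothing, while a lookup against the topology would still find the store, which matches the direction of the planned fix.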
FWIW: The code in question was introduced as an effort to fix #2445. We do have integration tests for these cases; however, the KS StreamsMetadataState code behaves properly in that setup. We will also make sure the tests are doing what they are supposed to be doing.
Workaround
Hi @andrashatvani
As a workaround you should be able to set the application.server property.
spring.cloud.stream.kafka.streams.binder.configuration.application.server=<server>:<port>
Typically this is only needed in a multi-instance KafkaStreams application but the mechanism we are using to distinguish the state stores requires this to be set in all cases (that is part of the bug).
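If the property has to be applied across many services, one option is to build it in code rather than hand-edit each configuration file. The sketch below is only an illustration: the helper `applicationServerProperty` and the values `localhost` and `8080` are assumptions; only the property key comes from the workaround above.

```kotlin
// Property key taken from the workaround above.
const val APPLICATION_SERVER_KEY =
    "spring.cloud.stream.kafka.streams.binder.configuration.application.server"

// Hypothetical helper: builds the "key=value" pair for a given host and port.
fun applicationServerProperty(host: String, port: Int): String {
    require(host.isNotBlank()) { "host must not be blank" }
    require(port in 1..65535) { "port must be in 1..65535, was $port" }
    return "$APPLICATION_SERVER_KEY=$host:$port"
}

fun main() {
    // Host and port are placeholder values for illustration only.
    println(applicationServerProperty("localhost", 8080))
}
```

The resulting string could, for example, be passed on the command line prefixed with `--`, or handed to `SpringApplicationBuilder.properties(...)` as a default property.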
Let me know if this unblocks you.
@onobc Thanks for the update. As we have dozens of services, we'll stick to 2021.0.3 until the version with the fix is released.
The issue was reported against 3.2.x and was fixed in main. Re-opening until the fix is backported to 3.2.x.