spring-cloud-stream

Kafka stream state stores cannot be found after upgrade to 3.2.5

Status: Open · andrashatvani opened this issue 2 years ago · 2 comments

Describe the issue
The following code works with Spring Cloud Stream (SCS) 3.2.4:

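```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.reactor.awaitSingle
import kotlinx.coroutines.withContext
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore
import org.springframework.cloud.stream.binder.kafka.streams.InteractiveQueryService
import org.springframework.stereotype.Service
import org.springframework.web.reactive.function.server.ServerRequest
import org.springframework.web.reactive.function.server.ServerResponse
import org.springframework.web.reactive.function.server.buildAndAwait

@Service
class TechnischerplatzAdresseHandler(private val interactiveQueryService: InteractiveQueryService) {
    // getStore is a helper not shown here; per the stack trace it calls InteractiveQueryService#getQueryableStore.
    private val technischerplatzAdresseStore: ReadOnlyKeyValueStore<String, TechnischerplatzAdresse>
        get() = interactiveQueryService.getStore("technischerplatzadresse-store")

    suspend fun get(request: ServerRequest) = withContext(Dispatchers.IO) {
        technischerplatzAdresseStore["1234"]?.let {
            ServerResponse.ok().build().awaitSingle()
        } ?: ServerResponse.notFound().buildAndAwait()
    }
}
```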

But it raises an exception with SCS 3.2.5:

```
Caused by: org.apache.kafka.streams.errors.UnknownStateStoreException: Cannot get state store technischerplatzadresse-store because no such store is registered in the topology.
	at org.apache.kafka.streams.KafkaStreams.store(KafkaStreams.java:1603)
	at org.springframework.cloud.stream.binder.kafka.streams.InteractiveQueryService.getStateStoreFromKafkaStreams(InteractiveQueryService.java:121)
	at org.springframework.cloud.stream.binder.kafka.streams.InteractiveQueryService.lambda$getQueryableStore$0(InteractiveQueryService.java:109)
	at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:329)
	at org.springframework.retry.support.RetryTemplate.execute(RetryTemplate.java:209)
	at org.springframework.cloud.stream.binder.kafka.streams.InteractiveQueryService.getQueryableStore(InteractiveQueryService.java:88)
	at at.wrwks.bef.projekt.domain.technischerplatzadresse.TechnischerplatzAdresseHandler$get$2.invokeSuspend(TechnischerplatzAdresseHandler.kt:39)
```

To Reproduce
Try to load a state store as shown above.

Version of the framework
Spring Cloud ~~2020.0.4~~ 2021.0.4 with Spring Cloud Stream 3.2.5

Additional context
While debugging, I traced it down to StreamsMetadataState#isInitialized(), where clusterMetadata.topics().isEmpty() returns true, which it shouldn't. The check is on line 321 of StreamsMetadataState in kafka-streams 3.1.2. The topology hasn't changed and the store is registered in it.

andrashatvani, Oct 05 '22 14:10

@andrashatvani Do you mean Spring Cloud version 2021.0.4? 2020.0.x is based on Boot 2.4 and Spring Cloud Stream 3.1.x, and we do not see the issue there. We were able to reproduce what you see on 3.2.5, which looks like a regression. We will investigate and get back to you. Thanks for reporting.

sobychacko, Oct 05 '22 23:10

@sobychacko Yes, I meant 2021.0.4; I've corrected the original description as well.

andrashatvani, Oct 06 '22 04:10

Hi @andrashatvani ,

We have identified the issue and are working on a fix.

In summary, we rely on StreamsMetadataState to have the metadata. However, as you pointed out above, it considers itself uninitialized, which short-circuits to an empty result. StreamsMetadataState.topologyMetadata does in fact have the source topics for the store. This could be a bug in Kafka Streams (KS), but even if it is, we would not want to wait on that fix. The plan is to adjust the code here and then determine whether there is a bug to file in KS. If there is, we would consider switching this code back to StreamsMetadataState once it is fixed.
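To make the failure mode concrete, here is a deliberately simplified Kotlin model. The types `ClusterView` and `TopologyView` and both functions are hypothetical and are not Kafka Streams or binder classes; they only contrast a lookup gated on cluster metadata (which returns nothing while the cluster view has no topics) with one that reads the topology's source topics directly, which the comment above notes are present.

```kotlin
// Hypothetical, simplified model of the failure mode described above.
// These are NOT Kafka Streams or Spring Cloud Stream types.

/** Topics this instance currently believes exist in the cluster (hypothetical). */
data class ClusterView(val topics: Set<String>)

/** Store name -> source topics, as known from the topology (hypothetical). */
data class TopologyView(val storeSourceTopics: Map<String, Set<String>>)

/**
 * Lookup gated on cluster metadata: an empty cluster view is treated as
 * "not initialized", so nothing is returned even if the topology knows the store.
 */
fun storeTopicsViaClusterMetadata(
    store: String,
    cluster: ClusterView,
    topology: TopologyView,
): Set<String> {
    if (cluster.topics.isEmpty()) return emptySet() // short circuit: "not initialized"
    val sources = topology.storeSourceTopics[store] ?: return emptySet()
    return sources.intersect(cluster.topics)
}

/** Lookup that consults the topology's source topics directly. */
fun storeTopicsViaTopology(store: String, topology: TopologyView): Set<String> =
    topology.storeSourceTopics[store].orEmpty()
```

With a topology mapping `technischerplatzadresse-store` to a source topic and an empty `ClusterView`, the first function returns an empty set while the second returns the topic, which mirrors the "store is in the topology but cannot be found" symptom.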

FWIW: the code in question was introduced to fix #2445. We do have integration tests for these cases; however, the KS StreamsMetadataState code behaves correctly in that setup. We will also verify that the tests are doing what they are supposed to.

onobc, Oct 07 '22 21:10

Workaround

Hi @andrashatvani

As a workaround, you should be able to set the application.server property:

```properties
spring.cloud.stream.kafka.streams.binder.configuration.application.server=<server>:<port>
```

Typically this is only needed in a multi-instance Kafka Streams application, but the mechanism we use to distinguish the state stores requires it to be set in all cases (that is part of the bug).
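If editing every service's configuration is impractical, the same property could also be supplied programmatically. The following is a minimal Kotlin sketch, not part of the binder's API: the names `APPLICATION_SERVER_PROPERTY`, `applicationServerValue`, and `configureApplicationServer` are hypothetical. It assumes the value is registered as a JVM system property before the Spring application context starts, relying on Spring Boot including system properties in its `Environment`. Defaulting the host to the machine's hostname is also an assumption; in a multi-instance deployment the value should be an address the other instances can actually reach.

```kotlin
import java.net.InetAddress

// Full binder property name, as given in the workaround above.
const val APPLICATION_SERVER_PROPERTY =
    "spring.cloud.stream.kafka.streams.binder.configuration.application.server"

// Hypothetical helper: builds the "<server>:<port>" value for application.server.
fun applicationServerValue(host: String, port: Int): String {
    require(host.isNotBlank()) { "host must not be blank" }
    require(port in 1..65535) { "port out of range: $port" }
    return "$host:$port"
}

// Hypothetical helper: registers the value as a JVM system property.
// Must run before the Spring application context starts (e.g. first line of main).
// The default host (this machine's hostname) is an assumption.
fun configureApplicationServer(port: Int, host: String = InetAddress.getLocalHost().hostName) {
    System.setProperty(APPLICATION_SERVER_PROPERTY, applicationServerValue(host, port))
}

// Usage (sketch; MyApplication is a placeholder):
// fun main(args: Array<String>) {
//     configureApplicationServer(port = 8080)
//     runApplication<MyApplication>(*args)
// }
```

Because system properties take precedence over `application.properties` in Spring Boot, this would override any value already configured in a service's property files.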

Let me know if this unblocks you.

onobc, Oct 09 '22 03:10

@onobc Thanks for the update. As we have dozens of services, we'll stick with 2021.0.3 until the version with the fix is released.

andrashatvani, Oct 09 '22 08:10

The issue was reported against 3.2.x and was fixed in main. Re-opening until the fix is backported to 3.2.x.

onobc, Oct 13 '22 21:10