spring-cloud-stream
I want to consume a KTable and materialize it with optimization
When I consume a KTable, it is automatically materialized as a KeyValueStore.
fun process() = BiConsumer<KStream<String, String>, KTable<String, String>> { input, table ->
    input.join(table) { value1, value2 ->
        Pair(value1, value2)
    }.peek { key, value -> println("$key: $value") }
}
If I set topology.optimization to all, no changelog topic is created; the consumed topic is reused as the changelog topic instead. Because of Kafka Streams join semantics, I want to change the type of the state store to VersionedKeyValueStore. If I materialize the state store manually like this, a changelog topic is created even though topology.optimization is set to all.
fun process() = BiConsumer<KStream<String, String>, KTable<String, String>> { input, table ->
    // Declare the versioned store supplier before it is referenced below.
    val store = Stores.persistentVersionedKeyValueStore("kafka-streams-test-store", Duration.ofDays(1))
    // Re-materialize the table into the versioned store.
    val storedTable = table
        .toStream()
        .groupByKey()
        .aggregate(
            { byteArrayOf() },
            { _, value, _ -> value.toByteArray() },
            Materialized.`as`(store)
        )
    val stream = input.join(storedTable) { value1, value2 ->
        Pair(value1, value2)
    }.peek { key, value -> println("$key, ${value?.first}, ${value?.second?.decodeToString()}") }
}
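For reference, here is a minimal sketch of what setting topology.optimization to all amounts to at the Kafka Streams level. The function name optimizedStreamsProperties is hypothetical and only for illustration. With the binder, the same key is usually supplied through the binder's Kafka Streams configuration rather than built by hand.

```kotlin
import java.util.Properties

// Hypothetical helper, for illustration only.
// The key and value are the literals behind StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG
// and StreamsConfig.OPTIMIZE in Kafka Streams.
fun optimizedStreamsProperties(): Properties = Properties().apply {
    put("topology.optimization", "all")
}
```

In plain Kafka Streams, these properties have to be passed to StreamsBuilder.build(props) for the optimization to be applied to the topology.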
Is there any way to change the type of the state store without creating a changelog topic when optimization is enabled?
Related Stack Overflow question: https://stackoverflow.com/questions/77806593/how-to-change-state-store-type-of-ktable-from-keyvaluestore-to-versionedkeyvalue