spring-cloud-stream icon indicating copy to clipboard operation
spring-cloud-stream copied to clipboard

I want to consume KTable and materialize with optimization

Open snskshn opened this issue 1 year ago • 0 comments

When I consume KTable, it is materialized as KeyValueStore automatically.

fun process() = BiConsumer<KStream<String, String>, KTable<String, String>> { input, table ->
    input.join(table) { value1, value2 ->
        Pair(value1, value2)
    }.peek { key, value -> println("$key: $value") }
}

If I set topology.optimization to all then, changelog topic is not created but reuse consuming topic as changelog topic. Because Kafka Streams join semantics, I want to change the type of state store to VersionedKeyValueStore. If I materialize state store manually like this, it creates changelog topic although I set topology.optimization to all.

fun process() = BiConsumer<KStream<String, String>, KTable<String, String>> { input, table ->
    val storedTable = table
        .toStream()
        .groupByKey()
        .aggregate(
            { byteArrayOf() },
            { _, value, _ -> value.toByteArray() },
            Materialized.`as`(store)
        )
    val store = Stores.persistentVersionedKeyValueStore("kafka-streams-test-store", Duration.ofDays(1))
    val stream = input.join(storedTable) { value1, value2 ->
        Pair(value1, value2)
    }.peek { key, value -> println("$key, ${value?.first}, ${value?.second?.decodeToString()") }
}

Is there any way to change the type of state store with not creating changelog topic via optimization?

related so: https://stackoverflow.com/questions/77806593/how-to-change-state-store-type-of-ktable-from-keyvaluestore-to-versionedkeyvalue

snskshn avatar Jan 19 '24 08:01 snskshn