example-kafkastreams
interested in minor changes for polymorphism?
With this and a few more minor changes
daemonThread {
  sleep(3.seconds) // scalastyle:off
  startStreams(deduplicate(Serdes.String(), Serdes.String()))
}
I can change
Stores.windowStoreBuilder(
  Stores.persistentWindowStore(storeName, retention, segments, window, retainDuplicates),
  Serdes.String(),
  Serdes.String()
)
to
Stores.windowStoreBuilder(
  Stores.persistentWindowStore(storeName, retention, segments, window, retainDuplicates),
  sk, // variable of type Serde[K]
  sv  // variable of type Serde[V]
)
and make the example fully generic in K and V (though still initialized as String, String).
We could also do the following, so that nothing inside the example injects values that are assumed to be String, and that choice is left to the edge of the program (startup). The AnyRef type bound is there so that I don't have to rework GenericProducer, which assumes AnyRef for keys and values.
def duplicates[K <: AnyRef, V <: AnyRef](producer: GenericProducer,
                                         keyGen: Int => K,
                                         valGen: Int => V): Unit
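To make the idea concrete, here is a minimal sketch of how that signature might be used, with String chosen only at the call site. It does not depend on Kafka: the GenericProducer trait below is a hypothetical stand-in (the real class is not shown in this issue), and the send method, RecordingProducer, and the fixed count of three records are all assumptions made for illustration.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-in for the repo's GenericProducer, which is not shown here.
// It only mirrors the fact that keys and values are treated as AnyRef.
trait GenericProducer {
  def send(key: AnyRef, value: AnyRef): Unit
}

// Assumed test double: records what was sent so the sketch runs without a broker.
final class RecordingProducer extends GenericProducer {
  private val buffer = ArrayBuffer.empty[(AnyRef, AnyRef)]
  def send(key: AnyRef, value: AnyRef): Unit = buffer += ((key, value))
  def sent: List[(AnyRef, AnyRef)] = buffer.toList
}

object DuplicatesSketch {
  // Assumption: three records, each sent twice; the real example may differ.
  private val recordCount = 3

  // Generic in K and V; the AnyRef bound lets values pass to GenericProducer unchanged.
  def duplicates[K <: AnyRef, V <: AnyRef](producer: GenericProducer,
                                           keyGen: Int => K,
                                           valGen: Int => V): Unit =
    (1 to recordCount).foreach { i =>
      val (k, v) = (keyGen(i), valGen(i))
      producer.send(k, v)
      producer.send(k, v) // deliberate duplicate for the deduplication example
    }

  def main(args: Array[String]): Unit = {
    val producer = new RecordingProducer
    // String is picked here, at the edge of the program, not inside duplicates.
    duplicates[String, String](producer, i => s"key-$i", i => s"value-$i")
    producer.sent.foreach(println)
  }
}
```

Swapping the two generator functions (and the matching serdes at startup) would be the only change needed to run the example with other key and value types.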