example-kafkastreams

Interested in minor changes for polymorphism?

Open phderome opened this issue 6 years ago • 0 comments

With this and a few more minor changes

daemonThread {
  sleep(3.seconds) // scalastyle:off
  startStreams(deduplicate(Serdes.String(), Serdes.String()))
}

I can change

Stores.windowStoreBuilder(
  Stores.persistentWindowStore(storeName, retention, segments, window, retainDuplicates),
  Serdes.String(),
  Serdes.String()
)

to

Stores.windowStoreBuilder(
  Stores.persistentWindowStore(storeName, retention, segments, window, retainDuplicates),
  sk, // a Serde[K] for keys
  sv  // a Serde[V] for values
)

and make the example fully generic in K and V (though still instantiated with String, String).
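A minimal sketch of what such a generic builder could look like. The wrapper name `windowStore` and its parameter list are hypothetical, and it assumes a newer Kafka Streams release (2.1+) where `Stores.persistentWindowStore` takes `Duration` arguments and no segment count, so it differs from the five-argument call quoted above:

```scala
import java.time.Duration
import org.apache.kafka.common.serialization.{Serde, Serdes}
import org.apache.kafka.streams.state.{StoreBuilder, Stores, WindowStore}

object GenericStore {
  // Hypothetical helper: the key and value Serdes are parameters
  // instead of being hard-coded to Serdes.String().
  def windowStore[K, V](storeName: String,
                        retention: Duration,
                        window: Duration,
                        retainDuplicates: Boolean,
                        sk: Serde[K],
                        sv: Serde[V]): StoreBuilder[WindowStore[K, V]] =
    Stores.windowStoreBuilder(
      Stores.persistentWindowStore(storeName, retention, window, retainDuplicates),
      sk,
      sv
    )
}

object GenericStoreExample {
  // String, String is chosen only here, at the edge of the program.
  val builder: StoreBuilder[WindowStore[String, String]] =
    GenericStore.windowStore(
      "dedup-store",            // assumed store name
      Duration.ofMinutes(10),   // assumed retention period
      Duration.ofMinutes(1),    // assumed window size
      retainDuplicates = false,
      Serdes.String(),
      Serdes.String()
    )
}
```

With this shape, switching the example to other key or value types would only mean passing different Serdes at the call site.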

We could also do the following, so that the code no longer injects values that are assumed to be String and that choice is left at the edge of the program (start-up). The AnyRef bound is there so that I don't have to rework the type of GenericProducer, which assumes AnyRef for keys and values.

def duplicates[K <: AnyRef, V <: AnyRef](producer: GenericProducer,
                                         keyGen: Int => K,
                                         valGen: Int => V): Unit
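To illustrate how this signature pushes the String choice out to the caller, here is a rough sketch. The `GenericProducer` trait below is a hypothetical stand-in (the real one in the repository may look different), and the topic name, the loop bounds, and `RecordingProducer` are assumptions made only for this example:

```scala
import scala.collection.mutable.ListBuffer

// Hypothetical stand-in for the repository's GenericProducer,
// which is described as taking AnyRef keys and values.
trait GenericProducer {
  def send(topic: String, key: AnyRef, value: AnyRef): Unit
}

// Test double that records what was sent instead of talking to Kafka.
final class RecordingProducer extends GenericProducer {
  val sent: ListBuffer[(String, AnyRef, AnyRef)] = ListBuffer.empty
  override def send(topic: String, key: AnyRef, value: AnyRef): Unit = {
    sent += ((topic, key, value))
  }
}

object Duplicates {
  // Sends every generated record twice so a downstream deduplicator has work to do.
  // The body is an assumption; only the signature comes from the issue.
  def duplicates[K <: AnyRef, V <: AnyRef](producer: GenericProducer,
                                           keyGen: Int => K,
                                           valGen: Int => V): Unit =
    for (i <- 1 to 3; _ <- 1 to 2)
      producer.send("input-topic", keyGen(i), valGen(i))
}

object DuplicatesExample {
  val producer = new RecordingProducer
  // String keys and values are decided here, at start-up, not inside duplicates.
  Duplicates.duplicates(producer, (i: Int) => s"key-$i", (i: Int) => s"value-$i")
}
```

Because K and V are bounded by AnyRef, the generated values can be handed to the producer without any cast, which is the point of keeping the bound.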

phderome · Nov 26 '17 16:11