example-kafkastreams icon indicating copy to clipboard operation
example-kafkastreams copied to clipboard

possible PR for stricter types (Processor API) for stores

Open phderome opened this issue 6 years ago • 2 comments

Hiding Strings in distinct types that we'd use internally would help. In code we use evpv-store and pv-store only once as Strings and immediately to hide them in distinct classes. Also we force each ProcessorSupplier to be named by construction instead of waiting for the time we specify it to Kafka JAVA API (NamedProcessorSupplier case class). Ideally, we'd define our own addProcessor and addStateStore methods as extension methods (Scala or C# concept) to Topology so that we specify fewer parameters (i.e. we cannot provide the name and the supplier as two distinct parameters as I still do below).

  case class PvStoreName(value: String) extends AnyVal

  case class EvPvStoreName(value: String) extends AnyVal

  case class NamedProcessorSupplier[K, V](name: String, supplier: ProcessorSupplier[K, V])

evPvStoreBuilder returns the name (not as String), ideally it should return a single object as case class (a NamedStore just like the NamedProcessorSupplier idea).

def clickstreamJoinProcessorApi(): Topology = {
    // don't use Strings in our API, use stricter types to enforce constraints
    val (pvStoreName, pvStore) = pvStoreBuilder(PvStoreName("pv-store"), PvWindow)
    val (evPvStoreName, evPvStore) = evPvStoreBuilder(EvPvStoreName("evpv-store"), EvPvWindow)

    val namedPvWindowProcessor = NamedProcessorSupplier("pv-window-processor",
      () => new PvWindowProcessor(pvStoreName)
    )

    val namedEvJoinProcessor = NamedProcessorSupplier("ev-join-processor",
      () => new EvJoinProcessor(pvStoreName, evPvStoreName, PvWindow, EvPvWindow)
    )

    val namedEvPvMapProcessor = NamedProcessorSupplier("ev-pv-processor",
      () => new EvPvMapProcessor()
    )

    val builder =  new Topology()
      // sources
      .addSource(PvTopic, PvTopic)
      .addSource(EvTopic, EvTopic)
    builder
      // window for page views
      .addProcessor(namedPvWindowProcessor.name, namedPvWindowProcessor.supplier, PvTopic)
      // join on (clientId + pvId + evId) and deduplicate
      .addProcessor(namedEvJoinProcessor.name, namedEvJoinProcessor.supplier, EvTopic)
    builder
      // map key again into clientId
      .addProcessor(namedEvPvMapProcessor.name, namedEvPvMapProcessor.supplier, namedEvJoinProcessor.name)
    builder
      // stores
      .addStateStore(pvStore, namedPvWindowProcessor.name, namedEvJoinProcessor.name)
      .addStateStore(evPvStore, namedEvJoinProcessor.name)
      // sink
      .addSink(EvPvTopic, EvPvTopic, namedEvPvMapProcessor.name) // complete topology

  }

Below should perhaps return a case class rather than a Tuple (less safe).

  def evPvStoreBuilder(storeName: EvPvStoreName, storeWindow: FiniteDuration): (EvPvStoreName, StoreBuilder[WindowStore[EvPvKey, EvPv]]) = {

Similarly

  def pvStoreBuilder(storeName: PvStoreName, storeWindow: FiniteDuration): (PvStoreName, StoreBuilder[WindowStore[ClientKey, Pv]]) = {

phderome avatar Nov 26 '17 19:11 phderome

One step forward, Topology could be also extended using implicit class to handle NamedProcessorSupplier directly. But this is rather a workaround than desired design. I don't really like string based wiring between sources, sinks, processors and state stores.

Let's imagine the following DSL inspired by Akka Streams with explicit wiring:

val pvSource = (...) => Source[ClientKey, Pv]
val evSource = (...) => Source[ClientKey, Ev]

val evPvSink = (...) => Sink[ClientKey, EvPv]

val pvStore = (...) => WindowStore[PvKey, Pv]
val evPvStore = (...) => WindowStore[EvPvKey, EvPv]

val pvWindowProcessor = (pvStore) => Processor[ClientKey, Pv]
val evJoinProcessor = (pvStore, evPvStore) => Processor[ClientKey, Ev, Pv]
val evPvMapProcessor = (...) => Processor[ClientKey, EvPv]

evSource ~> evJoinProcessor ~> evPvMapProcessor ~> evPvSink
pvSource ~> pvWindowProcessor ~> evJoinProcessor

With a little help of generic types it should be feasible to get compile time errors for:

evJoinProcessor ~> pvWindowProcessor
evSource ~> evPvSink

mkuthan avatar Nov 26 '17 22:11 mkuthan

Let me know if you'd like extension methods on Topology with a "clean" API for name constraints (on a feature branch), that seems easy enough for me.

I might potentially look at your ultimate solution in the next while, but don't count on it as my time is rather limited and so is my limited experience with generic types or Shapeless (odds are about 10% or less).

If you know about the functional Scala db access library Quill, it has recently been extended to provide support for structured Spark API (DataFrames and Datasets) and I'd guess it uses type dependent techniques. There's even someone who suggested providing an interface between KSQL and Quill but that looks like a pet wish list for now.

On a side note, it seems like the solution you have for lazy serialization of KafkaProducer within Spark is now no longer necessary as per Spark 2.2 which integrates very naturally with Kafka (https://databricks.com/blog/2017/04/26/processing-data-in-apache-kafka-with-structured-streaming-in-apache-spark-2-2.html)?

phderome avatar Nov 27 '17 12:11 phderome