kafka-streams-scala icon indicating copy to clipboard operation
kafka-streams-scala copied to clipboard

support for ValueTransformerWithKey

Open hashangayasri opened this issue 6 years ago • 3 comments

Currently, KStreamS#transformValues only has support for ValueTransformer. ValueTransformerWithKey support is needed.

hashangayasri avatar Jun 21 '18 05:06 hashangayasri

Wrote an extension to support it

  // Add ValueTransformerWithKeySupplier
  import com.lightbend.kafka.scala.streams.KStreamS
  implicit class KStreamSWithValueTransformerKey[K,V](inner: KStreamS[K,V]) extends KStreamS[K,V](inner.inner){
    def transformValuesWithKey[VR](valueTransformerSupplierWithKey: () => ValueTransformerWithKey[K, V, VR],
                                   stateStoreNames: String*): KStreamS[K, VR] = {

      val valueTransformerWithKeySupplierJ: ValueTransformerWithKeySupplier[K, V, VR] = () => valueTransformerSupplierWithKey()
      new KStreamS(inner.inner.transformValues[VR](valueTransformerWithKeySupplierJ, stateStoreNames: _*))
    }
  }

hashangayasri avatar Jun 21 '18 05:06 hashangayasri

Not submitting a pull request since this only applies to Kafka 1.1.0 and the current branches are for Kafka 1.0.0. Let me know if a branch for Kafka 1.1.0 is opened.

  def transformValuesWithKey[VR](valueTransformerWithKeySupplier: () => ValueTransformerWithKey[K, V, VR],
                                 stateStoreNames: String*): KStreamS[K, VR] = {

    val valueTransformerWithKeySupplierJ: ValueTransformerWithKeySupplier[K, V, VR] = () => valueTransformerWithKeySupplier()
    new KStreamS(inner.transformValues[VR](valueTransformerWithKeySupplierJ, stateStoreNames: _*))
  }

would do

hashangayasri avatar Jun 21 '18 05:06 hashangayasri

Please submit a PR to apache/kafka. streams-scala is on master now .. https://github.com/apache/kafka/tree/trunk/streams/streams-scala .. and will be released with 2.0

debasishg avatar Jun 21 '18 05:06 debasishg