kafka-streams-scala
kafka-streams-scala copied to clipboard
support for ValueTransformerWithKey
Currently, KStreamS#transformValues only has support for ValueTransformer. ValueTransformerWithKey support is needed.
Wrote an extension to support it
// Add ValueTransformerWithKeySupplier
import com.lightbend.kafka.scala.streams.KStreamS
implicit class KStreamSWithValueTransformerKey[K,V](inner: KStreamS[K,V]) extends KStreamS[K,V](inner.inner){
def transformValuesWithKey[VR](valueTransformerSupplierWithKey: () => ValueTransformerWithKey[K, V, VR],
stateStoreNames: String*): KStreamS[K, VR] = {
val valueTransformerWithKeySupplierJ: ValueTransformerWithKeySupplier[K, V, VR] = () => valueTransformerSupplierWithKey()
new KStreamS(inner.inner.transformValues[VR](valueTransformerWithKeySupplierJ, stateStoreNames: _*))
}
}
Not submitting a pull request since this only applies to Kafka 1.1.0 and the current branches are for Kafka 1.0.0. Let me know if a branch for Kafka 1.1.0 is opened.
def transformValuesWithKey[VR](valueTransformerWithKeySupplier: () => ValueTransformerWithKey[K, V, VR],
stateStoreNames: String*): KStreamS[K, VR] = {
val valueTransformerWithKeySupplierJ: ValueTransformerWithKeySupplier[K, V, VR] = () => valueTransformerWithKeySupplier()
new KStreamS(inner.transformValues[VR](valueTransformerWithKeySupplierJ, stateStoreNames: _*))
}
would do
Please submit a PR to apache/kafka. streams-scala
is on master
now .. https://github.com/apache/kafka/tree/trunk/streams/streams-scala .. and will be released with 2.0