kafka-streams-scala
case classes
Hi,
is there any example of how to use kafka-streams-scala with case classes?
Thanks,
Can you please clarify what you mean by using kafka-streams-scala with case classes? Maybe give an example of what you want to do.
I mean using case classes as the message value type between steps in the stream processing topology. If I have several transformation steps (for example .map), I would like to use case classes as the message value datatype between those steps, instead of constantly converting between case class and String with a Serde.
Currently we don't have higher level APIs towards this end.
Could you perhaps still share how that can be done in the meantime? I see that in the https://github.com/aseigneurin/kafka-streams-scala repo, which inspired kafka-streams-scala, there is an example with case classes. Does the same principle apply here?
The example with the User case class applies to the current library as well. You need to define the Serde for the case class and pass it along. I was talking about higher-level abstractions where the entire Serde machinery will not be exposed to the user.
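To make the "define the Serde and pass it along" suggestion concrete, here is a minimal sketch using the plain Kafka `Serdes.serdeFrom` helper. The `User` case class and the comma-separated encoding are invented for illustration; in practice you would plug in a JSON or Avro library instead.

```scala
import java.nio.charset.StandardCharsets
import java.util

import org.apache.kafka.common.serialization.{Deserializer, Serde, Serdes, Serializer}

// Hypothetical case class for illustration.
case class User(name: String, age: Int)

// A minimal Serde that round-trips User through a trivial string encoding.
val userSerde: Serde[User] = Serdes.serdeFrom(
  new Serializer[User] {
    override def configure(configs: util.Map[String, _], isKey: Boolean): Unit = ()
    override def serialize(topic: String, data: User): Array[Byte] =
      s"${data.name},${data.age}".getBytes(StandardCharsets.UTF_8)
    override def close(): Unit = ()
  },
  new Deserializer[User] {
    override def configure(configs: util.Map[String, _], isKey: Boolean): Unit = ()
    override def deserialize(topic: String, bytes: Array[Byte]): User = {
      val Array(name, age) = new String(bytes, StandardCharsets.UTF_8).split(",")
      User(name, age.toInt)
    }
    override def close(): Unit = ()
  }
)
```

You then pass `userSerde` explicitly wherever the topology consumes from or produces to a topic with `User` values.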
Will you support such higher-level abstractions? Is there any roadmap to follow?
@mazorigal To give you some pointers I'm using spray-json serialization for my case classes for "internal" messaging (see https://gist.github.com/jeroenr/2895de32accd440c2558261a49952cab) and I'm using a Serde implementation (see https://gist.github.com/jeroenr/8b0cc0a4ce3b4d521de28267867bc003) based on avro4s (https://github.com/sksamuel/avro4s) to consume from Kafka Connect, which uses the avro format.
Hope this helps :)
Thanks, that's indeed helpful. One small question: since the case classes are used for "internal" messaging, why not serialise the case classes to bytes directly, without going via JSON? I guess the nice JSON output is only needed in the output topic, which might be consumed by other apps.
Yeah, I use JSON for doing joins, for instance; generally it's easier to filter and transform on. For the internal state store I'm using Array[Byte] for the values.
Hi @mazorigal -
In the latest release, 0.2.0, we have an implicit Serdes implementation: you can define a case class, define its Serde, have it as an implicit in scope, and then the Serde gets used everywhere without being explicit about it. Please have a look at https://github.com/lightbend/kafka-streams-scala/blob/develop/src/test/scala/com/lightbend/kafka/scala/streams/StreamToTableJoinScalaIntegrationTestImplicitSerdesWithAvro.scala#L37-L61.
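The shape of that pattern is roughly the following sketch. `UserSerde` and the topic names are hypothetical, and the exact method signatures may differ slightly from the 0.2.0 API; the linked test shows the real usage.

```scala
import org.apache.kafka.common.serialization.{Serde, Serdes}
import com.lightbend.kafka.scala.streams.{KStreamS, StreamsBuilderS}

case class User(name: String, age: Int)

// UserSerde is a hypothetical Serde[User] implementation (e.g. JSON- or Avro-backed).
implicit val stringSerde: Serde[String] = Serdes.String()
implicit val userSerde: Serde[User]     = new UserSerde

val builder = new StreamsBuilderS()

// No explicit Serde arguments: the implicits in scope are picked up by the Scala API.
val users: KStreamS[String, User] = builder.stream[String, User]("users")
users
  .mapValues(u => u.copy(age = u.age + 1))
  .to("users-out")
```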
Does this answer your question?
Thanks! A few questions:
So if I would like to have my own Serde for case classes, is all I need to extend my custom Serde class with the StatelessScalaSerde trait and implement the following?
def serialize(data: T): Array[Byte]
def deserialize(data: Array[Byte]): Option[T]
?
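For what it's worth, a sketch of such an extension could look like this. The `User` case class and the comma-separated encoding are invented for illustration; a real implementation would use JSON or Avro.

```scala
import java.nio.charset.StandardCharsets

import com.lightbend.kafka.scala.streams.StatelessScalaSerde

case class User(name: String, age: Int)

// Hypothetical Serde: only serialize/deserialize need to be implemented;
// the trait supplies the Serializer/Deserializer plumbing.
object UserSerde extends StatelessScalaSerde[User] {
  override def serialize(data: User): Array[Byte] =
    s"${data.name},${data.age}".getBytes(StandardCharsets.UTF_8)

  override def deserialize(data: Array[Byte]): Option[User] =
    new String(data, StandardCharsets.UTF_8).split(",") match {
      case Array(name, age) => scala.util.Try(User(name, age.toInt)).toOption
      case _                => None
    }
}
```

Note that `deserialize` returns an `Option[T]`, so malformed bytes can be signalled with `None` instead of an exception.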
Is StatelessScalaSerde always the one to use for a custom Serde extension? I see that there are multiple traits here:
https://github.com/lightbend/kafka-streams-scala/blob/74aabcd6369beef6915fbb77e92bdbf419da0403/src/main/scala/com/lightbend/kafka/scala/streams/ScalaSerde.scala
Just to make sure: the signature of the custom Serde class in the example, [T >: Null : SchemaFor : FromRecord : ToRecord], is specific to the Avro Serde implementation?
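As context for that signature: the context bounds are avro4s type classes that let the compiler derive the Avro schema and record conversions for T. A sketch of a generic Avro-backed Serde under the avro4s 1.x API (class and method shapes here are an assumption, not the library's exact code) could be:

```scala
import java.io.ByteArrayOutputStream

import com.sksamuel.avro4s.{AvroInputStream, AvroOutputStream, FromRecord, SchemaFor, ToRecord}
import com.lightbend.kafka.scala.streams.StatelessScalaSerde

// SchemaFor[T]  derives the Avro schema for T,
// ToRecord[T]   converts T into an Avro record for writing,
// FromRecord[T] converts an Avro record back into T.
class AvroSerde[T >: Null : SchemaFor : FromRecord : ToRecord]
    extends StatelessScalaSerde[T] {

  override def serialize(data: T): Array[Byte] = {
    val baos   = new ByteArrayOutputStream()
    val output = AvroOutputStream.binary[T](baos)
    output.write(data)
    output.close()
    baos.toByteArray
  }

  override def deserialize(data: Array[Byte]): Option[T] = {
    val input = AvroInputStream.binary[T](data)
    try input.iterator.toSeq.headOption
    finally input.close()
  }
}
```

So yes, those bounds only matter for the Avro variant; a JSON-backed Serde would carry whatever implicits its JSON library needs instead.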
Thanks,
@mazorigal
Serde: the Java Kafka Streams interface.
ScalaSerde: a Serde with no configuration and nothing to close (it's effectively stateless).
StatelessScalaSerde: lets you implement a custom Serde without implementing a separate Serializer and Deserializer.
All of this is confusing to me and I've opened a PR to add better helpers: https://github.com/lightbend/kafka-streams-scala/pull/70