kafka-streams-scala

case classes

Open · mazorigal opened this issue 6 years ago · 12 comments

Hi,

Is there an example of how to use kafka-streams-scala with case classes?

Thanks,

mazorigal · Jan 22 '18 11:01

Can you please clarify what you mean by using kafka-streams-scala with case classes? Maybe an example of what you want to do would help.

debasishg · Jan 22 '18 13:01

I mean using case classes as the message value between the steps of the stream-processing topology. If I have several transformation steps (for example .map), I would like the message values passed between those steps to be case classes, instead of constantly serializing and deserializing between the case class and a string.

mazorigal · Jan 22 '18 14:01

Currently we don't have higher-level APIs for this.

debasishg · Jan 22 '18 14:01

Could you perhaps still share the idea of how this can be done in the meantime? I see that the https://github.com/aseigneurin/kafka-streams-scala repo, which inspired kafka-streams-scala, has an example with case classes. Does the same principle apply?

mazorigal · Jan 22 '18 14:01

The example with the User case class applies to the current library as well: you define a Serde for the case class and pass it along. I was talking about higher-level abstractions where the Serde machinery is not exposed to the user at all.

debasishg · Jan 23 '18 12:01
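As a rough illustration of "define the Serde for the case class and pass it along", here is a minimal sketch that runs without Kafka on the classpath. The User fields, SimpleSerde, UserSerdes and roundTrip are hypothetical names chosen for this example. SimpleSerde is only a pair of functions standing in for a real Kafka Serde; with Kafka you could build one from a serializer and a deserializer via Serdes.serdeFrom and hand it to each operator that reads or writes bytes.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream}

// Example case class; the fields are assumed for illustration.
final case class User(name: String, age: Int)

// Hypothetical stand-in for a Kafka Serde: just the two conversion functions.
final case class SimpleSerde[T](serialize: T => Array[Byte], deserialize: Array[Byte] => T)

object UserSerdes {
  // Writes the fields in a fixed order and reads them back in the same order.
  val userSerde: SimpleSerde[User] = SimpleSerde[User](
    user => {
      val bytes = new ByteArrayOutputStream()
      val out = new DataOutputStream(bytes)
      out.writeUTF(user.name)
      out.writeInt(user.age)
      out.flush()
      bytes.toByteArray
    },
    data => {
      val in = new DataInputStream(new ByteArrayInputStream(data))
      User(in.readUTF(), in.readInt())
    }
  )

  // The serde is an ordinary value, so every step that needs it receives it explicitly.
  def roundTrip[T](value: T, serde: SimpleSerde[T]): T =
    serde.deserialize(serde.serialize(value))
}
```

Passing the serde by hand at every step is exactly the boilerplate that the higher-level abstractions mentioned above would hide.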

Will you support such higher-level abstractions? Is there a roadmap to follow?

mazorigal · Jan 23 '18 13:01

@mazorigal To give you some pointers: I'm using spray-json serialization for my case classes for "internal" messaging (see https://gist.github.com/jeroenr/2895de32accd440c2558261a49952cab), and a Serde implementation (see https://gist.github.com/jeroenr/8b0cc0a4ce3b4d521de28267867bc003) based on avro4s (https://github.com/sksamuel/avro4s) to consume from Kafka Connect, which uses the Avro format.

Hope this helps :)

jeroenr · Feb 20 '18 10:02

Thanks, it's indeed helpful. A small question: since the case classes are used for "internal" messaging, why not serialize them to bytes directly instead of going through JSON? I'd guess the nice JSON output is only needed in the output topic, which other apps might consume.

mazorigal · Feb 20 '18 10:02
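On the question of skipping JSON: Scala case classes are Serializable by default, so one way to get bytes directly is plain Java serialization. The sketch below assumes that approach; Order, JavaBytes, toBytes and fromBytes are hypothetical names, and this is not what the gists linked above do.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

// Case classes extend Serializable, and List is serializable too.
final case class Order(id: Long, items: List[String])

object JavaBytes {
  // Encodes any serializable value (such as a case class instance) to bytes.
  def toBytes[T <: java.io.Serializable](value: T): Array[Byte] = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    try out.writeObject(value) finally out.close() // close() also flushes
    bytes.toByteArray
  }

  // Decodes bytes produced by toBytes; the caller states the expected type.
  def fromBytes[T](data: Array[Byte]): T = {
    val in = new ObjectInputStream(new ByteArrayInputStream(data))
    try in.readObject().asInstanceOf[T] finally in.close()
  }
}
```

The trade-off is that these bytes are tied to the JVM class definition: changing the case class can make older records unreadable, and non-JVM consumers cannot decode them. That is one reason to keep a readable format such as JSON for data that is joined, filtered or shared.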

Yeah, I use JSON for joins, for instance; it's generally easier to filter and transform. For the internal state store I'm using Array[Byte] for the values.

jeroenr · Feb 20 '18 10:02

Hi @mazorigal,

The latest release, 0.2.0, has an implicit serdes implementation: you define a case class, define its serde, put the serde in implicit scope, and it is then used everywhere without you passing it explicitly. Please have a look at https://github.com/lightbend/kafka-streams-scala/blob/develop/src/test/scala/com/lightbend/kafka/scala/streams/StreamToTableJoinScalaIntegrationTestImplicitSerdesWithAvro.scala#L37-L61.

Does this answer your question?

debasishg · Mar 14 '18 17:03
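The implicit-serde idea can be sketched in plain Scala with a type class. Codec, Temperature and Codecs are hypothetical names, and Codec stands in for the Kafka Serde that the library resolves implicitly; this is not the library's API. The instance is declared once, and the context bound [T: Codec] lets the compiler supply it at every call site.

```scala
import java.nio.charset.StandardCharsets.UTF_8

// Hypothetical type class standing in for an implicit Serde.
trait Codec[T] {
  def encode(value: T): Array[Byte]
  def decode(data: Array[Byte]): T
}

final case class Temperature(sensor: String, celsius: Double)

object Temperature {
  // Declared once in the companion object, so it is found without any import.
  implicit val codec: Codec[Temperature] = new Codec[Temperature] {
    def encode(value: Temperature): Array[Byte] =
      s"${value.sensor};${value.celsius}".getBytes(UTF_8)

    // Splits on the last ';' so a sensor name containing ';' still round-trips.
    def decode(data: Array[Byte]): Temperature = {
      val text = new String(data, UTF_8)
      val sep = text.lastIndexOf(';')
      Temperature(text.substring(0, sep), text.substring(sep + 1).toDouble)
    }
  }
}

object Codecs {
  // The context bound asks the compiler, not the caller, to provide the Codec.
  def write[T: Codec](value: T): Array[Byte] = implicitly[Codec[T]].encode(value)
  def read[T: Codec](data: Array[Byte]): T = implicitly[Codec[T]].decode(data)
}
```

Putting the implicit in the companion object places it in the implicit scope of Codec[Temperature], so callers pick it up automatically; defining it in an unrelated object would require importing it wherever it is used.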

Thanks! A few questions:

So if I want my own serde for case classes, all I need to do is have the custom serde class extend the StatelessScalaSerde trait and implement:

 def serialize(data: T): Array[Byte]
 def deserialize(data: Array[Byte]): Option[T]

Is that right?

Is StatelessScalaSerde always the one to extend for a custom serde? I see that there are multiple traits here: https://github.com/lightbend/kafka-streams-scala/blob/74aabcd6369beef6915fbb77e92bdbf419da0403/src/main/scala/com/lightbend/kafka/scala/streams/ScalaSerde.scala

Just to make sure: is the signature of the custom serde class in the example specific to the Avro serde implementation?

[T >: Null : SchemaFor : FromRecord : ToRecord]

Thanks,

mazorigal · Mar 15 '18 15:03
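A custom serde with the two methods quoted above can be sketched as follows. StatelessSerdeShape is a local trait that only mirrors those two signatures (it is not the library's StatelessScalaSerde definition), and Click and ClickSerde are hypothetical. A non-Avro serde like this one has no use for the SchemaFor, FromRecord and ToRecord bounds in the example, which are avro4s type classes.

```scala
import java.nio.charset.StandardCharsets.UTF_8
import scala.util.Try

// Local mirror of the two quoted signatures; not the library's trait.
trait StatelessSerdeShape[T] {
  def serialize(data: T): Array[Byte]
  def deserialize(data: Array[Byte]): Option[T]
}

final case class Click(userId: Long, page: String)

// Encodes a Click as "userId|page" text.
object ClickSerde extends StatelessSerdeShape[Click] {
  def serialize(data: Click): Array[Byte] =
    s"${data.userId}|${data.page}".getBytes(UTF_8)

  // Splits on the first '|' (the numeric id never contains one) and
  // returns None for input that is missing the separator or has a bad id.
  def deserialize(data: Array[Byte]): Option[Click] = {
    val text = new String(data, UTF_8)
    val sep = text.indexOf('|')
    if (sep < 0) None
    else Try(text.substring(0, sep).toLong).toOption.map(id => Click(id, text.substring(sep + 1)))
  }
}
```

Returning Option from deserialize lets malformed records surface as None rather than as an exception in the middle of the topology.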

@mazorigal
- Serde: the Java Kafka Streams interface.
- ScalaSerde: a Serde with no configuration and nothing to close (it's actually stateless).
- StatelessScalaSerde: lets you implement a custom Serde without implementing Serializer and Deserializer.

All of this is confusing to me, so I've opened a PR to add better helpers: https://github.com/lightbend/kafka-streams-scala/pull/70

joan38 · May 03 '18 10:05