
Questions about KTables

Open jonathan-roy opened this issue 7 years ago • 5 comments

Hi,

I'm very new to Kafka and streaming platforms so I apologize if my understanding is not correct. One use-case I see for using KTables is to handle external queries (e.g. from a GUI) without having to store the state of my application in a separated database (thus, Kafka is the single source of truth).

The thing I don't understand is the purpose of methods such as consumeUntilCount() or consumeUntilMs(). If I want the table to represent the current state of my application, I have to read all the messages from the topic, from the oldest to the newest message, don't I? And as consumeUntilLastOffset() is not implemented yet, how am I supposed to do this?
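For reference, here is a minimal usage sketch of those methods, assuming the KafkaStreams/getKTable factory and config loading shown in the project README; the "users" topic and the mapper are made up for illustration:

const { KafkaStreams } = require("kafka-streams");
const config = require("./config.json"); // broker/client settings, omitted here

const kafkaStreams = new KafkaStreams(config);

// Map each raw Kafka message into the {key, value} pair kept in the table.
const userMapperEtl = (message) => {
    const user = JSON.parse(message.value.toString("utf8"));
    return { key: user.id, value: user };
};

const table = kafkaStreams.getKTable("users", userMapperEtl);

// Consume for a bounded time window (or a bounded message count with
// consumeUntilCount); after that the table stops updating.
table.consumeUntilMs(5000);

table.start();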

jonathan-roy avatar Jun 04 '18 11:06 jonathan-roy

Hi @jonathan-roy, kafka-streams can be used to solve a lot of problems in different use-cases. Most of the time, though, building on-demand sets or maps from the streams (for example via KTables, as you just described) adds a lot of complexity that is most likely not necessary. A slightly different approach, with a local DB plus a Kafka topic as event bus and backup, might be a better solution.

However, regarding your question: a table is a snapshot of a stream at a certain time, therefore it will never contain "all events of a stream" - which is why there has not been much interest in developing a consumeUntilEnd method yet. I see your need for consumeUntilLastOffset though, which can be a bit tricky to implement because assigned partitions can change at any time on rebalances. I'll mark this as a feature request.

krystianity avatar Jun 06 '18 09:06 krystianity

Thank you for your answer @krystianity.

I thought a table was used to continually represent the current state of the application, not the state at a certain time. E.g. if a topic contains the following records representing users:

  • {"id": "1", "name": "Jonathan", "country": "France"}
  • {"id": "2", "name": "Christian", "country": "Germany"}
  • {"id": "1", "name": "Jonathan", "country": "Spain"}

then the corresponding table would be:

{
    "1": {"id": "1", "name":  "Jonathan", "country": "Spain"},
    "2": {"id": "2", "name":  "Christian", "country": "Germany"}
}

This way, I would not have to store that current state into a local DB.
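For what it's worth, the behaviour described here is just "last write wins per key"; in plain JavaScript, with no kafka-streams API involved, it amounts to:

// Later records overwrite earlier records that share the same key.
const records = [
    { id: "1", name: "Jonathan", country: "France" },
    { id: "2", name: "Christian", country: "Germany" },
    { id: "1", name: "Jonathan", country: "Spain" },
];

const table = records.reduce((acc, record) => {
    acc[record.id] = record;
    return acc;
}, {});

console.log(table);
// { "1": { ..., country: "Spain" }, "2": { ..., country: "Germany" } }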

jonathan-roy avatar Jun 07 '18 07:06 jonathan-roy

I would really like to see this feature implemented. Since most.js is at the core of the stream processing here, it would be simple enough to do something like the following (a rough sketch follows the list):

  1. Extend the KStorage class with next, complete, error methods
  2. Extend the KTable class with a subscribe method which takes an object implementing the appropriate interface (having next, complete, and error methods at the least).
  3. Call subscribe in the KTable constructor after calling map, passing the storage instance. This could use the subscribe method on stream$ to connect the storage instance via its next, complete, and error methods (https://github.com/cujojs/most/blob/HEAD/docs/api.md#subscribe).
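Here is how those three steps could fit together; the class bodies are hypothetical and only mirror the list above (not the actual kafka-streams source), with most.js standing in for the topic stream:

const most = require("most");

class KStorage {
    constructor() {
        this.state = {};
    }

    // Step 1: observer methods, so the storage itself can be passed to subscribe().
    next(kv) {
        this.state[kv.key] = kv.value;
    }

    error(err) {
        console.error("stream error:", err);
    }

    complete() {
        // stream ended; this.state stays queryable
    }
}

class KTable {
    constructor(stream$, keyMapETL, storage = new KStorage()) {
        this.storage = storage;
        this.stream$ = stream$.map(keyMapETL);
        // Step 3: wire the storage into the stream from the constructor.
        this.subscribe(this.storage);
    }

    // Step 2: accept any object implementing next/complete/error.
    subscribe(observer) {
        return this.stream$.subscribe(observer);
    }
}

// Toy usage with an in-memory stream standing in for a Kafka topic.
const source$ = most.from([
    { id: "1", country: "France" },
    { id: "1", country: "Spain" },
]);
const users = new KTable(source$, (user) => ({ key: user.id, value: user }));
// users.storage.state can be queried at runtime while the stream keeps flowing.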

This would allow the store itself to keep abreast of changes in the KTable stream, and also be queryable at runtime without closing the connection to Kafka. I also think this design closely follows the existing designs and paradigms currently used in kafka-streams.

Changes to the finalise method would be required as well, but I think beyond that the above would roughly outline a non-breaking update that could make the entire lib:

  1. more powerful and,
  2. more closely resembling the features provided by the Java Kafka Streams API.

I've actually made a fork with a working implementation (https://github.com/johnhalbert/kafka-streams) and wanted to see if I could get feedback on it.

Also, before creating the fork, I was working on this problem outside of the lib and adapted what I was doing there to create a minimal example of how this could be used. See here - https://github.com/johnhalbert/kafka-streams-level-example. You can basically ignore the README.md, as it's not so pertinent to this ticket.

Ultimately, if something like this makes it into the lib, it would be extremely beneficial to be able to use other implementations of KStorage (for example, one with persisted storage using LevelDB like what I was working on in the example I posted here). This could really provide the opportunity to make the project more robust in general (e.g. persisting state to disk, and recovering from disk in the event the application crashes, rather than replaying everything from Kafka to recreate a KTable, etc.).
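For illustration, a hypothetical LevelDB-backed store along those lines; the set/get method names are assumptions about the KStorage contract rather than the lib's actual interface, and the level usage is its plain put/get API:

const level = require("level");

class LevelKStorage {
    constructor(path = "./ktable-state") {
        this.db = level(path, { valueEncoding: "json" });
    }

    async set(key, value) {
        await this.db.put(key, value);
        return value;
    }

    async get(key) {
        try {
            return await this.db.get(key);
        } catch (err) {
            return null; // key not found
        }
    }

    // Observer methods from the sketch above, so the store can be
    // subscribed directly to the KTable's stream.
    next(kv) {
        this.set(kv.key, kv.value).catch((err) => this.error(err));
    }

    error(err) {
        console.error("storage error:", err);
    }

    complete() {}

    close() {
        return this.db.close();
    }
}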

I appreciate any feedback. Thanks for making such a good implementation for Node!

johnhalbert avatar Jul 22 '20 04:07 johnhalbert

@rob3000 I wanted to get feedback before just submitting a pull request. If you think the suggestion here works, I can make a pull request from my fork.

johnhalbert avatar Sep 10 '20 14:09 johnhalbert

Hey @johnhalbert, the suggestion looks good! One thing to note is that we have started to move the repo over to TypeScript and are just waiting on the following PR to be fixed: https://github.com/nodefluent/kafka-streams/pull/167

rob3000 avatar Sep 11 '20 00:09 rob3000