kafka-streams
Questions about KTables
Hi,
I'm very new to Kafka and streaming platforms, so I apologize if my understanding is not correct. One use-case I see for KTables is handling external queries (e.g. from a GUI) without having to store the state of my application in a separate database (thus, Kafka is the single source of truth).
The thing I don't understand is the purpose of methods such as consumeUntilCount() or consumeUntilMs(). If I want the table to represent the current state of my application, I have to read all the messages from the topic, from the oldest to the newest message, don't I? And as consumeUntilLastOffset() is not implemented yet, how am I supposed to do this?
Hi @jonathan-roy, kafka-streams can be used to solve a lot of problems in different use-cases. Most of the time, especially when trying to build on-demand sets or maps from the streams, for example via KTables as you just described, this adds a lot of complexity that is most likely not necessary. A slightly different approach, a local DB plus a Kafka topic as event bus and backup, might be a better solution.
However, regarding your question: a table is a snapshot of a stream at a certain time, therefore it will never contain "all events of a stream" - which is why there has not been much interest in developing a consumeUntilEnd method yet. I see your need for consumeUntilLastOffset though, which can be a bit tricky to implement because assigned partitions can change at any time on rebalances. I'll mark this as a feature request.
Thank you for your answer @krystianity.
I thought a table was used to continually represent the current state of the application, not the state at a certain time. E.g. if a topic contains the following records representing users:
{"id": "1", "name": "Jonathan", "country": "France"}
{"id": "2", "name": "Christian", "country": "Germany"}
{"id": "1", "name": "Jonathan", "country": "Spain"}
then the corresponding table would be:
{
"1": {"id": "1", "name": "Jonathan", "country": "Spain"},
"2": {"id": "2", "name": "Christian", "country": "Germany"}
}
This way, I would not have to store that current state into a local DB.
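That per-key "latest wins" reduction can be sketched in plain JavaScript. This is just the concept of a table as a compacted view of a changelog, not the library's API:

```javascript
// Sketch: fold a changelog of keyed records into a table,
// keeping only the latest value per key.
const records = [
  { id: "1", name: "Jonathan", country: "France" },
  { id: "2", name: "Christian", country: "Germany" },
  { id: "1", name: "Jonathan", country: "Spain" }
];

const table = records.reduce((state, record) => {
  state[record.id] = record; // newer records overwrite older ones per key
  return state;
}, {});

// table["1"].country is now "Spain"; table["2"].country is "Germany"
```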
I would really like to see this feature implemented. Since most.js is at the core of the stream processing here, it would be simple enough to do something like:
- Extend the `KStorage` class with `next`, `complete`, `error` methods.
- Extend the `KTable` class with a `subscribe` method which takes an object implementing the appropriate interface (having `next`, `complete`, `error` methods at the least).
- Call `subscribe` in the `KTable` constructor after calling `map`, passing the storage instance. This could use the `subscribe` method on `stream$` to connect the storage instance via its `next`, `complete` and `error` methods. (https://github.com/cujojs/most/blob/HEAD/docs/api.md#subscribe)
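To make the observer contract concrete, here is a rough sketch of such a storage object. The names follow the proposal above (and most.js's observer interface); none of this is the library's current API, and the `subscribe` stand-in below only mimics what `stream$.subscribe(observer)` would do:

```javascript
// Hypothetical storage implementing the proposed next/complete/error contract.
class MapStorage {
  constructor() {
    this.state = new Map();
    this.closed = false;
  }
  next(update) {
    // each update is assumed to be a { key, value } pair from the table's map stage
    this.state.set(update.key, update.value);
  }
  complete() {
    this.closed = true;
  }
  error(err) {
    this.closed = true;
    this.lastError = err;
  }
  get(key) {
    // queryable at runtime while the stream stays open
    return this.state.get(key);
  }
}

// Minimal stand-in for stream$.subscribe(observer) as in most.js:
function subscribe(events, observer) {
  for (const event of events) observer.next(event);
  observer.complete();
}

const storage = new MapStorage();
subscribe(
  [
    { key: "1", value: { id: "1", country: "France" } },
    { key: "1", value: { id: "1", country: "Spain" } }
  ],
  storage
);
// storage.get("1").country is now "Spain"
```

The point of the design is that the store passively tracks every table update, so external queries hit `get` without ever interrupting consumption.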
This would allow the store itself to keep abreast of changes in the KTable stream, and also be queryable at runtime without closing the connection to Kafka. I also think this design closely follows the existing designs and paradigms currently used in kafka-streams.
Changes to the `finalise` method would be required as well, but I think beyond that the above would roughly outline a non-breaking update that could make the entire lib:
- more powerful and,
- more closely resembling the features provided by the Java Kafka Streams API.
I've actually made a fork, with a working implementation - https://github.com/johnhalbert/kafka-streams - I wanted to see if I could get feedback on this.
Also, before creating the fork, I was working on this problem outside of the lib and adapted what I was doing there to create a minimal example of how this could be used. See here - https://github.com/johnhalbert/kafka-streams-level-example. You can basically ignore the README.md, as it's not so pertinent to this ticket.
Ultimately, if something like this makes it into the lib, it would be extremely beneficial to be able to use other implementations of KStorage (for example, one with persisted storage using LevelDB like what I was working on in the example I posted here). This could really provide the opportunity to make the project more robust in general (e.g. persisting state to disk, and recovering from disk in the event the application crashes, rather than replaying everything from Kafka to recreate a KTable, etc.).
I appreciate any feedback. Thanks for making such a good implementation for Node!
@rob3000 I wanted to get feedback before just submitting a pull request. If you think the suggestion here works, I can make a pull request from my fork.
Hey @johnhalbert the suggestion looks good! One thing to note is that we have started to move the repo over to typescript and are just waiting on the following PR to be fixed: https://github.com/nodefluent/kafka-streams/pull/167