kafka
KAFKA-12960: Enforcing strict retention time for WindowStore and Sess…
WindowedStore and SessionStore do not implement a strict retention time in general. We should consider making retention time strict: even if we still have some records in the store (due to the segmented implementation), we might want to filter expired records on read. This might benefit PAPI users.
This PR adds the filtering behaviour in the Metered store so that it is applied automatically, including when a custom state store is implemented.
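For readers following along, here is a minimal sketch of what "filter expired records on read" could look like in an outer wrapper. It is not the code in this PR and does not use the Kafka Streams API: `LazyInnerStore` and `RetentionFilteringStore` are hypothetical stand-ins, and the expiry boundary (a record is live if its window start is at least `observedStreamTime - retention + 1`) is an assumption made for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical inner store: it tracks stream time but never removes
// records itself, mimicking a store whose cleanup is lazy.
class LazyInnerStore {
    // window start timestamp -> value (single key, for brevity)
    final TreeMap<Long, String> records = new TreeMap<>();
    long observedStreamTime = -1L;

    void put(long windowStart, String value) {
        observedStreamTime = Math.max(observedStreamTime, windowStart);
        records.put(windowStart, value);
    }
}

// Hypothetical outer wrapper that drops expired records on read.
class RetentionFilteringStore {
    private final LazyInnerStore inner;
    private final long retentionMs;

    RetentionFilteringStore(LazyInnerStore inner, long retentionMs) {
        this.inner = inner;
        this.retentionMs = retentionMs;
    }

    void put(long windowStart, String value) {
        inner.put(windowStart, value);
    }

    // Returns values in [from, to] whose window start is still live.
    List<String> fetch(long from, long to) {
        List<String> result = new ArrayList<>();
        if (from > to) {
            return result; // empty range
        }
        // Assumed boundary: anything older than this is expired.
        long minLive = inner.observedStreamTime - retentionMs + 1;
        for (Map.Entry<Long, String> e : inner.records.subMap(from, true, to, true).entrySet()) {
            if (e.getKey() >= minLive) { // per-record filter
                result.add(e.getValue());
            }
        }
        return result;
    }
}

class RetentionFilterDemo {
    public static void main(String[] args) {
        LazyInnerStore inner = new LazyInnerStore();
        RetentionFilteringStore store = new RetentionFilteringStore(inner, 100L);
        store.put(0L, "a");
        store.put(50L, "b");
        store.put(200L, "c"); // advances stream time to 200
        System.out.println(store.fetch(0L, 300L));
    }
}
```

Note that the inner store still hands every record in the range to the wrapper, which then checks each one individually.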
@mjsax , @ableegoldman plz review when you get the chance.
Ping @cadonna since he's got a good eye for this part of the code. Also @wcarlson5 @lct45 @showuon for help with reviews
@vamossagar12 , thanks for the PR. Had a quick look and left some comments. But most importantly, the test coverage is not enough. I saw you changed RocksDBWindowStore and RocksDBSessionStore, but there are no tests for them. Also, there are no integration tests for this change. I expected a WindowStore test and a SessionStore test to check whether we can actually remove the records after the retention time. What do you think? Thank you.
@showuon , thank you. The changes in RocksDBWindowStore and RocksDBSessionStore are minimal (only changing the parent class). The real filtering logic is in the Metered classes, and that's where I have added all the tests by mocking the above two classes, and so I thought we won't need unit tests for RocksDBWindowStore and RocksDBSessionStore. WDYT?
+1 for integration test though. Would add.
so I thought we won't need unit tests for RocksDBWindowStore and RocksDBSessionStore. WDYT?
Makes sense to me.
@showuon , is there an existing integration test for Metered classes? I tried to find one to add the relevant tests but couldn't find one.
Sorry for late reply. Metered store classes are for recording operation metrics, so you should have it when using any built-in state stores, ex: Stores.windowStoreBuilder will have MeteredWindowStore. Thanks.
Thanks @showuon , I have found a bug in my implementation. Will correct it and also add integration tests, and then it can be reviewed.
@showuon , I have cleaned up the PR and added the integration tests. Plz review whenever you get the chance.
@showuon , plz review whenever you get the chance.
Thanks @showuon , made the suggested changes
@showuon sorry to bother you again on this one, but plz review whenever you get the chance..
- That's correct @showuon, the wrapped().persistent() may not apply to custom in-memory stores. The only reason that check is there is that the InMemoryStateStore already does filtering, so I didn't want to include that as part of this change.
- From my knowledge, any new public interface addition needs a KIP. Since this is going to be an internal interface, I didn't think we need a KIP. In fact, I think if I were to add it to WindowStore or SessionStore, then I believe we would need a KIP, and I didn't do it for that reason. What do you think? Maybe @ableegoldman can also chime in on this?
Yeah, any change to the public interface is going to require a KIP. And tbh the state store interface is already messy enough; I think we should try to avoid adding anything that requires a KIP for this sort of thing, if at all possible.
Thanks Sophie for that. @showuon , I think your concern about persistent stores and custom stores is still valid. The reason why I don't want to add the getObservedStreamTime method in SessionStore or WindowStore is that I feel it's not something which should be added to those interfaces. It's an internal level detail about how to track observed stream time.
Having said that, if we want this behaviour to be also available for custom stores(and which is why we chose to add it to MeteredStores), then those custom stores need to implement PersistentStore classes which is not the way it works today, right?
One approach could be to think about custom state stores separately and have this merged if this looks fine. That's because I think custom state stores will need more thinking because of the way the state stores are structured or wrapped. Also, the persistent state stores provided by Kafka Streams don't enforce retention time today, which IMO is a bigger problem than custom state stores. That's my personal opinion and I am open to suggestions. Let me know if that makes sense.
@vamossagar12 , I'm fine with merging the persistent state stores improvement first and thinking about custom state stores later. But I'm not the one who has the right to merge your code, so maybe you should confirm with @ableegoldman or other committers. :)
Haha thanks @showuon :) @ableegoldman , need your input again on this one :D Do you agree with going ahead with merging this PR for Persistent stores and then having a separate ticket for Custom State stores? I can take that one up as well.
@vvcephei @cadonna @mjsax @guozhangwang anyone have any thoughts on this?
Hi @vamossagar12 @ableegoldman I took a quick look at the PR and I have a few questions:
- The fix itself seems to be applied only to persistent stores (built-in or customized), and not in-memory, assuming the latter has done its duties. But for customized in-memory stores it may not be the case. Is that right?
- I'm a bit concerned about applying the filter logic at the metered store because of the complexity it introduces. Instead, I'm wondering if we could enforce the retention period restriction in the inner store, i.e. 1) the built-in stores would enforce this rule at the segmented store layer based on its observed stream time, and 2) we just assume customized stores enforce this rule themselves, and hence do not have the second guard at the outer meteredStore layer.
@guozhangwang
- The default in-memory state stores already do the filtering. The original thought was to make the persistent stores enforce the same retention, hence the checks you see everywhere.
- The idea of adding it in MeteredStore was that the enforcement would apply to any custom store that's implemented. But it turns out it isn't that simple (I realised it after so many code changes lol). For persistent stores, we could push it down to the inner segmented stores (similar to how the in-memory stores do it today). If we can assume that custom state stores would take care of it, then there's no point having it in the Metered store, which goes against the original idea but should be OK IMO.
Did not look into this PR. Left the following comment on the JIRA:
I just got a report that InMemoryWindowStore does not enforce strict retention for IQ if caching is enabled. We should add corresponding tests before we close this ticket, to ensure that caching does not break strict retention time.
Can we address this in this PR? Based on the PR description we move the check into the metered store, and thus it should fix IQ (and thus the caching issue)?
Sure @mjsax , I would include it as part of the PR. I think I am already handling it for persistent stores, so I can extend it. Having said that, there have been discussions on this PR suggesting it would be worthwhile to move the enforcement down to the individual persistent stores, as this implementation of mine seems to have added some complexity. Would like to know your thoughts on this whenever you get a chance to look at the PR.
hi all... it would be great if we can have some decision on this one. Whenever you get the chance, plz weigh in your thoughts.
Personally I'd suggest we enforce the retention at the inner store, which means:
- for built-in in-memory stores, enforce at the InMemoryXXStore level.
- for built-in persistent stores, enforce at the SegmentStore level (which is used by all window stores).
- for user customized stores, they should enforce the retention themselves: this is via the WindowStoreSupplier#retentionPeriod call, which is accessible to users.
The rationale for doing that is:
- enforcing at the highest layer means that lower layers would still return more data than necessary, which then needs to be filtered on a per-record basis, which is less efficient.
- when more records are returned from the inner layer, it would also impact our intermediate layers, such as polluting the caching layer.
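A rough sketch of the inner-store approach suggested above, to illustrate the efficiency argument. This is not the actual segmented store in Kafka Streams: `SegmentedWindowStoreSketch` is a hypothetical, simplified model (single key, in-memory maps standing in for segments), and the expiry boundary is an assumption. The point is that the store drops whole expired segments on write and clamps the lower bound of the range on read, so no expired record ever reaches the layers above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

// Hypothetical, simplified segmented store that enforces retention itself.
class SegmentedWindowStoreSketch {
    private final long retentionMs;
    private final long segmentIntervalMs;
    // segment id -> (window start timestamp -> value)
    private final TreeMap<Long, TreeMap<Long, String>> segments = new TreeMap<>();
    private long observedStreamTime = -1L;

    SegmentedWindowStoreSketch(long retentionMs, long segmentIntervalMs) {
        this.retentionMs = retentionMs;
        this.segmentIntervalMs = segmentIntervalMs;
    }

    // Assumed boundary: records older than this are expired.
    private long minLiveTimestamp() {
        return observedStreamTime - retentionMs + 1;
    }

    void put(long windowStart, String value) {
        observedStreamTime = Math.max(observedStreamTime, windowStart);
        long minLive = minLiveTimestamp();
        if (windowStart < minLive) {
            return; // already expired on arrival, do not store
        }
        // Drop every segment that lies entirely before the live range.
        long minLiveSegment = Math.floorDiv(minLive, segmentIntervalMs);
        segments.headMap(minLiveSegment, false).clear();
        long segmentId = Math.floorDiv(windowStart, segmentIntervalMs);
        segments.computeIfAbsent(segmentId, id -> new TreeMap<>()).put(windowStart, value);
    }

    List<String> fetch(long from, long to) {
        List<String> result = new ArrayList<>();
        // Clamp the range instead of filtering record by record.
        long lower = Math.max(from, minLiveTimestamp());
        if (lower > to) {
            return result;
        }
        for (TreeMap<Long, String> segment : segments.values()) {
            result.addAll(segment.subMap(lower, true, to, true).values());
        }
        return result;
    }

    int segmentCount() {
        return segments.size();
    }
}

class SegmentedStoreDemo {
    public static void main(String[] args) {
        SegmentedWindowStoreSketch store = new SegmentedWindowStoreSketch(100L, 50L);
        store.put(0L, "a");
        store.put(60L, "b");
        store.put(120L, "c");
        System.out.println(store.fetch(0L, 200L));
    }
}
```

In this model a record can still sit in a live segment after it has expired (the segment is only dropped as a whole), which is why the read path clamps the range as well.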
Thanks @guozhangwang for the response. While I agree with doing this enforcement at the innermost layer for all the reasons mentioned, one of the problems highlighted on this PR by Matthias is that InMemoryWindowStore does not enforce strict retention for IQ if caching is enabled. InMemoryWindowStore does enforce strict retention, but it breaks when combined with the caching store. That's where doing it on MeteredStore might be beneficial.
InMemoryWindowStore does enforce strict retention, but it breaks when combined with the caching store. That's where doing it on MeteredStore might be beneficial.
I see that, and personally I think this is a bug in the caching layer that we should be fixing. In fact, the cache itself should be trimmed by retention so that we can improve our cache hit rate as well. I think it is fine not to piggy-back that fix on this PR if the scope suddenly becomes much larger, as I feel that the issue @mjsax raised is worth fixing in its own PR.
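To illustrate the idea of a cache "trimmed by retention", here is a small sketch. It is not the Kafka Streams caching layer and makes no claim about how that layer is implemented: `RetentionAwareCacheSketch` is a hypothetical class, the single-key `TreeMap` is a simplification, and the expiry boundary is an assumption. Each write advances the observed stream time and evicts every cached entry that has fallen out of the retention period, so reads served from the cache cannot return expired records.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

// Hypothetical cache that evicts entries once they fall out of retention.
class RetentionAwareCacheSketch {
    private final long retentionMs;
    // window start timestamp -> cached value (single key, for brevity)
    private final TreeMap<Long, String> cache = new TreeMap<>();
    private long observedStreamTime = -1L;

    RetentionAwareCacheSketch(long retentionMs) {
        this.retentionMs = retentionMs;
    }

    void put(long windowStart, String value) {
        observedStreamTime = Math.max(observedStreamTime, windowStart);
        // Assumed boundary: entries older than this are expired.
        long minLive = observedStreamTime - retentionMs + 1;
        // Trim all expired entries in one pass over the head of the map.
        cache.headMap(minLive, false).clear();
        if (windowStart >= minLive) {
            cache.put(windowStart, value);
        }
    }

    // Range read served from the cache only.
    List<String> fetch(long from, long to) {
        if (from > to) {
            return new ArrayList<>();
        }
        return new ArrayList<>(cache.subMap(from, true, to, true).values());
    }

    int size() {
        return cache.size();
    }
}

class RetentionAwareCacheDemo {
    public static void main(String[] args) {
        RetentionAwareCacheSketch cache = new RetentionAwareCacheSketch(100L);
        cache.put(0L, "a");
        cache.put(50L, "b");
        cache.put(200L, "c"); // evicts the entries at 0 and 50
        System.out.println(cache.fetch(0L, 300L));
    }
}
```

Besides correctness, trimming frees cache space for live entries, which is the hit-rate benefit mentioned above.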
I see. So what you are suggesting is that, as part of this PR, I move all the strict enforcement logic to the innermost layer and fix the caching issue at the caching layer, making the Metered layer cleaner. I think it makes sense because I feel this PR has introduced some complexity which can be taken away. I can start with that unless @mjsax or @ableegoldman have a different opinion? Also, I looked at the ChangeLoggingXXXStores, and those implementations seem to delegate to the underlying wrapped() store, so I think we might be good there.
hi.. just wanted to know if i am good to start with moving the logic to the inner stores?
@mjsax @ableegoldman could you chime in here?
hi.. bumping this thread again..
@guozhangwang could we also decide upon the next steps here?