KAFKA-12960: Enforcing strict retention time for WindowStore and Sess…

Open · vamossagar12 opened this issue 4 years ago · 51 comments

WindowStore and SessionStore do not, in general, implement a strict retention time. We should consider making retention time strict: even if some expired records are still in the store (due to the segmented implementation), we might want to filter them out on read. This might benefit PAPI users.
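A minimal Java sketch of the problem described above, assuming a simplified store that keeps one value per timestamp and expires data only by dropping whole segments. All class and method names are hypothetical, not Kafka Streams APIs, and treating a record as expired once its timestamp is below `observedStreamTime - retentionPeriod + 1` is an assumption of the sketch:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

// Hypothetical, simplified segmented store (NOT the Kafka Streams implementation).
// Expiry drops whole segments only, so a partially expired segment keeps old records.
class SegmentedRetentionSketch {
    private final long retentionPeriod;
    private final long segmentInterval;
    // segment id -> (timestamp -> value)
    private final TreeMap<Long, TreeMap<Long, String>> segments = new TreeMap<>();
    private long observedStreamTime = -1L; // -1 means "no record seen yet"

    SegmentedRetentionSketch(long retentionPeriod, long segmentInterval) {
        this.retentionPeriod = retentionPeriod;
        this.segmentInterval = segmentInterval;
    }

    private long segmentId(long timestamp) {
        return Math.floorDiv(timestamp, segmentInterval);
    }

    // Oldest timestamp still inside the retention period (assumption of this sketch).
    private long minLiveTime() {
        return observedStreamTime - retentionPeriod + 1;
    }

    void put(long timestamp, String value) {
        observedStreamTime = Math.max(observedStreamTime, timestamp);
        // Drop only segments in which every possible timestamp is expired.
        segments.headMap(segmentId(minLiveTime())).clear();
        if (timestamp >= minLiveTime()) { // records that arrive already expired are dropped
            segments.computeIfAbsent(segmentId(timestamp), id -> new TreeMap<>()).put(timestamp, value);
        }
    }

    // Lenient read: returns whatever is still physically stored in [from, to].
    List<String> fetchLenient(long from, long to) {
        List<String> result = new ArrayList<>();
        if (from > to) {
            return result;
        }
        for (TreeMap<Long, String> segment : segments.values()) {
            result.addAll(segment.subMap(from, true, to, true).values());
        }
        return result;
    }

    // Strict read: clamp the lower bound so expired records are never returned.
    List<String> fetchStrict(long from, long to) {
        return fetchLenient(Math.max(from, minLiveTime()), to);
    }
}
```

With `retentionPeriod = 10` and `segmentInterval = 5`, writing timestamps 0, 3, 7 and 12 leaves the record at 0 physically stored (its segment still holds the live record at 3), so `fetchLenient` returns it while `fetchStrict` does not.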

This PR adds the filtering behaviour to the Metered store so that it is applied automatically when a custom state store is implemented.
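The wrapper approach can be sketched as a decorator that forwards writes to any inner store and filters reads on the way out. This is a hypothetical, simplified model: the interface and class names are invented for illustration and are not the actual MeteredWindowStore code. Note that in this sketch the wrapper can only track stream time from writes that pass through it:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical minimal store interface: one value per timestamp.
interface SimpleWindowStore {
    void put(long timestamp, String value);
    List<Map.Entry<Long, String>> fetch(long from, long to);
}

// Stand-in for a custom inner store that never expires anything.
class UnboundedInnerStore implements SimpleWindowStore {
    private final TreeMap<Long, String> data = new TreeMap<>();

    public void put(long timestamp, String value) {
        data.put(timestamp, value);
    }

    public List<Map.Entry<Long, String>> fetch(long from, long to) {
        List<Map.Entry<Long, String>> result = new ArrayList<>();
        if (from > to) {
            return result;
        }
        for (Map.Entry<Long, String> e : data.subMap(from, true, to, true).entrySet()) {
            result.add(Map.entry(e.getKey(), e.getValue())); // immutable copy
        }
        return result;
    }
}

// Outer wrapper in the spirit of "filter in the Metered store": works for any inner store.
class RetentionFilteringWrapper implements SimpleWindowStore {
    private final SimpleWindowStore inner;
    private final long retentionPeriod;
    private long observedStreamTime = -1L; // only sees writes made through this wrapper

    RetentionFilteringWrapper(SimpleWindowStore inner, long retentionPeriod) {
        this.inner = inner;
        this.retentionPeriod = retentionPeriod;
    }

    public void put(long timestamp, String value) {
        observedStreamTime = Math.max(observedStreamTime, timestamp);
        inner.put(timestamp, value);
    }

    public List<Map.Entry<Long, String>> fetch(long from, long to) {
        long minLiveTime = observedStreamTime - retentionPeriod + 1;
        List<Map.Entry<Long, String>> result = new ArrayList<>();
        for (Map.Entry<Long, String> e : inner.fetch(from, to)) {
            if (e.getKey() >= minLiveTime) { // per-record filter on the way out
                result.add(e);
            }
        }
        return result;
    }
}
```

The inner store still returns every record in the range; the wrapper discards expired ones afterwards, which is what makes it independent of the inner implementation.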

vamossagar12 commented Aug 13 '21

@mjsax, @ableegoldman please review when you get the chance.

vamossagar12 commented Aug 13 '21

Ping @cadonna since he's got a good eye for this part of the code. Also @wcarlson5 @lct45 @showuon for help with reviews

ableegoldman commented Aug 28 '21

@vamossagar12, thanks for the PR. I had a quick look and left some comments. Most importantly, though, the test coverage is not sufficient. You changed RocksDBWindowStore and RocksDBSessionStore, but there are no tests for them, and there are no integration tests for this change. I'd expect a WindowStore test and a SessionStore test that check whether records are actually removed after the retention time. What do you think?

Thank you.

@showuon, thank you. The changes in RocksDBWindowStore and RocksDBSessionStore are minimal (only the parent class changes). The real filtering logic is in the Metered classes, and that's where I added all the tests, mocking the two classes above, so I didn't think we'd need unit tests for RocksDBWindowStore and RocksDBSessionStore. WDYT? +1 for an integration test, though. I'll add one.

vamossagar12 commented Aug 28 '21

Makes sense to me.

showuon commented Aug 28 '21

@showuon, is there an existing integration test for the Metered classes? I tried to find one to add the relevant tests but couldn't.

vamossagar12 commented Aug 30 '21

Sorry for the late reply. The Metered store classes record operation metrics, so any built-in state store you use goes through one; for example, a store built with Stores.windowStoreBuilder is wrapped in a MeteredWindowStore. Thanks.

showuon commented Sep 02 '21

Thanks @showuon, I found a bug in my implementation. I'll fix it and add integration tests, and then it can be reviewed.

vamossagar12 commented Sep 02 '21

@showuon, I have cleaned up the PR and added the integration tests. Please review whenever you get the chance.

vamossagar12 commented Sep 26 '21

@showuon, please review whenever you get the chance.

vamossagar12 commented Sep 29 '21

Thanks @showuon, I made the suggested changes.

vamossagar12 commented Oct 08 '21

@showuon, sorry to bother you again on this one, but please review whenever you get the chance.

vamossagar12 commented Oct 15 '21

  1. That's correct @showuon: the wrapped().persistent() check may not apply to custom in-memory stores. The only reason the check is there is that the in-memory state stores already do the filtering, so I didn't want to include them in this change.
  2. As far as I know, any new public interface addition needs a KIP. Since this is going to be an internal interface, I didn't think we needed one. In fact, I believe that if I added it to WindowStore or SessionStore we would need a KIP, which is why I didn't. What do you think? Maybe @ableegoldman can also chime in on this?

vamossagar12 commented Oct 18 '21

Yeah, any change to the public interface is going to require a KIP. And tbh the state store interface is already messy enough; I think we should avoid adding anything that requires a KIP for this sort of thing, if at all possible.

ableegoldman commented Oct 18 '21

Thanks, Sophie. @showuon, I think your concern about persistent stores and custom stores is still valid. The reason I don't want to add the getObservedStreamTime method to SessionStore or WindowStore is that I don't think it belongs in those interfaces: how observed stream time is tracked is an internal detail.

That said, if we want this behaviour to also be available for custom stores (which is why we chose to add it to the Metered stores), then those custom stores would need to implement the PersistentStore classes, which is not how it works today, right?
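To illustrate the point about custom stores, here is a hypothetical sketch in which the outer store can filter only when the inner store exposes its observed stream time through an internal interface. Only the getObservedStreamTime method name comes from this discussion; the interface and class names are invented for illustration:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical internal (non-public-API) interface.
interface ObservedStreamTimeAware {
    long getObservedStreamTime();
}

// Simple map-backed store; tracks stream time but does not expose it.
class MapBackedStore {
    protected final TreeMap<Long, String> data = new TreeMap<>();
    protected long observedStreamTime = -1L;

    void put(long timestamp, String value) {
        observedStreamTime = Math.max(observedStreamTime, timestamp);
        data.put(timestamp, value);
    }

    List<Map.Entry<Long, String>> fetch(long from, long to) {
        List<Map.Entry<Long, String>> result = new ArrayList<>();
        if (from <= to) {
            for (Map.Entry<Long, String> e : data.subMap(from, true, to, true).entrySet()) {
                result.add(Map.entry(e.getKey(), e.getValue()));
            }
        }
        return result;
    }
}

// "Built-in" store: opts in to the internal interface.
class BuiltInStore extends MapBackedStore implements ObservedStreamTimeAware {
    public long getObservedStreamTime() {
        return observedStreamTime;
    }
}

// Custom user store: knows nothing about the internal interface.
class CustomStore extends MapBackedStore { }

class MeteredLikeStore {
    private final MapBackedStore inner;
    private final long retentionPeriod;

    MeteredLikeStore(MapBackedStore inner, long retentionPeriod) {
        this.inner = inner;
        this.retentionPeriod = retentionPeriod;
    }

    List<Map.Entry<Long, String>> fetch(long from, long to) {
        List<Map.Entry<Long, String>> raw = inner.fetch(from, to);
        if (!(inner instanceof ObservedStreamTimeAware)) {
            return raw; // custom store: no stream time to compare against, so pass through
        }
        long minLiveTime = ((ObservedStreamTimeAware) inner).getObservedStreamTime() - retentionPeriod + 1;
        List<Map.Entry<Long, String>> result = new ArrayList<>();
        for (Map.Entry<Long, String> e : raw) {
            if (e.getKey() >= minLiveTime) {
                result.add(e);
            }
        }
        return result;
    }
}
```

In this model, the custom store's expired records pass straight through the outer layer, which is the gap described above.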

One approach could be to handle custom state stores separately and merge this if it looks fine. Custom state stores will need more thought because of the way state stores are structured and wrapped. Also, the persistent state stores provided by Kafka Streams don't enforce retention time today, which IMO is a bigger problem than custom state stores. That's my personal opinion, and I'm open to suggestions. Let me know if that makes sense.

vamossagar12 commented Oct 19 '21

@vamossagar12, I'm fine with merging the persistent state store improvement first and thinking about custom state stores later. But I'm not the one with the right to merge your code, so maybe you should confirm with @ableegoldman or other committers. :)

showuon commented Oct 20 '21

Haha thanks @showuon :) @ableegoldman, I need your input again on this one :D Do you agree with going ahead and merging this PR for persistent stores, and then opening a separate ticket for custom state stores? I can take that one up as well.

vamossagar12 commented Oct 20 '21

@vvcephei @cadonna @mjsax @guozhangwang anyone have any thoughts on this?

ableegoldman commented Oct 21 '21

Hi @vamossagar12 @ableegoldman I took a quick look at the PR and I have a few questions:

  • The fix itself seems to apply only to persistent stores (built-in or customized), not to in-memory stores, on the assumption that the latter already do their duty. But for customized in-memory stores that may not be the case. Is that right?
  • I'm a bit concerned about applying the filter logic at the metered store because of the complexity it introduces. Instead, I'm wondering if we could enforce the retention period in the inner store, i.e. 1) for the built-in stores, the segmented store layer would enforce this rule based on its observed stream time, and 2) we just assume customized stores enforce this rule themselves, and hence don't add a second guard at the outer metered store layer.

guozhangwang commented Oct 21 '21

@guozhangwang

  1. The default in-memory state stores already do the filtering. The original idea was to make the persistent stores enforce the same retention, hence the checks you see everywhere.
  2. The idea of adding it to the Metered store was that the enforcement would apply to any custom store that's implemented. But it turns out it isn't that simple (I realised that after many code changes, lol). For persistent stores, we could push it down to the inner segmented stores (similar to how the in-memory stores do it today). If we can assume that custom state stores take care of it themselves, there's no point in having it in the Metered store; that goes against the original idea, but should be OK IMO.

vamossagar12 commented Oct 21 '21

Did not look into this PR. Left the following comment on the JIRA:

I just got a report that InMemoryWindowStore does not enforce strict retention for IQ if caching is enabled. We should add corresponding tests before we close this ticket, to ensure that caching does not break strict retention time.

Can we address this in this PR? Based on the PR description we move the check into the metered store, and thus it should fix IQ (and thus the caching issue)?

mjsax commented Oct 25 '21

Sure @mjsax, I'll include it in this PR. I think I'm already handling it for persistent stores, so I can extend that. That said, it has been suggested on this PR that it would be worthwhile to move the enforcement down to the individual persistent stores, since my implementation seems to add some complexity. I'd like to hear your thoughts on this whenever you get a chance to look at the PR.

vamossagar12 commented Oct 25 '21

Hi all... it would be great if we could reach a decision on this one. Whenever you get the chance, please weigh in.

vamossagar12 commented Oct 29 '21

Personally I'd suggest we enforce the retention at the inner store, which means:

  1. for built-in in-memory stores, enforce at the InMemoryXXStore level.
  2. for built-in persistent stores, enforce at the SegmentStore level (which is used by all window stores).
  3. for user-customized stores, they should enforce the retention themselves: this is possible via the WindowStoreSupplier#retentionPeriod call, which is accessible to users.

The rationale of doing that is:

  • enforcing at the highest layer means that lower layers still return more data than necessary, which then needs to be filtered record by record; that is less efficient.
  • when the inner layer returns more records, it also affects intermediate layers, for example by polluting the caching layer.
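The efficiency argument can be made concrete with a small hypothetical sketch (class and method names are invented; like a segmented store, this store may still hold expired records): filtering after the scan still reads every expired record in the requested range, whereas clamping the scan's lower bound inside the store never touches them.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical store that, like a segmented store, may still hold expired records.
class InnerBoundedStore {
    private final TreeMap<Long, String> data = new TreeMap<>();
    private final long retentionPeriod;
    private long observedStreamTime = -1L;
    int recordsVisited = 0; // cumulative count of stored records touched by scans

    InnerBoundedStore(long retentionPeriod) {
        this.retentionPeriod = retentionPeriod;
    }

    void put(long timestamp, String value) {
        observedStreamTime = Math.max(observedStreamTime, timestamp);
        data.put(timestamp, value); // no eviction: expired data stays physically stored
    }

    private long minLiveTime() {
        return observedStreamTime - retentionPeriod + 1;
    }

    // Outer-layer style: scan the whole requested range, then drop expired records one by one.
    List<String> fetchThenFilter(long from, long to) {
        List<String> out = new ArrayList<>();
        if (from > to) {
            return out;
        }
        for (Map.Entry<Long, String> e : data.subMap(from, true, to, true).entrySet()) {
            recordsVisited++;
            if (e.getKey() >= minLiveTime()) {
                out.add(e.getValue());
            }
        }
        return out;
    }

    // Inner-layer style: clamp the lower bound first, so expired records are never read.
    List<String> fetchClamped(long from, long to) {
        List<String> out = new ArrayList<>();
        long lower = Math.max(from, minLiveTime());
        if (lower > to) {
            return out;
        }
        for (String value : data.subMap(lower, true, to, true).values()) {
            recordsVisited++;
            out.add(value);
        }
        return out;
    }
}
```

With retention 10 and records at timestamps 0 through 20, both methods return the same ten values, but the filter-after-scan version visits all 21 stored records while the clamped version visits only 10.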

guozhangwang commented Nov 02 '21

Thanks @guozhangwang for the response. I agree with doing this enforcement at the innermost layer for all the reasons mentioned, but one of the problems Matthias highlighted on this PR is that InMemoryWindowStore does not enforce strict retention for IQ if caching is enabled. InMemoryWindowStore does enforce strict retention on its own, but it breaks when combined with the caching store. That's where doing it in the MeteredStore might be beneficial.

vamossagar12 commented Nov 02 '21

I see that, and personally I think this is a bug in the caching layer that we should fix. In fact, the cache itself should be trimmed by retention, which would also improve our cache hit rate. I think it is fine not to piggy-back that fix on this PR if it would make the scope much larger, as I feel the issue @mjsax raised is worth fixing in its own PR.
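A hypothetical sketch of the caching issue and of trimming the cache by retention. All names are invented; the inner store is reduced to a TreeMap that is retention-filtered on read, and this sketch simply discards expired cache entries rather than deciding whether to flush them:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

class CachingLayerSketch {
    private final TreeMap<Long, String> cache = new TreeMap<>(); // writes not yet flushed
    private final TreeMap<Long, String> inner = new TreeMap<>(); // stand-in for the underlying store
    private final long retentionPeriod;
    private final boolean trimByRetention;
    private long observedStreamTime = -1L;

    CachingLayerSketch(long retentionPeriod, boolean trimByRetention) {
        this.retentionPeriod = retentionPeriod;
        this.trimByRetention = trimByRetention;
    }

    private long minLiveTime() {
        return observedStreamTime - retentionPeriod + 1;
    }

    void put(long timestamp, String value) {
        observedStreamTime = Math.max(observedStreamTime, timestamp);
        cache.put(timestamp, value);
        if (trimByRetention) {
            cache.headMap(minLiveTime()).clear(); // evict entries that fell out of retention
        }
    }

    void flush() {
        inner.putAll(cache);
        cache.clear();
    }

    // Merged read: the inner store enforces retention; the cache does so only when trimmed.
    List<String> fetch(long from, long to) {
        TreeMap<Long, String> merged = new TreeMap<>();
        long innerFrom = Math.max(from, minLiveTime());
        if (innerFrom <= to) {
            merged.putAll(inner.subMap(innerFrom, true, to, true));
        }
        if (from <= to) {
            merged.putAll(cache.subMap(from, true, to, true)); // cached (newer) values win
        }
        return new ArrayList<>(merged.values());
    }
}
```

In this model an expired record that was already flushed is hidden by the inner store, but one still sitting in an untrimmed cache leaks into the merged result; trimming the cache on each write closes that gap.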

guozhangwang commented Nov 03 '21

I see. So what you are suggesting is that, as part of this PR, I move all the strict-enforcement logic to the innermost layer and fix the caching issue in the caching layer, which keeps the Metered layer cleaner. That makes sense, because I feel this PR has introduced some complexity that can be removed. I can start on that unless @mjsax or @ableegoldman have a different opinion. Also, I looked at the ChangeLoggingXXXStores, and they seem to delegate to the underlying wrapped() store, so I think we're good there.

vamossagar12 commented Nov 04 '21

Hi, just wanted to know if I'm good to start moving the logic to the inner stores?

vamossagar12 commented Nov 09 '21

@mjsax @ableegoldman could you chime in here?

guozhangwang commented Nov 17 '21

Hi, bumping this thread again.

vamossagar12 commented Dec 01 '21

@guozhangwang could we also decide upon the next steps here?

vamossagar12 commented Dec 12 '21