
Feature: Log compaction

Open yanivbh1 opened this issue 2 years ago • 8 comments

yanivbh1 avatar Apr 04 '23 11:04 yanivbh1

The idea of compaction is to have reduce logic that compacts the messages after the retention period (or a separately configurable period).

Example:

A station is consuming messages containing data from different logical entities, e.g. employees. Now we are starting a new service that is interested in employee data.

At startup, the service consumes all messages of the station, ending up with the latest state of each employee.

Option 1: Unlimited retention time (event sourcing). Problem: we don't want to keep all messages forever, e.g. due to storage costs. It might also be too resource-intensive to consume all messages. For the latter, we can reduce the workload by setting an offset, i.e. starting to consume after this offset. However, we might then not get any information about employees produced before that offset.

Option 2: Compaction logic. Solution: have compaction (reduce) logic that reduces the messages for a specific employee, keeping only the latest state. This would need a custom unique identifier that is settable by the producer; in this case we might set the UUID of the employee. After the given period, all messages carrying this identifier, e.g. in a header, are dropped and only the latest is kept.

Note: setting an empty payload in a message, i.e. null, would also make sure that all data of this employee is permanently deleted (similar to Kafka tombstones).

faweis avatar Apr 05 '23 08:04 faweis
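The reduce logic of Option 2, including the tombstone note, could be sketched roughly like this. This is only an illustration, not Memphis internals: the `Message` struct and the `CompactionKey` field (the producer-set identifier, e.g. the employee UUID) are made-up names.

```go
package main

import "fmt"

// Message is a hypothetical minimal record. CompactionKey would be set by
// the producer (e.g. an employee UUID); a nil Payload acts as a tombstone.
type Message struct {
	CompactionKey string
	Payload       []byte
}

// Compact reduces a retention window to at most one message per key,
// keeping only the latest state. A key whose latest message is a
// tombstone (nil payload) is dropped entirely, permanently deleting
// all data for that key.
func Compact(window []Message) []Message {
	latest := make(map[string]Message)
	order := []string{} // remember first-seen key order for stable output
	for _, m := range window {
		if _, seen := latest[m.CompactionKey]; !seen {
			order = append(order, m.CompactionKey)
		}
		latest[m.CompactionKey] = m // newer message replaces the older one
	}
	out := []Message{}
	for _, k := range order {
		if m := latest[k]; m.Payload != nil { // skip tombstoned keys
			out = append(out, m)
		}
	}
	return out
}

func main() {
	window := []Message{
		{"emp-1", []byte(`{"name":"Ada"}`)},
		{"emp-2", []byte(`{"name":"Bob"}`)},
		{"emp-1", []byte(`{"name":"Ada Lovelace"}`)},
		{"emp-2", nil}, // tombstone: erase all of emp-2
	}
	for _, m := range Compact(window) {
		fmt.Printf("%s => %s\n", m.CompactionKey, m.Payload)
	}
	// Prints only: emp-1 => {"name":"Ada Lovelace"}
}
```

A new consumer replaying the compacted window would see exactly one message per surviving entity, which is the "latest state of each employee" behavior described above.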

Let's try to tackle this one together. The way we planned to implement it is to use already existing functionality. Since we forked NATS, we can use all kinds of NATS features to realize our vision. Specifically, we can use something called per-subject retention, set to 1. That means we can catalog messages based on some ID set at produce time, and every time a message is published with an existing ID, the old message is replaced by the new one. What do you think about it?

idanasulin2706 avatar Apr 09 '23 18:04 idanasulin2706
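For context, the per-subject retention mentioned here corresponds to the `max_msgs_per_subject` limit in a JetStream stream configuration. A hedged sketch (the stream name and subject scheme below are invented for illustration): with the limit set to 1, producing to `employees.<uuid>` keeps only the newest message per UUID, giving last-value semantics.

```json
{
  "name": "EMPLOYEES",
  "subjects": ["employees.>"],
  "max_msgs_per_subject": 1
}
```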

Hey guys, can I work on this if you don't mind? :)

abhirajranjan avatar May 13 '23 02:05 abhirajranjan

@abhirajranjan sure go for it

idanasulin2706 avatar May 13 '23 03:05 idanasulin2706

Hey @abhirajranjan , how are you? Have you got the chance to work on it?

yanivbh1 avatar Jun 29 '23 12:06 yanivbh1

From what I have understood from the above, there should be an array that saves the IDs of messages, and when a new message with the same ID comes in, we check the array. If the ID matches, we replace that message in our store, right? I would like to give this a chance, but I don't know what to change and in which files. If my guess is correct, it must be in server/storage.go or server/memstore.go.

Exar04 avatar Jan 13 '24 19:01 Exar04
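The replace-on-write idea described in the previous comment could be sketched like this. Note the names (`CompactedStore`, `Store`, `Get`) are invented for illustration and are not Memphis internals; a map is used instead of an array scan so the ID lookup is O(1).

```go
package main

import "fmt"

// CompactedStore keeps at most one payload per message ID.
type CompactedStore struct {
	byID map[string][]byte
}

func NewCompactedStore() *CompactedStore {
	return &CompactedStore{byID: make(map[string][]byte)}
}

// Store writes a payload under an ID; a message with an already-seen ID
// overwrites the previous one, which is the replacement step described above.
func (s *CompactedStore) Store(id string, payload []byte) {
	s.byID[id] = payload
}

// Get returns the latest payload stored for an ID, if any.
func (s *CompactedStore) Get(id string) ([]byte, bool) {
	p, ok := s.byID[id]
	return p, ok
}

func main() {
	s := NewCompactedStore()
	s.Store("emp-1", []byte("v1"))
	s.Store("emp-1", []byte("v2")) // same ID: replaces v1
	p, _ := s.Get("emp-1")
	fmt.Println(string(p)) // prints "v2"
}
```

As the next comment points out, this single-node view is a simplification: in a clustered deployment the replacement would also have to be replicated across disks.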

@Exar04 thanks for the willingness to help. It is more complex than that, since we are a distributed data product, meaning the data persists on at least one disk at any given time. In cluster mode it obviously persists on more than one disk. If you take a look at the suggestion I commented in this issue a few months ago, that can be a better approach to tackle this one.

idanasulin2706 avatar Jan 14 '24 12:01 idanasulin2706

Yeah, what I understood from that was that we can use NATS functionality to add this feature, but I don't know much about it. Can you refer me to a few NATS resources that talk about this? Maybe then I could get a better understanding of it.

Exar04 avatar Jan 17 '24 11:01 Exar04