gtrs
Using a Bloom filter before adding
I have a use case where it would be really helpful to have the option to skip adding items to the stream if they already exist, using a Bloom filter. I have included a Lua script that demonstrates how this can be done, as well as an example of how it can be used in Go code.
In my implementation, I have an Add method that takes a field and a value to be added to a stream, as well as a flag for whether the item should be checked for uniqueness. If the uniq flag is true, the method calls a Lua script that checks the Bloom filter and adds the item to the stream only if the filter has not seen it. If the item already exists in the Bloom filter, it is not added to the stream.
If the uniq flag is false, the item is added to the stream without checking for uniqueness.
To handle the case where an item is not added because it already exists in the Bloom filter, I have a second stream, dupeKey, that stores potential duplicates. A separate process consumes this stream and checks whether the items stored in it are actually duplicates, since a Bloom filter can report false positives.
I was writing a similar package when I discovered gtrs and had already implemented this:
var xaddUnique = rueidis.NewLuaScript(LUA_XADD_UNIQ)
// Add adds a new entry to the stream.
// If uniq is true, the entry is added only if the Bloom filter has not seen it;
// a suspected duplicate is written to s.dupeKey instead, when dupeKey is set.
// If uniq is false, the entry is added regardless of whether it already exists.
// The returned bool reports whether the entry passed the uniqueness check,
// so it is always false when uniq is false.
func (s *stream) Add(ctx context.Context, field, value string, uniq bool) (bool, error) {
	key := s.key
	if uniq {
		// Run the xaddUnique Lua script, which checks the Bloom filter and,
		// if the entry is new, adds it to both the stream and the filter.
		var err error
		uniq, err = xaddUnique.Exec(
			ctx,
			s.client,
			[]string{key, s.bloomKey},
			[]string{field, value},
		).AsBool()
		if err != nil || uniq || s.dupeKey == "" {
			return uniq, err
		}
		// Suspected duplicate: fall through and store it in the dupeKey stream.
		key = s.dupeKey
	}
	// Plain XADD, used when no uniqueness check was requested
	// or when routing a suspected duplicate to dupeKey.
	return uniq, s.client.Do(
		ctx,
		s.client.B().Xadd().Key(key).Id("*").FieldValue().FieldValue(field, value).Build(),
	).Error()
}
-- LUA_XADD_UNIQ adds a field and value to a Redis stream and records the entry in a
-- Redis Bloom filter (this requires the RedisBloom module). It takes two keys, the
-- stream and the Bloom filter, and two arguments, the field and the value to add.
-- The script first builds an item string by joining the stream key, the field, and
-- the value with ":". If BF.EXISTS reports that the item may already exist, the
-- script returns 0 without touching the stream or the Bloom filter.
-- Otherwise it appends the entry with XADD, records the item with BF.ADD, and
-- returns 1 to indicate that the entry was added.
-- It returns integers instead of Lua booleans because Lua false becomes a nil reply,
-- which AsBool on the Go side surfaces as a nil error rather than as false.
-- Errors raised by redis.call abort the script and are returned to the caller.
local key = KEYS[1]
local bloom = KEYS[2]
local field = ARGV[1]
local value = ARGV[2]
local item = key .. ":" .. field .. ":" .. value
-- Skip the entry if the Bloom filter has (probably) seen it before
if redis.call("BF.EXISTS", bloom, item) == 1 then
  return 0
end
-- Add to the stream
redis.call("XADD", key, "*", field, value)
-- Record the item in the Bloom filter
redis.call("BF.ADD", bloom, item)
return 1
Is it okay if I adapt the code and propose a PR?
Hi! First of all, happy new year! 🎆
It's a very cool feature. If you want to contribute it to this project, I'd happily accept it, but I would suggest generalizing it a little:
- Add a new type, UniqueStream, that shadows the general stream, either through a common interface (with Add returning "" as the ID?), by embedding Stream (keeping the method name?), or by adding a separate function AddUnique. I find the uniq parameter a bit odd and over-specified in Add, which should stay simple.
- Instead of immediately adding the value to another stream, just return a bool (or "" or an error) indicating whether it was added. This lets callers freely extend the fallback with custom code.
sDups := NewStream("dups")
sUnique := NewUniqueStream("uniques") // or just use the basic Stream and AddUnique
if !sUnique.Add(data) { // or != "", or some duplicate error sentinel
	sDups.Add(data) // but it can be any other action
}
I would be very interested in converting Stream from a struct to an interface, and also in creating a HookStream.
Is this something you are still interested in and would accept?
@elee1766 Can you clarify a little what you mean by 'HookStream'? But generally, if it's reusable and helpful, I see no problems in adding new features 🙂
This Bloom filter logic can be generalized into a stream with hooks.
For instance, take the function Add(ctx context.Context, v T, idarg ...string) (string, error).
We can look at it this way:
// Defined generic types rather than aliases: generic type aliases need Go 1.24+.
type AddFunc[T any] func(ctx context.Context, v T, idarg ...string) (string, error)
type AddHook[T any] func(next AddFunc[T], ctx context.Context, v T, idarg ...string) (string, error)

type HookStream[T any] struct {
	Stream[T]
	OnAdd AddHook[T]
}

// Add honors the same signature as Stream[T].Add, so HookStream can stand in for it.
func (h *HookStream[T]) Add(ctx context.Context, v T, idarg ...string) (string, error) {
	if h.OnAdd == nil {
		return h.Stream.Add(ctx, v, idarg...)
	}
	return h.OnAdd(h.Stream.Add, ctx, v, idarg...)
}
This could be used, for instance, like this:
// Placeholders for illustration: supply real implementations.
var myStream func() Stream[any]
var addToBloom func(v any, idarg []string) error
var checkInBloom func(v any, idarg []string) (bool, error)

var ErrAlreadyInserted = errors.New("already inserted")

func main() {
	h := &HookStream[any]{
		Stream: myStream(),
		OnAdd: func(next AddFunc[any], ctx context.Context, v any, idarg ...string) (string, error) {
			exists, err := checkInBloom(v, idarg)
			if err != nil {
				return "", err
			}
			if exists {
				return "", ErrAlreadyInserted // or maybe just return "", nil
			}
			id, err := next(ctx, v, idarg...)
			if err != nil {
				return "", err
			}
			if err := addToBloom(v, idarg); err != nil {
				return "", err
			}
			return id, nil
		},
	}
	_ = h // use h wherever a stream is expected
}
There are many ways to package the actual hooks (passing functions, passing an interface, etc.).
FWIW, my use case for this would be metrics.
I see 🙂 It reminds me a little of middleware, with the same layered next() calls...
I'd honestly say this is somewhat over-engineered for a simple library 😅. You can always wrap the stream manually in your client code to customize behavior like logging or metrics, and I'd even guess most people would prefer doing this themselves over using one more abstraction.
However, if we include ready-made hooks (like the Bloom filter one), I agree it would make sense to add this wrappable stream.
Yeah, middleware is right.
I still think it makes some sense to make Stream an interface, so that downstream consumers don't need to declare their own.
True! And then we can maybe even hide the internal type...
I know you like coding 🙂 Wanna give this a quick shot?
Not sure if I like coding, but I'll give it a shot :)