RxGo
RxGo copied to clipboard
How to implement multiple subscribers dynamically?
Hello, I want to use RxGo to batch an expensive operation. I did similar stuff before using Multicast Sink from Reactor. I prepared a very simplified version of how I did. Map section defines the costly function.
https://github.com/ormanli/rxgo-batching/blob/2c542f4a31be482b68bb74d035ef12a066857f15/main.go#L45-L119
type record struct {
ID uint64 `json:"ID"`
Name string `json:"Name"`
}
type recordSink struct {
recordMap map[uint64]record
input chan rxgo.Item
pipeline rxgo.Observable
counter uint64
mu sync.RWMutex
}
func (s *recordSink) add(r record) (record, error) {
id := atomic.AddUint64(&s.counter, 1)
go func(id uint64, r record) {
s.input <- rxgo.Of(record{
ID: id,
Name: r.Name,
})
}(id, r)
item, err := s.pipeline.
Find(func(i interface{}) bool {
r := i.(record)
return r.ID == id
}).Get()
if err != nil {
return record{}, err
}
return item.V.(record), nil
}
func (s *recordSink) getAll() []record {
s.mu.RLock()
defer s.mu.RUnlock()
records := make([]record, 0, len(s.recordMap))
for _, v := range s.recordMap {
records = append(records, v)
}
return records
}
func newRecordSink() *recordSink {
s := &recordSink{
input: make(chan rxgo.Item),
recordMap: make(map[uint64]record),
}
s.pipeline = rxgo.FromChannel(s.input).
BufferWithTimeOrCount(rxgo.WithDuration(time.Millisecond*10), 10).
Map(func(_ context.Context, i interface{}) (interface{}, error) {
records := i.([]interface{})
s.mu.Lock()
defer s.mu.Unlock()
for k := range records {
r := records[k].(record)
s.recordMap[r.ID] = r
}
return records, nil
}).
FlatMap(func(item rxgo.Item) rxgo.Observable {
return rxgo.Just(item.V.([]interface{}))()
})
return s
}
When I add a single record, it works without a problem. But if I add multiple records, it only processes one record and stops processing other items. As I understand from documentation, my code can only have one subscriber. Is there any way to have multiple subscribers added and removed dynamically?
There are some test cases I added to verify my code isn't working. https://github.com/ormanli/rxgo-batching/blob/2c542f4a31be482b68bb74d035ef12a066857f15/sink_test.go#L11-L90
You can have multiple subscribers per observable. See .DoOnNext(...)
Not sure, what you want to do, but your code looks very complicated for doing batch processing. Why not use a simple working queue and have a timer look for new tasks?
timer := rxgo.Interval(rxgo.WithDuration(time.Second / 60)) timerDisposal := timer.DoOnNext(workHandler)
I prefer to keep things as simple as possible.
Hey @ormanli,
It's because there's a race. I'm not exactly sure about what you're trying to achieve but when first
is the first item emitted to the input
channel, it works. If second
is first, it doesn't work as first
will be consumed by Find
and you're having a goroutine stuck here.