Version 3
This is a draft PR for the upcoming v3 release. It breaks and redesigns the whole reactive API using the generics feature introduced in Go 1.18. Technically, this PR will resolve issues #375, #250, and #343, and add some missing features such as #362 and #347.
⚠️⚠️⚠️ This is still under heavy development; the PR exists only to keep track of overall progress.
I'm trying to map the rxjs API design to this lib as much as possible.
Alpha Version CHANGELOG :
- [x] remove vendor packages and use go.mod instead
- [x] utilize generics to implement strong typing
- [x] rework the Observe API so that users can control the subscription
- [x] set minimum Go version to v1.19 (requires generics and atomic.Pointer)
- [ ] add test cases for every function
- [ ] update README and API documentation
Beta Version CHANGELOG :
- [ ] add hot observables such as BehaviorSubject
- [ ] refactor to scheduler, references:
  - https://medium.com/swlh/understanding-rxphp-schedulers-part-1-d3c0ab6eb881
  - https://medium.com/@jakob.mats/understanding-rxphp-schedulers-part-2-6c4d01d7e65e
Request for comments :
- API look and feel
Proposal for changing observe API:
Current :
for v := range observable.Observe() {
	// handle v
}
// OR
obs := observable.Observe()
for {
	select {
	case item, ok := <-obs:
		// handle item; ok is false once the channel is closed
	}
}
Proposed (heavily inspired by the rxjs API design):
type Subscription interface {
	Unsubscribe()
}
observable[T].Subscribe(onNext func(T), onError func(error), onCompleted func()) Subscription
The current API is very confusing: users have no way to control the subscription (unsubscribe from the stream when needed), and they are required to handle errors and data manually. I suggest we move that abstraction into internal functions and expose only the required functions.
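To make the proposal concrete, here is a minimal sketch of a callback-based Subscribe that returns a Subscription. It is not the code in this PR: the Observable struct, its produce field, cancelSub, Counter, and FirstN are all hypothetical names chosen for illustration, and the sketch assumes unsubscription is modelled with a cancellable context.

```go
package main

import (
	"context"
	"fmt"
)

// Subscription mirrors the interface proposed above.
type Subscription interface {
	Unsubscribe()
}

// cancelSub is a hypothetical Subscription backed by a context cancel function.
type cancelSub struct{ cancel context.CancelFunc }

func (s cancelSub) Unsubscribe() { s.cancel() }

// Observable is an illustrative stand-in, not the type used in this PR.
// The producer calls emit for each value and stops once emit returns false.
type Observable[T any] struct {
	produce func(emit func(T) bool) error
}

// Subscribe runs the producer in its own goroutine and returns immediately.
func (o Observable[T]) Subscribe(onNext func(T), onError func(error), onCompleted func()) Subscription {
	ctx, cancel := context.WithCancel(context.Background())
	go func() {
		defer cancel()
		err := o.produce(func(v T) bool {
			if ctx.Err() != nil {
				return false // unsubscribed: drop the value and ask the producer to stop
			}
			onNext(v)
			return true
		})
		switch {
		case ctx.Err() != nil:
			// unsubscribed: no terminal notification is delivered
		case err != nil:
			onError(err)
		default:
			onCompleted()
		}
	}()
	return cancelSub{cancel: cancel}
}

// Counter emits 0, 1, 2, ... until the subscriber goes away.
func Counter() Observable[int] {
	return Observable[int]{produce: func(emit func(int) bool) error {
		for i := 0; emit(i); i++ {
		}
		return nil
	}}
}

// FirstN subscribes, collects n values, then unsubscribes.
func FirstN(n int) []int {
	values := make(chan int)
	quit := make(chan struct{})
	sub := Counter().Subscribe(
		func(v int) {
			select {
			case values <- v:
			case <-quit: // lets a blocked callback return after unsubscribing
			}
		},
		func(error) {},
		func() {},
	)
	out := make([]int, 0, n)
	for len(out) < n {
		out = append(out, <-values)
	}
	sub.Unsubscribe()
	close(quit)
	return out
}

func main() {
	fmt.Println(FirstN(3)) // [0 1 2]
}
```

The point of the sketch is that the caller never touches a channel of items: data, errors, and completion arrive through callbacks, and the returned Subscription is the only handle needed to stop the stream.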
The new version of the API will look like this. @teivah any comments atm? The rxgo.IObservable interface will be renamed to rxgo.Observable in the future. For now, I am removing the old code part by part.
Note: this stream's data flow is processed synchronously, which is why the function is named SubscribeSync.
There are significant changes in the API. For example,
Previous
rxgo.Defer([]Producer{func(ctx context.Context, next chan<- Item) {
	next <- Of(1)
	next <- Of(2)
	next <- Error(fmt.Errorf("some error"))
}})
Current
rxgo.Defer(func() rxgo.IObservable[uint] {
	return rxgo.Interval(1000)
})
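For readers unfamiliar with the rxjs-style Defer, the sketch below shows the behaviour the factory form is meant to give: the factory runs once per subscription and not before. The Observable interface, sliceObservable, deferred, and Demo are hypothetical and deliberately minimal; they are not the types in this PR.

```go
package main

import "fmt"

// Observable is a minimal illustrative interface, not rxgo's IObservable.
type Observable[T any] interface {
	Subscribe(onNext func(T))
}

// sliceObservable emits a fixed list of values synchronously.
type sliceObservable[T any] struct{ items []T }

func (s sliceObservable[T]) Subscribe(onNext func(T)) {
	for _, v := range s.items {
		onNext(v)
	}
}

// deferred calls its factory on every Subscribe, never earlier.
type deferred[T any] struct{ factory func() Observable[T] }

func (d deferred[T]) Subscribe(onNext func(T)) { d.factory().Subscribe(onNext) }

// Defer wraps a factory so that each subscriber gets a fresh observable.
func Defer[T any](factory func() Observable[T]) Observable[T] {
	return deferred[T]{factory: factory}
}

// Demo reports how often the factory ran and which values were seen.
func Demo() (int, []int) {
	calls := 0
	var seen []int
	obs := Defer[int](func() Observable[int] {
		calls++
		return sliceObservable[int]{items: []int{calls, calls * 10}}
	})
	// The factory has not run yet at this point; calls is still 0.
	obs.Subscribe(func(v int) { seen = append(seen, v) })
	obs.Subscribe(func(v int) { seen = append(seen, v) })
	return calls, seen
}

func main() {
	calls, seen := Demo()
	fmt.Println(calls, seen) // 2 [1 10 2 20]
}
```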
And creating an observable is as easy as:
newObservable(func(sub Subscriber[int]) {
	for i := 0; i < 10; i++ {
		Next[int](i).Send(sub)
		time.Sleep(time.Second)
	}
	Complete[int]().Send(sub)
})
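One way the Next/Complete/Send pattern above could be modelled is with a small notification envelope pushed onto a channel. This is only a guess at the shape, written to be self-contained: Notification, the channel-based Subscriber, Collect, and Squares are hypothetical, and the real types in this PR may differ.

```go
package main

import "fmt"

type kind int

const (
	nextKind kind = iota
	completeKind
)

// Notification is a hypothetical envelope for one stream event.
type Notification[T any] struct {
	kind  kind
	value T
}

// Subscriber is a hypothetical sink; here it is just a channel of notifications.
type Subscriber[T any] chan Notification[T]

// Next wraps a value; Complete marks the end of the stream.
func Next[T any](v T) Notification[T]  { return Notification[T]{kind: nextKind, value: v} }
func Complete[T any]() Notification[T] { return Notification[T]{kind: completeKind} }

// Send delivers the notification to the subscriber.
func (n Notification[T]) Send(sub Subscriber[T]) { sub <- n }

// newObservable runs the producer in a goroutine and returns the stream to read from.
func newObservable[T any](producer func(sub Subscriber[T])) Subscriber[T] {
	sub := make(Subscriber[T])
	go producer(sub)
	return sub
}

// Collect reads values until a Complete notification arrives.
func Collect[T any](sub Subscriber[T]) []T {
	var out []T
	for n := range sub {
		if n.kind == completeKind {
			break
		}
		out = append(out, n.value)
	}
	return out
}

// Squares builds a four-element stream and gathers its values.
func Squares() []int {
	return Collect(newObservable[int](func(sub Subscriber[int]) {
		for i := 0; i < 4; i++ {
			Next[int](i * i).Send(sub)
		}
		Complete[int]().Send(sub)
	}))
}

func main() {
	fmt.Println(Squares()) // [0 1 4 9]
}
```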
I took a look at this PR (massive work!), and I got curious about one thing - do you plan on porting the scheduler concept to rxgo as well? I would mainly find it useful to be able to mock/control time in my tests.
I will take a look at that after finishing the general API; I think it should be feasible.
I've been idle for some time due to work, but I will continue on this. So far the basic functionality is OK to go atm.
@si3nloong hello, is this PR still active? I would love to pick it up to get a feel for maintaining this package, but I don't want to overstep if you have a planned timeline to complete it.
@davidlondono @si3nloong Wanted to bump the thread to see if this PR will ever get picked up again.
I'm interested to know if this pull request is still a thing, as I am in the process of defining my own V3 which uses generics. I'm not sure about the other issues that have been identified, because I don't know whether they will affect what I need for my own projects.