Airstream

New operator: take the first n elements of an EventStream

Open doofin opened this issue 5 years ago • 3 comments

I find that sometimes we need to take the first n elements of an EventStream, something like this: https://www.learnrxjs.io/learn-rxjs/operators/filtering/take

doofin avatar Jun 11 '20 18:06 doofin

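implicit class EventStreamExt[A](private val aStream: EventStream[A]) extends AnyVal {
  // Emits only the first n events of aStream; later events are dropped.
  def take(n: Int): EventStream[A] = {
    // Mutable counter captured by the partial function below
    var seen = 0
    aStream.collect {
      // The guard stops matching once n events have passed through
      case event if seen < n =>
        seen += 1
        event
    }
  }
}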

vic avatar Jun 11 '20 18:06 vic

Thanks @vic, currently I guess something like this would also work:

a combineWith EventStream.fromSeq(0 to n) filter (x => x._2 < n) map (_._1)

doofin avatar Jun 11 '20 18:06 doofin

I will implement this eventually.

@vic's version works just fine in 99% of cases, but if we were to add a take operator to Airstream, it would need to account for stream lifecycle, i.e. it should unsubscribe from the parent observable when reaching the limit. We would also need to decide whether the take count should reset if the stream stops (loses all observers), and what, if anything, to do about Signals.

raquo avatar Jun 14 '20 20:06 raquo
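
To make the lifecycle point concrete, here is a minimal sketch of a take that unsubscribes from its parent once the limit is reached. It deliberately does not use Airstream: Subscription, Source, Bus and this take are hypothetical stand-ins invented for illustration, and Airstream's real internals differ. The counter here is created per subscription, so it resets for every new subscriber; that is only one possible answer to the reset question above, and Signals are not addressed at all.

```scala
import scala.collection.mutable.ListBuffer

object TakeSketch {
  // Hypothetical stand-ins for illustration only; none of this is Airstream's API.
  trait Subscription { def cancel(): Unit }

  trait Source[A] {
    def subscribe(onNext: A => Unit): Subscription
  }

  // A hand-driven source that exposes how many observers are still attached.
  final class Bus[A] extends Source[A] {
    private val observers = ListBuffer.empty[A => Unit]
    def observerCount: Int = observers.size
    // Iterate over a snapshot so observers may cancel during emission.
    def emit(a: A): Unit = observers.toList.foreach(f => f(a))
    def subscribe(onNext: A => Unit): Subscription = {
      observers += onNext
      new Subscription {
        def cancel(): Unit = { observers -= onNext; () }
      }
    }
  }

  // Forwards at most n events, then detaches from the parent.
  def take[A](parent: Source[A], n: Int): Source[A] = new Source[A] {
    def subscribe(onNext: A => Unit): Subscription = {
      var seen = 0            // per-subscription counter (assumed design choice)
      var done = n <= 0       // nothing to take: never subscribe upstream
      var upstream: Option[Subscription] = None
      def stop(): Unit = { done = true; upstream.foreach(_.cancel()) }
      if (!done) {
        upstream = Some(parent.subscribe { a =>
          if (!done) {
            seen += 1
            if (seen >= n) stop() // limit reached: release the parent
            onNext(a)
          }
        })
        // Covers a parent that emits synchronously inside subscribe,
        // before `upstream` has been assigned.
        if (done) stop()
      }
      new Subscription { def cancel(): Unit = stop() }
    }
  }

  def main(args: Array[String]): Unit = {
    val bus = new Bus[Int]
    val received = ListBuffer.empty[Int]
    take(bus, 2).subscribe(a => received += a)
    bus.emit(1)
    bus.emit(2)
    bus.emit(3) // nobody is listening any more
    println(received.toList)   // List(1, 2)
    println(bus.observerCount) // 0
  }
}
```

The difference from the collect-based version is visible in observerCount: after the second event the parent has no observers left, whereas a filter-style take stays subscribed and keeps discarding events for as long as the derived stream is observed.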