scala.rx

rx subscriptions for collections

Open antonkulaga opened this issue 8 years ago • 1 comments

In my libraries (like scala-js-binding) I often need to subscribe to Rxes of collections of items and also react to changes inside the items themselves. So I really, really need a nice memory-leak-safe way to subscribe both to a collection of Rxes and to each Var[Item] separately. I have something like:

val items: Rx[List[Var[Item]]] = Rx { /* some code */ }
val views = Rx { items() /* some other code */ }

So, when new items are added to the collection, HTML views are created and bound to the items. Whenever an item is deleted, both its view and the view's subscription to the item should also be deleted.

antonkulaga • Jan 10 '16 01:01

So I think 0.3.0 solves this, or at least gets much closer to a pretty good solution "out of the box".

The main "feature" of 0.3 is the introduction of RxCtx. RxCtx is an explicit mechanism to deal with the leaky Rx problem. While 0.3 doesn't have an explicit RxList or, really, any kind of data structures other than the trio of Rx / Var / Obs, RxCtx is another tool that can be used to get more precise control over the behavior of the resulting code.

By way of your example, let's take the innermost thing in items: the Var[Item]. Given your description of the problem, it sounds like you want to do something like the following:

Var[Item] => Rx[HtmlTag]

And the whole program would ideally look something like:

Rx[List[Var[Item]]]  =>  Rx[List[Rx[HtmlTag]]] => Rx[HtmlTag]

But in 0.2 there was a fundamental problem with the first step, in that you can't really write the function you want:

def itemTag(item: Var[Item]): Rx[HtmlTag]

On its own, that function leaks, and it's not just a memory leak: worse than that, it leaks an active Rx. Every time that def is evaluated, a new Rx is created and the old Rx is forgotten... but not reclaimed. The forgotten Rxes are all still updating and burning CPU cycles, which is all manner of not great.

Now, it is possible to work around this issue in 0.2. Basically, you have to keep track of the Var[Item]s manually and remember to call .killAll. I think this is the "subscription tracking" you were referring to. And while it is possible, it certainly is not all that pleasant, and it's easy to get wrong in ways that are not particularly easy to track down and fix.
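To make the leak and the manual cleanup concrete, here is a minimal sketch in plain Scala. It is a toy model under stated assumptions: Source, Derived and LeakDemo are hypothetical names invented for illustration and are not the scala.rx API, and kill() here only stands in for the kind of manual cleanup described above.

```scala
// Toy model only: hypothetical classes, NOT the scala.rx API.
import scala.collection.mutable.ListBuffer

// A source that pushes every update to all of its subscribers.
final class Source(var value: Int) {
  val subscribers = ListBuffer.empty[Derived]
  def update(v: Int): Unit = {
    value = v
    subscribers.foreach(_.recompute())
  }
}

// A derived value; it keeps recomputing until it is explicitly killed.
final class Derived(src: Source) {
  var recomputes = 0
  src.subscribers += this
  def recompute(): Unit = { recomputes += 1 }
  def kill(): Unit = { src.subscribers -= this }
}

object LeakDemo {
  // Stand-in for a context-free itemTag: every call makes a fresh Derived.
  def itemTag(item: Source): Derived = new Derived(item)

  def run(): (Int, Int, Int) = {
    val item = new Source(0)
    val stale = itemTag(item)   // created by an earlier render, then forgotten
    val current = itemTag(item) // created by the latest render
    item.update(1)              // both recompute: the stale one is wasted work
    val staleBeforeKill = stale.recomputes
    stale.kill()                // the manual cleanup that is easy to forget
    item.update(2)              // only `current` recomputes now
    (staleBeforeKill, stale.recomputes, current.recomputes)
  }
}
```

In this model the forgotten value keeps doing work until someone remembers to kill it, which is the bookkeeping burden the 0.2 workaround puts on the caller.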

In 0.3 though, you can now write this:

def itemTag(item: Var[Item])(implicit ctx: RxCtx): Rx[HtmlTag]

And with that, comes a whole host of interesting consequences, for example if you write:

val views = Rx { items().map { i => 
  val view = itemTag(i) //views is now the owner of the new Rx[HtmlTag]
  view
}}

Then views owns (by its RxCtx) each view, but none of them are "downstream" of views. This means that any Rxing that occurs inside of a view does not propagate back up: you can independently Rx each view without causing a "recalc" of all the views. In this case, views will only recalc if items itself changes, i.e. something is added to or removed from the List.

On the other hand:

val views = Rx { items().map { i => 
  val view = itemTag(i) //views is now the owner of the new Rx[HtmlTag]
  view() //And it now also has a dependency on the Rx[HtmlTag]
  view
}}

Then any time any view changes (or if items change), all of views will recalc (as well as anything that depends on views). Depending on the desired behavior, this might be preferable.

However, in both cases, when "views" does recalc, its RxCtx keeps track of all the owned Rxes, which in both cases is every Rx generated by itemTag. As part of the normal Rx recalc, all owned Rxes of the updating Rx are killed (and are recreated by virtue of the owner going through a recalc). This is precisely when a leak would have occurred.
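The kill-on-recalc behaviour can be sketched with a small toy model in plain Scala. Everything here is an assumption made for illustration: Node, Owner and OwnershipDemo are hypothetical names, not the scala.rx API, and Owner.create only stands in for a function that takes the implicit context.

```scala
// Toy model only: these classes are hypothetical and are NOT the scala.rx API.
import scala.collection.mutable.ListBuffer

// A node that can be killed; `alive` stands in for "still updating".
final class Node(val label: String) {
  var alive: Boolean = true
  def kill(): Unit = { alive = false }
}

// An owner that remembers every node created during its last calculation.
final class Owner {
  private val owned = ListBuffer.empty[Node]

  // Stand-in for calling a function with an implicit context:
  // the new node registers itself with this owner.
  def create(label: String): Node = {
    val n = new Node(label)
    owned += n
    n
  }

  // Recalculate: kill everything owned by the previous run, then run the body again.
  def recalc(body: Owner => List[Node]): List[Node] = {
    owned.foreach(_.kill())
    owned.clear()
    body(this)
  }
}

object OwnershipDemo {
  // Returns the liveness of the first generation and of the second generation.
  def run(): (List[Boolean], List[Boolean]) = {
    val views = new Owner
    val first  = views.recalc(o => List("a", "b").map(l => o.create(l)))
    val second = views.recalc(o => List("a", "b", "c").map(l => o.create(l)))
    (first.map(_.alive), second.map(_.alive))
  }
}
```

After the second recalc, the nodes from the first run are all dead and only the freshly created ones are alive, so nothing from an earlier generation keeps updating.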

So, by playing with ownership and dependencies a lot of different kinds of behaviors can be created.

I should also mention that in 0.3, the RxCtx actually has associated compile time checks that help prevent mistakes, for example writing:

def itemTag(item: Var[Item]) = Rx { ..my code.. }

Is actually a compile time error!

It's now not possible to forget the RxCtx and end up with the leaky Rx problem of 0.2; instead

def itemTag(item: Var[Item])(implicit ctx: RxCtx) = Rx { ..my code.. }
//or
def itemTag(item: Var[Item]) = Rx.unsafe { ..my code .. }

Are required - in the latter case, it is equivalent to 0.2 code, which may be necessary in some corner cases.

Voltir • Jan 10 '16 12:01