
Cache-Call Insertion breaks Flink iterations

Open · aalexandrov opened this issue on Feb 06, 2018 · 1 comment

With the current setup, if a workset collection is accessed more than once, it will still be cached by the cache-call insertion optimization.

Example

// run a fixed number of label-propagation rounds
for (_ <- 0 until 5) {
  // for every edge (src -> dst), propose the smaller of the two endpoint labels for the destination vertex
  val cands = for {
    x <- comps
    e <- edges
    y <- comps
    if x.id == e.src
    if y.id == e.dst
  } yield LVertex(y.id, Math.min(x.label, y.label))

  // keep the minimum proposed label per vertex
  comps = for {
    Group(id, cs) <- cands.groupBy(_.id)
  } yield LVertex(id, cs.map(_.label).min)
}

This is translated to (approximately)

FlinkNtv.iterate(comps)(comps => {
  val cands = for {
    x <- comps
    e <- edges
    y <- comps
    if x.id == e.src
    if y.id == e.dst
  } yield LVertex(y.id, Math.min(x.label, y.label))

  for {
    Group(id, cs) <- cands.groupBy(_.id)
  } yield LVertex(id, cs.map(_.label).min)
})

In the rewritten version, the lambda passed to FlinkNtv.iterate has a parameter comps which is accessed twice in the lambda body. Because of this, the subsequent addCacheCalls transformation will insert a FlinkOps.cache(comps) call at the beginning of the lambda body, which breaks the native Flink iteration.
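
Roughly, the result after addCacheCalls would look as follows. This is a sketch only: the name cachedComps and the redirection of the two references to the cached value are assumptions, not Emma's exact output.

FlinkNtv.iterate(comps)(comps => {
  // inserted because comps is referenced twice below; materializing the
  // workset here is what defeats Flink's native iteration
  val cachedComps = FlinkOps.cache(comps)

  val cands = for {
    x <- cachedComps
    e <- edges
    y <- cachedComps
    if x.id == e.src
    if y.id == e.dst
  } yield LVertex(y.id, Math.min(x.label, y.label))

  for {
    Group(id, cs) <- cands.groupBy(_.id)
  } yield LVertex(id, cs.map(_.label).min)
})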

Suggested Solution

As a first approximation, we should not cache the parameters of lambdas passed to a FlinkNtv.iterate operator (see the sketch below).
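
A minimal, self-contained sketch of the adjusted criterion. It deliberately uses a toy IR instead of Emma's actual compiler API; CacheCriterionSketch, Ref, Lambda, Iterate and all helper names are hypothetical:

object CacheCriterionSketch {

  // Toy IR: just enough structure to express "a collection is referenced"
  // and "a lambda is passed to iterate".
  sealed trait Tree
  case class Ref(name: String) extends Tree                       // use of a bound collection
  case class Lambda(param: String, body: Seq[Tree]) extends Tree  // param => body
  case class Iterate(init: Ref, loop: Lambda) extends Tree        // FlinkNtv.iterate(init)(loop)

  // Collect all references in a tree.
  private def refs(t: Tree): Seq[String] = t match {
    case Ref(n)          => Seq(n)
    case Lambda(_, body) => body.flatMap(refs)
    case Iterate(i, l)   => refs(i) ++ refs(l)
  }

  // Names referenced more than once: the original caching criterion.
  def multiplyUsed(prog: Seq[Tree]): Set[String] =
    prog.flatMap(refs).groupBy(identity).collect { case (n, uses) if uses.size > 1 => n }.toSet

  // Parameters of lambdas passed to iterate: these must never be cached.
  def iterateParams(prog: Seq[Tree]): Set[String] =
    prog.collect { case Iterate(_, Lambda(p, _)) => p }.toSet

  // Adjusted criterion: multiply used, but not an iterate-lambda parameter.
  def cacheCandidates(prog: Seq[Tree]): Set[String] =
    multiplyUsed(prog) diff iterateParams(prog)

  def main(args: Array[String]): Unit = {
    // comps is used twice inside the iterate lambda, edges once.
    val prog = Seq(
      Iterate(Ref("comps"),
        Lambda("comps", Seq(Ref("comps"), Ref("edges"), Ref("comps")))))
    println(cacheCandidates(prog)) // Set() -- comps is no longer a cache candidate
  }
}

In Emma itself the same idea would be expressed over the real Core IR, but the set difference against the iterate-lambda parameters captures the proposed exclusion.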

aalexandrov · Feb 06 '18 17:02

PS. The same applies to edges -- its usages inside the iterate lambda should not count towards the caching criterion.

aalexandrov · Feb 06 '18 23:02