emma
Cache-Call Insertion breaks Flink iterations
With the current setup, if a workset collection is accessed more than once, it is still cached by the cache-call insertion optimization, even inside a Flink iteration.
Example
for (_ <- 0 until 5) {
  val cands = for {
    x <- comps
    e <- edges
    y <- comps
    if x.id == e.src
    if y.id == e.dst
  } yield LVertex(y.id, Math.min(x.label, y.label))
  comps = for {
    Group(id, cs) <- cands.groupBy(_.id)
  } yield LVertex(id, cs.map(_.label).min)
}
This is translated to (approximately)
FlinkNtv.iterate(comps)(comps => {
  val cands = for {
    x <- comps
    e <- edges
    y <- comps
    if x.id == e.src
    if y.id == e.dst
  } yield LVertex(y.id, Math.min(x.label, y.label))
  for {
    Group(id, cs) <- cands.groupBy(_.id)
  } yield LVertex(id, cs.map(_.label).min)
})
In the rewritten version, the lambda passed to FlinkNtv.iterate has a parameter comps which is accessed twice in the lambda body. Because of that, a subsequent addCacheCalls transformation will insert a FlinkOps.cache(comps) call at the beginning of the lambda body.
Suggested Solution
As a first approximation, we should exclude parameter caching for lambdas passed to a FlinkNtv.iterate operator.
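To make the proposed criterion concrete, here is a minimal sketch on a toy expression tree. It is not Emma's actual IR or the real addCacheCalls implementation: Expr, Ref, Comprehension, Iterate, refs and cacheCandidates are all hypothetical names, and the toy matches references by name only. It just shows how skipping the bodies of iterate lambdas changes which collections end up as cache candidates.

```scala
object CacheCriteriaSketch {
  // Hypothetical toy IR, for illustration only (not Emma's real trees).
  sealed trait Expr
  final case class Ref(name: String) extends Expr
  final case class Comprehension(sources: List[Expr]) extends Expr
  // Stands in for FlinkNtv.iterate(init)(param => body).
  final case class Iterate(init: Expr, param: String, body: Expr) extends Expr

  // Collect every referenced name. When skipIterateBodies is true,
  // references inside an iterate lambda body are not collected.
  def refs(e: Expr, skipIterateBodies: Boolean): List[String] = e match {
    case Ref(name)           => List(name)
    case Comprehension(srcs) => srcs.flatMap(refs(_, skipIterateBodies))
    case Iterate(init, _, body) =>
      val inBody = if (skipIterateBodies) Nil else refs(body, skipIterateBodies)
      refs(init, skipIterateBodies) ++ inBody
  }

  // Assumed caching criterion: a name referenced more than once is cached.
  def cacheCandidates(e: Expr, skipIterateBodies: Boolean): Set[String] =
    refs(e, skipIterateBodies)
      .groupBy(identity)
      .collect { case (name, uses) if uses.size > 1 => name }
      .toSet

  // Rough shape of the translated example: comps is read twice in the body.
  val program: Expr = Iterate(
    init = Ref("comps"),
    param = "comps",
    body = Comprehension(List(Ref("comps"), Ref("edges"), Ref("comps")))
  )
}
```

With skipIterateBodies = false the toy reproduces the current behaviour and reports comps as a cache candidate; with skipIterateBodies = true neither comps nor edges is reported, since their usages inside the lambda are not counted.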
PS. The same applies to edges: usages in the iterate lambda should not be considered as part of the caching criteria.