Lazify `ap`
ap should evaluate synchronous continuations on transfer.
Desired semantics
When an ap evaluation context resumes, i.e. a ? task terminates or a ?>/?< flow is ready to transfer, the
continuation is evaluated eagerly if and only if it has a branch containing an asynchronous operator (? or ?>).
Otherwise, the process becomes ready to transfer and the continuation is run lazily on transfer.
Motivations
ap's current evaluation rules make some patterns inherently unsafe due to possible data losses.
An example of such a use case is the problem discussed in this slack thread https://clojurians.slack.com/archives/CL85MBPEF/p1713457348286889 - partition an input flow in batches constrained by both a maximum batch size and a maximum delay between the first and the last element of the batch.
The proposed solution, while functional and elegant, has a subtle issue.
(defn batch [max-size max-delay input]
(m/ap (let [[_ input] (m/?> (m/group-by {} input))]
(m/? (m/reduce conj
(m/eduction (take-while (complement #{::timeout})) (take max-size)
(m/ap (m/amb= (m/?> input) (m/? (m/sleep max-delay ::timeout))))))))))
An input value may be skipped when the scheduler thread races with the thread producing this value. The data loss can be observed consistently with this test summing the successive batch sizes :
((m/reduce (fn [n b] (+ n (count b))) 0
(batch 4 15
(m/ap
(let [x (m/?> (m/seed (range 10000)))]
(m/? (m/via m/blk (Thread/sleep (rand 10)) x))))))
prn prn)
;; eventually prints a number inferior to 10000
The data loss occurs in the inner ap, under the following scenario :
- T1 (scheduler thread) resolves the sleep,
apis ready to transfer::timeout, theeductionstage is notified,apis transferred immediately and the transducer pipeline terminates due totake-while. - T2 (producer thread) makes a new value available on the group consumer and wins the race against T1 trying to cancel
ap. The internal output buffer is empty, so the group consumer is transferred immediately,apis now ready to transfer the value and theeductionstage is notified again. - T1 cancels
apand flushes remaining values. The group consumer is now cancelled, but the first value of the next batch has already been transferred to the internal output buffer and will therefore be discarded.
What should have happened instead :
- In step 2,
apshould have simply notified theeductionstage without transferring the value. This is the correct behavior in this case because the result can be computed synchronously. - In step 3, the group consumer is cancelled while the transfer is pending, allowing
group-byto reinject the value on the next group.
Accidental benefits
If ap evaluates ?< lazily then it's strictly more powerful than cp. Therefore, cp can be deprecated.
Chesterton's fence
The current evaluation semantics are mainly a consequence of cloroutine's design. Cloroutine doesn't expose any information about the continuation, so the only possible way to figure out if the final result can be computed synchronously is to actually try to compute it.
Implementation strategy
Unknown.
I was curious if this might be related (or not)...
(defn take-none [f]
(m/eduction (take 0) f))
(defn echo [f]
(m/ap
(let [v (m/?> f)]
(prn 'produce v)
v)))
(m/?
(m/reduce (fn [_ _]) nil
(take-none (echo (m/seed (range))))))
produce 0
produce 1
The echo/seed combination gets two values ahead of the consumer before being cancelled.
Now let's add in another layer:
(defn copy [f]
(m/ap (m/?> f)))
... (take-none (copy (echo (m/seed (range)))))
produce 0
produce 1
produce 2
... (take-none (copy (copy (echo (m/seed (range))))))
produce 0
produce 1
produce 2
produce 3
Each added copy buffers an additional value, so each copy we add causes the producer to get an additional value ahead of the consumer.
Suppose this issue was implemented. In copy, the branch doesn't contain an asynchronous operator (it doesn't do anything but return), so the continuation would be evaluated lazily... and would that mean that copy would no longer buffer an additional value in a pipeline like it does now?
yes, when this issue is solved copy will be semantically equivalent to identity.