feat: add mapWithCompletion and flatMapWithCompletion for safe stream cancellation
This PR addresses the issue where users cannot safely cancel Rx streams from within operator functions without encountering concurrent modification exceptions.
Problem
The original issue was that calling c.cancel from within a map function creates a concurrent modification problem:
var c = Cancelable.empty
c = rx.map(x => if cond then c.cancel) // This throws concurrent modification exception
c.run()
Solution
Added two new methods that allow operators to safely signal completion by returning None:
-
mapWithCompletion[B](f: A => Option[B]): Rx[B] -
flatMapWithCompletion[B](f: A => Option[RxOps[B]]): Rx[B]
These methods follow the same pattern as the existing TakeOp implementation, where returning None triggers an OnCompletion event and stops further processing with RxResult.Stop.
Usage Example
val rx = Rx.sequence(1, 2, 3, 4, 5)
// Safe completion signaling
rx.mapWithCompletion { x =>
if (x == 3) {
None // Complete the stream safely
} else {
Some(x * 2) // Continue processing
}
}.run()
// Works with flatMap too
rx.flatMapWithCompletion { x =>
if (x == 3) {
None // Complete the stream safely
} else {
Some(Rx.single(x * 10)) // Continue with new stream
}
}.run()
Implementation Details
- Added new case classes
MapWithCompletionOpandFlatMapWithCompletionOpto represent these operators - Implemented handling in
RxRunnerthat properly emitsOnCompletionevents whenNoneis returned - Maintains full backwards compatibility - existing
mapandflatMapmethods are unchanged - Follows existing code patterns and conventions in the codebase
Testing
- Added comprehensive test suite covering normal operation, early completion, empty sequences, and error propagation
- All existing tests continue to pass, ensuring no regressions
- Code formatted with
scalafmtAllto follow project conventions
Fixes #3689.
💡 You can make Copilot smarter by setting up custom instructions, customizing its development environment and configuring Model Context Protocol (MCP) servers. Learn more Copilot coding agent tips in the docs.