feat: add mapWithCompletion and flatMapWithCompletion for safe stream cancellation

Open Copilot opened this issue 8 months ago • 0 comments

This PR lets users end an Rx stream from inside an operator function without triggering a concurrent modification exception.

Problem

Calling c.cancel from within a map function causes a concurrent modification exception:

var c = Cancelable.empty
// cond is a placeholder for any stop condition
c = rx.map { x => if (cond) c.cancel }.run()  // This throws a concurrent modification exception

Solution

Added two new methods that allow operators to safely signal completion by returning None:

  • mapWithCompletion[B](f: A => Option[B]): Rx[B]
  • flatMapWithCompletion[B](f: A => Option[RxOps[B]]): Rx[B]

These methods follow the same pattern as the existing TakeOp implementation, where returning None triggers an OnCompletion event and stops further processing with RxResult.Stop.
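The "None means complete" rule can be illustrated without Rx at all. The following is a minimal sketch on plain Scala collections; `CompletionSemantics` and `mapUntilNone` are hypothetical names used only for illustration and are not part of airframe. It shows which values would be expected downstream, not how the operator is implemented.

```scala
object CompletionSemantics {
  // Hypothetical helper (not an airframe API): applies f to each element
  // in order and stops at the first None. Elements after that point are
  // never passed to f, because the iterator is evaluated lazily.
  def mapUntilNone[A, B](xs: Seq[A])(f: A => Option[B]): List[B] =
    xs.iterator
      .map(f)
      .takeWhile(_.isDefined)
      .collect { case Some(v) => v }
      .toList
}
```

Under this model, `CompletionSemantics.mapUntilNone(Seq(1, 2, 3, 4, 5))(x => if (x == 3) None else Some(x * 2))` yields `List(2, 4)`: the element 3 ends the sequence, so 4 and 5 are never processed.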

Usage Example

val rx = Rx.sequence(1, 2, 3, 4, 5)

// Safe completion signaling
rx.mapWithCompletion { x =>
  if (x == 3) {
    None  // Complete the stream safely
  } else {
    Some(x * 2)  // Continue processing
  }
}.run()

// Works with flatMap too
rx.flatMapWithCompletion { x =>
  if (x == 3) {
    None  // Complete the stream safely
  } else {
    Some(Rx.single(x * 10))  // Continue with new stream
  }
}.run()

Implementation Details

  • Added new case classes MapWithCompletionOp and FlatMapWithCompletionOp to represent these operators
  • Implemented handling in RxRunner that properly emits OnCompletion events when None is returned
  • Maintains full backwards compatibility - existing map and flatMap methods are unchanged
  • Follows existing code patterns and conventions in the codebase
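As a rough picture of the runner-side handling described above, here is a self-contained sketch. The `Event`, `OnNext`, `OnCompletion`, `Result`, `Continue`, and `Stop` types below are simplified stand-ins defined for this example, and `runMapWithCompletion` is a hypothetical function; none of this is the actual RxRunner code from the PR.

```scala
// Simplified stand-ins for illustration; not airframe's actual definitions.
sealed trait Event[+A]
case class OnNext[A](value: A) extends Event[A]
case object OnCompletion extends Event[Nothing]

sealed trait Result
case object Continue extends Result
case object Stop extends Result

object CompletionSketch {
  // Feeds each upstream element through f. A None result emits OnCompletion
  // downstream and switches the result to Stop, so no further upstream
  // elements are pulled.
  def runMapWithCompletion[A, B](source: Seq[A])(f: A => Option[B])(
      effect: Event[B] => Unit
  ): Result = {
    val it             = source.iterator
    var result: Result = Continue
    while (result == Continue && it.hasNext) {
      f(it.next()) match {
        case Some(v) =>
          effect(OnNext(v))
        case None =>
          effect(OnCompletion)
          result = Stop
      }
    }
    // Upstream ran out of elements without an early stop: complete normally.
    if (result == Continue) effect(OnCompletion)
    result
  }
}
```

The point of the design is that completion is reported through the operator's return value rather than by cancelling the subscription from inside the callback, so nothing mutates the running pipeline while it is being traversed.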

Testing

  • Added a test suite covering normal operation, early completion, empty sequences, and error propagation
  • All existing tests continue to pass, ensuring no regressions
  • Code formatted with scalafmtAll to follow project conventions

Fixes #3689.


Copilot avatar Jun 02 '25 20:06 Copilot