FSharpx.Async icon indicating copy to clipboard operation
FSharpx.Async copied to clipboard

AsyncPipe?

Open eulerfx opened this issue 10 years ago • 0 comments

Wanted to see if anybody had input on another async-based control abstraction. Source code here. An async pipe is similar to an AsyncSeq but more general. In addition to emitting values, it can also await input. Finally, it can complete with some value, such as an exception. The base types are:

/// An async pipeline which consumes values of type 'i, produces
/// values of type 'o and completes with a result 'a or an error.
type AsyncPipe<'i, 'o, 'a> = Async<AsyncPipeStep<'i, 'o, 'a>>

/// An individual step in an async pipeline.
and AsyncPipeStep<'i, 'o, 'a> =

  /// The pipeline completed with result 'a.
  | Done of 'a

  /// The pipeline is emitting a value of type 'o.
  | Emit of 'o * AsyncPipe<'i, 'o, 'a>

  /// The pipeline is consuming a value of type 'i.
  | Await of ('i option -> AsyncPipe<'i, 'o, 'a>)

An interesting operation on pipes is composition:

val compose (p1:AsyncPipe<'i, 'o, 'a>) (p2:AsyncPipe<'o, 'o2, 'a>) : AsyncPipe<'i, 'o2, 'a>

This operation fuses two pipes together.This can be useful for declaring certain types of async workflows. For example, if you're iterating an AsyncSeq using iterAsync, there is no way to signal a stop to the iteration, but it can be done with pipes using a consuming pipe which stops at some point, causing the entire workflow to stop.

An AsyncSeq is a special case of AsyncPipe in the following way:

type AsyncSeq<'a> = AsyncPipe<Void, 'a, 'unit>

eulerfx avatar Mar 18 '15 17:03 eulerfx