pipelines
Concurrent processing pipelines in Go.
splunk/pipelines
Manages concurrency using Go + Generics. Separate what your code does from how it runs on threads.
Install it locally:
go get github.com/splunk/pipelines
Overview
Pipelines provides a set of primitives for managing concurrent pipelines.
A pipeline is a set of processing stages connected by channels.
Each stage is run on one or more goroutines.
pipelines manages starting, stopping, and scaling up each pipeline stage for you, allowing you to keep concurrency concerns away from your business logic.
Stages
Each pipeline stage is provided as a top-level func which accepts one or more input channels
and returns one or more output channels.
Map: converts a chan S to a chan T, by converting each S to exactly one T.
OptionMap: converts a chan S to a chan T, by converting each S to zero or one Ts.
FlatMap: converts a chan S to a chan T, by converting each S to zero or more Ts.
Combine: combines two chan T into a single chan T.
Flatten: converts a chan []T to a chan T.
Tee: converts a chan T into two chan T, each of which receives exactly the same values.
ForkMapCtx: converts a chan S into a chan T, by converting each S to zero or more Ts. Unlike FlatMap, a new goroutine is started to convert each value of S.
Any stage which converts a chan S into a chan T requires that the caller pass a conversion func that knows how to
turn S into T.
Map, FlatMap, and OptionMap each have variants MapCtx, FlatMapCtx, and OptionMapCtx, which allow the caller
to pass a conversion func which accepts a context.Context as its first argument. This allows a conversion func to
perform I/O safely.
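To make the shape of such a conversion func concrete, here is a minimal standard-library sketch. The names slowLookup and lookup are hypothetical, and the timer only stands in for real I/O. The point is that a func receiving the context can stop waiting as soon as the pipeline is shutting down.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// slowLookup is a hypothetical conversion func that takes a context.Context
// as its first argument. The timer stands in for a network or disk call.
func slowLookup(ctx context.Context, id int) string {
	select {
	case <-time.After(10 * time.Millisecond):
		return fmt.Sprintf("item-%d", id)
	case <-ctx.Done():
		return "" // stop waiting once the context has been cancelled
	}
}

// lookup calls slowLookup, optionally cancelling the context beforehand.
func lookup(id int, cancelFirst bool) string {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	if cancelFirst {
		cancel()
	}
	return slowLookup(ctx, id)
}

func main() {
	fmt.Printf("%q\n", lookup(7, false)) // "item-7"
	fmt.Printf("%q\n", lookup(7, true))  // ""
}
```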
Each stage creates and manages the closure of its output channel, and listens for shutdown via context.Context.
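The ownership rule above can be illustrated with a hand-written stage. This is a sketch using only the standard library, not the library's implementation; mapStage, fromSlice, and squares are hypothetical names. The stage creates its output channel, closes it when it exits, and returns early if the context is cancelled.

```go
package main

import (
	"context"
	"fmt"
)

// mapStage is a hypothetical, simplified stage. It owns its output channel:
// it creates it, closes it on exit, and stops early when ctx is cancelled.
func mapStage[S, T any](ctx context.Context, in <-chan S, f func(S) T) <-chan T {
	out := make(chan T)
	go func() {
		defer close(out) // the stage, not the caller, closes the output
		for {
			select {
			case <-ctx.Done():
				return
			case s, ok := <-in:
				if !ok {
					return // input exhausted
				}
				select {
				case out <- f(s):
				case <-ctx.Done():
					return
				}
			}
		}
	}()
	return out
}

// fromSlice is a hypothetical helper that emits every element of xs, then closes.
func fromSlice[T any](xs []T) <-chan T {
	out := make(chan T, len(xs))
	for _, x := range xs {
		out <- x
	}
	close(out)
	return out
}

// squares pushes xs through one mapStage and collects the results.
func squares(xs []int) []int {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	var out []int
	for y := range mapStage(ctx, fromSlice(xs), func(x int) int { return x * x }) {
		out = append(out, y)
	}
	return out
}

func main() {
	fmt.Println(squares([]int{1, 2, 3})) // [1 4 9]
}
```

Because the consumer only ranges over the returned channel, it never needs to know how or when the stage shuts down.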
Combining Stages
Stages are meant to be combined by passing output channels into input channels using the pattern shown in the toy example below:
package main

import (
	"context"
	"fmt"

	"github.com/splunk/pipelines"
)

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	input := pipelines.Chan([]int{1, 3, 5})
	expanded := pipelines.FlatMap(ctx, input, withAdjacent) // (x) => [x, x+1]: yields [1,2,3,4,5,6]
	doubled := pipelines.Map(ctx, expanded, double)         // (x) => x*2: yields [2,4,6,8,10,12]
	exclaimed := pipelines.Map(ctx, doubled, exclaim)       // (x) => "${x}!": yields [2!,4!,6!,8!,10!,12!]

	// Reduce is a sink: it blocks until the input is closed or ctx is cancelled.
	result, err := pipelines.Reduce(ctx, exclaimed, func(prefix string, str string) string {
		return prefix + " " + str
	})
	if err != nil {
		fmt.Println("context was cancelled!")
	}
	fmt.Print(result)
	// Output: 2! 4! 6! 8! 10! 12!
}

func withAdjacent(x int) []int { return []int{x, x + 1} }
func double(x int) int         { return x * 2 }
func exclaim(x int) string     { return fmt.Sprintf("%d!", x) }
In real-world applications, the functions used to convert values flowing through the pipeline can be much more complex.
The pipelines package provides a way to separate what each stage of the pipeline is doing from the code used to make it concurrent.
Configuring Stages
Each pipeline stage can be configured with a set of options which modify the concurrent behavior of that stage.
WithBuffer(n int): buffers the output channel with size n. Output channels are unbuffered by default.
WithPool(n int): runs the pipeline stage on a worker pool of size n.
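To show what these two options roughly correspond to, here is a standard-library sketch of a stage with a buffered output and a worker pool. The names pooledMap, fromSlice, and doubleAll are hypothetical, and the library's internals may differ. In this sketch, output order is not guaranteed once more than one worker is running.

```go
package main

import (
	"context"
	"fmt"
	"sort"
	"sync"
)

// pooledMap is a hypothetical stage: `workers` goroutines share one input
// channel and write to a single output channel with capacity `buffer`.
func pooledMap[S, T any](ctx context.Context, in <-chan S, f func(S) T, workers, buffer int) <-chan T {
	out := make(chan T, buffer) // buffer == 0 gives an unbuffered channel
	var wg sync.WaitGroup
	wg.Add(workers)
	for i := 0; i < workers; i++ {
		go func() {
			defer wg.Done()
			for {
				select {
				case <-ctx.Done():
					return
				case s, ok := <-in:
					if !ok {
						return
					}
					select {
					case out <- f(s):
					case <-ctx.Done():
						return
					}
				}
			}
		}()
	}
	go func() {
		wg.Wait() // close the output only after every worker has exited
		close(out)
	}()
	return out
}

// fromSlice is a hypothetical helper that emits every element of xs, then closes.
func fromSlice[T any](xs []T) <-chan T {
	out := make(chan T, len(xs))
	for _, x := range xs {
		out <- x
	}
	close(out)
	return out
}

// doubleAll doubles every value on a pool of workers and sorts the result,
// since the workers may finish in any order.
func doubleAll(xs []int, workers int) []int {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	var out []int
	for y := range pooledMap(ctx, fromSlice(xs), func(x int) int { return x * 2 }, workers, 2) {
		out = append(out, y)
	}
	sort.Ints(out)
	return out
}

func main() {
	fmt.Println(doubleAll([]int{1, 2, 3, 4}, 3)) // [2 4 6 8]
}
```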
A few options are provided for listening to when a pipeline has halted.
WithDone(context.Context): configures the stage to cancel a derived context when the stage has stopped. Can be used to signal when ANY stage in a pipeline has been stopped.
WithWaitGroup(sync.WaitGroup): configures the stage to use the provided WaitGroup to signal when all goroutines started in the stage have stopped. Can be used to signal when ALL stages in a pipeline have been stopped.
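The difference between the "ANY" and "ALL" signals can be sketched with the standard library alone. The code below is an illustration under assumptions, not the library's implementation: addOne and runTwoStages are hypothetical names, and each stage is handed a shared WaitGroup and the cancel func of a derived context directly rather than through options.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// addOne is a hypothetical single-goroutine stage. When its goroutine exits
// it closes its output, cancels the derived "done" context via stopped, and
// marks itself finished on the shared WaitGroup.
func addOne(ctx context.Context, in <-chan int, wg *sync.WaitGroup, stopped context.CancelFunc) <-chan int {
	out := make(chan int)
	wg.Add(1)
	go func() {
		defer wg.Done()
		defer stopped()
		defer close(out)
		for {
			select {
			case <-ctx.Done():
				return
			case x, ok := <-in:
				if !ok {
					return
				}
				select {
				case out <- x + 1:
				case <-ctx.Done():
					return
				}
			}
		}
	}()
	return out
}

// runTwoStages chains two stages, sums their output, then waits on both signals.
func runTwoStages(xs []int) int {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	done, stopped := context.WithCancel(ctx)
	defer stopped()
	var wg sync.WaitGroup

	in := make(chan int, len(xs))
	for _, x := range xs {
		in <- x
	}
	close(in)

	sum := 0
	for x := range addOne(ctx, addOne(ctx, in, &wg, stopped), &wg, stopped) {
		sum += x
	}

	<-done.Done() // fires as soon as ANY stage has stopped
	wg.Wait()     // returns once ALL stage goroutines have stopped
	return sum
}

func main() {
	fmt.Println(runTwoStages([]int{1, 2, 3})) // 12
}
```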
Sinks and Helpers
A sink serves as the last stage of a processing pipeline. All sinks are implemented as blocking calls which don't start any new goroutines.
Drain: converts a chan T to a []T.
Reduce: converts a chan S to a T, by combining multiple values of S into one value of T.
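A blocking sink of this kind can be sketched in a few lines of standard-library Go. The names reduce, fromSlice, and sumAll are hypothetical and this is not the library's code. The fold runs on the calling goroutine and returns an error only if the context is cancelled before the input is closed.

```go
package main

import (
	"context"
	"fmt"
)

// reduce is a hypothetical blocking sink. It starts from the zero value of T,
// folds every input value into it on the caller's goroutine, and returns
// ctx.Err() if the context is cancelled before the input is closed.
func reduce[S, T any](ctx context.Context, in <-chan S, f func(T, S) T) (T, error) {
	var acc T
	for {
		select {
		case <-ctx.Done():
			return acc, ctx.Err()
		case s, ok := <-in:
			if !ok {
				return acc, nil
			}
			acc = f(acc, s)
		}
	}
}

// fromSlice is a hypothetical helper that emits every element of xs, then closes.
func fromSlice[T any](xs []T) <-chan T {
	out := make(chan T, len(xs))
	for _, x := range xs {
		out <- x
	}
	close(out)
	return out
}

// sumAll adds up xs by reducing over a channel.
func sumAll(xs []int) (int, error) {
	add := func(acc, x int) int { return acc + x }
	return reduce(context.Background(), fromSlice(xs), add)
}

func main() {
	total, err := sumAll([]int{1, 2, 3})
	fmt.Println(total, err) // 6 <nil>

	// With a cancelled context and an input that never produces a value,
	// the sink returns the context's error instead of blocking forever.
	ctx, cancel := context.WithCancel(context.Background())
	cancel()
	var never <-chan int // a nil channel is never ready to receive
	_, err = reduce(ctx, never, func(acc, x int) int { return acc + x })
	fmt.Println(err) // context canceled
}
```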
The following helpers are included to make conversion from standard Go types to channels simpler.
Chan: converts any []T to a chan T.
Error Handling
Fatal and non-fatal errors that occur during a pipeline can be reported via an ErrorSink.
To ensure fatal errors shut down pipeline stages, NewErrorSink wraps and returns a context which is cancelled
whenever a fatal error is reported.
Errors reported so far can be retrieved by calling ErrorSink.All(), which returns all errors in flight.
See the example in the documentation for usage.
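The idea of a sink that cancels a wrapped context on a fatal error can be sketched with the standard library alone. The type and method names below (errorSink, newErrorSink, report, fatal, all) are hypothetical and deliberately do not mirror the library's API, so consult the package documentation for the real signatures.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
)

// errorSink is a hypothetical collector of pipeline errors.
type errorSink struct {
	mu     sync.Mutex
	errs   []error
	cancel context.CancelFunc
}

// newErrorSink wraps ctx so that a fatal error can cancel it.
func newErrorSink(ctx context.Context) (context.Context, *errorSink) {
	ctx, cancel := context.WithCancel(ctx)
	return ctx, &errorSink{cancel: cancel}
}

// report records a non-fatal error; the pipeline keeps running.
func (s *errorSink) report(err error) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.errs = append(s.errs, err)
}

// fatal records the error and cancels the wrapped context, which shuts down
// every stage listening on it.
func (s *errorSink) fatal(err error) {
	s.report(err)
	s.cancel()
}

// all returns a copy of every error recorded so far.
func (s *errorSink) all() []error {
	s.mu.Lock()
	defer s.mu.Unlock()
	return append([]error(nil), s.errs...)
}

// demo reports one non-fatal and one fatal error and observes the context.
func demo() (recorded int, cancelledEarly, cancelledAfter bool) {
	ctx, sink := newErrorSink(context.Background())
	sink.report(errors.New("bad record, skipping"))
	cancelledEarly = ctx.Err() != nil
	sink.fatal(errors.New("lost database connection"))
	cancelledAfter = ctx.Err() != nil
	return len(sink.all()), cancelledEarly, cancelledAfter
}

func main() {
	fmt.Println(demo()) // 2 false true
}
```

Stages built on the returned context stop on a fatal error without any stage needing to know about the others.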