
[RFC] Structured Concurrency

Open straight-shoota opened this issue 7 years ago • 46 comments

Crystal has a great concurrency model based on Fibers and Channels, which can be used to pass messages around.

Fibers are conceptually pretty simple. If you spawn one, it takes off from the main context and runs concurrently for an indefinite amount of time, depending on what it does and how long it takes to do this. From the perspective of the main control flow, it's essentially fire and forget.

Real-life problems typically call for a more sophisticated way of handling concurrent tasks. Sometimes you need to wait for one, some, or all tasks to finish before continuing in the main scope. Error handling in concurrent tasks is also important, as is the ability to cancel the remaining tasks once others have finished or errored.

Fibers and Channels can be used to implement a model for structured concurrency. Given that this is a pretty common idiom, I'd like to see a generalized implementation in Crystal's stdlib.
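As a rough illustration of the "wait for all tasks" case using nothing but spawn and Channel, here is a minimal sketch. The variable names and the stand-in work are assumptions made for this example; it has no error handling or cancellation.

```crystal
# Wait for a known number of fibers to finish, using only spawn and Channel.
done = Channel(Int32).new
count = 3

count.times do |i|
  spawn do
    Fiber.yield  # stand-in for real work
    done.send(i) # report completion back to the spawning scope
  end
end

# Blocks the current fiber until every task has reported back.
finished = Array(Int32).new
count.times { finished << done.receive }
puts "all #{finished.size} tasks finished"
```

Every call site has to repeat this bookkeeping by hand, which is the part a generalized stdlib tool would take over.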

What we have

HTTP::Server#listen uses a custom implementation of a wait group executing a number of tasks simultaneously and waiting for all to finish. Other examples are in the parallel macro or Crystal::Compiler#codegen_many_units. parallel is the only feature of structured concurrency currently available in the stdlib, but it is only suitable for a fixed number of concurrent tasks that are known at compile time.

A more generalized approach would make this concept easily reusable. It can be implemented on top of the existing features that Fiber and Channel provide. The only thing that's missing is a way to deliberately kill fibers and unwind their stack (see #3561, and a proposed implementation in #6450).

Background

I recommend reading the articles referenced below. They both describe a model of structured concurrency which essentially restricts the execution of concurrent tasks to a specific scope and provides tools to manage them. This contrasts with the model of go (Go) and spawn (Crystal), which just fire off a new fiber without caring about its life cycle. That makes it hard to follow control flow: what happens where and when, and in which scope.

The main idea of this proposal is to understand that each fiber is limited to the scope it is executed in:

Every time our control splits into multiple concurrent paths, we want to make sure that they join up again.

This ensures that fibers don't get lost doing whatever stuff they might not even be supposed to do anymore. I believe this concept can be applied to almost any real-life use case of fibers. Having a structured flow of control also allows for a proper exception flow. Right now, unhandled exceptions within a fiber are just printed and ignored. When a fiber is scoped to some parent context, an exception can just be propagated there.
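To make the exception-flow point concrete, here is a minimal sketch of handing a child fiber's error back to the scope that spawned it. The `risky_work` method and the `outcome` channel are hypothetical names for this example; this is manual plumbing, not an existing stdlib feature.

```crystal
# Hypothetical failing task, used only for illustration.
def risky_work
  raise ArgumentError.new("task failed")
end

# Carries either the child's exception or nil on success.
outcome = Channel(Exception?).new

spawn do
  begin
    risky_work
  rescue ex
    outcome.send(ex) # hand the error to the parent instead of just printing it
  else
    outcome.send(nil)
  end
end

# The parent scope decides what to do with the child's error.
error = outcome.receive
puts "child failed: #{error.message}" if error
```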

Prototype

I have implemented a simple prototype of a concurrency feature (based on Fiber.cancel from #6450). The idea is to have a coordination tool for running fibers, called a Spindle. It is used to spawn fibers and to make sure they are collected again. This particular implementation allows running multiple tasks concurrently, and if one of them fails, it cancels all the others. This is of course just one example of behaviour; there are many different ways to react.

The code can be found at: https://gist.github.com/straight-shoota/4437971943bae7000f03fabf3d814a2f
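The gist is the actual prototype. Purely to illustrate the shape of the idea, here is a much smaller sketch that is not taken from the gist: it assumes a single-threaded scheduler, and because Fiber.cancel is not available it only joins all tasks and re-raises the first error, without cancelling the siblings. The `join` method and the instance variables are names invented for this example.

```crystal
# Hypothetical, simplified Spindle: joins all tasks, no cancellation.
class Spindle
  @pending = 0
  @done = Channel(Exception?).new

  # Runs the block in a new fiber that is tracked by this spindle.
  def spawn(&block : ->)
    @pending += 1
    ::spawn do
      begin
        block.call
        @done.send(nil)
      rescue ex
        @done.send(ex)
      end
    end
  end

  # Waits for every tracked fiber, then re-raises the first error, if any.
  def join : Nil
    first_error = nil
    while @pending > 0
      result = @done.receive
      @pending -= 1
      first_error ||= result
    end
    if error = first_error
      raise error
    end
  end
end

# Fibers spawned through the yielded spindle cannot outlive this call.
def concurrent
  spindle = Spindle.new
  begin
    yield spindle
  ensure
    spindle.join
  end
end

results = [] of Int32
concurrent do |spindle|
  spindle.spawn { results << 1 }
  spindle.spawn { results << 2 }
end
puts results.sort
```

Since `join` re-checks the pending count on every iteration, a task can itself call `spindle.spawn` while the scope is still open, which is the dynamic case a fixed list of tasks cannot express.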

I don't have a concrete proposal for how this should be implemented in terms of stdlib APIs, but the general idea is to provide tools for running tasks concurrently. We could even think about removing unscoped spawn (it can be considered harmful after all), but that's not necessarily required and can probably be decided upon later.

References

Some examples of similar libraries:

straight-shoota avatar Jul 30 '18 15:07 straight-shoota

I'd rather write a "good" Promise implementation and then have Promise.all/Promise.first etc. As far as I'm concerned, the abstraction of a fiber with a return value, called a promise or future, is better and more composable than waitgroups, and can still be used as a waitgroup pretty easily.

Talking about abstractions based on cancellation should wait until we actually have cancellation.

RX14 avatar Jul 31 '18 00:07 RX14

Promises are more about communication than structuring. You can do a lot of things in a similar way, but it feels like an inferior solution to me.

Running two methods concurrently would look like this with a promise:

promise = future ->do_some_stuff
do_some_other_stuff
promise.get

In my example implementation it looks like this:

concurrent do |spindle|
  spindle.spawn ->do_some_stuff
  spindle.spawn ->do_some_other_stuff
end

It's a little bit more verbose, but the control flow is easier to follow, especially when it gets more complicated.

(Verbosity could actually be reduced when using with spindle yield, but I think I like the expressiveness better.)

One way or the other: a good promise implementation would need a way to cancel a fiber as well, wouldn't it?

straight-shoota avatar Jul 31 '18 08:07 straight-shoota

@straight-shoota you could do it like that or you could use promises like this:

Promise.all(
  Promise.new ->func,
  Promise.new ->func2
)

which is both less verbose and more powerful since you can do

value1, value2 = Promise.all(
  Promise.new ->func,
  Promise.new ->func2
)

very easily to get the function's return values.

Changing it to "first fiber wins", aka a race, becomes s/all/first/: you just change one function instead of having to refactor your code to use a different concurrency mechanism (consider the difference between a function taking a spindle as a param and one returning a promise; that's a big refactor).

RX14 avatar Jul 31 '18 09:07 RX14

and JS promises have no way to cancel them, we don't need cancellation for them

RX14 avatar Jul 31 '18 09:07 RX14

These Promise.all and Promise.first are nice, but they only allow a static number of concurrent tasks. This is just like the parallel macro and not sufficient for a real-world application where tasks can be dynamically added at runtime (like connection handlers of a network server for example).

straight-shoota avatar Jul 31 '18 11:07 straight-shoota

At least I would want to have a way to cancel a promise/fiber (and do it gracefully) in case its result/effect is no longer required. It shouldn't block resources for unnecessary work.

straight-shoota avatar Jul 31 '18 11:07 straight-shoota

@straight-shoota No, .all and .first can easily take arrays.

RX14 avatar Jul 31 '18 11:07 RX14

How would that work with adding child tasks during runtime? You would have to append them to the array - somehow that could probably be made to work with .all and .first. But it makes a really ugly API using an array as registry for tasks.

straight-shoota avatar Jul 31 '18 12:07 straight-shoota

Ah I see what you want. We could easily add block-based versions of Promise.first and Promise.all which do what you want, but I can't help but feel that adding tasks at runtime would be hard to track. I'd be happy to do that though.

RX14 avatar Jul 31 '18 15:07 RX14

Promises aren't Structured Concurrency. Don't mix concepts, please :)

Structured Concurrency is about controlling the lifetime of nested coroutines (they can't outlive their parent). We can spawn new coroutines at any time (e.g. on Socket#accept) and eventually tell one or many to stop at any time. Lifetime is all that's cared about; use channels to pass values.

A promise defers computation of a fixed set of fibers to return a set of values. Since Promise.all waits for values, you can't add a fiber at any time to the list (e.g. Socket#accept) and the current fiber is blocked. Promise.all only happens to fake Structured Concurrency.

Passing a mutable Array to Promise.all would be an ugly hack, and prone to concurrency issues: I can push a fiber? what if previous fibers are finished already? we raise an exception? what if we have a TCP server that responded to all requests and got idle, then comes a new request? what if I pop or delete a fiber from the array?

Promises != Structured Concurrency.

ysbaddaden avatar Aug 01 '18 08:08 ysbaddaden

I was thinking more of a

Promise.all do |promises|
  promises << ...
end

interface in addition to the other promise interfaces to make it do wait groups as well.

I guess it's probably quite a hack though, and better to separate the two concepts.

RX14 avatar Aug 01 '18 10:08 RX14

@RX14 That doesn't seem much different from

concurrent do |spindle|
  spindle.spawn ...
end

So we pretty much want the same thing in this regard ;)

straight-shoota avatar Aug 01 '18 10:08 straight-shoota

Honestly I kind of like the idea of having something other than promises, which I sometimes feel like is a bad solution but manages to be used everywhere...

refi64 avatar Aug 01 '18 13:08 refi64

@straight-shoota yeah I like the concept I was just wondering if we could work it into the "promise" concept for simplicity.

RX14 avatar Aug 01 '18 13:08 RX14

I've been writing a Promise library for Crystal Lang and it's almost complete (core implementation complete with specs) https://github.com/spider-gazelle/promise

It might not be "Structured Concurrency" in the most strict sense however it does simplify coordinating a bunch of async events and it's quite a popular paradigm. Would love to see it in the standard library if that's something you would consider.

I think my implementation is pretty neat in any case:

require "promise"

promise = Promise.new(Int32)
result = promise.then { |result| result.not_nil! + 100 }.then { |result| "change #{result} type" }
promise.resolve(10)

# Can also grab the value via a future
result.value # => "change 110 type"

Any .then block can change promise types as it propagates down the chain. .catch blocks can only propagate exceptions and are used to recover values back to the initiating promise value type.

Thanks to the Crystal type safety it puts most promise implementations to shame.

stakach avatar Aug 02 '18 04:08 stakach

I don't think a promise library in crystal would look anything at all like a JS promise library. .then isn't really required at all. Libraries should expose synchronous APIs, and any uses of Promise should really stay internal to the application.

In fact making promises too much like JS will mean people start using promises like JS and crystal promises absolutely should not be used the same as JS. They should be used pretty sparingly.

RX14 avatar Aug 02 '18 10:08 RX14

Well yeah (exposing synchronous APIs). Not sure how you escape from using the equivalent of .then though; it's kind of the je ne sais quoi of promises. Without it all you have is futures.

stakach avatar Aug 02 '18 12:08 stakach

@stakach ah, my terminology was all messed up. I'd like futures in the stdlib and promises perhaps can be a shard built on that.

RX14 avatar Aug 02 '18 12:08 RX14

I'm still not sure what the conceptual difference between a promise and future would be in crystal though

RX14 avatar Aug 02 '18 12:08 RX14

I like to think of promises and futures as either end of a pipe, the promise is where I can put something in and the future is where I can wait and listen for the result.

Now taking this picture, I just described a Channel with a buffer capacity of one that can only be written to once. So perhaps for us promises would actually be just redundant to channels.
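That picture can be written down directly. The following is only a sketch of the analogy, with made-up variable names, not a proposed API:

```crystal
# A "future" modelled as a one-slot buffered channel that is written exactly once.
future = Channel(Int32).new(1)

# Promise end of the pipe: put the result in.
spawn { future.send(6 * 7) }

# Future end of the pipe: wait and listen for the result.
answer = future.receive
puts answer # => 42
```

Unlike a real future, the value here can be received only once; a second receive would block.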

jhass avatar Aug 02 '18 13:08 jhass

Promises are really complementary to channels. Channels for distributing async work and promises for handling the results.

For instance, I'm working on updating this influxdb library to the latest version of Crystal lang as the original maintainer doesn't have the time.

The original version uses spawn to perform an async request (a channel here would be ideal) - however either way you can never know if the request succeeded and you don't always want to wait around at this point for the response. That bit of code looks pretty messy:

      if sync
        send_write(body).status_code == 204
      else
        spawn { send_write(body) }
        true
      end

A Promise in this case is the perfect solution.

  • You can ignore the result like the async implementation above
  • You can be synchronous if you want to be, using Promise#value
  • You can use a callback to handle a success or failure condition

For comparison, the promise version of the above would be:

promise = Promise.new(Bool)
channel.send {promise, body}
promise

Promises provide flexibility and are simple to use. I don't think they should be seen as competition to other flow control solutions, they are just one of many tools

stakach avatar Aug 02 '18 13:08 stakach

Well, regardless of the terminology what I want is a fiber which can return a value, and you can wait for it to complete with a value or error
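A minimal sketch of such a primitive could look like the following. `Task` is a hypothetical name chosen for this example and is not an existing stdlib type; the sketch assumes T is not itself an exception type and that the value is read only once.

```crystal
# Hypothetical Task(T): a fiber whose result or error can be awaited.
class Task(T)
  def initialize(&block : -> T)
    result = @result = Channel(T | Exception).new(1)
    spawn do
      begin
        result.send(block.call)
      rescue ex
        result.send(ex) # deliver the failure to whoever waits for the value
      end
    end
  end

  # Blocks until the fiber has finished; re-raises its exception, if any.
  # The buffered value is consumed, so call this only once per task.
  def value : T
    case result = @result.receive
    when Exception then raise result
    else                result
    end
  end
end

task = Task(Int32).new { 21 * 2 }
answer = task.value
puts answer # => 42
```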

RX14 avatar Aug 02 '18 14:08 RX14

@stakach I don't think a promise is the right tool in your example. The library method #write should just be blocking like any Crystal IO method and directly call #send_write. It should be the caller's task to manage concurrency and execute this method in a separate fiber where appropriate. The library simply doesn't need to care about that.

This would be idiomatic Crystal, with a simplified API that removes the sync argument and the dependency on a promise implementation. It's also consistent with other IO methods, including others on the same type (select, query, drop), which don't seem to have an option to execute asynchronously.

straight-shoota avatar Aug 02 '18 15:08 straight-shoota

@straight-shoota problem is running the write concurrently doesn't work on the current version of Crystal so it has to be via the channel to ensure serial writes.

Basically, because the HTTP client response from influx is chunked, Crystal yields the current fiber while it waits for IO in the middle of the HTTP response cycle. I can then start another request while the client is still processing the previous response, which leads to weird errors. (Not sure if this is an issue with Crystal's HTTP::Client; however, if you take that influxdb library, make the minor changes to run on 0.25.1 and run the tests, it blows up.) This is a common issue with fibers, and the library should definitely deal with it, not the caller.

stakach avatar Aug 02 '18 16:08 stakach

Yes, that's a shortcoming of HTTP::Client. It currently can't multiplex concurrent requests to the same endpoint over multiple connections. See #6011

Until this is resolved, you would need to use several client instances or guard one with a mutex. But this issue exists whether #write is async or not (it could always be called from a different fiber).

straight-shoota avatar Aug 02 '18 16:08 straight-shoota

The problem of being called from multiple different fibers is already solved by using the channel - any fiber can call the code, the channel is used to make the HTTP::Client requests in serial and the promise returns the result to the calling fibers.

Very little complexity vs locks and/or multiple client instances
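The pattern described here, one worker fiber owning the client while callers talk to it over a channel, might be sketched roughly as follows. All names are assumptions for illustration: `send_write` is a stand-in for the library's HTTP call, and a one-slot reply channel plays the role of the promise.

```crystal
# A request carries its payload plus a reply channel for the result.
record WriteRequest, body : String, reply : Channel(Bool)

# Hypothetical stand-in for the library's HTTP call.
def send_write(body : String) : Bool
  !body.empty?
end

requests = Channel(WriteRequest).new

# The only fiber that touches the client, so requests are processed serially.
spawn do
  loop do
    request = requests.receive
    request.reply.send(send_write(request.body))
  end
end

# Any fiber may call this; the caller chooses whether to wait on the reply.
def write(requests : Channel(WriteRequest), body : String) : Channel(Bool)
  reply = Channel(Bool).new(1)
  requests.send(WriteRequest.new(body, reply))
  reply
end

ok = write(requests, "cpu load=0.5").receive
puts ok
```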

stakach avatar Aug 02 '18 16:08 stakach

@RX14 I deliver you "a fiber which can return a value, and you can wait for it to complete with a value or error" https://github.com/spider-gazelle/promise#simple-concurrency-control

It's not an alternative to structured concurrency, but I still think it's pretty cool.

stakach avatar Aug 03 '18 16:08 stakach

@straight-shoota thanks for the advice - I threw a lock around the influxdb HTTP client. You were right, that was definitely the way to go. I also watched the Trio video, which does seem pretty cool. Looking forward to Spindles landing!

stakach avatar Aug 04 '18 09:08 stakach

Not sure how you escape from using the equivalent of .then

We had the same debates back then on the Boost mailing lists.

vinipsmaker avatar Dec 12 '18 00:12 vinipsmaker

Indeed. Go doesn't have promises either, I'm sure there's a good reason (might be lack of generics, but I'm not sure). I remember @waj was always against promises. The whole point of non-blocking IO and spawn is to avoid callback hell. But wait group is something good.

asterite avatar Dec 12 '18 00:12 asterite