skynet
Scala implementation using Future.
This is better:
```scala
import scala.concurrent.duration.Duration
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global

object Skynet extends App {
  def caller(num: Int, size: Int, div: Int): Future[Long] = {
    if (size == 1) {
      Future.successful(num.toLong)
    } else {
      val futures = for (i <- 0 until div) yield caller(num + i * (size / div), size / div, div)
      Future.sequence(futures).map(_.sum)
    }
  }

  // Warmup
  Await.result(caller(0, 1000000, 10), Duration.Inf)

  val startTime = System.currentTimeMillis()
  val x = Await.result(caller(0, 1000000, 10), Duration.Inf)
  val diffMs = System.currentTimeMillis() - startTime
  println(s"Result: $x in $diffMs ms.")
}
```
I.e., add a warm-up to it.
Or just run it for 10M or 100M iterations :)
Go runs out of memory soon after 10M iterations, while the Scala Future version is totally stable, with practically no memory cost at all even for 100M iterations.
Anyway, why not keep both implementations?
Using Akka actors for something like this is pointless and a bad example for actor usage. Would be different if the benchmark first initialized an actor hierarchy and then did some message processing using it.
Can you please re-add this in a separate folder since we have updated idiomatic Akka now?
It's now in a separate `scala-future` dir with a bit of warm-up added.
This is a not equivalent to the other languages. At the leaf level, you are synchronously adding 10 immediately-successful futures instead of creating 10 actors. You shouldn't be surprised that there's a performance increase.
This is the "Skynet 1M threads microbenchmark" not the "Skynet 100k threads microbenchmark".
Akka isn't equivalent to the other languages either. First, an Akka actor contains twice the state of an Erlang actor; second, other languages use static dispatch, while Akka needs runtime dynamic dispatch.
Also, he only sums the numbers in the successful case; he still passes everything to an executor. You can't just compare languages or assume the implementations will actually do the same thing. This example is pretty close to the Go example, while the Akka example does a thousand things more. Also, no example will ever actually run on 1M threads.
```go
if size == 1 {
    c <- num
    return
}
```
This will definitely be inlined, which is the same as `Future.successful()`.
P.S. A microbenchmark means trying to do the same thing in the best possible way a language offers, and trust me, nobody would ever use the Akka implementation for such a task.
@ogier Going from `Future.successful(x)` to `Future(x)` does impact performance, from 230 ms to 380 ms. Then again, this is how it would be idiomatically implemented with Scala futures, so why would you want to do it otherwise?
Would be better to have a sensible benchmark that does something relevant and then implement that in different languages/frameworks. This one is just measuring the overhead of creating async tasks.
It sounds like there are some disagreements going on about the nature of this benchmark. The benchmark's description is "Skynet 1M threads microbenchmark", so it seems to me there is no way you can create < 1M threads and call it a valid benchmark.
Futures are not necessarily concurrent. There is absolutely zero concurrency involved in the following, when size == div:
```scala
val futures = for (i <- 0 until div) yield caller(num + i * (size / div), size / div, div)
```
This is 10 sequential, recursive calls to a function that constructs a Future object and returns it. @schmitch you seem to think this is equivalent to the Go example which reads:
```go
for i := 0; i < div; i++ {
    subNum := num + i*(size/div)
    go skynet(rc, subNum, size/div, div)
}
```
But it is not. The `skynet()` call in Go is running in its own concurrent goroutine. It is not inlined, and never will be, because `c <- num` blocks execution until a value is consumed from `c`, which does not happen until after the whole for loop. Even if it could be inlined (which it cannot without causing deadlock), the Go compiler never inlines goroutines in practice, even when they are absurdly trivial. Here's one example: a goroutine sends a constant value down a channel that is never used, yet 100ms later we still find it live, blocked, trying to send the value: http://play.golang.org/p/mvz1wLUHsJ
@ochrons, yes, exactly, this benchmark is "just measuring the overhead of creating async tasks." That's the whole point. If you did any other non-trivial computation in the body of the tasks, then it would not be a valid measure of the overhead of async tasks.
If you are "just measuring the overhead of creating async tasks," then the Future example is correct. If you want to compare CSP vs. actors, then the Go example would be correct; however, CSP would always win. You could try the JCSP implementation and compare it with the Java actor model. However, you wanted to measure creating async tasks, and that would use the Future implementation.
@ogier then you agree with me in saying that the Node.js example should be removed altogether, as it creates zero async tasks :)
For some reason, though, the author(s) accepted that PR, so I'm not too sure what exactly is the purpose of this benchmark.
And you should also measure the memory overhead, not just CPU. With 10M iterations the Go version takes 5GB of memory, and at 20M iterations I had to kill the process after it went up to 20GB (I have 32GB on my machine).
The Scala version is fine running 100M iterations with less than 1GB.
@schmitch I don't think the Futures example is "measuring the overhead of creating async tasks", because `Future.successful(num)` does not create an async task. It's called in a loop ten times without any concurrency. As for "csp would always win", the fastest Async implementation of this benchmark is currently .NET Core, which is not CSP.
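This distinction is easy to see in a minimal sketch (using the default global ExecutionContext):

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.Duration

val eager = Future.successful(42) // already completed on the calling thread
println(eager.isCompleted)        // true: no ExecutionContext involved at all

val deferred = Future(42)         // body is submitted as a task to the ExecutionContext
println(Await.result(deferred, Duration.Inf)) // 42
```

`Future.successful` is just a completed value in a Future wrapper; only `Future(...)` dispatches work to an executor.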
@ochrons Yes, I agree, the node.js example suffers the exact same issue as this PR, and I don't think it belongs in this benchmark.
As to whether Futures are more or less idiomatic than Actors, the idiomatic way to write this function certainly uses neither. The idiomatic function is:
```
for (int i = 0; i < size; i++) {
    sum += i;
}
return sum;
```
That uses less memory, less CPU, and less time than anything recursive, let alone anything concurrent. But that's not the point. Measuring overhead of concurrency is the point.
I've updated the Scala Future implementation to use real Futures at the leaf level, so now it indeed creates 1,111,111 async tasks like the Go version.
Takes 380ms to run 1.1M tasks and 38177ms to run 111.1M tasks.
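What "real Futures at the leaf level" amounts to can be sketched as follows (a minimal stand-alone version, not the exact committed code):

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.Duration

// Future(num) dispatches one async task per leaf, unlike
// Future.successful(num), which completes synchronously.
def skynet(num: Int, size: Int, div: Int): Future[Long] =
  if (size == 1) Future(num.toLong) // one real async task per leaf
  else {
    val subs = for (i <- 0 until div) yield skynet(num + i * (size / div), size / div, div)
    Future.sequence(subs).map(_.sum)
  }

// 1M leaves sum to 0 + 1 + ... + 999999 = 499999500000
println(Await.result(skynet(0, 1000000, 10), Duration.Inf))
```

With size = 1,000,000 and div = 10 this creates 1 + 10 + ... + 10^6 = 1,111,111 futures, matching the task count discussed above.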
@ogier The .NET Core implementation is using `Task.FromResult`, which is equivalent to `Future.successful`. Although it's calling the function in a `Task.Run`, so maybe that doesn't matter.
https://github.com/atemerev/skynet/blob/master/dnx/Program.cs#L51
@ochrons The implementation there is not actually `Task.FromResult(...)`, it's `Task.Run(() => Task.FromResult(...))`. Importantly, it makes ten concurrent recursive calls instead of making ten sequential recursive calls and then using their results in a way that might be asynchronous. The equivalent in your code is changing from `(0 to div).map(n => skynet(...))` to something like `(0 to div).map(n => Future { skynet(...) })`. Forgive my poor Scala-fu; other changes may be required too.
Actually, if I run `Future.sequence(for (i <- 0 until div) yield Future(1))` it will do the same thing as `Task.Run(); Task.WhenAll()`. If you think that's not true, then whatever; maybe you are just unhappy about the result. Even `Future.sequence(for (i <- 0 until div) yield Future.successful(1))` would actually do the same as `Task.Run` ten times.
I don't quite understand why the idiomatic Scala code should be changed to match some C# Task concept. If the call to `skynet` was wrapped in a `Future`, it would result in a `Seq[Future[Future[Long]]]` instead of `Seq[Future[Long]]`, so you would have double async wrapping for no reason.
As the `Future(num)` creates an async task, and the `Future.sequence().map(...)` creates an async task, all code paths lead to async task creation, and therefore a total of 1,111,111 async tasks are created. The C# version might create more, depending on the implementation of `ContinueWith`.
@schmitch Correct me if I am wrong, but I believe Future.sequence() initiates exactly one new concurrent thread of execution: it awaits the completion of each of the futures passed to it, and assigns that list of results to the future it returns. Future.successful does not run anything at all concurrently (as exhibited by the fact that it doesn't even use an ExecutionContext). So creating a list of 10 Futures with Future.successful and passing that to Future.sequence represents one concurrent computation.
@ochrons I agree, the latest code looks fine. Sorry for harping on this point over and over.
Actually, Future.sequence will dispatch every future. Internally it will schedule the code multiple times (this could change). You could create a custom ExecutionContext and override `execute`; then you can count how often this makes a thread switch. Scala is actually already way above 1M.
Btw. the explanation is pretty simple: if one of the Futures has failed, `Future.sequence` will fail too. The next thing it does is run a flatMap operation on the future, which needs to be on a thread, and passes it into a `Future(Seq())`. I would call that `CanBuildFrom` awfulness: `for (r <- fr; a <- fa) yield (r += a)`, where `fr` is a `Future[Builder]`, `fa` is the actual future, and `a <- fa` means `Future.flatMap`, which runs `onComplete`, which is passed to an ExecutionContext. So `Future.sequence` will actually run on different threads multiple times; running `Future.sequence(Seq(Future(1), Future(1)))` means you dispatch something a dozen times. To run exactly 1,111,111 times would be nearly impossible.
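The failure-propagation part is easy to verify directly (a minimal sketch using the default global context):

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.Duration
import scala.util.Try

// One failed future in the sequence fails the whole result.
val fs = Seq(Future(1), Future.failed[Int](new RuntimeException("boom")), Future(3))
val result = Try(Await.result(Future.sequence(fs), Duration.Inf))
println(result.isFailure) // true: Future.sequence propagates the first failure
```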
@schmitch The operation of `Future.sequence` is not exactly like that. Mostly the code is run in the context (and thread) of the future that's being completed (a callback registered with `onComplete`). There aren't really that many additional async contexts created due to the use of `Future.sequence`.
This is related to `DefaultPromise` linking, which I don't pretend to understand :)
Still, if you run `Future.successful` and then `onComplete`, the callback will still run on a thread, even though `Future.successful` itself just assigns the value.
You could actually pass that as the ExecutionContext to `sequence()`:

```scala
import java.util.concurrent.Executors
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.ExecutionContext

val counter = new AtomicInteger(0)
val threadPool = Executors.newWorkStealingPool()

implicit val ec = new ExecutionContext {
  override def execute(runnable: Runnable): Unit = {
    counter.incrementAndGet() // count every task handed to the executor
    threadPool.submit(runnable)
  }
  override def reportFailure(t: Throwable): Unit = {}
}
```
Is there some reason why this PR has not been merged, especially now that there are separate results for different concurrency types?
I used JMH for "warm-up" and other benchmark problems: #62