
Scala implementation using Future.

Open ochrons opened this issue 9 years ago • 25 comments

ochrons avatar Feb 14 '16 15:02 ochrons

This is better:

import scala.concurrent.duration.Duration
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global


object Skynet extends App {
  def caller(num: Int, size: Int, div: Int): Future[Long] = {
    if (size == 1) {
      Future.successful(num.toLong)
    } else {
      val futures = for (i <- 0 until div) yield caller(num + i * (size / div), size / div, div)
      Future.sequence(futures).map(_.sum)
    }
  }
  // Warmup
  Await.result(caller(0, 1000000, 10), Duration.Inf)

  val startTime = System.currentTimeMillis()
  val x = Await.result(caller(0, 1000000, 10), Duration.Inf)
  val diffMs = System.currentTimeMillis() - startTime

  println(s"Result: $x in $diffMs ms.")
}

I.e., add a warm-up to it.

schmitch avatar Feb 14 '16 15:02 schmitch

Or just run it for 10M or 100M iterations :)

Go runs out of memory soon after 10M iterations, while the Scala Future version stays completely stable, with practically no memory cost even at 100M iterations.

ochrons avatar Feb 14 '16 15:02 ochrons

Why not keep both implementations?

dbohdan avatar Feb 14 '16 15:02 dbohdan

Using Akka actors for something like this is pointless and a bad example for actor usage. Would be different if the benchmark first initialized an actor hierarchy and then did some message processing using it.

ochrons avatar Feb 14 '16 15:02 ochrons

Can you please re-add this in a separate folder since we have updated idiomatic Akka now?

wizzard0 avatar Feb 14 '16 17:02 wizzard0

It's now in a separate scala-future dir with a bit of warm-up added.

ochrons avatar Feb 14 '16 18:02 ochrons

This is not equivalent to the other languages. At the leaf level, you are synchronously adding 10 immediately-successful futures instead of creating 10 actors. You shouldn't be surprised that there's a performance increase.

This is the "Skynet 1M threads microbenchmark" not the "Skynet 100k threads microbenchmark".
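
To make the distinction concrete, here is a minimal sketch (illustrative only; it assumes the implicit global ExecutionContext imported in the PR's code):

import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

// Future.successful is already complete when it returns -- nothing is
// handed to the ExecutionContext, so no concurrent task is created
val leaf = Future.successful(42L)
println(leaf.isCompleted) // true, immediately
println(leaf.value)       // Some(Success(42)), computed on the calling thread

// Future.apply, by contrast, submits its body to the ExecutionContext,
// which is what "creating an async task" means here
val task = Future(42L)    // scheduled on the executor; may not be complete yet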

ogier avatar Feb 14 '16 18:02 ogier

Akka isn't equivalent to the other languages either. First, an Akka actor contains twice the state of an Erlang actor; second, the other languages use static dispatch, while Akka needs dynamic dispatch at runtime.

Also, he only adds the number via Future.successful at the leaf; everything else is still passed to an executor. You can't just compare languages and assume the implementations actually do the same thing. This example is pretty close to the Go example, while the Akka example does a thousand things more. And no example will ever actually run on 1M threads.

if size == 1 {
    c <- num
    return
}

This will definitely be inlined, which is the same as Future.successful().

P.S. A microbenchmark means doing the same thing in the best way a language offers, and trust me, nobody would ever use the Akka implementation for such a task.

schmitch avatar Feb 14 '16 19:02 schmitch

@ogier going from Future.successful(x) to Future(x) does impact performance, from 230ms to 380ms. Then again, this is how it would be idiomatically implemented with Scala futures, so why would you want to do it otherwise?

It would be better to have a sensible benchmark that does something relevant and then implement that in different languages/frameworks. This one is just measuring the overhead of creating async tasks.

ochrons avatar Feb 14 '16 20:02 ochrons

It sounds like there are some disagreements going on about the nature of this benchmark. The benchmark's description is "Skynet 1M threads microbenchmark", so it seems to me there is no way you can create < 1M threads and call it a valid benchmark.

Futures are not necessarily concurrent. There is absolutely zero concurrency involved in the following, when size == div:

val futures = for (i <- 0 until div) yield caller(num + i * (size / div), size / div, div)

This is 10 sequential, recursive calls to a function that constructs a Future object and returns it. @schmitch you seem to think this is equivalent to the Go example which reads:

for i := 0; i < div; i++ {
    subNum := num + i*(size/div)
    go skynet(rc, subNum, size/div, div)
}

But it is not. The skynet() call in Go is running in its own concurrent goroutine. It is not inlined, and never will be because c <- num blocks execution until a value is consumed from c which does not happen until after the whole for loop. Even if it could be inlined (which it cannot without causing deadlock), the Go compiler never does inline goroutines in practice, even when they are absurdly trivial. Here's one example: a goroutine sends a constant value down a channel that is never used, yet 100ms later we still find it live, blocked, trying to send the value: http://play.golang.org/p/mvz1wLUHsJ

@ochrons, yes, exactly, this benchmark is "just measuring the overhead of creating async tasks." That's the whole point. If you did any other non-trivial computation in the body of the tasks, then it would not be a valid measure of the overhead of async tasks.

ogier avatar Feb 14 '16 20:02 ogier

If you are "just measuring the overhead of creating async tasks",

then the Future example is correct. If you want to compare CSP vs actors, then the Go example would be correct; however, CSP would always win. You could try the JCSP implementation and compare it with the Java actor model, but you wanted to measure creating async tasks, and that calls for the Future implementation.

schmitch avatar Feb 14 '16 20:02 schmitch

@ogier then you agree with me in saying that the Node.js example should be removed altogether, as it creates zero async tasks :)

For some reason, though, the author(s) accepted that PR, so I'm not too sure what exactly the purpose of this benchmark is.

ochrons avatar Feb 14 '16 20:02 ochrons

And you should also measure the memory overhead, not just CPU. With 10M iterations the Go version takes 5GB of memory, and at 20M iterations I had to kill the process after it went up to 20GB (I have 32GB on my machine).

The Scala version is fine running 100M iterations with less than 1GB.

ochrons avatar Feb 14 '16 20:02 ochrons

@schmitch I don't think the Futures example is "measuring the overhead of creating async tasks", because Future.successful(num) does not create an async task. It's called in a loop ten times without any concurrency. As for "CSP would always win", the fastest async implementation of this benchmark is currently .NET Core, which is not CSP.

@ochrons Yes, I agree, the node.js example suffers the exact same issue as this PR, and I don't think it belongs in this benchmark.

As to whether Futures are more or less idiomatic than Actors, the idiomatic way to write this function certainly uses neither. The idiomatic function is:

long sum = 0;
for (long i = 0; i < size; i++) {
    sum += i;
}
return sum;
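
Or, in Scala, the whole computation collapses to a one-liner (sketch):

(0L until size).sum // 499999500000 for size = 1000000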

That uses less memory, less CPU, and less time than anything recursive, let alone anything concurrent. But that's not the point. Measuring overhead of concurrency is the point.

ogier avatar Feb 14 '16 21:02 ogier

I've updated the Scala Future implementation to use real Futures at the leaf level, so now it indeed creates 1,111,111 async tasks like the Go version.

Takes 380ms to run 1.1M tasks and 38177ms to run 111.1M tasks.
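
In other words, the leaf case changed roughly like this (a sketch of the change, not the exact committed diff):

// before: completed synchronously on the calling thread
Future.successful(num.toLong)
// after: submitted to the ExecutionContext like every other task
Future(num.toLong)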

ochrons avatar Feb 14 '16 21:02 ochrons

@ogier The .NET Core implementation is using Task.FromResult, which is equivalent to Future.successful. Although it's calling the function inside a Task.Run, so maybe that doesn't matter.

https://github.com/atemerev/skynet/blob/master/dnx/Program.cs#L51

ochrons avatar Feb 14 '16 21:02 ochrons

@ochrons The implementation there is not actually Task.FromResult(...), it's Task.Run(() => Task.FromResult(...)). Importantly, it makes ten concurrent recursive calls instead of making ten sequential recursive calls and then using their results in a way that might be asynchronous. The equivalent in your code is changing from (0 until div).map(n => skynet(...)) to something like (0 until div).map(n => Future { skynet(...) }). Forgive my poor scala-fu, other changes may be required too.

ogier avatar Feb 14 '16 21:02 ogier

Actually, if I run Future.sequence(for (i <- 0 until div) yield Future(1)) it will do the same thing as Task.Run(); Task.WhenAll(). If you think that's not true, then whatever; maybe you are just unhappy about the result. Even Future.sequence(for (i <- 0 until div) yield Future.successful(1)) would do essentially the same as running Task.Run 10 times.

schmitch avatar Feb 14 '16 21:02 schmitch

I don't quite understand why the idiomatic Scala code should be changed to match some C# Task concept. If the call to skynet was wrapped in a Future it would result in a Seq[Future[Future[Long]]] instead of Seq[Future[Long]], so you would have double async wrapping for no reason.

Since Future(num) creates an async task, and Future.sequence(...).map(...) creates another, all code paths lead to async task creation, so a total of 1,111,111 async tasks are created. The C# version might create more, depending on the implementation of ContinueWith.
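
To illustrate the wrapping (a rough sketch, reusing the names from my implementation and the implicit global ExecutionContext):

// wrapping the recursive call in another Future, as suggested
val nested: Seq[Future[Future[Long]]] =
  (0 until div).map(i => Future(caller(num + i * (size / div), size / div, div)))

// the inner layer has to be flattened again before the results can be summed
val flat: Future[Seq[Long]] = Future.sequence(nested.map(_.flatMap(identity)))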

ochrons avatar Feb 14 '16 22:02 ochrons

@schmitch Correct me if I am wrong, but I believe Future.sequence() initiates exactly one new concurrent thread of execution: it awaits the completion of each of the futures passed to it, and assigns that list of results to the future it returns. Future.successful does not run anything at all concurrently (as exhibited by the fact that it doesn't even use an ExecutionContext). So creating a list of 10 Futures with Future.successful and passing that to Future.sequence represents one concurrent computation.

@ochrons I agree, the latest code looks fine. Sorry for harping on this point over and over.

ogier avatar Feb 14 '16 22:02 ogier

Actually, Future.sequence will dispatch every future. Internally it may schedule the code multiple times (though this could change). You could create a custom ExecutionContext and override execute; then you can count how often work is dispatched to a thread. Scala is already way above 1M.

Btw, the explanation is pretty simple. If one of the Futures has failed, Future.sequence will fail too. The next thing it does is run a flatMap operation on the future, which needs to happen on a thread, and fold the results into a Future(Seq()). I would call that CanBuildFrom awfulness: for (r <- fr; a <- fa) yield (r += a), where fr is a Future[Builder] and fa is the actual future. a <- fa means Future.flatMap, which registers an onComplete callback that is passed to an ExecutionContext, so Future.sequence will actually run on different threads multiple times. Running Future.sequence(Seq(Future(1), Future(1))) means you dispatch something a dozen times. Running exactly 1,111,111 times would be nearly impossible.

schmitch avatar Feb 14 '16 23:02 schmitch

@schmitch The operation of Future.sequence is not exactly like that. Mostly the code is run in the context (and thread) of the future that's being completed (via a callback registered with onComplete). There aren't really that many additional async contexts created due to the use of Future.sequence.

This is related to DefaultPromise linking, which I don't pretend to understand :)

ochrons avatar Feb 14 '16 23:02 ochrons

Still, if you create a future with Future.successful and then call onComplete on it, the callback will still run on a thread, even though Future.successful itself just assigns the value.

You could pass something like this as the ExecutionContext to sequence():

  import java.util.concurrent.Executors
  import java.util.concurrent.atomic.AtomicInteger
  import scala.concurrent.ExecutionContext

  val counter = new AtomicInteger(0)
  val threadPool = Executors.newWorkStealingPool()

  // count every Runnable that gets dispatched before handing it to the pool
  implicit val ec = new ExecutionContext {

    override def execute(runnable: Runnable) {
      counter.incrementAndGet()
      threadPool.submit(runnable)
    }

    def reportFailure(t: Throwable) {}
  }
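
Then, as a rough sketch (assuming the caller method from the PR is compiled against this implicit ec instead of the global one), you run the benchmark once and read the counter:

  // run one full iteration and see how many Runnables were actually dispatched
  Await.result(caller(0, 1000000, 10), Duration.Inf)
  println(s"execute() was called ${counter.get} times")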

schmitch avatar Feb 14 '16 23:02 schmitch

Is there some reason why this PR has not been merged, especially now that there are separate results for different concurrency types?

ochrons avatar Feb 16 '16 08:02 ochrons

I used JMH for "warm-up" and other benchmark problems: #62

Mironor avatar Feb 19 '16 10:02 Mironor