pekko icon indicating copy to clipboard operation
pekko copied to clipboard

chore: Add FutureOps which with an await style.

Open He-Pin opened this issue 1 year ago • 15 comments

Motivation: I found the Await.result is everywhere.

Modification: Add an await operator to the future.

Result: Fluent code.

[info] Benchmark                              (pool)  (recursion)  (threads)   Mode  Cnt        Score       Error   Units
[info] CompleteFutureBenchmark.success           fix         1024          1  thrpt    5  1069166.183 ± 46755.228  ops/ms
[info] CompleteFutureBenchmark.successOption     fix         1024          1  thrpt    5  1383230.259 ± 15758.252  ops/ms
[info] CompleteFutureBenchmark.successUnwrap     fix         1024          1  thrpt    5  1377034.015 ± 46460.695  ops/ms

I did some changes in the scala/scala

He-Pin avatar Jan 02 '25 16:01 He-Pin

This code is internal. Does it matter if it is fluent?

pjfanning avatar Jan 02 '25 18:01 pjfanning

So does performance, I think which will avoid the waiting if the future is already done.

I was want to make this public, but it seems safer for me to taste it first.

He-Pin avatar Jan 02 '25 18:01 He-Pin

If there is a perf issue in Await.result, I would prefer it was fixed in Scala runtime.

The Scala 2 impl seems to try to optimise for the completed case.

    final def result[T](awaitable: Awaitable[T], atMost: Duration): T = awaitable match {
      case f: Future[T] if f.isCompleted => f.result(atMost)(AwaitPermission)
      case _ => blocking(awaitable.result(atMost)(AwaitPermission))
    }

pjfanning avatar Jan 02 '25 18:01 pjfanning

The code seems need more adjustment, as we don't need to test if it's a future anymore

He-Pin avatar Jan 02 '25 19:01 He-Pin

??? Why its closed?

He-Pin avatar Jan 02 '25 19:01 He-Pin

@pjfanning I have attached the result

He-Pin avatar Jan 03 '25 06:01 He-Pin

@pjfanning I have attached the result

I would still like this discussed with the Scala team to understand if the Await.result can be improved. Everyone benefits if you have discovered a perf issue there and that it then gets fixed.

pjfanning avatar Jan 03 '25 07:01 pjfanning

@pjfanning https://github.com/scala/scala/pull/10972 , submitted to scala

He-Pin avatar Jan 03 '25 07:01 He-Pin

scala/scala#10972 , submitted to scala

Awesome that you have proposed this improvement upstream, thanks!

I see two motivations for this change: the performance improvement, and the fact that the code 'looks nicer'. I agree that if this performance improvement could make it upstream that would be much better.

For the idea that this change makes the code looks nicer, I'm not sure I agree: it replaces well-known Scala idiom with a custom helper. While the custom helper is perhaps nicer, it's also 'one more thing to learn'. Also, because asynchronicity is at the heart of Pekko, we should use Await sparingly and with caution. For this reason, having to use the arguably-'clumsy' Scala idiom might not be so bad, as it adds some friction to consider whether Await is really needed.

raboof avatar Jan 03 '25 11:01 raboof

  1. this is just following the style in gears.
  extension [T](src: Source[scala.util.Try[T]])
    /** Waits for an item to arrive from the source, then automatically unwraps it. Suspends until an item returns.
      * @see
      *   [[Source!.awaitResult awaitResult]] for non-unwrapping await.
      */
    inline def await(using Async) = src.awaitResult.get

and then with usage:

  test("Constant returns") {
    Async.blocking:
      for (i <- -5 to 5)
        val f1 = Future { i }
        val f2 = Future.now(Success(i))
        assertEquals(f1.await, i)
        assertEquals(f1.await, f2.await)
  }
  1. The current implementation by @viktorklang will double-getting on java.util.concurrent.atomic.AtomicReference#get, have no idea, why would we do that.
  2. This optimization fix point 2.
  3. I try to avoid the lifting to Option with
  def unwrap: Try[T]

in Future.scala, and then

    @throws(classOf[TimeoutException])
    @throws(classOf[InterruptedException])
    final def result[T](awaitable: Awaitable[T], atMost: Duration): T = awaitable match {
      case f: Future[T] @unchecked =>
        val r = f.unwrap
        if (r ne null) r.get
        else blocking(awaitable.result(atMost)(AwaitPermission))
      case _ => blocking(awaitable.result(atMost)(AwaitPermission))
    }

But the result doesn't have much difference in gc pressure.

He-Pin avatar Jan 03 '25 11:01 He-Pin

One advantage of having this inside is it works on all scala versions.

He-Pin avatar Jan 03 '25 16:01 He-Pin

@He-Pin could you add the CompleteFutureBenchmark to this PR? I think we have some benchmarks in this repo where this could be added.

pjfanning avatar Jan 04 '25 11:01 pjfanning

I would like us to take a little bit of time over this just to see what the Scala team and other users have to say. I am not against merging a change to Pekko but I think that change must not negatively impact the awaits for incomplete futures. The existing benchmark only checks complete futures. I know it is harder to measure the impact of a change on incomplete futures because benchmarks are mainly measuring the wait and not the code around getting the result from the awaitable after it completes.

pjfanning avatar Jan 04 '25 12:01 pjfanning

@pjfanning I think that will not make much difference, the old implementation will always do the AtomicReference.get so does this, one. and when the future is not completed, this will return a None (no allocation), which was returning a boolean.

He-Pin avatar Jan 04 '25 12:01 He-Pin

And I think there is a limitation of the current Scala implementation, were we can't save the value into a slot to avoid double get, the value is on the stack, which is very cheap,but we can't do it with current scala.

https://contributors.scala-lang.org/t/inline-convertion-in-pattern-matchings-if-guard/6962/12

He-Pin avatar Jan 04 '25 13:01 He-Pin

merged upstream

He-Pin avatar Sep 19 '25 18:09 He-Pin