


在 Akka 中,一个Future是用来获取某个并发操作结果的数据结构。这个结果可以以同步(阻塞)或异步(非阻塞)的方式访问。


为了运行回调和操作,Futures 需要有一个ExecutionContext,它与java.util.concurrent.Executor很相像. 如果你在作用域内有一个ActorSystem,它会把自己的派发器用作ExecutionContext,或者你也可以用ExecutionContext伴生对象提供的工厂方法来将ExecutorsExecutorServices进行包装,或者甚至创建自己的实例。

import scala.concurrent.{ ExecutionContext, Promise }

implicit val ec = ExecutionContext.fromExecutorService(yourExecutorServiceGoesHere)

// Do stuff with your brand new shiny ExecutionContext
val f = Promise.successful("foo")

// Then shut your ExecutionContext down at some
// appropriate place in your program/application



class A extends Actor {
  import context.dispatcher
  val f = Future("hello")
  def receive = {
    // receive omitted ...

用于 Actor

通常有两种方法来从一个Actor获取回应:第一种是发送一个消息(actor ! msg,这种方法只在发送者是一个Actor时有效),第二种是通过一个Future

使用Actor?方法来发送消息会返回一个Future. 要等待并获取结果的最简单方法是:

import scala.concurrent.Await
import akka.pattern.ask
import akka.util.Timeout
import scala.concurrent.duration._

implicit val timeout = Timeout(5 seconds)
val future = actor ? msg // enabled by the “ask” import
val result = Await.result(future, timeout.duration).asInstanceOf[String]

这会导致当前线程阻塞,并等待Actor通过它的应答来 ‘完成’Future。但是阻塞会导致性能问题,所以是不推荐的. 导致阻塞的操作位于Await.resultAwait.ready中,这样就方便定位阻塞的位置. 对阻塞方式的替代方法会在本文档中进一步讨论。还要注意Actor返回的Future的类型 是Future[Any],这是因为Actor是动态的. 这也是为什么上例中使用了asInstanceOf。在使用非阻塞方式时,最好使用mapTo方法来将Future转换到期望的类型:

import scala.concurrent.Future
import akka.pattern.ask

val future: Future[String] = ask(actor, msg).mapTo[String]

如果转换成功,mapTo方法会返回一个包含结果的新的Future,如果不成功,则返回ClassCastException. 对Exception的处理将在本文档进一步讨论。


import akka.pattern.pipe
future pipeTo actor


Akka中的一个常见用例是在不需要额外使用Actor工具的情况下并发地执行计算. 如果你发现你只是为了并行地执行一个计算而创建了一堆Actor,下面是一种更好(也更快)的方法:

import scala.concurrent.Await
import scala.concurrent.Future
import scala.concurrent.duration._

val future = Future {
  "Hello" + "World"
future foreach println

在上面的代码中,被传递给Future的代码块会被缺省的Dispatcher执行,代码块的返回结果会被用来完成Future(在这个例子中,结果是一个字符串:“HelloWorld”). 与从Actor返回的Future不同,这个Future拥有合适的类型,我们还避免了管理Actor的开销。


val future = Future.successful("Yay!")


val otherFuture = Future.failed[String](new IllegalArgumentException("Bang!"))


val promise = Promise[String]()
val theFuture = promise.future

函数式 Future

Scala 的Future有一些 monadic 方法,与Scala集合所使用的方法非常相似. 这使你可以构造出可以传递结果的 ‘管道’ 或 ‘数据流’ 。

Future 是 Monad

Future以函数式风格工作的第一个方法是map. 它需要一个Function来对Future的结果进行处理,返回一个新的结果。map方法的返回值是包含新结果的另一个Future:

val f1 = Future {
  "Hello" + "World"
val f2 = f1 map { x =>
f2 foreach println

这个例子中我们在Future内部连接两个字符串。我们没有等待这个Future结束,而是使用map方法来将计算字符串长度的函数应用于它. 现在我们有了第二个Future,它的最终结果是一个Int. 当先前的Future完成时,它会应用我们的函数并用其结果来完成第二个Future。最终我们得到的结果是 10. 先前的Future仍然持有字符串“HelloWorld”,而不受map的影响。


val f1 = Future {
  "Hello" + "World"
val f2 = Future.successful(3)
val f3 = f1 map { x =>
  f2 map { y =>
    x.length * y
f3 foreach println

f3的类型是Future[Future[Int]]而不是我们所期望的Future[Int]. 这时我们需要使用flatMap方法:

val f1 = Future {
  "Hello" + "World"
val f2 = Future.successful(3)
val f3 = f1 flatMap { x =>
  f2 map { y =>
    x.length * y
f3 foreach println

使用嵌套的mapflatmap组合子来组合Future,有时会变得非常复杂和难以阅读,这时使用 Scala 的 ‘for comprehensions’ 一般会生成可读性更好的代码。见下一部分的示例。


val future1 = Future.successful(4)
val future2 = future1.filter(_ % 2 == 0)

future2 foreach println

val failedFilter = future1.filter(_ % 2 == 1).recover {
  // When filter fails, it will have a java.util.NoSuchElementException
  case m: NoSuchElementException => 0

failedFilter foreach println
For Comprehensions

由于Future拥有mapfilterflatMap方法,它可以方便地用于 ‘for comprehension’:

val f = for {
  a <- Future(10 / 2) // 10 / 2 = 5
  b <- Future(a + 1) //  5 + 1 = 6
  c <- Future(a - 1) //  5 - 1 = 4
  if c > 3 // Future.filter
} yield b * c //  6 * 4 = 24

// Note that the execution of futures a, b, and c
// are not done in parallel.

f foreach println

这样写代码的时候需要记住的是:虽然看上去上例的部分代码可以并发地运行,for comprehension的每一步实际是顺序执行的。每一步是在单独的线程中运行的,但是相较于将所有的计算在一个单独的Future中运行并没有太大好处。只有先创建Future,然后对其进行组合的情况下才能真正得到好处。

组合 Futures

上例中的for comprehension 是对Future进行组合的例子. 这种方法的常见用例是将多个Actor的回应组合成一个单独的计算而不用调用Await.resultAwait.ready来阻塞地获得每一个结果. 先看看使用Await.result的例子:

val f1 = ask(actor1, msg1)
val f2 = ask(actor2, msg2)

val a = Await.result(f1, 3 seconds).asInstanceOf[Int]
val b = Await.result(f2, 3 seconds).asInstanceOf[Int]

val f3 = ask(actor3, (a + b))

val result = Await.result(f3, 3 seconds).asInstanceOf[Int]



这里我们等待前2个Actor的结果然后将其发送给第三个Actor. 我们调用了3次Await.result,导致我们的程序在获得最终结果前阻塞了3次。现在跟下例比较:

val f1 = ask(actor1, msg1)
val f2 = ask(actor2, msg2)

val f3 = for {
  a <- f1.mapTo[Int]
  b <- f2.mapTo[Int]
  c <- ask(actor3, (a + b)).mapTo[Int]
} yield c

f3 foreach println

这里我们有两个actor各自处理自己的一条消息。一旦这2个结果可用了(注意我们并没有阻塞地等待这些结果!),它们会被加起来发送给第三个Actor,这第三个actor回应一个字符串,我们把它赋值给 ‘result’。

上面的方法对已知给定Actor数量的时候就足够了,但是当Actor数量较大时就显得比较笨重。sequencetraverse两个辅助方法可以帮助处理更复杂的情况。这两个方法都是用来将T[Future[A]]转换为Future[T[A]](其中TTraversable子类). 例如:

// oddActor returns odd numbers sequentially from 1 as a List[Future[Int]]
val listOfFutures = List.fill(100)(akka.pattern.ask(oddActor, GetNext).mapTo[Int])

// now we have a Future[List[Int]]
val futureList = Future.sequence(listOfFutures)

// Find the sum of the odd numbers
val oddSum = futureList.map(_.sum)
oddSum foreach println

现在来解释一下,Future.sequence将输入的List[Future[Int]]转换为Future[List[Int]]. 这样我们就可以将map直接作用于List[Int],从而得到List的总和。

traverse方法与sequence类似,但它以T[A]A => Future[B]函数为参数返回一个Future[T[B]],这里的T同样也是Traversable的子类. 例如,用traverse来计算前100个奇数的和:

val futureList = Future.traverse((1 to 100).toList)(x => Future(x * 2 - 1))
val oddSum = futureList.map(_.sum)
oddSum foreach println


val futureList = Future.sequence((1 to 100).toList.map(x => Future(x * 2 - 1)))
val oddSum = futureList.map(_.sum)
oddSum foreach println


然后我们有一个方法fold,它的参数包括一个初始值 ,一个Future序列和一个作用于初始值和Future类型返回与初始值相同类型的函数,它将这个函数异步地应用于future序列的所有元素,它的执行将在最后一个Future完成之后开始。

// Create a sequence of Futures
val futures = for (i <- 1 to 1000) yield Future(i * 2)
val futureSum = Future.fold(futures)(0)(_ + _)
futureSum foreach println


如果传给fold的序列是空的,它将返回初始值,在上例中,这个值是0. 有时你没有一个初始值,而使用序列中第一个已完成的Future的值作为初始值,你可以使用reduce,它的用法是这样的:

// Create a sequence of Futures
val futures = for (i <- 1 to 1000) yield Future(i * 2)
val futureSum = Future.reduce(futures)(_ + _)
futureSum foreach println



有时你只想要监听Future的完成事件,对其进行响应,不是创建新的Future,而仅仅是产生副作用. Scala为这种情况准备了onCompleteonSuccessonFailure,其中后两者是第一项的特例。

future onSuccess {
  case "bar"     => println("Got my bar alright!")
  case x: String => println("Got some random string: " + x)
future onFailure {
  case ise: IllegalStateException if ise.getMessage == "OHNOES" =>
  //OHNOES! We are in deep trouble, do something!
  case e: Exception =>
  //Do something else
future onComplete {
  case Success(result)  => doSomethingOnSuccess(result)
  case Failure(failure) => doSomethingOnFailure(failure)


由于回调的执行是无序的,而且可能是并发执行的,当你需要操作有序的时候代码行为往往很怪异。但有一个解决办法是使用andThen. 它会为指定的回调创建一个新的Future,这个Future与原先的Future拥有相同的结果,这样就可以像下例一样定义次序:

val result = Future { loadPage(url) } andThen {
  case Failure(exception) => log(exception)
} andThen {
  case _ => watchSomeTV()
result foreach println



val future4 = future1 fallbackTo future2 fallbackTo future3
future4 foreach println


val future3 = future1 zip future2 map { case (a, b) => a + " " + b }
future3 foreach println


由于Future的结果是与程序的其它部分并发生成的,因此异常需要作特殊的处理。不管是Actor或是派发器正在完成此Future,如果抛出了ExceptionFuture将持有这个异常而不是一个有效的值. 如果Future持有Exception,调用Await.result将导致此异常被再次抛出从而得到正确的处理。

通过返回一个不同的结果来处理Exception也是可能的. 这是使用recover方法实现的. 例如:

val future = akka.pattern.ask(actor, msg1) recover {
  case e: ArithmeticException => 0
future foreach println

在这个例子中,如果actor回应了包含ArithmeticExceptionakka.actor.Status.Failure,我们的Future将持有 0 作为结果.recover方法与标准的 try/catch 块非常相似,可以用这种方式处理多种Exception, 如果其中有没有提到的Exception,这种异常将以“好像没有定义recover一样”的的方式来处理。


val future = akka.pattern.ask(actor, msg1) recoverWith {
  case e: ArithmeticException => Future.successful(0)
  case foo: IllegalArgumentException =>
    Future.failed[Int](new IllegalStateException("All br0ken!"))
future foreach println



// TODO after is unfortunately shadowed by ScalaTest, fix as part of #3759
// import akka.pattern.after

val delayed = akka.pattern.after(200 millis, using = system.scheduler)(Future.failed(
  new IllegalStateException("OHNOES")))
val future = Future { Thread.sleep(1000); "foo" }
val result = Future firstCompletedOf Seq(future, delayed)

