Skip to content

Update futures.md (zh-cn) #1417

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 2 commits into from
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
186 changes: 163 additions & 23 deletions _zh-cn/overviews/core/futures.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,123 @@ discourse: false

## 简介

Future提供了一套高效便捷的非阻塞并行操作管理方案。其基本思想很简单,所谓Future,指的是一类占位符对象,用于指代某些尚未完成的计算的结果。一般来说,由Future指代的计算都是并行执行的,计算完毕后可另行获取相关计算结果。以这种方式组织并行任务,便可以写出高效、异步、非阻塞的并行代码
Future提供了一种合理执行并行操作(高效非阻塞)的方式,Future就是代表某个可能尚不存在的值的占位对象。一般来说,Future的值的填充方式都是并发的,而取用则是顺序的。以这种方式来编排并发任务,很容易写出更快的异步非阻塞的并行代码

默认情况下,future和promise并不采用一般的阻塞操作,而是依赖回调进行非阻塞操作。为了在语法和概念层面更加简明扼要地使用这些回调,Scala还提供了flatMap、foreach和filter等算子,使得我们能够以非阻塞的方式对future进行组合。当然,future仍然支持阻塞操作——必要时,可以阻塞等待future(不过并不鼓励这样做)。
默认情况下,future 和 promise 是非阻塞的,其使用 callback 回调来替代传统的阻塞操作。为了在语法和概念层面更加简明扼要地使用这些回调,Scala 还提供了 flatMap、foreach 和 filter 等算子,使得我们能够以非阻塞的方式对 future 进行组合。当然,future 仍然支持阻塞操作——必要时,可以阻塞等待future(不过并不鼓励这样做)。

一个典型的 future 如下所示:

val inverseFuture: Future[Matrix] = Future {
fatMatrix.inverse() // non-blocking long lasting computation
}(executionContext)

或者更惯用的写法如下:

implicit val ec: ExecutionContext = ...
val inverseFuture : Future[Matrix] = Future {
fatMatrix.inverse()
} // ec is implicitly passed

两段代码都将 `fatMatrix.inverse()` 的执行委托给 `ExecutionContext`,并且在 `inverseFuture` 中获取计算结果。

## 执行上下文 (Execution Context)

Future 和 Promises 都依赖 `ExecutionContext` 执行上下文,后者负责执行计算。

`ExecutionContext` 和一个 [Executor](http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/Executor.html) 类似,它们都用于在一个新线程、线程池或者当前线程中执行计算(不鼓励在当前线程中执行计算 - 参照下文)。

`scala.concurrent` 包带有开箱即用的 `ExecutionContext` 实现,这是一个全局的静态线程池。除此之外,你也可以将一个 `Executor` 转型为 `ExecutionContext` 来获得它。最后,用户可以自由的扩展 `ExecutionContext` 特质来实现自己的执行上下文,尽管这种需求可能非常少见。

### 全局执行上下文(The Global Execution Context)

`ExecutionContext.global` 是一个由 [ForkJoinPool](http://docs.oracle.com/javase/tutorial/essential/concurrency/forkjoin.html) 支持的 `ExecutionContext`,它应该可以满足大多数情况下的使用,但是需要小心。`ForkJoinPool` 管理着有限数量的线程(线程的最大数量参照并行度级别(parallelism level))。仅当每个任务都被 `blocking` 调用阻塞时,并发数才会超过并行度级别(下面将详细介绍),否则,存在全局执行上下文中的线程池中资源不足的风险,造成没有计算可以被处理的问题。

默认的 `ExecutionContext.global` 将其 fork-join 线程池的并行级别设置为可用的处理器数([Runtime.availableProcessors](http://docs.oracle.com/javase/7/docs/api/java/lang/Runtime.html#availableProcessors%28%29))。可以通过如下 VM 属性来覆盖这一设置:

- scala.concurrent.context.minThreads - 默认为 `Runtime.availableProcessors`
- scala.concurrent.context.numThreads - 可以是 `xN` 或者 `N` 默认为 `Runtime.availableProcessors`
- scala.concurrent.context.maxThreads - 默认为 `Runtime.availableProcessors`

并行级别将被设置为 `numThreads`,只要它保持在 `[minThreads; maxThreads]` 之间。

如上所述,存在阻塞计算的情况下,`ForkJoinPool` 可以增加超出并行级别的线程数。如 `ForkJoinPool` 的 API 所述,这必须手动进行:

import scala.concurrent.Future
import scala.concurrent.forkjoin._

// the following is equivalent to `implicit val ec = ExecutionContext.global`
import ExecutionContext.Implicits.global

Future {
ForkJoinPool.managedBlock(
new ManagedBlocker {
var done = false

def block(): Boolean = {
try {
myLock.lock()
// ...
} finally {
done = true
}
true
}

def isReleasable: Boolean = done
}
)
}

幸好,并发包提供了一种便捷的方法:

Future {
blocking {
myLock.lock()
// ...
}
}

请注意,`blocking` 是一种通用的结构,[下面](https://docs.scala-lang.org/overviews/core/futures.html#blocking-inside-a-future)将会对此有更多讨论。

最后但是同样重要的是,你必须记住,ForkJoinPool 并不是为了长时间阻塞操作而设计的,即便通知阻塞,其也可能不会像你期望的那样产生新的 Worker,并且当创建新 Worker 时,它们可以多达 32767 个。为了给你一个概念,下面的代码将会使用 32000 个线程:

implicit val ec = ExecutionContext.global

for( i <- 1 to 32000 ) {
Future {
blocking {
Thread.sleep(999999)
}
}
}

如果需要长时间阻塞操作,我们建议使用专门的 `ExecutionContext`,这是对于 Java `Executor` 的一个包装。

### 从 Java Executor 获取上下文(Adapting a Java Executor)

使用 `ExecutionContext.fromExecutor` 方法可以将一个 Java `Executor` 包装成 `ExecutionContext`,比如:

ExecutionContext.fromExecutor(new ThreadPoolExecutor( /* your configuration */ ))

## 线程同步的 Execution Context

可能有人想要在当前线程中运行一个 `ExecutionContext`:

val currentThreadExecutionContext = ExecutionContext.fromExecutor(
new Executor {
// Do not do this!
def execute(runnable: Runnable) { runnable.run() }
})

这应该避免,因为它会在你执行 future 的时候引入不确定因素:

Future {
doSomething
}(ExecutionContext.global).map {
doSomethingElse
}(currentThreadExecutionContext)

`doSomethingElse` 调用可以在 `doSomething` 的线程中执行,也可以在主线程中执行,因此可以是异步调用,也可以是同步调用。正如[这里](http://blog.ometer.com/2011/07/24/callbacks-synchronous-and-asynchronous/)所解释的,调用必须声明自己是同步或者异步,不能是“根据环境改变”的。

## Future

Expand Down Expand Up @@ -69,7 +183,7 @@ Future的一个重要属性在于它只能被赋值一次。一旦给定了某
source.toSeq.indexOfSlice("myKeyword")
}

### Callbacks(回调函数)
### 回调函数(Callbacks)

现在我们知道如何开始一个异步计算来创建一个新的future值,但是我们没有展示一旦此结果变得可用后如何来使用,以便我们能够用它来做一些有用的事。我们经常对计算结果感兴趣而不仅仅是它的副作用。

Expand Down Expand Up @@ -180,7 +294,7 @@ onComplete方法一般在某种意义上它允许客户处理future计算出的

- 一旦执行完,回调将从future对象中移除,这样更适合JVM的垃圾回收机制(GC)。

### 函数组合(Functional Composition)和For解构(For-Comprehensions)
### 函数组合 (Functional Composition) 和For解构 (For-Comprehensions)

尽管前文所展示的回调机制已经足够把future的结果和后继计算结合起来的,但是有些时候回调机制并不易于使用,且容易造成冗余的代码。我们可以通过一个例子来说明。假设我们有一个用于进行货币交易服务的API,我们想要在有盈利的时候购进一些美元。让我们先来看看怎样用回调来解决这个问题:

Expand Down Expand Up @@ -335,9 +449,33 @@ fallbackTo组合器生成的future对象可以在该原future成功完成计算

## Blocking

正如前面所说的,在future的blocking非常有效地缓解性能和预防死锁。虽然在futures中使用这些功能方面的首选方式是Callbacks和combinators,但在某些处理中也会需要用到blocking,并且它也是被Futures and Promises API所支持的。
Futures 通常是异步的,不会阻塞底层执行线程,但是在某些情况下,阻塞是有必要的。我们区分了阻塞线程执行的两种方式:在 future 内部阻塞线程执行以及阻止来自外部的其他 future,直到当前的 future 完成。

### 在 Future 之内阻塞

正如全局 `ExecutionContext` 所示,通过 `blocking` 结构来通知 `ExecutionContext` 暂停是可行的。但是,`ExecutionContext` 可以完全自行决定实现,虽然某些 `ExecutionContext`,比如 `ExecutionContext.global` 通过 `ManagedBlocker` 来实现阻塞,但是在某些上下文下,比如固定线程池:

ExecutionContext.fromExecutor(Executors.newFixedThreadPool(x))

什么也不会做,如下所示:

implicit val ec = ExecutionContext.fromExecutor(
Executors.newFixedThreadPool(4))
Future {
blocking { blockingStuff() }
}

和如下代码等效:

Future { blockingStuff() }

阻塞代码也可能抛出异常,在这种情况下,异常将会转发给调用者。

### 在 Future 之外阻塞

在之前的并发交易(concurrency trading)例子中,在应用的最后有一处用到block来确定是否所有的futures已经完成。这有个如何使用block来处理一个future结果的例子:
正如前面所说的,在 future 的 blocking 非常有效地缓解性能和预防死锁。虽然在 futures 中使用这些功能方面的首选方式是 Callbacks 和 combinators,但在某些处理中也会需要用到 blocking,并且它也是被 Futures and Promises API 所支持的。

在之前的并发交易(concurrency trading)例子中,在应用的最后有一处用到 block 来确定是否所有的 futures 已经完成。这有个如何使用 block 来处理一个future 结果的例子:

import scala.concurrent._
import scala.concurrent.duration._
Expand All @@ -355,11 +493,11 @@ fallbackTo组合器生成的future对象可以在该原future成功完成计算
Await.result(purchase, 0 nanos)
}

在这种情况下这个future是不成功的,这个调用者转发出了该future对象不成功的异常。它包含了失败的投影(projection)-- 阻塞(blocking)该结果将会造成一个NoSuchElementException异常在原future对象被成功计算的情况下被抛出
在这种情况下这个 future 是不成功的,这个调用者转发出了该 future 对象不成功的异常。它包含了失败的投影(projection)-- 阻塞(blocking)该结果将会造成一个 NoSuchElementException 异常在原 future 对象被成功计算的情况下被抛出

相反的,调用`Await.ready`来等待这个future直到它已完成,但获不到它的结果。同样的方式,调用那个方法时如果这个future是失败的,它将不会抛出异常。
相反的,调用 `Await.ready` 来等待这个 future 直到它已完成,但获不到它的结果。同样的方式,调用那个方法时如果这个 future 是失败的,它将不会抛出异常。

The Future trait实现了Awaitable trait还有其`ready()`和`result()`方法。这些方法不能被客户端直接调用,它们只能通过执行环境上下文来进行调用。
Future 特质实现了 Awaitable 特质还有其`ready()`和`result()`方法。这些方法不能被客户端直接调用,它们只能通过执行环境上下文来进行调用。

为了允许程序调用可能是阻塞式的第三方代码,而又不必实现Awaitable特质,原函数可以用如下的方式来调用:

Expand Down Expand Up @@ -470,25 +608,27 @@ completeWith方法将用另外一个future完成promise计算。当该future结

注意,在这种实现方式中,如果f与g都不是成功的,那么`first(f, g)`将不会实现(即返回一个值或者返回一个异常)。

## 工具(Utilities)
## 实用工具(Utilities)

为了简化在并发应用中处理时序(time)的问题,`scala.concurrent`引入了 Duration 抽象类。Duration 并无意成为另外一个表示时间的类,它是为了配合并发库使用的,其位于`scala.concurrent`包中。

Duration 是一个基本的,用于表示时间长短的类,其可以代指有限的时长或者无穷。有限的时长用 FiniteDuration 类来表示,并通过时间长度`(length)`和`java.util.concurrent.TimeUnit`来构造。无穷的时间继承自 Duration,其只包含两种实例,`Duration.Inf` 和 `Duration.MinusInf`。并发库中提供了一些 Durations 的子类用做隐式转换,这些子类不应被开发者直接使用。

为了简化在并发应用中处理时序(time)的问题,`scala.concurrent`引入了Duration抽象。Duration不是被作为另外一个通常的时间抽象存在的。他是为了用在并发(concurrency)库中使用的,Duration位于`scala.concurrent`包中。
抽象类 Duration 包含了如下方法:

Duration是表示时间长短的基础类,其可以是有限的或者无限的。有限的duration用FiniteDuration类来表示,并通过时间长度`(length)`和`java.util.concurrent.TimeUnit`来构造。无限的durations,同样扩展了Duration,只在两种情况下存在,`Duration.Inf`和`Duration.MinusInf`。库中同样提供了一些Durations的子类用来做隐式的转换,这些子类不应被直接使用。
1. 转换到不同的时间单位 `(toNanos, toMicros, toMillis, toSeconds, toMinutes, toHours, toDays and toUnit(unit: TimeUnit))`。
2. 不同时长之间的比较 `(<,<=,> 和 >=)`。
3. 算术运算符 `(+, -, *, / 和 unary_-)`
4. 一个 Duration 对象的最大和最小方法 `(min,max)`。
5. 检查时长是否为无限的 `(isFinite)`。

抽象的Duration类包含了如下方法
Duration 能够用如下方法创建实例

到不同时间单位的转换`(toNanos, toMicros, toMillis, toSeconds, toMinutes, toHours, toDays and toUnit(unit: TimeUnit))`。
durations的比较`(<,<=,>和>=)`。
算术运算符`(+, -, *, / 和单值运算_-)`
duration的最大最小方法`(min,max)`。
测试duration是否是无限的方法`(isFinite)`。
Duration能够用如下方法实例化`(instantiated)`:
1. 通过 Int 和 Long 类型隐式转换,比如 `val d = 100 millis`。
2. 通过传递一个 `Long` 和 `java.util.concurrent.TimeUnit` 来构造。例如:`val d = Duration(100, MILLISECONDS)`。
3. 通过传递一串字符来表示时间区间,例如 `val d = Duration("1.2 µs")`。

隐式的通过Int和Long类型转换得来 `val d = 100 millis`。
通过传递一个`Long length`和`java.util.concurrent.TimeUnit`。例如`val d = Duration(100, MILLISECONDS)`。
通过传递一个字符串来表示时间区间,例如 `val d = Duration("1.2 µs")`。
Duration也提供了unapply方法,因此可以i被用于模式匹配中,例如:
Duration 也提供了 unapply 方法,因此可以被用于模式匹配中,例如:

import scala.concurrent.duration._
import java.util.concurrent.TimeUnit._
Expand Down