清单 3. 使用 fork-join 框架解决选择大值问题
  public class MaxWithFJ extends RecursiveAction {
  private final int threshold;
  private final SelectMaxProblem problem;
  public int result;
  public MaxWithFJ(SelectMaxProblem problem, int threshold) {
  this.problem = problem;
  this.threshold = threshold;
  }
  protected void compute() {
  if (problem.size < threshold)
  result = problem.solveSequentially();
  else {
  int midpoint = problem.size / 2;
  MaxWithFJ left = new MaxWithFJ(problem.subproblem(0, midpoint), threshold);
  MaxWithFJ right = new MaxWithFJ(problem.subproblem(midpoint +
  1, problem.size), threshold);
  coInvoke(left, right);
  result = Math.max(left.result, right.result);
  }
  }
  public static void main(String[] args) {
  SelectMaxProblem problem = ...
  int threshold = ...
  int nThreads = ...
  MaxWithFJ mfj = new MaxWithFJ(problem, threshold);
  ForkJoinExecutor fjPool = new ForkJoinPool(nThreads);
  fjPool.invoke(mfj);
  int result = mfj.result;
  }
  }
  表 1 显示了在不同系统上从 500,000 个元素的数组中选择大元素的结果,以及改变阙值,使顺序方法优于并行方法。对于大多数运行,fork-join 池中的线程数量与可用的硬件线程(内核数乘以每个内核中的线程数)相等。与顺序方法相比,这些线程数量呈现一种加速比(speedup)。
  表 1. 在不同系统上从 500k 个元素的数组中运行 select-max 的结果
  阙值 = 500k 阙值 = 50k 阙值 = 5k 阙值 = 500 阙值 = -50
  Pentium-4 HT(2 个线程) 1.0 1.07 1.02 .82 .2
  Dual-Xeon HT(4 个线程) .88 3.02 3.2 2.22 .43
  8-way Opteron(8 个线程) 1.0 5.29 5.73 4.53 2.03
  8-core Niagara(32 个线程) .98 10.46 17.21 15.34 6.49
  结果很令人振奋,因为它们在选择各种参数时显示了不错的加速比。因此,只要避免为问题和底层硬件选择完全不合理的参数,会获得不错的结果。使用 chip-multithreading 技术,优的加速比不太明显;像 Hyperthreading 这样的 CMT 方法所提供的性能要低于等价数量的实际内核提供的性能,但是性能损失取决于许多因素,包括正在执行的代码的缓存丢失率(miss rate)。
  此处选择的顺序阙值范围从 500K(数组的大小,表示没有并行性)到 50。在这个例子中,阙值为 50 实在太小,有些不切实际,而且结果显示,顺序阙值太低时,fork-join 任务管理开销将起决定作用。但是表 1 也显示只要避免这种不切实际的过高和过低的参数,会获得不错的结果。选择 Runtime.availableProcessors() 作为工作线程的数量通常会获得与理想结果相近的结果,因为在 fork-join 池中执行的任务都是和 CPU 绑定的,但是,只要避免设置过大或过小的池,这个参数对结果不会有太大影响。
  MaxWithFJ 类中不需要显式同步。它操作的数据对于问题的生命周期来说是不变的,并且 ForkJoinExecutor 中有足够的内部同步可以保证问题数据对于子任务的可视性,也可以保证子任务结果对于跟它们结合的任务的可视性。
  fork-join 框架剖析
  可以有很多方法实现 清单 3 中演示的 fork-join 框架。可以使用原始的线程;Thread.start() 和 Thread.join() 提供了所有必要的功能。然而,这种方法需要的线程数可能比 VM 所能支持的数量更多。对于大小为 N(假设为一个很小的顺序阙值)的问题,将需要 O(N) 个线程来解决问题(问题树深度为 log2N,深度为 k 的二进制树有 2k 个节点)。在这些线程中,半数线程会用几乎整个生命周期来等待子任务的完成。创建线程会占用许多内存,这使得这种方法受到限制(尽管这种方法也能工作,但是代码非常复杂,并且需要仔细针对问题大小和硬件进行参数调优)。
  使用传统的线程池来实现 fork-join 也具有挑战性,因为 fork-join 任务将线程生命周期的大部分时间花费在等待其他任务上。这种行为会造成线程饥饿死锁(thread starvation deadlock),除非小心选择参数以限制创建的任务数量,或者池本身非常大。传统的线程池是为相互独立的任务设计的,而且设计中也考虑了潜在的阻塞、粗粒度任务 — fork-join 解决方案不会产生这两种情况。对于传统线程池的细粒度任务,也存在所有工作线程共享的任务队列发生争用的情况。
  工作窃取
  fork-join 框架通过一种称作工作窃取(work stealing) 的技术减少了工作队列的争用情况。每个工作线程都有自己的工作队列,这是使用双端队列(或者叫做 deque)来实现的(Java 6 在类库中添加了几种 deque 实现,包括 ArrayDeque 和 LinkedBlockingDeque)。当一个任务划分一个新线程时,它将自己推到 deque 的头部。当一个任务执行与另一个未完成任务的合并操作时,它会将另一个任务推到队列头部并执行,而不会休眠以等待另一任务完成(像 Thread.join() 的操作一样)。当线程的任务队列为空,它将尝试从另一个线程的 deque 的尾部 窃取另一个任务。
  可以使用标准队列实现工作窃取,但是与标准队列相比,deque 具有两方面的优势:减少争用和窃取。因为只有工作线程会访问自身的 deque 的头部,deque 头部永远不会发生争用;因为只有当一个线程空闲时才会访问 deque 的尾部,所以也很少存在线程的 deque 尾部的争用(在 fork-join 框架中结合 deque 实现会使这些访问模式进一步减少协调成本)。跟传统的基于线程池的方法相比,减少争用会大大降低同步成本。此外,这种方法暗含的后进先出(last-in-first-out,LIFO)任务排队机制意味着大的任务排在队列的尾部,当另一个线程需要窃取任务时,它将得到一个能够分解成多个小任务的任务,从而避免了在未来窃取任务。因此,工作窃取实现了合理的负载平衡,无需进行协调并且将同步成本降到了小。
  结束语
  fork-join 方法提供了一种表示可并行化算法的简单方式,而不用提前了解目标系统将提供多大程度的并行性。所有的排序、搜索和数字算法都可以进行并行分解(以后,像 Arrays.sort() 这样的标准库机制将会使用 fork-join 框架,允许应用程序免费享有并行分解的益处)。随着处理器数量的增长,我们将需要在程序内部使用更多的并行性,以有效利用这些处理器;对计算密集型操作(比如排序)进行并行分解,使程序能够更容易利用未来的硬件。