这篇文章讨论了Java应用中并行处理的多种方法。从自己管理Java线程,到各种更好几的解决方法,Executor服务、ForkJoin 框架以及计算中的Actor模型。
  Java并发编程的4种风格:Threads,Executors,ForkJoin和Actors
  我们生活在一个事情并行发生的世界。自然地,我们编写的程序也反映了这个特点,它们可以并发的执行。当然除了Python代码(译者注:链接里面讲述了Python的全局解释器锁,解释了原因),不过你仍然可以使用Jython在JVM上运行你的程序,来利用多处理器电脑的强大能力。
  然而,并发程序的复杂程度远远超出了人类大脑的处理能力。相比较而言,我们简直弱爆了:我们生来不是为了思考多线程程序、评估并发访问有限资源以及预测哪里会发生错误或者瓶颈。
  面对这些困难,人类已经总结了不少并发计算的解决方案和模型。这些模型强调问题的不同部分,当我们实现并行计算时,可以根据问题做出不同的选择。
  在这篇文章中,我将会用对同一个问题,用不同的代码来实现并发的解决方案;然后讨论这些方案有哪些好的地方,有哪些缺陷,可能会有什么样的陷阱在等着你。
  我们将介绍下面几种并发处理和异步代码的方式:
  裸线程
  Executors和Services
  ForkJoin框架和并行流
  Actor模型
  为了更加有趣一些,我没有仅仅通过一些代码来说明这些方法,而是使用了一个共同的任务,因此每一节中的代码差不多都是等价的。另外,这些代码仅仅是展示用的,初始化的代码并没有写出来,并且它们也不是产品级的软件示例。
  对了,后一件事:在文章后,有一个小调查,关于你或者你的组织正在使用哪种并发模式。为了你的工程师同胞们,请填一下调查!
  任务
  任务:实现一个方法,它接收一条消息和一组字符串作为参数,这些字符串与某个搜索引擎的查询页面对应。对每个字符串,这个方法发出一个http请求来查询消息,并返回第一条可用的结果,越快越好。
  如果有错误发生,抛出一个异常或者返回空都是可以的。我只是尝试避免为了等待结果而出现无限循环。
  简单说明:这次我不会真正深入到多线程如何通讯的细节,或者深入到Java内存模型。如果你迫切地想了解这些,你可以看我前面的文章利用JCStress测试并发。
  那么,让我们从直接、核心的方式来在JVM上实现并发:手动管理裸线程。
  方法1:使用“原汁原味”的裸线程
  解放你的代码,回归自然,使用裸线程!线程是并发基本的单元。Java线程本质上被映射到操作系统线程,并且每个线程对象对应着一个计算机底层线程。
  自然地,JVM管理着线程的生存期,而且只要你不需要线程间通讯,你也不需要关注线程调度。
  每个线程有自己的栈空间,它占用了JVM进程空间的指定一部分。
  线程的接口相当简明,你只需要提供一个Runnable,调用.start()开始计算。没有现成的API来结束线程,你需要自己来实现,通过类似boolean类型的标记来通讯。
  在下面的例子中,我们对每个被查询的搜索引擎,创建了一个线程。查询的结果被设置到AtomicReference,它不需要锁或者其他机制来保证只出现一次写操作。开始吧!

 

private static String getFirstResult(String question, List<String> engines) {
AtomicReference<String> result = new AtomicReference<>();
for(String base: engines) {
String url = base + question;
new Thread(() -> {
result.compareAndSet(null, WS.url(url).get());
}).start();
}
while(result.get() == null); // wait for some result to appear
return result.get();
}

  使用裸线程的主要优点是,你很接近并发计算的操作系统/硬件模型,并且这个模型非常简单。多个线程运行,通过共享内存通讯,是这样。
  自己管理线程的大劣势是,你很容易过分的关注线程的数量。线程是很昂贵的对象,创建它们需要耗费大量的内存和时间。这是一个矛盾,线程太少,你不能获得良好的并发性;线程太多,将很可能导致内存问题,调度也变得更复杂。
  然而,如果你需要一个快速和简单的解决方案,你可以使用这个方法,不要犹豫。
  方法2:认真对待Executor和CompletionService
  另一个选择是使用API来管理一组线程。幸运的是,JVM为我们提供了这样的功能,是Executor接口。Executor接口的定义非常简单:
  public interface Executor {
  void execute(Runnable command);
  }
  它隐藏了如何处理Runnable的细节。它仅仅说,“开发者!你只是一袋肉,给我任务,我会处理它!”
  更酷的是,Executors类提供了一组方法,能够创建拥有完善配置的线程池和executor。我们将使用newFixedThreadPool(),它创建预定义数量的线程,并不允许线程数量超过这个预定义值。这意味着,如果所有的线程都被使用的话,提交的命令将会被放到一个队列中等待;当然这是由executor来管理的。
  在它的上层,有ExecutorService管理executor的生命周期,以及CompletionService会抽象掉更多细节,作为已完成任务的队列。得益于此,我们不必担心只会得到第一个结果。
  下面service.take()的一次调用将会只返回一个结果。

 

private static String getFirstResultExecutors(String question, List<String> engines) {
ExecutorCompletionService<String> service = new ExecutorCompletionService<String>(Executors.newFixedThreadPool(4));
for(String base: engines) {
String url = base + question;
service.submit(() -> {
return WS.url(url).get();
});
}
try {
return service.take().get();
}
catch(InterruptedException | ExecutionException e) {
return null;
}
}

  如果你需要精确的控制程序产生的线程数量,以及它们的精确行为,那么executor和executor服务将是正确的选择。例如,需要仔细考虑的一个重要问题是,当所有线程都在忙于做其他事情时,需要什么样的策略?增加线程数量或者不做数量限制?把任务放入到队列等待?如果队列也满了呢?无限制的增加队列大小?
  感谢JDK,已经有很多配置项回答了这些问题,并且有着直观的名字,例如上面的Executors.newFixedThreadPool(4)。
  线程和服务的生命周期也可以通过选项来配置,使资源可以在恰当的时间关闭。的不便之处是,对新手来说,配置选项可以更简单和直观一些。然而,在并发编程方面,你几乎找不到更简单的了。
  总之,对于大型系统,我个人认为使用executor合适。
  方法3:通过并行流,使用ForkJoinPool (FJP)
  Java 8中加入了并行流,从此我们有了一个并行处理集合的简单方法。它和lambda一起,构成了并发计算的一个强大工具。
  如果你打算运用这种方法,那么有几点需要注意。首先,你必须掌握一些函数编程的概念,它实际上更有优势。其次,你很难知道并行流实际上是否使用了超过一个线程,这要由流的具体实现来决定。如果你无法控制流的数据源,你无法确定它做了什么。
  另外,你需要记住,默认情况下是通过ForkJoinPool.commonPool()实现并行的。这个通用池由JVM来管理,并且被JVM进程内的所有线程共享。这简化了配置项,因此你不用担心。
  private static String getFirstResult(String question, List<String> engines) {
  // get element as soon as it is available
  Optional<String> result = engines.stream().parallel().map((base) -> {
  String url = base + question;
  return WS.url(url).get();
  }).findAny();
  return result.get();
  }
  看上面的例子,我们不关心单独的任务在哪里完成,由谁完成。然而,这也意味着,你的应用程序中可能存在一些停滞的任务,而你却无法不知道。在另一篇关于并行流的文章中,我详细地描述了这个问题。并且有一个变通的解决方案,虽然它并不是世界上直观的方案。
  ForkJoin是一个很好的框架,由比我更聪明的人来编写和预先配置。因此当我需要写一个包含并行处理的小型程序时,它是我的第一选择。
  它大的缺点是,你必须预见到它可能产生的并发症。如果对JVM没有整体上的深入了解,这很难做到。这只能来自于经验。