JVM 并发性: 使用 Akka 执行异步操作
作者:网络转载 发布时间:[ 2015/9/17 11:32:52 ] 推荐标签:测试开发技术 程序结构
本 系列 中以前的文章介绍了如何通过以下方式实现并发性:
并行地在多个数据集上执行相同的操作(像 Java 8 流一样)
显式地将计算构建成异步执行某些操作,然后将结果组合在一起(像 future 一样)。
这两种方法都是实现并发性的不错方式,但是您必须将它们明确地设计到应用程序中。
在本文和接下来的几篇文章中,我将着重介绍一种不同的并发性实现方法,该方法基于一种特定的程序结构,与显式编码方法不同。这种程序结构是 actor 模型。您将了解如何使用 actor 模型的 Akka 实现。(Akka 是一个构建并发和分布式 JVM 应用程序的工具包和运行时。)请参阅 参考资料,获取本文完整示例代码的链接。
Actor 模型基础知识
用于并发计算的 actor 模型基于各种称为 actor 的原语来构建系统。Actor 执行操作来响应称为消息 的输入。这些操作包括更改 actor 自己的内部状态,以及发出其他消息和创建其他 actor。所有消息都是异步交付的,因此将消息发送方与接收方分开。正是由于这种分离,导致 actor 系统具有内在的并发性:可以不受限制地并行执行任何拥有输入消息的 actor。
在 Akka 术语中,actor 看起来像是某种通过消息进行交互的行为神经束。像真实世界的演员一样,Akka actor 也需要一定程度的隐私。您不能直接将消息发送给 Akka actor。相反,需要将消息发送给等同于邮政信箱的 actor 引用。然后通过该引用将传入的消息路由到 actor 的邮箱,以后再传送给 actor。Akka actor 甚至要求所有传入的消息都是无菌的(或者在 JVM 术语中叫做不可变的),以免受到其他 actor 的污染。
与一些真实世界中演员的需求不同,Akka 中由于某种原因而存在一些看似强制要求的限制。使用 actor 的引用可阻止交换消息以外的任何交互,这些交互可能破坏 actor 模型核心上的解耦本质。Actor 在执行上是单线程的(不超过 1 个线程执行一个特定的 actor 实例),所以邮箱充当着一个缓冲器,在处理消息前会一直保存这些消息。消息的不可变性(由于 JVM 的限制,目前未由 Akka 强制执行,但这是一项既定的要求)意味着根本无需担心可能影响 actor 之间各种共享的数据的同步问题;如果只有共享的数据是不可变的,那么根本不需要同步。
开始实现
现在您已大体了解了 actor 模型和 Akka 细节,是时候看些代码了。使用 hello 作为编码示例司空见惯,但它确实能够帮助用户快速、轻松地理解一种语言或系统。清单 1 显示了 Scala 中的一个 Akka 版本。
清单 1. 简单的 Scala hello
import akka.actor._
import akka.util._
/** Simple hello from an actor in Scala. */
object Hello1 extends App {
val system = ActorSystem("actor-demo-scala")
val hello = system.actorOf(Props[Hello])
hello ! "Bob"
Thread sleep 1000
system shutdown
class Hello extends Actor {
def receive = {
case name: String => println(s"Hello $name")
}
}
}
清单 1 中的代码分为两个单独的代码段,它们都包含在 Hello1 应用程序项目中。第一个代码段是 Akka 应用程序基础架构,它:
创建一个 actor 系统(ActorSystem(...) 行)。
在系统内创建一个 actor(system.actorOf(...) 行,它为所创建的 actor 返回一个 actor 引用)。
使用 actor 引用向 actor 发送消息(hello !"Bob" 行)。
等待一秒钟,然后关闭 actor 系统(system shutdown 行)。
system.actorOf(Props[Hello]) 调用是创建 actor 实例的推荐方式,它使用了专门用于 Hello actor 类型的配置属性。对于这个简单的 actor(扮演一个小角色,只有一句台词),没有配置信息,所以 Props 对象没有参数。如果想在您的 actor 上设置某种配置,可专门为该 actor 定义一个其中包含了所有必要信息的 Props 类。(后面的示例会展示如何操作。)
hello !"Bob" 语句将一条消息(在本例中为字符串 Bob)发送给已创建的 actor。! 运算符是 Akka 中表示将一条消息发送到 actor 的便捷方式,采用了触发即忘的模式。如果不喜欢这种特殊的运算符风格,可使用 tell() 方法实现相同的功能。
第二段代码是 Hello actor 定义,以 class Hello extends Actor 开头。这个特定的 actor 定义非常简单。它定义必需的(对于所有 actor)局部函数 receive,该函数实现了传入消息的处理方式。(receive 是一个局部函数,因为仅为一些输入定义了它 — 在本例中,仅为 String 消息输入定义了该函数。)为这个 actor 所实现的处理方法是,只要收到一条 String 消息,使用该消息值打印一条问候语。
Java 中的 Hello
清单 2 给出了清单 1 中的 Akka Hello 在普通 Java 中的表示。
清单 2. Java 中的 Hello
import akka.actor.*;
public class Hello1 {
public static void main(String[] args) {
ActorSystem system = ActorSystem.create("actor-demo-java");
ActorRef hello = system.actorOf(Props.create(Hello.class));
hello.tell("Bob", ActorRef.noSender());
try {
Thread.sleep(1000);
} catch (InterruptedException e) { /* ignore */ }
system.shutdown();
}
private static class Hello extends UntypedActor {
public void onReceive(Object message) throws Exception {
if (message instanceof String) {
System.out.println("Hello " + message);
}
}
}
}
清单 3 显示了来自包含 lambda 的 Java 8 的 actor 定义,以及 lambda 支持的 ReceiveBuilder 类所需要的导入。清单 3 或许更加紧凑,但与清单 2 大同小异。
清单 3. Java 8 的 Akka Hello 版本
import akka.japi.pf.ReceiveBuilder;
...
private static class Hello extends AbstractActor {
public Hello() {
receive(ReceiveBuilder.
match(String.class, s -> { System.out.println("Hello " + s); }).
build());
}
}
与清单 2 相比,清单 3 中的 Java 8 代码使用了一个不同的基类 —AbstractActor 代替 UntypedActor— 而且还使用了一种不同的方式来定义消息处理方案。ReceiveBuilder 类允许您使用 lambda 表达式来定义消息的处理方式,并采用了类似 Scala 的匹配语法。如果您主要在 Scala 中进行开发工作,此技术可能有助于让 Java Akka 代码看起来更简洁,但使用 Java 8 特定版本的好处似乎有些微不足道。
为什么还要等待?
在主应用程序代码中,将消息发送到 actor 之后,会有一次 Thread sleep 1000 形式的等待,然后才会关闭系统。您可能想知道为什么需要等待。毕竟,消息很容易处理;难道消息没有立即传到 actor,在 hello !"Bob" 语句完成时还在处理当中?
这个问题的答案很简单:“不是”。Akka actor 是异步运行的,所以即使目标 actor 与发送方 actor 位于相同的 JVM 中,目标 actor 也绝不会立即开始执行。相反,处理该消息的线程会将消息添加到目标 actor 的邮箱中。将消息添加到邮箱中会触发一个线程,以便从邮箱获取该消息并调用 actor 的 receive 方法来处理。但从邮箱获取消息的线程通常不同于将消息添加到邮箱的线程。
消息传送时间和保证
“为什么还要等待?” 这一问题的简短答案的背后是一种更深入的原理。Akka 支持 actor 远程通信且具有位置透明性,意味着您的代码没有任何直接的方式来了解一个特定的 actor 是位于同一 JVM 中,还是在系统外的云中某处运行。但这两种情况在实际操作中显然具有完全不同的特征。
“Akka 无法保证消息将被传送到目的地。这种无保证传送背后的哲学原理是 Akka 的核心原理之一。 ”
一个差别与消息丢失有关。Akka 无法保证消息将被传送到目的地,熟悉消息传递系统(用于连接应用程序)的开发人员可能对此很吃惊。这种无保证传送背后的哲学原理是 Akka 的核心原理之一:针对失败而设计。作为一种有意为之的过度简化,可以认为传送保证为消息传输系统添加了很高的复杂性,而且这些更复杂的系统有时无法按预期运行,而应用程序代码还必须涉及恢复操作。这种原理在应用程序代码始终自行处理传送失败情况时很有意义,能够让消息传送系统保持简单。
Akka 可以 保证消息多传送一次,而且绝不会无序地收到从一个 actor 实例发送到另一个 actor 实例的消息。但后者仅适用于特定的 actor 对,二者没有联系。如果 actor A 将消息发送给 actor B,这些消息绝不会被打乱顺序。如果 actor A 将消息发送给 actor C,情况也是如此。但是,如果 actor B 也将消息发送给 actor C(例如将来自 A 的消息转发给 C),B 的消息相对于来自 A 的消息而言可能是乱序的。
在 清单 1 的代码中,消息丢失的概率非常低,因为代码在单个 JVM 中运行,不会生成过多的消息负载。(过多的消息负载可能导致消息丢失。如果 Akka 没有空间来存储消息,例如它没有备用方案,那么只能丢弃消息。)但清单 1 代码的结构仍未对消息传送事件做出任何假设,而且允许 actor 系统执行异步操作。
Actor 和状态
Akka 的 actor 模型很灵活,支持所有类型的 actor。可以使用没有状态信息的 actor(像 Hello1 示例中一样),但这些 actor 可能等效于方法调用。添加状态信息可实现更为灵活的 actor 函数。
清单 1 提供了一个完整的(但很普通)actor 系统示例 — 但拥有一个始终执行同一工作的 actor。每位演员都讨厌反复重复同一句话,所以清单 4 向 actor 添加了一些状态信息,使工作变得更有趣。
清单 4. Polyglot Scala hello
object Hello2 extends App {
case class Greeting(greet: String)
case class Greet(name: String)
val system = ActorSystem("actor-demo-scala")
val hello = system.actorOf(Props[Hello], "hello")
hello ! Greeting("Hello")
hello ! Greet("Bob")
hello ! Greet("Alice")
hello ! Greeting("Hola")
hello ! Greet("Alice")
hello ! Greet("Bob")
Thread sleep 1000
system shutdown
class Hello extends Actor {
var greeting = ""
def receive = {
case Greeting(greet) => greeting = greet
case Greet(name) => println(s"$greeting $name")
}
}
}
清单 4 中的 actor 知道如何处理两种不同类型的消息,这些消息在清单的开头附近定义:Greeting 消息和 Greet 消息,每条消息都包装了一个字符串值。修改后的 Hello actor 收到 Greeting 消息时,会将所包装的字符串保存为 greeting 值。收到 Greet 消息时,则将已保存的 greeting 值与 Greet 字符串组合起来,形成终的消息。下面在运行此应用程序时,我们可以看到在控制台中打印出的消息(但消息不一定是按此顺序出现的,因为 actor 执行顺序是不确定的):
Hello Bob
Hello Alice
Hola Alice
Hola Bob
清单 4 中并没有太多的新代码,所以我没有提供其 Java 版本。您可在代码下载内容中找到它们(参见 参考资料),名为com.sosnoski.concur.article5java.Hello2 和 com.sosnoski.concur.article5java8.Hello2。
属性和交互
真正的 actor 系统会使用多个 actor 来完成工作,它们彼此发送消息来进行交互。并且常常需要为这些 actor 提供配置信息,以准备履行其具体的职责。清单 5 基于 Hello 示例中使用的技术,展示了简化版的 actor 配置和交互。
清单 5. Actor 属性和交互
object Hello3 extends App {
import Greeter._
val system = ActorSystem("actor-demo-scala")
val bob = system.actorOf(props("Bob", "Howya doing"))
val alice = system.actorOf(props("Alice", "Happy to meet you"))
bob ! Greet(alice)
alice ! Greet(bob)
Thread sleep 1000
system shutdown
object Greeter {
case class Greet(peer: ActorRef)
case object AskName
case class TellName(name: String)
def props(name: String, greeting: String) = Props(new Greeter(name, greeting))
}
class Greeter(myName: String, greeting: String) extends Actor {
import Greeter._
def receive = {
case Greet(peer) => peer ! AskName
case AskName => sender ! TellName(myName)
case TellName(name) => println(s"$greeting, $name")
}
}
}
清单 5 在领导角色中包含了一个新的 actor,即 Greeter actor。Greeter 在 Hello2 示例的基础上更进了一步,包含:
所传递的属性,目的是配置 Greeter 实例
定义了配置属性和消息的 Scala 配套对象(如果您有 Java 工作背景,可将这个配套对象视为与 actor 类同名的静态 helper 类)
在 Greeter actor 的实例间发送的消息
此代码生成的输出很简单:
Howya doing, Alice
Happy to meet you, Bob
相关推荐
更新发布
功能测试和接口测试的区别
2023/3/23 14:23:39如何写好测试用例文档
2023/3/22 16:17:39常用的选择回归测试的方式有哪些?
2022/6/14 16:14:27测试流程中需要重点把关几个过程?
2021/10/18 15:37:44性能测试的七种方法
2021/9/17 15:19:29全链路压测优化思路
2021/9/14 15:42:25性能测试流程浅谈
2021/5/28 17:25:47常见的APP性能测试指标
2021/5/8 17:01:11