JVM 并发性: 使用 Akka 执行异步操作
作者:网络转载 发布时间:[ 2015/9/17 11:32:52 ] 推荐标签:测试开发技术 程序结构
如果尝试运行该代码几次,可能会看到这些行的顺序是相反的。这种排序是 Akka actor 系统动态本质的另一个例子,其中处理各个消息时的顺序是不确定的(但包含我在 “消息传送时间和保证” 中讨论的几个重要例外)。
Java 中的 Greeter
清单 6 显示了清单 5 中的 Akka Greeter 代码的普通 Java 版本。
清单 6. Java 中的 Greeter
public class Hello3 {
public static void main(String[] args) {
ActorSystem system = ActorSystem.create("actor-demo-java");
ActorRef bob = system.actorOf(Greeter.props("Bob", "Howya doing"));
ActorRef alice = system.actorOf(Greeter.props("Alice", "Happy to meet you"));
bob.tell(new Greet(alice), ActorRef.noSender());
alice.tell(new Greet(bob), ActorRef.noSender());
try {
Thread.sleep(1000);
} catch (InterruptedException e) { /* ignore */ }
system.shutdown();
}
// messages
private static class Greet {
public final ActorRef target;
public Greet(ActorRef actor) {
target = actor;
}
}
private static Object AskName = new Object();
private static class TellName {
public final String name;
public TellName(String name) {
this.name = name;
}
}
// actor implementation
private static class Greeter extends UntypedActor {
private final String myName;
private final String greeting;
Greeter(String name, String greeting) {
myName = name;
this.greeting = greeting;
}
public static Props props(String name, String greeting) {
return Props.create(Greeter.class, name, greeting);
}
public void onReceive(Object message) throws Exception {
if (message instanceof Greet) {
((Greet)message).target.tell(AskName, self());
} else if (message == AskName) {
sender().tell(new TellName(myName), self());
} else if (message instanceof TellName) {
System.out.println(greeting + ", " + ((TellName)message).name);
}
}
}
}
清单 7 显示了包含 lambda 的 Java 8 版本。同样,此版本在消息处理的实现方面要更为紧凑,但其他方面都是相同的。
清单 7. Java 8 版本
import akka.japi.pf.ReceiveBuilder;
...
private static class Greeter extends AbstractActor {
private final String myName;
private final String greeting;
Greeter(String name, String greeting) {
myName = name;
this.greeting = greeting;
receive(ReceiveBuilder.
match(Greet.class, g -> { g.target.tell(AskName, self()); }).
matchEquals(AskName, a -> { sender().tell(new TellName(myName), self()); }).
match(TellName.class, t -> { System.out.println(greeting + ", " + t.name); }).
build());
}
public static Props props(String name, String greeting) {
return Props.create(Greeter.class, name, greeting);
}
}
传递属性
Akka 使用 Props 对象将各种配置属性传递给 actor。每个 Props 实例包装 actor 类所需的构造函数参数的一个副本,以及对该类的引用。可通过两种方式将此信息传递给 Props 构造函数。清单 5 中的示例将 actor 的构造函数作为一个名称传递 (pass-by-name) 参数传递给 Props 构造函数。注意,此方式不会直接调用构造函数并传递结果;它传递构造函数调用(如果您有 Java 工作背景,可能觉得这很陌生)。
将 actor 配置传递给 Props 构造函数的另一种方法是,提供 actor 的类作为第一个参数,将 actor 的构造函数参数作为剩余的参数。对于 清单 5 中的示例,这种调用形式为 Props(classOf[Greeter], name, greeting)。
无论使用哪种形式的 Props 构造函数,传递给新 actor 的值都需要可序列化,以便在必要时通过网络将 Props 发送到可运行该 actor 实例的任何地方。对于名称传递构造函数调用的情况,像 清单 5 中的用法,需要将调用发送出 JVM 时,会序列化调用的闭包。
在 Scala 代码中创建 Props 对象的 Akka 建议做法是:在一个配套对象中定义工厂方法,像 清单 5 中所做的那样。对 Props 使用名称传递构造函数调用方法时,此技术可阻止任何问题意外地关闭对 actor 对象的 this 引用。配套对象也是定义 actor 将接收的各种消息的不错地方,这样,所有关联的信息都位于同一位置。对于 Java actor,也可在 actor 类中使用静态构造函数方法,如 清单 6 中所用的方法。
发送消息的 Actor
清单 5 中的每个 Greeter actor 都配置了一个名称和一句问候语,但将问候语告知另一个 actor 时,首先要找到另一个 actor 的名称。Greeteractor 通过向另一个 actor 发送一条单独的消息来完成此任务:AskName 消息。AskName 消息本身不含任何信息,但收到它的 Greeter 实例知道应使用一个包含 TellName 发送方名称的 TellName 消息作为响应。当第一个 Greeter 收到所返回的 TellName 消息时,它打印出自己的问候语。
发送给 actor 的每个消息都包含由 Akka 提供的一些附加信息,特别的是消息发送方的 ActorRef。您可在消息处理过程中的任何时刻,通过调用在 actor 基类上定义的 sender() 方法来访问这些发送方的信息。Greeter actor 在处理 AskName 消息的过程中会使用发送方引用,以便将 TellName 响应发送给正确的 actor。
Akka 允许您代表另一个 actor 发送消息(一种良性的身份盗窃形式),以便收到该消息的 actor 将另一个 actor 视为发送方。这是在 actor 系统中经常使用的一个有用特性,尤其是对于请求-响应类型的消息交换,因为此时您希望将响应传送到不同于发出请求的 actor 的某个地方。actor 外部的应用程序代码所发出的消息,默认将使用名为 deadletter actor 的特殊 Akka 作为发送方。任何时候无法将消息传送给 actor 时,也可使用 deadletter actor,这为用户提供了一种便捷的方式,在 actor 系统中通过打开合适的日志(我将在下一期中介绍)来跟踪无法传送的消息。
设置 actor 的类型
您可能注意到了,示例的消息序列中没有任何类型的信息来明确表明消息的目标是 Greeter 实例。Akka actor 及其交换的消息一般都属于这种情况。甚至用于表示消息目标 actor 的 ActorRef 也是无类型的。
编写无类型的 actor 系统有着实际的优势。您可以 定义 actor 类型(比如通过它们可处理的一组消息),但这么做有误导性。在 Akka 中,actor 可以改变它们的行为(下一期会更详细地介绍此内容),所以不同的消息集可能适合不同的 actor 状态。类型也可能妨碍我们合理地简化 actor 模型,因为系统将所有 actor 视为至少拥有处理任何消息的潜力。
但是 Akka 仍然支持类型化 actor,以防您确实想要使用这种方法。这种支持在连接 actor 和非 actor 节点时有用。您可定义一个接口,非 actor 节点使用该接口与 actor 进行通信,使 actor 看起来更像是正常的程序组件。对于大部分操作,这样做所带来的麻烦太多,可能不值得去做,但考虑到从 actor 系统外部直接将消息发送给 actor 的简单性(从目前为止的任何示例应用程序中可以看到,非 actor 代码可以发送消息),有这个选项也很不错。
消息和可变性
Akka 希望您肯定不会意外地在 actor 之间共享可变的数据。如果共享可变的数据,结果会非常糟 — 比不上在对抗幽灵时穿过自己的质子束(如果您不太熟悉,参见电影做鬼敢死队),但仍然很糟。共享可变数据的问题在于,actor 在单独的线程中运行。如果在 actor 之间共享可变数据,则无法在运行 actor 的线程之间进行协调,所以各个线程不会看到其他线程正在做什么,并且可能通过多种不同的方式对彼此造成破坏。如果正在运行分布式系统,问题会更严重,每个 actor 都将拥有自己的可变数据副本。
所以消息必须是不可变的,而且不仅仅是在表面层面上。如果消息数据中包含任何对象,这些对象必须也是不可变的,依此类推,一直到消息所引用的所有对象。Akka 目前不能强制实施此要求,但 Akka 开发人员希望在将来的某个时刻能强制实施这些限制。如果您希望自己代码在未来的 Akka 版本中仍可使用,那么现在必须留意这一要求。
询问与告诉
清单 5 中的代码使用标准 tell 操作来发送消息。在 Akka 中,也可使用 ask 消息模式作为一种辅助性操作。ask 操作(由 ? 运算符或使用ask 函数表示)发送一条包含 Future 的消息作为响应。在清单 8 中,我们重建了清单 5 中的代码,使用 ask 来代替 tell。
清单 8. 使用 ask
import scala.concurrent.duration._
import akka.actor._
import akka.util._
import akka.pattern.ask
object Hello4 extends App {
import Greeter._
val system = ActorSystem("actor-demo-scala")
val bob = system.actorOf(props("Bob", "Howya doing"))
val alice = system.actorOf(props("Alice", "Happy to meet you"))
bob ! Greet(alice)
alice ! Greet(bob)
Thread sleep 1000
system shutdown
object Greeter {
case class Greet(peer: ActorRef)
case object AskName
def props(name: String, greeting: String) = Props(new Greeter(name, greeting))
}
class Greeter(myName: String, greeting: String) extends Actor {
import Greeter._
import system.dispatcher
implicit val timeout = Timeout(5 seconds)
def receive = {
case Greet(peer) => {
val futureName = peer ? AskName
futureName.foreach { name => println(s"$greeting, $name") }
}
case AskName => sender ! myName
}
}
}
在清单 8 的代码中,TellName 消息已被替换为 ask。ask 操作返回的 future 的类型为 Future[Any],因为编译器对要返回的结果一无所知。当 future 完成时,foreach 使用 import system.dispatcher 语句所定义的隐式调度器来执行 println。如果 future 未完成且在允许的超时(另一个隐式值,在本例中定义为 5 秒)内提供了响应消息,它会完成并抛出超时异常。
在幕后,ask 模式创建一个特殊的一次性 actor 在消息交换中充当中介。该中介会收到一个 Promise 和要发送的消息,以及目标 actor 引用。它发送消息,然后等待期望的响应消息。收到响应后,它会履行承诺并完成初的 actor 所使用的 future。
使用 ask 方法有一些限制。具体来讲,要避免公开 actor 状态(可能导致线程问题),必须确保您未在 future 完成时所执行的代码中使用来自该 actor 的任何可变状态。在实际情况中,为在 actor 之间发送的消息使用 tell 模式通常要更容易。ask 模式更有用的一种情况是,应用程序代码需要从 actor(无论是否具有类型)获取响应时(比如启动 actor 系统和创建初始 actor 的主程序)。
小角色
“在对您明确处理异步操作有所帮助时,应毫不犹豫地向设计中引入新的 actor。”
ask 模式所创建的一次性 actor 是在使用 Akka 时要记住的一种出色设计原则。通常您希望构造您的 actor 系统,以便中间处理步骤是由为这种特定的用途而设计的特殊 actor 所执行的。一个常见的例子是,需要在进入下一处理阶段之前合并不同的异步结果。如果为不同的结果使用消息,您可让一个 actor 来收集各个结果,直到所有结果都准备好,然后触发下一阶段的操作。这基本上是 ask 模式所用的一般的一次性 actor。
Akka actor 是轻型的(每个 actor 实例大约 300 到 400 字节,无论 actor 类使用哪种存储都是如此),所以您可安全地设计程序结构,在适当的时候使用多个 actor。使用专用的 actor 有助于保持代码简单且易于理解,这也是编写并发程序与编写顺序程序相比的一个优势。在对您明确处理异步操作有所帮助时,应毫不犹豫地向设计中引入新的 actor。
补充几句
Akka 是一个强大的系统,但 Akka 和 actor 模型通常都需要一种与直观的过程代码不同的编程风格。对于过程代码,程序结构中的所有调用都是确定的,并且您可查看程序的整个调用树。在 actor 模型中,消息是被乐观地触发的,无法保证它们将会送达,而且常常很难确定事件发生的顺序。actor 模型的好处是,这是一种构建高并发性和可伸缩性应用程序的轻松方式,我在后面的几期中会再次介绍此主题。
希望本文能够让您足够清楚地了解 Akka,激起您进一步探索该内容的欲望。下一次我将更深入地介绍 actor 系统和 actor 交互,包括如何轻松地跟踪系统中各 actor 之间的交互。
本文内容不用于商业目的,如涉及知识产权问题,请权利人联系SPASVO小编(021-61079698-8054),我们将立即处理,马上删除。
相关推荐
Java性能测试有哪些不为众人所知的原则?Java设计模式??装饰者模式谈谈Java中遍历Map的几种方法Java Web入门必知你需要理解的Java反射机制知识总结编写更好的Java单元测试的7个技巧编程常用的几种时间戳转换(java .net 数据库)适合Java开发者学习的Python入门教程Java webdriver如何获取浏览器新窗口中的元素?Java重写与重载(区别与用途)Java变量的分类与初始化JavaScript有这几种测试分类Java有哪四个核心技术?给 Java开发者的10个大数据工具和框架Java中几个常用设计模式汇总java生态圈常用技术框架、开源中间件,系统架构及经典案例等
更新发布
功能测试和接口测试的区别
2023/3/23 14:23:39如何写好测试用例文档
2023/3/22 16:17:39常用的选择回归测试的方式有哪些?
2022/6/14 16:14:27测试流程中需要重点把关几个过程?
2021/10/18 15:37:44性能测试的七种方法
2021/9/17 15:19:29全链路压测优化思路
2021/9/14 15:42:25性能测试流程浅谈
2021/5/28 17:25:47常见的APP性能测试指标
2021/5/8 17:01:11热门文章
常见的移动App Bug??崩溃的测试用例设计如何用Jmeter做压力测试QC使用说明APP压力测试入门教程移动app测试中的主要问题jenkins+testng+ant+webdriver持续集成测试使用JMeter进行HTTP负载测试Selenium 2.0 WebDriver 使用指南