在近两年里,Sun引入了很多针对企业应用程序开发的API。其中令人兴奋的是Java消息服务或者JMS。JMS API是为了在企业中进行消息传递而设计的,其中,JNDI的任务是为目录服务命名和提供目录服务,而JNBC的任务是提供数据库访问。JMS是为了给企业消息传递提供一种通用工具而设计的API,它没有考虑将消息发送到任何一台应用服务器的底层实现,也没有考虑您希望使用的其他企业消息传递软件技术。对于那些创建或者使用面向消息的中间件(MOM)的开发人员,尤其是那些需要在自己的产品中使用这些工具的Java开发人员,这是一个很大的进步。通过JMS,您应该能够编写一组针对JMS API进行消息传递的代码,然后在任何提供JMS支持的消息传递系统中使用这些代码。
  在本文中,我将向您展示如何创建一系列的消息生产者和消费者,他们利用了JMS API的大部分函数来使用持久存储的主题和队列、创建临时目的地、通过消息选择器过滤消息,等等。我将在JMS的第一个供应商的实现中实现这些例子,该实现是随BEA的WebLogic应用服务器一起提供的(关于该服务器以及配置JMS的详细信息,可以从www.JavaDevelopersJournal.com的“install.txt”在线文件中获得)。在这一过程中,我将带您一起经历实现这些应用程序的必要步骤,其中包括修改应用服务器来支持您将创建的示例代码。
  在继续创建下面的示例代码之前,建议您先获得并安装新版本的BEA/WebLogic应用服务器。可以从BEA的网站http://ecommerce.bea.com/index.jsp下载免费使用试用版。
  代码
  包括在线的源文件在内,总共有4个源文件展示了如何在JMS中使用两种不同的消息传递类型:发布/订阅和点对点。Sender.java和Receiver.java展示了如何在JMS中使用Queue进行点对点消息传递,而Publisher.java和Subscriber.java展示了如何使用Topic进行发布/订阅消息传递。“readme.txt”文件(也在网上)包含运行各种应用程序的命令以及有关代码的新信息。在尝试运行这些应用程序之前,一定要先阅读“install.txt”文件,并根据指示对WebLogic服务器进行必要的修改。因为示例代码中还包含一些注释和JMS的更多细节,本文无法完全介绍这些细节,因此,我们鼓励读者检查源代码并运行这些应用程序。
  Java消息服务
  Java消息服务被设计为一组接口,这些接口将由MOM供应商和其他希望支持消息传递的供应商(应用服务器供应商、数据库服务器供应商,等等)实现。这些接口为希望实现代码的客户应用提供了一个公共API,针对可能的、给定数量的底层消息传递系统进行消息传递。
  同时,JMS的设计非常灵活,它可以在保持可移植性的同时提供对现有消息传递系统的广泛支持。因此,它不支持每个消息传递系统中每种可能的消息传递选择。尽管您可能熟悉任何给定的MOM产品,但JMS不会自动支持该产品的每一个方面。
  JMS中的主要概念是Destination。Destination仅仅是消息生产者与消息消费者之间的一种关联。Destination分为两种类型:Topic和Queue。为每种Destination类型定义的单独接口允许供应商支持其中一种类型或者两种类型都支持,这取决于供应商的消息传递工具。Topic是为发布/订阅消息传递而设计的,而Queue是为客户对客户(点对点)的消息传递设计的。这两种Destination类型都支持持久性。JMS还提供了对事务的支持。事务使您能够支持消息传递操作的提交和回滚。因此,如果事务中的操作失败了,那么您可以对发生在该事务中的所用消息传递操作执行回滚。同样地,如果每件事都很顺利,那么您可以提交这些消息传递操作,使它们持久可用。因为JMS事务超出了本文的讨论范围,所以我将让您自己去研究JMS的这些方面。
  初始化JMS
  连接工厂

  要初始化JMS,则需要使用ConnectionFactory,它是一个标记接口,其中没有任何方法,并且可以通过TopicConnectionFactory或者QueueConnectionFactory得以扩展。供应商实现这些工厂接口中的一个或者两个接口均实现,以提供对其特定消息传递服务实现的访问。WebLogic提供了每种Factory类型的一般实现。Factory,以及Topic和Queue,都被认为是“托管”对象,因此它们都是WebLogic服务器在开机时创建的(在我们的例子中,我们将使用WebLogic的JMS实现)。然后,可以通过JNDI检索这些托管对象。还可以使用JNDI命名上下文来检索这些托管对象。
  如果这看起来有点让您感到迷惑,不要担心,示例代码会使您很容易理解它们。您还可以在WebLogic中创建自己的Factory对象,而不是使用默认的Factory,这项操作是在weblogic.properties文件中完成的。用户定义的Factory将在后面进行讨论,这一节主要关注Topic,定义自己的Factory的详细信息包含在“install.txt”文件中,该文件以及本文的源代码都可以在网上找到。
  以下代码来自Sender.java示例,它将向您展示如何初始化JMS,以及如何从JNDI中获得Queue的ConnectionFactory:
  public final String JMS_FACTORY = "javax.jms.QueueConnectionFactory";
  ...
  queueFx = (QueueConnectionFactory) initCtx.lookup(JMS_FACTORY);
  Queue的默认ConnectionFactory的名称被传递到初始JNDI命名上下文的lookup()方法中。然后会把对QueueConnectionFactory实现的引用从WebLogic应用服务器返回给我们(Sender.java 示例代码中的getInitialContext()方法展示了初始化JNDI和从WebLogic中获得初始命名上下文的细节。您可以从Sun的网站获得关于API的详细信息。网址是:http://java.sun.com/products/jndi/1.2/javadoc/index.html)。
  因为ConnectionFactories 是托管对象,所以由WebLogic应用服务器负责为Queue、Topic以及所有用户定义的Factory创建默认ConnectionFactories,并将它们与WebLogic JNDI实现中的名称绑定在一起,以便通过客户端代码进行查找。因此,在JNDI中,Queue的默认ConnectionFactory的名称是“javax.jms.QueueConnectionFactory”,在通过JNDI查找该名称的时候,将返回一个对这个接口的默认实现的引用,该引用是由WebLogic提供的。Topic也有一个默认的ConnectionFactory,它遵循相同的格式,即“javax.jms.TopicConnectionFactory”。通过JNDI进行查找将返回一个对TopicConnectionFactory默认实现的引用,它也是WebLogic提供的。
  连接
  在成功创建正确的ConnectionFactory后,下一步将是创建一个“连接”,它是JMS定义的一个接口,表示连接到底层消息传递供应商的客户机连接。ConnectionFactory负责返回可以与底层消息传递系统进行通信的Connection实现。通常客户机只使用单一连接。根据JMS文档,Connection的目的是“利用JMS提供者封装开放的连接”,以及表示“客户机与提供者服务例程之间的开放TCP/IP套接字。该文档还指出Connection应该是进行客户机身份验证的地方,除了其他一些事项外,客户机还可以指定惟一标志符。像ConnectionFactories一样,Connections也有两种类型,这取决于您将使用的Destination类型。QueueConnection和TopicConnection 都扩展了基本的Connection接口,通常,您只能使用其中的一个,使用哪一个则取决于客户将使用的消息传递方式。Connection的创建是通过ConnectionFactory完成的。Connection包含两种重要的方法:start和stop,开始和停止在连接过程中发送和接收消息。请参见清单1中的完整代码块。
  会话
  一旦从ConnectionFactory中获得一个Connection,必须从Connection中创建一个或者多个会话。Session也是基本接口,同时,有两种扩展基本接口的特定于Destination的接口:QueueSession和TopicSession,它们可以提供特定于Queue或者Topic的Session方法。Session是为Destination类型生产消息的消费者或生产者的工厂的一种类型(如果Destination类型是QueueSession,那么它将创建面向Queue的生产者和消费者;如果Destination类型是TopicSession,那么它将创建面向Topic的生产者和消费者)。
  Session可以被事务化,也可以不被事务化,通常,您可以通过向Connection上的适当创建方法传递一个布尔参数对此进行设置。同样,您也可以向Connection的Session创建方法传递一个参数,该方法可以设置将创建的Session的消息确认模式,而该模式将指定是由客户机还是由消费者来确认它所检索的任何信息(如果使用事务机制,则忽略该参数)。Session提供的其他方法是各种消息创建方法,这些方法允许您创建包含文本、字节、属性甚至串行化Java对象的特定类型的JMS消息(有关的更多信息在下面的Message接口上)。图1是实际JMS接口的继承和创建关系的图表,请参见清单1中的代码(暂缺图)
  目的地(Destination)
  在创建任何消息的生产者或消费者之前,必须要有特定的Destination,通过它您可以将任何生产者与消费者联系起来。记住,Destination是托管对象,类似ConnectionFactories。这意味着Destination是由WebLogic维护的,必须通过JNDI查找来检索它。这还意味着,在这种情况下,Destination必须是事先定义好的。但这并不是说您总是要提前创建Destination。JMS API提供了创建临时Destination的能力,但这个Destination只能在创建它的Session的生存周期中使用,JMS API也能在运行时创建性Destination。然而,在JMS当前的WebLogic实现中,应该注意的是,如果通过Session创建Destination,那么只要WebLogic服务器在运行,这些Destination会存在。如果服务器出故障或者死机了,那么Destination也会不存在。创建真正的性Destination的方法是在weblogic.properties文件中创建它(请参阅“install.txt”文件,以获得关于如何做到这一点的详细信息)。
  鉴于本文的目的,我们的Destination是通过weblogic.properties文件提前创建的。该文件中定义的Destination是WebLogic应用服务器在开机时创建的,并且可以通过JNDI用于客户代码。清单1显示了Send.java文件中的代码,展示了如何创建QueueConnection和QueueSession,以及如何像检索Sender应用程序的包层次结构那样检索Destination,我们在weblogic.properties文件中定义的队列的名称为“jdj.article.queue.sender”。
  消息的消费者和消息的生产者
  JMS初始化过程的后一个阶段是创建MessageConsumers和MessageProducers。像 ConnectionFactory一样,它们也有Connection 和Session两个基本接口,为了使用Topic或者Queue,还有一些扩展这两个基本接口的特定于Destination的基本接口。(我在使用MessageProducer时使用Producer这个术语,在使用MessageConsumer时使用Consumer这个术语)。MessageConsumers用于检索发送到Destination的消息,MessageProducers用于将消息发送到Destination。二者均由Session实例创建。MessageProducer由特定于Destination的接口QueueSender和TopicPublisher扩展。MessageConsumer则由QueueReceiver和TopicSubscriber接口扩展。
  一旦已经创建了自己的MessageConsumer和/或MessageProducer,也做好了接收和/或发送消息的所有准备。因为生产者和消费者的创建是特定于Queue或Topic的,所以我将在下面的相关小节中讨论特定于Destination类型的这两种类型的处理。
  消息
  在进入下一步并开始深入探讨Topic和Queue中的消息发送和接收之前,我们需要讨论另一个接口,即Message,它表示JMS消息本身。这个对象包含将发送到Destination的信息,以及正在Destination上进行监听的消费者接收的信息。Message是由会话实例创建的,它由三个部分组成:消息头、属性和消息体。消息头是用来识别和路由消息的,客户端开发人员通常不会看到或者处理消息头信息。属性支持通过消息传递的特定于应用程序的值。这些属性字段是预先定义的,对它们的完整描述可以在JMS文档中找到。我将在示例代码中使用一些属性。消息的主体部分是消息的实际“有效负载”,它有5种类型,可以通过5个特定的接口来表示它们:StreamMessage、MapMessage、ObjectMessage、TextMessage 和BytesMessage(参见图1)。
  持久性
  JMS还有一个重要方面值得讨论:它支持消息的持久传递。很简单,这意味着当消息发送到Destination时,如果Consumer没有运行或不可用,那么这个消息将被保存起来,直至下次Consumer连接到Destination为止。例如,如果您有5个应用程序在某一个Topic上进行监听,其中一个崩溃了,那么下一次该应用程序启动并连接到这个特定的Topic时,尽管该应用程序崩溃了,但发送到这个Topic的所有消息都将在应用程序开始再次进行监听时发送到这个应用程序。如果这看起来难以理解,那么在您运行示例代码时会更有意义。
  队列
  Queue是为“点对点”或“一对一”的消息传递而设计的。这意味着应该只有一台客户机发送消息给Queue,并且只有一个应用程序可以处理这个Queue中的消息。
  事实上,您可以有多台向某一个Queue发送消息的客户机,但只能有一个应用程序处理该Queue中的消息。如果在Queue上有多个Consumer,那么哪个Consumer接收消息将无法保证,但是只能有一个Consumer接收消息。如果Destination上需要多个Consumer,并且想让它们都收到相同的消息,那么应该使用Topic(参见本文后面内容)。
  Sender.java和Receiver.java文件中包含展示如何使用Queue的代码。这些代码展示了JMS的初始化过程,并展示了如何检索预定义的Queue,以及为了在Queue上发送和接收消息,应该如何创建MessageProducer和MessageConsumer。
  为了在Queue中消费和产生消息,系统中有两个特定接口。其中一个接口是QueueSender,它是通过调用QueueSession的一个createQueueSender()方法,从QueueSession返回的,用于向Queue发送消息。另一个接口是QueueReceiver,它是通过调用QueueSession的一个createQueueReceiver()方法,从QueueSession返回的,用于接收来自Queue的消息。
  清单2是来自Sender.java的sendMsg方法的代码,它展示了如何从Session中如何创建MessageProducer,然后构造一个TextMessage并发送它。在这段代码中,我们将创建一个QueueSender,然后创建一个TextMessage,它包含从应用程序用户界面的TextField中检索到的文本。随后,我们使用QueueSender方法“发送”消息。
  在MessageConsumer上,有两种处理传入消息接收的方法:同步的和异步的。第一步是创建MessageConsumer;下一步是判断您想在Consumer上同步发送信息还是异步发送消息。