JMS 基本知识及消息传送模型

JMS(Java Message Service) 是 Java 为面向消息中间件(MOM)定义的接口。JMS 的通信管道就是消息队列,说到消息队列,历史就悠久,在 MS 系统中很早就有 MSMQ,譬如邮件、群组就是些消息队列。JMS 因其异步,所以可用来解决高并发的问题,分布式可对负载进行均衡。

JMS 已成为 J2EE 规范中的一部分,所以在 J2EE 应用服务器中都有 JMS 核心部分 MQ 的实现,MQ 也有独立的产品,如 ActiveMQ、JBoss MQ(已更名为 JBoss Messaging)、WebSphere MQ 等。

如果我们蒙着头来理解,JMS 消息通信中的主要角色应该有:消息生产者(Producer)消息消费者(Consumer)、它们间的消息队列(Queue)、以及所传送的消息(Message)

由于 JMS 有两种通信模式:端到端(P2P) 和主题/订阅模式,所以还有个角色就是主题(Topic)。通信中还需要处理诸如连接(Connection),面向连接就会产生会话(Session),而连接一般都是通过连接工厂(Connection Factory) 来获得。

而 MQ 中间件的存在,使得端与端之间不需要直接相连来建立队列,且对于主题/订阅模式更是不可能,所以消息生产者和消费者它们都是指向到 MQ  中间件上的,它们在 MQ 上所指向的队列或主题被抽像为目的地(Destination),对于消息生产者称之为目的地可以理解,但作为消费者来说也叫做目的地中文描述上有些欠妥,谓之消息来源地(Source) 较好理解。还有,因为是异步的消息通信,所以就要注册消息监听器(MessageListener)

通过上面的理解,我们把 JMS 中所有的角色都串联起来了,我们在编程中要处理的基本就那些角色(对象),下面是 JMS 所定义的所有接口关系图(异常接口类未列于其中)。

现在来说说 JMS 中那两种通信模式的区别。首先来看最简单的 P2P 或队列模型。它就像我们邮件发送那样的方式,P 用户发往 C 用户的邮件只有 C 能收到,P 可以在多个邮件客户端发邮件给 C,C 也可以开多个邮件客户端来接收,某一个接收端收取了邮件 M 的话,则另一个接收端就收不到邮件 M 了。腾讯的 Foxmail 默认行为除外,它收完邮件后还会在服务器端保留,经常造成公司邮箱占用过大。用个图来描述:

这种 P2P 的通信息方式应该是数据传输在实际中可能不需要经过 MQ 服务器,而是直接在两个客户端间进行。
只有一个消费者将获得消息

生产者不需要在接收者消费该消息期间处于运行状态,接收者也同样不需要在消息发送时处于运行状态。

每一个成功处理的消息都由接收者签收

另一种传送模型是 主题发布/订阅模型。这种方式就类似于邮件列表,一般用户会就某个主题加入邮件列表,即订阅了该主题,当就该主题发布的信息,其它订阅者就都能收到,而且接收方之间是不受影响的。用个图来揣测一下 MQ Server 对该种模型的实现方式:

其实在 ActiveMQ 的管理控制台的 Topics 标签页中,也还是可以看到存在 Queue 的概念,这取决于 JMS 产品针对 Pub/Sub 模型的实现行为上。

MQ 除了完成上面消息转发或分发的任务之外,有时候称这个为目的地管理器(DestinationManager),可以认为是 MQ 的的核心,除此之外还要负责消息缓存、状态、持久化、事物、安全性的管理。消息的持久性还是一项很重要的服务,消费方未启动时,假如有消息到来,消费方再次连接也可以接收到消息,即使 MQ 重启后消息也不会丢失。

MQ 产品一般还支持集群,以及与 MSMQ 或其他 MQ 产品桥接起来,也允许用其他语言编写客户端程序。

最后来看下以上两种消息传送模型中,消息生产者与消费者实现的主要步骤,两种模型的步骤差不多,只是一个是创建队列(Queue),一个是创建主题(Topic)。

消息生产者实现主要步骤:

  //1. 由ConnectionFactory 创建连接,一般 ConnectionFactory 从 JNDI 中获得
  Connection connection = connectionFactory.createConnection(); 

  //2. 创建 Session,
  Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

  //3. P2P 中创建 Destination, 这里创建了一个 Queue,队列名为 "Hello.Unmi"
  Destination destination = session.createQueue("Hello.Unmi"); //实际应用中队列是从 JNDI 中获得
          //3. Sub/Pub 模型时,创建 Destination, 创建了一个 Topic,主题为 "Unmi.Learn.ActiveMQ"
          Destination destination = session.createTopic("Unmi.Learn.ActiveMQ");

  //4. 创建 Producer,
  MessageProducer producer = session.createProducer(destination);

  //5. 创建 Message,这里创建的是一个文本消息,可创建多种类型的消息
  Message message = session.createTextMessage("Hello JMS Sended.");

  //6. 发送消息
  producer.send(message); 

消息消费者实现主要步骤:

  //1. 创建 Connection,//ActiveMQConnection 实现了 QueueConnection, TopicConnection
  Connection connection = connectionFactory.createConnection();

  //2. 创建 Session
  Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

  //3. 创建 Destination, 一个 Queue,队列名与上同,这样就能接收到前面生产者发来的消息
  Destination destination = session.createQueue("Hello.Unmi");//实际应用中队列是从 JNDI 中获得
          //3. Sub/Pub 模型时,创建 Destination, 一个 Topic,主题名同上,可接上前面发布的消息
          Destination destination = session.createTopic("Unmi.Learn.ActiveMQ");  //实际应用中队列是从 JNDI 中获得

  //4. 创建 Consumer
  MessageConsumer consumer = session.createConsumer(destination);

  //5. 注册消息监听器,当消息到达时被触发并处理消息,也可阻塞式监听 consumer.receive()
  consumer.setMessageListener(new MessageListener() {
      public void onMessage(Message message) {
              //Do something with the message.
      }
   });

上面代码全部是用最顶层的接口类型来引用的变量,实际应用中会用具体类型来引用变量,以使用更方便的方法。如直接用到 TopicPublisher、TopicSender、QueueSession、QueueConnection 等。而且还可能直接用某个 MQ 产品特定的实现类,如 ActiveMQConnection、等。如果为了便于切换不同的 MQ 产品,当然用最上层的接口去引用类型,但不得不要用到某个 MQ 产品的有利特性的时候,程序代码与该 MQ 产品存在这种高耦合也是不可避免的。

本文链接 https://yanbin.blog/jms-introduction-message-model/, 来自 隔叶黄莺 Yanbin Blog

[版权声明] Creative Commons License 本文采用 署名-非商业性使用-相同方式共享 4.0 国际 (CC BY-NC-SA 4.0) 进行许可。

Subscribe
Notify of
guest

1 Comment
Inline Feedbacks
View all comments
阿龙
14 years ago

不错,支持