ActiveMQ - JMS规范
# JMS 是什么
JMS 就是 Java 消息服务(Java Message Service),是一套设计规范。
什么是 Java 消息服务?
Java 消息服务指的是两个应用程序之间进行异步通信的 API,它为标准协议和消息服务提供了一组通用接口,包括创建、发送、读取消息等,用于支持 Java 应用程序开发。在 JavaEE 中,当两个应用程序使用 JMS 进行通信时,它们之间不是直接相连的,而是通过一个共同的消息收发服务组件关联起来以达到解耦/异步削峰的效果。
# JMS 的消息头
常用属性:
- JMSDestination:消息目的地
- JMSDeliveryMode:消息持久化模式
- JMSExpiration:消息过期时间
- JMSPriority:消息的优先级
- JMSMessageID:消息的唯一标识符。后面我们会介绍如何解决幂等性
说明: 消息的生产者可以 set 这些属性,消息的消费者可以 get 这些属性。这些属性在 send 方法里面也可以设置
生产者演示:具体看 15-40 行代码
public class JmsProduceTopic {
// linux 上部署的activemq 的 IP 地址 + activemq 的端口号
public static final String ACTIVE_URL = "tcp://192.168.199.27:61616";
public static final String TOPIC_NAME = "topic001";
public static void main(String[] args) throws JMSException {
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVE_URL);
Connection connection = activeMQConnectionFactory.createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Topic topic = session.createTopic(TOPIC_NAME);
MessageProducer producer = session.createProducer(topic);
for (int i = 1; i <= 3; i++) {
TextMessage textMessage = session.createTextMessage("msg --- " + i);
// 这里可以指定每个消息的目的地
textMessage.setJMSDestination(topic);
/*
持久模式和非持久模式。
一条持久性的消息:应该被传送「一次仅仅一次」,这就意味着如果 JMS 提供者出现故障,该消息并不会丢失,它会在服务器恢复之后再次传递。
一条非持久的消息:最多会传递一次,这意味着服务器出现故障,该消息将会永远丢失。
DeliveryMode.NON_PERSISTENT:1,非持久化
DeliveryMode.PERSISTENT:2,持久化
*/
textMessage.setJMSDeliveryMode(DeliveryMode.NON_PERSISTENT);
/*
可以设置消息在一定时间后过期,默认是永不过期。
消息过期时间,等于 Destination 的 send 方法中的 timeToLive 值加上发送时刻的GMT时间值。
如果 timeToLive 值等于 0,则 JMSExpiration 被设为 0,表示该消息永不过期。
如果发送后,在消息过期时间之后还没有被发送到目的地,则该消息被清除。
*/
textMessage.setJMSExpiration(1000);
/*
消息优先级,从 0-9 十个级别,0-4 是普通消息 5-9 是加急消息。
JMS 不要求 MQ 严格按照这十个优先级发送消息但必须保证加急消息要先于普通消息到达。默认是 4 级。
*/
textMessage.setJMSPriority(10);
// 唯一标识每个消息的标识。MQ 会给我们默认生成一个,我们也可以自己指定。
textMessage.setJMSMessageID("ABCD");
// 上面有些属性在 send 方法里也能设置
producer.send(textMessage);
}
producer.close();
session.close();
connection.close();
System.out.println("消息发送到 MQ 完成");
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
# JMS 的消息体
JMS 的消息体是封装具体的消息数据。
发送和接收的消息类型必须保持一致(不是什么接口多态)
5 种消息体格式:(前两个常用)
- TextMessage:普通字符串消息,包含一个 String
- MapMessage:一个 Map 类型的消息,key 为 String 类型,value 为 Java 的基本类型
- BytesMessage:二进制数组消息,包含一个 byte[]
- StreamMessage:Java 数据流信息,用标准流操作来顺序的填充和读取
- ObjectMessage:对象消息,包含一个可序列化的 Java 对象
MapMessage 生产者演示:具体看 24-32 行代码
package com.eight;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
public class JmsProduceTopic {
// linux 上部署的activemq 的 IP 地址 + activemq 的端口号
public static final String ACTIVE_URL = "tcp://192.168.199.27:61616";
public static final String TOPIC_NAME = "topic001";
public static void main(String[] args) throws JMSException {
// 1.按照给定的 url 创建连接工厂,这个构造器采用默认的用户名密码。该类的其他构造方法可以指定用户名和密码
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVE_URL);
// 2.通过连接工厂,获得连接 connection 并启动访问
Connection connection = activeMQConnectionFactory.createConnection();
connection.start();
// 3.创建会话 session。第一参数是是否开启事务,第二参数是消息签收的方式
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 4.创建目的地(两种 :队列/主题)。Destination 是 Queue 和 Topic 的父类
Topic topic = session.createTopic(TOPIC_NAME);
// 5.创建消息的生产者
MessageProducer producer = session.createProducer(topic);
// 6.通过 messageProducer 生产 3 条 消息发送到消息队列中
for (int i = 1; i <= 3; i++) {
// 7.创建 TextMessage 消息
TextMessage textMessage = session.createTextMessage("msg --- " + i);
// 创建 MapMessage 消息
MapMessage mapMessage = session.createMapMessage();
mapMessage.setString("name", "kele");
mapMessage.setInt("age", 18);
// 8.通过 messageProducer 发送给 MQ
producer.send(textMessage);
producer.send(mapMessage);
}
// 9.关闭资源
producer.close();
session.close();
connection.close();
System.out.println("消息发送到 MQ 完成");
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
与之对应的 消费者演示,如果接收 MapMessage 数据,必须指定该类
public class JmsConsummer_topic {
public static final String ACTIVEMQ_URL = "tcp://118.24.20.3:61626";
public static final String TOPIC_NAME = "topic01";
public static void main(String[] args) throws Exception{
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
javax.jms.Connection connection = activeMQConnectionFactory.createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Topic topic = session.createTopic(TOPIC_NAME);
MessageConsumer messageConsumer = session.createConsumer(topic);
messageConsumer.setMessageListener( (message) -> {
// 判断消息是哪种类型之后,再强转。
if (null != message && message instanceof TextMessage){
TextMessage textMessage = (TextMessage)message;
try {
System.out.println("****消费者text的消息:"+textMessage.getText());
}catch (JMSException e) {
}
}
if (null != message && message instanceof MapMessage){
MapMessage mapMessage = (MapMessage)message;
try {
System.out.println("****消费者的map消息:"+mapMessage.getString("name"));
System.out.println("****消费者的map消息:"+mapMessage.getInt("age"));
}catch (JMSException e) {
}
}
});
System.in.read();
messageConsumer.close();
session.close();
connection.close();
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
# JMS 的消息属性
如果需要除消息头字段之外的值,那么可以使用消息属性。他是识别 / 去重 / 重点标注等操作,非常有用的方法。
他们是以属性名和属性值对的形式制定的。可以将属性是为消息头得扩展,属性指定一些消息头没有包括的附加信息,比如可以在属性里指定消息选择器。消息的属性就像可以分配给一条消息的附加消息头一样。它们允许开发者添加有关消息的不透明附加信息。它们还用于暴露消息选择器在消息过滤时使用的数据。
下图是设置消息属性的 API:
生产者演示:具体看 15-19 行代码
public class JmsProduceTopic {
// linux 上部署的activemq 的 IP 地址 + activemq 的端口号
public static final String ACTIVE_URL = "tcp://192.168.199.27:61616";
public static final String TOPIC_NAME = "topic001";
public static void main(String[] args) throws JMSException {
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVE_URL);
Connection connection = activeMQConnectionFactory.createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Topic topic = session.createTopic(TOPIC_NAME);
MessageProducer producer = session.createProducer(topic);
for (int i = 1; i <= 3; i++) {
TextMessage textMessage = session.createTextMessage("msg --- " + i);
// 调用 Message 的 set*Property() 方法,就能设置消息属性。根据 value 的数据类型的不同,有相应的 API。
textMessage.setStringProperty("From","ZhangSan@qq.com");
textMessage.setByteProperty("Spec", (byte) 1);
textMessage.setBooleanProperty("Invalide",true);
producer.send(textMessage);
}
producer.close();
session.close();
connection.close();
System.out.println("消息发送到 MQ 完成");
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
与之对应的 消费者演示:具体看 18-20 行代码
public class JmsConsumerTopic {
// linux 上部署的 activemq 的 IP 地址 + activemq 的端口号(默认 61616)
public static final String ACTIVE_URL = "tcp://192.168.199.27:61616";
public static final String TOPIC_NAME = "topic001";
public static void main(String[] args) throws JMSException, IOException {
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVE_URL);
Connection connection = activeMQConnectionFactory.createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Topic topic = session.createTopic(TOPIC_NAME);
MessageConsumer consumer = session.createConsumer(topic);
consumer.setMessageListener(message -> {
if(null != message && message instanceof TextMessage) {
TextMessage textMessage = (TextMessage) message;
try {
System.out.println("消费的消息:" + textMessage.getText());
System.out.println("消息属性:"+textMessage.getStringProperty("From"));
System.out.println("消息属性:"+textMessage.getByteProperty("Spec"));
System.out.println("消息属性:"+textMessage.getBooleanProperty("Invalide"));
} catch (JMSException e) {
e.printStackTrace();
}
}
});
// 让主线程不要结束。因为一旦主线程结束了,其他的线程(如此处的监听消息的线程)也都会被迫结束。
// 实际开发中,我们的程序会一直运行,这句代码都会省略
System.in.read();
consumer.close();
session.close();
connection.close();
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
编辑此页 (opens new window)
更新时间: 2024/11/02, 09:43:06