一、是什么
1、JavaEE
是一套使用Java进行企业级应用开发的,大家一致遵循的13个核心规范工业标准。JavaEE平台提供了一个基于组件的方法来加快设计,开发。装配及部署企业应用程序。
- JDBC(Java Databease)数据库连接
- JNDI(Java Naming and Directory Interfaces)Java的命令和目录接口
- EJB(Enterprise JavaBean)
- RMI(Remote Method Invoke)远程方法调用
- Java IDL(Interface Description Language)/CORBA(Common Object Broker Architecture)接口定义语言/共用对象请求代理程序体系结构
- JSP(Java Server Page)
- Servlet
- XML(Extensible Markup Language)可标记白标记语言
- JMS(Java Message Service)Java消息服务
- JTA(Java Transaction API)Java事务API
- JTS(Java Transaction Service)Java事务服务
- JavaMail
- JAF(JavaBean Activation Framework)
2、JMS
Java Message Service(Java消息服务是JavaEE中的一个技术
什么是Java消息服务
Java消息服务指的是两个应用程序之间进行异步通信的API,它为标准协议和消息服务提供了一组通用接口,包括创建、发送、读取消息等,用于支持Java应用程序开发。在JavaEE中,当两个应用程序使用JMS进行通信时,它们之间不是直接相连的,而是通过一个共同的消息收发服务组件关联起来以达到解耦/异步削峰的效果。
二、MQ中间件的其他落地产品
消息队列的详细比较
| 特性 | ActiveMQ | RabbitMQ | Kafka | RocketMQ |
|---|---|---|---|---|
| PRODUCER-CUMSUMER | 支持 | 支持 | 支持 | 支持 |
| PUBLISH-SUBSCRIBE | 支持 | 支持 | 支持 | 支持 |
| REQUEST-REPLY | 支持 | 支持 | - | 支持 |
| API完备性 | 高 | 高 | 高 | 低(静态配置) |
| 多语言支持 | 支持,Java优先 | 语言无关 | 支持,Java优先 | 支持 |
| 单机吞吐量 | 万级 | 万级 | 十万级 | 单机万级 |
| 消息延迟 | - | 微秒级 | 毫秒级 | - |
| 可用性 | 高(主从) | 高(主从) | 非常高(分布式) | 高 |
| 消息丢失 | - | 低 | 理论上不会丢失 | - |
| 消息重复 | - | 可控制 | 理论上会有重复 | - |
| 文档的完备性 | 高 | 高 | 高 | 中 |
| 提供快速入门 | 有 | 有 | 有 | 无 |
| 首次部署难度 | - | 低 | 中 | 高 |
三、JMS的组成结构和特点
1、JMS Provider:实现JMS接口和规范的消息中间件,也就是我们说的MQ服务器
2、JMS Producer:消息生产者,创建和发送JMS消息的客户端应用
3、JMS Consumer:消息消费者,接收和处理JMS消息的客户端应用
4、JSM Message
消息头
JMSDestination:消息发送的目的地,主要是指Queue和Topic;
JMSDeliveryMode
持久模式和非持久模式。
一条持久性的消息:应该被传送“一次仅仅一次”,这就意味着如果JMS提供者出现故障,该消息并不会丢失,它会在服务器恢复之后再次传递。
一条非持久的消息:最多会传递一次,这意味着服务器出现故障,该消息将会永远丢失。
JMSExpiration
可以设置消息在一定时间后过期,默认是永不过期
消息过期时间,等于Destination的send方法中的timeToLive值加上发送时刻的GMT时间值。
如果timeToLive值等于0,则JMSExpiration被设为0,表示该消息永不过期。
如果发送后,在消息过期时间之后还没有被发送到目的地,则该消息被清除。
JMSPriority
消息优先级,从0-9十个级别,0-4是普通消息5-9是加急消息。
JMS不要求MQ严格按照这十个优先级发送消息但必须保证加急消息要先于普通消息到达。默认是4级。
JMSMessageID:唯一标识每个消息的标识由MQ产生。
消息属性
- 封装具体的消息数据
- 5种消息格式
- TxtMessage:普通字符串消息,包含一个String
- MapMessage:一个Map类型的消息,key为Strng类型,而值为Java基本类型
- BytesMessage:二进制数组消息,包含一个byte[]
- StreamMessage:Java数据流消息,用标准流操作来顺序填充和读取
- ObjectMessage:对象消息,包含一个可序列化的Java对象
- 发送和接收的消息体类型必须一致对应
消息体
如果需要除消息字段以外的值,那么可以使用消息属性
识别/去重/重点标注等操作非常有用的方法
是什么
四、JMS的可靠性
1、PERSISTENT:持久性
参数设置说明
非持久
messageProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT)
非持久化:当服务器宕机,消息不存在。
持久
messageProducer.setDeliveryMode(DeliveryMode.PERSISTENT)
持久化: 当服务器宕机,消息依然存在。
Queue默认是持久
持久的Queue
演示:
1
2
3MessageProducer messageProducer = session.createProducer(queue);
//设置通过session创建出来的生产者生产的Queue消息为持久性
messageProducer.setDeliveryMode(DeliveryMode.PERSISTENT);结论:
1
2
3持久化消息
这是队列的默认传递模式,此模式保证这些消息只被传送一次和成功使用一次。对于这些消息,可靠性是优先考虑的因素。
可靠性的另一个重要方面是确保持久性消息传送至目标后,消息服务在向消费者传送它们之前不会丢失这些消息。
持久的Topic
先启动定阅消费者再启动定阅生产者
持久的发布主题生产者:
1 | package activeMQ; |
控制台:
订阅者在线:
订阅者不在线:
持久的定阅主题消费者:
1 | package activeMQ; |
控制台:
订阅者在线
订阅者不在线
类似微信公众号订阅发布
2、Transaction:事务
producer提交时的事务
false::只要执行send,就进入到队列中;关闭事务,那第2个签收参数的设置需要有效。
true:先执行send再执行commit,消息才被真正提交到队列中;消息需要需要批量提交,需要缓冲处理。
事务偏生产者/签收偏消费者
代码 生产者:
1 | package activeMQ; |
消费者:
1 | package activeMQ; |
3、Acknowledge:三种签收方式
Session.AUTO_ACKNOWLEDGE
当客户端从receiver或onMessage成功返回时,Session自动签收客户端的这条消息的收条。
Session.CLIENT_ACKNOWLEDGE
客户端通过调用消息(Message)的acknowledge方法签收消息。在这种情况下,签收发生在Session层面:签收一个已经消费的消息会自动地签收这个Session所有已消费的收条。
Session.DUPS_OK_ACKNOWLEDGE
Session不必确保对传送消息的签收,这个模式可能会引起消息的重复,但是降低了Session的开销,所以只有客户端能容忍重复的消息,才可使用。
非事务
- 自动签收(默认):Session.AUTO_ACKNOWLEDGE
- 手动签收:Session.CLIENT_ACKNOWLEDGE;客户端调用acknowledge方法手动签收
- 允许重复消息
事务
生产事务开启,只有commit后才能将全部消息变为已消费
消息生产者
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32package activeMQ;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
/**
* @Author: chenyawei
* @Date: 2020/7/29 11:26 上午
* @Description: 消息生产者 : JMS的可靠性-签收
*/
public class Jms_Transaction_AUTOACK_Producer {
private static final String ACTIVEMQ_URL = "tcp://192.168.33.10:61616";
private static final String ACTIVEMQ_QUEUE_NAME = "Queue-ACK-NoTransaction";
public static void main(String[] args) throws JMSException {
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
Connection connection;
connection = activeMQConnectionFactory.createConnection();
connection.start();
Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue(ACTIVEMQ_QUEUE_NAME);
MessageProducer producer = session.createProducer(queue);
for (int i = 0; i<3; i++){
TextMessage textMessage = session.createTextMessage("Transaction_AUTOACK-msg: " + i);
producer.send(textMessage);
}
session.commit();
System.out.println("发送完成");
producer.close();
session.close();
connection.close();
}
}
消息消费者
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39package activeMQ;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
/**
* @Author: chenyawei
* @Date: 2020/7/29 11:32 上午
* @Description: 消息消费者 : JMS的可靠性-签收
*/
public class Jms_Transaction_CLIENTACK_Consumer {
private static final String ACTIVEMQ_URL = "tcp://192.168.33.10:61616";
private static final String ACTIVEMQ_QUEUE_NAME = "Queue-ACK-Transaction";
public static void main(String[] args) throws JMSException {
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
Connection connection = activeMQConnectionFactory.createConnection();
connection.start();
//消费者设置了手动签收,就必须自己签收,向服务器发送我已经收到消息了
//开启事务如果不提交,就算手动签收,也是无效的
Session session = connection.createSession(true, Session.CLIENT_ACKNOWLEDGE);
Queue queue = session.createQueue(ACTIVEMQ_QUEUE_NAME);
MessageConsumer messageConsumer = session.createConsumer(queue);
messageConsumer.setMessageListener(new MessageListener() {
public void onMessage(Message message) {
if (message instanceof TextMessage) {
TextMessage textMessage = (TextMessage) message;
try {
textMessage.acknowledge();
System.out.println(textMessage.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
}
});
}
}由于消费者开启了事务,没有提交事务(就算手动签收也没用),服务器认为,消费者没有收到消息
签收和事务的关系:
在事务性会话中,当一个事务被成功提交则消息被自动签收。如果事务回滚,则消息会被再次传送。
非事务性会话中,消息何时被确认取决于创建会话时的应答模式(ACKNOWLEDGE)。
五、JMS的点对点总结
点对点模型是基于队列的,生产者发送消息到队列,消费者从队列接收消息,队列的存在使得消息的异步传输成为可能。和我们平时给朋友发送短信类似。
- 如果在Session关闭时有部分消息被收到但还没有被签收(acknowledge),那当消费者下次连接到相同的队列时,这些消息还会被再次接收
- 队列可以长久的保存消息直到消费者收到消息。消费者不需要因为担心消息会丢失而时刻和队列保持激活的链接状态,充分体现了异步传输模式的优势
六、JMS的发布订阅总结
非持久订阅
非持久订阅只有当客户端处于激活状态,也就是和MQ保持连接状态才能收发到某个主题的消息。
如果消费者处于离线状态,生产者发送的主题消息将会丢失作废,消费者永远不会收到。
一句话:先订阅注册才能接受到发布,只给订阅者发布消息。
持久订阅
客户端首先向MQ注册一个自己的身份ID识别号,当这个客户端处于离线时,生产者会为这个ID保存所有发送到主题的消息,当客户再次连接到MQ的时候,会根据消费者的ID得到所有当自己处于离线时发送到主题的消息
当持久订阅状态下,不能恢复或重新派送一个未签收的消息。
持久订阅才能恢复或重新派送一个未签收的消息。
用哪个?
当所有的消息必须被接收,则用持久订阅。当消息丢失能够被容忍,则用非持久订阅
If you like this blog or find it useful for you, you are welcome to comment on it. You are also welcome to share this blog, so that more people can participate in it. If the images used in the blog infringe your copyright, please contact the author to delete them. Thank you !