MQ

2-Java编码实现ActiveMQ通讯

Posted by Chenyawei on 2020-05-18
Words 2.1k and Reading Time 9 Minutes
Viewed Times

一、IDEA创建Maven工程

1、pom.xml中:

1
2
3
4
5
6
7
org.apache.activemq
activemq-all
5.15.11

org.apache.xbean
xbean-spring
4.15

2、JMS编码总体架构

img

回忆一下以前的JDBC编码套路:

  • 第一步:注册驱动(仅仅只做一次)

    Class.forName(“com.mysql.jdbc.com”);

  • 第二步:建立连接(Connection)

    DriverManager.getConnection(url,user,password);

  • 第三步:创建运行SQL语句(Statement)

    connection.createStatement();

  • 第四步:运行语句

    rs.executeQuery(sql);

  • 第五步:处理结果集(ResultSet)

  • 第六步:释放资源

3、简介目的地Destination:队列(Queue)和主题(Topic)

img

4、在点对点的消息传递域中,目的地被称为队列(queue)

案例:

4-1、消息生产者:

代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
package com.demo.activemq.queue;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
public class JmsProduce {
private static final String ACTIVEMQ_URL = "tcp://192.168.10.130:61616";
private static final String QUEUE_NAME = "queue01";
public static void main(String[] args) throws JMSException {
//1.创建连接工厂,按照给定的URL,采用默认的用户名密码
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
//2.通过连接工厂,获得connection并启动访问
Connection connection = activeMQConnectionFactory.createConnection();
connection.start();
//3.创建会话session
//两个参数transacted=事务,acknowledgeMode=确认模式(签收)
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//4.创建目的地(具体是队列queue还是主题topic)
Queue queue = session.createQueue(QUEUE_NAME);
//5.创建消息的生产者
MessageProducer messageProducer = session.createProducer(queue);
//6.通过使用消息生产者,生产三条消息,发送到MQ的队列里面
for (int i = 0; i
//7.创建消息
TextMessage textMessage = session.createTextMessage("msg---hello" + i);//理解为一个字符串
//8.通过messageProducer发送给MQ队列
messageProducer.send(textMessage);
}
//9.关闭资源
messageProducer.close();
session.close();
System.out.println("****消息发布到MQ队列完成");
}
}
4-2、控制台说明:

img

Number Of Pending Messages=等待消费的消息,这个是未出队列的数量,公式=总接收数-总出队列数。

Number Of Consumers=消费者数量,消费者端的消费者数量。

Messages Enqueued=进队消息数,进队列的总消息量,包括出队列的。这个数只增不减。

Messages Dequeued=出队消息数,可以理解为是消费者消费掉的数量。

总结:

  • 当有一个消息进入这个队列时,等待消费的消息是1,进入队列的消息是1。
  • 当消息消费后,等待消费的消息是0,进入队列的消息是1,出队列的消息是1。
  • 当再来一条消息时,等待消费的消息是1,进入队列的消息就是2。
4-3、消息消费者

代码1-阻塞式消费者:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
package com.demo.activemq.queue;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
/**
* 简单消息消费者
*/
public class JmsConsumer {
private static final String ACTIVEMQ_URL = "tcp://192.168.10.130:61616";
private static final String QUEUE_NAME = "queue01";
public static void main(String[] args) throws JMSException {
//1.创建连接工厂,按照给定的URL,采用默认的用户名密码
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
//2.通过连接工厂,获得connection并启动访问
Connection connection = activeMQConnectionFactory.createConnection();
connection.start();
//3.创建会话session
//两个参数transacted=事务,acknowledgeMode=确认模式(签收)
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//4.创建目的地(具体是队列queue还是主题topic)
Queue queue = session.createQueue(QUEUE_NAME);
//5.创建消息的消费者,指定消费哪一个队列里面的消息
MessageConsumer messageConsumer = session.createConsumer(queue);
//循环获取
while (true) {
//6.通过消费者调用方法获取队列里面的消息(发送的消息是什么类型,接收的时候就强转成什么类型)
TextMessage textMessage = (TextMessage) messageConsumer.receive();
if (textMessage != null) {
System.out.println("****消费者接收到的消息: " + textMessage.getText());
}else {
break;
}
}
//7.关闭资源
messageConsumer.close();
session.close();
connection.close();
}
}

代码2-异步监听式消费者:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
package com.demo.activemq.queue;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
import java.io.IOException;
/**
* 监听模式下的消费者
*/
public class JmsConsumer2 {
private static final String ACTIVEMQ_URL = "tcp://192.168.10.130:61616";
private static final String QUEUE_NAME = "queue01";
public static void main(String[] args) throws JMSException, IOException {
//1.创建连接工厂,按照给定的URL,采用默认的用户名密码
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
//2.通过连接工厂,获得connection并启动访问
Connection connection = activeMQConnectionFactory.createConnection();
connection.start();
//3.创建会话session
//两个参数transacted=事务,acknowledgeMode=确认模式(签收)
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//4.创建目的地(具体是队列queue还是主题topic)
Queue queue = session.createQueue(QUEUE_NAME);
//5.创建消息的消费者,指定消费哪一个队列里面的消息
MessageConsumer messageConsumer = session.createConsumer(queue);
//6.通过监听的方式消费消息
/*
异步非阻塞式方式监听器(onMessage)
订阅者或消费者通过创建的消费者对象,给消费者注册消息监听器setMessageListener,
当消息有消息的时候,系统会自动调用MessageListener类的onMessage方法
我们只需要在onMessage方法内判断消息类型即可获取消息
*/
messageConsumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
if (message != null && message instanceof TextMessage) {
//7.把message转换成消息发送前的类型并获取消息内容
TextMessage textMessage = (TextMessage) message;
try {
System.out.println("****消费者接收到的消息: " + textMessage.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
}
});
System.out.println("执行了39行");
//保证控制台不关闭,阻止程序关闭
System.in.read();
//关闭资源
messageConsumer.close();
session.close();
connection.close();
}
}

控制台:

img

5、在发布订阅消息传递域中,目的地被称为主题(topic)

发布主题生产者:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
package com.demo.activemq.topic;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
public class JmsProducer_Topic {
public static final String ACTIVEMQ_URL = "tcp://192.168.10.130:61616";
public static final String TOPIC_NAME = "topic01";
public static void main(String[] args) throws JMSException {
//1.创建连接工厂,按照给定的URL,采用默认的用户名密码
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
//2.通过连接工厂,获得connection并启动访问
Connection connection = activeMQConnectionFactory.createConnection();
connection.start();
//3.创建会话session
//两个参数transacted=事务,acknowledgeMode=确认模式(签收)
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//4.创建目的地(具体是队列queue还是主题topic)
Topic topic = session.createTopic(TOPIC_NAME);
//5.创建消息的生产者
MessageProducer messageProducer = session.createProducer(topic);
//6.通过使用消息生产者,生产三条消息,发送到MQ的队列里面
for (int i = 0; i < 3; i++ )
//7.通过session创建消息
TextMessage textMessage = session.createTextMessage("TOPIC_NAME---" + i);
//8.使用指定好目的地的消息生产者发送消息
messageProducer.send(textMessage);
}
//9.关闭资源
messageProducer.close();
session.close();
connection.close();
System.out.println("****TOPIC_NAME消息发布到MQ完成");
}
}

订阅主题消费者:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
package com.demo.activemq.topic;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
import java.io.IOException;
public class JmsConsumer_Topic {
public static final String ACTIVEMQ_URL = "tcp://192.168.10.130:61616";
public static final String TOPIC_NAME = "topic01";
public static void main(String[] args) throws JMSException, IOException {
System.out.println("我是1号消费者");
//1.创建连接工厂,按照给定的URL,采用默认的用户名密码
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
//2.通过连接工厂,获得connection并启动访问
Connection connection = activeMQConnectionFactory.createConnection();
connection.start();
//3.创建会话session
//两个参数transacted=事务,acknowledgeMode=确认模式(签收)
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//4.创建目的地(具体是队列queue还是主题topic)
Topic topic = session.createTopic(TOPIC_NAME);
//5.创建消息的消费者
MessageConsumer messageConsumer = session.createConsumer(topic);
//5.创建消息的消费者,指定消费哪一个队列里面的消息
messageConsumer.setMessageListener(message -> {
if (message instanceof TextMessage){
try {
String text = ((TextMessage) message).getText();
System.out.println(text);
} catch (JMSException e) {
e.printStackTrace();
}
}
});
System.in.read();
}
}

img

先启动订阅者再启动生产者,不然发送的消息是废消息

img

小总结:

两大模式特效

img

比较项目 Topic队列模式 Queue队列模式
工作模式 “订阅-发布”模式,如果当前没有订阅者,消息将会被丢弃,如果有多个订阅者,那么这些订阅者都会收到消息 “负载均衡”模式,如果当前没有消费者,消息也不会丢弃;如果有多个消费者,那么一条消息也只会发送给其中一个消费者,并且要求消费者ack信息
有无状态 无状态 Queue数据默认会在mq服务器上已文件形式保存,比如Active MQ一般保存在$AMQ_HOME\data\kr-store\data下面,也可以配置成DB存储
传递完整性 如果没有订阅者,消息会被丢弃 消息不会被丢弃
处理效率 由于消息要按照订阅者的数量进行复制,所以处理性能会随着订阅者的增加而明显降低,并且还要结合不同消息协议自身的性能差异 由于一条消息只发送给一个消费者,所以就算消费者再多,性能也不会有明显降低。当然不同消息协议的具体性能也是有差异的

notice

欢迎访问 chenyawei 的博客, 若有问题或者有好的建议欢迎留言,笔者看到之后会及时回复。 评论点赞需要github账号登录,如果没有账号的话请点击 github 注册, 谢谢 !

If you like this blog or find it useful for you, you are welcome to comment on it. You are also welcome to share this blog, so that more people can participate in it. If the images used in the blog infringe your copyright, please contact the author to delete them. Thank you !