一、IDEA创建Maven工程 1、pom.xml中: 1 2 3 4 5 6 7 org.apache.activemq activemq-all 5.15.11 org.apache.xbean xbean-spring 4.15
2、JMS编码总体架构
回忆一下以前的JDBC编码套路:
第一步:注册驱动(仅仅只做一次)
Class.forName(“com.mysql.jdbc.com”);
第二步:建立连接(Connection)
DriverManager.getConnection(url,user,password);
第三步:创建运行SQL语句(Statement)
connection.createStatement();
第四步:运行语句
rs.executeQuery(sql);
第五步:处理结果集(ResultSet)
第六步:释放资源
3、简介目的地Destination:队列(Queue)和主题(Topic)
4、在点对点的消息传递域中,目的地被称为队列(queue) 案例:
4-1、消息生产者: 代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 package com.demo.activemq.queue;import org.apache.activemq.ActiveMQConnectionFactory;import javax.jms.*;public class JmsProduce { private static final String ACTIVEMQ_URL = "tcp://192.168.10.130:61616" ; private static final String QUEUE_NAME = "queue01" ; public static void main (String[] args) throws JMSException { ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL); Connection connection = activeMQConnectionFactory.createConnection(); connection.start(); Session session = connection.createSession(false , Session.AUTO_ACKNOWLEDGE); Queue queue = session.createQueue(QUEUE_NAME); MessageProducer messageProducer = session.createProducer(queue); for (int i = 0 ; i TextMessage textMessage = session.createTextMessage("msg---hello" + i); messageProducer.send(textMessage); } messageProducer.close(); session.close(); System.out.println("****消息发布到MQ队列完成" ); } }
4-2、控制台说明:
Number Of Pending Messages=等待消费的消息,这个是未出队列的数量,公式=总接收数-总出队列数。
Number Of Consumers=消费者数量,消费者端的消费者数量。
Messages Enqueued=进队消息数,进队列的总消息量,包括出队列的。这个数只增不减。
Messages Dequeued=出队消息数,可以理解为是消费者消费掉的数量。
总结:
当有一个消息进入这个队列时,等待消费的消息是1,进入队列的消息是1。
当消息消费后,等待消费的消息是0,进入队列的消息是1,出队列的消息是1。
当再来一条消息时,等待消费的消息是1,进入队列的消息就是2。
4-3、消息消费者 代码1-阻塞式消费者:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 package com.demo.activemq.queue;import org.apache.activemq.ActiveMQConnectionFactory;import javax.jms.*;public class JmsConsumer { private static final String ACTIVEMQ_URL = "tcp://192.168.10.130:61616" ; private static final String QUEUE_NAME = "queue01" ; public static void main (String[] args) throws JMSException { ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL); Connection connection = activeMQConnectionFactory.createConnection(); connection.start(); Session session = connection.createSession(false , Session.AUTO_ACKNOWLEDGE); Queue queue = session.createQueue(QUEUE_NAME); MessageConsumer messageConsumer = session.createConsumer(queue); while (true ) { TextMessage textMessage = (TextMessage) messageConsumer.receive(); if (textMessage != null ) { System.out.println("****消费者接收到的消息: " + textMessage.getText()); }else { break ; } } messageConsumer.close(); session.close(); connection.close(); } }
代码2-异步监听式消费者:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 package com.demo.activemq.queue;import org.apache.activemq.ActiveMQConnectionFactory;import javax.jms.*;import java.io.IOException;public class JmsConsumer2 { private static final String ACTIVEMQ_URL = "tcp://192.168.10.130:61616" ; private static final String QUEUE_NAME = "queue01" ; public static void main (String[] args) throws JMSException, IOException { ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL); Connection connection = activeMQConnectionFactory.createConnection(); connection.start(); Session session = connection.createSession(false , Session.AUTO_ACKNOWLEDGE); Queue queue = session.createQueue(QUEUE_NAME); MessageConsumer messageConsumer = session.createConsumer(queue); messageConsumer.setMessageListener(new MessageListener() { @Override public void onMessage (Message message) { if (message != null && message instanceof TextMessage) { TextMessage textMessage = (TextMessage) message; try { System.out.println("****消费者接收到的消息: " + textMessage.getText()); } catch (JMSException e) { e.printStackTrace(); } } } }); System.out.println("执行了39行" ); System.in.read(); messageConsumer.close(); session.close(); connection.close(); } }
控制台:
5、在发布订阅消息传递域中,目的地被称为主题(topic) 发布主题生产者:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 package com.demo.activemq.topic;import org.apache.activemq.ActiveMQConnectionFactory;import javax.jms.*;public class JmsProducer_Topic { public static final String ACTIVEMQ_URL = "tcp://192.168.10.130:61616" ; public static final String TOPIC_NAME = "topic01" ; public static void main (String[] args) throws JMSException { ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL); Connection connection = activeMQConnectionFactory.createConnection(); connection.start(); Session session = connection.createSession(false , Session.AUTO_ACKNOWLEDGE); Topic topic = session.createTopic(TOPIC_NAME); MessageProducer messageProducer = session.createProducer(topic); for (int i = 0 ; i < 3 ; i++ ) TextMessage textMessage = session.createTextMessage("TOPIC_NAME---" + i); messageProducer.send(textMessage); } messageProducer.close(); session.close(); connection.close(); System.out.println("****TOPIC_NAME消息发布到MQ完成" ); } }
订阅主题消费者:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 package com.demo.activemq.topic;import org.apache.activemq.ActiveMQConnectionFactory;import javax.jms.*;import java.io.IOException;public class JmsConsumer_Topic { public static final String ACTIVEMQ_URL = "tcp://192.168.10.130:61616" ; public static final String TOPIC_NAME = "topic01" ; public static void main (String[] args) throws JMSException, IOException { System.out.println("我是1号消费者" ); ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL); Connection connection = activeMQConnectionFactory.createConnection(); connection.start(); Session session = connection.createSession(false , Session.AUTO_ACKNOWLEDGE); Topic topic = session.createTopic(TOPIC_NAME); MessageConsumer messageConsumer = session.createConsumer(topic); messageConsumer.setMessageListener(message -> { if (message instanceof TextMessage){ try { String text = ((TextMessage) message).getText(); System.out.println(text); } catch (JMSException e) { e.printStackTrace(); } } }); System.in.read(); } }
先启动订阅者再启动生产者,不然发送的消息是废消息
小总结:
两大模式特效
比较项目
Topic队列模式
Queue队列模式
工作模式
“订阅-发布”模式,如果当前没有订阅者,消息将会被丢弃,如果有多个订阅者,那么这些订阅者都会收到消息
“负载均衡”模式,如果当前没有消费者,消息也不会丢弃;如果有多个消费者,那么一条消息也只会发送给其中一个消费者,并且要求消费者ack信息
有无状态
无状态
Queue数据默认会在mq服务器上已文件形式保存,比如Active MQ一般保存在$AMQ_HOME\data\kr-store\data下面,也可以配置成DB存储
传递完整性
如果没有订阅者,消息会被丢弃
消息不会被丢弃
处理效率
由于消息要按照订阅者的数量进行复制,所以处理性能会随着订阅者的增加而明显降低,并且还要结合不同消息协议自身的性能差异
由于一条消息只发送给一个消费者,所以就算消费者再多,性能也不会有明显降低。当然不同消息协议的具体性能也是有差异的
欢迎访问 chenyawei 的博客, 若有问题或者有好的建议欢迎留言,笔者看到之后会及时回复。 评论点赞需要github账号登录,如果没有账号的话请点击 github 注册, 谢谢 !
If you like this blog or find it useful for you, you are welcome to comment on it. You are also welcome to share this blog, so that more people can participate in it. If the images used in the blog infringe your copyright, please contact the author to delete them. Thank you !