JMS操作ActiveMQ

点击量:85

主要包含对原生jms模式操作和spring整合jms

消息中间件 ActiveMQ

能解决的事:

1.解耦
2.异步
3.削峰

常见的消息中间件

1.ActiveMQ   apache
2.RabbitMQ  
3.ZeroMQ
3.RocketMQ   alibaba
4.Kafka
java通过JMS来操作MQ(Kafka没法操作)
一个MQ软件中有多个队列

JMS可以存放5种消息

1.Text
2.MapMessage
3.ObjectMessage
4.BytesMessage
5.StreamMessage     java原始数据流

消息队列的模式

1.点对点模型
一个生产中生产的消息只能被一个消费者消费
2.发布订阅模型
一个生产者生产的消息可以让多个消费者同时消费,消费后消息还在mq
中不消失

具体操作
1.运行MQ软件
2.在浏览器打开ActiveMQ的web控制台
3.编写java代码

JMS实际编码
原生JMS 点对点 生产者,消费者
原生JMS发布订阅 生产者消费者
Spring点对点 生产者,消费者
Spring发布订阅 生产者,消费者

原生JMS方式

点对点消息

原生JMS  点对点,生产者
1.创建生产者项目
2.导入依赖
activemq-all
3.编写java代码
//创建工厂
ConnectionFactory cf=new ActiveMQConnectionFactory("tcp://localhost:61616")
   
//创建连接
Connection conn = cf.createConnection();

//启动连接
conn.start();

//创建session
conn.createSession("false","") //arg1:是否启用事务 arg2事务的类型
   
//创建MQ的目的地
Destinnation des =session.createQueue("队列名字");
   
//创建生产者
MessageProducer pro=session.createProducer()
   
//创建消息
XXXXMessage message=session.createXXMessage();
   
//发送消息
pro.send(message)
   
//关闭资源
pro.close()
session.close()
conn.close()
1.创建消费者项目
2.导入依赖
3.编写java代码
//创建工厂
ConnectionFactory cf=new ActiveMQConnectionFactory("tcp://localhost:61616")
   
//创建连接
Connection conn = cf.createConnection();

//启动连接
conn.start();

//创建session
conn.createSession("false","") //arg1:是否启用事务 arg2事务的类型
   
//创建MQ的目标对象
Destinnation des =session.createQueue("队列名字");
   
//创建消费者
MessageConsumer pro=session.createConsumer()
   
//借助监听来接收消息
consumer.setMessageListener(new MessageListener(){
   public void onMessage(Message message){
       TextMessage tm=(TextMessage)message;
       String text=textMessage.getText();
       sout(text)
  }
});

//不间断监听,点击退出
System.in.read()

//关闭资源
consumer.close()
session.close()
conn.close()

订阅模式

生产者
//创建MQ目标时更改为topic
session.createTopic("name");
消费者
//创建MQ目标时更改为topic
session.createTopic("name");

*** 订阅模式 ***
消费者不会消费连接之前已有的消息

Spring整合JMS (spring-jms)

1.导入依赖
spring-jms
activemq-all
spring-基础包
2.添加配置文件
3.编写java代码

生产者配置文件

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
      xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:aop="http://www.springframework.org/schema/aop"
      xmlns:context="http://www.springframework.org/schema/context"
      xmlns:tx="http://www.springframework.org/schema/tx"
      xmlns:task="http://www.springframework.org/schema/task"
      xmlns:amq="http://activemq.apache.org/schema/core"
      xmlns:jms="http://www.springframework.org/schema/jms"
      xsi:schemaLocation="
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop.xsd
http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd
http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx.xsd
http://www.springframework.org/schema/jms
       http://www.springframework.org/schema/jms/spring-jms.xsd
http://activemq.apache.org/schema/core
       http://activemq.apache.org/schema/core/activemq-core.xsd">

   <!--spring包扫描-->
   <context:component-scan base-package="com.li"></context:component-scan>

   <!--将ActiveMQ工厂注入容器,并传入MQ的地址-->
   <bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
       <constructor-arg name="brokerURL" value="tcp://127.0.0.1:61616/"></constructor-arg>
   </bean>

   <!--springMQ工厂包装ActiveMQ工厂-->
   <bean class="org.springframework.jms.connection.CachingConnectionFactory" id="cachingConnectionFactory">
       <property name="targetConnectionFactory" ref="connectionFactory"></property>
   </bean>

   <!--创建JmsTemplate,用于发送消息-->
   <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
       <property name="connectionFactory" ref="cachingConnectionFactory"></property>
   </bean>

   <!--创建目标对象,用于指定队列类型和队列名称-->
   <bean id="textQueue" class="org.apache.activemq.command.ActiveMQQueue">
       <constructor-arg name="name" value="lsli-queue"></constructor-arg>
   </bean>


   <!--<bean id="textTopic">
       <constructor-arg name="name" value="test-spring-topic"></constructor-arg>
   </bean>-->
</beans>

生产者发送消息(java)

public class Producer {
   public static void main(String[] args) {

       ClassPathXmlApplicationContext ac = new ClassPathXmlApplicationContext("classpath:spring/applicationContext-*.xml");
       JmsTemplate jmsTemplate = (JmsTemplate) ac.getBean("jmsTemplate");
       Destination textQueue = (Destination) ac.getBean("textQueue");

      jmsTemplate.send(textQueue, new MessageCreator() {
          @Override
          public Message createMessage(Session session) throws JMSException {
              TextMessage textMessage = session.createTextMessage();
              textMessage.setText("这是队列发送的消息");
              System.out.println("消息发送成功");
              return textMessage;
          }
      });
  }
}

消费者配置文件

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
      xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:aop="http://www.springframework.org/schema/aop"
      xmlns:context="http://www.springframework.org/schema/context"
      xmlns:tx="http://www.springframework.org/schema/tx"
      xmlns:task="http://www.springframework.org/schema/task"
      xmlns:amq="http://activemq.apache.org/schema/core"
      xmlns:jms="http://www.springframework.org/schema/jms"
      xsi:schemaLocation="
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop.xsd
http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd
http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx.xsd
http://www.springframework.org/schema/jms
       http://www.springframework.org/schema/jms/spring-jms.xsd
http://activemq.apache.org/schema/core
       http://activemq.apache.org/schema/core/activemq-core.xsd">

   <!--spring包扫描-->
   <context:component-scan base-package="com.li"></context:component-scan>

   <!--将ActiveMQ工厂注入容器,并传入MQ的地址-->
   <bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
       <constructor-arg name="brokerURL" value="tcp://127.0.0.1:61616/"></constructor-arg>
   </bean>

   <!--springMQ工厂包装ActiveMQ工厂-->
   <bean id="cachingConnectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory" >
       <property name="targetConnectionFactory" ref="connectionFactory"></property>
   </bean>


   <!--创建目标对象,用于指定队列类型和队列名称-->
   <bean id="lsli-q" class="org.apache.activemq.command.ActiveMQQueue">
       <constructor-arg name="name" value="lsli-queue"></constructor-arg>
   </bean>

   <bean id="consumer" class="com.li.Consumer"></bean>

  <!-- 配置监听器方式1,只能用于指定名称-->
   <jms:listener-container destination-type="queue" connection-factory="cachingConnectionFactory">
       <jms:listener destination="PPP" ref="consumer"></jms:listener>
   </jms:listener-container>

   <!--监听器-->
   <!--<bean id="jmsContainer"       >
       <property name="connectionFactory" ref="connectionFactory" />
       <property name="destination" ref="lsli-q" />
       <property name="messageListener" ref="consumer" />
   </bean>-->

</beans>

消费者监听器

//必须实现MessageListener接口
public class Consumer implements MessageListener {

   @Override
   public void onMessage(Message message) {
       //强转message到指定类型
       TextMessage textMessage = (TextMessage) message;
       try {
           String text = textMessage.getText();
           System.out.println(text);
      } catch (JMSException e) {
           e.printStackTrace();
      }
  }
}


感谢您的阅读
文章由作者个人总结,欢迎指出文章错误



上一篇:Kotlin(1)-基本语法

下一篇:Kotlin(2)-类与对象