Spring 整合 ActiveMq
Spring 整合 ActiveMq
整合步骤如下:
- 添加依赖
- 连接 mq 消息服务器
- 定义生产者/消费者
- 发送/接收消息
添加依赖
<properties>
<spring_version>4.2.4.RELEASE</spring_version>
</properties>
<dependencies>
<!--Spring-->
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context</artifactId>
<version>${spring_version}</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-web</artifactId>
<version>${spring_version}</version>
</dependency>
<dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-jms</artifactId>
    <!-- Use the shared property so every Spring module stays on the same version. -->
    <version>${spring_version}</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-test</artifactId>
<version>${spring_version}</version>
</dependency>
<!--ActiveMq-->
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-all</artifactId>
<version>5.15.3</version>
</dependency>
<!--<dependency>-->
<!--<groupId>org.apache.commons</groupId>-->
<!--<artifactId>commons-pool2</artifactId>-->
<!--<version>2.5</version>-->
<!--</dependency>-->
<!--servlet-->
<dependency>
    <groupId>javax.servlet</groupId>
    <artifactId>servlet-api</artifactId>
    <!-- Pinned to a concrete version: the floating RELEASE meta-version makes
         builds non-reproducible because it resolves to whatever is newest in
         the repository at build time. -->
    <version>2.5</version>
    <scope>provided</scope>
</dependency>
<!--Test-->
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
<scope>test</scope>
</dependency>
<!--fast Json-->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.47</version>
</dependency>
</dependencies>
连接服务器
定义一个类实现 ConnectionFactory 接口, 类属性包括 brokerURL(服务器地址), userName, password, maxConntection(最大连接数, 连接池使用的参数; 代码与配置中的属性名均为此拼写):
/**
* author: getthrough
* date: 2018/3/8
* description: 连接工厂的包装类
*/
public class ActiveMqConnectionFactoryDecoration implements ConnectionFactory {
/**apache 提供的连接池*/
// private PooledConnectionFactory pooledConnectionFactory;
private String brokerURL;
private String userName;
private String password;
private String maxConntection;
private ActiveMQConnectionFactory activeMQConnectionFactory;
/**
 * No-argument constructor; performs no initialization itself.
 * NOTE(review): the fields are presumably populated afterwards through
 * setter injection - confirm against the bean definition.
 */
public ActiveMqConnectionFactoryDecoration() {
}
/**
 * Copies the configured broker URL and credentials onto the wrapped
 * {@code ActiveMQConnectionFactory} and opens one probe connection.
 *
 * @throws JMSException if the probe connection cannot be created or closed
 */
public void run() throws JMSException {
    activeMQConnectionFactory.setBrokerURL(brokerURL);
    activeMQConnectionFactory.setUserName(userName);
    activeMQConnectionFactory.setPassword(password);

    // The original discarded the connection returned here, so it could never
    // be closed. Nothing else can reach this instance, so close it right away;
    // callers obtain their own connections through createConnection().
    Connection probe = activeMQConnectionFactory.createConnection();
    probe.close();

    // Pooled variant, kept for reference (requires the pooled connection factory):
    // pooledConnectionFactory = new PooledConnectionFactory();
    // pooledConnectionFactory.setMaxConnections(Integer.parseInt(maxConntection));
    // pooledConnectionFactory.setConnectionFactory(activeMQConnectionFactory);
    // pooledConnectionFactory.createConnection(userName, password);
}
/**
 * Currently a no-op: the only statements in the body are the commented-out
 * shutdown calls for the pooled connection factory.
 */
public void stop() {
// if (null != pooledConnectionFactory) {
// pooledConnectionFactory.stop();
// }
}
/**
 * Creates a connection by delegating to the wrapped
 * {@code ActiveMQConnectionFactory}.
 *
 * @return the connection produced by the wrapped factory
 * @throws JMSException if the wrapped factory fails to create the connection
 */
@Override
public Connection createConnection() throws JMSException {
    // Pooled variant, kept for reference:
    // return pooledConnectionFactory.createConnection();
    return activeMQConnectionFactory.createConnection();
}
/**
 * Creates a connection with explicit credentials by delegating to the wrapped
 * {@code ActiveMQConnectionFactory}.
 *
 * @param userName user name passed through to the wrapped factory
 * @param password password passed through to the wrapped factory
 * @return the connection produced by the wrapped factory
 * @throws JMSException if the wrapped factory fails to create the connection
 */
@Override
public Connection createConnection(String userName, String password) throws JMSException {
    // Pooled variant, kept for reference:
    // return pooledConnectionFactory.createConnection(userName, password);
    return activeMQConnectionFactory.createConnection(userName, password);
}
/**
 * Sets the ActiveMQ connection factory that this wrapper delegates to.
 *
 * @param activeMQConnectionFactory the factory used by run() and both createConnection overloads
 */
public void setActiveMQConnectionFactory(ActiveMQConnectionFactory activeMQConnectionFactory) {
this.activeMQConnectionFactory = activeMQConnectionFactory;
}
// ... 其他属性的 getters & setters
}
在 spring 配置文件中定义这个 bean:
<!-- Wrapper around the ActiveMQ connection factory; the property values are
     resolved from placeholders.
     The trailing spaces inside the original id/class attribute values were
     removed so that the bean name and class name contain no stray whitespace.
     NOTE(review): nothing in this definition invokes run(); if it is not
     called elsewhere, consider init-method="run" (and destroy-method="stop"). -->
<bean id="activeMqConnectionFactoryDecoration" class="mq.ActiveMqConnectionFactoryDecoration">
    <property name="activeMQConnectionFactory">
        <bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"/>
    </property>
    <property name="brokerURL" value="${brokerURL}"/>
    <property name="userName" value="${userName}"/>
    <property name="password" value="${password}"/>
    <property name="maxConntection" value="${maxConntection}"/>
</bean>
创建生产者/消费者
创建一个消息发送类, 简单包装下发送消息流程
/**
* author: getthrough
* date: 2018/3/8
* description:
*/
public class ActiveMqSender {
private static JmsTemplate jmsTemplate;
private static Destination destination;
/**
 * Sends the given text as a JMS text message to the configured destination
 * through the configured {@code JmsTemplate}.
 *
 * @param content text placed in the body of the message
 */
public static void sendMqMessage(final String content) {
    MessageCreator textMessageCreator = new MessageCreator() {
        public Message createMessage(Session jmsSession) throws JMSException {
            return jmsSession.createTextMessage(content);
        }
    };
    jmsTemplate.send(destination, textMessageCreator);
}
/**
 * Stores the template used by {@link #sendMqMessage(String)}.
 *
 * <p>The backing field is static, so the value set here is shared by every
 * instance of this class.
 *
 * @param jmsTemplate template used to send messages
 */
public void setJmsTemplate(JmsTemplate jmsTemplate) {
    // Qualify with the class name: assigning a static field through "this"
    // hides the fact that the state is class-wide.
    ActiveMqSender.jmsTemplate = jmsTemplate;
}
/**
 * Stores the destination that {@link #sendMqMessage(String)} sends to.
 *
 * <p>The backing field is static, so the value set here is shared by every
 * instance of this class.
 *
 * @param destination JMS destination for outgoing messages
 */
public void setDestination(Destination destination) {
    // Qualify with the class name: assigning a static field through "this"
    // hides the fact that the state is class-wide.
    ActiveMqSender.destination = destination;
}
}
创建一个监听器, 实现 MessageListener 接口
/**
* author: getthrough
* date: 2018/3/8
* description:
*/
// 此处ParentMessageListener实现了 MessageListener接口
public class QueueMessageListener extends ParentMessageListener {
private Logger logger = LoggerFactory.getLogger(QueueMessageListener.class);
/**
 * Pre-processing hook for a message; currently it only writes an info log line.
 * NOTE(review): presumably invoked by ParentMessageListener before the message
 * is handled - confirm against the parent class.
 */
public void beforeHandling() {
// doSomething ...
logger.info("before hanlding queue msg ...");
}
/**
 * Post-processing hook for a message; currently it only writes an info log line.
 * NOTE(review): presumably invoked by ParentMessageListener after the message
 * is handled - confirm against the parent class.
 */
public void afterHandling() {
// doSomething ...
logger.info("after hanlding queue msg ...");
}
/**
 * Logs the text payload of an incoming JMS message.
 *
 * <p>Messages that are not {@code TextMessage} instances are logged at WARN
 * level and ignored instead of failing with a {@code ClassCastException}.
 *
 * @param message the message delivered by the JMS provider
 */
@Override
public void onMessage(Message message) {
    if (!(message instanceof TextMessage)) {
        // Also covers null, for which instanceof is false.
        logger.warn("########## unsupported message type: {} ###########",
                message == null ? null : message.getClass().getName());
        return;
    }
    TextMessage textMessage = (TextMessage) message;
    try {
        logger.info("########### consumer1 has receive the message :{} ############", textMessage.getText());
    } catch (JMSException e) {
        // Log at ERROR with the exception attached so the stack trace goes to
        // the logger rather than to stderr via printStackTrace().
        logger.error("########## failed to get message text! ###########", e);
    }
}
}
相关推荐
胡献根 2020-07-18
胡献根 2020-07-05
jiangtie 2020-06-10
onlylixiaobei 2020-06-09
xinglun 2020-06-02
方新德 2020-05-31
Java高知 2020-05-20
Java高知 2020-05-08
Java高知 2020-05-03
onlylixiaobei 2020-05-02
Java高知 2020-04-22
胡献根 2020-04-22
heweiyabeijing 2020-04-21
方新德 2020-04-20
胡献根 2020-04-10
onlylixiaobei 2020-04-10
方新德 2020-04-08
xuedabao 2020-03-30