使用activemq
说说在项目里是怎么使用activemq(简称为amq)进行通信的。
有2个系统,面向不同的用户,简称为系统A和系统B。本文为了简单,只将系统A作为 队列A.CreateMessage的生产端,系统B作为 队列A.CreateMessage的消费端,传输的message可为一个设计好的类的对象,本文为了简单,传输的是一个String对象。
另外,系统A也可以作为另一队列QC的消费端,系统B作为队列QC的生产端。
1.下载一个apache-activemq-5.10.2,根据系统类型(操作系统位数),选择启动bin目录下的win32或win64目录下的activemq.bat文件。启动后,打开浏览器,输入localhost:8161/admin/queues.jsp,
如果页面是下面这样的

输入用户名:admin,密码:admin就OK了。

2.amq也启动了,那么接下来是在系统A加上amq相关内容。
项目目录结构如下:

系统A的applicationContext-amq.xml文件:
<beans xmlns="http://www.springframework.org/schema/beans" xmlns:amq="http://activemq.apache.org/schema/core"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core-5.10.2.xsd">
<!--
使用spring的listenerContainer,消息用持久化保存,服务器重启不会丢失
-->
<!-- 连接外部的activeMQ-->
<amq:connectionFactory id="jmsConnectionFactory" userName="admin" password="admin" brokerURL="tcp://localhost:61616"></amq:connectionFactory>
<!-- Spring JmsTemplate config -->
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
<property name="connectionFactory">
<!-- lets wrap in a pool to avoid creating a connection per send -->
<bean class="org.springframework.jms.connection.SingleConnectionFactory">
<property name="targetConnectionFactory" ref="jmsConnectionFactory" />
</bean>
</property>
<!-- custom MessageConverter -->
<property name="messageConverter" ref="defaultMessageConverter" />
</bean>
<!-- converter -->
<bean id="defaultMessageConverter" class="com.pack.app.amq.DefaultMessageConverter" />
<!-- ActiveMQ destinations -->
<!-- 使用Queue方式-->
<amq:queue name="QUEUE" physicalName="TESTQ" />
<bean id="queueMessageProducer" class="tools.amq.QueueMessageProducer">
<property name="template" ref="jmsTemplate" />
<property name="destination" ref="QUEUE" />
</bean>
<!-- consumer for queue -->
<bean id="queueConsumer" class="tools.amq.QueueConsumer" />
<!-- Message Listener for queue -->
<bean id="queueListener" class="org.springframework.jms.listener.adapter.MessageListenerAdapter">
<constructor-arg ref="queueConsumer" />
<!-- may be other method -->
<property name="defaultListenerMethod" value="receive" />
<!-- custom MessageConverter define -->
<property name="messageConverter" ref="defaultMessageConverter" />
</bean>
<!-- listener container,MDP无需实现接口 -->
<bean id="queueListenerContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="jmsConnectionFactory" />
<property name="destination" ref="QUEUE" />
<property name="messageListener" ref="queueListener" />
</bean><!-- 测试 向MQ发消息 -->
<amq:queue name="CreateMessage" physicalName="A.CreateMessage" />
<!-- 生产数据 -->
<bean id="createMessageProducer" class="com.pack.app.amq.producer.CreateMessageProducer">
<property name="template" ref="jmsTemplate" />
<property name="destination" ref="CreateMessage" />
</bean>
</beans>DefaultMessageConverter.java
public class DefaultMessageConverter implements MessageConverter {
/**
* Logger for this class
*/
private static final Log log = LogFactory.getLog(DefaultMessageConverter.class);
public Message toMessage(Object obj, Session session) throws JMSException {
if (log.isDebugEnabled()) {
log.debug("toMessage(Object, Session) - start");
}
// check Type
ActiveMQObjectMessage objMsg = (ActiveMQObjectMessage) session.createObjectMessage();
HashMap<String, byte[]> map = new HashMap<String, byte[]>();
try {
// POJO must implements Seralizable
ByteArrayOutputStream bos = new ByteArrayOutputStream();
ObjectOutputStream oos = new ObjectOutputStream(bos);
oos.writeObject(obj);
map.put("POJO", bos.toByteArray());
objMsg.setObjectProperty("Map", map);
} catch (IOException e) {
e.printStackTrace();
log.error("toMessage(Object, Session)", e);
}
return objMsg;
}
public Object fromMessage(Message msg) throws JMSException {
if (log.isDebugEnabled()) {
log.debug("fromMessage(Message) - start");
}
if (msg instanceof ObjectMessage) {
HashMap<String, byte[]> map = (HashMap<String, byte[]>) ((ObjectMessage) msg).getObjectProperty("Map");
try {
// POJO must implements Seralizable
ByteArrayInputStream bis = new ByteArrayInputStream(map.get("POJO"));
ObjectInputStream ois = new ObjectInputStream(bis);
Object returnObject = ois.readObject();
return returnObject;
} catch (IOException e) {
e.printStackTrace();
log.error("fromMessage(Message)", e);
} catch (ClassNotFoundException e) {
e.printStackTrace();
log.error("fromMessage(Message)", e);
}
return null;
} else {
throw new JMSException("Msg:[" + msg + "] is not Map");
}
}
}QueueMessageProducer.java
import org.springframework.jms.core.JmsTemplate;
import javax.jms.Queue;
/**
* Date: 2015-7-1
* Time: 17:14:23
*/
public class QueueMessageProducer {
private JmsTemplate template;
private Queue destination;
public void setTemplate(JmsTemplate template) {
this.template = template;
}
public void setDestination(Queue destination) {
this.destination = destination;
}
public void send(FooMessage message) {
template.convertAndSend(this.destination, message);
}
}CreateMessageProducer.java(消息生产者)
import javax.jms.Queue;
import org.springframework.jms.core.JmsTemplate;
public class CreateMessageProducer {
private JmsTemplate template;
private Queue destination;
public void setTemplate(JmsTemplate template) {
this.template = template;
}
public void setDestination(Queue destination) {
this.destination = destination;
}
public void send(String str) {
template.convertAndSend(this.destination, str);
System.out.println("system A send message to system B~~~~~~~~~~");
}
}3.在系统B加上amq相关内容。
applicationContext-amq.xml文件
<beans xmlns="http://www.springframework.org/schema/beans" xmlns:amq="http://activemq.apache.org/schema/core"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core-5.10.2.xsd">
<!--
使用spring的listenerContainer,消息用持久化保存,服务器重启不会丢失
-->
<!-- 连接外部的activeMQ-->
<amq:connectionFactory id="jmsConnectionFactory" userName="admin" password="admin" brokerURL="tcp://localhost:61616"></amq:connectionFactory>
<!-- ActiveMQ destinations -->
<!-- 使用Queue方式-->
<amq:queue name="QUEUE" physicalName="TESTQ" />
<!-- Spring JmsTemplate config -->
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
<property name="connectionFactory">
<!-- lets wrap in a pool to avoid creating a connection per send -->
<bean class="org.springframework.jms.connection.SingleConnectionFactory">
<property name="targetConnectionFactory" ref="jmsConnectionFactory" />
</bean>
</property>
<!-- custom MessageConverter -->
<property name="messageConverter" ref="defaultMessageConverter" />
</bean>
<!-- converter -->
<bean id="defaultMessageConverter" class="com.pack.app.amq.DefaultMessageConverter" />
<bean id="queueMessageProducer" class="tools.amq.QueueMessageProducer">
<property name="template" ref="jmsTemplate" />
<property name="destination" ref="QUEUE" />
</bean>
<!-- consumer for queue -->
<bean id="queueConsumer" class="tools.amq.QueueConsumer" />
<!-- Message Listener for queue -->
<bean id="queueListener" class="org.springframework.jms.listener.adapter.MessageListenerAdapter">
<constructor-arg ref="queueConsumer" />
<!-- may be other method -->
<property name="defaultListenerMethod" value="receive" />
<!-- custom MessageConverter define -->
<property name="messageConverter" ref="defaultMessageConverter" />
</bean>
<!-- listener container,MDP无需实现接口 -->
<bean id="queueListenerContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="jmsConnectionFactory" />
<property name="destination" ref="QUEUE" />
<property name="messageListener" ref="queueListener" />
</bean>
<!-- 测试 接收消息 -->
<amq:queue name="CreateMessage" physicalName="A.CreateMessage" />
<!-- 接收数据 -->
<bean id="createMessageConsumer" class="com.pack.app.amq.consumer.CreateMessageConsumer" />
<!-- 监听 -->
<bean id="createMessageListener" class="org.springframework.jms.listener.adapter.MessageListenerAdapter">
<constructor-arg ref="createMessageConsumer" />
<!-- may be other method -->
<property name="defaultListenerMethod" value="process" />
<!-- custom MessageConverter define -->
<property name="messageConverter" ref="defaultMessageConverter" />
</bean>
<!-- listener container,MDP无需实现接口 -->
<bean id="createMessageListenerContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="jmsConnectionFactory" />
<property name="destination" ref="CreateMessage" />
<property name="messageListener" ref="createMessageListener" />
<!-- 消费者个数 -->
<!-- <property name="concurrentConsumers" value="4"></property> -->
</bean>
</beans>DefaultMessageConverter.java、QueueMessageProducer.java、QueueConsumer.java与系统A一样。 CreateMessageConsumer.java
public class CreateMessageConsumer {
@Autowired
public AgentService agentService;
public void process(String str) {
System.out.println("system B receive message from system A ");
agentService.agentPath(str);
}
}4.启动系统A和系统B的应用,只要系统A往队列A.CreateMessage产生消息,系统B会自动接收到消息。
相关推荐
xinglun 2020-02-14
胡献根 2020-07-18
胡献根 2020-07-05
jiangtie 2020-06-10
onlylixiaobei 2020-06-09
xinglun 2020-06-02
方新德 2020-05-31
Java高知 2020-05-20
Java高知 2020-05-08
Java高知 2020-05-03
onlylixiaobei 2020-05-02
Java高知 2020-04-22
胡献根 2020-04-22
heweiyabeijing 2020-04-21
方新德 2020-04-20
胡献根 2020-04-10
onlylixiaobei 2020-04-10
方新德 2020-04-08
