Spring整合RabbitMQ

需要依赖的jar包

/*
 * RabbitMQ dependencies (Gradle).
 * The `compile` configuration was deprecated in Gradle 3.4 and removed in Gradle 7;
 * `implementation` is its replacement for application code.
 */
implementation("com.rabbitmq:amqp-client:5.1.2")
implementation("org.springframework.amqp:spring-rabbit:2.0.1.RELEASE")

<!-- RabbitMQ dependencies (Maven): place these inside the POM's <dependencies> element -->
<dependency>
  <groupId>com.rabbitmq</groupId>
  <artifactId>amqp-client</artifactId>
  <version>5.1.2</version>
</dependency>
<dependency>
  <groupId>org.springframework.amqp</groupId>
  <artifactId>spring-rabbit</artifactId>
  <version>2.0.1.RELEASE</version>
</dependency>

direct交换器

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:rabbit="http://www.springframework.org/schema/rabbit"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="
           http://www.springframework.org/schema/beans
           http://www.springframework.org/schema/beans/spring-beans.xsd
           http://www.springframework.org/schema/rabbit
           http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">

    <description>Spring RabbitMQ 整合配置</description>

    <!-- Broker connection. virtual-host is used for permission management; "/" is the default. -->
    <rabbit:connection-factory
            id="connectionFactory"
            virtual-host="/"
            host="192.168.41.128"
            port="5672"
            username="admin"
            password="admin"/>

    <!-- With an admin declared, the exchanges and queues defined in this file are created on the broker automatically. -->
    <rabbit:admin connection-factory="connectionFactory"/>

    <!-- Queue: consult order created successfully -->
    <rabbit:queue
            id="q_consult_create_id"
            name="q_consult_create_name"
            durable="true"
            exclusive="false"
            auto-delete="false"/>

    <!-- Queue: consult order paid successfully -->
    <rabbit:queue
            id="q_order_pay_id"
            name="q_order_pay_name"
            durable="true"
            exclusive="false"
            auto-delete="false"/>

    <!-- Direct exchange: each queue (referenced by bean id) is bound with its own routing key -->
    <rabbit:direct-exchange id="myDirectExchangeId" name="myDirectExchangeName">
        <rabbit:bindings>
            <rabbit:binding key="q_consult_create_routeKey" queue="q_consult_create_id"/>
            <rabbit:binding key="q_order_pay_routeKey" queue="q_order_pay_id"/>
        </rabbit:bindings>
    </rabbit:direct-exchange>

    <!-- Spring AmqpTemplate. The exchange attribute takes the exchange NAME; the bean id cannot be used here. -->
    <rabbit:template
            id="amqpTemplate"
            exchange="myDirectExchangeName"
            connection-factory="connectionFactory"/>

    <!-- Consumer -->
    <bean id="consumerListener" class="cn.gov.wu.rabbit.ConsumerListener"/>
    <rabbit:listener-container connection-factory="connectionFactory">
        <!-- Either "queues" (bean ids) or "queue-names" (broker queue names) may be used. -->
        <rabbit:listener
                ref="consumerListener"
                method="receiveMsg"
                queues="q_consult_create_id,q_order_pay_id"/>
        <!-- Equivalent listener using queue-names: -->
        <!--<rabbit:listener ref="consumerListener" method="receiveMsg" queue-names="q_consult_create_name,q_order_pay_name"/>-->
    </rabbit:listener-container>

</beans>
ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("spring/rabbitmq-config-direct.xml");
    AmqpTemplate amqpTemplate = context.getBean("amqpTemplate", AmqpTemplate.class);

    String routingKey = "q_consult_create_routeKey";
    amqpTemplate.convertAndSend(routingKey, "给队列 q_consult_create 发送消息");

    routingKey = "q_order_pay_routeKey";
    amqpTemplate.convertAndSend(routingKey, "给队列 q_order_pay 发送消息");
    context.close();

  

topic交换器

特别注意:发往 topic 交换器的消息不能随意设置路由键(routing key),路由键必须由若干个以点号(.)分隔的标识符组成,例如 q.order.pay.routeKey。绑定模式中,* 匹配恰好一个标识符,# 匹配零个或多个标识符。

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:rabbit="http://www.springframework.org/schema/rabbit"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="
           http://www.springframework.org/schema/beans
           http://www.springframework.org/schema/beans/spring-beans.xsd
           http://www.springframework.org/schema/rabbit
           http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">

    <description>Spring RabbitMQ 整合配置</description>

    <!-- Broker connection. virtual-host is used for permission management; "/" is the default. -->
    <rabbit:connection-factory
            id="connectionFactory"
            virtual-host="/"
            host="192.168.41.128"
            port="5672"
            username="admin"
            password="admin"/>

    <!-- With an admin declared, the exchanges and queues defined in this file are created on the broker automatically. -->
    <rabbit:admin connection-factory="connectionFactory"/>

    <!-- Queue: consult order created successfully -->
    <rabbit:queue
            id="q_topic_consult_create_id"
            name="q_topic_consult_create_name"
            durable="false"
            exclusive="false"
            auto-delete="false"/>

    <!-- Queue: consult order paid successfully -->
    <rabbit:queue
            id="q_topic_order_pay_id"
            name="q_topic_order_pay_name"
            durable="false"
            exclusive="false"
            auto-delete="false"/>

    <!-- Topic exchange: queues (referenced by bean id) are bound by dot-separated routing-key patterns -->
    <rabbit:topic-exchange id="myTopicExchangeId" name="myTopicExchangeName" durable="false">
        <rabbit:bindings>
            <rabbit:binding queue="q_topic_consult_create_id" pattern="#.create.*.routeKey"/>
            <rabbit:binding queue="q_topic_order_pay_id" pattern="q.*.*.routeKey"/>
        </rabbit:bindings>
    </rabbit:topic-exchange>

    <!-- Spring AmqpTemplate. The exchange attribute takes the exchange NAME; the bean id cannot be used here. -->
    <rabbit:template
            id="amqpTemplate"
            exchange="myTopicExchangeName"
            connection-factory="connectionFactory"/>

    <!-- Consumer -->
    <bean id="consumerListener" class="cn.gov.wu.rabbit.ConsumerListener"/>
    <rabbit:listener-container connection-factory="connectionFactory">
        <!-- Either "queues" (bean ids) or "queue-names" (broker queue names) may be used. -->
        <rabbit:listener
                ref="consumerListener"
                method="receiveMsg"
                queues="q_topic_consult_create_id,q_topic_order_pay_id"/>
    </rabbit:listener-container>

</beans>
// 发往topic转发器的消息不能随意的设置选择键(routing_key),必须是由点隔开的一系列的标识符组成
ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("spring/rabbitmq-config-topic.xml");
AmqpTemplate amqpTemplate = context.getBean("amqpTemplate", AmqpTemplate.class);

String exchangeName = "myTopicExchangeName";

String routingKey1 = "a.create.consult.routeKey";
String routingKey2 = "q.create.consult.routeKey";
amqpTemplate.convertAndSend(exchangeName, routingKey1, "给exchange myTopicExchangeName 发送消息, routeKey为 a.create.consult.routeKey");
amqpTemplate.convertAndSend(exchangeName, routingKey2, "给exchange myTopicExchangeName 发送消息, routeKey为 b.create.consult.routeKey");

String routingKey3 = "q.order.pay.routeKey";
amqpTemplate.convertAndSend(exchangeName, routingKey3, "给exchange myTopicExchangeName 发送消息, routeKey为 q.order.pay.routeKe");
context.close();

  

fanout交换器

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:rabbit="http://www.springframework.org/schema/rabbit"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
       http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">

    <description>Spring RabbitMQ 整合配置</description>
    <!--
        virtual-host: virtual host, used for permission management; defaults to /
    -->
    <rabbit:connection-factory id="connectionFactory"
                               host="192.168.41.128"
                               port="5672"
                               username="admin"
                               password="admin"
                               virtual-host="/"/>

    <!-- With an admin declared, the exchanges and queues defined in this file are created on the broker automatically. -->
    <rabbit:admin connection-factory="connectionFactory"/>

    <!--
        Queue names use the q_fanout_ prefix so they do not collide with the
        q_topic_ queues of the topic example when both run against the same broker.
    -->
    <!-- Queue: consult order created successfully -->
    <rabbit:queue id="q_fanout_consult_create_id" name="q_fanout_consult_create_name" durable="false" auto-delete="false" exclusive="false"/>
    <!-- Queue: consult order paid successfully -->
    <rabbit:queue id="q_fanout_order_pay_id" name="q_fanout_order_pay_name" durable="false" auto-delete="false" exclusive="false"/>

    <!-- Fanout exchange: bindings carry no routing key or pattern -->
    <rabbit:fanout-exchange name="myFanoutExchangeName" id="myFanoutExchangeId">
        <rabbit:bindings>
            <rabbit:binding queue="q_fanout_consult_create_id"/>
            <rabbit:binding queue="q_fanout_order_pay_id"/>
        </rabbit:bindings>
    </rabbit:fanout-exchange>

    <!-- Spring AmqpTemplate. The exchange attribute takes the exchange NAME; the bean id cannot be used here. -->
    <rabbit:template id="amqpTemplate" connection-factory="connectionFactory" exchange="myFanoutExchangeName"/>

    <!-- Consumer -->
    <bean id="consumerListener" class="cn.gov.wu.rabbit.ConsumerListener"/>
    <rabbit:listener-container connection-factory="connectionFactory">
        <!-- Either "queues" (bean ids) or "queue-names" (broker queue names) may be used. -->
        <rabbit:listener ref="consumerListener" method="receiveMsg" queues="q_fanout_consult_create_id,q_fanout_order_pay_id"/>
    </rabbit:listener-container>

</beans>
ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("spring/rabbitmq-config-fanout.xml");
AmqpTemplate amqpTemplate = context.getBean("amqpTemplate", AmqpTemplate.class);
amqpTemplate.convertAndSend("发送fanout消息");
context.close();

相关推荐