springboot 整合activeMQ

yml配置文件


  1. spring:
  2. activemq:
  3. broker-url: tcp://127.0.0.1:61616
  4. in-memory: true
  5. user: admin
  6. password: admin
  7. pool:
  8. enabled: false
  9. use-exponential-back-off: true # 是否在每次尝试重新发送失败后,增长这个等待时间
  10. maximum-redeliveries: 10 # 重发次数,默认为6次 这里设置为10次
  11. initial-redelivery-delay: 1 # 重发时间间隔,默认为1秒
  12. back-off-multiplier: 2 # 第一次失败后重新发送之前等待500毫秒,第二次失败再等待500 * 2毫秒,这里的2就是value
  13. use-collision-avoidance: false # 是否避免消息碰撞
  14. maximum-redelivery-delay: -1 # 设置重发最大拖延时间-1 表示没有拖延只有use-collision-avoidance(true)为true时生效

ActiveMQConfig配置


  1. import org.apache.activemq.ActiveMQConnectionFactory;
  2. import org.apache.activemq.RedeliveryPolicy;
  3. import org.springframework.beans.factory.annotation.Value;
  4. import org.springframework.context.annotation.Bean;
  5. import org.springframework.context.annotation.Configuration;
  6. import org.springframework.jms.annotation.EnableJms;
  7. import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
  8. import org.springframework.jms.config.JmsListenerContainerFactory;
  9. import org.springframework.jms.core.JmsTemplate;
  10. /**
  11. * @description: active配置类
  12. * @author: Mr.ZHAO
  13. * @cereate: 2018/08/23 09:20:50
  14. */
  15. @EnableJms
  16. @Configuration
  17. public class ActiveMQConfig {
  18. @Value("${spring.activemq.user}")
  19. private String user;
  20. @Value("${spring.activemq.password}")
  21. private String password;
  22. @Value("${spring.activemq.broker-url}")
  23. private String brokerUrl;
  24. @Value("${spring.activemq.use-exponential-back-off}")
  25. private Boolean useExponentialBackOff;
  26. @Value("${spring.activemq.maximum-redeliveries}")
  27. private Integer maximumRedeliveries;
  28. @Value("${spring.activemq.initial-redelivery-delay}")
  29. private Integer initialRedeliveryDelay;
  30. @Value("${spring.activemq.back-off-multiplier}")
  31. private Integer backOffMultiplier;
  32. @Value("${spring.activemq.use-collision-avoidance}")
  33. private Boolean useCollisionAvoidance;
  34. @Value("${spring.activemq.maximum-redelivery-delay}")
  35. private Integer maximumRedeliveryDelay;
  36. @Bean
  37. public RedeliveryPolicy redeliveryPolicy() {
  38. RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();
  39. //是否在每次尝试重新发送失败后,增长这个等待时间
  40. redeliveryPolicy.setUseExponentialBackOff(useExponentialBackOff);
  41. //重发次数,默认为6次 这里设置为10次
  42. redeliveryPolicy.setMaximumRedeliveries(maximumRedeliveries);
  43. //重发时间间隔,默认为1秒
  44. redeliveryPolicy.setInitialRedeliveryDelay(initialRedeliveryDelay);
  45. //第一次失败后重新发送之前等待500毫秒,第二次失败再等待500 * 2毫秒,这里的2就是value
  46. redeliveryPolicy.setBackOffMultiplier(backOffMultiplier);
  47. //是否避免消息碰撞
  48. redeliveryPolicy.setUseCollisionAvoidance(useCollisionAvoidance);
  49. //设置重发最大拖延时间-1 表示没有拖延只有UseExponentialBackOff(true)为true时生效
  50. redeliveryPolicy.setMaximumRedeliveryDelay(maximumRedeliveryDelay);
  51. return redeliveryPolicy;
  52. }
  53. @Bean
  54. public ActiveMQConnectionFactory activeMQConnectionFactory(RedeliveryPolicy redeliveryPolicy) {
  55. ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(user, password, brokerUrl);
  56. activeMQConnectionFactory.setRedeliveryPolicy(redeliveryPolicy);
  57. return activeMQConnectionFactory;
  58. }
  59. @Bean
  60. public JmsTemplate jmsTemplate(ActiveMQConnectionFactory activeMQConnectionFactory) {
  61. JmsTemplate jmsTemplate = new JmsTemplate();
  62. //进行持久化配置 1表示非持久化,2表示持久化
  63. jmsTemplate.setDeliveryMode(2);
  64. jmsTemplate.setConnectionFactory(activeMQConnectionFactory);
  65. //客户端签收模式
  66. jmsTemplate.setSessionAcknowledgeMode(4);
  67. return jmsTemplate;
  68. }
  69. @Bean
  70. public JmsListenerContainerFactory<?> jmsListenerContainerTopic(ActiveMQConnectionFactory activeMQConnectionFactory) {
  71. DefaultJmsListenerContainerFactory bean = new DefaultJmsListenerContainerFactory();
  72. bean.setPubSubDomain(true);
  73. bean.setConnectionFactory(activeMQConnectionFactory);
  74. return bean;
  75. }
  76. @Bean
  77. public JmsListenerContainerFactory<?> jmsListenerContainerQueue(ActiveMQConnectionFactory activeMQConnectionFactory) {
  78. DefaultJmsListenerContainerFactory bean = new DefaultJmsListenerContainerFactory();
  79. bean.setConnectionFactory(activeMQConnectionFactory);
  80. return bean;
  81. }
  82. }

生产者


  1. import com.renren.common.utils.JsonUtils;
  2. import org.apache.activemq.command.ActiveMQQueue;
  3. import org.apache.activemq.command.ActiveMQTopic;
  4. import org.springframework.beans.factory.annotation.Autowired;
  5. import org.springframework.jms.core.JmsTemplate;
  6. import org.springframework.stereotype.Service;
  7. /**
  8. * @description: 生产者
  9. * @author: Mr.ZHAO
  10. * @cereate: 2018/08/23 09:30:42
  11. */
  12. @Service
  13. public class ActiveProducer {
  14. @Autowired
  15. private JmsTemplate jmsTemplate;
  16. /**
  17. * 发送队列
  18. *
  19. * @param queue 队列名
  20. * @param message 信息内容
  21. */
  22. public void sendQueue(final String queue, final Object message) {
  23. this.jmsTemplate.convertAndSend(new ActiveMQQueue(queue), JsonUtils.objectToJson(message));
  24. }
  25. /**
  26. * 发送主题
  27. *
  28. * @param topic 主题名
  29. * @param message 主题消息
  30. */
  31. public void sendTopic(final String topic, final Object message) {
  32. this.jmsTemplate.convertAndSend(new ActiveMQTopic(topic), JsonUtils.objectToJson(message));
  33. }
  34. }

消费者


  1. import org.springframework.jms.annotation.JmsListener;
  2. import org.springframework.stereotype.Component;
  3. /**
  4. * @description: 消费者
  5. * @author: Mr.ZHAO
  6. * @cereate: 2018/08/23 09:32:48
  7. */
  8. @Component
  9. public class ActiveConsumer {
  10. /**
  11. * 消费话题(topic)
  12. *
  13. * @param text
  14. */
  15. @JmsListener(destination = "topic_test", containerFactory = "jmsListenerContainerTopic")
  16. public void receiveTopic2(String text) {
  17. System.out.println("Topic Consumer2:" + text);
  18. }
  19. /***
  20. * 消费队列(queue)
  21. * @param text
  22. */
  23. @JmsListener(destination = "queue_test")
  24. public void reviceQueue(String text) {
  25. System.out.println("Queue Consumer:" + text);
  26. }
  27. }

测试


  1. @RunWith(SpringRunner.class)
  2. @SpringBootTest
  3. public class DynamicDataSourceTest {
  4. @Autowired
  5. private ActiveProducer producer;
  6. @Test
  7. public void MQ() {
  8. producer.sendQueue("queue_test", "你好");
  9. producer.sendTopic("topic_test", "你------好");
  10. while (true) {
  11. }
  12. }
  13. }

springboot 整合activeMQ

相关推荐