ActiveMQ 的负载均衡集群加高可用方案

环境 linux 系统 ,而我本人用的是CentOS-7 ,运行所需的环境是java 所以需要安装jdk , 安装过程可以看我的这个博客

我采用的方案是: 三台ActiveMQ 服务器 这可以到达既可集群又可高可用,架构是这样

ActiveMQ 的负载均衡集群加高可用方案

node A 和 node B ,node A 和 node C 他们之间形成一个 Broker 这样信息同步 ,实现了负载均衡 ,如果 B挂了 C和A 还可以继续

服务 ,信息也不会丢失。如果B 挂了 这时C 就成为一个master, 这是高可用就发挥了 ,再假如 A挂了 ,此时B还能继续服务 所以这时就要马上处理A节点 ,这样就能保证我们的消息队列里面所有的数据不丢失和服务可持续性。

首先下载 ActiveMQ 命令:

wget https://mirrors.tuna.tsinghua.edu.cn/apache//activemq/5.15.4/apache-activemq-5.15.4-bin.tar.gz

下载之后解压 命令

tar -zvxf apache-activemq-5.15.4-bin.tar.gz

而我三台ActiveMQ 服务器目前是放在同一台linux虚拟机 最好是三台分开不同机器 不然 如果linux 机器挂了就完了 。

解压之后,复制三分ActiveMQ 成三个节点 命名为 activemq_a ,activemq_b, activemq_c

activemq_a 页面访问端口是 8161,服务端口61616

activemq_b 页面访问端口是 8162,服务端口61617

activemq_c 页面访问端口是 8163,服务端口61618

然后在创建一个文件存放持久化的数据 命令:

mkdir shareFile

然后修改对应配置文件

①修改node a 的配置文件 :

vim activemq_a/conf/activemq.xml

打开编辑添加如下 代码 代码是放在这个 transportConnectors标签前后位置


  1. <networkConnectors>
  2. <networkConnector name="brokes_network" uri="static:(tcp://127.0.0.1:61617,tcp://127.0.0.1:61618)" />
  3. </networkConnectors>

然后保全退回

这是我的配置

node A 配好了

接下来是 node B

vim activemq_b/conf/activemq.xml

打开编辑添加如下 代码 修改持久化数据存放的地方


  1. <persistenceAdapter>
  2. <kahaDB directory="/usr/sofk/shareFile/kahadb"/>
  3. </persistenceAdapter>

修改服务端口号

<transportConnector name="openwire" uri="tcp://0.0.0.0:61617?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>

代码是放在这个 transportConnectors标签前后位置


  1. <networkConnectors>
  2. <networkConnector name="net_a" uri="static:(tcp://127.0.0.1:61616)" />
  3. </networkConnectors>

然后保全退回

在打开 jetty.xml 修改管理页面的端口 命令:

vim activemq_b/conf/jetty.xml


  1. <bean id="jettyPort" class="org.apache.activemq.web.WebConsolePort" init-method="start">
  2. <!-- the default port number for the web console -->
  3. <property name="host" value="0.0.0.0"/>
  4. <property name="port" value="8162"/> <!-- 修改这里 -->
  5. </bean>

然后保全退回

node B 就可以了

接下来 配置node C 配置地方差不多

[java] view plain copy

  1. <code class="language-java"><persistenceAdapter>
  2. <kahaDB directory="/usr/sofk/shareFile/kahadb"/> <!-- 修改路径-->
  3. </persistenceAdapter></code>

<!--修改端口-->

<transportConnector name="openwire" uri="tcp://0.0.0.0:61618?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>

<!--修改网络-->


  1. <networkConnectors>
  2. <networkConnector name="net_a" uri="static:(tcp://127.0.0.1:61616)" />
  3. </networkConnectors>

然后保全退出

然后再修改jetty.xml


  1. <bean id="jettyPort" class="org.apache.activemq.web.WebConsolePort" init-method="start">
  2. <!-- the default port number for the web console -->
  3. <property name="host" value="0.0.0.0"/>
  4. <property name="port" value="8163"/> <!-- 修改这里 -->
  5. </bean>

然后保全退出

这样所有的节点就可以了

接下来是启动所有服务

命令:

./activemq_a/bin/activemq start 启动A

./activemq_b/bin/activemq start 启动B

./activemq_c/bin/activemq start 启动C

然后可以自己查看一下端口情况。

接下下就是在页面访问了

http://10.0.0.190:8161/admin/

http://10.0.0.190:8162/admin/

http://10.0.0.190:8163/admin/

这时会发现 C 服务是访问不到 那时因为他是一个Slave 是不提服务的 他是等待B点挂了 他是自己启动服务的

接下是 代码的实现


  1. public class ConnectionUtil {
  2. public static Connection getConnection(String url) throws JMSException {
  3. ConnectionFactory factory=new ActiveMQConnectionFactory(url);
  4. Connection connection = factory.createConnection();
  5. connection.start();
  6. return connection;
  7. }
  8. }
  9. public class Producer {
  10. private static final String QUEUENAME ="test-queue";
  11. private static final String URL ="tcp://10.0.0.190:61616";
  12. ///失效转移 生产者为高可用
  13. private static final String FAILOVERURL ="failover:(tcp://10.0.0.190:61617,tcp://10.0.0.190:61618)?randomize=true";
  14. //队列模式
  15. public void send() throws JMSException {
  16. Connection connection = ConnectionUtil.getConnection(FAILOVERURL);
  17. Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
  18. Queue queue = session.createQueue(QUEUENAME);
  19. MessageProducer producer = session.createProducer(queue);
  20. TextMessage textMessage=null;
  21. for (int i = 0; i < 100; i++) {
  22. textMessage = session.createTextMessage("测试数据:" + i);
  23. producer.send(textMessage);
  24. }
  25. producer.close();
  26. session.close();
  27. connection.close();
  28. }}
  29. public static void main(String[] args) throws JMSException {
  30. new Producer().send();
  31. System.out.println("发送完成");
  32. }
  33. public class Consumer {
  34. private static final String QUEUENAME ="test-queue";
  35. ///失效转移 负载均衡
  36. private static final String FAILOVERURL ="failover:(tcp://10.0.0.190:61616,tcp://10.0.0.190:61617,tcp://10.0.0.190:61618)?randomize=true";
  37. public void msg() throws JMSException {
  38. Connection connection = ConnectionUtil.getConnection(FAILOVERURL);
  39. Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
  40. Queue queue = session.createQueue(QUEUENAME);
  41. MessageConsumer consumer = session.createConsumer(queue);
  42. consumer.setMessageListener(new MessageListener() {
  43. @Override
  44. public void onMessage(Message message) {
  45. TextMessage textMessage = (TextMessage)message;
  46. try {
  47. System.out.println("这是接收到数据: = "+textMessage.getText());
  48. } catch (JMSException e) {
  49. e.printStackTrace();
  50. }
  51. }
  52. });
  53. }

  1. public static void main(String[] args) throws JMSException {
  2. new Consumer().msg();
  3. }

以上代码和配置完成 如有不对请多多指教 ,如有雷同都是在学习中的成长

相关推荐