分布式消息系统RocketMQ
分布式消息系统RocketMQ
一、消息系统
RocketMQ是阿里开源出来的一个消息系统,2011年Linkin开源了Kafka,淘宝中间件团队在对Kafka做过充分Review之后 ,由于kafka是用scala开发的,于是重新用Java语言编写了RocketMQ 。定位于非日志的可靠消息传输 ,被广泛应用在订单,交易,充值,流计算,消息推送,日志流式处理,binglog分发等场景 。
二、RocketMQ架构图
Producer:消息生产者,负责产生消息,一般有业务系统负责产生消息
Consumer: 消息消费者,负责消费消息,一般是后台系统负责异步消费
Broker: 消息中转角色,负责储存消息,转发消息,一般也称为Server
nameServer: broker注册topic信息到nameServer,producer和consumer从nameServer获取topic的路由信息
2.1 生产者
① Producer与NameServer集群,中的其中一个节点,建立长连接;
② 定期从NameServer取Topic路由信息;
③ 并向提供Topic服务的Master建立长连接,且定时向Master发送心跳;
④ Produce完全无状态,可集群部署;
2.2 消费者
① Consumer与NameServer集群,中的其中一个节点,建立长连接;
② 定期从NameServer取Topic路由信息;
③ 并向提供Topic服务的Master、Slaver建立长连接,且定时向Master、Slaver发送心跳;
④ Consumer即可从Master订阅消息,也可以从Slave订阅消息,订阅规则由Broker配置决定;
2.3 Broker
① 每个broker与nameServer集群,中的所有节点,建立长连接;
② 定时注册Topic信息到所有的NameServer;
三、RocketMQ使用
3.1 安装
https://github.com/alibaba/RocketMQ/releases
tar zxvf alibaba-rocketmq-3.2.6.tar.gz
ROCKETMQ_HOME=/root/mq/rocketmq PATH=$PATH:$ROCKETMQ_HOME/bin
3.2 修改配置文件
/root/mq/rocketmq/conf/
3.3 启动nameServer
cd /home/dev/alibaba-rocketmq/bin nohup sh mqnamesrv
3.4 启动broker
cd /home/dev/alibaba-rocketmq/bin nohup sh mqbroker -c /home/dev/alibaba-rocketmq/conf/2m-noslave/broker-a.properties >/dev/null 2>&1 &
3.5 应用消费端
<bean id="consumerNotifyManager" class="com.mengka.mq.client.NotifyManagerBean" init-method="init"> <property name="groupId" value="OPEN_PROJECT_C_GROUP_TEST" /> <property name="name" value="open_project" /> <property name="topic" value="ACCTRANS-CHANGE-MESSAGE"/> <property name="ctype" value="PUSH"/> <property name="namesrvAddr" value="127.0.0.1:9876"/> <property name="messageListener" ref="taaMessageListener" /> </bean>
/** * User: mengka * Date: 15-4-16-下午3:40 */ @Component public class TaaMessageListener implements MessageListenerConcurrently { private static final Logger log = LoggerFactory.getLogger(TaaMessageListener.class); @Override public ConsumeConcurrentlyStatus consumeMessage( List<MessageExt> list, ConsumeConcurrentlyContext Context) { for (MessageExt messageExt : list) { log.info("---------------, receive message, id = " + messageExt.getMsgId() + " , ip = " + messageExt.getBornHost() + " , tags = " + messageExt.getTags() + " , conetnt = " + new String(messageExt.getBody())); } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }
3.6 生产端
<bean id="producterNotifyManager" class="com.mengka.mq.client.NotifyManagerBean" init-method="initProducter"> <property name="groupId" value="P-ACCTRANS-CHANGE-MESSAGE" /> <property name="name" value="open_project" /> <property name="topic" value="ACCTRANS-CHANGE-MESSAGE"/> <property name="namesrvAddr" value="127.0.0.1:9876"/> </bean>
String serviceConfigXMLs[] = new String[]{"rocketmq/rocketmq-push-context.xml"}; ApplicationContext context = new ClassPathXmlApplicationContext(serviceConfigXMLs); NotifyManager producterNotifyManager = (NotifyManager) context.getBean("producterNotifyManager"); String key1 = "aa"; String tag1 = "mkTag"; String content = "Just for test[" + TimeUtil.toDate(new Date(), TimeUtil.format_1); Message message1 = new StringMessage(key1,tag1,content); SendResult result1 = producterNotifyManager.sendMessage(message1);