RocketMQ
RocketMQ整理
概念
brokerMQ服务节点topic主题message queue消息队列,类似于kafka中的partition
producer生产者,先去连接
name server,查询到对应的broker信息,再去连接brokerconsumer消费者,先去连接
name server,查询到对应的broker信息,再去连接brokername server命名服务器,用于存储
broker信息
启动命令
先启动name server
./bin/mqnamesrv
然后再启动broker
./bin/mqbroker -n {name.server.url}
./bin/mqbroker -c {config}
#-n 指定nameserver的ip和端口
#-c 以指定配置文件为参数启动
#端口默认为 9876这时候启动应该会报错
原因:
RocketMQ的默认配置是生产环境级别的,JVM的内存达到8G,如果机器配置不够就会报错
解决:
修改
runbroker.sh脚本,将JVM参数 -Xms,-Xmx,设置为机器可以承受的大小修改
runserver.sh脚本,将JVM参数 -Xms,-Xmx,设置为机器可以承受的大小
Console端测试
RocketMQ提供了quick start的example
配置环境变量
NAMESRV_ADDR为name server的url执行
./bin/tools.sh org.apache.rocketmq.example.quickstart.Producer来启动一个生产者,自动发送测试消息执行
./bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer来启动一个消费者来接收消息
JAVA客户端测试
引入RocketMQ依赖
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.3.2</version>
</dependency>Java示例代码
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
?
public class SyncProducer {
public static void main(String[] args) throws Exception {
//Instantiate with a producer group name.
DefaultMQProducer producer = new DefaultMQProducer("test-group");
// Specify name server addresses.
producer.setNamesrvAddr("172.16.55.185:9876");
//Launch the instance.
producer.start();
for (int i = 0; i < 100; i++) {
//Create a message instance, specifying topic, tag and message body.
Message msg = new Message("TopicTest11" /* Topic */,
"TagA" /* Tag */,
("Hello RocketMQ " +
i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
);
//Call send message to deliver message to one of brokers.
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult);
}
//Shut down once the producer instance is not longer in use.
producer.shutdown();
}
}执行main方法会报错,提示timeout
原因:
因为RocketMQ默认启动broker的时候,分配的ip只有服务器内网访问权限,外网无法访问
解决:
通过配置文件指定broker的启动ip为服务器外网能够访问的ip
然后重启broker,使用-c参数指定配置文件路径
UI管理工具(了解)
rocketmq-console,可以同过github下载
具体使用方式上网搜索
Topic创建
DefaultMQProducer producer = new DefaultMQProducer("HAOKE_IM");
producer.setNamesrvAddr("172.16.55.185:9876");
producer.start();
/**
* key:broker名称
* newTopic:topic名称
* queueNum:队列数(分区)默认4个
*/
producer.createTopic("broker_name", "topic", 8);消息发送
同步模式
String msgStr = "用户A发送消息给用户B";
Message msg = new Message("haoke_im_topic","SEND_MSG",
msgStr.getBytes(RemotingHelper.DEFAULT_CHARSET));
// 发送消息
SendResult sendResult = producer.send(msg);
System.out.println("消息状态:" + sendResult.getSendStatus());
System.out.println("消息id:" + sendResult.getMsgId());
System.out.println("消息queue:" + sendResult.getMessageQueue());
System.out.println("消息offset:" + sendResult.getQueueOffset());异步模式
String msgStr = "用户A发送消息给用户B";
Message msg = new Message("broker_name","SEND_MSG",msgStr.getBytes(RemotingHelper.DEFAULT_CHARSET));
// 异步发送消息
producer.send(msg, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
System.out.println("消息状态:" + sendResult.getSendStatus());
System.out.println("消息id:" + sendResult.getMsgId());
System.out.println("消息queue:" + sendResult.getMessageQueue());
System.out.println("消息offset:" + sendResult.getQueueOffset());
}
@Override
public void onException(Throwable e) {
System.out.println("发送失败!" + e);
}
});Message数据结构
| 字段名 | 默认值 | 说明 |
|---|---|---|
topic | null | 必填,线下环境不需要申请,线上环境需要申请后才能使用 |
Body | null | 必填,二进制形式,序列化由应用决定,producer 与 consumer 要协商好序列化形式 |
Tags | null | 选填,类似于 Gmail 为每封邮件设置的标签,方便服务器过滤使用。目前只支持每个消息设置一个 tag,所以也可以类比为 Notify 的 MessageType 概念 |
Keys | null | 选填,代表这条消息的业务关键词,服务器会根据 keys 创建哈希索引,设置后,可以在 Console 系统根据 Topic、Keys 来查询消息,由于是哈希索引,请尽可能保证 key 唯一,例如订单号,商品 Id 等 |
Flag | 0 | 选填,完全由应用来设置,RocketMQ 不做干预 |
DelayTimeLevel | 0 | 选填,消息延时级别,0 表示不延时,大于 0 会延时特定的时间才会被消费 |
WaitStoreMsgOK | TRUE | 选填,表示消息是否在服务器落盘后才返回应答 |
消息接收
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer_group");
consumer.setNamesrvAddr("172.16.55.185:9876");
// 订阅topic,* 表示接收此Topic下的所有消息,相当于一个匹配符
//如果为 "tag1"就表示只订阅tag为"tag1"的消息 还可以组合订阅 "tag1 || tag2 || tag3"
consumer.subscribe("topic_name", "*");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
for (MessageExt msg : msgs) {
try {
System.out.println(new String(msg.getBody(), "UTF-8"));
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
}
System.out.println("收到消息->" + msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();消息过滤器
RocketMQ支持根据用户自定义属性进行过滤
过滤表达式类似于SQL的where,如:a > 5 AND b = ‘abc‘
RocketMQ默认没有开启消息过滤器
所以需要先开启才能使用
开启方式:
在配置文件中新增enablePropertyFilter = true
生产者发送带有条件的消息
String msgStr = "大家好我叫小美,今年18岁";
Message msg = new Message("topic_name","tag1",
msgStr.getBytes(RemotingHelper.DEFAULT_CHARSET));
msg.putUserProperty("age", "18");
msg.putUserProperty("sex", "女");
// 发送消息
SendResult sendResult = producer.send(msg);消费者使用过滤器接收
// 订阅topic,接收此Topic下的所有消息
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer_group");
consumer.setNamesrvAddr("172.16.55.185:9876");
// 订阅topic,* 表示接收此Topic下的所有消息,相当于一个匹配符
consumer.subscribe("topic_name", MessageSelector.bySql("age>=18 AND sex=‘女‘"));
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
for (MessageExt msg : msgs) {
try {
System.out.println(new String(msg.getBody(), "UTF-8"));
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
}
System.out.println("收到消息->" + msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();消息顺序
在某些业务中,consumer在消费消息时,是需要按照生产者发送消息的顺序进行消费的
比如在电商系统中,订 单的消息,会有创建订单、订单支付、订单完成,如果消息的顺序发生改变,那么这样的消息就没有意义了
生产者代码实现
DefaultMQProducer producer = new DefaultMQProducer("producer_group");
producer.setNamesrvAddr("172.16.55.185:9876");
producer.start();
for (int i = 0; i < 100; i++) {
String msgStr = "order --> " + i;
int orderId = i % 10; // 模拟生成订单id
Message message = new Message("topic","tag",
msgStr.getBytes(RemotingHelper.DEFAULT_CHARSET));
//orderId实际传入到lambda表达式(MessageQueueSelector)第三个参数
SendResult sendResult = producer.send(message, (mqs, msg, arg) -> {
//orderId范围0-9之间
Integer id = (Integer) arg;
//topic默认queue个数为4 所以index范围0-4之间
int index = id % mqs.size();
//凡是orderId对4取余为0的选用第1个queue
//凡是orderId对4取余为1的选用第2个queue
//凡是orderId对4取余为2的选用第3个queue
//凡是orderId对4取余为3的选用第4个queue
//这样一来就保证了满足某一条件的消息能够总是发送到特定的queue中
return mqs.get(index);
}, orderId);
System.out.println(sendResult);
}
producer.shutdown();消费者代码
DefaultMQPushConsumer consumer = new
DefaultMQPushConsumer("consumer_group");
consumer.setNamesrvAddr("172.16.55.185:9876");
consumer.subscribe("topic", "tag");
//使用MessageListenerOrderly这种顺序监听器,可以满足按顺序消费同一队列中的消息,不同的线程处理不同的队列
//比如thread-pool-1专门消费queue-0中的消息
//比如thread-pool-2专门消费queue-1中的消息
//比如thread-pool-3专门消费queue-2中的消息
//比如thread-pool-4专门消费queue-3中的消息
consumer.registerMessageListener(new MessageListenerOrderly() {
@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeOrderlyContext context) {
System.out.println(Thread.currentThread().getName() + " Receive New
Messages: " + msgs);
return ConsumeOrderlyStatus.SUCCESS;
}
});
consumer.start();分布式事务
随着项目越来越复杂,越来越服务化,就会导致系统间的事务问题,这个就是分布式事务问题。 分布式事务分类有这几种:
基于单个JVM,数据库分库分表了(跨多个数据库)。
基于多JVM,服务拆分了(不跨数据库)。
基于多JVM,服务拆分了 并且数据库分库分表了。
解决分布式事务问题的方案有很多,使用消息实现只是其中的一种
实现原理
半消息
生产者发送一条
半消息MQ不会将
半消息发送给消费者等生产者发送一条
commit消息,MQ才会将半消息发送给消费者生产者发送一条
rollback消息,MQ将半消息给回滚消息回查
当
commit或者rollback消息在网络中丢失MQ会回调生产者提供的
check方法,来确认消息的状态是commit或者rollback
生产者代码
TransactionMQProducer producer = new
TransactionMQProducer("producer_group");
producer.setNamesrvAddr("172.16.55.185:9876");
// 设置自定义事务监听器
producer.setTransactionListener(new TransactionListenerImpl());
producer.start();
// 发送消息
Message message = new Message("pay_topic", "用户A给用户B转账500元".getBytes("UTF-8"));
producer.sendMessageInTransaction(message, null);
Thread.sleep(999999);
producer.shutdown();TransactionListenerImpl.java
public class TransactionListenerImpl implements TransactionListener {
private static Map<String, LocalTransactionState> STATE_MAP = new HashMap<>();
/**
* 执行具体的业务逻辑
*
* @param msg 发送的消息对象
* @param arg
* @return
*/
@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
try {
System.out.println("用户A账户减500元.");
Thread.sleep(500); //模拟调用服务
// System.out.println(1/0);
System.out.println("用户B账户加500元.");
Thread.sleep(800);
STATE_MAP.put(msg.getTransactionId(),
LocalTransactionState.COMMIT_MESSAGE);
// 二次提交确认
return LocalTransactionState.COMMIT_MESSAGE;
} catch (Exception e) {
e.printStackTrace();
}
STATE_MAP.put(msg.getTransactionId(),
LocalTransactionState.ROLLBACK_MESSAGE);
// 回滚
return LocalTransactionState.ROLLBACK_MESSAGE;
}
/**
* 消息回查
*
* @param msg
* @return
*/
@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
return STATE_MAP.get(msg.getTransactionId());
}
}消费者push和pull模式
push模式客户端与服务端建立连接后,当服务端有消息时,将消息推送到客户端
pull模式客户端不断的轮询请求服务端,来获取新的消息
但在具体实现时,push和pull模式都是采用消费端主动拉取的方式,即consumer轮询从broker拉取消息
push方式里,consumer把轮询过程封装了,并注册MessageListener监听器,取到消息后,唤醒 MessageListener的consumeMessage()来消费,对用户而言,感觉消息是被推送过来的pull方式里,取消息的过程需要用户自己写,首先通过打算消费的topic拿到MessageQueue的集合,遍历MessageQueue集合,然后针对每个MessageQueue批量取消息,一次取完后,记录该队列下一次要取的开 始offset,直到取完了,再换另一个MessageQueue
消息模式
集群模式
同一个
Group下的消费者每个消费者消费一部分
topic数据同一个
Group下的消费者合起来消费的数据就是topic的所有数据广播模式
同一个
Group下的消费者每一个消费的数据就是
topic的所有数据
// 集群模式 consumer.setMessageModel(MessageModel.CLUSTERING); // 广播模式 consumer.setMessageModel(MessageModel.BROADCASTING);
重复消息
造成消息重复的根本原因是:网络不可达
只要通过网络交换数据,就无法避免这个问题
所以解决这个问题的办 法就是绕过这个问题
那么问题就变成了:如果消费端收到两条一样的消息,应该怎样处理
消费端处理消息的业务逻辑保持
幂等性保证每条消息都有
唯一编号且保证消息处理成功与去重表的日志同时出现
持久化
RocketMQ中的消息数据存储,采用了零拷贝技术(使用 mmap + write 方式)
文件系统采用 Linux Ext4 文件系 统进行存储
在RocketMQ中,消息数据是保存在磁盘文件中,为了保证写入的性能,RocketMQ尽可能保证顺序写入,顺序写 入的效率比随机写入的效率高很多。 RocketMQ消息的存储是由ConsumeQueue和CommitLog配合完成的
CommitLog是真正存储数据的文件
ConsumeQueue是索引文件,存储数据指向到物理文件的配置。
Consume Queue相当于kafka中的partition,是一个逻辑队列,存储了这个Queue在CommiLog中的起始 offset,log大小和MessageTag的hashCode
每次读取消息队列先读取consumerQueue,然后再通过consumerQueue去CommitLog中拿到消息主体
同步异步写入
RocketMQ 为了提高性能,会尽可能地保证磁盘的顺序写入
消息在通过 producer 写入 RocketMQ 的时候,有两 种写磁盘方式
分别是同步写入与异步写入
同步
在返回写成功状态时,消息已经被写入磁盘
消息写入内存的缓冲区后,立刻通知写入线程写入
等待写入完成,写入线程 执行完成后唤醒等待的线程,返回消息写成功的状态
异步
在返回写成功状态时,消息可能只是被写入了内存的 缓冲区
写操作的返回快,吞吐量大
当内存里的消息量积累到一定程度时,统一触发写磁盘动作,快速写入
配置指定写入方式
#-- 异步 flushDiskType = ASYNC_FLUSH #-- 同步 flushDiskType = SYNC_FLUSH