rocketMq实战(3)-console和运维
rocketmq这么复杂的东西,没有运维工具可搞不定啊。
哈哈,别急,官方提供了一个WEB项目,可以查看rocketmq数据和执行一些操作。
而且我自己也添加了一些功能
官网:https://github.com/rocketmq/rocketmq-console
运行修改一下namesers的地址,注意多个地址用 分号 分割
下面是成功页面,cluster查询broker集群的tps和出队入队情况。
topic查询生产和消费信息
connection查询生产则和消费者的连接信息。
其他的不讲了,多试试就明白了。
注意consumer这一项,可以查询消息积压,这是我们最关心的。
首先更正一个观念 对与broker是没有积压这个概念的,只有consumer有积压的概念。
看下图可以查询consumer的消费情况,下图是查询指定consumer的页面 每个queue对应的broker的生产和消费情况。
Diff Total 是总的挤压数
重点介绍我自己开发的几个功能
一 ,查询所有consumer的积压
如果想查询所有consumer的挤压情况,抱歉没有,只能自己开发。
这个功能的原理就是 通过ssh Java客户端去执行 命令 获取返回数据,在任何的broker执行都可以(如果童鞋们知道更好的实现方式 请告知)。
命令资料如下
通过sa账号执行 sh /opt/ali-rocketmq/devenv/bin/mqadmin consumerProgress -n \"10.103.16.77:9876;10.103.16.15:9876\" 报出
()
ERROR: Please set the JAVA_HOME variable in your environment, We need java(x64)! !!
解决方法
在bin/mqadmin 脚本写入 export JAVA_HOME=/usr/local/java .
具体代码实现见附件
二,定时查询某个topic的挤压数 ,报警功能
代码如下
/** * 单条consumer的定时提醒 * @author chenchangqun * */ public class SingleNotifyTask { static final Logger logger = LoggerFactory.getLogger(SingleNotifyTask.class); private int delayMils;//延迟启动时间,单位 毫秒 private int periodMils;//周期时间 间隔,单位 毫秒 private String consumerName; private INotifyInvoke notifyInvoke;//通知操作 @Resource(name="consumerService") private ConsumerService consumerService; private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() { @Override public Thread newThread(Runnable r) { return new Thread(r, "MQNotifyTaskScheduledThread"); } }); public void init(){ scheduledExecutorService.scheduleAtFixedRate(new Runnable(){ @Override public void run() { try { Table table = consumerService.consumerProgress(consumerName); String diffCountStr = table.getExtData().get("Diff Total:"); if (StringUtils.isBlank(diffCountStr)) { logger.error("diff Total is null,consumer=" + consumerName); return; } int diffCount = Integer.parseInt(diffCountStr); notifyInvoke.invoke(diffCount,consumerName); } catch (Throwable e) { logger.error("queryConsumerState fail, consumer=" + consumerName, e); } } }, delayMils, periodMils, TimeUnit.MILLISECONDS); } public void setDelayMils(int delayMils) { this.delayMils = delayMils; } public void setPeriodMils(int periodMils) { this.periodMils = periodMils; } public void setConsumerName(String consumerName) { this.consumerName = consumerName; } public void setNotifyInvoke(INotifyInvoke notifyInvoke) { this.notifyInvoke = notifyInvoke; } }
通过 定时调用rocketmq的API查询挤压数,根据实现类的逻辑执行报警
/** * 通知操作 interface * @author chenchangqun * */ public interface INotifyInvoke { /** * 执行通知的具体操作 * @param diffCount */ public void invoke(int diffCount,String consumerName); }
/** * 操作通知具体实现类 * @author chenchangqun * */ public class NotifyInvokeImpl implements INotifyInvoke { private int thresholdValue;//积压阀值,用作是否发出提醒 @Override public void invoke(int diffCount,String consumerName) { if (diffCount > thresholdValue) { System.out.println("single check invoke ," + consumerName + " geater thean thresholdValue ,do something,thresholdValue=" + thresholdValue); } } public void setThresholdValue(int thresholdValue) { this.thresholdValue = thresholdValue; } }
配置如下
<bean id="singleNotifyTask" class="com.alibaba.rocketmq.common.SingleNotifyTask" init-method="init"> <property name="consumerName" value="firstSpringConsumer"></property> <property name="delayMils" value="1000"></property> <property name="periodMils" value="3000"></property> <property name="notifyInvoke" ref="notifyInvoke"></property> </bean>
<!-- 具体的通知动作 --> <bean id="notifyInvoke" class="com.alibaba.rocketmq.common.NotifyInvokeImpl" > <property name="thresholdValue" value="1"></property> </bean>
附件中给出具体代码,仅供参考