rocketMq实战(3)-console和运维


 
 rocketmq这么复杂的东西,没有运维工具可搞不定啊。

哈哈,别急,官方提供了一个WEB项目,可以查看rocketmq数据和执行一些操作。

而且我自己也添加了一些功能

官网:https://github.com/rocketmq/rocketmq-console

运行修改一下namesers的地址,注意多个地址用 分号 分割


rocketMq实战(3)-console和运维
 

下面是成功页面,cluster查询broker集群的tps和出队入队情况。

topic查询生产和消费信息

connection查询生产则和消费者的连接信息。

其他的不讲了,多试试就明白了。

注意consumer这一项,可以查询消息积压,这是我们最关心的。

首先更正一个观念 对与broker是没有积压这个概念的,只有consumer有积压的概念。


rocketMq实战(3)-console和运维
 看下图可以查询consumer的消费情况,下图是查询指定consumer的页面 每个queue对应的broker的生产和消费情况。

Diff Total 是总的挤压数


rocketMq实战(3)-console和运维
 

 重点介绍我自己开发的几个功能

一 ,查询所有consumer的积压

如果想查询所有consumer的挤压情况,抱歉没有,只能自己开发。

这个功能的原理就是 通过ssh Java客户端去执行 命令 获取返回数据,在任何的broker执行都可以(如果童鞋们知道更好的实现方式 请告知)。

命令资料如下

RocketMQ命令整理:http://jameswxx.iteye.com/blog/2091971
 

rocketMq实战(3)-console和运维
 通过sa账号执行 sh /opt/ali-rocketmq/devenv/bin/mqadmin consumerProgress -n \"10.103.16.77:9876;10.103.16.15:9876\"  报出

()

ERROR: Please set the JAVA_HOME variable in your environment, We need java(x64)! !!

解决方法

 在bin/mqadmin 脚本写入 export JAVA_HOME=/usr/local/java .

 
rocketMq实战(3)-console和运维
 

具体代码实现见附件

 二,定时查询某个topic的挤压数 ,报警功能

代码如下

/**
 * 单条consumer的定时提醒
 * @author chenchangqun
 *
 */
public class SingleNotifyTask {
    static final Logger logger = LoggerFactory.getLogger(SingleNotifyTask.class);
	private int delayMils;//延迟启动时间,单位 毫秒
	private int periodMils;//周期时间 间隔,单位 毫秒
	private 	String consumerName;
	   private INotifyInvoke notifyInvoke;//通知操作	
	@Resource(name="consumerService")
    private ConsumerService consumerService;
       private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {
        @Override
        public Thread newThread(Runnable r) {
            return new Thread(r, "MQNotifyTaskScheduledThread");
        }
    });
	public void init(){
   scheduledExecutorService.scheduleAtFixedRate(new Runnable(){
	@Override
			public void run() {
				try {
					Table table = consumerService.consumerProgress(consumerName);
					String diffCountStr = table.getExtData().get("Diff Total:");
					if (StringUtils.isBlank(diffCountStr)) {
						logger.error("diff Total is null,consumer=" + consumerName);
						return;
					}
					int diffCount = Integer.parseInt(diffCountStr);
					notifyInvoke.invoke(diffCount,consumerName);
				

				} catch (Throwable e) {
					logger.error("queryConsumerState fail, consumer=" + consumerName, e);
				}

			}
	}, delayMils, periodMils, TimeUnit.MILLISECONDS);
	}
 
	public void setDelayMils(int delayMils) {
		this.delayMils = delayMils;
	}
	public void setPeriodMils(int periodMils) {
		this.periodMils = periodMils;
	}
	public void setConsumerName(String consumerName) {
		this.consumerName = consumerName;
	}

	public void setNotifyInvoke(INotifyInvoke notifyInvoke) {
		this.notifyInvoke = notifyInvoke;
	}
	
}

 通过 定时调用rocketmq的API查询挤压数,根据实现类的逻辑执行报警

/**
 * 通知操作 interface
 * @author chenchangqun
 *
 */
public interface INotifyInvoke {

/**
 * 执行通知的具体操作
 * @param diffCount
 */
public void  invoke(int diffCount,String consumerName);
}
/**
 * 操作通知具体实现类
 * @author chenchangqun
 *
 */
public class NotifyInvokeImpl implements INotifyInvoke {
private int thresholdValue;//积压阀值,用作是否发出提醒
	@Override
	public void invoke(int diffCount,String  consumerName) {
		if (diffCount > thresholdValue) {
			System.out.println("single check invoke ," + consumerName
					+ "  geater thean thresholdValue ,do something,thresholdValue=" + thresholdValue);
		}
	}
	public void setThresholdValue(int thresholdValue) {
		this.thresholdValue = thresholdValue;
	}
}

配置如下

<bean id="singleNotifyTask" class="com.alibaba.rocketmq.common.SingleNotifyTask" init-method="init">
   <property name="consumerName" value="firstSpringConsumer"></property>
   <property name="delayMils" value="1000"></property>
   <property name="periodMils" value="3000"></property>
   <property name="notifyInvoke" ref="notifyInvoke"></property> 
   </bean>
<!-- 具体的通知动作 -->   
<bean id="notifyInvoke" class="com.alibaba.rocketmq.common.NotifyInvokeImpl" >
   <property name="thresholdValue" value="1"></property>
   </bean>

附件中给出具体代码,仅供参考

相关推荐