Using Flume's monitoring metrics for message processing
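Flume is used more and more widely, and in practice you will sooner or later run into performance bottlenecks or lost messages. When that happens, the first instinct is usually to analyze the problem with the JDK's built-in tools and to track it down through logs.
However, Flume itself maintains many counters while it processes messages, and it can publish them via JMX, Ganglia, or as JSON over HTTP. To use them, add the following options to the agent's startup script: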
-Dflume.monitoring.type=http -Dflume.monitoring.port=34545
After restarting the agent with these options, open http://188.1.186.XXX:34545/metrics to get the monitoring data as JSON.
On Linux, you can simply run: curl -XGET '188.1.186.XXX:34545/metrics'
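If the metrics should be collected by another program rather than read by hand, the same endpoint can be polled from Java. The following is a minimal sketch assuming Java 11+ (java.net.http.HttpClient); the class name FlumeMetricsFetcher, the method fetch and the 5-second timeouts are illustrative choices, not part of Flume.

```java
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.time.Duration;

// Hypothetical helper: the programmatic equivalent of
// curl -XGET '<host>:<port>/metrics' against a Flume agent
// started with -Dflume.monitoring.type=http.
final class FlumeMetricsFetcher {
    private static final HttpClient CLIENT = HttpClient.newBuilder()
            .connectTimeout(Duration.ofSeconds(5))   // illustrative timeout
            .build();

    private FlumeMetricsFetcher() {}

    /** Returns the raw JSON body served by the agent's /metrics endpoint. */
    static String fetch(String host, int port) throws IOException, InterruptedException {
        HttpRequest request = HttpRequest.newBuilder(URI.create("http://" + host + ":" + port + "/metrics"))
                .timeout(Duration.ofSeconds(5))
                .GET()
                .build();
        HttpResponse<String> response = CLIENT.send(request, HttpResponse.BodyHandlers.ofString());
        if (response.statusCode() != 200) {
            throw new IOException("unexpected HTTP status " + response.statusCode());
        }
        return response.body();
    }
}
```

Usage would be FlumeMetricsFetcher.fetch(host, 34545) with the agent's host name; the returned string is the same JSON that curl prints.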

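The response is returned unformatted. To make it readable you can, for example, paste it into a new .json file in Eclipse and format it with Ctrl+Shift+F: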
{
"SINK.k1": {
"ConnectionCreatedCount": "1",
"ConnectionClosedCount": "0",
"Type": "SINK",
"BatchCompleteCount": "0",
"BatchEmptyCount": "0",
"EventDrainAttemptCount": "7908340",
"StartTime": "1514878638909",
"EventDrainSuccessCount": "7657343",
"BatchUnderflowCount": "250997",
"StopTime": "0",
"ConnectionFailedCount": "0"
},
"CHANNEL.c1": {
"ChannelCapacity": "1000000",
"ChannelFillPercentage": "0.0",
"Type": "CHANNEL",
"ChannelSize": "0",
"EventTakeSuccessCount": "7908340",
"EventTakeAttemptCount": "7908466",
"StartTime": "1514878638906",
"EventPutAttemptCount": "7908340",
"EventPutSuccessCount": "7908340",
"StopTime": "0"
},
"SOURCE.r1": {
"KafkaEventGetTimer": "6468875",
"AppendBatchAcceptedCount": "0",
"EventAcceptedCount": "7908340",
"AppendReceivedCount": "0",
"StartTime": "1514878643588",
"AppendBatchReceivedCount": "0",
"KafkaCommitTimer": "156254",
"EventReceivedCount": "7908340",
"Type": "SOURCE",
"AppendAcceptedCount": "0",
"OpenConnectionCount": "0",
"KafkaEmptyCount": "0",
"StopTime": "0"
}
}
By comparing the source, channel, and sink metrics, you can tell where the system's processing bottleneck is.
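Comparing the counters by eye works for a one-off check; for repeated checks a few indicators can be computed directly. The sketch below assumes the flat two-level layout shown above ({"COMPONENT": {"Counter": "value", ...}}) and extracts values with a regular expression only to avoid a JSON library dependency; a real JSON parser would be more robust. The class FlumeMetricsCheck and its methods are hypothetical.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper that derives simple health indicators from the /metrics JSON.
// Assumes each component object (e.g. "SINK.k1") contains no nested braces.
final class FlumeMetricsCheck {

    private FlumeMetricsCheck() {}

    /** Returns one counter of one component, e.g. counter(json, "CHANNEL.c1", "ChannelSize"). */
    static double counter(String json, String component, String name) {
        int start = json.indexOf("\"" + component + "\"");
        if (start < 0) {
            throw new IllegalArgumentException("component not found: " + component);
        }
        int end = json.indexOf('}', start);   // end of this component's object
        String section = json.substring(start, end < 0 ? json.length() : end);
        Matcher m = Pattern.compile("\"" + Pattern.quote(name) + "\"\\s*:\\s*\"([^\"]*)\"").matcher(section);
        if (!m.find()) {
            throw new IllegalArgumentException("counter not found: " + component + "." + name);
        }
        return Double.parseDouble(m.group(1));   // Flume reports every value as a string
    }

    /** Fraction of drain attempts that the sink also reported as successful. */
    static double sinkDrainRatio(String json, String sink) {
        double attempts = counter(json, sink, "EventDrainAttemptCount");
        double success = counter(json, sink, "EventDrainSuccessCount");
        return attempts == 0 ? 1.0 : success / attempts;
    }

    /** Events successfully put into the channel minus events successfully taken out. */
    static double channelBacklog(String json, String channel) {
        return counter(json, channel, "EventPutSuccessCount")
                - counter(json, channel, "EventTakeSuccessCount");
    }
}
```

For the snapshot above, sinkDrainRatio(json, "SINK.k1") is about 0.968 and channelBacklog(json, "CHANNEL.c1") is 0, which agrees with ChannelSize being 0.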
Plugins you write yourself can use the same counter classes to publish their own statistics. For example, in a custom Elasticsearch sink:
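import java.nio.charset.StandardCharsets;

import com.google.common.base.Strings;
import org.apache.flume.Channel;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.Transaction;
import org.apache.flume.conf.Configurable;
import org.apache.flume.instrumentation.SinkCounter;
import org.apache.flume.sink.AbstractSink;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ElasticSearchSink extends AbstractSink implements Configurable {
    private static final Logger logger = LoggerFactory.getLogger(ElasticSearchSink.class);
    private BulkProcessor bulkProcessor;
    private SinkCounter sinkCounter;
    // ... (client, indexBuilder, serializer and other fields omitted)

    @Override
    public void configure(Context context) {
        // ...
        buildIndexBuilder(context);
        buildSerializer(context);
        // Create the counter once, named after this sink, so its metrics are reported under the sink's name
        if (sinkCounter == null) {
            sinkCounter = new SinkCounter(getName());
        }
        bulkProcessor = new BulkProcessorBuilder().buildBulkProcessor(context, client);
    }

    @Override
    public Status process() throws EventDeliveryException {
        Channel channel = getChannel();
        Transaction txn = channel.getTransaction();
        txn.begin();
        try {
            Event event = channel.take();
            if (event != null) {
                String body = new String(event.getBody(), StandardCharsets.UTF_8);
                // Every event taken from the channel counts as one drain attempt
                sinkCounter.incrementEventDrainAttemptCount();
                if (!Strings.isNullOrEmpty(body)) {
                    String index = indexBuilder.getIndex(event);
                    String type = indexBuilder.getType(event);
                    String id = indexBuilder.getId(event);
                    XContentBuilder xContentBuilder = serializer.serialize(event);
                    if (index != null && xContentBuilder != null) {
                        if (!Strings.isNullOrEmpty(id)) {
                            bulkProcessor.add(new IndexRequest(index, type, id)
                                    .source(xContentBuilder));
                            // Counted as a success once queued in the BulkProcessor; the bulk
                            // request is only sent to Elasticsearch when the processor flushes
                            sinkCounter.incrementEventDrainSuccessCount();
                        }
                        // ...
                    }
                }
            }
            // ...
        }
        // ... (rest of process() omitted)
    }

    @Override
    public synchronized void start() {
        // Starts the counter group (registers it and records StartTime) before the sink runs
        sinkCounter.start();
        sinkCounter.incrementConnectionCreatedCount();
        super.start();
    }
    // ...
}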
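The sink above updates the drain counters once per event. Flume's bundled batching sinks (the HDFS sink, for example) also classify each transaction by how many events it drained, which is what fills BatchEmptyCount, BatchUnderflowCount and BatchCompleteCount in the JSON above. The following standalone sketch reproduces only that classification rule, so it runs without Flume on the classpath; in a real sink each branch would call the SinkCounter method named in its comment. BatchClassifier is a hypothetical name.

```java
// Hypothetical standalone helper mirroring the batch classification used by
// Flume's bundled batching sinks. Instead of calling SinkCounter directly,
// it returns the outcome so the rule can be exercised on its own.
final class BatchClassifier {

    enum Outcome { EMPTY, UNDERFLOW, COMPLETE }

    private BatchClassifier() {}

    /** Classifies one transaction by events drained versus the configured batch size. */
    static Outcome classify(int eventsInTxn, int batchSize) {
        if (batchSize <= 0 || eventsInTxn < 0 || eventsInTxn > batchSize) {
            throw new IllegalArgumentException("invalid counts: " + eventsInTxn + "/" + batchSize);
        }
        if (eventsInTxn == 0) {
            return Outcome.EMPTY;       // sinkCounter.incrementBatchEmptyCount()
        }
        if (eventsInTxn == batchSize) {
            return Outcome.COMPLETE;    // sinkCounter.incrementBatchCompleteCount()
        }
        return Outcome.UNDERFLOW;       // sinkCounter.incrementBatchUnderflowCount()
    }
}
```

As a rule of thumb (not an official Flume guideline), a sink that mostly records empty or underflow batches is waiting for data, so the bottleneck is usually upstream; a sink that mostly records complete batches while ChannelFillPercentage keeps rising is the likely bottleneck.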
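Summary:
1. When you run into Flume performance problems, add -Dflume.monitoring.type=http -Dflume.monitoring.port=34545 to the startup script to enable HTTP metrics reporting.
2. Fetch the metrics from http://<hostname>:<port>/metrics.
3. When developing custom plugins, use Flume's existing counter classes (such as SinkCounter) to record statistics.
Note: for more details, see the official documentation: http://flume.apache.org/FlumeUserGuide.html#monitoring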