Storm的配置

2.2    Storm的配置
 
2.2.1  Storm的配置类型
 
Storm有大量的配置,可以调整Nimbus、Supervisor、拓扑运行的参数,其中有些配置是不能修改的系统配置,而其他配置是可以修改的。
 
每个配置会有一个默认值,该值定义在Storm代码库的defaults.yaml文件中。在Nimbus和Supervisor的类路径中定义一个storm.yaml文件,可以覆盖这些配置值。使用StormSubmitter提交拓扑的时候,可以定义一个指定拓扑的配置,但是只能覆盖前缀为TOPOLOGY的配置项。
 
Storm 0.7.0以后的版本开始允许在Spout/Bolt中覆盖配置,可以修改的配置主要有:
"topology.debug"。
"topology.max.spout.pending"。
"topology.max.task.parallelism"。
"topology.kryo.register"。
 
topology.kryo.register与其他的配置有所不同,它的序列化会应用到拓扑上的所有组件。
 
Storm的Java API也提供了两种方式指定组件的配置。
 
内部的(Internally)
 
在Spout或者Bolt类中,覆盖getComponentConfiguration方法,返回组件配置的Map对象。
getComponentConfiguration方法定义如下:
Map<String, Object> getComponentConfiguration()
 
外部的(Externally)
 
使用TopologyBuilder类的setSpout方法返回SpoutDeclarer对象,使用setBolt方法返回BoltDeclarer对象。SpoutDeclarer与BoltDeclarer实现了ComponentConfigurationDeclarer接口,该接口有addConfiguration方法和addConfigurations方法,可以通过调用这两个方法来覆盖组件的配置。
 
SpoutDeclarer接口的定义代码如下:
public interface SpoutDeclarer extends 
ComponentConfigurationDeclarer<SpoutDeclarer> {
    
}
 
BoltDeclarer接口的定义代码如下:
public interface BoltDeclarer extends InputDeclarer<BoltDeclarer>,
ComponentConfigurationDeclarer<BoltDeclarer> {
    
}
 
ComponentConfigurationDeclarer接口的定义代码如下:
public interface ComponentConfigurationDeclarer<T 
extends ComponentConfigurationDeclarer> {
    T addConfigurations(Map conf);
    T addConfiguration(String config, Object value);
    T setDebug(boolean debug);
    T setMaxTaskParallelism(Number val);
    T setMaxSpoutPending(Number val);
    T setNumTasks(Number val);
}
 
Storm配置值的优先顺序为:
defaults.yaml < storm.yaml < 特定拓扑的配置 < 内部特定组件的配置 < 外部特定组件的配置

-------------------------------------------------

public interface IComponent extends Serializable {
    void declareOutputFields(OutputFieldsDeclarer var1);

    Map<String, Object> getComponentConfiguration();
}


public Map<String, Object> getComponentConfiguration() {
        Map<String, Object> conf = new HashMap<String, Object>();
        conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, emitFrequencyInSeconds);
        return conf;
    }

--------------------------------------------------

又看了一下RollingTopWords,让我比较关注的地方是在RollingCountBolt类里,是通过方法TupleHelpers.isTickTuple(tuple)来判断是否应该发射当前窗口数据,但是判断的依据一开始让我很迷惑,居然是判断该tuple是否来源于“__system”的组件和“__tick”流。 
    
    作为对storm了解不多的人,我真的糊涂了,tuple不都是上游的spout发射来的吗,哪里冒出来源不同的tuple。 

    好吧,我就开始猜了,莫非有个隐藏的spout?或者RollingCountBolt自己给自己发什么特殊的tuple。 

    正毫无头绪时,奇迹出现了,我把鼠标移到Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS这个常量上时,出现了一行小提示: 

How often a tick tuple from the "__system" component and "__tick" stream should be sent to tasks. Meant to be used as a component-specific configuration. 

哦,在方法getComponentConfiguration() 里 

        conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, emitFrequencyInSeconds); 

这句话告诉系统,需要按照emitFrequencyInSeconds的频率,产生来源于“__system”的组件和“__tick”流的tuple给task。 

     好了,这就是RollingTopWords中,定时产生特殊tuple的方法,对于我这种新手,算是又有点进步了。

------------------------------------------------

滑动窗口在监控和统计应用的场景比较广泛,比如每隔一段时间(10s)统计最近30s的请求量或者异常次数,根据请求或者异常次数采取相应措施;这里说一下滑动窗口在storm中实现的原理。参见下图:

窗口大小为30s,每10s就统计一次,那么窗口一共有3个slot,可以对窗口建立长度为3的数组;在storm的blot中在10s内通过execute(tuple)功能不停的把接收的tuple进行count个数(假如内置变量为tuple_count),每10s会自动触发滑动窗口的移动工作(Array[slot3]=》Array[slot2],Array[slot2]=》Array[slot1]),并存储当前tuple_count值到Array[slot3],随之可以进行统计窗口的数据了。

那么如何每10s进行自动触发,storm有一个TickTuple可以满足这个要求,

"__system"component会定时往task发送"__tick"stream的tuple

发送频率由TOPOLOGY_TICK_TUPLE_FREQ_SECS来配置,可以在default.ymal里面配置

也可以在代码里面通过getComponentConfiguration()来进行配置,

publicMap<String,Object>getComponentConfiguration(){

Map<String,Object>conf=newHashMap<String,Object>();

conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS,emitFrequencyInSeconds);

returnconf;

配置完成后,storm就会定期的往task发送ticktuple

只需要通过isTickTuple来判断是否为tickTuple,就可以完成定时触发的功能

publicstaticbooleanisTickTuple(Tupletuple){

returntuple.getSourceComponent().equals(Constants.SYSTEM_COMPONENT_ID)\\SYSTEM_COMPONENT_ID=="__system"

&&tuple.getSourceStreamId().equals(Constants.SYSTEM_TICK_STREAM_ID);\\SYSTEM_TICK_STREAM_ID=="__tick"

}

相关推荐