zookeeper 分布式任务

网上说了一大堆关于zookeeper选举,参差不齐,貌似没有啥代码可以copy,把自己也给绕进去了,这里结合一些网上的看法,自己写了代码,关于zookeeper的。

思路大概是这样:

1、每个节点进来查询是否有任务发布路径,如果没有,创建任务发布路径,自己成为领导;这个过程加锁,所以集群只有一个领导。

2、每个应用都可以发布任务。把命令写在发布路径的data里;所有应用监听到发布路径修改,执行任务类型;每个任务监听到任务后,会失去监听,那么再监听回任务发布路径,实现无限监听;

3、所有节点监听到领导挂了,再选举一个节点做为领导,具有容灾性质;

zookeeper要结合分布式内存存储工具(像memcached、redis、MQ工具等),能发挥很强大的分布式计算功能。有些很强大的工具都用到zookeeper了,像hadoop、storm、阿里dubbo等等。

这里基zookeeper的选举,没有那么复杂,抛砖引玉了

2014-04-27:经过一个项目的熏陶,对zookeeper有了一定的理解,Paxos是zookeeper的核心算法,我们是在zookeeper基础上做开发,算法理解就行了,但别给算法给绑死了,没有哪个程序要求一定要实现什么算法,必须知道zookeeper的作用是协调分布式应用。

/**

*

*@authorlyq

*

*/

publicclassZkAppCenterimplementsWatcher{

//privatestaticZkAppInfoinfo=null;

publicstaticfinalStringappGroup="/APP";

privateStringhosts=BaseCode.getValue("ZK_HOST");

privateIntegerSESSION_TIMEOUT=Integer.parseInt(BaseCode.getValue("ZK_SESSION_TIMEOUT"));

StringtaskPath=appGroup+"_leader";

publicCountDownLatchlockLatch=null;

privateZooKeeperzk=null;

protectedstaticfinalLoggerlog=Logger.getLogger(ZkAppCenter.class);

privateStringappName=null;

//publicstaticZkAppInfogetInstance(){

//if(info==null){

//info=newZkAppInfo();

//}

//returninfo;

//}

privateZooKeepergetZK()throwsException{

if(zk==null){

zk=newZooKeeper(hosts,SESSION_TIMEOUT,this);

}

returnzk;

}

/**

*

*@return

*@throwsException

*/

publicStringregisterApp()throwsException{

if(getZK().exists(appGroup,false)==null){

getZK().create(appGroup,appGroup.getBytes(),Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT);

}

ZkLocklock=newZkLock(appGroup+"lock",this.getZK()){//创建应用构建必须是有序的,锁在我上篇文字

@Override

publicObjectdoAction(){

try{

Stringaddr=InetAddress.getLocalHost().getHostAddress();

appName=zk.create(appGroup+"/"+addr+"-",newbyte[0],Ids.OPEN_ACL_UNSAFE,CreateMode.EPHEMERAL_SEQUENTIAL);

if(zk.exists(taskPath,false)==null){

zk.create(taskPath,"leader".getBytes(),Ids.OPEN_ACL_UNSAFE,CreateMode.EPHEMERAL);//创建任务发布路径

}

zk.getData(taskPath,true,null);

//listenChain();

//listenLeader();//监听leader节点

returnnull;

}catch(Exceptione){

thrownewRuntimeException(e);

}//每个应用注册进来必须监听leader创建的leader路径

}

};

lockLatch=lock.latch;

lock.execute();

returnappName;

}

//发布应用任务时,只需要修改taskPath,因为所有应用都有watchtaskPath,所以所有应用都执行了该任务;(任务集合可以放在集群内存队列里面,让每个应用去拿)

publicvoiddoTask(StringtaskName)throwsException{

ZkTasktaskBean=ZkTaskFactory.getInstance().getTask(taskName);

Objectobj=taskBean.execute();

if(obj!=null){

this.getZK().setData(taskPath,taskName.getBytes(),-1);

}else{

this.getZK().setData(taskPath,taskName+":end",-1);//结束命令

}

}

publicIntegergetAppsSize()throwsException{

List<String>list=this.getZK().getChildren(appGroup,false);

IntegerappsSize=list==null?0:list.size();

if(appsSize==0){

log.error("集群失败,没有'"+appGroup+"'该应用群组里。");

}

returnappsSize;

}

publicstaticvoidmain(String[]args)throwsException{

try{

finalCountDownLatchlatchtask=newCountDownLatch(10);

for(inti=0;i<10;i++){

finalintia=i;

newThread(){

publicvoidrun(){

ZkAppCenterreg=newZkAppCenter();

try{

reg.registerApp();

latchtask.await();

Thread.sleep(4000);

if(ia==9){

List<ZkTaskBean>list=newArrayList();

for(inti=0;i<8000;i++){

list.add(newDoloadTaskBean());

}

newZkPlan(BaseCode.RD_DOLOAD_TASKS,list,reg.getAppsSize(),1000);

reg.doTask("TestTask");

}

}catch(Exceptione){

e.printStackTrace();

}

}

}.start();

latchtask.countDown();

}

Thread.sleep(Long.MAX_VALUE);

}finally{

}

}

//2014-04-27lyq这里有个问题,如果process执行太久,会导致其他任务发布不了,估计是加锁了,所以process方法中执行任务时,最好开启一个子线程:

newThread(){

publicvoidrun(){

try{

doTask(task);

}catch(Exceptione){

e.printStackTrace();

}

}

}.start();

这样各种类型任务互不干扰,项目实战经验啊。其实想过为每个任务建zookeeper连接,但是zookeeper默认就10个连接,建太多连接还要抽象出代码去同意管理连接,因为连接是长连接,如果不抽象这块连接的程序,那么当程序挂了要费很大的劲去为每个连接建立容灾的程序;还是单个连接的路线吧。还有一些容灾的方法,跟本博客redis做法是一样的:如果失去连接,让所有任务进入等待,关闭zookeeper,重新在zookeeper注册应用。再让zookeeper任务继续执行,这里在getZk()这个方法放入监听锁~~

多线程和分布式的应用,可以充分体验锁的魅力。

zookeeper真心不错~~~

@Override

publicvoidprocess(WatchedEventevent){

try{

StringeventPath=event.getPath();

if(eventPath!=null&&eventPath.contains("lock")){

this.lockLatch.countDown();//锁

}elseif(event.getType()==EventType.None&&event.getState()==KeeperState.SyncConnected){

Stringaddr=InetAddress.getLocalHost().getHostAddress();

log.info(addr+"register......");

}elseif(event.getType()==EventType.NodeDeleted){

log.info(eventPath+"节点删除..........");

if(eventPath.equals(this.taskPath)){

this.registerApp();//监听到父节点删除,那么重新触发注册,防止领导节点挂掉、

}

}else{

Stringtask=newString(this.getZK().getData(eventPath,true,null));//每次监听到任务,继续监听

doTask(task);

//}

this.log.info("this.appName:"+this.appName);

//

}

}catch(Exceptione){

thrownewRuntimeException(e);

}

}

}

相关推荐