ZooKeeper: Using Netflix Curator

Introduction to Curator

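Netflix Curator is a ZooKeeper client library open-sourced by Netflix that simplifies ZooKeeper client programming. It consists of the following modules:

  • curator-client - a wrapper around the ZooKeeper client, intended to replace the native ZooKeeper client and provide some very useful client-side features
  • curator-framework - a high-level wrapper over the ZooKeeper API that greatly simplifies client programming, adding features such as ZooKeeper connection management and a retry mechanism
  • curator-recipes - implementations of the ZooKeeper recipes built on curator-framework (everything except 2PC)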
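The retry mechanism mentioned for curator-framework can be pictured with a small plain-Java sketch. This is not Curator's implementation: `RetrySketch` and `callWithRetry` are hypothetical names, and the policy assumed here simply retries a fixed number of times with a fixed sleep between attempts.

```java
import java.util.concurrent.Callable;

/** Hypothetical helper, not part of Curator: retry a call a bounded number of times. */
final class RetrySketch {
    private RetrySketch() {}

    /** Runs op; on failure sleeps sleepMs and tries again, at most maxRetries extra times. */
    static <T> T callWithRetry(Callable<T> op, int maxRetries, long sleepMs) throws Exception {
        int retries = 0;
        while (true) {
            try {
                return op.call();
            } catch (Exception e) {
                if (retries >= maxRetries) {
                    throw e; // retries exhausted: surface the last failure
                }
                retries++;
                Thread.sleep(sleepMs);
            }
        }
    }

    public static void main(String[] args) throws Exception {
        int[] calls = {0};
        // Simulated operation that fails twice before succeeding.
        String result = callWithRetry(() -> {
            if (++calls[0] < 3) {
                throw new IllegalStateException("connection lost");
            }
            return "ok";
        }, 5, 10L);
        System.out.println(result + " after " + calls[0] + " attempts");
    }
}
```

With a client library that does this internally, application code does not have to wrap every ZooKeeper call in its own retry loop.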

maven dependency:

String path = "/test_path";
CuratorFramework client = CuratorFrameworkFactory.builder()
        .connectString("test:2181").namespace("test1") // namespace is given without a leading slash
        .retryPolicy(new RetryNTimes(Integer.MAX_VALUE, 1000))
        .connectionTimeoutMs(5000).build();
// the client must be started before it is used
client.start();

// create a node
client.create().forPath("/head", new byte[0]);

// delete a node in the background
client.delete().inBackground().forPath("/head");

// create an EPHEMERAL_SEQUENTIAL node
client.create().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath("/head/child", new byte[0]);

// get the data in the background, leaving a watch on the node
client.getData().watched().inBackground().forPath("/test");

// check whether the path exists
client.checkExists().forPath(path);


The Curator framework uses the builder pattern and an NIO-style chained API, so the code is very concise.

Using Curator recipes

InterProcessMutex

Purpose: an inter-process mutual exclusion lock

Example code:

String lockName = "/lock1";
InterProcessLock lock1 = new InterProcessMutex(this.curator, lockName);
InterProcessLock lock2 = new InterProcessMutex(this.curator, lockName);
lock1.acquire();
// lock1 holds the mutex, so lock2 times out
boolean result = lock2.acquire(1, TimeUnit.SECONDS);
assertFalse(result);
lock1.release();
// once lock1 is released, lock2 can acquire it
result = lock2.acquire(1, TimeUnit.SECONDS);
assertTrue(result);
lock2.release();

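How it works: every call to acquire creates a new ephemeral node under the /lock1 node using CreateMode.EPHEMERAL_SEQUENTIAL, then calls getChildren to fetch all children and checks whether the node it just created is the first one. If it is, the lock is acquired. If it is not and the lock cannot be obtained (for example, the timeout passed to acquire expires), the node it just created is deleted.

Note: a successful acquire costs two requests to the ZooKeeper server (one write, one getChildren); a failed acquire costs at least three (one write, one getChildren, one delete).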
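The steps above can be modelled in memory, without a ZooKeeper server, to make the request counts concrete. This is only a sketch of the simplified description given here: `SequentialLockSketch` and its methods are hypothetical names, the attempt is non-blocking, and it does not model any waiting or watching a real client may do before giving up.

```java
import java.util.TreeSet;

/**
 * In-memory model of the sequential-node lock idea; no ZooKeeper involved.
 * All names are hypothetical and exist only for illustration.
 */
final class SequentialLockSketch {
    // Children under the lock path, identified by their sequence numbers.
    private final TreeSet<Long> children = new TreeSet<>();
    private long nextSequence = 0; // the "server" hands out increasing sequence numbers
    private int requests = 0;      // number of simulated server requests so far

    /** Non-blocking attempt: returns the created node's sequence number, or -1 if the lock is taken. */
    synchronized long tryAcquire() {
        long mine = nextSequence++;    // request 1: create the EPHEMERAL_SEQUENTIAL child
        children.add(mine);
        requests++;

        long first = children.first(); // request 2: getChildren, then look at the lowest node
        requests++;
        if (first == mine) {
            return mine;               // our node is first: lock acquired
        }

        children.remove(mine);         // request 3: delete our node and give up
        requests++;
        return -1;
    }

    /** Releasing the lock removes the holder's node. */
    synchronized void release(long node) {
        children.remove(node);
    }

    synchronized int requestCount() {
        return requests;
    }

    public static void main(String[] args) {
        SequentialLockSketch lock = new SequentialLockSketch();
        long holder = lock.tryAcquire();
        System.out.println("first attempt acquired: " + (holder >= 0));
        System.out.println("second attempt acquired: " + (lock.tryAcquire() >= 0));
        lock.release(holder);
        System.out.println("after release acquired: " + (lock.tryAcquire() >= 0));
        System.out.println("requests issued: " + lock.requestCount());
    }
}
```

Because every attempt writes to the server before it knows whether it will win, heavily contended locks generate write traffic even for the attempts that fail.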

InterProcessReadWriteLock

Example code:

@Test
public void testReadWriteLock() throws Exception{
    String readWriteLockPath = "/RWLock";
    InterProcessReadWriteLock readWriteLock1 = new InterProcessReadWriteLock(this.curator, readWriteLockPath);
    InterProcessMutex writeLock1 = readWriteLock1.writeLock();
    InterProcessMutex readLock1 = readWriteLock1.readLock();

    InterProcessReadWriteLock readWriteLock2 = new InterProcessReadWriteLock(this.curator, readWriteLockPath);
    InterProcessMutex writeLock2 = readWriteLock2.writeLock();
    InterProcessMutex readLock2 = readWriteLock2.readLock();
    writeLock1.acquire();

    // read lock of the same InterProcessReadWriteLock that holds the write lock: succeeds
    assertTrue(readLock1.acquire(1, TimeUnit.SECONDS));

    // a different InterProcessReadWriteLock cannot read while the write lock is held
    assertFalse(readLock2.acquire(1, TimeUnit.SECONDS));

    // nor can it take the write lock
    assertFalse(writeLock2.acquire(1, TimeUnit.SECONDS));

    // release the write lock
    writeLock1.release();

    // now both read locks can be acquired
    assertTrue(readLock1.acquire(1, TimeUnit.SECONDS));
    assertTrue(readLock2.acquire(1, TimeUnit.SECONDS));
}

 

 
How it works: the same as InterProcessMutex, with a trick in how the ephemeral nodes are ordered: write locks sort first.

Note: if the same InterProcessReadWriteLock already holds the write lock, acquiring its read lock also succeeds.
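One way to picture an ordering rule of this kind is as a pure function over the sorted child nodes. This is a sketch under stated assumptions, not Curator's code. It assumes that a writer holds the lock only when its node is first, and that a reader holds it when no writer's node is ahead of it. The `read-` and `write-` prefixes and the class name are hypothetical, and the same-owner case from the note above is not modelled.

```java
import java.util.Arrays;
import java.util.List;

/** Hypothetical model of a read/write decision over sequence-ordered lock nodes. */
final class ReadWriteOrderSketch {
    private ReadWriteOrderSketch() {}

    /**
     * children must already be sorted by sequence number.
     * Returns true if the node at ourIndex currently holds its lock.
     */
    static boolean holdsLock(List<String> children, int ourIndex) {
        boolean isWriter = children.get(ourIndex).startsWith("write-");
        if (isWriter) {
            // A writer needs exclusive access: nothing may be ahead of it.
            return ourIndex == 0;
        }
        // A reader only has to wait for writers that are ahead of it.
        for (int i = 0; i < ourIndex; i++) {
            if (children.get(i).startsWith("write-")) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        List<String> children = Arrays.asList("read-0", "read-1", "write-2", "read-3");
        for (int i = 0; i < children.size(); i++) {
            System.out.println(children.get(i) + " holds lock: " + holdsLock(children, i));
        }
    }
}
```

In this model several readers at the front of the queue hold the lock together, while a waiting writer blocks every reader that arrives after it.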
 
 


LeaderSelector
Example code:

@Test
public void testLeader() throws Exception{
    LeaderSelectorListener listener = new LeaderSelectorListener(){

        @Override
        public void takeLeadership(CuratorFramework client)
                throws Exception {
            // called once this selector becomes leader;
            // leadership is given up when this method returns
            System.out.println("i'm leader");
        }

        @Override
        public void handleException(CuratorFramework client,
                Exception exception) {
            // intentionally empty in this example
        }

        @Override
        public void notifyClientClosing(CuratorFramework client) {
            // intentionally empty in this example
        }};
    String leaderPath = "/leader";
    LeaderSelector selector1 = new LeaderSelector(this.curator, leaderPath, listener);
    selector1.start();
    LeaderSelector selector2 = new LeaderSelector(this.curator, leaderPath, listener);
    selector2.start();
    assertFalse(selector2.hasLeadership());
}


How it works: internally it is built on InterProcessMutex; for the details, see the shared lock section.
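The idea of building leader election on a mutex can be sketched in plain Java: every candidate competes for the same lock, whoever holds it is the leader, and leadership ends when the callback returns. This is an illustration only, not Curator's implementation. A local `ReentrantLock` stands in for the distributed InterProcessMutex, and `LeaderElectionSketch` and its methods are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;

/** Hypothetical in-process model of mutex-based leader election. */
final class LeaderElectionSketch {
    // Stands in for the shared InterProcessMutex on the leader path.
    private final ReentrantLock mutex = new ReentrantLock();
    private final AtomicInteger activeLeaders = new AtomicInteger();
    private final AtomicInteger maxConcurrentLeaders = new AtomicInteger();
    private final AtomicInteger completedTerms = new AtomicInteger();

    /** Blocks until this candidate owns the mutex, runs the callback, then gives leadership up. */
    void runCandidate(Runnable takeLeadership) {
        mutex.lock();
        try {
            int now = activeLeaders.incrementAndGet();
            maxConcurrentLeaders.accumulateAndGet(now, Math::max);
            takeLeadership.run(); // leadership lasts until this returns
            completedTerms.incrementAndGet();
        } finally {
            activeLeaders.decrementAndGet();
            mutex.unlock();
        }
    }

    int maxConcurrentLeaders() {
        return maxConcurrentLeaders.get();
    }

    int completedTerms() {
        return completedTerms.get();
    }

    /** Runs the given number of candidates on separate threads and waits for all of them. */
    static LeaderElectionSketch elect(int candidates) throws InterruptedException {
        LeaderElectionSketch election = new LeaderElectionSketch();
        List<Thread> threads = new ArrayList<>();
        for (int i = 0; i < candidates; i++) {
            final int id = i;
            Thread t = new Thread(() -> election.runCandidate(
                    () -> System.out.println("candidate " + id + " is leader")));
            threads.add(t);
            t.start();
        }
        for (Thread t : threads) {
            t.join();
        }
        return election;
    }

    public static void main(String[] args) throws InterruptedException {
        LeaderElectionSketch e = elect(3);
        System.out.println("terms=" + e.completedTerms()
                + ", max concurrent leaders=" + e.maxConcurrentLeaders());
    }
}
```

Since the mutex admits one holder at a time, at most one candidate is leader at any moment, and each candidate gets its turn once the previous leader's callback returns.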

Summary

Curator provides many other recipe implementations as well; for details, see https://github.com/Netflix/curator/wiki/Recipes