分享

zookeeper下的分布式锁

 CevenCheng 2012-05-13

zookeeper是hadoop下面的一个子项目, 用来进行分布式系统之间的相互协调。

在zookeeper源码包的recipe目录下有一个互斥锁lock的实现范例,笔者对其简要包装,以便看起来更为明了:

Java代码  收藏代码
  1. package org.apache.zookeeper.recipes.lock;  
  2.   
  3. import java.io.IOException;  
  4.   
  5. import org.apache.zookeeper.KeeperException;  
  6. import org.apache.zookeeper.Watcher;  
  7. import org.apache.zookeeper.ZooKeeper;  
  8.   
  9. public class DistributedLock {  
  10.   
  11.     private WriteLock lock;  
  12.     private String lockPath = "/lock";  
  13.     private ZooKeeper zooKeeper ;  
  14.   
  15.     public DistributedLock(ZooKeeper zooKeeper){  
  16.         this.zooKeeper = zooKeeper;  
  17.     }  
  18.   
  19.     /** 
  20.      * 获得锁 
  21.      *  
  22.      * Author:  chenkangxian 
  23.      * 
  24.      * Last Modification Time: 2012-4-6 
  25.      * 
  26.      * @return 获得锁是否成功 
  27.      */  
  28.     public boolean lock(){  
  29.         lock = new WriteLock(zooKeeper, lockPath, null);  
  30.         try {  
  31.             while (true) {  
  32.                 if (lock.lock()) {  
  33.                     return true;  
  34.                 }  
  35.   
  36.             }  
  37.         } catch (KeeperException e) {  
  38.             e.printStackTrace();  
  39.             return false;  
  40.         } catch (InterruptedException e) {  
  41.             e.printStackTrace();  
  42.             return false;  
  43.         }  
  44.   
  45.     }  
  46.   
  47.     /** 
  48.      * 解锁 
  49.      *  
  50.      * Author:  chenkangxian 
  51.      * 
  52.      * Last Modification Time: 2012-4-6 
  53.      * 
  54.      */  
  55.     public void unlock(){  
  56.         lock.unlock();  
  57.     }  
  58.       
  59.     public static void main(String args[]){  
  60.   
  61.         try {  
  62.             Watcher wh=new Watcher(){  
  63.                 public void process(org.apache.zookeeper.WatchedEvent event)  
  64.                 {  
  65.                     System.out.println(event.toString());  
  66.                 }  
  67.             };  
  68.   
  69.             ZooKeeper zooKeeper = new ZooKeeper("localhost:2181"20000, wh);  
  70.             final DistributedLock distributedLock = new DistributedLock(zooKeeper);  
  71.               
  72.             for(int i = 0; i < 100 ; i ++){  
  73.                 Thread thread = new Thread(new Runnable(){  
  74.                       
  75.                     @Override  
  76.                     public void run() {  
  77.                         if(distributedLock.lock()){  
  78.                             System.out.println("获得锁---------------");  
  79.                               
  80.                         }  
  81.   
  82.                         distributedLock.unlock();  
  83.                           
  84.                     }  
  85.                       
  86.                 });  
  87.             }  
  88.   
  89.             Thread.sleep(2000*1000);  
  90.   
  91.         } catch (IOException e) {  
  92.             e.printStackTrace();  
  93.         } catch (InterruptedException e) {  
  94.             e.printStackTrace();  
  95.         }  
  96.   
  97.     }  
  98. }  

 

WirteLock实现:

 

Java代码  收藏代码
  1. package org.apache.zookeeper.recipes.lock;  
  2.   
  3. import org.apache.log4j.Logger;  
  4. import org.apache.zookeeper.KeeperException;  
  5. import org.apache.zookeeper.WatchedEvent;  
  6. import org.apache.zookeeper.Watcher;  
  7. import static org.apache.zookeeper.CreateMode.EPHEMERAL_SEQUENTIAL;  
  8. import org.apache.zookeeper.ZooKeeper;  
  9. import org.apache.zookeeper.data.ACL;  
  10. import org.apache.zookeeper.data.Stat;  
  11.   
  12. import java.util.List;  
  13. import java.util.SortedSet;  
  14. import java.util.TreeSet;  
  15.   
  16.   
  17.   
  18. public class WriteLock extends ProtocolSupport {  
  19.     private static final Logger LOG = Logger.getLogger(WriteLock.class);  
  20.   
  21.     private final String dir;  
  22.     private String id;  
  23.     private ZNodeName idName;  
  24.     private String ownerId;  
  25.     private String lastChildId;  
  26.     private byte[] data = {0x120x34};  
  27.     private LockListener callback;  
  28.     private LockZooKeeperOperation zop;  
  29.       
  30.     /** 
  31.      * zookeeper contructor for writelock 
  32.      * @param zookeeper zookeeper client instance 
  33.      * @param dir the parent path you want to use for locking 
  34.      * @param acls the acls that you want to use for all the paths,  
  35.      * if null world read/write is used. 
  36.      */  
  37.     public WriteLock(ZooKeeper zookeeper, String dir, List<ACL> acl) {  
  38.         super(zookeeper);  
  39.         this.dir = dir;  
  40.         if (acl != null) {  
  41.             setAcl(acl);  
  42.         }  
  43.         this.zop = new LockZooKeeperOperation();  
  44.     }  
  45.       
  46.     /** 
  47.      * zookeeper contructor for writelock with callback 
  48.      * @param zookeeper the zookeeper client instance 
  49.      * @param dir the parent path you want to use for locking 
  50.      * @param acl the acls that you want to use for all the paths 
  51.      * @param callback the call back instance 
  52.      */  
  53.     public WriteLock(ZooKeeper zookeeper, String dir, List<ACL> acl,   
  54.             LockListener callback) {  
  55.         this(zookeeper, dir, acl);  
  56.         this.callback = callback;  
  57.     }  
  58.   
  59.     /** 
  60.      * return the current locklistener 
  61.      * @return the locklistener 
  62.      */  
  63.     public LockListener getLockListener() {  
  64.         return this.callback;  
  65.     }  
  66.       
  67.     /** 
  68.      * register a different call back listener 
  69.      * @param callback the call back instance 
  70.      */  
  71.     public void setLockListener(LockListener callback) {  
  72.         this.callback = callback;  
  73.     }  
  74.   
  75.     /** 
  76.      * Removes the lock or associated znode if  
  77.      * you no longer require the lock. this also  
  78.      * removes your request in the queue for locking 
  79.      * in case you do not already hold the lock. 
  80.      * @throws RuntimeException throws a runtime exception 
  81.      * if it cannot connect to zookeeper. 
  82.      */  
  83.     public synchronized void unlock() throws RuntimeException {  
  84.           
  85.         if (!isClosed() && id != null) {  
  86.             // we don't need to retry this operation in the case of failure  
  87.             // as ZK will remove ephemeral files and we don't wanna hang  
  88.             // this process when closing if we cannot reconnect to ZK  
  89.             try {  
  90.                   
  91.                 ZooKeeperOperation zopdel = new ZooKeeperOperation() {  
  92.                     public boolean execute() throws KeeperException,  
  93.                         InterruptedException {  
  94.                         zookeeper.delete(id, -1);     
  95.                         return Boolean.TRUE;  
  96.                     }  
  97.                 };  
  98.                 zopdel.execute();  
  99.             } catch (InterruptedException e) {  
  100.                 LOG.warn("Caught: " + e, e);  
  101.                 //set that we have been interrupted.  
  102.                Thread.currentThread().interrupt();  
  103.             } catch (KeeperException.NoNodeException e) {  
  104.                 // do nothing  
  105.             } catch (KeeperException e) {  
  106.                 LOG.warn("Caught: " + e, e);  
  107.                 throw (RuntimeException) new RuntimeException(e.getMessage()).  
  108.                     initCause(e);  
  109.             }  
  110.             finally {  
  111.                 if (callback != null) {  
  112.                     callback.lockReleased();  
  113.                 }  
  114.                 id = null;  
  115.             }  
  116.         }  
  117.     }  
  118.       
  119.     /**  
  120.      * the watcher called on   
  121.      * getting watch while watching  
  122.      * my predecessor 
  123.      */  
  124.     private class LockWatcher implements Watcher {  
  125.         public void process(WatchedEvent event) {  
  126.             // lets either become the leader or watch the new/updated node  
  127.             LOG.debug("Watcher fired on path: " + event.getPath() + " state: " +   
  128.                     event.getState() + " type " + event.getType());  
  129.             try {  
  130.                 lock();  
  131.             } catch (Exception e) {  
  132.                 LOG.warn("Failed to acquire lock: " + e, e);  
  133.             }  
  134.         }  
  135.     }  
  136.       
  137.     /** 
  138.      * a zoookeeper operation that is mainly responsible 
  139.      * for all the magic required for locking. 
  140.      */  
  141.     private  class LockZooKeeperOperation implements ZooKeeperOperation {  
  142.           
  143.         /** find if we have been created earler if not create our node 
  144.          *  
  145.          * @param prefix the prefix node 
  146.          * @param zookeeper teh zookeeper client 
  147.          * @param dir the dir paretn 
  148.          * @throws KeeperException 
  149.          * @throws InterruptedException 
  150.          */  
  151.         private void findPrefixInChildren(String prefix, ZooKeeper zookeeper, String dir)   
  152.             throws KeeperException, InterruptedException {  
  153.             List<String> names = zookeeper.getChildren(dir, false);  
  154.             for (String name : names) {  
  155.                 if (name.startsWith(prefix)) {  
  156.                     id = name;  
  157.                     if (LOG.isDebugEnabled()) {  
  158.                         LOG.debug("Found id created last time: " + id);  
  159.                     }  
  160.                     break;  
  161.                 }  
  162.             }  
  163.             if (id == null) {  
  164.                 id = zookeeper.create(dir + "/" + prefix, data,   
  165.                         getAcl(), EPHEMERAL_SEQUENTIAL);  
  166.   
  167.                 if (LOG.isDebugEnabled()) {  
  168.                     LOG.debug("Created id: " + id);  
  169.                 }  
  170.             }  
  171.   
  172.         }  
  173.           
  174.         /** 
  175.          * the command that is run and retried for actually  
  176.          * obtaining the lock 
  177.          * @return if the command was successful or not 
  178.          */  
  179.         public boolean execute() throws KeeperException, InterruptedException {  
  180.             do {  
  181.                 if (id == null) {  
  182.                     long sessionId = zookeeper.getSessionId();  
  183.                     String prefix = "x-" + sessionId + "-";  
  184.                     // lets try look up the current ID if we failed   
  185.                     // in the middle of creating the znode  
  186.                     findPrefixInChildren(prefix, zookeeper, dir);  
  187.                     idName = new ZNodeName(id);  
  188.                 }  
  189.                 if (id != null) {  
  190.                     List<String> names = zookeeper.getChildren(dir, false);  
  191.                     if (names.isEmpty()) {  
  192.                         LOG.warn("No children in: " + dir + " when we've just " +  
  193.                         "created one! Lets recreate it...");  
  194.                         // lets force the recreation of the id  
  195.                         id = null;  
  196.                     } else {  
  197.                         // lets sort them explicitly (though they do seem to come back in order ususally :)  
  198.                         SortedSet<ZNodeName> sortedNames = new TreeSet<ZNodeName>();  
  199.                         for (String name : names) {  
  200.                             sortedNames.add(new ZNodeName(dir + "/" + name));  
  201.                         }  
  202.                         ownerId = sortedNames.first().getName();  
  203.                         SortedSet<ZNodeName> lessThanMe = sortedNames.headSet(idName);  
  204.                         if (!lessThanMe.isEmpty()) {  
  205.                             ZNodeName lastChildName = lessThanMe.last();  
  206.                             lastChildId = lastChildName.getName();  
  207.                             if (LOG.isDebugEnabled()) {  
  208.                                 LOG.debug("watching less than me node: " + lastChildId);  
  209.                             }  
  210.                             Stat stat = zookeeper.exists(lastChildId, new LockWatcher());  
  211.                             if (stat != null) {  
  212.                                 return Boolean.FALSE;  
  213.                             } else {  
  214.                                 LOG.warn("Could not find the" +  
  215.                                         " stats for less than me: " + lastChildName.getName());  
  216.                             }  
  217.                         } else {  
  218.                             if (isOwner()) {  
  219.                                 if (callback != null) {  
  220.                                     callback.lockAcquired();  
  221.                                 }  
  222.                                 return Boolean.TRUE;  
  223.                             }  
  224.                         }  
  225.                     }  
  226.                 }  
  227.             }  
  228.             while (id == null);  
  229.             return Boolean.FALSE;  
  230.         }  
  231.     };  
  232.   
  233.     /** 
  234.      * Attempts to acquire the exclusive write lock returning whether or not it was 
  235.      * acquired. Note that the exclusive lock may be acquired some time later after 
  236.      * this method has been invoked due to the current lock owner going away. 
  237.      */  
  238.     public synchronized boolean lock() throws KeeperException, InterruptedException {  
  239.         if (isClosed()) {  
  240.             return false;  
  241.         }  
  242.         ensurePathExists(dir);  
  243.   
  244.         return (Boolean) retryOperation(zop);  
  245.     }  
  246.   
  247.     /** 
  248.      * return the parent dir for lock 
  249.      * @return the parent dir used for locks. 
  250.      */  
  251.     public String getDir() {  
  252.         return dir;  
  253.     }  
  254.   
  255.     /** 
  256.      * Returns true if this node is the owner of the 
  257.      *  lock (or the leader) 
  258.      */  
  259.     public boolean isOwner() {  
  260.         return id != null && ownerId != null && id.equals(ownerId);  
  261.     }  
  262.   
  263.     /** 
  264.      * return the id for this lock 
  265.      * @return the id for this lock 
  266.      */  
  267.     public String getId() {  
  268.        return this.id;  
  269.     }  
  270. }  
 

    本站是提供个人知识管理的网络存储空间,所有内容均由用户发布,不代表本站观点。请注意甄别内容中的联系方式、诱导购买等信息,谨防诈骗。如发现有害或侵权内容,请点击一键举报。
    转藏 分享 献花(0

    0条评论

    发表

    请遵守用户 评论公约

    类似文章 更多