1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132
| package xxx;
import org.apache.zookeeper.*; import org.apache.zookeeper.data.Stat;
import java.io.IOException; import java.util.Collections; import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit;
public class ZooKeeperDistributedLock implements Watcher {
private ZooKeeper zk; private String locksRoot = "/locks"; private String lockKey; private String waitNode; private String lockNode; private CountDownLatch latch; private CountDownLatch connectedLatch = new CountDownLatch(1); private int sessionTimeout = 30000;
public ZooKeeperDistributedLock(String zkAddress, String lockKey) { this.lockKey = lockKey; try { zk = new ZooKeeper(zkAddress, sessionTimeout, this); connectedLatch.await(); } catch (IOException | InterruptedException e) { throw new LockException(e); } }
@Override public void process(WatchedEvent event) { if (event.getState() == Event.KeeperState.SyncConnected) { connectedLatch.countDown(); return; }
if (this.latch != null) { this.latch.countDown(); } }
public void acquireDistributedLock() { try { if (this.tryLock()) { return; } else { waitForLock(waitNode, sessionTimeout); } } catch (KeeperException e) { throw new LockException(e); } catch (InterruptedException e) { throw new LockException(e); } }
public boolean tryLock() { try { lockNode = zk.create(locksRoot + "/" + lockKey, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL); List<String> locks = zk.getChildren(locksRoot, false); Collections.sort(locks); if(lockNode.equals(locksRoot+"/"+ locks.get(0))){ return true; } int previousLockIndex = -1; for(int i = 0; i < locks.size(); i++) { if(lockNode.equals(locksRoot + "/" + locks.get(i))) { previousLockIndex = i - 1; break; } } this.waitNode = locks.get(previousLockIndex); } catch (KeeperException e) { throw new LockException(e); } catch (InterruptedException e) { throw new LockException(e); } return false; }
private boolean waitForLock(String waitNode, long waitTime) throws InterruptedException, KeeperException { Stat stat = zk.exists(locksRoot + "/" + waitNode, true); if (stat != null) { this.latch = new CountDownLatch(1); this.latch.await(waitTime, TimeUnit.MILLISECONDS); this.latch = null; } return true; }
public void unlock() { try { System.out.println("unlock " + lockNode); zk.delete(lockNode, -1); lockNode = null; zk.close(); } catch (InterruptedException e) { e.printStackTrace(); } catch (KeeperException e) { e.printStackTrace(); } }
public class LockException extends RuntimeException { private static final long serialVersionUID = 1L;
public LockException(String e) { super(e); }
public LockException(Exception e) { super(e); } } }
|