2019-01-09 · Develop

ZooKeeper 分布式锁

背景

在同一个 JVM 中,为了保证对一个资源的有序访问,如往文件中写入数据,可以使用 synchronized 或者 ReentrantLock 来实现对资源的互斥访问,如果两个程序在不同的 JVM 中,并且都要往同一个文件中写数据,如何保证互斥访问呢?这时就需要用到分布式锁了。

目前实现分布式的方式主要有如下几种方式

distributed-lock
(图片来至网络)

这里主要讲下使用 ZooKeeper 来实现的分布式可重入排他锁。

原理

原理是所有的客户端在 /locks 节点下创建临时序列节点并且会知道自己创建的节点名,然后获取 /locks 节点下的所有子节点并进行排序,如果自己是最小的节点,那么获取锁成功。其他的客户端监听 /locks 节点的子节点的变化事件,以便在释放锁的时候再次去争取锁。

释放锁的情况有两种:

上面这种实现方式比较简单,但是有个严重问题就是当监听的客户端很多时,在锁被释放时,会有大量的客户端争抢锁资源,造成性能浪费。这种现象称为“群惊现象”。其次是这种分布式算法的问题是不能随时查看多少客户端在等待这个锁,以及到底是哪个客户端获取到了这个锁。那么如何解决上面的问题呢?
改进的方式就是等待获取锁的客户端只监听比自己小的那个节点的删除事件这样如果有节点删除,只会影响监听它的下一个节点进行获取锁操作。获取锁的流程如下图所示

zookeeper-distributed-lock

Curator 代码实现

这篇文章里面我们学会了使用 Curator API 来操作 ZooKeeper 的节点,现在我们就可以使用 Curator 的 API 进行分布式可重入排他锁的实现。当然 Curator 提供的锁功能不只这一种。

RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
CuratorFramework client = CuratorFrameworkFactory.newClient("192.168.19.124:2181", retryPolicy);
InterProcessMutex lock = new InterProcessMutex(client, "/lock/lockName");
// 获取 10 分钟的锁 - 时间根据实际业务处理时间
try {
    if(lock.acquire(10, TimeUnit.MINUTES)) {
        try {
            // 即使是获取到了分布式锁,在实际业务中也需要检查数据是否已经被处理
            // TODO 处理业务逻辑
        } finally {
            // 释放锁
            lock.release();
        }
    }
} catch (Exception e) {
    // TODO 处理 ZooKeeper 异常
}

使用起来就是这么的方便。

可重入的源码实现

private boolean internalLock(long time, TimeUnit unit) throws Exception
{
    /*
        Note on concurrency: a given lockData instance
        can be only acted on by a single thread so locking isn't necessary
    */

    Thread currentThread = Thread.currentThread();

    LockData lockData = threadData.get(currentThread);
    if ( lockData != null )
    {
        // re-entering
        lockData.lockCount.incrementAndGet();
        return true;
    }

    String lockPath = internals.attemptLock(time, unit, getLockNodeBytes());
    if ( lockPath != null )
    {
        LockData newLockData = new LockData(currentThread, lockPath);
        threadData.put(currentThread, newLockData);
        return true;
    }

    return false;
}

上面的代码实现了所的可重入功能。每个 InterProcessMutex 实例都会持有一个 ConcurrentMap 类型的 threadData 对象。通过判断当前线程 threadData 是否有值判断是否可以重入该锁并将 lockCount 进行累加,如果没有则进行锁的抢夺。