百度360必应搜狗淘宝本站头条
当前位置:网站首页 > IT知识 > 正文

分布式锁用Redis好?还是Zookeeper好?

liuian 2024-12-25 14:00 69 浏览

提到锁大家肯定有了解,像 Synchronized、ReentrantLock,在单进程情况下,多个线程访问同一资源,可以用它们来保证线程的安全性。

不过目前互联网项目越来越多的项目采用集群部署,也就是分布式情况,这两种锁就有些不够用了。

来两张图举例说明下,本地锁的情况下:


分布式锁情况下:


就其思想来说,就是一种“我全都要”的思想,所有服务都到一个统一的地方来取锁,只有取到锁的才能继续执行下去。


说完思想,下面来说一下具体的实现。

Redis 实现

为实现分布式锁,在 Redis 中存在 SETNX key value 命令,意为 set if not exists(如果不存在该 key,才去 set 值),就比如说是张三去上厕所,看厕所门锁着,他就不进去了,厕所门开着他才去。


可以看到,第一次 set 返回了 1,表示成功,但是第二次返回 0,表示 set 失败,因为已经存在这个 key 了。

当然只靠 setnx 这个命令可以吗?当然是不行的,试想一种情况,张三在厕所里,但他在里面一直没有释放,一直在里面蹲着,那外面人想去厕所全部都去不了,都想锤死他了。

Redis 同理,假设已经进行了加锁,但是因为宕机或者出现异常未释放锁,就造成了所谓的“死锁”。


聪明的你们肯定早都想到了,为它设置过期时间不就好了,可以 SETEX key seconds value 命令,为指定 key 设置过期时间,单位为秒。

但这样又有另一个问题,我刚加锁成功,还没设置过期时间,Redis 宕机了不就又死锁了,所以说要保证原子性吖,要么一起成功,要么一起失败。

当然我们能想到的 Redis 肯定早都为你实现好了,在 Redis 2.8 的版本后,Redis 就为我们提供了一条组合命令 SET key value ex seconds nx,加锁的同时设置过期时间。


就好比是公司规定每人最多只能在厕所呆 2 分钟,不管释放没释放完都得出来,这样就解决了“死锁”问题。

但这样就没有问题了吗?怎么可能。

试想又一种情况,厕所门肯定只能从里面开啊,张三上完厕所后张四进去锁上门,但是外面人以为还是张三在里面,而且已经过了 3 分钟了,就直接把门给撬开了,一看里面却是张四,这就很尴尬啊。

换成 Redis 就是说比如一个业务执行时间很长,锁已经自己过期了,别人已经设置了新的锁,但是当业务执行完之后直接释放锁,就有可能是删除了别人加的锁,这不是乱套了吗。

所以在加锁时候,要设一个随机值,在删除锁时进行比对,如果是自己的锁,才删除。

多说无益,烦人,直接上代码:

//基于jedis和lua脚本来实现 
privatestaticfinal String LOCK_SUCCESS = "OK"; 
privatestaticfinal Long RELEASE_SUCCESS = 1L; 
privatestaticfinal String SET_IF_NOT_EXIST = "NX"; 
privatestaticfinal String SET_WITH_EXPIRE_TIME = "PX"; 
 
@Override 
public String acquire() { 
    try { 
        // 获取锁的超时时间,超过这个时间则放弃获取锁 
        long end = System.currentTimeMillis() + acquireTimeout; 
        // 随机生成一个 value 
        String requireToken = UUID.randomUUID().toString(); 
        while (System.currentTimeMillis() < end) { 
            String result = jedis 
                .set(lockKey, requireToken, SET_IF_NOT_EXIST, SET_WITH_EXPIRE_TIME, expireTime); 
            if (LOCK_SUCCESS.equals(result)) { 
                return requireToken; 
            } 
            try { 
                Thread.sleep(100); 
            } catch (InterruptedException e) { 
                Thread.currentThread().interrupt(); 
            } 
        } 
    } catch (Exception e) { 
        log.error("acquire lock due to error", e); 
    } 
 
    returnnull; 
} 
 
@Override 
public boolean release(String identify) { 
    if (identify == null) { 
        returnfalse; 
    } 
    //通过lua脚本进行比对删除操作,保证原子性 
    String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end"; 
    Object result = new Object(); 
    try { 
        result = jedis.eval(script, Collections.singletonList(lockKey), 
            Collections.singletonList(identify)); 
        if (RELEASE_SUCCESS.equals(result)) { 
            log.info("release lock success, requestToken:{}", identify); 
            returntrue; 
        } 
    } catch (Exception e) { 
        log.error("release lock due to error", e); 
    } finally { 
        if (jedis != null) { 
            jedis.close(); 
        } 
    } 
 
    log.info("release lock failed, requestToken:{}, result:{}", identify, result); 
    returnfalse; 
} 

思考:加锁和释放锁的原子性可以用 lua 脚本来保证,那锁的自动续期改如何实现呢?

Redisson 实现

Redisson 顾名思义,Redis 的儿子,本质上还是 Redis 加锁,不过是对 Redis 做了很多封装,它不仅提供了一系列的分布式的 Java 常用对象,还提供了许多分布式服务。

在引入 Redisson 的依赖后,就可以直接进行调用:

<dependency> 
    <groupId>org.redisson</groupId> 
    <artifactId>redisson</artifactId> 
    <version>3.13.4</version> 
</dependency> 

先来一段 Redisson 的加锁代码:

private void test() { 
    //分布式锁名  锁的粒度越细,性能越好 
    RLock lock = redissonClient.getLock("test_lock"); 
    lock.lock(); 
    try { 
        //具体业务...... 
    } finally { 
        lock.unlock(); 
    } 
} 

就是这么简单,使用方法 jdk 的 ReentrantLock 差不多,并且也支持 ReadWriteLock(读写锁)、Reentrant Lock(可重入锁)、Fair Lock(公平锁)、RedLock(红锁)等各种锁,详细可以参照redisson官方文档来查看。


那么 Redisson 到底有哪些优势呢?锁的自动续期(默认都是 30 秒),如果业务超长,运行期间会自动给锁续上新的 30s,不用担心业务执行时间超长而锁被自动删掉。

加锁的业务只要运行完成,就不会给当前续期,即便不手动解锁,锁默认在 30s 后删除,不会造成死锁问题。

前面也提到了锁的自动续期,我们来看看 Redisson 是如何来实现的。

先说明一下,这里主要讲的是 Redisson 中的 RLock,也就是可重入锁,有两种实现方法:

// 最常见的使用方法 
lock.lock(); 
 
// 加锁以后10秒钟自动解锁 
// 无需调用unlock方法手动解锁 
lock.lock(10, TimeUnit.SECONDS); 

而只有无参的方法是提供锁的自动续期操作的,内部使用的是“看门狗”机制,我们来看一看源码。


不管是空参还是带参方法,它们都调用的是同一个 lock 方法,未传参的话时间传了一个 -1,而带参的方法传过去的就是实际传入的时间。

继续点进 scheduleExpirationRenewal 方法:


点进 renewExpiration 方法:


总结一下,就是当我们指定锁过期时间,那么锁到时间就会自动释放。如果没有指定锁过期时间,就使用看门狗的默认时间 30s,只要占锁成功,就会启动一个定时任务,每隔 10s 给锁设置新的过期时间,时间为看门狗的默认时间,直到锁释放。

小结:虽然 lock() 有自动续锁机制,但是开发中还是推荐使用 lock(time,timeUnit),因为它省掉了整个续期带来的性能损,可以设置过期时间长一点,搭配 unlock()。

若业务执行完成,会手动释放锁,若是业务执行超时,那一般我们服务也都会设置业务超时时间,就直接报错了,报错后就会通过设置的过期时间来释放锁。

public void test() { 
    RLock lock = redissonClient.getLock("test_lock"); 
    lock.lock(30, TimeUnit.SECONDS); 
    try { 
        //.......具体业务 
    } finally { 
        //手动释放锁 
        lock.unlock(); 
    } 
} 

基于 Zookeeper 来实现分布式锁

很多小伙伴都知道在分布式系统中,可以用 ZK 来做注册中心,但其实在除了做祖册中心以外,用 ZK 来做分布式锁也是很常见的一种方案。

先来看一下 ZK 中是如何创建一个节点的?ZK 中存在 create [-s] [-e] path [data] 命令,-s 为创建有序节点,-e 创建临时节点。


这样就创建了一个父节点并为父节点创建了一个子节点,组合命令意为创建一个临时的有序节点。

而 ZK 中分布式锁主要就是靠创建临时的顺序节点来实现的。至于为什么要用顺序节点和为什么用临时节点不用持久节点?先考虑一下,下文将作出说明。

同时还有 ZK 中如何查看节点?ZK 中 ls [-w] path 为查看节点命令,-w 为添加一个 watch(监视器),/ 为查看根节点所有节点,可以看到我们刚才所创建的节点,同时如果是跟着指定节点名字的话为查看指定节点下的子节点。

后面的 00000000 为 ZK 为顺序节点增加的顺序。注册监听器也是 ZK 实现分布式锁中比较重要的一个东西。


下面来看一下 ZK 实现分布式锁的主要流程:

当第一个线程进来时会去父节点上创建一个临时的顺序节点。

第二个线程进来发现锁已经被持有了,就会为当前持有锁的节点注册一个 watcher 监听器。

第三个线程进来发现锁已经被持有了,因为是顺序节点的缘故,就会为上一个节点去创建一个 watcher 监听器。

当第一个线程释放锁后,删除节点,由它的下一个节点去占有锁。

看到这里,聪明的小伙伴们都已经看出来顺序节点的好处了。非顺序节点的话,每进来一个线程进来都会去持有锁的节点上注册一个监听器,容易引发“羊群效应”。


这么大一群羊一起向你飞奔而来,不管你顶不顶得住,反正 ZK 服务器是会增大宕机的风险。

而顺序节点的话就不会,顺序节点当发现已经有线程持有锁后,会向它的上一个节点注册一个监听器,这样当持有锁的节点释放后,也只有持有锁的下一个节点可以抢到锁,相当于是排好队来执行的,降低服务器宕机风险。

至于为什么使用临时节点,和 Redis 的过期时间一个道理,就算 ZK 服务器宕机,临时节点会随着服务器的宕机而消失,避免了死锁的情况。

下面来上一段代码的实现:

public class ZooKeeperDistributedLock implements Watcher { 
 
    private ZooKeeper zk; 
    private String locksRoot = "/locks"; 
    private String productId; 
    private String waitNode; 
    private String lockNode; 
    private CountDownLatch latch; 
    private CountDownLatch connectedLatch = new CountDownLatch(1); 
    private int sessionTimeout = 30000; 
 
    public ZooKeeperDistributedLock(String productId) { 
        this.productId = productId; 
        try { 
            String address = "192.168.189.131:2181,192.168.189.132:2181"; 
            zk = new ZooKeeper(address, sessionTimeout, this); 
            connectedLatch.await(); 
        } catch (IOException e) { 
            throw new LockException(e); 
        } catch (KeeperException e) { 
            throw new LockException(e); 
        } catch (InterruptedException e) { 
            throw new LockException(e); 
        } 
    } 
 
    public void process(WatchedEvent event) { 
        if (event.getState() == KeeperState.SyncConnected) { 
            connectedLatch.countDown(); 
            return; 
        } 
 
        if (this.latch != null) { 
            this.latch.countDown(); 
        } 
    } 
 
    public void acquireDistributedLock() { 
        try { 
            if (this.tryLock()) { 
                return; 
            } else { 
                waitForLock(waitNode, sessionTimeout); 
            } 
        } catch (KeeperException e) { 
            throw new LockException(e); 
        } catch (InterruptedException e) { 
            throw new LockException(e); 
        } 
    } 
    //获取锁 
    public boolean tryLock() { 
        try { 
        // 传入进去的locksRoot + “/” + productId 
        // 假设productId代表了一个商品id,比如说1 
        // locksRoot = locks 
        // /locks/10000000000,/locks/10000000001,/locks/10000000002 
        lockNode = zk.create(locksRoot + "/" + productId, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL); 
 
        // 看看刚创建的节点是不是最小的节点 
        // locks:10000000000,10000000001,10000000002 
        List<String> locks = zk.getChildren(locksRoot, false); 
        Collections.sort(locks); 
 
        if(lockNode.equals(locksRoot+"/"+ locks.get(0))){ 
            //如果是最小的节点,则表示取得锁 
            return true; 
        } 
 
        //如果不是最小的节点,找到比自己小1的节点 
      int previousLockIndex = -1; 
            for(int i = 0; i < locks.size(); i++) { 
        if(lockNode.equals(locksRoot + “/” + locks.get(i))) { 
                    previousLockIndex = i - 1; 
            break; 
        } 
       } 
 
       this.waitNode = locks.get(previousLockIndex); 
        } catch (KeeperException e) { 
            throw new LockException(e); 
        } catch (InterruptedException e) { 
            throw new LockException(e); 
        } 
        return false; 
    } 
 
    private boolean waitForLock(String waitNode, long waitTime) throws InterruptedException, KeeperException { 
        Stat stat = zk.exists(locksRoot + "/" + waitNode, true); 
        if (stat != null) { 
            this.latch = new CountDownLatch(1); 
            this.latch.await(waitTime, TimeUnit.MILLISECONDS); 
            this.latch = null; 
        } 
        return true; 
    } 
 
    //释放锁 
    public void unlock() { 
        try { 
            System.out.println("unlock " + lockNode); 
            zk.delete(lockNode, -1); 
            lockNode = null; 
            zk.close(); 
        } catch (InterruptedException e) { 
            e.printStackTrace(); 
        } catch (KeeperException e) { 
            e.printStackTrace(); 
        } 
    } 
    //异常 
    public class LockException extends RuntimeException { 
        private static final long serialVersionUID = 1L; 
 
        public LockException(String e) { 
            super(e); 
        } 
 
        public LockException(Exception e) { 
            super(e); 
        } 
    } 
} 

总结

既然明白了 Redis 和 ZK 分别对分布式锁的实现,那么总该有所不同的吧。没错,我都帮大家整理好了:

实现方式的不同,Redis 实现为去插入一条占位数据,而 ZK 实现为去注册一个临时节点。

遇到宕机情况时,Redis 需要等到过期时间到了后自动释放锁,而 ZK 因为是临时节点,在宕机时候已经是删除了节点去释放锁。

Redis 在没抢占到锁的情况下一般会去自旋获取锁,比较浪费性能,而 ZK 是通过注册监听器的方式获取锁,性能而言优于 Redis。

不过具体要采用哪种实现方式,还是需要具体情况具体分析,结合项目引用的技术栈来落地实现。

作者:whynot_0

编辑:陶家龙

出处:juejin.im/post/6891571079702118407

相关推荐

驱动网卡(怎么从新驱动网卡)
驱动网卡(怎么从新驱动网卡)

网卡一般是指为电脑主机提供有线无线网络功能的适配器。而网卡驱动指的就是电脑连接识别这些网卡型号的桥梁。网卡只有打上了网卡驱动才能正常使用。并不是说所有的网卡一插到电脑上面就能进行数据传输了,他都需要里面芯片组的驱动文件才能支持他进行数据传输...

2026-01-30 00:37 liuian

win10更新助手装系统(微软win10更新助手)

1、点击首页“系统升级”的按钮,给出弹框,告诉用户需要上传IMEI码才能使用升级服务。同时给出同意和取消按钮。华为手机助手2、点击同意,则进入到“系统升级”功能华为手机助手华为手机助手3、在检测界面,...

windows11专业版密钥最新(windows11专业版激活码永久)

 Windows11专业版的正版密钥,我们是对windows的激活所必备的工具。该密钥我们可以通过微软商城或者通过计算机的硬件供应商去购买获得。获得了windows11专业版的正版密钥后,我...

手机删过的软件恢复(手机删除过的软件怎么恢复)
手机删过的软件恢复(手机删除过的软件怎么恢复)

操作步骤:1、首先,我们需要先打开手机。然后在许多图标中找到带有[文件管理]文本的图标,然后单击“文件管理”进入页面。2、进入页面后,我们将在顶部看到一行文本:手机,最新信息,文档,视频,图片,音乐,收藏,最后是我们正在寻找的[更多],单击...

2026-01-29 23:55 liuian

一键ghost手动备份系统步骤(一键ghost 备份)

  步骤1、首先把装有一键GHOST装系统的U盘插在电脑上,然后打开电脑马上按F2或DEL键入BIOS界面,然后就选择BOOT打USDHDD模式选择好,然后按F10键保存,电脑就会马上重启。  步骤...

怎么创建局域网(怎么创建局域网打游戏)

  1、购买路由器一台。进入路由器把dhcp功能打开  2、购买一台交换机。从路由器lan端口拉出一条网线查到交换机的任意一个端口上。  3、两台以上电脑。从交换机任意端口拉出网线插到电脑上(电脑设置...

精灵驱动器官方下载(精灵驱动手机版下载)

是的。驱动精灵是一款集驱动管理和硬件检测于一体的、专业级的驱动管理和维护工具。驱动精灵为用户提供驱动备份、恢复、安装、删除、在线更新等实用功能。1、全新驱动精灵2012引擎,大幅提升硬件和驱动辨识能力...

一键还原系统步骤(一键还原系统有哪些)

1、首先需要下载安装一下Windows一键还原程序,在安装程序窗口中,点击“下一步”,弹出“用户许可协议”窗口,选择“我同意该许可协议的条款”,并点击“下一步”。  2、在弹出的“准备安装”窗口中,可...

电脑加速器哪个好(电脑加速器哪款好)

我认为pp加速器最好用,飞速土豆太懒,急速酷六根本不工作。pp加速器什么网页都加速,太任劳任怨了!以上是个人观点,具体性能请自己试。ps:我家电脑性能很好。迅游加速盒子是可以加速电脑的。因为有过之...

任何u盘都可以做启动盘吗(u盘必须做成启动盘才能装系统吗)

是的,需要注意,U盘的大小要在4G以上,最好是8G以上,因为启动盘里面需要装系统,内存小的话,不能用来安装系统。内存卡或者U盘或者移动硬盘都可以用来做启动盘安装系统。普通的U盘就可以,不过最好U盘...

u盘怎么恢复文件(u盘文件恢复的方法)

开360安全卫士,点击上面的“功能大全”。点击文件恢复然后点击“数据”下的“文件恢复”功能。选择驱动接着选择需要恢复的驱动,选择接入的U盘。点击开始扫描选好就点击中间的“开始扫描”,开始扫描U盘数据。...

系统虚拟内存太低怎么办(系统虚拟内存占用过高什么原因)

1.检查系统虚拟内存使用情况,如果发现有大量的空闲内存,可以尝试释放一些不必要的进程,以释放内存空间。2.如果系统虚拟内存使用率较高,可以尝试增加系统虚拟内存的大小,以便更多的应用程序可以使用更多...

剪贴板权限设置方法(剪贴板访问权限)
剪贴板权限设置方法(剪贴板访问权限)

1、首先打开iphone手机,触碰并按住单词或图像直到显示选择选项。2、其次,然后选取“拷贝”或“剪贴板”。3、勾选需要的“权限”,最后选择开启,即可完成苹果剪贴板权限设置。仅参考1.打开苹果手机设置按钮,点击【通用】。2.点击【键盘】,再...

2026-01-29 21:37 liuian

平板系统重装大师(平板重装win系统)

如果你的平板开不了机,但可以连接上电脑,那就能好办,楼主下载安装个平板刷机王到你的个人电脑上,然后连接你的平板,平板刷机王会自动识别你的平板,平板刷机王上有你平板的我刷机包,楼主点击下载一个,下载完成...

联想官网售后服务网点(联想官网售后服务热线)

联想3c服务中心是联想旗下的官方售后,是基于互联网O2O模式开发的全新服务平台。可以为终端用户提供多品牌手机、电脑以及其他3C类产品的维修、保养和保险服务。根据客户需求层次,联想服务针对个人及家庭客户...