分布式锁

什么是分布式锁

问题描述:随着业务发展,原单体单机部署的系统被演化为分布式集群后,由于分布式系统多线程、多进程且分布在不同机器上,这将使原单机部署情况下的并发控制锁策略失效,单纯的Java API并不能提供分布式锁的能力,为了解决这个问题,需要一种跨JVM的互斥锁来控制共享资源的访问,这就是分布式锁要解决的问题

与分布式锁相对的就是单体结构中的锁(单机锁),我们在写多线程程序时,避免同时操作一个共享变量产生问题,通常会使用一把锁来互斥以保证共享变量的准确,其使用范围是在同一个JVM进程中,如果换做是不同机器的JVM进程,需要同时操作一个共享资源,如何互斥呢,这时就需要引入分布式锁了

分布式锁应具备的条件

  • 在分布式系统环境下,一个方法在同一时间只能被一个机器的一个线程执行
  • 高可用的获取锁与释放锁
  • 高性能的获取锁与释放锁
  • 具备可重入特性
  • 具备锁失效机制,即自动解锁,防止死锁
  • 具备非阻塞锁特性,即没有获取到锁将直接返回获取锁失败
    image-20250705160806941.png

分布式锁实现方案

  • 基于缓存(Redis)
  • 基于Zookeeper

未添加分布式锁存在的问题

实现秒杀下单扣减库存案例

@RestController
public class IndexController {
    @Resource
    private StringRedisTemplate stringRedisTemplate;
    
    @RequestMapping("/deductStock")
    public String deductStock() {
        
        int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock"));
        if(stock > 0) {
            stock -= 1; // 扣减库存
            stringRedisTemplate.opsForValue().set("stock", stock.toString());
            System.out.println("扣减库存成功,当前库存为:" + stock);
        }else {
            System.out.println("库存不足,扣减库存失败!");
        }
        return "end";
    }
    
}
​

测试部署在两台服务上,进行库存扣减测试,发现超卖

添加线程同步锁

@RestController
public class IndexController {
    @Resource
    private StringRedisTemplate stringRedisTemplate;
    
    @RequestMapping("/deductStock")
    public String deductStock() {
        
        synchronized(this) {
            int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock"));
            if(stock > 0) {
                stock -= 1; // 扣减库存
                stringRedisTemplate.opsForValue().set("stock", stock.toString());
                System.out.println("扣减库存成功,当前库存为:" + stock);
            }else {
                System.out.println("库存不足,扣减库存失败!");
            }
        }
        return "end";
    }
    
}

模拟高并发场景进行压力测试,发现超卖问题依然存在

在分布式环境下,synchronized是不起作用的,因为一个synchronized只在一个tomcat的JVM进程内有效

在一个分布式系统下,如何解决呢

使用Redis的setnx命令

@RestController
public class IndexController {
    @Resource
    private StringRedisTemplate stringRedisTemplate;
    
    @RequestMapping("/deductStock")
    public String deductStock() {
        
        Boolean bool = stringRedisTemplate.opsForValue().setIfAbsent("lock","1");
        if(!bool) {
            return "end";
        }
        synchronized(this) {
            int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock"));
            if(stock > 0) {
                stock -= 1; // 扣减库存
                stringRedisTemplate.opsForValue().set("stock", stock.toString());
                System.out.println("扣减库存成功,当前库存为:" + stock);
            }else {
                System.out.println("库存不足,扣减库存失败!");
            }
        }
        stringRedisTemplate.delete("lock");
        return "end";
    }
    
}

上述代码是否已经完善了呢

假设:运行到synchronized之后抛出异常,那么delete的语句将永远执行不到,则会阻塞后面所有线程的执行,这样就造成了死锁

使用try-finally

@RestController
public class IndexController {
    @Resource
    private StringRedisTemplate stringRedisTemplate;
    
    @RequestMapping("/deductStock")
    public String deductStock() {
        
        try {
            Boolean bool = stringRedisTemplate.opsForValue().setIfAbsent("lock","1");
            if(!bool) {
                return "end";
            }
            synchronized(this) {
                int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock"));
                if(stock > 0) {
                    stock -= 1; // 扣减库存
                    stringRedisTemplate.opsForValue().set("stock", stock.toString());
                    System.out.println("扣减库存成功,当前库存为:" + stock);
                }else {
                    System.out.println("库存不足,扣减库存失败!");
                }
            }
        } finally {
               stringRedisTemplate.delete("lock");
               return "end"; 
        }
    }
    
}

上述代码会有问题吗

如果执行到synchronized代码后,程序宕机,这样导致分布式锁也释放不了

解决方案:给分布式锁加上时间

添加分布式锁的过期时间

Boolean bool = stringRedisTemplate.opsForValue().setIfAbsent("lock","1", 10, TimeUnit.SECONDS);

线程ID作为分布式锁的value

@RestController
public class IndexController {
    @Resource
    private StringRedisTemplate stringRedisTemplate;
    
    @RequestMapping("/deductStock")
    public String deductStock() {
    
        String value = UUID.randomUUID().toString();
        
        try {
            Boolean bool = stringRedisTemplate.opsForValue().setIfAbsent("lock",value , 10, TimeUnit.SECONDS);
            if(!bool) {
                return "end";
            }
            synchronized(this) {
                int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock"));
                if(stock > 0) {
                    stock -= 1; // 扣减库存
                    stringRedisTemplate.opsForValue().set("stock", stock.toString());
                    System.out.println("扣减库存成功,当前库存为:" + stock);
                }else {
                    System.out.println("库存不足,扣减库存失败!");
                }
            }
        } finally {
               String str = stringRedisTemplate.opsForValue().get("locl");
               if(str != null) {
                   if(str.equals(value)) {
                       stringRedisTemplate.delete("lock");
                   }
               }
               return "end"; 
        }
    }
    
}

使用Redisson实现分布式锁

Redisson实现Redis分布式锁非常方便,它是一个基于Redis实现的,提供了一些分布式锁的实现

添加Redisson依赖

<dependancy>
    <groupId>org.redisson</groupId>
    <artifactId>redisson</artifactId>
    <version>3.16.1</version>
</dependancy>

获取Redisson客户端实例

RedissonClient是Redisson的核心接口,用于创建连接Redis的客户端对象

可以使用Redisson的config对象来配置连接Redis的参数

@Bean
public Redisson redisson() {
    Config config = new Config();
    config.useSingleServer()
        .setAddress("redis://127.0.0.1:6379")
        .setPassword("123456");
    RedissonClient client = Redisson.create(config);
    return (Redisson)client;
}

获取分布式锁

@RestController
public class IndexController {
    
    @Resource
    private Redisson redisson;
    
    @Resource
    private StringRedisTemplate stringRedisTemplate;
    
    @RequestMapping("/deductStock")
    public String deductStock() {
        
        RLock myLock = redisson.getLock("myLock"); // 分布式锁对象
        
        try {
            myLock.lock(); // 持有锁
            int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock"));
            if(stock > 0) {
                stock -= 1; // 扣减库存
                stringRedisTemplate.opsForValue().set("stock", stock.toString());
                System.out.println("扣减库存成功,当前库存为:" + stock);
            }else {
                System.out.println("库存不足,扣减库存失败!");
            }
        } finally {
            myLock.unlock();
             return "end";
        }
        
    }
    
}
​
​

通过Redisson,非常简单就可以实现

Redisson相关锁介绍

可重入锁

可重入锁指同一个线程可以多次获得同一把锁。这种类型的锁可以避免死锁发生,因为它允许同一线程在没有释放锁的情况下多次获得锁

可重入:利用hash结构记录线程id和重入次数,利用watchDog,每隔一段时间(releaseTime/3),重置超时时间

Redisson实现了一个分布式的可重入锁RLock,它支持自动续租,保证了锁的持有者在宕机或无响应时,锁会自动释放,它通过利用Redis的特性,有效地完成锁定和解锁操作

可重入锁的实际应用场景

可重入锁非常适合那些需要连续几次访问一个资源的场景

如:一个电商平台的订单创建过程中,从验证库存到最终下单可能需要多次进行资源锁定,可重入锁可以确保整个过程的同步执行

public class ReentrantLockExample {
    public static void main(String[] args) {
        Config config = new Config();
        config.useClusterServers()
            .addNodeAddress("redis://127.0.0.1:6379");
        RedissonClient redisson = Redisson.create(config);
        
        RLock lock = redisson.getLock("anyLock");
        try {
            // 支持自动续租
            lock.lock();
            // 处理业务逻辑
        } finally {
            lock.unlock();
        }
    }
}

联锁

联锁是一种同步机制,用于同时获取多个锁

在分布式系统中,当需要对多个独立资源进行操作时,联锁确保所有的资源都被锁定,以进行安全的原子操作

联锁的使用场景

联锁在需要执行跨多个资源的复合操作时非常有用,例如:在两个账户间进行资金转账时,需要同时锁定两个账户的资源

public class MultiLockExample {
    public static void main(String[] args) {
        Config config = new Config();
        config.useClusterServers()
            .addNodeAddress("redis://127.0.0.1:6379");
        RedissonClient redisson = Redisson.create(config);
        
        RLock lock1 = redisson.getLock("lock1");
        RLock lock2 = redisson.getLock("lock2");
        RLock lock3 = redisson.getLock("lock3");
        
        MultiLock multiLock = new MultiLock(lock1, lock2, lock3);
        try {
            // 尝试加锁,最多等待100秒,上锁以后10秒自动解锁
            boolean res = multiLock.tryLock(100, 10, TimeUnit.SECONDS);
            if(res) {
                // 处理跨多个资源的复合操作
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            multiLock.unlock();
        }
    }
}

RedLock

红锁时一种在多个独立的Redis节点上提供分布式锁的算法

旨在通过在多个节点上使用锁来提高容错性

如果单个节点宕机,使用RedLock算法的系统仍能保持正常运作

public class RedLockExample {
    public static void main(String[] args) {
        Config config1 = new Config();
        config1.useSingleServer().setAddress("reids://127.0.0.1:6379");
        RedissonClient redisson1 = Redisson.create(config1);
        
        Config config2 = new Config();
        config2.useSingleServer().setAddress("reids://127.0.0.1:6380");
        RedissonClient redisson2 = Redisson.create(config1);
        
        Config config3 = new Config();
        config3.useSingleServer().setAddress("reids://127.0.0.1:6381");
        RedissonClient redisson1 = Redisson.create(config1);
        
        RLock lock1 = redisson1.getLock("lock");
        RLock lock2 = redisson2.getLock("lock");
        RLock lock3 = redisson3.getLock("lock");
        
        RedissonRedLock redLock = new RedissonRedLock(lock1, lock2, lock3);
        
        try {
            // 尝试加锁,最多等待100秒,上锁后10秒自动解锁
            boolean res = redLock.tryLock(100, 10, TimeUnit.SECONDS);
            if(res) {
                // 执行任务
            }
        } catch(InterruptedException e) {
            e.printStackTrace();
        } finally {
            redLock.unlock();
        }
    }
}
最后修改:2025 年 07 月 07 日
如果觉得我的文章对你有用,请随意赞赏