分布式锁
什么是分布式锁
问题描述:随着业务发展,原单体单机部署的系统被演化为分布式集群后,由于分布式系统多线程、多进程且分布在不同机器上,这将使原单机部署情况下的并发控制锁策略失效,单纯的Java API并不能提供分布式锁的能力,为了解决这个问题,需要一种跨JVM的互斥锁来控制共享资源的访问,这就是分布式锁要解决的问题
与分布式锁相对的就是单体结构中的锁(单机锁),我们在写多线程程序时,避免同时操作一个共享变量产生问题,通常会使用一把锁来互斥以保证共享变量的准确,其使用范围是在同一个JVM进程中,如果换做是不同机器的JVM进程,需要同时操作一个共享资源,如何互斥呢,这时就需要引入分布式锁了
分布式锁应具备的条件
- 在分布式系统环境下,一个方法在同一时间只能被一个机器的一个线程执行
- 高可用的获取锁与释放锁
- 高性能的获取锁与释放锁
- 具备可重入特性
- 具备锁失效机制,即自动解锁,防止死锁
- 具备非阻塞锁特性,即没有获取到锁将直接返回获取锁失败
分布式锁实现方案
- 基于缓存(Redis)
- 基于Zookeeper
未添加分布式锁存在的问题
实现秒杀下单扣减库存案例
@RestController
public class IndexController {
@Resource
private StringRedisTemplate stringRedisTemplate;
@RequestMapping("/deductStock")
public String deductStock() {
int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock"));
if(stock > 0) {
stock -= 1; // 扣减库存
stringRedisTemplate.opsForValue().set("stock", stock.toString());
System.out.println("扣减库存成功,当前库存为:" + stock);
}else {
System.out.println("库存不足,扣减库存失败!");
}
return "end";
}
}
测试部署在两台服务上,进行库存扣减测试,发现超卖
添加线程同步锁
@RestController
public class IndexController {
@Resource
private StringRedisTemplate stringRedisTemplate;
@RequestMapping("/deductStock")
public String deductStock() {
synchronized(this) {
int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock"));
if(stock > 0) {
stock -= 1; // 扣减库存
stringRedisTemplate.opsForValue().set("stock", stock.toString());
System.out.println("扣减库存成功,当前库存为:" + stock);
}else {
System.out.println("库存不足,扣减库存失败!");
}
}
return "end";
}
}
模拟高并发场景进行压力测试,发现超卖问题依然存在
在分布式环境下,synchronized是不起作用的,因为一个synchronized只在一个tomcat的JVM进程内有效
在一个分布式系统下,如何解决呢
使用Redis的setnx命令
@RestController
public class IndexController {
@Resource
private StringRedisTemplate stringRedisTemplate;
@RequestMapping("/deductStock")
public String deductStock() {
Boolean bool = stringRedisTemplate.opsForValue().setIfAbsent("lock","1");
if(!bool) {
return "end";
}
synchronized(this) {
int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock"));
if(stock > 0) {
stock -= 1; // 扣减库存
stringRedisTemplate.opsForValue().set("stock", stock.toString());
System.out.println("扣减库存成功,当前库存为:" + stock);
}else {
System.out.println("库存不足,扣减库存失败!");
}
}
stringRedisTemplate.delete("lock");
return "end";
}
}
上述代码是否已经完善了呢
假设:运行到synchronized之后抛出异常,那么delete的语句将永远执行不到,则会阻塞后面所有线程的执行,这样就造成了死锁
使用try-finally
@RestController
public class IndexController {
@Resource
private StringRedisTemplate stringRedisTemplate;
@RequestMapping("/deductStock")
public String deductStock() {
try {
Boolean bool = stringRedisTemplate.opsForValue().setIfAbsent("lock","1");
if(!bool) {
return "end";
}
synchronized(this) {
int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock"));
if(stock > 0) {
stock -= 1; // 扣减库存
stringRedisTemplate.opsForValue().set("stock", stock.toString());
System.out.println("扣减库存成功,当前库存为:" + stock);
}else {
System.out.println("库存不足,扣减库存失败!");
}
}
} finally {
stringRedisTemplate.delete("lock");
return "end";
}
}
}
上述代码会有问题吗
如果执行到synchronized代码后,程序宕机,这样导致分布式锁也释放不了
解决方案:给分布式锁加上时间
添加分布式锁的过期时间
Boolean bool = stringRedisTemplate.opsForValue().setIfAbsent("lock","1", 10, TimeUnit.SECONDS);
线程ID作为分布式锁的value
@RestController
public class IndexController {
@Resource
private StringRedisTemplate stringRedisTemplate;
@RequestMapping("/deductStock")
public String deductStock() {
String value = UUID.randomUUID().toString();
try {
Boolean bool = stringRedisTemplate.opsForValue().setIfAbsent("lock",value , 10, TimeUnit.SECONDS);
if(!bool) {
return "end";
}
synchronized(this) {
int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock"));
if(stock > 0) {
stock -= 1; // 扣减库存
stringRedisTemplate.opsForValue().set("stock", stock.toString());
System.out.println("扣减库存成功,当前库存为:" + stock);
}else {
System.out.println("库存不足,扣减库存失败!");
}
}
} finally {
String str = stringRedisTemplate.opsForValue().get("locl");
if(str != null) {
if(str.equals(value)) {
stringRedisTemplate.delete("lock");
}
}
return "end";
}
}
}
使用Redisson实现分布式锁
Redisson实现Redis分布式锁非常方便,它是一个基于Redis实现的,提供了一些分布式锁的实现
添加Redisson依赖
<dependancy>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>3.16.1</version>
</dependancy>
获取Redisson客户端实例
RedissonClient是Redisson的核心接口,用于创建连接Redis的客户端对象
可以使用Redisson的config对象来配置连接Redis的参数
@Bean
public Redisson redisson() {
Config config = new Config();
config.useSingleServer()
.setAddress("redis://127.0.0.1:6379")
.setPassword("123456");
RedissonClient client = Redisson.create(config);
return (Redisson)client;
}
获取分布式锁
@RestController
public class IndexController {
@Resource
private Redisson redisson;
@Resource
private StringRedisTemplate stringRedisTemplate;
@RequestMapping("/deductStock")
public String deductStock() {
RLock myLock = redisson.getLock("myLock"); // 分布式锁对象
try {
myLock.lock(); // 持有锁
int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock"));
if(stock > 0) {
stock -= 1; // 扣减库存
stringRedisTemplate.opsForValue().set("stock", stock.toString());
System.out.println("扣减库存成功,当前库存为:" + stock);
}else {
System.out.println("库存不足,扣减库存失败!");
}
} finally {
myLock.unlock();
return "end";
}
}
}
通过Redisson,非常简单就可以实现
Redisson相关锁介绍
可重入锁
可重入锁指同一个线程可以多次获得同一把锁。这种类型的锁可以避免死锁发生,因为它允许同一线程在没有释放锁的情况下多次获得锁
可重入:利用hash结构记录线程id和重入次数,利用watchDog,每隔一段时间(releaseTime/3),重置超时时间
Redisson实现了一个分布式的可重入锁RLock,它支持自动续租,保证了锁的持有者在宕机或无响应时,锁会自动释放,它通过利用Redis的特性,有效地完成锁定和解锁操作
可重入锁的实际应用场景
可重入锁非常适合那些需要连续几次访问一个资源的场景
如:一个电商平台的订单创建过程中,从验证库存到最终下单可能需要多次进行资源锁定,可重入锁可以确保整个过程的同步执行
public class ReentrantLockExample {
public static void main(String[] args) {
Config config = new Config();
config.useClusterServers()
.addNodeAddress("redis://127.0.0.1:6379");
RedissonClient redisson = Redisson.create(config);
RLock lock = redisson.getLock("anyLock");
try {
// 支持自动续租
lock.lock();
// 处理业务逻辑
} finally {
lock.unlock();
}
}
}
联锁
联锁是一种同步机制,用于同时获取多个锁
在分布式系统中,当需要对多个独立资源进行操作时,联锁确保所有的资源都被锁定,以进行安全的原子操作
联锁的使用场景
联锁在需要执行跨多个资源的复合操作时非常有用,例如:在两个账户间进行资金转账时,需要同时锁定两个账户的资源
public class MultiLockExample {
public static void main(String[] args) {
Config config = new Config();
config.useClusterServers()
.addNodeAddress("redis://127.0.0.1:6379");
RedissonClient redisson = Redisson.create(config);
RLock lock1 = redisson.getLock("lock1");
RLock lock2 = redisson.getLock("lock2");
RLock lock3 = redisson.getLock("lock3");
MultiLock multiLock = new MultiLock(lock1, lock2, lock3);
try {
// 尝试加锁,最多等待100秒,上锁以后10秒自动解锁
boolean res = multiLock.tryLock(100, 10, TimeUnit.SECONDS);
if(res) {
// 处理跨多个资源的复合操作
}
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
multiLock.unlock();
}
}
}
RedLock
红锁时一种在多个独立的Redis节点上提供分布式锁的算法
旨在通过在多个节点上使用锁来提高容错性
如果单个节点宕机,使用RedLock算法的系统仍能保持正常运作
public class RedLockExample {
public static void main(String[] args) {
Config config1 = new Config();
config1.useSingleServer().setAddress("reids://127.0.0.1:6379");
RedissonClient redisson1 = Redisson.create(config1);
Config config2 = new Config();
config2.useSingleServer().setAddress("reids://127.0.0.1:6380");
RedissonClient redisson2 = Redisson.create(config1);
Config config3 = new Config();
config3.useSingleServer().setAddress("reids://127.0.0.1:6381");
RedissonClient redisson1 = Redisson.create(config1);
RLock lock1 = redisson1.getLock("lock");
RLock lock2 = redisson2.getLock("lock");
RLock lock3 = redisson3.getLock("lock");
RedissonRedLock redLock = new RedissonRedLock(lock1, lock2, lock3);
try {
// 尝试加锁,最多等待100秒,上锁后10秒自动解锁
boolean res = redLock.tryLock(100, 10, TimeUnit.SECONDS);
if(res) {
// 执行任务
}
} catch(InterruptedException e) {
e.printStackTrace();
} finally {
redLock.unlock();
}
}
}