巧用Redis實現(xiàn)分布式鎖詳細介紹
無論是synchronized還是Lock,都運行在線程級別上,必須運行在同一個JVM中。如果競爭資源的進程不在同一個JVM中時,這樣線程鎖就無法起到作用,必須使用分布式鎖來控制多個進程對資源的訪問。
分布式鎖的實現(xiàn)一般有三種方式,使用MySql數(shù)據(jù)庫行鎖,基于Redis的分布式鎖,以及基于Zookeeper的分布式鎖。本文中我們重點看一下Redis如何實現(xiàn)分布式鎖。
首先,看一下用于實現(xiàn)分布式鎖的兩個Redis基礎命令:
setnx key value
這里的setnx,是"set if Not eXists"的縮寫,表示當指定的key值不存在時,為key設定值為value。如果key存在,則設定失敗。
setex key timeout value
setex命令為指定的key設置值及其過期時間(以秒為單位)。如果key已經(jīng)存在,setex命令將會替換舊的值。
基于這兩個指令,我們能夠實現(xiàn):
使用setnx 命令,保證同一時刻只有一個線程能夠獲取到鎖使用setex 命令,保證鎖會超期釋放,從而不因一個線程長期占有一個鎖而導致死鎖。
這里將兩個命令結合在一起使用的原因是,在正常情況下,如果只使用setnx 命令,使用完成后使用delete命令刪除鎖進行釋放,不存在什么問題。但是如果獲取分布式鎖的線程在運行中掛掉了,那么鎖將不被釋放。如果使用setex 設置了過期時間,即使線程掛掉,也可以自動進行鎖的釋放。
手寫Redis分布式鎖
接下來,我們基于Redis+Spring手寫實現(xiàn)一個分布式鎖。首先配置Jedis連接池:
@Configuration
public class Config {
@Bean
public JedisPool jedisPool(){
JedisPoolConfig jedisPoolConfig=new JedisPoolConfig();
jedisPoolConfig.setMaxIdle(100);
jedisPoolConfig.setMinIdle(1);
jedisPoolConfig.setMaxWaitMillis(2000);
jedisPoolConfig.setTestOnBorrow(true);
jedisPoolConfig.setTestOnReturn(true);
JedisPool jedisPool=new JedisPool(jedisPoolConfig,"127.0.0.1",6379);
return jedisPool;
}
}
實現(xiàn)RedisLock分布式鎖:
public class RedisLock implements Lock {
@Autowired
JedisPool jedisPool;
private static final String key = "lock";
private ThreadLocal<String> threadLocal = new ThreadLocal<>();
@Override
public void lock() {
boolean b = tryLock();
if (b) {
return;
}
try {
TimeUnit.MILLISECONDS.sleep(50);
} catch (Exception e) {
e.printStackTrace();
}
lock();//遞歸調用
}
@Override
public boolean tryLock() {
SetParams setParams = new SetParams();
setParams.ex(10);
setParams.nx();
String s = UUID.randomUUID().toString();
Jedis resource = jedisPool.getResource();
String lock = resource.set(key, s, setParams);
resource.close();
if ("OK".equals(lock)) {
threadLocal.set(s);
return true;
}
return false;
}
//解鎖判斷鎖是不是自己加的
@Override
public void unlock(){
//調用lua腳本解鎖
String script="if redis.call(\"get\",KEYS[1]==ARGV[1] then\n"+
" return redis.call(\"del\",KEYS[1])\n"+
"else\n"+
" return 0\n"+
"end";
Jedis resource = jedisPool.getResource();
Object eval=resource.eval(script, Arrays.asList(key),Arrays.asList(threadLocal.get()));
if (Integer.valueOf(eval.toString())==0){
resource.close();
throw new RuntimeException("解鎖失敗");
}
/*
*不寫成下面這種也是因為不是原子操作,和ex、nx相同
String s = resource.get(key);
if (threadLocal.get().equals(s)){
resource.del(key);
}
*/
resource.close();
}
@Override
public void lockInterruptibly() throws InterruptedException {
}
@Override
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
return false;
}
@Override
public Condition newCondition() {
return null;
}
}
簡單對上面代碼中需要注意的地方做一解釋:
加鎖過程中,使用SetParams 同時設置nx和ex的值,保證原子操作通過ThreadLocal保存key對應的value,通過value來判斷鎖是否當前線程自己加的,避免線程錯亂解鎖釋放鎖的過程中,使用lua腳本進行刪除,保證Redis在執(zhí)行此腳本時不執(zhí)行其他操作,從而保證操作的原子性
但是,這段手寫的代碼可能會存在一個問題,就是不能保證業(yè)務邏輯一定能被執(zhí)行完成,因為設置了鎖的過期時間可能導致過期。
Redisson
基于上面存在的問題,我們可以使用Redisson分布式可重入鎖。Redisson內(nèi)部提供了一個監(jiān)控鎖的看門狗,它的作用是在Redisson實例被關閉前,不斷的延長鎖的有效期。
引入依賴:
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>3.10.7</version>
</dependency>
配置RedissonClient,然后我們對常用方法進行測試。
@Configuration
public class RedissonConfig {
@Bean
public RedissonClient redissonClient(){
Config config=new Config();
config.useSingleServer().setAddress("redis://127.0.0.1:6379");
RedissonClient redissonClient= Redisson.create(config);
return redissonClient;
}
}
lock()
先寫一個測試接口:
@GetMapping("/lock")
public String test() {
RLock lock = redissonClient.getLock("lock");
lock.lock();
System.out.println(Thread.currentThread().getName()+" get redisson lock");
try {
System.out.println("do something");
TimeUnit.SECONDS.sleep(20);
} catch (InterruptedException e) {
e.printStackTrace();
}
lock.unlock();
System.out.println(Thread.currentThread().getName()+ " release lock");
return "locked";
}
進行測試,同時發(fā)送兩個請求,redisson鎖生效:

lock(long leaseTime, TimeUnit unit)
Redisson可以給lock()方法提供leaseTime參數(shù)來指定加鎖的時間,超過這個時間后鎖可以自動釋放。測試接口:
@GetMapping("/lock2")
public String test2() {
RLock lock = redissonClient.getLock("lock");
lock.lock(10,TimeUnit.SECONDS);
System.out.println(Thread.currentThread().getName()+" get redisson lock");
try {
System.out.println("do something");
TimeUnit.SECONDS.sleep(20);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName()+ " release lock");
return "locked";
}
運行結果:

可以看出,在第一個線程還沒有執(zhí)行完成時,就釋放了redisson鎖,第二個線程進入后,兩個線程可以同時執(zhí)行被鎖住的代碼邏輯。這樣可以實現(xiàn)無需調用unlock方法手動解鎖。
tryLock(long waitTime, long leaseTime, TimeUnit unit)
tryLock方法會嘗試加鎖,最多等待waitTime秒,上鎖以后過leaseTime秒自動解鎖;如果沒有等待時間,鎖不住直接返回false。
@GetMapping("/lock3")
public String test3() {
RLock lock = redissonClient.getLock("lock");
try {
boolean res = lock.tryLock(5, 30, TimeUnit.SECONDS);
if (res){
try{
System.out.println(Thread.currentThread().getName()+" 獲取到鎖,返回true");
System.out.println("do something");
TimeUnit.SECONDS.sleep(20);
}finally {
lock.unlock();
System.out.println(Thread.currentThread().getName()+" 釋放鎖");
}
}else {
System.out.println(Thread.currentThread().getName()+" 未獲取到鎖,返回false");
}
} catch (InterruptedException e) {
e.printStackTrace();
}
return "lock";
}
運行結果:

可見在第一個線程獲得鎖后,第二個線程超過等待時間仍未獲得鎖,返回false放棄獲得鎖的過程。
除了以上單機Redisson鎖以外,還支持我們之前提到過的哨兵模式和集群模式,只需要改變Config的配置即可。以集群模式為例:
@Bean
public RedissonClient redissonClient(){
Config config=new Config();
config.useClusterServers().addNodeAddress("redis://172.20.5.170:7000")
.addNodeAddress("redis://172.20.5.170:7001")
.addNodeAddress("redis://172.20.5.170:7002")
.addNodeAddress("redis://172.20.5.170:7003")
.addNodeAddress("redis://172.20.5.170:7004")
.addNodeAddress("redis://172.20.5.170:7005");
RedissonClient redissonClient = Redisson.create(config);
return redissonClient;
}
RedLock紅鎖
下面介紹一下Redisson紅鎖RedissonRedLock,該對象也可以用來將多個RLock對象關聯(lián)為一個紅鎖,每個RLock對象實例可以來自于不同的Redisson實例。
RedissonRedLock針對的多個Redis節(jié)點,這多個節(jié)點可以是集群,也可以不是集群。當我們使用RedissonRedLock時,只要在大部分節(jié)點上加鎖成功就算成功??匆幌率褂茫?/p>
@GetMapping("/testRedLock")
public void testRedLock() {
Config config1 = new Config();
config1.useSingleServer().setAddress("redis://172.20.5.170:6379");
RedissonClient redissonClient1 = Redisson.create(config1);
Config config2 = new Config();
config2.useSingleServer().setAddress("redis://172.20.5.170:6380");
RedissonClient redissonClient2 = Redisson.create(config2);
Config config3 = new Config();
config3.useSingleServer().setAddress("redis://172.20.5.170:6381");
RedissonClient redissonClient3 = Redisson.create(config3);
String resourceName = "REDLOCK";
RLock lock1 = redissonClient1.getLock(resourceName);
RLock lock2 = redissonClient2.getLock(resourceName);
RLock lock3 = redissonClient3.getLock(resourceName);
RedissonRedLock redLock = new RedissonRedLock(lock1, lock2, lock3);
boolean isLock;
try {
isLock = redLock.tryLock(5, 30, TimeUnit.SECONDS);
if (isLock) {
System.out.println("do something");
TimeUnit.SECONDS.sleep(20);
}
} catch (Exception e) {
e.printStackTrace();
} finally {
redLock.unlock();
}
}
相對于單Redis節(jié)點來說,RedissonRedLock的優(yōu)點在于防止了單節(jié)點故障造成整個服務停止運行的情況;并且在多節(jié)點中鎖的設計,及多節(jié)點同時崩潰等各種意外情況有自己獨特的設計方法。使用RedissonRedLock,性能方面會比單節(jié)點Redis分布式鎖差一些,但可用性比普通鎖高很多。
總結
到此這篇關于巧用Redis實現(xiàn)分布式鎖詳細介紹的文章就介紹到這了,更多相關Redis分布式鎖內(nèi)容請搜索本站以前的文章或繼續(xù)瀏覽下面的相關文章希望大家以后多多支持本站!
版權聲明:本站文章來源標注為YINGSOO的內(nèi)容版權均為本站所有,歡迎引用、轉載,請保持原文完整并注明來源及原文鏈接。禁止復制或仿造本網(wǎng)站,禁止在非maisonbaluchon.cn所屬的服務器上建立鏡像,否則將依法追究法律責任。本站部分內(nèi)容來源于網(wǎng)友推薦、互聯(lián)網(wǎng)收集整理而來,僅供學習參考,不代表本站立場,如有內(nèi)容涉嫌侵權,請聯(lián)系alex-e#qq.com處理。
關注官方微信