分布式锁
分布式锁
分布式锁是控制分布式系统或不同系统之间共同访问共享资源的一种锁实现。如果不同的系统或同一个系统的不同主机之间共享了某个资源时,往往通过互斥来防止彼此干扰。
分布锁设计目的
可以保证在分布式部署的应用集群中,同一个方法在同一操作只能被一台机器上的一个线程执行。
锁的设计要求
- 这把锁要是一把可重入锁(避免死锁)
- 这把锁有高可用的获取锁和释放锁功能
- 这把锁获取锁和释放锁的性能要好…
分布式锁实现方案分析
- 获取锁的时候,使用 setnx(SETNX key val:当且仅当 key 不存在时,set 一个 key 为 val 的字符串,返回 1; 若 key 存在,则什么都不做,返回 【0】加锁,锁的 value 值为当前占有锁服务器内网 IP 编号拼接任务标识组成。在释放锁的时候进行判断。并使用 expire 命令为锁添 加一个超时时间,超过该时间则自动释放锁。
- 获取锁的时候调用 setnx, 如果返回 0,则该锁正在被被人使用,返回 1,则成功获取锁。还设置一个获取的超时时间,若超过这个时间则放弃获取锁。 setex (key,value,expire) 过期以秒为单位。
- 释放锁的时候,判断是不是该锁(即 Value 为当前服务器内网 IP 编号拼接任务标识),若是该锁,则执行 delete 进行锁释放
Redis 分布式锁应用
启动类添加注解@EnableScheduling
@EnableScheduling
服务器 log 日志打印
引入 logback-spring.xml
application.properties 加入路径引用
logging.config=classpath:logback-spring.xml
logging.path=/data/java/weblog/8082
编写任务计划类
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
import java.net.Inet4Address;
import java.net.InetAddress;
import java.net.NetworkInterface;
import java.net.UnknownHostException;
import java.util.Enumeration;
@Service
public class LockNxExJob {
private static final Logger logger = LoggerFactory.getLogger(LockNxExJob.class);
@Autowired
private RedisService redisService;
@Autowired
private RedisTemplate redisTemplate;
private static String LOCK_PREFIX = "prefix_";
// 秒 分 时 日 月 年
@Scheduled(cron = "0/10 * * * * *")
public void lockJob() {
String lock = LOCK_PREFIX + "LockNxExJob";
boolean nxRet = false;
try{
//redistemplate setnx操作 返回true代表获取锁成功,false表示获取锁失败。
nxRet = redisTemplate.opsForValue().setIfAbsent(lock,getHostIp());
Object lockValue = redisService.get(lock);
//如果获取锁失败:打印出当前占用锁的服务器IP地址
if(!nxRet){
String value = (String)redisService.get(lock);
//日志记录:打印当前占用锁的服务器IP
logger.info("get lock fail,lock belong to:{}",value);
return;
}else{
//如果获取锁成功,则设置超时时间
redisTemplate.opsForValue().set(lock,getHostIp(),3600);
//日志记录:获取锁成功
logger.info("start lock lockNxExJob success");
//模拟执行业务消耗的时间
Thread.sleep(5000);
}
}catch (Exception e){
logger.error("lock error",e);
}finally {
//如果获取锁成功,则一定会释放锁
if(nxRet){
logger.info("release lock success");
redisService.remove(lock);
}
}
}
/**
* 获取本机内网IP地址方法
* @return
*/
private static String getHostIp(){
try{
Enumeration<NetworkInterface> allNetInterfaces = NetworkInterface.getNetworkInterfaces();
while (allNetInterfaces.hasMoreElements()){
NetworkInterface netInterface = (NetworkInterface) allNetInterfaces.nextElement();
Enumeration<InetAddress> addresses = netInterface.getInetAddresses();
while (addresses.hasMoreElements()){
InetAddress ip = (InetAddress) addresses.nextElement();
if (ip != null
&& ip instanceof Inet4Address
&& !ip.isLoopbackAddress() //loopback地址即本机地址,IPv4的loopback范围是127.0.0.0 ~ 127.255.255.255
&& ip.getHostAddress().indexOf(":")==-1){
return ip.getHostAddress();
}
}
}
}catch(Exception e){
e.printStackTrace();
}
return null;
}
public static void main(String[] args) {
String localIP = "";
try {
localIP = getHostIp();
} catch (Exception e) {
e.printStackTrace();
}
//获取本机IP
System.out.println(localIP);
}
}
启动项目
nohup 的意思是忽略 SIGHUP 信号, 所以当运行 nohup a.jar 的时候, 关闭 shell, 那么 a.jar 进程还是存在的(对 SIGHUP 信号免疫)
nohup java -jar jar名 &
控制台输出
start lock lockNxExJob success
release lock success
start lock lockNxExJob success
release lock success
Redis 分布式锁可能出现的问题
- ServerDown 和 Redis Down 发生的时间随机性会分隔 Redis 客户端程序中的 setnx 和 setex 操作,导致其失去了原子性,从而引发被持有锁没有过期时间,分布式系统中其他线程一直处于尝试获取锁的状态。
解决方案一:采用 Lua 脚本
解决方案二:Redis 从 2.6 版本之后支持 setnx、setex 连用的原子性操作接口
Lua 脚本实现分布式锁的原子性
从 Redis 2.6.0 版本开始,通过内置的 Lua 解释器,可以使用 EVAL 命令对 Lua 脚本进行求值。
Redis 使用单个 Lua 解释器去运行所有脚本,并且, Redis 也保证脚本会以原子性(atomic)的方式执行:当某个脚本正在运行的时候,不会有其他脚本或 Redis 命令被执行。这和使用 MULTI / EXEC 包围的事务很类似。在其他别的客户端看来,脚本的效果(effect)要么是不可见的(not visible),要么就是已完成的(already completed)。
Lua 脚本配置流程
1、在 resource 目录下面新增一个后缀名为。lua 结尾的文件
2、编写 lua 脚本
local lockKey = KEYS[1]
local lockValue = KEYS[2]
-- setnx info
local result_1 = redis.call('SETNX', lockKey, lockValue)
if result_1 == true
then
local result_2= redis.call('SETEX', lockKey,3600, lockValue)
return result_1
else
return result_1
end
3、传入 lua 脚本的 key 和 arg
4、调用 redisTemplate.execute 方法执行脚本
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.io.ClassPathResource;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.script.DefaultRedisScript;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.scripting.support.ResourceScriptSource;
import org.springframework.stereotype.Service;
import java.net.Inet4Address;
import java.net.InetAddress;
import java.net.NetworkInterface;
import java.util.ArrayList;
import java.util.Enumeration;
import java.util.List;
@Service
public class LuaDistributeLock {
private static final Logger logger = LoggerFactory.getLogger(LockNxExJob.class);
@Autowired
private RedisService redisService;
@Autowired
private RedisTemplate redisTemplate;
private static String LOCK_PREFIX = "lua_";
private DefaultRedisScript<Boolean> lockScript;
@Scheduled(cron = "0/10 * * * * *")
public void lockJob() {
String lock = LOCK_PREFIX + "LockNxExJob";
boolean luaRet = false;
try {
luaRet = luaExpress(lock,getHostIp());
//获取锁失败
if (!luaRet) {
String value = (String) redisService.genValue(lock);
//打印当前占用锁的服务器IP
logger.info("lua get lock fail,lock belong to:{}", value);
return;
} else { //获取锁成功
logger.info("lua start lock lockNxExJob success");
Thread.sleep(5000);
}
} catch (Exception e) {
logger.error("lock error", e);
} finally {
if (luaRet) {
logger.info("release lock success");
redisService.remove(lock);
}
}
}
/**
* 获取lua结果
* @param key
* @param value
* @return
*/
public Boolean luaExpress(String key,String value) {
//创建操作lua脚本的类
lockScript = new DefaultRedisScript<Boolean>();
lockScript.setScriptSource(
new ResourceScriptSource(new ClassPathResource("add.lua")));
lockScript.setResultType(Boolean.class);
// 封装参数
List<Object> keyList = new ArrayList<Object>();
keyList.add(key);
keyList.add(value);
Boolean result = (Boolean) redisTemplate.execute(lockScript, keyList);
return result;
}
/**
* 获取本机内网IP地址方法
*
* @return
*/
private static String getHostIp() {
try {
Enumeration<NetworkInterface> allNetInterfaces = NetworkInterface.getNetworkInterfaces();
while (allNetInterfaces.hasMoreElements()) {
NetworkInterface netInterface = (NetworkInterface) allNetInterfaces.nextElement();
Enumeration<InetAddress> addresses = netInterface.getInetAddresses();
while (addresses.hasMoreElements()) {
InetAddress ip = (InetAddress) addresses.nextElement();
if (ip != null
&& ip instanceof Inet4Address
&& !ip.isLoopbackAddress() //loopback地址即本机地址,IPv4的loopback范围是127.0.0.0 ~ 127.255.255.255
&& ip.getHostAddress().indexOf(":") == -1) {
return ip.getHostAddress();
}
}
}
} catch (Exception e) {
e.printStackTrace();
}
return null;
}
}
控制台输出
lua start lock lockNxExJob success
release lock success
lua start lock lockNxExJob success
release lock success
RedisConnection 实现分布式锁
Redis2.6 版本以后才支持,采用 redisTemplate 操作 redisConnection 实现 setnx 和 setex 两个命令连用。
分布式锁优化分析之解锁注意事项
Maven 依赖
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
</dependency>
<dependency>
<groupId>org.mybatis.spring.boot</groupId>
<artifactId>mybatis-spring-boot-starter</artifactId>
<version>1.3.0</version>
</dependency>
<dependency>
<groupId>org.mybatis.generator</groupId>
<artifactId>mybatis-generator-core</artifactId>
<scope>test</scope>
<version>1.3.2</version>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-jdbc</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>druid</artifactId>
<version>1.1.10</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.7</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-cache</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.session</groupId>
<artifactId>spring-session-data-redis</artifactId>
</dependency>
<dependency>
<groupId>org.mybatis.generator</groupId>
<artifactId>mybatis-generator-core</artifactId>
<scope>test</scope>
<version>1.3.2</version>
<optional>true</optional>
</dependency>
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
<version>2.5</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>3.1</version>
</dependency>
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>2.9.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/commons-io/commons-io -->
</dependencies>
添加 unlock.lua
根据同一个 key 下的不同 value 进行判断,value 是持有锁进程的 ip 地址,这样每个进程只能自己释放自己的锁,无法释放不属于自己的锁,从而解决了 ServerA 执行时间超过了锁的过期时间,ServerA 的 finally 中解锁操作,误解锁 ServerB 的锁。
local lockKey = KEYS[1]
local lockValue = KEYS[2]
-- get key
local result_1 = redis.call('get', lockKey)
if result_1 == lockValue
then
local result_2 = redis.call('del', lockKey)
return result_2
else
return false
end
RedisConnection 实现 setNx 和 setEx 命令的原子性 + 细粒度解锁
@Component
public class JedisDistributedLock {
private final Logger logger = LoggerFactory.getLogger(JedisDistributedLock.class);
private static String LOCK_PREFIX = "JedisDistributedLock_";
private DefaultRedisScript<Boolean> lockScript;
@Resource
private RedisTemplate<Object, Object> redisTemplate;
@Autowired
private RedisService redisService;
public static final String UNLOCK_LUA;
static {
StringBuilder sb = new StringBuilder();
sb.append("if redis.call(\"get\",KEYS[1]) == ARGV[1] ");
sb.append("then ");
sb.append(" return redis.call(\"del\",KEYS[1]) ");
sb.append("else ");
sb.append(" return 0 ");
sb.append("end ");
UNLOCK_LUA = sb.toString();
}
@Scheduled(cron = "0/10 * * * * *")
public void lockJob() {
String lock = LOCK_PREFIX + "JedisNxExJob";
boolean lockRet = false;
try {
lockRet = this.setLock(lock, 600);
//获取锁失败
if (!lockRet) {
String value = (String) redisService.genValue(lock);
//打印当前占用锁的服务器IP
logger.info("jedisLockJob get lock fail,lock belong to:{}", value);
return;
} else {
//获取锁成功
logger.info("jedisLockJob start lock lockNxExJob success");
Thread.sleep(5000);
}
} catch (Exception e) {
logger.error("jedisLockJob lock error", e);
} finally {
if (lockRet) {
logger.info("jedisLockJob release lock success");
releaseLock(lock,getHostIp());
}
}
}
public boolean setLock(String key, long expire) {
try {
Boolean result = redisTemplate.execute(new RedisCallback<Boolean>() {
@Override
public Boolean doInRedis(RedisConnection connection) throws DataAccessException {
return connection.set(key.getBytes(), getHostIp().getBytes(), Expiration.seconds(expire) ,RedisStringCommands.SetOption.ifAbsent());
}
});
return result;
} catch (Exception e) {
logger.error("set redis occured an exception", e);
}
return false;
}
public String get(String key) {
try {
RedisCallback<String> callback = (connection) -> {
JedisCommands commands = (JedisCommands) connection.getNativeConnection();
return commands.get(key);
};
String result = redisTemplate.execute(callback);
return result;
} catch (Exception e) {
logger.error("get redis occured an exception", e);
}
return "";
}
/**
* 释放锁操作
* @param key
* @param value
* @return
*/
private boolean releaseLock(String key, String value) {
lockScript = new DefaultRedisScript<Boolean>();
lockScript.setScriptSource(
new ResourceScriptSource(new ClassPathResource("unlock.lua")));
lockScript.setResultType(Boolean.class);
// 封装参数
List<Object> keyList = new ArrayList<Object>();
keyList.add(key);
keyList.add(value);
Boolean result = (Boolean) redisTemplate.execute(lockScript, keyList);
return result;
}
/**
* 获取本机内网IP地址方法
*
* @return
*/
private static String getHostIp() {
try {
Enumeration<NetworkInterface> allNetInterfaces = NetworkInterface.getNetworkInterfaces();
while (allNetInterfaces.hasMoreElements()) {
NetworkInterface netInterface = (NetworkInterface) allNetInterfaces.nextElement();
Enumeration<InetAddress> addresses = netInterface.getInetAddresses();
while (addresses.hasMoreElements()) {
InetAddress ip = (InetAddress) addresses.nextElement();
if (ip != null
&& ip instanceof Inet4Address
&& !ip.isLoopbackAddress() //loopback地址即本机地址,IPv4的loopback范围是127.0.0.0 ~ 127.255.255.255
&& ip.getHostAddress().indexOf(":") == -1) {
return ip.getHostAddress();
}
}
}
} catch (Exception e) {
e.printStackTrace();
}
return null;
}
}