MySQL, Oracle, Linux, 软件架构及大数据技术知识分享平台

网站首页 > 精选文章 / 正文

分布式锁的实现 分布式锁的实现原理

2024-12-19 13:19 huorong 精选文章 4 ℃ 0 评论

前言:

很多业务场景下都需要添加锁,尤其是在分布式系统中,确保数据一致性和防止并发问题至关重要。其中,Redis 作为一个高性能的键值存储,常常被用来实现分布式锁。使用 Redisson 这个开源库,可以非常方便地在我们的 Java 应用中实现分布式锁。讲下分布式锁实现。

使用的技术选型:

  • SpringBoot、EL表达式、Redisson。
  • 可能有的小伙伴对EL表达式不太熟悉,我简单介绍下,具体的大家可以到网上查阅。

EL 表达式在 Spring 中用于动态获取和操作对象的属性,通常用于 JSP 和 Thymeleaf 等视图模板引擎中。它的特点包括:

  • 简洁性:使用简洁的语法来访问对象属性和方法。
  • 动态性:允许在运行时动态评估表达式。
  • 安全性:通过表达式语言提供的安全特性,可以避免直接暴露 Java 对象。

设计思路及依赖

SpingBoot的AOP技术对指定的注解进行拦截,使用EL表达式实现动态获取方法参数,基于Redisson 锁的相关操作。

  • 依赖如下:
 <dependency>
            <groupId>org.redisson</groupId>
            <artifactId>redisson-spring-boot-starter</artifactId>
            <version>3.26.1</version>
</dependency>

注解定义

直接在方法上添加这个注解,就能实现分布式锁的功能了。

@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface DistributedLock {

    String prefix() default ""; // 使用 SpEL 表达式动态生成前缀

    String key();               // 使用 SpEL 表达式从方法参数中获取

    long leaseTime() default 10; // 锁的持有时间

    TimeUnit timeUnit() default TimeUnit.SECONDS; // 持有时间单位

    long waitTime();// 锁的等待时间
}
其中:
leaseTime 锁的持有时间
waitTime 锁的等待时间
timeUnit 时间单位
key 锁的key是什么
prefix key的前缀

切面

基于Redisson的分布式锁实现

@Aspect
@Component
@Slf4j
@ConditionalOnProperty(name = "cache.type", havingValue = "redis", matchIfMissing = true)
public class RedisLockAspect {

    @Autowired
    private RedissonClient redissonClient;

    @Around("@annotation(distributedLock)")
    public Object around(ProceedingJoinPoint joinPoint, DistributedLock distributedLock) throws Throwable {
        // 获取 SpEL 表达式的 key 值
        // 解析 prefix 和 key
        String prefix = ELUtils.parseExpression(distributedLock.prefix(), joinPoint);
        String key = ELUtils.parseExpression(distributedLock.key(), joinPoint);
        String lockKey = prefix + key; // 组合成完整的锁 key
        long leaseTime = distributedLock.leaseTime();
        TimeUnit timeUnit = distributedLock.timeUnit();
        RLock lock = redissonClient.getLock(lockKey);

        log.info("start lock {}", lockKey);
        try {
            // 尝试获取锁
            if (lock.tryLock(distributedLock.waitTime(), leaseTime, timeUnit)) {
                // 获取到锁,执行方法
                return joinPoint.proceed();
            } else {
                // 未获取到锁,抛出异常或自定义处理逻辑
                throw new RuntimeException("Unable to acquire lock for key: " + key);
            }
        } finally {
            if (lock.isHeldByCurrentThread()) {
                log.info("end lock {}", lockKey);
                // 只有是当前线程才去释放锁
                lock.unlock();
            }
        }
    }
}

基于ReentrantLock单机版本

@Aspect
@Component
@Slf4j
@ConditionalOnProperty(name = "cache.type", havingValue = "local", matchIfMissing = true)
public class LocalLockAspect {

    private final ExpressionParser parser = new SpelExpressionParser();
    private final ConcurrentHashMap<String, ReentrantLock> lockMap = new ConcurrentHashMap<>();

    @Around("@annotation(distributedLock)")
    public Object around(ProceedingJoinPoint joinPoint, DistributedLock distributedLock) throws Throwable {
        // 解析 prefix 和 key
        String prefix = ELUtils.parseExpression(distributedLock.prefix(), joinPoint);
        String key = ELUtils.parseExpression(distributedLock.key(), joinPoint);
        String lockKey = prefix + key; // 组合成完整的锁 key

        // 获取 leaseTime
        long leaseTime = distributedLock.leaseTime();
        TimeUnit timeUnit = distributedLock.timeUnit();

        // 从 map 中获取锁
        ReentrantLock lock = lockMap.computeIfAbsent(lockKey, k -> new ReentrantLock());
        boolean acquired = false;

        log.info("Attempting to lock {}", lockKey);
        try {
            // 尝试获取锁,设置等待时间
            acquired = lock.tryLock(leaseTime, timeUnit);
            if (acquired) {
                log.info("Lock acquired: {}", lockKey);
                // 获取到锁,执行方法
                return joinPoint.proceed();
            } else {
                log.warn("Unable to acquire lock for key: {}", lockKey);
                throw new RuntimeException("Unable to acquire lock for key: " + lockKey);
            }
        } finally {
            if (acquired) {
                lock.unlock();
                log.info("Lock released: {}", lockKey);
                // 锁释放后,移除锁对象,避免内存泄漏
                lockMap.remove(lockKey);
            }
        }
    }

}

工具类

public class ELUtils {

    private static final ExpressionParser parser = new SpelExpressionParser();

    public static String parseExpression(String expression, ProceedingJoinPoint joinPoint) {
        if (expression.contains("#") || expression.contains("T(")) {
            MethodSignature signature = (MethodSignature) joinPoint.getSignature();
            Method method = signature.getMethod();

            // 创建解析上下文并设置方法参数
            StandardEvaluationContext context = new StandardEvaluationContext();
            Object[] args = joinPoint.getArgs();
            String[] paramNames = signature.getParameterNames();
            for (int i = 0; i < paramNames.length; i++) {
                context.setVariable(paramNames[i], args[i]);
            }

            // 解析 SpEL 表达式
            return parser.parseExpression(expression).getValue(context, String.class);
        } else {
            // 如果是普通字符串,直接返回
            return expression;
        }
    }
}
}

可以设置锁的key前缀是 login_lock:,key 是 请求参数中的 phone 的值,等待时间是0s,锁的持续时间是10s。

@DistributedLock(prefix = "login_lock:",key = "#loginRequest.phone",waitTime = 0,leaseTime = 10)
 public BaseResponse userLogin(@RequestBody LoginRequest loginRequest) {}

总结

分布式锁是分布式系统中不可或缺的组成部分,合理选择和实现分布式锁可以有效提升系统的稳定性和一致性。在设计时需要充分考虑系统的特性、性能需求及潜在的复杂性。

Tags:thymeleaf语法

控制面板
您好,欢迎到访网站!
  查看权限
网站分类
最新留言