网站的虚拟主机到期,百度域名多少钱,网站建设质量保证金,wordpress oss 插件为什么会存在分布式锁#xff1f;
经典场景-扣库存#xff0c;多人去同时购买一件商品#xff0c;首先会查询判断是否有剩余#xff0c;如果有进行购买并扣减库存#xff0c;没有提示库存不足。假如现在仅存有一件商品#xff0c;3人同时购买#xff0c;三个线程同时执…为什么会存在分布式锁
经典场景-扣库存多人去同时购买一件商品首先会查询判断是否有剩余如果有进行购买并扣减库存没有提示库存不足。假如现在仅存有一件商品3人同时购买三个线程同时执行方法第一人通过了库存0的校验此时线程阻塞第二人通过校验后抢先购买此时库存已经为0第一人继续执行扣减库存就为-1也就是【超卖】。
首先会想到加锁的方式解决使用synchronized或者ReentrantLock解决但如果多个服务器同时对一个共享资源操作时多个服务器的内存是不共享的加锁只能锁住当前服务器的进程。但在分布式系统下例如使用nginx负载均衡请求会发送到不同的tomcat容器。 1、基于关系型数据库
1lock表 基于主键或者唯一索引一个线程尝试获取锁时往表中插入一条数据插入成功则表示获取锁成功获取锁依赖于数据库架构简单但是一旦出现宕机等问题锁就无法释放需要设置一个定时任务清理数据。并且获取不到锁的线程需要一直等待轮询查询什么时候可获取锁。
2悲观版本
锁数据提前初始化抢锁时使用select ...... for update; 数据库的悲观锁可以自动阻塞其他等待锁的线程实现锁等待的功能如果持有锁的节点宕机数据库事务会自动回滚锁自然释放。
这种锁可能存在两个问题
第一个是MySQL可能会对查询做优化对于小表可能采用表锁代替行锁表中不同的锁之间就变成串行互斥的关系
第二个是使用悲观锁需要开启事务需要一直占用数据库的连接如果锁过多则对数据库造成压力。 2、基于Redis
1setnxuuidfinally
setnx命令只有key不存在时才能添加成功达到加锁的目的 void reduceStock() {//uidString uuid UUID.randomUUID().toString();// 获取锁///Boolean lock redisTemplate.opsForValue().setIfAbsent(lock, uuid, 1);// 设置锁过期时间//redisTemplate.expire(lock,30, TimeUnit.SECONDS);// 解决原子性问题boolean result redisTemplate.opsForValue().setIfAbsent(lock, uuid, 10, TimeUnit.SECONDS);if (lock) {try{int num stockMapper.selectNum();if(num 0) {//扣减库存stockMapper.reduceStock();}else{log.info(库存不足);}} finally {//校验是否是当前线程的锁String lockValue redisTemplate.opsForValue().get(lock);if(lockValue.equals(uuid)) {redisTemplate.delete(lock); } }} else {log.info(未获取锁);}
} 在线程A获取锁之后如果出现宕机或者代码报错异常锁不会释放需要在finally中释放锁。仍存在一个问题如果在执行释放锁逻辑的时候服务器宕机释放锁失败在重启服务器后加锁的数据又被恢复就出现死锁的问题。需要再redis中添加锁的过期时间。那么又会出现在获取锁和设置时间之间如果存在宕机问题时间会失败也就是不满足原子性。通过setnx可以同时满足。Lua脚本后续阐述此时又会出现在高并发的情况下如果锁设置30s线程A获取到锁但方法执行了35s线程A的锁已经过期线程B重新获取锁并开始执行方法执行到5s时线程A执行finally释放锁导致线程A将线程B的锁释放。可以设置一个uuid表示当前线程的锁。 2redisson void reduceStock() {RLock lock redissonClient.getLock(lock);if (lock.tryLock()) {try {int stockNum stockMapper.selectNum();if (stockNum 0) {stockMapper.reduceStock();} else {log.info(库存不足);}} finally {lock.unlock();}} else {log.info(未获取锁);}
} 使用通过getLock方法获取到RLock对象tryLock或lock()来进行加锁(Lua脚本)底层reids的key是锁的名称这个key对应的值是一个HASH结构HASH的key是持有锁的客户端ID线程IDvalue初始化为1表示锁的重入次数当其他线程去获取锁时使用redis提的的subcribe特性等待redis的key的变动方式轮询造成的系统消耗资源。看门狗当前持有锁的线程在redission客户端初始化一个watch dog线程定时刷新key的过期时间默认是30s监听主线程是否还在执行如果还在执行通过LUA脚本每10s给锁续期30秒。这个值可以config自定义。【注意】加锁时如果使用 tryLock(long t1,long t2, TimeUnit unit) 或 lock(long t1,long t2, TimeUnit unit) 第二个参数不是-1则看门狗无法生效 LUA脚本为什么能保证原子性呢
因为redis是单线程的当redis执行lua脚本时lua脚本将一系列操作封装成一个命令redis会把lua作为一个整体任务加入到一个队列单线程执行任务会按照队列的顺序依次执行在执行时lua是不会被其他线程请求打断的从而保证原子性。
优点
减少网络开销一次请求和接受一次响应直接在redis上执行无需解析和转换
缺点
新的语言需要单独存储和管理需要维护占用redis资源和时间 源码解析
首先找到这个方法的实现 private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException {long threadId Thread.currentThread().getId();//【核心方法】尝试去获取锁返回null时表示获取成功Long ttl tryAcquire(-1, leaseTime, unit, threadId);if (ttl null) {return;}//通过线程id去订阅锁CompletableFutureRedissonLockEntry future subscribe(threadId);pubSub.timeout(future);RedissonLockEntry entry;if (interruptibly) {entry commandExecutor.getInterrupted(future);} else {entry commandExecutor.get(future);}try {//锁自旋while (true) {//尝试获取锁获取成功后break跳出循环ttl tryAcquire(-1, leaseTime, unit, threadId);if (ttl null) {break;}//获取锁成功并且获取到锁的线程需要等待一段时间ttlif (ttl 0) {try {entry.getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);} catch (InterruptedException e) {if (interruptibly) {throw e;}//如果发生中断根据 interruptibly 参数判断是否重新尝试获取许可entry.getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);}} else {//获取锁成功但获取到锁的线程无需等待可以直接执行后续操作if (interruptibly) {entry.getLatch().acquire();} else {entry.getLatch().acquireUninterruptibly();}}}} finally {//取消订阅unsubscribe(entry, threadId);}
// get(lockAsync(leaseTime, unit));}首先看核心代码 Lua脚本分析 刷新时间 这里为什么要使用ConCurrentHashMap呢
锁的过期时间刷新任务需要在不同的节点之间共享确保在一个节点续约锁的时候其他节点也能够知道锁的状态让不同的节点能够访问和更新相同的数据结构保证一致性。其中putIfAbsent 也是原子的防止并发问题。
看门狗 (3) RedLock
redission解决了锁续期的问题但是在redis集群中主从复制是有延时性的。例如当线程A获取锁成功后主节点保存了锁信息当主节点还未同步到从节点锁信息时主节点宕机从节点切换为主节点后线程的锁就丢失了。
而使用RedLock解决这个问题就是认为每台redis都是独立的主节点在加锁时会记录开始加速的时间以及加锁成功后的时间。
例如5台redis服务器
客户端获取当前毫秒级的时间戳设置超时时间ttl5向5个redis服务发起请求保证全局唯一的value请求key锁如果存在3个一半以上那么就是获取锁成功否则失败如果失败或者超过ttl超过5则向所有的redis服务发出解锁请求获取锁失败后在 随机时间后重试获取锁同时重试需要限制次数随机时间是防止过多的客户端尝试去获取但孩子有一台能获取到导致大批出现失败的问题
【注意】向redis服务建立网络连接时要设置一个超时时间避免redis服务宕机客户端仍在等待官方建议5-50毫秒之间。 如果所有节点同时宕机怎么办参考
延迟重启reids同步到磁盘方式默认1次/s在redis崩溃后等待ttl之后再重启。
ttl时间后全部锁都过期不会对现有的锁造成影响但在ttl时间内是宕机状态影响性能和使用。 4基于zookeeper
后续学到继续更新