青岛企业网站开发,ag1042入口快速入口,自己做的网站 360不兼容,陕西煤化建设集团网站面试问题#xff1a;
你只要用缓存#xff0c;就可能会涉及到redis缓存与数据库双存储双写#xff0c;你只要是双写#xff0c;就一定会有数据一致性的问题#xff0c;那么你如何解决一致性问题#xff1f;双写一致性#xff0c;你先动缓存redis还是数据库mysql哪一个
你只要用缓存就可能会涉及到redis缓存与数据库双存储双写你只要是双写就一定会有数据一致性的问题那么你如何解决一致性问题双写一致性你先动缓存redis还是数据库mysql哪一个why?延时双删你做过吗会有哪些问题有这么一种情况微服务查询redis无数据mysql有数据为保证数据双写一致性回写redis你需要注意什么双检加锁策略你了解过吗如何尽量避免缓存击穿redis和mysql双写100%会出纰漏做不到强一致性你如何保证最终一致性 谈谈缓存双写一致性的理解
如果redis中有数据需要和数据库中的值相同。
如果redis中无数据数据库中的值要是最新值且准备回写redis。
缓存按照操作来分细分2种只读缓存和读写缓存。 读写缓存分为同步直写策略和异步缓写策略。 同步直写策略写数据库后也同步写redis缓存缓存和数据库中的数据一致对于读写缓存来说要想保证缓存和数据库中的数据一致就要采用同步直写策略。 异步缓写策略正常业务运行中mysq数据变动了但是可以在业务上容许出现一定时间后才作用于redis,比如仓库、物流系统异常情况出现了不得不将失败的动作重新修补有可能需要借助kafka或者RabbitMQ等消息中间件实现重试重写。 一图代码你如何写 问题上面业务逻辑你用java代码如何写
答案采用双检加锁策略。 多个线程同时去查询数据库的这条数据那么我们可以在第一个查询数据的请求上使用一个 互斥锁来锁住它。 其他的线程走到这一步拿不到锁就等着等第一个线程查询到了数据然后做缓存。 后面的线程进来发现已经有缓存了就直接走缓存。 伪代码 案例代码
package com.atguigu.redis.service;import com.atguigu.redis.entities.User;
import com.atguigu.redis.mapper.UserMapper;
import io.swagger.models.auth.In;
import lombok.extern.slf4j.Slf4j;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.ValueOperations;
import org.springframework.stereotype.Service;
import org.springframework.web.bind.annotation.PathVariable;import javax.annotation.Resource;
import java.util.concurrent.TimeUnit;Service
Slf4j
public class UserService {public static final String CACHE_KEY_USER user:;Resourceprivate UserMapper userMapper;Resourceprivate RedisTemplate redisTemplate;/*** 业务逻辑没有写错对于小厂中厂(QPS《1000)可以使用但是大厂不行* param id* return*/public User findUserById(Integer id){User user null;String key CACHE_KEY_USERid;//1 先从redis里面查询如果有直接返回结果如果没有再去查询mysqluser (User) redisTemplate.opsForValue().get(key);if(user null){//2 redis里面无继续查询mysqluser userMapper.selectByPrimaryKey(id);if(user null){//3.1 redismysql 都无数据//你具体细化防止多次穿透我们业务规定记录下导致穿透的这个key回写redisreturn user;}else{//3.2 mysql有需要将数据写回redis保证下一次的缓存命中率redisTemplate.opsForValue().set(key,user);}}return user;}/*** 加强补充避免突然key失效了打爆mysql做一下预防尽量不出现击穿的情况。* param id* return*/public User findUserById2(Integer id){User user null;String key CACHE_KEY_USERid;//1 先从redis里面查询如果有直接返回结果如果没有再去查询mysql// 第1次查询redis加锁前user (User) redisTemplate.opsForValue().get(key);if(user null) {//2 大厂用对于高QPS的优化进来就先加锁保证一个请求操作让外面的redis等待一下避免击穿mysqlsynchronized (UserService.class){//第2次查询redis加锁后user (User) redisTemplate.opsForValue().get(key);//3 二次查redis还是null可以去查mysql了(mysql默认有数据)if (user null) {//4 查询mysql拿数据(mysql默认有数据)user userMapper.selectByPrimaryKey(id);if (user null) {return null;}else{//5 mysql里面有数据的需要回写redis完成数据一致性的同步工作redisTemplate.opsForValue().setIfAbsent(key,user,7L,TimeUnit.DAYS);}}}}return user;}} 数据库和缓存一致性的几种更新策略
目的
达到最终一致性。给缓存设置过期时间定期清理缓存并回写是保证最终一致性的解决方案。 我们可以对存入缓存的数据设置过期时间所有的写操作以数据库为准对缓存操作只是尽最大努力即可。也就是说如果数据库写成功缓存更新失败那么只要到达过期时间则后面的读请求自然会从数据库中读取新值然后回填缓存达到一致性切记要以mysql的数据库写入库为准。 上述方案和后续落地案例是调研后的主流成熟的做法但是考虑到各个公司业务系统的差距不是100%绝对正确不保证绝对适配全部情况请自行酌情选择打法合适自己的最好。
可以停机情况
挂牌报错凌晨升级温馨提示服务降级等使用单线程操作这样重量级的数据操作最好不要多线程。
不停机4种更新策略
先更新数据库再更新缓存不推荐
异常问题1
1、先更新mysql的某商品的库存当前商品的库存是100更新为99个。 2、先更新mysql修改为99成功然后更新redis。 3、此时假设异常出现更新redis失败了这导致mysql里面的库存是99而redis里面的还是100 。 4、上述发生会让数据库里面和缓存redis里面数据不一致读到redis脏数据。
异常问题2
【先更新数据库再更新缓存】A、B两个线程发起调用 【正常逻辑】 1 A update mysql 100 2 A update redis 100 3 B update mysql 80 4 B update redis 80 【异常逻辑】多线程环境下A、B两个线程有快有慢有前有后有并行 1 A update mysql 100 3 B update mysql 80 4 B update redis 80 2 A update redis 100 最终结果mysql和redis数据不一致o(╥﹏╥)o mysql80,redis100 先更新缓存再更新数据库不推荐
业务上一般把mysql作为底单数据库保证最后解释。
异常问题
【先更新缓存再更新数据库】A、B两个线程发起调用 【正常逻辑】 1 A update redis 100 2 A update mysql 100 3 B update redis 80 4 B update mysql 80 【异常逻辑】多线程环境下A、B两个线程有快有慢有并行 A update redis 100 B update redis 80 B update mysql 80 A update mysql 100 结果mysql100,redis80 先删除缓存再更新数据库不推荐
异常问题
步骤分析1先删除缓存再更新数据库 A线程先成功删除了redis里面的数据然后去更新mysql此时mysql正在更新中还没有结束。比如网络延时然后B突然出现要来读取缓存数据。 步骤分析2先删除缓存再更新数据库 此时redis里面的数据是空的B线程来读取先去读redis里数据(已经被A线程delete掉了)此处出来2个问题 1、B从mysql获得了旧值B线程发现redis里没有(缓存缺失)马上去mysql里面读取从数据库里面读取来的是旧值。 2、B会把获得的旧值写回redis获得旧值数据后返回前台并回写进redis(刚被A线程删除的旧数据有极大可能又被写回了)。 步骤分析3先删除缓存再更新数据库 A线程更新完mysql发现redis里面的缓存是脏数据A线程直接懵逼了o(╥﹏╥)o。两个并发操作一个是更新操作另一个是查询操作A删除缓存后B查询操作没有命中缓存B先把老数据读出来后放到缓存中然后A更新操作更新了数据库。于是在缓存中的数据还是老的数据导致缓存中的数据是脏的而且还一直这样脏下去了。
上面3步骤串讲梳理总结流程
1请求A进行写操作删除redis缓存后工作正在进行中更新mysql......A还么有彻底更新完mysql还没commit2请求B开工查询查询redis发现缓存不存在(被A从redis中删除了)3请求B继续去数据库查询得到了mysql中的旧值(A还没有更新完)4请求B将旧值写回redis缓存5请求A将新值写入mysql数据库
上述情况就会导致不一致的情形出现。 解决方案采用延时双删策略 加上sleep的这段时间就是为了让线程B能够先从数据库读取数据再把缺失的数据写入缓存然后线程A再进行删除。所以线程A sleep的时间就需要大于线程B读取数据再写入缓存的时间。这样一来其它线程读取数据时会发现缓存缺失所以会从数据库中读取最新值。因为这个方案会在第一次删除缓存值后延迟一段时间再次进行删除所以我们也把它叫做“延迟双删”。
双删方案面试题
这个删除该休眠多久呢
线程A sleep的时间就需要大于线程B读取数据再写入缓存的时间。这个时间怎么确定呢
第一种方法 在业务程序运行的时候统计下线程读数据和写缓存的操作时间自行评估自己的项目的读数据业务逻辑的耗时以此为基础来进行估算。然后写数据的休眠时间则在读数据业务逻辑的耗时基础上加百毫秒即可。 这么做的目的就是确保读请求结束写请求可以删除读请求造成的缓存脏数据。
第二种方法 新启动一个后台监控程序比如WatchDog监控程序会加时。
这种同步淘汰策略吞吐量降低怎么办 采用异步方式删除。 看门狗WatchDogi源码分析
先更新数据库再删除缓存推荐
异常问题
先更新数据库再删除缓存。 假如缓存删除失败或者来不及导致请求再次访问redis时缓存命中读取到的是缓存旧值。
大厂业务指导思想
微软云Cache-Aside pattern - Azure Architecture Center | Microsoft Learn
阿里巴巴canal思想上述的订阅binlog程序在mysql中有现成的中间件叫canal可以完成订阅binlog日志的功能。
解决方案 可以把要删除的缓存值或者是要更新的数据库值暂存到消息队列中例如使用Kafka/RabbitMQ等。当程序没有能够成功地删除缓存值或者是更新数据库值时可以从消息队列中重新读取这些值然后再次进行删除或更新。如果能够成功地删除或更新我们就要把这些值从消息队列中去除以免重复操作此时我们也可以保证数据库和缓存的数据一致了否则还需要再次进行重试如果重试超过的一定次数后还是没有成功我们就需要向业务层发送报错信息了通知运维人员。
类似经典的分布式事务问题只有一个权威答案最终一致性。 例如 流量充值先下发短信实际充值可能滞后5分钟可以接受 电商发货短信下发但是物流明天见 如何选择
在大多数业务场景下 建议(仅代表我个人不权威)优先使用先更新数据库再删除缓存的方案(先更库→后删存)。
理由如下
1、先删除缓存值再更新数据库有可能导致请求因缓存缺失而访问数据库给数据库带来压力导致打满mysql。
2、如果业务应用中读取数据库和写缓存的时间不好估算那么延迟双删中的等待时间就不好设置。
多补充一句如果使用先更新数据库再删除缓存的方案。 如果业务层要求必须读取一致性的数据那么我们就需要在更新数据库时先在Redis缓存客户端暂停并发读请求等数据库更新完、缓存值删除后再读取数据从而保证数据一致性这是理论可以达到的效果但实际不推荐因为真实生产环境中分布式下很难做到实时一致性一般都是最终一致性请大家参考。 Redis与MySQL数据双写一致性落地案例 Canal数据同步
是什么
官网https://github.com/alibaba/canal/wiki canal [kənæl]中文翻译为 水道/管道/沟渠/运河主要用途是用于 MySQL 数据库增量日志数据的订阅、消费和解析是阿里巴巴开发并开源的采用Java语言开发 历史背景是早期阿里巴巴因为杭州和美国双机房部署存在跨机房数据同步的业务需求实现方式主要是基于业务 trigger触发器 获取增量变更。从2010年开始阿里巴巴逐步尝试采用解析数据库日志获取增量变更进行同步由此衍生出了canal项目 一句话canal译意为水道/管道/沟渠主要用途是基于MySQL数据库增量日志解析提供增量数据订阅和消费。
能干嘛
数据库镜像数据库实时备份索引构建和实时维护拆分异构索引、倒排索引等业务cache刷新带业务逻辑的增量数据处理
去哪下
地址https://github.com/alibaba/canal/releases/tag/canal-1.1.6
MySQL主从复制工作原理 MySQL的主从复制将经过如下步骤
1、当 master 主服务器上的数据发生改变时则将其改变写入二进制事件日志文件中
2、salve 从服务器会在一定时间间隔内对 master 主服务器上的二进制日志进行探测探测其是否发生过改变
如果探测到 master 主服务器的二进制事件日志发生了改变则开始一个 I/O Thread 请求 master 二进制事件日志
3、同时 master 主服务器为每个 I/O Thread 启动一个dump Thread用于向其发送二进制事件日志
4、slave 从服务器将接收到的二进制事件日志保存至自己本地的中继日志文件中
5、salve 从服务器将启动 SQL Thread 从中继日志中读取二进制日志在本地重放使得其数据和主服务器保持一致
6、最后 I/O Thread 和 SQL Thread 将进入睡眠状态等待下一次被唤醒
canal工作原理 mysql-canal-redis双写一致性实战
java案例来源出处ClientExample · alibaba/canal Wiki · GitHub
mysql配置
1、查看mysql版本 命令select version(); 2、当前的主机二进制日志 命令show master status; 3、查看日志文件是否开启 命令show variables like log_bin;默认关闭 4、开启MySQL的binlog写入功能
打开MySQL安装根目录修改my.ini文件 log-binmysql-bin #开启 binlog binlog-formatROW #选择 ROW 模式 server_id1 #配置MySQL replaction需要定义不要和canal的 slaveId重复 ROW模式除了记录sql语句之外还会记录每个字段的变化情况能够清楚的记录每行数据的变化历史但会占用较多的空间。 STATEMENT模式只记录了sql语句但是没有记录上下文信息在进行数据恢复的时候可能会导致数据的丢失情况 MIX模式比较灵活的记录理论上说当遇到了表结构变更的时候就会记录为statement模式。当遇到了数据更新或者删除情况下就会变为row模式 5、重启mysql
6、再次查看SHOW VARIABLES LIKE log_bin; 命令show variables like log_bin; 7、授权canal连接MySQL账号
mysql默认的用户在mysql库的user表里 命令select * from mysql.user 默认没有canal账户此处新建授权 DROP USER IF EXISTS canal%; CREATE USER canal% IDENTIFIED BY canal; GRANT ALL PRIVILEGES ON *.* TO canal% IDENTIFIED BY canal; FLUSH PRIVILEGES; SELECT * FROM mysql.user; canal服务端
1、下载
网址https://github.com/alibaba/canal/releases/tag/canal-1.1.6
下载Linux版本canal.deployer-1.1.6.tar.gz 注意发布时间版本2022.8.11后发布的才用
2、解压
解压后整体放入/mycanal路径下 3、配置
修改/mycanal/conf/example路径下instance.properties文件 换成自己的mysql主机master的IP地址 换成自己的在mysq新建的canal账户 4、启动
/opt/mycanal/bin路径下执行./startup.sh
5、查看canal是否启动成功
查看server日志 查看样例example的日志 canal客户端Java编写业务程序
1、随便选个数据库以你自己为主本例bigdata按照下面建表
CREATE TABLE t_user (id bigint(20) NOT NULL AUTO_INCREMENT,userName varchar(100) NOT NULL,PRIMARY KEY (id)
) ENGINEInnoDB AUTO_INCREMENT10 DEFAULT CHARSETutf8mb4;
2、pom.xml
!--canal--
dependencygroupIdcom.alibaba.otter/groupIdartifactIdcanal.client/artifactIdversion1.1.0/version
/dependency
3、yaml文件
# alibaba.druid
spring.datasource.typecom.alibaba.druid.pool.DruidDataSource
spring.datasource.driver-class-namecom.mysql.jdbc.Driver
spring.datasource.urljdbc:mysql://localhost:3306/bigdata?useUnicodetruecharacterEncodingutf-8useSSLfalse
spring.datasource.usernameroot
spring.datasource.password123456
spring.datasource.druid.test-while-idlefalse
4、RedisUtils.java
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;public class RedisUtils{public static final String REDIS_IP_ADDR 192.168.111.185;public static final String REDIS_pwd 111111;public static JedisPool jedisPool;static {JedisPoolConfig jedisPoolConfignew JedisPoolConfig();jedisPoolConfig.setMaxTotal(20);jedisPoolConfig.setMaxIdle(10);jedisPoolnew JedisPool(jedisPoolConfig,REDIS_IP_ADDR,6379,10000,REDIS_pwd);}public static Jedis getJedis() throws Exception {if(null!jedisPool){return jedisPool.getResource();}throw new Exception(Jedispool is not ok);}}5、RedisCanalClientExample.java
import com.alibaba.fastjson.JSONObject;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry.*;
import com.alibaba.otter.canal.protocol.Message;
import com.atguigu.canal.util.RedisUtils;
import redis.clients.jedis.Jedis;
import java.net.InetSocketAddress;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.TimeUnit;public class RedisCanalClientExample{public static final Integer _60SECONDS 60;public static final String REDIS_IP_ADDR 192.168.111.185;private static void redisInsert(ListColumn columns) {JSONObject jsonObject new JSONObject();for (Column column : columns) {System.out.println(column.getName() : column.getValue() update column.getUpdated());jsonObject.put(column.getName(),column.getValue());}if(columns.size() 0) {try(Jedis jedis RedisUtils.getJedis()){jedis.set(columns.get(0).getValue(),jsonObject.toJSONString());}catch (Exception e){e.printStackTrace();}}}private static void redisDelete(ListColumn columns) {JSONObject jsonObject new JSONObject();for (Column column : columns){jsonObject.put(column.getName(),column.getValue());}if(columns.size() 0){try(Jedis jedis RedisUtils.getJedis()){jedis.del(columns.get(0).getValue());}catch (Exception e){e.printStackTrace();}}}private static void redisUpdate(ListColumn columns){JSONObject jsonObject new JSONObject();for (Column column : columns){System.out.println(column.getName() : column.getValue() update column.getUpdated());jsonObject.put(column.getName(),column.getValue());}if(columns.size() 0){try(Jedis jedis RedisUtils.getJedis()){jedis.set(columns.get(0).getValue(),jsonObject.toJSONString());System.out.println(---------update after: jedis.get(columns.get(0).getValue()));}catch (Exception e){e.printStackTrace();}}}public static void printEntry(ListEntry entrys) {for (Entry entry : entrys) {if (entry.getEntryType() EntryType.TRANSACTIONBEGIN || entry.getEntryType() EntryType.TRANSACTIONEND) {continue;}RowChange rowChage null;try {//获取变更的row数据rowChage RowChange.parseFrom(entry.getStoreValue());} catch (Exception e) {throw new RuntimeException(ERROR ## parser of eromanga-event has an error,data: entry.toString(),e);}//获取变动类型EventType eventType rowChage.getEventType();System.out.println(String.format(gt; binlog[%s:%s] , name[%s,%s] , eventType : %s,entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),entry.getHeader().getSchemaName(), entry.getHeader().getTableName(), eventType));for (RowData rowData : rowChage.getRowDatasList()) {if (eventType EventType.INSERT) {redisInsert(rowData.getAfterColumnsList());} else if (eventType EventType.DELETE) {redisDelete(rowData.getBeforeColumnsList());} else {//EventType.UPDATEredisUpdate(rowData.getAfterColumnsList());}}}}public static void main(String[] args){System.out.println(---------O(∩_∩)O哈哈~ initCanal() main方法-----------);//// 创建链接canal服务端CanalConnector connector CanalConnectors.newSingleConnector(new InetSocketAddress(REDIS_IP_ADDR,11111), example, , );int batchSize 1000;//空闲空转计数器int emptyCount 0;System.out.println(---------------------canal init OK开始监听mysql变化------);try {connector.connect();//connector.subscribe(.*\\..*);connector.subscribe(bigdata.t_user);connector.rollback();int totalEmptyCount 10 * _60SECONDS;while (emptyCount totalEmptyCount) {System.out.println(我是canal每秒一次正在监听: UUID.randomUUID().toString());Message message connector.getWithoutAck(batchSize); // 获取指定数量的数据long batchId message.getId();int size message.getEntries().size();if (batchId -1 || size 0) {emptyCount;try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); }} else {//计数器重新置零emptyCount 0;printEntry(message.getEntries());}connector.ack(batchId); // 提交确认// connector.rollback(batchId); // 处理失败, 回滚数据}System.out.println(已经监听了totalEmptyCount秒无任何消息请重启重试......);} finally {connector.disconnect();}}
}
6、题外话
java程序下connector.subscribe配置的过滤正则 关闭资源代码简写ty-with-resources释放资源 try(...){ ... }cache(){ ... }finally{ ... }