当我们查询商铺信息时,首先先去 Redis 中查询,如果查询到了,则直接返回商铺信息,未查询到则查询数据库,将查询到的信息先写入 Redis 中,以便下次查询时可以直接命中缓存,然后再将商铺信息返回给用户。
@Service public class ShopServiceImpl extends ServiceImpl二、缓存更新策略 2.1 缓存更新策略implements IShopService { @Resource private StringRedisTemplate stringRedisTemplate; @Override public Result queryById(Long id) { String key = CACHE_SHOP_KEY + id; // 1.从redis查询商铺缓存 String shopJson = stringRedisTemplate.opsForValue().get(key); // 2.判断是否存在 if (StrUtil.isNotBlank(shopJson)) { // 3.存在,转为Java对象返回 Shop shop = JSONUtil.toBean(shopJson, Shop.class); return Result.ok(shop); } // 4.不存在,根据id查询数据库 Shop shop = getById(id); // 5.不存在,返回错误 if (shop == null) { return Result.fail("店铺不存在!"); } // 6.存在,写入redis stringRedisTemplate.opsForValue().set(key,JSONUtil.toJsonStr(shop)); // 7.返回 return Result.ok(shop); } }
业务场景:
- 低一致性需求:使用内存淘汰机制,例如店铺类型的查询缓存
- 高一致性需求:主动更新,并以超时剔除作为兜底方案,例如店铺详情查询的缓存
- 更新缓存:每次更新数据库都更新缓存,无效写操作较多。
如果我们对数据库做了上百次操作,那么就需要对缓存进行上百次操作,在进行这上百次的操作过程中,如果没有任何的查询操作,也就是写多读少,那么对于缓存的上百次操作都可以看作成是无效的操作。 - 删除操作:更新数据库时让缓存失效,查询时再更新缓存。(一般选择此种方案)
- 单体系统,将缓存与数据库操作放在一个事务。
- 分布式事务,利用 TCC 等分布式事务方案。
先删除缓存,再操作数据库:
假设有两个线程:线程1 和 线程2,线程1执行更新操作,先将缓存中的数据删除,然后执行更新数据库操作,由于更新逻辑复杂,执行时间较长,此时线程2 也开始执行,线程2 执行查询操作,由于缓存中的数据被线程 1 删除了,导致查询缓存未命中,于是线程2转而去查询数据库,此时数据库并未完成更新操作,查询出的数据依旧为旧数据,接着程序就将旧数据重新写入到了缓存。这就会导致后续的所有查询操作查询到的数据依旧是旧数据。
先操作数据库,再删除缓存:
三、缓存穿透缓存穿透 是指客户端请求的数据在缓存中和数据库中都不存在,这样缓存永远不会生效,这些请求都会到达数据库。
如果有恶意用户,使用大量线程并发访问这些不存在的数据,这样所有的请求都会到达数据库,数据库顶不住访问压力,就会崩掉。
缓存空对象
将数据库中不存在的数据以 null 的形式存储到缓存中。但是这种方式会增加额外的内存消耗,我们可以在缓存 null 的时候,设置过期时间。
- 优点:实现简单,维护方便
- 缺点:①额外的内存消耗 ②可能造成短期的不一致。
布隆过滤(一种算法)
在客户端与 Redis 之间增加一层过滤,当用户请求来的时候,先去访问布隆过滤器,判断请求的数据是否存在,如果不存在则拒绝请求,如果存在,则放行。
- 优点:内存占用较少,没有多余的 key
- 缺点:实现复杂 、存在误判的可能
布隆过滤器判断时,如果数据不存在,就是真的不存在,如果判断数据存在,那么有可能不存在。存在一定的穿透风险。
四、缓存雪崩缓存雪崩 是指在同一时段大量的缓存 key 同时失效或者 Redis 服务宕机,导致大量请求到达数据库,给数据库带来巨大压力。
4.1 解决方案:- 给不同的 Key 的 TTL(过期时间)添加随机值
一般在做缓存预热时,可能会提前将数据库中的数据批量导入到缓存中,由于是批量导入的,所以这些 key 的 TTL 是一样的,这就很有可能导致这些 key 在未来的某一时刻一起过期,从而引发缓存雪崩问题。为了解决这个问题,我们可以在做缓存预热时,可以在设置 TTL 时,在 TTL 后面追加一个随机数,比如 TTL 设置的 30 分钟,我们在30 的基础上加上一个 1~5之间的随机数,那么这些 key 的过期时间就会在 30 ~ 35 之间,这样就可以将 key 的过期时间分散开来,而不是一起失效。 - 利用 Redis 集群提高服务的可用性
利用 Redis 的哨兵机制,Redis 哨兵机制可以实现服务的监控,比如在一个主从模式下的 Redis 集群,当主机宕机时,哨兵就会从从机中选出一个来替代主机,这样就可以确保 Redis 一直对外提供服务。另外,主从模式还可以实现数据的同步,当主机宕机,从机上的数据也不会丢失。 - 给缓存业务添加降级限流策略
- 给业务添加多级缓存
可以先在反向代理服务器 Nginx 中做缓存,在 Nginx 中未命中缓存时,再去 Redis 中查询。
缓存击穿 也叫热点Key问题,一个被高并发访问并且缓存重建业务较复杂的key突然失效了,无数的请求访问会在瞬间给数据库带来巨大的冲击。
5.1 互斥锁假设线程 1 查询缓存未命中,那么线程 1 就需要进行缓存重建工作,为了避免其他线程重复线程1 的工作,线程 1 就必须要先获取互斥锁,只有获取锁成功的线程才能够重建缓存数据。重建完成后,线程 1 就会将数据写入到缓存中,并将锁释放。如果在线程 1 将数据写入缓存之前,其他线程涌入,这个时候,其他线程查询缓存依然是未命中的,那么这些线程为了重建缓存,也必须先获取到互斥锁,但是,由于此时线程 1 未释放锁,所以其他线程就会获取锁失败,一旦获取锁失败,一般程序处理是让线程休眠一会儿,然后再重试(包括查询缓存以及获取互斥锁),如果线程 1 执行缓存重建时间过长,就会导致其他线程一直处于阻塞等待重试的状态,效率过低。
5.2 逻辑过期当我们在向 Redis 中存储数据时,不再为 key 设置过期时间(TTL),但是,需要在 value 中额外添加一个逻辑时间(以当前时间为基础,加上需要设置的过期时间),也就是说,这个 key 一旦存入到 Redis 中,就永不过期。假设线程 1 在查询缓存时发现逻辑时间已经过期,为了避免出现多个线程重建缓存,线程 1 就会去获取互斥锁,一旦线程 1 获取互斥锁成功,就会开启一个独立线程,由独立线程去查询数据库重建缓存数据,以及写入缓存重置逻辑过期时间等操作,一旦完成操作,独立线程就会将互斥锁释放掉。线程 1 在开启独立线程后,会直接将过期数据返回。而在独立线程释放锁之前,缓存中的数据都是过期数据。当其他线程在此之前涌入程序时,去查询缓存获取到依旧是逻辑时间过期的数据,那么这些线程就会试图获取互斥锁,此时由于独立线程还未释放锁,所以会获取锁失败,一旦失败,这些线程就会将查询到的旧数据返回。只有当独立线程执行结束,其他线程才会从缓存中获取到新数据。
两种方案对比:
- 方法1:将任意Java对象序列化为json并存储在string类型的key中,并且可以设置TTL过期时间
- 方法2:将任意Java对象序列化为json并存储在string类型的key中,并且可以设置逻辑过期时间,用于处理缓存击穿问题
- 方法3:根据指定的key查询缓存,并反序列化为指定类型,利用缓存空值的方式解决缓存穿透问题
- 方法4:根据指定的key查询缓存,并反序列化为指定类型,需要利用逻辑过期解决缓存击穿问题
@Slf4j @Component public class CacheClient { private final StringRedisTemplate stringRedisTemplate; private static final ExecutorService CACHE_REBUILD_EXECUTOR = Executors.newFixedThreadPool(10); public CacheClient(StringRedisTemplate stringRedisTemplate) { this.stringRedisTemplate = stringRedisTemplate; } public void set(String key, Object value, Long time, TimeUnit unit) { stringRedisTemplate.opsForValue().set(key, JSONUtil.toJsonStr(value), time, unit); } public void setWithLogicalExpire(String key, Object value, Long time, TimeUnit unit) { // 设置逻辑过期 RedisData redisData = new RedisData(); redisData.setData(value); redisData.setExpireTime(LocalDateTime.now().plusSeconds(unit.toSeconds(time))); // 写入Redis stringRedisTemplate.opsForValue().set(key, JSONUtil.toJsonStr(redisData)); } publicR queryWithPassThrough( String keyPrefix, ID id, Class type, Function dbFallback, Long time, TimeUnit unit){ String key = keyPrefix + id; // 1.从redis查询商铺缓存 String json = stringRedisTemplate.opsForValue().get(key); // 2.判断是否存在 if (StrUtil.isNotBlank(json)) { // 3.存在,直接返回 return JSONUtil.toBean(json, type); } // 判断命中的是否是空值 if (json != null) { // 返回一个错误信息 return null; } // 4.不存在,根据id查询数据库 R r = dbFallback.apply(id); // 5.不存在,返回错误 if (r == null) { // 将空值写入redis stringRedisTemplate.opsForValue().set(key, "", CACHE_NULL_TTL, TimeUnit.MINUTES); // 返回错误信息 return null; } // 6.存在,写入redis this.set(key, r, time, unit); return r; } public R queryWithLogicalExpire( String keyPrefix, ID id, Class type, Function dbFallback, Long time, TimeUnit unit) { String key = keyPrefix + id; // 1.从redis查询商铺缓存 String json = stringRedisTemplate.opsForValue().get(key); // 2.判断是否存在 if (StrUtil.isBlank(json)) { // 3.存在,直接返回 return null; } // 4.命中,需要先把json反序列化为对象 RedisData redisData = JSONUtil.toBean(json, RedisData.class); R r = JSONUtil.toBean((JSONObject) redisData.getData(), type); LocalDateTime expireTime = redisData.getExpireTime(); // 5.判断是否过期 if(expireTime.isAfter(LocalDateTime.now())) { // 5.1.未过期,直接返回店铺信息 return r; } // 5.2.已过期,需要缓存重建 // 6.缓存重建 // 6.1.获取互斥锁 String lockKey = LOCK_SHOP_KEY + id; boolean isLock = tryLock(lockKey); // 6.2.判断是否获取锁成功 if (isLock){ // 6.3.成功,开启独立线程,实现缓存重建 CACHE_REBUILD_EXECUTOR.submit(() -> { try { // 查询数据库 R newR = dbFallback.apply(id); // 重建缓存 this.setWithLogicalExpire(key, newR, time, unit); } catch (Exception e) { throw new RuntimeException(e); }finally { // 释放锁 unlock(lockKey); } }); } // 6.4.返回过期的商铺信息 return r; } public R queryWithMutex( String keyPrefix, ID id, Class type, Function dbFallback, Long time, TimeUnit unit) { String key = keyPrefix + id; // 1.从redis查询商铺缓存 String shopJson = stringRedisTemplate.opsForValue().get(key); // 2.判断是否存在 if (StrUtil.isNotBlank(shopJson)) { // 3.存在,直接返回 return JSONUtil.toBean(shopJson, type); } // 判断命中的是否是空值 if (shopJson != null) { // 返回一个错误信息 return null; } // 4.实现缓存重建 // 4.1.获取互斥锁 String lockKey = LOCK_SHOP_KEY + id; R r = null; try { boolean isLock = tryLock(lockKey); // 4.2.判断是否获取成功 if (!isLock) { // 4.3.获取锁失败,休眠并重试 Thread.sleep(50); return queryWithMutex(keyPrefix, id, type, dbFallback, time, unit); } // 4.4.获取锁成功,根据id查询数据库 r = dbFallback.apply(id); // 5.不存在,返回错误 if (r == null) { // 将空值写入redis stringRedisTemplate.opsForValue().set(key, "", CACHE_NULL_TTL, TimeUnit.MINUTES); // 返回错误信息 return null; } // 6.存在,写入redis this.set(key, r, time, unit); } catch (InterruptedException e) { throw new RuntimeException(e); }finally { // 7.释放锁 unlock(lockKey); } // 8.返回 return r; } private boolean tryLock(String key) { Boolean flag = stringRedisTemplate.opsForValue().setIfAbsent(key, "1", 10, TimeUnit.SECONDS); return BooleanUtil.isTrue(flag); } private void unlock(String key) { stringRedisTemplate.delete(key); } }