缓存击穿
什么是缓存击穿?
缓存击穿指的是某个热点 key 在缓存中突然失效了,导致大量的请求都到达数据库,给数据库带来了不必要的压力。
缓存击穿的解决方案
- 互斥锁/分布式锁
- 逻辑过期
- 互斥锁/分布式锁 + 逻辑过期
- 定时刷新
互斥锁/分布式锁
互斥锁/分布式锁的具体原理就是:利用锁的互斥性,保证多个请求中只有一个请求能够到达数据库并访问数据库进行更新缓存,其它的请求阻塞等待。
需要做一个双重检测的机制,避免重复查询。其他请求拿到锁之后先判断缓存中是否存在数据,存在数据直接返回即可。
伪代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35
| String cacheKey = "hot_key"; Object cached = redis.get(cacheKey);
if (cached != null) { return cached; }
String lockKey = "lock:" + cacheKey; boolean locked = redis.setIfAbsent(lockKey, "1", 10, TimeUnit.SECONDS);
if (locked) { try { cached = redis.get(cacheKey); if (cached != null) { return cached; }
Object dbData = queryFromDB(cacheKey);
redis.set(cacheKey, dbData, 3600);
return dbData; } finally { redis.del(lockKey); } } else { Thread.sleep(100); return getFromCacheWithLock(cacheKey); }
|
这种方式数据一致性比较强,但是需要阻塞等待,会对性能造成一定影响。
逻辑过期
逻辑过期就是指不设置缓存 key 的过期时间,使其永不过期,由我们在保存缓存数据的时候手动维护一个 expireTime
字段到 Redis 当中。比如保存 JSON 或 Map 的时候显式的指定一个 expireTime
字段,字段中保存当前时间的时间戳 + 手动的过期时间。
每次获取数据的时候,解析相应的 expireTime
字段,与当前的时间戳进行比较,若保存的时间戳大于当前时间,则表示还没有逻辑过期,否则就已经过期,过期之后可以开启一个新的线程更新缓存或者使用消息队列去更新缓存。
该方案可能会存在短暂时间的脏数据,但是不需要阻塞请求。
互斥锁/分布式锁 + 逻辑过期
实际环境中,都是二者一起去使用来防止缓存击穿的,伪代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93
| @Service public class CacheService {
@Autowired private StringRedisTemplate redisTemplate;
private static final long LOGICAL_EXPIRE_TIME = 10 * 60 * 1000;
private static final String CACHE_KEY = "product:123";
public String getDataWithLogicalExpire() { String jsonStr = redisTemplate.opsForValue().get(CACHE_KEY);
if (jsonStr == null) { return rebuildCacheAndReturn(); }
JSONObject jsonObject = JSON.parseObject(jsonStr); Long expireTime = jsonObject.getLong("expireTime"); String data = jsonObject.getString("data");
if (expireTime > System.currentTimeMillis()) { return data; } else { asyncRefreshCache(); return data; } }
private void asyncRefreshCache() { ThreadPool.submit(() -> { try { String freshData = queryFromDatabase();
JSONObject newJson = new JSONObject(); newJson.put("data", freshData); newJson.put("expireTime", System.currentTimeMillis() + LOGICAL_EXPIRE_TIME);
redisTemplate.opsForValue().set(CACHE_KEY, newJson.toJSONString()); } catch (Exception e) { log.error("异步刷新缓存失败", e); } }); }
private String rebuildCacheAndReturn() { Boolean locked = redisTemplate.opsForValue().setIfAbsent("lock:" + CACHE_KEY, "1", 10, TimeUnit.SECONDS); if (!locked) { Thread.sleep(100); return getDataWithLogicalExpire(); }
try { String freshData = queryFromDatabase();
JSONObject newJson = new JSONObject(); newJson.put("data", freshData); newJson.put("expireTime", System.currentTimeMillis() + LOGICAL_EXPIRE_TIME);
redisTemplate.opsForValue().set(CACHE_KEY, newJson.toJSONString());
return freshData; } finally { redisTemplate.delete("lock:" + CACHE_KEY); } }
private String queryFromDatabase() { return "Data from DB"; } }
|
定时刷新
使用定时任务框架,比如 Spring Scheduler、xxl-job 等,针对每个热点 key 进行记录过期时间,在过期时间之前刷新数据,比如 1 分种过期时间进行数据刷新。
缓存穿透
什么是缓存穿透?
缓存穿透指的是数据在缓存和数据库中都不存在,从而导致大量的请求直接访问数据库,造成数据库压力过载,甚至出现宕机。
缓存穿透解决方案
缓存穿透的解决可以分为多个步骤。
第一步
首先对请求参数做好校验,拦截非法的请求。对于一些异常的用户或者异常的 IP 直接进行限流或者设置黑名单禁止访问,可以在网关层中进行拦截。
第二步
对于数据库中查不到的数据缓存一个空值进行返回,同时设置一个较短的过期时间,避免后续增加了该数据后反而查不到。
第三步
如果恶意用户构造大量不存在的数据疯狂攻击我们,那么缓存空值就力不从心了,可以使用布隆过滤器来实现。写数据库的时候同时写布隆过滤器,后续打来的请求通过布隆过滤器进行判断。
布隆过滤器就是把数据经过多个 hash 函数进行计算得到多个 hash 值,然后把 hash 值映射到一个 bitmap 中,把对应位置修改为 1。请求来了,就对这些数据用 hash 函数做计算,然后看看这几个 hash 值对应的 bitmap 的那个位置上是不是 1,是 1 说明有,是 0 说明没有。
但是布隆过滤器是存在一定误判的可能的。且布隆过滤器不支持删除,因为无法确定哪个哈希值是哪个元素设置的。
缓存雪崩
什么是缓存雪崩?
缓存雪崩指的是在同一时期大量的缓存 key 突然 同时过期,导致所有的请求都直接访问数据库,从而导致数据库的流量激增,压力骤增,最终导致宕机。
缓存雪崩解决方案
随机过期时间
该方案就是在给定一个固定的过期时间之后再随机加上一个随机数,从而避免大量缓存数据的同时过期。
示例代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30
|
@Override public Page<PictureVO> listPictureVOByPageWithCache(PictureQueryRequest queryRequest) { String jsonStr = JSONUtil.toJsonStr(queryRequest); String md5Str = DigestUtils.md5DigestAsHex(jsonStr.getBytes()); final String key = "picture:listPictureVOByPage:" + md5Str; String valueData = redisTemplate.opsForValue().get(key); if (StrUtil.isNotBlank(valueData)) { return JSONUtil.toBean(valueData, new TypeReference<Page<PictureVO>>() { }, false); } int current = queryRequest.getCurrent(); int pageSize = queryRequest.getPageSize(); Page<Picture> picturePage = this.page(new Page<>(current, pageSize), this.getQueryWrapper(queryRequest)); Page<PictureVO> pictureVOPage = this.getPictureVOPage(picturePage); int timeout = 300 + RandomUtil.randomInt(0, 300); redisTemplate.opsForValue().set(key, JSONUtil.toJsonStr(pictureVOPage), timeout, TimeUnit.SECONDS); return pictureVOPage; }
|
缓存永不过期
该方案就是针对于几乎不会改变的数据不设置过期时间,在有需要修改的场景进行异步删除或更新即可。
缓存预热
利用定时任务提前对缓存做预热,保证用户直接查询缓存而不是数据库。
多级缓存
结合本地缓存与分布式缓存一起使用,并设置不同的过期时间,避免对数据库直接造成请求。比如本地缓存 Caffeine 结合分布式缓存 Redis 共同构成多级缓存,避免缓存雪崩。
下面是一个我之前项目中使用的一个多级缓存示例。
1、引入依赖 (JDK11+)
1 2 3 4 5 6
| <dependency> <groupId>com.github.ben-manes.caffeine</groupId> <artifactId>caffeine</artifactId> <version>3.1.8</version> </dependency>
|
2、Caffeine 缓存工具类
1 2 3 4 5 6 7 8 9 10 11 12 13 14
|
public class CaffeineUtil { private static final Cache<String, String> LOCAL_CACHE = Caffeine.newBuilder() .initialCapacity(1024) .maximumSize(10000L) .expireAfterWrite(5L, TimeUnit.MINUTES) .build();
public static Cache<String, String> getCache() { return LOCAL_CACHE; } }
|
3、构建多级缓存示例:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39
|
@Override public Page<PictureVO> listPictureVOByPageWithCache(PictureQueryRequest queryRequest) { String jsonStr = JSONUtil.toJsonStr(queryRequest); String md5Str = DigestUtils.md5DigestAsHex(jsonStr.getBytes()); final String key = "picture:listPictureVOByPage:" + md5Str; String valueData = CaffeineUtil.getCache().getIfPresent(key); if (StrUtil.isNotBlank(valueData)) { return JSONUtil.toBean(valueData, new TypeReference<Page<PictureVO>>() { }, false); } valueData = redisTemplate.opsForValue().get(key); if (StrUtil.isNotBlank(valueData)) { CaffeineUtil.getCache().put(key, valueData); return JSONUtil.toBean(valueData, new TypeReference<Page<PictureVO>>() { }, false); } int current = queryRequest.getCurrent(); int pageSize = queryRequest.getPageSize(); Page<Picture> picturePage = this.page(new Page<>(current, pageSize), this.getQueryWrapper(queryRequest)); Page<PictureVO> pictureVOPage = this.getPictureVOPage(picturePage); CaffeineUtil.getCache().put(key, JSONUtil.toJsonStr(pictureVOPage)); int timeout = 300 + RandomUtil.randomInt(0, 300); redisTemplate.opsForValue().set(key, JSONUtil.toJsonStr(pictureVOPage), timeout, TimeUnit.SECONDS); return pictureVOPage; }
|
不可避免的缓存雪崩突发情况
针对于 Redis 突然宕机,或者内存不够导致淘汰其他缓存数据,或者机房被洪水淹了等不可抗力因素导致的缓存雪崩问题,我们有如下的对应方案:
1、做好 Redis 的高可用,要么搭建主从 + 哨兵节点要么搭建多主多从集群。
2、做好服务的降级限流熔断的服务保护措施,做好兜底。