轻松理解redis做缓存的流程

2022-04-05
2935

Redis

缓存

1.redis的缓存过期处理

a.(主动)定时删除

Redis会定时的抽查一些随机的key,默认1秒钟抽查10次(可配置),一旦抽查到某一个key是过期的,Redis就会删除这个key。



# 默认每秒钟10次,把这个设置的越大意味着占用的CPU也就越多。范围在1到500之间,但是超过100的值通常  # 不是好主意。大多数用户应该使用默认值10并将其提高到仅在需要非常低延迟的环境中。hz 10



b.(被动)惰性删除

客户端在请求的时候,有可能会请求到一个已经过期的key,这种时候就是删除这个key的时候。用户在请求查询某一个key的时候,那么 redis 会检查这个 key 是否过期,如果过期则删除,返回一个 nil,这种策略对 CPU 比较友好,不会有太多的损耗,但缺点是过期的key无法及时删除。



2.redis的内存淘汰机制

内存如果占满了,可以使用硬盘,但是没有意义,因为硬盘没有内存块,会严重影响 redis 的性能,所以,当内存占用满了之后,redis 提供了一套缓存淘汰机制,即 memory management



# 当内存已使用值达到 maxmemory,开始清理缓存 # 单位:bytes(字节),123000000Byte ≈ 117.3Mmaxmemory 123000000 maxmemory-policy 缓存淘汰策略值枚举: # volatile开头的策略针对设置了过期时间的key# allkeys开头的策略针对内存中所有的key # volatile-lru:在使用 expire 设置了过期时间的 key 中,删除最近最少使用的缓存,然后保存新的缓存 # allkeys-lru:删除最近最少使用的缓存,然后保存新的缓存(推荐使用) # volatile-lfu:在使用 expire 设置了过期时间的 key 中,删除最不经常使用的缓存,然后保存新的缓存 # allkeys-lfu:删除最不经常使用的缓存,然后保存新的缓存(推荐使用) # volatile-random:在使用 expire 设置了过期时间的 key 中,随机删除一个 key # allkeys-random:在所有 key 中,随机删除一个 key # volatile-ttl:在使用 expire 设置了过期时间的 key 中,删除最快要过期的 key,即删除 TTL 最小的 key # noeviction:不删除任何数据,报出写操作异常(默认值)




如果没有设置maxmemory,将导致Redis在达到堆限制时以内存不足异常终止。


在设置了maxmemory之后,当内存使用达到maxmemory时,Redis将使用maxmemory-policy指定的策略对缓存进行删除。



3.redis缓存击穿的原因

key对应的数据存在,但在redis中过期,此时若有大量并发请求过来,这些请求发现缓存过期一般都会从后端DB加载数据并回设到缓存,这个时候大并发的请求可能会瞬间把后端DB压垮。让数据库承压


4.redis缓存雪崩的原因

主要是对于redis集群来说的,大量的缓存失效,由数据库承压,叫做雪崩

5.redis缓存穿透的原因

key对应的数据在数据源并不存在,每次针对此key的请求从缓存获取不到,请求都会到数据源,从而可能压垮数据源。比如用一个不存在的用户id获取用户信息,不论缓存还是数据库都没有,若黑客利用此漏洞进行攻击可能压垮数据库。

通俗理解:用户发起的请求,缓存中没有数据,数据库中也没有响应的数据,短时间发起大量的假请求都走了数据库,让数据库承压了,并且数据库中还不存在这些数据



三,缓存穿透解决方案

一个一定不存在缓存及查询不到的数据,由于缓存是不命中时被动写的,并且出于容错考虑,如果从存储层查不到数据则不写入缓存,这将导致这个不存在的数据每次请求都要到存储层去查询,失去了缓存的意义。


有很多种方法可以有效地解决缓存穿透问题,最常见的则是采用布隆过滤器,将所有可能存在的数据哈希到一个足够大的bitmap中,一个一定不存在的数据会被 这个bitmap拦截掉,从而避免了对底层存储系统的查询压力。另外也有一个更为简单粗暴的方法(我们采用的就是这种),如果一个查询返回的数据为空(不管是数据不存在,还是系统故障),我们仍然把这个空结果进行缓存,但它的过期时间会很短,最长不超过五分钟。




//伪代码public object GetProductListNew() {     int cacheTime = 30;     String cacheKey = "product_list";       String cacheValue = CacheHelper.Get(cacheKey);         if (cacheValue != null) {         return cacheValue;     }       cacheValue = CacheHelper.Get(cacheKey);         if (cacheValue != null) {         return cacheValue;     } else {         //数据库查询不到,为空         cacheValue = GetProductListFromDB();         if (cacheValue == null) {         //如果发现为空,设置个默认值,也缓存起来             cacheValue = string.Empty;         }         CacheHelper.Add(cacheKey, cacheValue, cacheTime);         return cacheValue;     }}





四,缓存击穿解决方案

key可能会在某些时间点被超高并发地访问,是一种非常“热点”的数据。这个时候,需要考虑一个问题:缓存被“击穿”的问题。

使用互斥锁(mutex key)


业界比较常用的做法,是使用mutex。简单地来说,就是在缓存失效的时候(判断拿出来的值为空),不是立即去load db,而是先使用缓存工具的某些带成功操作返回值的操作(比如Redis的SETNX或者Memcache的ADD)去set一个mutex key,当操作返回成功时,再进行load db的操作并回设缓存;否则,就重试整个get缓存的方法。


SETNX,是「SET if Not eXists」的缩写,也就是只有不存在的时候才设置,可以利用它来实现锁的效果。



public String get(key) { String value = redis.get(key); if (value == null) { //代表缓存值过期 //设置3min的超时,防止del操作失败的时候,下次缓存过期一直不能load db if (redis.setnx(key_mutex, 1, 3 * 60) == 1) { //代表设置成功 value = db.get(key); redis.set(key, value, expire_secs); redis.del(key_mutex); } else { //这个时候代表同时候的其他线程已经load db并回设到缓存了,这时候重试获取缓存值即可 sleep(50); get(key); //重试 } } else { return value;  } }



memcache代码:




if (memcache.get(key) == null) {  // 3 min timeout to avoid mutex holder crash  if (memcache.add(key_mutex, 3 * 60 * 1000) == true) {  value = db.get(key);  memcache.set(key, value);  memcache.delete(key_mutex);  } else {  sleep(50);  retry();  } }





五,缓存雪崩解决方案

与缓存击穿的区别在于这里针对很多key缓存,前者则是某一个key。


缓存正常从Redis中获取,示意图如下:


缓存失效时的雪崩效应对底层系统的冲击非常可怕!大多数系统设计者考虑用加锁或者队列的方式保证来保证不会有大量的线程对数据库一次性进行读写,从而避免失效时大量的并发请求落到底层存储系统上。还有一个简单方案就时讲缓存失效时间分散开,比如我们可以在原有的失效时间基础上增加一个随机值,比如1-5分钟随机,这样每一个缓存的过期时间的重复率就会降低,就很难引发集体失效的事件。


加锁排队,伪代码如下:




//伪代码public object GetProductListNew() { int cacheTime = 30; String cacheKey = "product_list"; String lockKey = cacheKey;   String cacheValue = CacheHelper.get(cacheKey); if (cacheValue != null) { return cacheValue; } else { synchronized(lockKey) { cacheValue = CacheHelper.get(cacheKey); if (cacheValue != null) { return cacheValue; } else { //这里一般是sql查询数据 cacheValue = GetProductListFromDB();  CacheHelper.Add(cacheKey, cacheValue, cacheTime); } } return cacheValue; }}




加锁排队只是为了减轻数据库的压力,并没有提高系统吞吐量。假设在高并发下,缓存重建期间key是锁着的,这是过来1000个请求999个都在阻塞的。同样会导致用户等待超时,这是个治标不治本的方法!


注意:加锁排队的解决方式分布式环境的并发问题,有可能还要解决分布式锁的问题;线程还会被阻塞,用户体验很差!因此,在真正的高并发场景下很少使用!


随机值伪代码:



//伪代码public object GetProductListNew() { int cacheTime = 30; String cacheKey = "product_list"; //缓存标记 String cacheSign = cacheKey + "_sign";   String sign = CacheHelper.Get(cacheSign); //获取缓存值 String cacheValue = CacheHelper.Get(cacheKey); if (sign != null) { return cacheValue; //未过期,直接返回 } else { CacheHelper.Add(cacheSign, "1", cacheTime); ThreadPool.QueueUserWorkItem((arg) -> { //这里一般是 sql查询数据 cacheValue = GetProductListFromDB();  //日期设缓存时间的2倍,用于脏读 CacheHelper.Add(cacheKey, cacheValue, cacheTime * 2);  }); return cacheValue; }}



解释说明:


缓存标记:记录缓存数据是否过期,如果过期会触发通知另外的线程在后台去更新实际key的缓存;

缓存数据:它的过期时间比缓存标记的时间延长1倍,例:标记缓存时间30分钟,数据缓存设置为60分钟。这样,当缓存标记key过期后,实际缓存还能把旧数据返回给调用端,直到另外的线程在后台更新完成后,才会返回新缓存。

关于缓存崩溃的解决方法,这里提出了三种方案:使用锁或队列、设置过期标志更新缓存、为key设置不同的缓存失效时间,还有一种被称为“二级缓存”的解决方法。