缓存和分布式锁

Posted by Chenyawei on 2021-09-16
Words 3.1k and Reading Time 13 Minutes
Viewed Times

一、缓存

1、缓存使用

为了系统性能的提升,我们一般都会将部分数据放入缓存中,加速访问。而 db 承担数据落盘工作。 哪些数据适合放入缓存?

  • 即时性、数据一致性要求不高的
  • 访问量大且更新频率不高的数据(读多,写少)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
private Map<String, List<Catelog2Vo>> getCatalogJsonFromDB() {
String catalogJson = stringRedisTemplate.opsForValue().get("catalogJson"); // 从缓存中加载数据
if (StringUtils.isNotBlank(catalogJson)) {
return JSON.parseObject(catalogJson, new TypeReference<Map<String, List<Catelog2Vo>>>() {
});
}
System.out.println("查询了数据库");
//将数据库的多次查询变为一次
List<CategoryEntity> selectList = this.baseMapper.selectList(null);
//1、查出所有分类
//1、1)查出所有一级分类
List<CategoryEntity> level1Categorys = getParent_cid(selectList, 0L);
//封装数据
Map<String, List<Catelog2Vo>> parentCid = level1Categorys.stream().collect(Collectors.toMap(k -> k.getCatId().toString(), v -> {
//1、每一个的一级分类,查到这个一级分类的二级分类
List<CategoryEntity> categoryEntities = getParent_cid(selectList, v.getCatId());
//2、封装上面的结果
List<Catelog2Vo> catelog2Vos = null;
if (categoryEntities != null) {
catelog2Vos = categoryEntities.stream().map(l2 -> {
Catelog2Vo catelog2Vo = new Catelog2Vo(v.getCatId().toString(), null, l2.getCatId().toString(), l2.getName().toString());
//1、找当前二级分类的三级分类封装成vo
List<CategoryEntity> level3Catelog = getParent_cid(selectList, l2.getCatId());
if (level3Catelog != null) {
List<Catelog2Vo.Category3Vo> category3Vos = level3Catelog.stream().map(l3 -> {
//2、封装成指定格式
Catelog2Vo.Category3Vo category3Vo = new Catelog2Vo.Category3Vo(l2.getCatId().toString(), l3.getCatId().toString(), l3.getName());
return category3Vo;
}).collect(Collectors.toList());
catelog2Vo.setCatalog3List(category3Vos);
}
return catelog2Vo;
}).collect(Collectors.toList());
}
return catelog2Vos;
}));
stringRedisTemplate.opsForValue().set("catalogJson", JSON.toJSONString(parentCid), 24, TimeUnit.HOURS); // 保存到cache中
return parentCid;
}

注意:在开发中,凡是放入缓存中的数据我们都应该指定过期时间,使其可以在系统即使没有主动更新数据也能自动触发数据加载进缓存的流程。避免业务崩溃导致的数据永久不一致问题。

2、整合redis 作为缓存

1)、引入redis=starter
1
2
3
4
<dependency> 
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
2)、配置redis
1
2
3
4
spring:
redis:
host: 172.20.6.176
port: 6379
3)、使用RedisTemplate操作redis
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
@SpringBootTest
class GulimallProductApplicationTests {

@Autowired
StringRedisTemplate stringRedisTemplate;

@Test
void testRedis() {
// Redis的五大数据类型
/**
* String(字符串)
* string是redis最基本的类型,可以理解为Memcached一样的类型,一个key对应一个value。
* string类型是二进制安全的,意思是redis的string可以包含任何数据。比如ipg图片或者序列化的对象。
* string类型是Redis最基本的数据结构,一个redis中字符串value最多可以是512M。
*
* Hash(哈希,类似java里的Map)
* Redis hash是一个键值对集合。
* Redis hash是一个string类型的field和value的映射表,hash特别适合用于存储对象。
* 类似java里面的Map
*
* List(列表)
* Redis列表是简单的字符串列表,按照插入顺序排序。可以添加一个元素到列表的头部(左边)或者尾部(右边)。
* 它的底层是一个链表。
*
* Set(集合)
* Redis的Set是string类型的无序集合。它是通过HashTable实现的。
*
* Zset(sorted set:有序集合)
* Redis zset 和 set 一样也是string类型元素的集合,且不允许重复的成员。
* 不同的是每个元素都会关联一个double类型的分数。
* redis正是通过分数来为集合中的成员进行从小到大的排序。zset的成员是唯一的,但分数(score)却可以重复。
*/
// string
stringRedisTemplate.opsForValue().set("testKey", "testValue" + UUID.randomUUID().toString());
System.out.println(stringRedisTemplate.opsForValue().get("testKey"));

// list
ListOperations<String, String> stringStringListOperations = stringRedisTemplate.opsForList();
stringStringListOperations.rightPushAll("listKey", Lists.newArrayList("a", "b", "c", "d"));
System.out.println("获取listKey: " + stringStringListOperations.size("listKey"));
System.out.println("获取listKey: " + stringStringListOperations.range("listKey", 0, -1));
stringStringListOperations.remove("listKey", 10, "a");
stringStringListOperations.remove("listKey", 10, "b");
stringStringListOperations.remove("listKey", 10, "c");
stringStringListOperations.remove("listKey", 10, "d");
System.out.println("获取listKey后: " + stringStringListOperations.size("listKey"));
System.out.println("获取listKey后: " + stringStringListOperations.range("listKey", 0, -1));

// hash KV模式不变,但V是一个键值对
HashOperations<String, Object, Object> stringObjectObjectHashOperations = stringRedisTemplate.opsForHash();
stringObjectObjectHashOperations.put("hashKey", "id", "chenyawei003");
stringObjectObjectHashOperations.put("hashKey", "name", "陈亚威");
stringObjectObjectHashOperations.put("hashKey", "age", "80");
System.out.println("hashKey: " + stringObjectObjectHashOperations.multiGet("hashKey", Lists.newArrayList("id", "name", "age")));
;
// set
stringRedisTemplate.opsForSet().add("setKey", "v5", "v1", "v2", "v3", "v4");
System.out.println("setKey: " + stringRedisTemplate.opsForSet().members("setKey"));
// zset
ZSetOperations<String, String> stringStringZSetOperations = stringRedisTemplate.opsForZSet();
stringStringZSetOperations.add("zsetKey", "v1", 60);
stringStringZSetOperations.add("zsetKey", "v2", 70);
stringStringZSetOperations.add("zsetKey", "v3", 80);
stringStringZSetOperations.add("zsetKey", "v4", 90);
System.out.println("zsetKey按下标: " + stringStringZSetOperations.range("zsetKey", 2, 3));
System.out.println("zsetKey按分数: " + stringStringZSetOperations.rangeByScore("zsetKey", 65, 88));
}

}
4)、切换使用jedis
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
<dependency> 
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
<exclusions>
<exclusion>
<groupId>io.lettuce</groupId>
<artifactId>lettuce-core</artifactId>
</exclusion>
</exclusions>
</dependency>

<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
</dependency>

二、缓存失效问题

解决大并发读情况下的缓存失效问题

1、缓存穿透 (查询一个一定不存在的数据)

  • 缓存击穿是指查询一个一定不存在的数据,由于缓存是不命中,将去查询数据库,但是数据库也无此记录,我们没有将这次查询的null写入缓存,这将导致这个不存在的数据每次请求都要到存储层去查询,失去了缓存的意义.
  • 在流量大时,可能db就挂掉了,要是有人利用不存在的key频繁攻击我们的应用,这就是漏洞.
  • 解决: 缓存空结果,并设置短的过期时间

2、缓存雪崩

  • 缓存雪崩是指我们在设置缓存时,采用了相同的过期时间,导致缓存在某一时刻同时失效,请求全部转发到DB, DB瞬时压力过重雪崩.
  • 解决: 原有的失效时间基础上增加一个随机值,比如1-5分钟随机,这样每个缓存的过期时间的重复率就会降低,就很难引发集体失效的事件.

3、缓存击穿(热点数据)

  • 对于一些设置了过期时间的key, 如果这些key可能会在某些时间点被超高并发的访问,是一种非常热点的数据
  • 这个时候,需要考虑一个问题: 如果这个key在大量请求同时进来前正好失效,那么所有对这个key的数据查询都落到db,我们称为缓存击穿
  • 解决: 加锁

三、缓存数据一致性

1、保证一致性的模式

1)、双写模式

image-20210916115113219

改进方法: 分布式读写锁.读数据等待写数据整个操作完成

2)、失效模式

image-20210916115132998

改进方法: 业务代码->更新DataBase->binlog->Canal->redis

四、分布式锁

1、分布式锁与本地锁

image-20210916133836820

synchronized (this) 就是本地锁:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
/**
* 从数据库查询商品种类数据,使用本地锁
*
* @return map
*/
private Map<String, List<Catelog2Vo>> getCatalogJsonFromDBWithLocalLock() {

/**
* 本地锁:synchronized,JUC(lock) , 分布式情况下,想要锁住所有线程,必须使用分布式锁
*/
synchronized (this) {
return getCatalogJsonFromDB();
}
}

2、分布式锁实现

image-20210916133851478

使用 RedisTemplate 操作分布式锁:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
 /**
* 从数据库查询商品种类数据,使用redis 分布式锁
*
* @return map
*/
private Map<String, List<Catelog2Vo>> getCatalogJsonFromDBWithRedisLock() {

/**
* 本地锁:synchronized,JUC(lock) , 分布式情况下,想要锁住所有线程,必须使用分布式锁
*/
String uuid = UUID.randomUUID().toString();
//1、占分布式锁。去 redis 占坑
Boolean lock = stringRedisTemplate.opsForValue().setIfAbsent("catalog-lock", uuid, 300, TimeUnit.SECONDS);
if (lock) {
System.out.println("获取分布式成功。。。");
Map<String, List<Catelog2Vo>> catalogJsonFromDB;
try {
//2、设置过期时间,必须和加锁是同步的,原子的
// redisTemplate.expire("lock",30,TimeUnit.SECONDS);
catalogJsonFromDB = getCatalogJsonFromDB();
} finally {
// Redis实现分布式锁3-使用LUA脚本解锁,解决原子性问题;
// 获取值对比+对比成功删除 = 原子操作
String script = "if redis.call('get',KEYS[1]) == ARGV[1] then return redis.call('del',KEYS[1]) else return 0 end";
RedisScript<Long> redisScript = new DefaultRedisScript(script, Long.class);
// 删除锁
Long lock1 = stringRedisTemplate.execute(redisScript, Arrays.asList("lock"), uuid);

}

// String lockValue = stringRedisTemplate.opsForValue().get("lock");
// if (StringUtils.equals(lockValue,uuid)){
// // 删除自己的锁,redis
// stringRedisTemplate.delete("lock");
// }
return catalogJsonFromDB;
} else {
// 加锁失败。。。重试 synchronized()
// 休眠100ms
System.out.println("获取分布式锁失败。。。等待重试");
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
// 自旋的方式
return getCatalogJsonFromDBWithRedisLock();
}
}

3、Redisson完成分布式锁

1、简介

Redisson 是架设在 Redis 基础上的一个 Java 驻内存数据网格(In-Memory Data Grid)。充分的利用了 Redis 键值数据库提供的一系列优势,基于Java 实用工具包中常用接口,为使用者提供了一系列具有分布式特性的常用工具类。使得原本作为协调单机多线程并发程序的工具包获得了协调分布式多机多线程并发系统的能力,大大降低了设计和研发大规模分布式系统的难度。同时结合各富特色的分布式服务,更进一步简化了分布式环境中程序相互之间的协作。

官方文档:https://github.com/redisson/redisson/wiki/%E7%9B%AE%E5%BD%95

2、配置
1
2
3
4
5
6
7
8
9
10
11
@Configuration
public class MyRedissonConfig {

@Bean(destroyMethod = "shutdown")
public RedissonClient redisson() throws IOException {
Config config = new Config();
config.useSingleServer().setAddress("redis://172.20.6.176:6379");
// config.useClusterServers().addNodeAddress("redis://127.0.0.1:7004", "redis://127.0.0.1:7001"); 集群模式
return Redisson.create(config);
}
}
3、使用分布式锁
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
/**
* 1)、springboot2.0以后默认使用lettuce作为操作redis的客户端。它使用netty进行网络通信
* 2)、lettuce的bug导致netty堆外内存溢出 -Xmx300m; netty如果没有指定堆外内存,默认使用-Xmx300m,
* 可以通过-Dio.netty.maxDirectMemory进行设置
* 解决方案:不能使用-Dio.netty.maxDirectMemory只去调大堆外内存
* (1)、升级lettuce客户端 (2)、切换使用jedis
* redisTemplate:
* lettuce、jedis操作redis的底层客户端。spring再次封装redisTemplate
*
* @return
*/
// @Override
public Map<String, List<Catelog2Vo>> getCatalogJson2() {
// 从redis中获取
/**
* 1、空结果缓存:解决缓存穿透
* 2、设置过期时间(加随机值):解决缓存雪崩
* 3、加锁:解决缓存击穿
*/
String catalogJson = stringRedisTemplate.opsForValue().get("catalogJson");
if (StringUtils.isBlank(catalogJson)) {
/**
* 分布式情况下,想要锁住所有线程,必须使用分布式锁
*/
RLock rLock = redissonClient.getLock("catalog-lock");
rLock.lock(30, TimeUnit.SECONDS);
Map<String, List<Catelog2Vo>> catalogJsonFromDB;
try {
catalogJsonFromDB = getCatalogJsonFromDB();
} finally {
rLock.unlock();
}
return catalogJsonFromDB;
}
Map<String, List<Catelog2Vo>> stringListMap = JSON.parseObject(catalogJson, new TypeReference<Map<String, List<Catelog2Vo>>>() {
});
return stringListMap;
}

五、Spring Cache

1、简介

  • Spring 从 3.1 开始定义了 org.springframework.cache.Cache 和 org.springframework.cache.CacheManager 接口来统一不同的缓存技术; 并支持使用 JCache(JSR-107)注解简化我们开发;

  • Cache 接口为缓存的组件规范定义,包含缓存的各种操作集合;Cache 接 口 下 Spring 提 供 了 各 种 xxxCache 的 实 现 ; 如 RedisCache ,EhCacheCache,ConcurrentMapCache 等;

  • 每次调用需要缓存功能的方法时,Spring 会检查检查指定参数的指定的目标方法是否已经被调用过;如果有就直接从缓存中获取方法调用后的结果,如果没有就调用方法并缓存结果后返回给用户。下次调用直接从缓存中获取。

  • 使用 Spring 缓存抽象时我们需要关注以下两点;

    1、确定方法需要被缓存以及他们的缓存策略

    2、从缓存中读取之前缓存存储的数据

2、基础概念

image-20210916142623717

3、注解

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
1)、引入依赖spring-boot-starter-cache
2)、写配置
自动配置了:CacheAutoConfiguration里面引入了RedisCacheConfiguration;RedisCacheConfiguration自动配好了缓存管理器RedisCacheManage
我们需要配置:配置使用redis作为缓存
3)、测试使用缓存,对于缓存声明,Spring的缓存抽象提供了一组Java注释
@Cacheable: Triggers cache population. 触发将数据保存到缓存到操作
@CacheEvict: Triggers cache eviction. 触发将数据从缓存中删除到操作
@CachePut: Updates the cache without interfering with the method execution. 不影响方法执行更新缓存
@Caching: Regroups multiple cache operations to be applied on a method. 组合以上多个操作
@CacheConfig: Shares some common cache-related settings at class-level. 在类级别共享缓存的相同配置
1)、开启缓存 @EnableCaching
2)、只需要使用注解就能完成缓存操作
4)、原理
CacheAutoConfiguration 导入类CacheConfigurations.getConfigurationClass-> RedisCacheConfiguration ->
自动配置了RedisCacheManager -> 初始化所有的缓存->每个缓存决定使用什么配置->如果RedisCacheConfiguration有就使用已有的,没有
就使用默认配置->想改缓存配置,只需要给容器中放一个RedisCacheConfiguration即可->就会应用当前RedisCacheManager管理的所有缓存分区中

image-20210916143528740

image-20210916143604475

4、语法表达式

image-20210916143714493


notice

欢迎访问 chenyawei 的博客, 若有问题或者有好的建议欢迎留言,笔者看到之后会及时回复。 评论点赞需要github账号登录,如果没有账号的话请点击 github 注册, 谢谢 !

If you like this blog or find it useful for you, you are welcome to comment on it. You are also welcome to share this blog, so that more people can participate in it. If the images used in the blog infringe your copyright, please contact the author to delete them. Thank you !