如何解决分布式架构系统中商品秒杀的高并发问题?
应用场景
为了解决分布式架构系统中的高并发问题,如分布式系统中的商品秒杀,如图:
注:java线程锁(如synchronize)无法解决分布式系统的高并发问题,因为java中的线程锁是jvm进程级别的锁,上图中A、B两个tomcat对应着两个进程,所以可以同时获得synchronize
解决
思路
使用redis实现分布式锁
在redis中:
SETkeyvalue[NX|XX][EX|PX]seconds
NX–只有键key不存在的时候才会设置key的值
XX–只有键key存在的时候才会设置key的值
EXseconds–设置键key的过期时间,单位时秒
PXmilliseconds–设置键key的过期时间,单位时毫秒
如setlock“test”nxex10,设置一个key为lock,value为test,过期时间为10秒的锁,当第二个进程要获取这个lock的时候,因为已经存在,所以获取不成功。
实现过程
使用springBoot整合redis解决,下面代码为controller层的核心实现代码
@RestController
publicclassRedisController{
@Autowired
privateStringRedisTemplatestringRedisTemplate;
@RequestMapping("/deduct_stock")
publicStringdeduct(@RequestParam("produtId")StringprodutId){
Stringkey=produtId;
try{
//设置锁并设置过期时间,原子操作
//相当于setkey"test"nxex10
Booleanresult=stringRedisTemplate.opsForValue().setIfAbsent(key,"test",10,TimeUnit.SECONDS);
if(!result){//没获取成功就不执行下面代码
return"error";
}
intstock=Integer.parseInt(stringRedisTemplate.opsForValue().get("stock"));
if(stock>0){
intresultStock=stock-1;
stringRedisTemplate.opsForValue().set("stock",resultStock+"");
System.out.println("购买成功,还剩:"+resultStock);
}else{
System.out.println("购买失败,库存不足!");
}
}finally{
//释放锁
stringRedisTemplate.delete(key);
}
return"ok";
}
}
上面代码,以商品id为锁key,当在分布式系统中多个tomcat同时访问"/deduct_stock"接口并且传参都相同(秒杀同一件商品时),只有获取了
这个锁才可继续操作。
但是上面代码也存在问题:
在0s时刻
A进程购买product_1,但是由于这是系统压力大,需要15s才可以运行完
10s时刻
A的锁被Redis自动释放了,这时候又有B购买product_1,这时B也可以获取到锁
15s时刻
A执行stringRedisTemplate.delete(key);释放锁,注意这时A释放的锁是B的锁。如果这时又有C购买product_1,则C又可以获取锁,B又释放C锁,一直下去,导致锁永久失效。
解决办法:
先用一个唯一标识作为锁的value,在释放锁的时候判断一下这个value是否为自己的,是才释放锁,解决自己进程不被其他进程释放
@RestController
publicclassRedisController{
@Autowired
privateStringRedisTemplatestringRedisTemplate;
@RequestMapping("/deduct_stock")
publicStringdeduct(@RequestParam("produtId")StringprodutId){
Stringkey=produtId;
StringclientId=UUID.randomUUID().toString();//唯一标识
try{
//设置锁并设置过期时间,原子操作
//相当于setkey"test"nxex30
Booleanresult=stringRedisTemplate.opsForValue().setIfAbsent(key,clientId,30,TimeUnit.SECONDS);
if(!result){//没获取成功就不执行下面代码
return"error";
}
intstock=Integer.parseInt(stringRedisTemplate.opsForValue().get("stock"));
if(stock>0){
intresultStock=stock-1;
stringRedisTemplate.opsForValue().set("stock",resultStock+"");
System.out.println("购买成功,还剩:"+resultStock);
}else{
System.out.println("购买失败,库存不足!");
}
}finally{
if(clientId.equals(stringRedisTemplate.opsForValue().get(key))){
//释放锁
stringRedisTemplate.delete(key);
}
}
return"ok";
}
上面代码只是解决了:自己进程不被其他进程释放
为了解决线程不会因为运行超时导致redis自动释放锁的问题
解决思路:
可以在进程拿到锁后开启分线程,分线程中设置一个定时器(jdkTimer),每隔一段时间查看一下单前进程的锁是否还存在,存在的话就将锁的时间复原,比如初始时间设置为10,每隔101/3的时间段,检查一下,如若存在,则将时间复原回10。
Redission
除了上面原生的代码,还可以使用Redission框架很方便解决以上问题
先导入相关包
org.redisson
redisson
3.6.5
然后再启动类中注入bean
@SpringBootApplication
publicclassSprbdemo2Application{
publicstaticvoidmain(String[]args){
SpringApplication.run(Sprbdemo2Application.class,args);
}
@Bean
publicRedissonredisson(){
//单机模式
Configconfig=newConfig();
config.useSingleServer().setAddress("redis://127.0.0.1:6379").setDatabase(0);
return(Redisson)Redisson.create(config);
}
}
@RestController
publicclassRedisController{
@Autowired
privateStringRedisTemplatestringRedisTemplate;
@Autowired
privateRedissonredisson;
@RequestMapping("/deduct_stock")
publicStringdeduct(@RequestParam("produtId")StringprodutId){
Stringkey=produtId;
RLocklock=redisson.getLock(key);
//StringclientId=UUID.randomUUID().toString();
try{
///设置锁并设置过期时间,原子操作
//相当于setkey"test"nxex30
Booleanresult=stringRedisTemplate.opsForValue().setIfAbsent(key,clientId,30,TimeUnit.SECONDS);
if(!result){//没获取成功就不执行下面代码
return"error";
}/
lock.lock(30,TimeUnit.SECONDS);
intstock=Integer.parseInt(stringRedisTemplate.opsForValue().get("stock"));
if(stock>0){
intresultStock=stock-1;
stringRedisTemplate.opsForValue().set("stock",resultStock+"");
System.out.println("购买成功,还剩:"+resultStock);
}else{
System.out.println("购买失败,库存不足!");
}
}finally{
/if(clientId.equals(stringRedisTemplate.opsForValue().get(key))){
//释放锁
stringRedisTemplate.delete(key);
}/
lock.unlock();
}
return"ok";
}
}
Redisson实现原理图:
总结
redis分布式锁解决秒杀商品的过程:
1、以商品id为锁key
2、设置唯一标识为锁的value(用作释放锁的判断条件,避免其他进程释放自己的锁
3、再获取锁的同时设置锁的过期时间(避免系统宕机导致死锁)
stringRedisTemplate.opsForValue().setIfAbsent(key,clientId,30,TimeUnit.SECONDS);
4、只有获取了锁的进程才可以进行以下购买操作,没有获取到的锁的进程进行阻塞操作(while操作一直尝试加锁操作)
5、在获取锁后开启一个分线程,使用定时器每隔一个时间段查看进程自己的锁是否过期,时间段=初始时间/3,如果还未过期,则恢复为初始时间,避免在因为运行超过锁的过期时间导致redis自己释放锁。
6、使用tryfinally在方法运行完后释放锁。
百度发现目前已经存在较好的框架Redisson框架更方便实现以上功能。底层思想也是和上面5点差不多。
效率问题思考
因为redis为单线程(但其并发量可达几万),所以分布式锁是将并发进程变为串行执行,如果想提高效率,如上面的秒杀商品,可将redis中的product1分段存储,比如product1:100分为
prouduct1_01:10、product1_02:10、product1_03:10…product1_10:10,
到时候分布式锁就可以锁分段后的商品id,这样效率会是原先秒杀product1的10倍。
|
|