配色: 字号:
如何解决分布式架构系统中商品秒杀的高并发问题
2020-03-03 | 阅:  转:  |  分享 
  
如何解决分布式架构系统中商品秒杀的高并发问题?

应用场景

为了解决分布式架构系统中的高并发问题,如分布式系统中的商品秒杀,如图:





注:java线程锁(如synchronize)无法解决分布式系统的高并发问题,因为java中的线程锁是jvm进程级别的锁,上图中A、B两个tomcat对应着两个进程,所以可以同时获得synchronize



解决

思路



使用redis实现分布式锁

在redis中:



SETkeyvalue[NX|XX][EX|PX]seconds



NX–只有键key不存在的时候才会设置key的值



XX–只有键key存在的时候才会设置key的值



EXseconds–设置键key的过期时间,单位时秒



PXmilliseconds–设置键key的过期时间,单位时毫秒



如setlock“test”nxex10,设置一个key为lock,value为test,过期时间为10秒的锁,当第二个进程要获取这个lock的时候,因为已经存在,所以获取不成功。



实现过程

使用springBoot整合redis解决,下面代码为controller层的核心实现代码



@RestController

publicclassRedisController{

@Autowired

privateStringRedisTemplatestringRedisTemplate;



@RequestMapping("/deduct_stock")

publicStringdeduct(@RequestParam("produtId")StringprodutId){

Stringkey=produtId;

try{

//设置锁并设置过期时间,原子操作

//相当于setkey"test"nxex10

Booleanresult=stringRedisTemplate.opsForValue().setIfAbsent(key,"test",10,TimeUnit.SECONDS);

if(!result){//没获取成功就不执行下面代码

return"error";

}

intstock=Integer.parseInt(stringRedisTemplate.opsForValue().get("stock"));

if(stock>0){

intresultStock=stock-1;

stringRedisTemplate.opsForValue().set("stock",resultStock+"");

System.out.println("购买成功,还剩:"+resultStock);

}else{

System.out.println("购买失败,库存不足!");

}



}finally{

//释放锁

stringRedisTemplate.delete(key);

}

return"ok";

}

}



上面代码,以商品id为锁key,当在分布式系统中多个tomcat同时访问"/deduct_stock"接口并且传参都相同(秒杀同一件商品时),只有获取了

这个锁才可继续操作。

但是上面代码也存在问题:

在0s时刻

A进程购买product_1,但是由于这是系统压力大,需要15s才可以运行完



10s时刻

A的锁被Redis自动释放了,这时候又有B购买product_1,这时B也可以获取到锁



15s时刻

A执行stringRedisTemplate.delete(key);释放锁,注意这时A释放的锁是B的锁。如果这时又有C购买product_1,则C又可以获取锁,B又释放C锁,一直下去,导致锁永久失效。

解决办法:

先用一个唯一标识作为锁的value,在释放锁的时候判断一下这个value是否为自己的,是才释放锁,解决自己进程不被其他进程释放



@RestController

publicclassRedisController{

@Autowired

privateStringRedisTemplatestringRedisTemplate;



@RequestMapping("/deduct_stock")

publicStringdeduct(@RequestParam("produtId")StringprodutId){

Stringkey=produtId;

StringclientId=UUID.randomUUID().toString();//唯一标识

try{

//设置锁并设置过期时间,原子操作

//相当于setkey"test"nxex30

Booleanresult=stringRedisTemplate.opsForValue().setIfAbsent(key,clientId,30,TimeUnit.SECONDS);

if(!result){//没获取成功就不执行下面代码

return"error";

}

intstock=Integer.parseInt(stringRedisTemplate.opsForValue().get("stock"));

if(stock>0){

intresultStock=stock-1;

stringRedisTemplate.opsForValue().set("stock",resultStock+"");

System.out.println("购买成功,还剩:"+resultStock);

}else{

System.out.println("购买失败,库存不足!");

}



}finally{

if(clientId.equals(stringRedisTemplate.opsForValue().get(key))){

//释放锁

stringRedisTemplate.delete(key);

}



}

return"ok";

}



上面代码只是解决了:自己进程不被其他进程释放

为了解决线程不会因为运行超时导致redis自动释放锁的问题

解决思路:

可以在进程拿到锁后开启分线程,分线程中设置一个定时器(jdkTimer),每隔一段时间查看一下单前进程的锁是否还存在,存在的话就将锁的时间复原,比如初始时间设置为10,每隔101/3的时间段,检查一下,如若存在,则将时间复原回10。



Redission

除了上面原生的代码,还可以使用Redission框架很方便解决以上问题

先导入相关包





org.redisson

redisson

3.6.5





然后再启动类中注入bean



@SpringBootApplication

publicclassSprbdemo2Application{



publicstaticvoidmain(String[]args){

SpringApplication.run(Sprbdemo2Application.class,args);

}



@Bean

publicRedissonredisson(){

//单机模式

Configconfig=newConfig();

config.useSingleServer().setAddress("redis://127.0.0.1:6379").setDatabase(0);

return(Redisson)Redisson.create(config);

}



}





@RestController

publicclassRedisController{

@Autowired

privateStringRedisTemplatestringRedisTemplate;



@Autowired

privateRedissonredisson;



@RequestMapping("/deduct_stock")

publicStringdeduct(@RequestParam("produtId")StringprodutId){

Stringkey=produtId;

RLocklock=redisson.getLock(key);

//StringclientId=UUID.randomUUID().toString();

try{

///设置锁并设置过期时间,原子操作

//相当于setkey"test"nxex30

Booleanresult=stringRedisTemplate.opsForValue().setIfAbsent(key,clientId,30,TimeUnit.SECONDS);

if(!result){//没获取成功就不执行下面代码

return"error";

}/



lock.lock(30,TimeUnit.SECONDS);

intstock=Integer.parseInt(stringRedisTemplate.opsForValue().get("stock"));

if(stock>0){

intresultStock=stock-1;

stringRedisTemplate.opsForValue().set("stock",resultStock+"");

System.out.println("购买成功,还剩:"+resultStock);

}else{

System.out.println("购买失败,库存不足!");

}



}finally{

/if(clientId.equals(stringRedisTemplate.opsForValue().get(key))){

//释放锁

stringRedisTemplate.delete(key);

}/

lock.unlock();



}

return"ok";

}

}



























Redisson实现原理图:





总结

redis分布式锁解决秒杀商品的过程:

1、以商品id为锁key



2、设置唯一标识为锁的value(用作释放锁的判断条件,避免其他进程释放自己的锁



3、再获取锁的同时设置锁的过期时间(避免系统宕机导致死锁)

stringRedisTemplate.opsForValue().setIfAbsent(key,clientId,30,TimeUnit.SECONDS);



4、只有获取了锁的进程才可以进行以下购买操作,没有获取到的锁的进程进行阻塞操作(while操作一直尝试加锁操作)



5、在获取锁后开启一个分线程,使用定时器每隔一个时间段查看进程自己的锁是否过期,时间段=初始时间/3,如果还未过期,则恢复为初始时间,避免在因为运行超过锁的过期时间导致redis自己释放锁。



6、使用tryfinally在方法运行完后释放锁。



百度发现目前已经存在较好的框架Redisson框架更方便实现以上功能。底层思想也是和上面5点差不多。



效率问题思考

因为redis为单线程(但其并发量可达几万),所以分布式锁是将并发进程变为串行执行,如果想提高效率,如上面的秒杀商品,可将redis中的product1分段存储,比如product1:100分为

prouduct1_01:10、product1_02:10、product1_03:10…product1_10:10,

到时候分布式锁就可以锁分段后的商品id,这样效率会是原先秒杀product1的10倍。











献花(0)
+1
(本文系华大职业教...首藏)