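This article introduces Ribbon's load-balancing strategies and then walks through the source code of several of them.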
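Load-Balancing Strategies

RandomRule

This strategy picks one node at random from the currently available service nodes. It retries with a yield-and-spin loop and is written in a strictly defensive style.

RoundRobinRule

This strategy walks through the nodes one step at a time, selecting each in turn.

[Figure: round-robin node selection]

CAS combined with a spin loop is one of the cheapest ways to achieve thread safety under high concurrency, because it does not lock any system resource. The drawback is that a spin that cannot complete for a long time wastes CPU: spinning executes no business logic and simply keeps the CPU busy. For that reason the number of spins is usually capped, as the JDK does in some places. The basic pattern looks like this:

    while (true) {
        // CAS operation
        if (cas(expected, update)) {
            // business logic
            // break or return
        }
    }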
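The capped spin described above can be sketched in a few lines. This is only an illustration under stated assumptions: the class name BoundedSpin, the method tryAdd, and the maxSpins parameter are made up for this example and do not come from Ribbon or the JDK.

```java
import java.util.concurrent.atomic.AtomicInteger;

final class BoundedSpin {
    /**
     * Tries to add delta to the counter with CAS and gives up after maxSpins attempts.
     * Returns true if the update was applied. (Hypothetical helper for illustration.)
     */
    static boolean tryAdd(AtomicInteger counter, int delta, int maxSpins) {
        for (int i = 0; i < maxSpins; i++) {
            int expected = counter.get();
            if (counter.compareAndSet(expected, expected + delta)) {
                return true; // CAS succeeded; business logic could follow here
            }
            Thread.yield(); // another thread won the race; let it run before retrying
        }
        return false; // spin budget exhausted; the caller decides what to do next
    }

    public static void main(String[] args) {
        AtomicInteger counter = new AtomicInteger(0);
        boolean applied = tryAdd(counter, 5, 10);
        System.out.println(applied + " " + counter.get()); // prints "true 5"
    }
}
```

Returning false instead of spinning forever trades a guaranteed update for a bounded cost; whether that is acceptable depends on the caller.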
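RetryRule

RetryRule works much like the decorator pattern. Decorators nest like Russian dolls, one layer wrapped around another, and each layer adds its own piece of functionality.

[Figure: the classic decorator pattern]

WeightedResponseTimeRule

This rule inherits from …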
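BestAvailableRule

After filtering out failed services, this rule uses the statistics from the past 30 minutes to choose the node with the lowest current concurrency as the target. If no statistics have been generated yet, it falls back to round-robin selection.

AvailabilityFilteringRule

This rule relies internally on … Each time … If the selected …

ZoneAvoidanceRule

This filter combines two filtering conditions: one at the zone level and one at the availability level.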
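The idea of combining a zone-level condition with an availability-level condition can be sketched with plain java.util.function.Predicate. This is not Ribbon's implementation: the Node class, the zone names, and the two lambda conditions are all hypothetical and only show how two independent filters compose.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;

final class CompositeFilterSketch {
    /** Hypothetical node type for this sketch; it is not Ribbon's Server class. */
    static final class Node {
        final String id;
        final String zone;
        final boolean available;

        Node(String id, String zone, boolean available) {
            this.id = id;
            this.zone = zone;
            this.available = available;
        }
    }

    /** Keeps only the nodes that pass both the zone-level and the availability-level check. */
    static List<Node> filter(List<Node> nodes,
                             Predicate<Node> zoneLevel,
                             Predicate<Node> availabilityLevel) {
        Predicate<Node> combined = zoneLevel.and(availabilityLevel);
        List<Node> kept = new ArrayList<>();
        for (Node node : nodes) {
            if (combined.test(node)) {
                kept.add(node);
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        List<Node> nodes = Arrays.asList(
                new Node("a", "zone-1", true),
                new Node("b", "zone-2", true),   // rejected by the zone-level check
                new Node("c", "zone-1", false)); // rejected by the availability check
        // Assumed conditions: avoid zone-2 entirely, and require the node to be available.
        List<Node> kept = filter(nodes, n -> !n.zone.equals("zone-2"), n -> n.available);
        for (Node node : kept) {
            System.out.println(node.id); // prints "a"
        }
    }
}
```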
Ribbon Load-Balancing Strategy Source Code

RandomRule source

Start with the following method:

    public Server choose(ILoadBalancer lb, Object key) {
        if (lb == null) {
            return null;
        }
        Server server = null;

        while (server == null) {
            if (Thread.interrupted()) {
                return null;
            }
            List<Server> upList = lb.getReachableServers();
            List<Server> allList = lb.getAllServers();

            int serverCount = allList.size();
            if (serverCount == 0) {
                /*
                 * No servers. End regardless of pass, because subsequent passes
                 * only get more restrictive.
                 */
                return null;
            }

            int index = chooseRandomInt(serverCount);
            server = upList.get(index);

            if (server == null) {
                Thread.yield();
                continue;
            }

            if (server.isAlive()) {
                return (server);
            }

            server = null;
            Thread.yield();
        }

        return server;
    }

The method is walked through piece by piece below.
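    if (Thread.interrupted()) {
        return null;
    }

If the thread has been interrupted, return null immediately (defensive programming).

    List<Server> upList = lb.getReachableServers();
    List<Server> allList = lb.getAllServers();

Fetch the list of reachable servers and the list of all servers.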
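    int serverCount = allList.size();
    if (serverCount == 0) {
        return null;
    }

If the registry has no servers at all, return null.

    int index = chooseRandomInt(serverCount);
    server = upList.get(index);

Pick a server at random. Note that the index is drawn from the size of allList but is then used to index upList, so the lookup assumes upList is at least as long as the index drawn.

The logic of chooseRandomInt is:

    protected int chooseRandomInt(int serverCount) {
        return ThreadLocalRandom.current().nextInt(serverCount);
    }

It returns a random integer from 0 (inclusive) to serverCount (exclusive).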
Back in the method:

    if (server == null) {
        Thread.yield();
        continue;
    }

If the randomly selected server is null, yield the thread and try again.

    if (server.isAlive()) {
        return (server);
    }

If the server is alive, return it.

    server = null;
    Thread.yield();

If it is not alive, reset server to null and yield. The method therefore yields the thread every time it goes around for another iteration.

RoundRobinRule source

Next, look at the following method:

    public Server choose(ILoadBalancer lb, Object key) {
        if (lb == null) {
            log.warn("no load balancer");
            return null;
        }

        Server server = null;
        int count = 0;
        while (server == null && count++ < 10) {
            List<Server> reachableServers = lb.getReachableServers();
            List<Server> allServers = lb.getAllServers();
            int upCount = reachableServers.size();
            int serverCount = allServers.size();

            if ((upCount == 0) || (serverCount == 0)) {
                log.warn("No up servers available from load balancer: " + lb);
                return null;
            }

            int nextServerIndex = incrementAndGetModulo(serverCount);
            server = allServers.get(nextServerIndex);

            if (server == null) {
                Thread.yield();
                continue;
            }

            if (server.isAlive() && (server.isReadyToServe())) {
                return (server);
            }

            server = null;
        }

        if (count >= 10) {
            log.warn("No available alive servers after 10 tries from load balancer: " + lb);
        }
        return server;
    }
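The index selection that the listing above delegates to incrementAndGetModulo can be tried in isolation. The sketch below is a standalone approximation of that idea: the class name RoundRobinCounter and the field name next are made up for this example, and the counter is assumed to start at 0.

```java
import java.util.concurrent.atomic.AtomicInteger;

final class RoundRobinCounter {
    private final AtomicInteger next = new AtomicInteger(0); // assumed initial value

    /** Advances the counter with CAS and returns the new index in [0, modulo). */
    int incrementAndGetModulo(int modulo) {
        for (;;) {
            int current = next.get();
            int candidate = (current + 1) % modulo;
            // Only one thread can move the counter from `current`; losers re-read and retry.
            if (next.compareAndSet(current, candidate)) {
                return candidate;
            }
        }
    }

    public static void main(String[] args) {
        RoundRobinCounter counter = new RoundRobinCounter();
        StringBuilder picked = new StringBuilder();
        for (int i = 0; i < 5; i++) {
            picked.append(counter.incrementAndGetModulo(3)).append(' ');
        }
        System.out.println(picked.toString().trim()); // prints "1 2 0 1 2"
    }
}
```

Because every successful CAS hands out exactly one index, concurrent callers never receive the same step of the cycle twice, and no lock is needed.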
    List<Server> reachableServers = lb.getReachableServers();
    List<Server> allServers = lb.getAllServers();
    int upCount = reachableServers.size();
    int serverCount = allServers.size();

Fetch the reachable servers and the full server list, and record the size of each.
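    if ((upCount == 0) || (serverCount == 0)) {
        log.warn("No up servers available from load balancer: " + lb);
        return null;
    }

If no servers are available, return null.

    int nextServerIndex = incrementAndGetModulo(serverCount);
    server = allServers.get(nextServerIndex);

The index of the server to pick comes from incrementAndGetModulo:

    private int incrementAndGetModulo(int modulo) {
        for (;;) {
            int current = nextServerCyclicCounter.get();
            int next = (current + 1) % modulo;
            if (nextServerCyclicCounter.compareAndSet(current, next))
                return next;
        }
    }

This is a CAS spin loop: the counter is read, advanced modulo the server count, and written back with compareAndSet, retrying until the CAS succeeds.

    if (server == null) {
        Thread.yield();
        continue;
    }

If the server obtained is null, yield the thread and retry.

    if (server.isAlive() && (server.isReadyToServe())) {
        return (server);
    }

If the server is alive and ready to serve, return it.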
    server = null;

Otherwise server is reset to null. Unlike RandomRule, there is no yield at the end here, because the loop exits after 10 attempts anyway.

BestAvailableRule source

Next, look at the following method:

    @Override
    public Server choose(Object key) {
        if (loadBalancerStats == null) {
            return super.choose(key);
        }
        List<Server> serverList = getLoadBalancer().getAllServers();
        int minimalConcurrentConnections = Integer.MAX_VALUE;
        long currentTime = System.currentTimeMillis();
        Server chosen = null;
        for (Server server: serverList) {
            ServerStats serverStats = loadBalancerStats.getSingleServerStat(server);
            if (!serverStats.isCircuitBreakerTripped(currentTime)) {
                int concurrentConnections = serverStats.getActiveRequestsCount(currentTime);
                if (concurrentConnections < minimalConcurrentConnections) {
                    minimalConcurrentConnections = concurrentConnections;
                    chosen = server;
                }
            }
        }
        if (chosen == null) {
            return super.choose(key);
        } else {
            return chosen;
        }
    }

The first check is:

    if (loadBalancerStats == null) {
        return super.choose(key);
    }

If loadBalancerStats is null, the call is delegated to the parent class's choose.
    ServerStats serverStats = loadBalancerStats.getSingleServerStat(server);

    public ServerStats getSingleServerStat(Server server) {
        return getServerStats(server);
    }

    protected ServerStats getServerStats(Server server) {
        try {
            return serverStatsCache.get(server);
        } catch (ExecutionException e) {
            ServerStats stats = createServerStats(server);
            serverStatsCache.asMap().putIfAbsent(server, stats);
            return serverStatsCache.asMap().get(server);
        }
    }

The server's statistics are fetched from a cache here.

Next, the rule checks whether the server's circuit breaker is tripped:

    if (!serverStats.isCircuitBreakerTripped(currentTime)) {...}

    public boolean isCircuitBreakerTripped(long currentTime) {
        long circuitBreakerTimeout = getCircuitBreakerTimeout();
        if (circuitBreakerTimeout <= 0) {
            return false;
        }
        return circuitBreakerTimeout > currentTime;
    }

It first obtains the circuit-breaker timeout; the breaker counts as tripped while that timeout still lies in the future. The timeout is computed as follows:

    private long getCircuitBreakerTimeout() {
        long blackOutPeriod = getCircuitBreakerBlackoutPeriod();
        if (blackOutPeriod <= 0) {
            return 0;
        }
        return lastConnectionFailedTimestamp + blackOutPeriod;
    }

It returns the timestamp of the last connection failure plus blackOutPeriod, which in turn comes from getCircuitBreakerBlackoutPeriod:

    private long getCircuitBreakerBlackoutPeriod() {
        int failureCount = successiveConnectionFailureCount.get();
        int threshold = connectionFailureThreshold.get();
        if (failureCount < threshold) {
            return 0;
        }
        int diff = (failureCount - threshold) > 16 ? 16 : (failureCount - threshold);
        int blackOutSeconds = (1 << diff) * circuitTrippedTimeoutFactor.get();
        if (blackOutSeconds > maxCircuitTrippedTimeout.get()) {
            blackOutSeconds = maxCircuitTrippedTimeout.get();
        }
        return blackOutSeconds * 1000L;
    }
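To see how the blackout period grows, the arithmetic of getCircuitBreakerBlackoutPeriod can be reproduced with the configuration passed in as plain parameters. The values used in main (a threshold of 3 failures, a factor of 10 seconds, a cap of 30 seconds) are illustrative assumptions chosen for this example, and the class and method names are made up.

```java
final class BlackoutPeriodSketch {
    /**
     * Same arithmetic as the listing above, with the configuration passed in explicitly:
     * 0 below the threshold, then factor * 2^(failures - threshold) seconds, capped at maxSeconds.
     */
    static long blackoutMillis(int failureCount, int threshold, int factorSeconds, int maxSeconds) {
        if (failureCount < threshold) {
            return 0;
        }
        int diff = Math.min(failureCount - threshold, 16); // cap the exponent, as in the listing
        int seconds = (1 << diff) * factorSeconds;
        return Math.min(seconds, maxSeconds) * 1000L;
    }

    public static void main(String[] args) {
        // Assumed configuration for illustration only.
        int threshold = 3, factorSeconds = 10, maxSeconds = 30;
        for (int failures = 2; failures <= 6; failures++) {
            System.out.println(failures + " failures -> "
                    + blackoutMillis(failures, threshold, factorSeconds, maxSeconds) + " ms");
        }
        // prints:
        // 2 failures -> 0 ms
        // 3 failures -> 10000 ms
        // 4 failures -> 20000 ms
        // 5 failures -> 30000 ms
        // 6 failures -> 30000 ms
    }
}
```

With these assumed values the period doubles with each additional consecutive failure until it reaches the cap, which is an exponential backoff with a ceiling.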
Back in the method:

    if (concurrentConnections < minimalConcurrentConnections) {
        minimalConcurrentConnections = concurrentConnections;
        chosen = server;
    }

This picks the server with the fewest active connections.

    if (chosen == null) {
        return super.choose(key);
    } else {
        return chosen;
    }

Finally the chosen server is returned; if none was chosen, the parent class's choose is used instead. The core idea is to find the least busy server.

RetryRule source

Look at the following method:

    public Server choose(ILoadBalancer lb, Object key) {
        long requestTime = System.currentTimeMillis();
        long deadline = requestTime + maxRetryMillis;

        Server answer = null;

        answer = subRule.choose(key);

        if (((answer == null) || (!answer.isAlive()))
                && (System.currentTimeMillis() < deadline)) {

            InterruptTask task = new InterruptTask(deadline
                    - System.currentTimeMillis());

            while (!Thread.interrupted()) {
                answer = subRule.choose(key);

                if (((answer == null) || (!answer.isAlive()))
                        && (System.currentTimeMillis() < deadline)) {
                    /* pause and retry hoping it's transient */
                    Thread.yield();
                } else {
                    break;
                }
            }

            task.cancel();
        }

        if ((answer == null) || (!answer.isAlive())) {
            return null;
        } else {
            return answer;
        }
    }

    long requestTime = System.currentTimeMillis();
    long deadline = requestTime + maxRetryMillis;

First record the current time and compute the deadline.

    answer = subRule.choose(key);

The actual selection is delegated to subRule. If the result is null or is not alive, and the deadline has not yet passed, the retry logic kicks in:

    while (!Thread.interrupted()) {
        answer = subRule.choose(key);

        if (((answer == null) || (!answer.isAlive()))
                && (System.currentTimeMillis() < deadline)) {
            /* pause and retry hoping it's transient */
            Thread.yield();
        } else {
            break;
        }
    }

If the thread is interrupted, retrying stops. Otherwise a server is selected again; if that fails too, the thread yields and tries again on the next iteration.

    InterruptTask task = new InterruptTask(deadline - System.currentTimeMillis());

Once the deadline is reached, this task interrupts the retry flow.

    task.cancel();

The task is cancelled once the loop ends. Finally the result is returned:

    if ((answer == null) || (!answer.isAlive())) {
        return null;
    } else {
        return answer;
    }
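The decorator-style structure of RetryRule, a wrapper that adds deadline-bounded retries around another selection strategy, can be sketched without any Ribbon types. Note the substitution: instead of a timer task that interrupts the thread, this sketch simply checks the deadline inside the loop. RetryingChooser, its delegate field, and the flaky supplier in main are hypothetical names used only for illustration.

```java
import java.util.function.Supplier;

final class RetryingChooser<T> implements Supplier<T> {
    private final Supplier<T> delegate;  // the wrapped strategy, playing the role of the sub rule
    private final long maxRetryMillis;   // retry budget measured from the start of each call

    RetryingChooser(Supplier<T> delegate, long maxRetryMillis) {
        this.delegate = delegate;
        this.maxRetryMillis = maxRetryMillis;
    }

    @Override
    public T get() {
        long deadline = System.currentTimeMillis() + maxRetryMillis;
        T answer = delegate.get();
        // Retry while nothing was chosen, the deadline has not passed, and nobody interrupted us.
        while (answer == null
                && System.currentTimeMillis() < deadline
                && !Thread.currentThread().isInterrupted()) {
            Thread.yield(); // pause briefly and retry, hoping the failure is transient
            answer = delegate.get();
        }
        return answer; // may still be null if the budget ran out
    }

    public static void main(String[] args) {
        int[] calls = {0};
        // Assumed flaky strategy: fails twice, then succeeds.
        Supplier<String> flaky = () -> ++calls[0] < 3 ? null : "server-1";
        String chosen = new RetryingChooser<>(flaky, 500).get();
        System.out.println(chosen + " after " + calls[0] + " calls");
    }
}
```

Because the wrapper implements the same interface as the strategy it wraps, wrappers of this kind can be stacked, which is the nesting-doll property of the decorator pattern.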