本文主要基于 Eureka 1.8.X 版本 1. 概述 2. Eureka-Client 发起下线 3. Eureka-Server 接收下线 3.1 接收下线请求 3.2 下线应用实例信息 666. 彩蛋
1. 概述本文主要分享 Eureka-Client 向 Eureka-Server 下线应用实例的过程。 FROM 《深度剖析服务发现组件Netflix Eureka》 二次编辑
推荐 Spring Cloud 书籍: 请支持正版。下载盗版,等于主动编写低级 BUG 。 程序猿DD —— 《Spring Cloud微服务实战》 周立 —— 《Spring Cloud与Docker微服务架构实战》
推荐 Spring Cloud 视频: 2. Eureka-Client 发起下线应用实例关闭时,Eureka-Client 向 Eureka-Server 发起下线应用实例。需要满足如下条件才可发起: 实现代码如下: // DiscoveryClient.java public synchronized void shutdown() {
// ... 省略无关代码
// If APPINFO was registered if (applicationInfoManager != null && clientConfig.shouldRegisterWithEureka() // eureka.registration.enabled = true && clientConfig.shouldUnregisterOnShutdown()) { // eureka.shouldUnregisterOnShutdown = true applicationInfoManager.setInstanceStatus(InstanceStatus.DOWN); unregister(); } }
调用 ApplicationInfoManager#setInstanceStatus(...) 方法,设置应用实例为关闭( DOWN )。 调用 #unregister() 方法,实现代码如下: // DiscoveryClient.java void unregister() { // It can be null if shouldRegisterWithEureka == false if(eurekaTransport != null && eurekaTransport.registrationClient != null) { try { logger.info('Unregistering ...'); EurekaHttpResponse httpResponse = eurekaTransport.registrationClient.cancel(instanceInfo.getAppName(), instanceInfo.getId()); logger.info(PREFIX + appPathIdentifier + ' - deregister status: ' + httpResponse.getStatusCode()); } catch (Exception e) { logger.error(PREFIX + appPathIdentifier + ' - de-registration failed' + e.getMessage(), e); } } }
// AbstractJerseyEurekaHttpClient.java @Override public EurekaHttpResponse cancel(String appName, String id) { String urlPath = 'apps/' + appName + '/' + id; ClientResponse response = null; try { Builder resourceBuilder = jerseyClient.resource(serviceUrl).path(urlPath).getRequestBuilder(); addExtraHeaders(resourceBuilder); response = resourceBuilder.delete(ClientResponse.class); return anEurekaHttpResponse(response.getStatus()).headers(headersOf(response)).build(); } finally { if (logger.isDebugEnabled()) { logger.debug('Jersey HTTP DELETE {}/{}; statusCode={}', serviceUrl, urlPath, response == null ? 'N/A' : response.getStatus()); } if (response != null) { response.close(); } } }
3. Eureka-Server 接收下线3.1 接收下线请求com.netflix.eureka.resources.InstanceResource ,处理单个应用实例信息的请求操作的 Resource ( Controller )。
下线应用实例信息的请求,映射 InstanceResource#cancelLease() 方法,实现代码如下: @DELETE public Response cancelLease( @HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication) { // 下线 boolean isSuccess = registry.cancel(app.getName(), id, 'true'.equals(isReplication));
if (isSuccess) { // 下线成功 logger.debug('Found (Cancel): ' + app.getName() + ' - ' + id); return Response.ok().build(); } else { // 下线成功 logger.info('Not Found (Cancel): ' + app.getName() + ' - ' + id); return Response.status(Status.NOT_FOUND).build(); } }
调用 PeerAwareInstanceRegistryImpl#cancel(...) 方法,下线应用实例。实现代码如下: 1: @Override 2: public boolean cancel(final String appName, final String id, 3: final boolean isReplication) { 4: if (super.cancel(appName, id, isReplication)) { // 下线 5: // Eureka-Server 复制 6: replicateToPeers(Action.Cancel, appName, id, null, null, isReplication); 7: // 减少 `numberOfRenewsPerMinThreshold` 、`expectedNumberOfRenewsPerMin` 8: synchronized (lock) { 9: if (this.expectedNumberOfRenewsPerMin > 0) { 10: // Since the client wants to cancel it, reduce the threshold (1 for 30 seconds, 2 for a minute) 11: this.expectedNumberOfRenewsPerMin = this.expectedNumberOfRenewsPerMin - 2; 12: this.numberOfRenewsPerMinThreshold = (int) (this.expectedNumberOfRenewsPerMin * serverConfig.getRenewalPercentThreshold()); 13: } 14: } 15: return true; 16: } 17: return false; 18: }
第 4 行 :调用父类 AbstractInstanceRegistry#cancel(...) 方法,下线应用实例信息。 第 6 行 :Eureka-Server 复制下线操作,在 《Eureka 源码解析 —— Eureka-Server 集群同步》 有详细解析。 第 7 至 14 行 :减少 numberOfRenewsPerMinThreshold 、expectedNumberOfRenewsPerMin ,自我保护机制相关,在 《Eureka 源码解析 —— 应用实例注册发现(四)之自我保护机制》 有详细解析。
3.2 下线应用实例信息调用 AbstractInstanceRegistry#cancel(...) 方法,下线应用实例信息,实现代码如下: 1: @Override 2: public boolean cancel(String appName, String id, boolean isReplication) { 3: return internalCancel(appName, id, isReplication); 4: } 5: 6: protected boolean internalCancel(String appName, String id, boolean isReplication) { 7: try { 8: // 获得读锁 9: read.lock(); 10: // 增加 取消注册次数 到 监控 11: CANCEL.increment(isReplication); 12: // 移除 租约映射 13: Map> gMap = registry.get(appName); 14: Lease leaseToCancel = null; 15: if (gMap != null) { 16: leaseToCancel = gMap.remove(id); 17: } 18: // 添加到 最近取消注册的调试队列 19: synchronized (recentCanceledQueue) { 20: recentCanceledQueue.add(new Pair(System.currentTimeMillis(), appName + '(' + id + ')')); 21: } 22: // 移除 应用实例覆盖状态映射 23: InstanceStatus instanceStatus = overriddenInstanceStatusMap.remove(id); 24: if (instanceStatus != null) { 25: logger.debug('Removed instance id {} from the overridden map which has value {}', id, instanceStatus.name()); 26: } 27: // 租约不存在 28: if (leaseToCancel == null) { 29: CANCEL_NOT_FOUND.increment(isReplication); // 添加 取消注册不存在 到 监控 30: logger.warn('DS: Registry: cancel failed because Lease is not registered for: {}/{}', appName, id); 31: return false; // 失败 32: } else { 33: // 设置 租约的取消注册时间戳 34: leaseToCancel.cancel(); 35: // 添加到 最近租约变更记录队列 36: InstanceInfo instanceInfo = leaseToCancel.getHolder(); 37: String vip = null; 38: String svip = null; 39: if (instanceInfo != null) { 40: instanceInfo.setActionType(ActionType.DELETED); 41: recentlyChangedQueue.add(new RecentlyChangedItem(leaseToCancel)); 42: instanceInfo.setLastUpdatedTimestamp(); 43: vip = instanceInfo.getVIPAddress(); 44: svip = instanceInfo.getSecureVipAddress(); 45: } 46: // 设置 响应缓存 过期 47: invalidateCache(appName, vip, svip); 48: logger.info('Cancelled instance {}/{} (replication={})', appName, id, isReplication); 49: return true; // 成功 50: } 51: } finally { 52: // 释放锁 53: read.unlock(); 54: } 55: }
第 9 行 :获取读锁。在 《Eureka源码解析 —— 应用实例注册发现 (九)之岁月是把萌萌的读写锁》 详细解析。 第 10 至 11 行 :增加下线次数到监控。配合 Netflix Servo 实现监控信息采集。 第 12 至 17 行 :移除租约映射( registry )。 第 18 至 21 行 :添加到最近下线的调试队列( recentCanceledQueue ),用于 Eureka-Server 运维界面的显示,无实际业务逻辑使用。实现代码如下: /** * 最近取消注册的调试队列 * key :添加时的时间戳 * value :字符串 = 应用名(应用实例信息编号) */ private final CircularQueue<>> recentCanceledQueue;
第 22 至 26 行 :移除应用实例覆盖状态映射。在《应用实例注册发现 (八)之覆盖状态》详细解析。 第 27 至 31 行 :租约不存在,返回下线失败( false )。 第 34 行 :调用 Lease#cancel() 方法,取消租约。实现代码如下: // Lease.java public void cancel() { if (evictionTimestamp <=>=>0) { evictionTimestamp = System.currentTimeMillis(); } }
第 35 至 45 行 :设置应用实例信息的操作类型为添加,并添加到最近租约变更记录队列( recentlyChangedQueue )。recentlyChangedQueue 用于注册信息的增量获取,在《应用实例注册发现 (七)之增量获取》详细解析。实现代码如下: /** * 最近租约变更记录队列 */ private ConcurrentLinkedQueue recentlyChangedQueue = new ConcurrentLinkedQueue();
第 47 行 :设置响应缓存( ResponseCache )过期,在《Eureka 源码解析 —— 应用实例注册发现 (六)之全量获取》详细解析。 第 49 行 :返回下线失败( false )。 第 53 行 :释放锁。
666. 彩蛋水更一篇,下一篇租约过期!走起。 胖友,分享我的公众号( 芋道源码 ) 给你的胖友可好?
|