springcloud源码之eureka-client服务发现前言请先看服务注册,这篇博文把代码入口说清楚了,服务发现和心跳的代码在一个地方 服务发现private int registryFetchIntervalSeconds = 30; class CacheRefreshThread implements Runnable { public void run() { refreshRegistry(); } } refreshRegistry关键代码如下 //fetchRegistry其实在eureka-client初始化的时候拿过一次,上篇说了 boolean success = fetchRegistry(remoteRegionsModified); if (success) { registrySize = localRegionApps.get().size(); lastSuccessfulRegistryFetchTimestamp = System.currentTimeMillis(); } fetchRegistry关键代码如下 //forceFullRegistryFetch=true 代表全量拉取 private boolean fetchRegistry(boolean forceFullRegistryFetch) { //从缓存中拿到已存的微服务信息 Applications applications = getApplications(); //如果配置了不增量拉取||配置了vip地址||applications ==null||没有一个微服务信息||之前没有拉取过 if (clientConfig.shouldDisableDelta() || (!Strings.isNullOrEmpty(clientConfig.getRegistryRefreshSingleVipAddress())) || forceFullRegistryFetch || (applications == null) || (applications.getRegisteredApplications().size() == 0) || (applications.getVersion() == -1)) { //全量拉取 getAndStoreFullRegistry(); } else { //增量拉取 getAndUpdateDelta(applications); } return true; } 全量拉取 private void getAndStoreFullRegistry() throws Throwable { Applications apps = null; //访问eureka-server的接口ApplicationsResource#getContainers拉取全部的微服务实例 //eureka-sever缓存设计这个博客说了getContainers EurekaHttpResponse<Applications> httpResponse = clientConfig.getRegistryRefreshSingleVipAddress() == null ? eurekaTransport.queryClient.getApplications(remoteRegionsRef.get()) : eurekaTransport.queryClient.getVip(clientConfig.getRegistryRefreshSingleVipAddress(), remoteRegionsRef.get()); if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) { apps = httpResponse.getEntity(); } //更新缓存 localRegionApps.set(this.filterAndShuffle(apps)); } 增量拉取 private void getAndUpdateDelta(Applications applications) throws Throwable { //存储增量微服务 Applications delta = null; //去访问ApplicationsResource#getContainerDifferential接口增量拉取微服务 //这里具体服务端是如何控制哪些微服务是要被增量拉取的待会再说 EurekaHttpResponse<Applications> httpResponse = eurekaTransport.queryClient.getDelta(remoteRegionsRef.get()); if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) { delta = httpResponse.getEntity(); } //没有拿到增量就去再全量拉取一遍 if (delta == null) { getAndStoreFullRegistry(); } else if (fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration + 1)) { //加锁,因为上面有网络请求耗时操作,可能这块代码会有并发 if (fetchRegistryUpdateLock.tryLock()) { try { //去更新本地缓存,根据微服务的上一次操作类型就更新 //MODIFIED和ADDED类型就执行add操作 //DELETED类型就执行remove操作 updateDelta(delta); } finally { fetchRegistryUpdateLock.unlock(); } } else { logger.warn("Cannot acquire update lock, aborting getAndUpdateDelta"); } } 服务端是如何判断那些微服务属于增量的 //getContainerDifferential会构造一个ALL_APPS_DELTA类型的key Key cacheKey = new Key(Key.EntityType.Application, ResponseCacheImpl.ALL_APPS_DELTA, keyType, CurrentRequestVersion.get(), EurekaAccept.fromString(eurekaAccept), regions ); responseCache.getGZIP(cacheKey) -----------------> 下面这些代码在缓存设计博客说到了 Value getValue(final Key key, boolean useReadOnlyCache) { Value payload = null; if (useReadOnlyCache) { //先从只读缓存拿 final Value currentPayload = readOnlyCacheMap.get(key); if (currentPayload != null) { //拿到了直接返回 payload = currentPayload; } else { //去读写缓存拿,拿到了回写只读缓存 //readWriteCacheMap是基于guava的cache来的,如果readWriteCacheMap拿不到 //会回调ResponseCacheImpl的构造方法里的generatePayload(key);这行代码 payload = readWriteCacheMap.get(key); readOnlyCacheMap.put(key, payload); } } else { payload = readWriteCacheMap.get(key); } return payload; } generatePayload(key) //获取增量的逻辑 payload = getPayLoad(key, registry.getApplicationDeltas()); registry.getApplicationDeltas() public Applications getApplicationDeltas() { Applications apps = new Applications(); Map<String, Application> applicationInstancesMap = new HashMap<String, Application>(); //recentlyChangedQueue这个队列里面就是存储增量微服务的,这个队列有个神奇的地方,只会存储最近操作三分钟以内的微服务 Iterator<RecentlyChangedItem> iter = this.recentlyChangedQueue.iterator(); //遍历增量队列返回增量 while (iter.hasNext()) { } return apps; } 增量队列三分钟的代码 private TimerTask getDeltaRetentionTask() { return new TimerTask() { @Override public void run() { Iterator<RecentlyChangedItem> it = recentlyChangedQueue.iterator(); while (it.hasNext()) { //getRetentionTimeInMSInDeltaQueue=3min if (it.next().getLastUpdateTime() < System.currentTimeMillis() - serverConfig.getRetentionTimeInMSInDeltaQueue()) { it.remove(); } else { break; } } } }; } 总结1:eureka-client每隔30s去eureka-client拉取一次信息 |
|
来自: vnxy001 > 《022springcloud》