🍊 Java学习:社区快速通道
🍊 深入浅出RocketMQ设计思想:深入浅出RocketMQ设计思想
🍊 绝对不一样的职场干货:大厂最佳实践经验指南 📆 最近更新:2023年5月2日
🍊 点赞 👍 收藏 ⭐留言 📝 都是我最大的动力!
引言
服务注册是为了解决各个微服务的“你是谁” 这个问题,即获取所有服务节点的身份信息和服务名称,站在注册中心的角度来看,有以下两种比较直观的解决方案:
由注册中心主动访问网络节点中所有机器 注册中心等待服务节点主动进行注册
目前主流的注册中心(Nacos、Eureka)都选择了第二种方案,主要原因是第一种方案有很多弊端:
模型复杂: 网络结点构成了一张复杂的网,结点与结点之间的关系错综复杂,轮询每个节点的做法通常是注册中心发局域网广播,客户端响应的方式。现实中对于跨局域网的分布式系统来说,响应模型会更加复杂。
网络开销大: 整个网络环境里会掺杂大量非服务节点,这些节点无需对送达的广播请求做出响应,这种广播的模式无疑增加了网络通信成本。
服务端压力增加: 不仅要求注册中心向网络中所有节点主动发送广播请求,还需要对客户端的应答做出响应。考虑到注册中心的节点数远远少于服务节点,所以要尽可能地减轻服务中心承载的业务。
一一对照着看,第二种实现方案就有如下优点:
注册中心压力小: 网络中其它非服务节点不会产生任何无效请求,也就不用做额外的判断效率高: 省去了广播环节的时间,使注册效率大大提高节省成本: 节省了大量网络请求的开销
下面就来探索一下经典注册中心微服务 Eureka
服务注册源码。
Eureka 服务注册源码
寻找配置类
要使用Eureka,就需要在SpringBoot的启动类上添加 @EnableDiscoveryClient
注解,所以我们的源码解析,从启动类上的 @EnableDiscoveryClient
注解开始:
在Eureka已经启动的状态下,以debug模式启动EurekaClientApplication
,会来到这里面的断点:
其中metadata
是main
函数里挂的注解:
attributes
是会获得@EnableDiscoveryClient
的EnableDiscoveryClient
注解,接下来读取注解里面的autoRegister
属性,如果是true的话,会发现之后导入了一个配置类:
寻找服务注册的元数据
进入到该配置类:
继续进入到AutoServiceRegistrationProperties
类里: 这些个属性一定会在某些配置项加载的流程中应用到,大家尝试找一下哪些类会引用它。
其中你会找到AbstractAutoServiceRegistration
,发现其在初始化的流程里使用到:
protected AbstractAutoServiceRegistration ( ServiceRegistry < R > serviceRegistry, AutoServiceRegistrationProperties properties) {
this . serviceRegistry = serviceRegistry;
this . properties = properties;
}
同时还发现了一个服务注册属性serviceRegistry
:
private final ServiceRegistry < R > serviceRegistry;
进入到ServiceRegistry
的实现类EurekaServiceRegistry
里
进入到第一行的方法maybeInitializeClient
里:
private void maybeInitializeClient ( EurekaRegistration reg) {
reg. getApplicationInfoManager ( ) . getInfo ( ) ;
reg. getEurekaClient ( ) . getApplications ( ) ;
}
继续进入到getInfo
里
发现这里面的信息其实就是我们要向服务中心注册的东西。
register方法
接下来继续执行EurekaServiceRegistry
的register
方法:
reg. getApplicationInfoManager ( ) . setInstanceStatus ( reg. getInstanceConfig ( ) . getInitialStatus ( ) ) ;
首先设置了instance
的状态,这里reg.getInstanceConfig().getInitialStatus()
是UP
这里的register
并没有发起服务调用请求,所以还要通过调用栈来继续寻找。来到上一层EurekaAutoServiceRegistration
的start
方法里:
停留在的这一行往上下文中发布了一个事件InstanceRegisteredEvent
,但此时我们会发现服务其实并没有注册 说明在event
发布前后肯定发生了什么事,让eureka
服务提供者向注册中心发送了请求,既然event
发布之后running
的状态变为了true
,那确实是运行起来了。
下一个流程
下一个流程在DiscoveryClient
里,它封装了我们服务的client
和注册中心之间的各种交互,里面有一个register
方法
跟着断点继续往下走: 发现这里用的是SessionedEurekaHttpClient
,接下来去找它的源码:
register
方法在其父类EurekaHttpClientDecorator
中
通过一个子类实现的execute
方法,参数是由父类传入的一个代理delegate
,execute
是由SessionedEurekaHttpClient
子类实现的
protected < R > EurekaHttpResponse < R > execute ( RequestExecutor < R > requestExecutor) {
long now = System . currentTimeMillis ( ) ;
long delay = now - this . lastReconnectTimeStamp;
if ( delay >= this . currentSessionDurationMs) {
logger. debug ( "Ending a session and starting anew" ) ;
this . lastReconnectTimeStamp = now;
this . currentSessionDurationMs = this . randomizeSessionDuration ( this . sessionDurationMs) ;
TransportUtils . shutdown ( ( EurekaHttpClient ) this . eurekaHttpClientRef. getAndSet ( ( Object ) null ) ) ;
}
EurekaHttpClient eurekaHttpClient = ( EurekaHttpClient ) this . eurekaHttpClientRef. get ( ) ;
if ( eurekaHttpClient == null ) {
eurekaHttpClient = TransportUtils . getOrSetAnotherClient ( this . eurekaHttpClientRef, this . clientFactory. newClient ( ) ) ;
}
return requestExecutor. execute ( eurekaHttpClient) ;
}
这一段代码尝试从HttpClient
里拿实例,如果实例为空则会调用一个工具类的方法getOrSetAnotherClient
去获取一个新的实例,但这里我们会发现其实并不为空:
这里调用了另一个httpclient
。其中SessionedEurekaHttpClient
用到了装饰器模式,主要装饰的功能是delay
时间过长时重新启动一个session
进入到下一层RetryableEurekaHttpClient
,这一层装饰的功能是可以重试,默认最大重试次数为3:
protected < R > EurekaHttpResponse < R > execute ( RequestExecutor < R > requestExecutor) {
List < EurekaEndpoint > candidateHosts = null ;
int endpointIdx = 0 ;
for ( int retry = 0 ; retry < this . numberOfRetries; ++ retry) {
EurekaHttpClient currentHttpClient = ( EurekaHttpClient ) this . delegate. get ( ) ;
EurekaEndpoint currentEndpoint = null ;
if ( currentHttpClient == null ) {
if ( candidateHosts == null ) {
candidateHosts = this . getHostCandidates ( ) ;
if ( candidateHosts. isEmpty ( ) ) {
throw new TransportException ( "There is no known eureka server; cluster server list is empty" ) ;
}
}
if ( endpointIdx >= candidateHosts. size ( ) ) {
throw new TransportException ( "Cannot execute request on any known server" ) ;
}
currentEndpoint = ( EurekaEndpoint ) candidateHosts. get ( endpointIdx++ ) ;
currentHttpClient = this . clientFactory. newClient ( currentEndpoint) ;
}
try {
EurekaHttpResponse < R > response = requestExecutor. execute ( currentHttpClient) ;
if ( this . serverStatusEvaluator. accept ( response. getStatusCode ( ) , requestExecutor. getRequestType ( ) ) ) {
this . delegate. set ( currentHttpClient) ;
if ( retry > 0 ) {
logger. info ( "Request execution succeeded on retry #{}" , retry) ;
}
return response;
}
logger. warn ( "Request execution failure with status code {}; retrying on another server if available" , response. getStatusCode ( ) ) ;
} catch ( Exception var8) {
logger. warn ( "Request execution failed with message: {}" , var8. getMessage ( ) ) ;
}
this . delegate. compareAndSet ( currentHttpClient, ( Object ) null ) ;
if ( currentEndpoint != null ) {
this . quarantineSet. add ( currentEndpoint) ;
}
}
throw new TransportException ( "Retry limit reached; giving up on completing the request" ) ;
}
其中this.getHostCandidates();
获取的是注册中心:
private List < EurekaEndpoint > getHostCandidates ( ) {
List < EurekaEndpoint > candidateHosts = this . clusterResolver. getClusterEndpoints ( ) ;
this . quarantineSet. retainAll ( ( Collection ) candidateHosts) ;
int threshold = ( int ) ( ( double ) ( ( List ) candidateHosts) . size ( ) * this . transportConfig. getRetryableClientQuarantineRefreshPercentage ( ) ) ;
if ( threshold > ( ( List ) candidateHosts) . size ( ) ) {
threshold = ( ( List ) candidateHosts) . size ( ) ;
}
if ( ! this . quarantineSet. isEmpty ( ) ) {
if ( this . quarantineSet. size ( ) >= threshold) {
logger. debug ( "Clearing quarantined list of size {}" , this . quarantineSet. size ( ) ) ;
this . quarantineSet. clear ( ) ;
} else {
List < EurekaEndpoint > remainingHosts = new ArrayList ( ( ( List ) candidateHosts) . size ( ) ) ;
Iterator var4 = ( ( List ) candidateHosts) . iterator ( ) ;
while ( var4. hasNext ( ) ) {
EurekaEndpoint endpoint = ( EurekaEndpoint ) var4. next ( ) ;
if ( ! this . quarantineSet. contains ( endpoint) ) {
remainingHosts. add ( endpoint) ;
}
}
candidateHosts = remainingHosts;
}
}
return ( List ) candidateHosts;
}
如果坏注册中心节点的数量超过了阈值(66%),则要重启。quarantineSet
存储的是失败的注册中心,remainingHosts
存储的是成功的注册中心。
继续execute
回到上面的execute
方法里,如果重试的索引大于候选注册中心的size时,就表示已知的所有注册中心都不能处理注册请求,此时会抛一个异常出来:
if ( endpointIdx >= candidateHosts. size ( ) ) {
throw new TransportException ( "Cannot execute request on any known server" ) ;
}
currentEndpoint = ( EurekaEndpoint ) candidateHosts. get ( endpointIdx++ ) ;
currentHttpClient = this . clientFactory. newClient ( currentEndpoint) ;
如果在某一层execute
成功了,则会将deligate
设置为当前的client
,如果不成功则会通过CAS操作将currentHttpClient
设置为空,然后放置到失效的EurekaEndpoint
加入到quarantineSet
,下次不用了 此时还有好多层装饰器,这里直接快进跳到最后一层AbstractJerseyEurekaHttpClient
中的register
方法:
public EurekaHttpResponse < Void > register ( InstanceInfo info) {
String urlPath = "apps/" + info. getAppName ( ) ;
ClientResponse response = null ;
EurekaHttpResponse var5;
try {
Builder resourceBuilder = this . jerseyClient. resource ( this . serviceUrl) . path ( urlPath) . getRequestBuilder ( ) ;
this . addExtraHeaders ( resourceBuilder) ;
response = ( ClientResponse ) ( ( Builder ) ( ( Builder ) ( ( Builder ) resourceBuilder. header ( "Accept-Encoding" , "gzip" ) ) . type ( MediaType . APPLICATION_JSON_TYPE) ) . accept ( new String [ ] { "application/json" } ) ) . post ( ClientResponse . class , info) ;
var5 = EurekaHttpResponse . anEurekaHttpResponse ( response. getStatus ( ) ) . headers ( headersOf ( response) ) . build ( ) ;
} finally {
if ( logger. isDebugEnabled ( ) ) {
logger. debug ( "Jersey HTTP POST {}/{} with instance {}; statusCode={}" , new Object [ ] { this . serviceUrl, urlPath, info. getId ( ) , response == null ? "N/A" : response. getStatus ( ) } ) ;
}
if ( response != null ) {
response. close ( ) ;
}
}
return var5;
}
在这里发送了http请求,info里面存的是当前服务的所有信息
这一步结束之后就注册成功了