分享

sprincloud源码之eureka服务注册【图文】

 vnxy001 2022-02-09

sprincloud源码之eureka服务注册

前言

eureka是netflix公司基于jersey框架写的一个注册中心,提供了服务注册,服务下架,服务续约,集群同步等功能。
jersey是一个类似于springmvc的框架,只不过mvc是基于servlet的,jersey是基于filter的,二者在使用上也很类似,mvc发请求被servlet拦截到反射调用controller,而jersey是被filter拦截到调用resource, 二者的原理基本一致。

sprincloud整合eureka

@EnableEurekaServer

@EnableEurekaServer----->>

@Import(EurekaServerMarkerConfiguration.class)
public @interface EnableEurekaServer {

}
----->>>
@Configuration(proxyBeanMethods = false)
public class EurekaServerMarkerConfiguration {
	@Bean
	public Marker eurekaServerMarkerBean() {
		return new Marker();
	}
	class Marker {
	}
}

上面代码的意思就是加了@EnableEurekaServer就会往spring容器注入一个Marker对象

EurekaServerAutoConfigurationsprincloud源码之eureka服务注册_服务注册

spring.factories里的内容如下

org.springframework.boot.autoconfigure.EnableAutoConfiguration=  org.springframework.cloud.netflix.eureka.server.EurekaServerAutoConfiguration

这是springboot自动配置的原理,springboot在初始化的时候会把spring.factories定义的类也初始化,可认为是spi

@ConditionalOnBean(EurekaServerMarkerConfiguration.Marker.class)
public class EurekaServerAutoConfiguration
这里我把其他注解省略了,可以看到EurekaServerAutoConfiguration初始化的前提是spring容器得有Marker类,
也就是说加上@EnableEurekaServer就是eureka服务注册中心

jersey过滤器的初始化
springmvc自动配置的时候是在DispatcherServletAutoConfiguration初始化DispatcherServlet,
现在也是如此,在EurekaServerAutoConfiguration初始化一个filter,代码如下

@Bean
	public FilterRegistrationBean<?> jerseyFilterRegistration(
			javax.ws.rs.core.Application eurekaJerseyApp) {
		FilterRegistrationBean<Filter> bean = new FilterRegistrationBean<Filter>();
		bean.setFilter(new ServletContainer(eurekaJerseyApp));
		bean.setOrder(Ordered.LOWEST_PRECEDENCE);
		bean.setUrlPatterns(
				Collections.singletonList(EurekaConstants.DEFAULT_PREFIX + "/*"));
		return bean;
	}

filter初始化后就可以拦截eureka-client的请求转发给eureka-server处理了

eureka服务注册

源码在ApplicationResource的addInstance方法
sprincloud源码之eureka服务注册_eureka源码_02

ApplicationResource.addInstance()---->>
this.registry.register()---->>
InstanceRegistry.register(){
	//springcloud发布一个事件EurekaInstanceRegisteredEvent
	handleRegistration(info, resolveInstanceLeaseDuration(info), isReplication);
	//调用父类的register
	super.register(info, isReplication);
}

我们先暂停一下如上代码讲解,先看看InstanceRegistry的继承关系图:
sprincloud源码之eureka服务注册_服务注册_03

InstanceRegistry是springcloud对于eureka的一个扩展,此类的唯一作用就是发布事件
PeerAwareInstanceRegistrImpl是专门做集群同步的
AbstractInstanceRegistry才是做服务注册的
这里采取了一种责任链模式,先发布事件,在服务注册最后集群同步,每一个类只做一件事

下面我们接着代码说:
InstanceRegistry.register 发布事件

InstanceRegistry.register(){
	//springcloud发布一个事件EurekaInstanceRegisteredEvent
	handleRegistration(info, resolveInstanceLeaseDuration(info), isReplication);
	//调用父类的register
	super.register(info, isReplication);
}

PeerAwareInstanceRegistrImpl.register

public void register(InstanceInfo info, boolean isReplication) {
		//leaseExpirationDurationInSeconds: 30 #Eureka服务器在接收到实例的最后一次发出的心跳后,需要等待多久才可以将此实例删除,默认为90秒
        int leaseDuration = 90;
        if (info.getLeaseInfo() != null && info.getLeaseInfo().getDurationInSecs() > 0) {
            leaseDuration = info.getLeaseInfo().getDurationInSecs();
        }
		//服务注册
        super.register(info, leaseDuration, isReplication);
        //集群同步
        this.replicateToPeers(PeerAwareInstanceRegistryImpl.Action.Register, info.getAppName(), info.getId(), info, (InstanceStatus)null, isReplication);
    }

AbstractInstanceRegistry.register 服务注册
在看服务注册的代码前我们先看this.registry和租债器Lease
this.registry

ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>> registry = new ConcurrentHashMap();

registry的数据结构如上等价于<applicationName, Map<instanceId, Lease>>

sprincloud源码之eureka服务注册_spring cloud_04
如图所示有三个order微服务,applicationName配置的都是order,说明三个微服务是属于同一个微服务组,而三个微服务的id各不相同,代表这是同一个order组的三个微服务,这样user微服务来调用order模块就知道如何负载均衡了。
租债器Lease
注释的代码是我修正netflix的代码

public class Lease<T> {
    //Eureka服务器在接收到实例的最后一次发出的心跳后,需要等待多久才可以将此实例删除,默认为90秒
    public static final int DEFAULT_DURATION_IN_SECS = 90;
    private T holder;//微服务对象
    private long evictionTimestamp;//服务剔除时间戳
    private long registrationTimestamp;//服务注册时间戳
    private long serviceUpTimestamp;//
    private volatile long lastUpdateTimestamp;//最后一次更新的时间,注册下架续约都是更新操作
    private long duration;//duration ms后没有心跳剔除服务
    //private long expireTimestamp;//过期时间
    /**
     * @param r 微服务实例
     * @param durationInSecs Eureka服务器在接收到实例的最后一次发出的心跳后,需要等待多久才可以将此实例删除
     *  eureka.instance.leaseExpirationDurationInSeconds=30s,后一次发出的心跳后,30s后还没有新的心跳剔除服务
     *  eureka.instance.leaseRenewalIntervalInSeconds=10s心跳间隔时间
     */
    public Lease(T r, int durationInSecs) {
        this.holder = r;
        this.registrationTimestamp = System.currentTimeMillis();//注册时间
        this.lastUpdateTimestamp = this.registrationTimestamp;//更新时间=注册时间
        this.duration = (long)(durationInSecs * 1000);//duration ms后没有心跳剔除服务
    }

    //续期操作
    public void renew() {
        this.lastUpdateTimestamp = System.currentTimeMillis() + this.duration;
//        this.lastUpdateTimestamp = System.currentTimeMillis();
//        this.expireTimestamp = System.currentTimeMillis() + this.duration;
    }
    //服务下架/剔除操作
    public void cancel() {
        if (this.evictionTimestamp <= 0L) {
            this.evictionTimestamp = System.currentTimeMillis();
        }
    }

    public void serviceUp() {
        if (this.serviceUpTimestamp == 0L) {
            this.serviceUpTimestamp = System.currentTimeMillis();
        }
    }

    public boolean isExpired() {
        return this.isExpired(0L);
    }

    public boolean isExpired(long additionalLeaseMs) {
        return this.evictionTimestamp > 0L || System.currentTimeMillis() > this.lastUpdateTimestamp + this.duration + additionalLeaseMs;
    }
//    public boolean isExpired(long additionalLeaseMs) {
//        return this.evictionTimestamp > 0L || System.currentTimeMillis() > this.expireTimestamp + additionalLeaseMs;
//    }

    public static void main(String[] args) throws InterruptedException {
        Lease lease = new Lease(new User(1,"lry"),5);
        while(true){
            System.out.println(lease.isExpired());
            Thread.sleep(1000);
        }
    }

    static class User{
        private int id;
        private String name;
        public User(int id,String name){
            this.id = id;
            this.name = name;
        }
    }
}

上面的代码netflix公司表示有bug,具体是如下两个方法

	//续期
	//把最后更新时间改为当前时间+duration
	public void renew() {
        lastUpdateTimestamp = System.currentTimeMillis() + duration;
    }
    //对象是否过期
    //System.currentTimeMillis() > lastUpdateTimestamp + duration 
    //判断是否过期的时候又加了一个duration 相当于是2*duration 后才算是过期
    public boolean isExpired(long additionalLeaseMs) {
        return (evictionTimestamp > 0 || System.currentTimeMillis() > (lastUpdateTimestamp + duration + additionalLeaseMs));
    }

bug描述

   Note that due to renew() doing the 'wrong" thing and setting lastUpdateTimestamp to +duration
   more than what it should be, the expiry will actually be 2 * duration. This is a minor bug and
   should only affect instances that ungracefully shutdown. Due to possible wide ranging impact 
   to existing usage, this will not be fixed.
   大致意思就是renew多加了一个duration导致判断isExpire实际是2*duration

服务注册代码

public void register(InstanceInfo registrant, int leaseDuration, boolean isReplication) {
		//Lease可以理解成一个微服务实例,它内部封装一个微服务和各种时间来实现对对象的过期与否控制
		//可以理解lease==InstanceInfo
		 Map<String, Lease<InstanceInfo>> gMap = (Map)this.registry.get(registrant.getAppName());
		 //registrant.getAppName()微服务组内没有微服务
		 if (gMap == null) {
                ConcurrentHashMap<String, Lease<InstanceInfo>> gNewMap = new ConcurrentHashMap();
                gMap = (Map)this.registry.putIfAbsent(registrant.getAppName(), gNewMap);
                if (gMap == null) {
                    gMap = gNewMap;
                }
           }
           //尝试通过id拿到一个微服务实例,一般情况下都拿不到,除非以下两种情况
           //情况一:有两台application name 和instance id都一样微服务
           //情况二:断点调试,这种情况冲突是因为客户端注册有超时重试机制
           Lease<InstanceInfo> existingLease = gMap.get(registrant.getId());
           //如果拿到了
            if (existingLease != null && (existingLease.getHolder() != null)) {
            	//已经存在的微服务实例最后修改时间
                Long existingLastDirtyTimestamp = existingLease.getHolder().getLastDirtyTimestamp();
                //要注册的微服务实例最后修改时间
                Long registrationLastDirtyTimestamp = registrant.getLastDirtyTimestamp();
                //如果已存的微服务时间>要注册的(时间越大说明操作越新),用已存的覆盖要注册的
                //即如果出现冲突的话拿最新的微服务实例
                if (existingLastDirtyTimestamp > registrationLastDirtyTimestamp) {
                    registrant = existingLease.getHolder();
                }
            } 
            //没有拿到,一般都是进这里
            else {
                synchronized (lock) {
                	//期待发送心跳的客户端数量
                    if (this.expectedNumberOfClientsSendingRenews > 0) {
                    	//要注册进来了,期待值+1
                        this.expectedNumberOfClientsSendingRenews = this.expectedNumberOfClientsSendingRenews + 1;
                        //更新客户端每分钟发送心跳数的阈值,这个方法较重要
                        updateRenewsPerMinThreshold();
                    }
                }
            }
            //不管如何都new一个Lease
            Lease<InstanceInfo> lease = new Lease(registrant, leaseDuration);
            //如果if(gMap == null)都没有进,说明微服务组内已经有微服务了,直接put(id,instance)即可
            ((Map)gMap).put(registrant.getId(), lease);
            //最近注册队列添加此微服务
            recentRegisteredQueue.add(new Pair<Long, String>(
                    System.currentTimeMillis(),
                    registrant.getAppName() + "(" + registrant.getId() + ")"));
            //标记微服务实例ADDED
            registrant.setActionType(ActionType.ADDED);
            //最近改变队列添加此微服务,此队列会保存近三分钟有改动的微服务,用于增量更新
            recentlyChangedQueue.add(new RecentlyChangedItem(lease));
            //设置最后更新的时间戳
            registrant.setLastUpdatedTimestamp();
  }

updateRenewsPerMinThreshold()

	protected void updateRenewsPerMinThreshold() {
        this.numberOfRenewsPerMinThreshold = (int) (this.expectedNumberOfClientsSendingRenews
                * (60.0 / serverConfig.getExpectedClientRenewalIntervalSeconds())
                * serverConfig.getRenewalPercentThreshold());
    }

expectedNumberOfClientsSendingRenews:期待发送心跳的客户端数量
ExpectedClientRenewalIntervalSeconds:期待客户端发送心跳的间隔秒数
RenewalPercentThreshold:续期的百分比阈值85%
numberOfRenewsPerMinThreshold:客户端每分钟发送心跳数的阈值,如果server在一分钟内没有收到这么多的心跳数就会触发自我保护机制,以后博客应该会说到

举个例子就明白了:
假设有10个客户端,发送心跳间隔为30s,那么一分钟如果全部正常的话server收到的心跳应该是20次,
如果server一分钟收到的心跳<20*85%,即17个触发自我保护机制

PeerAwareInstanceRegistrImpl.replicateToPeers 集群同步

private void replicateToPeers(Action action, String appName, String id,
                                  InstanceInfo info ,
                                  InstanceStatus newStatus, boolean isReplication) {
              //没有同伴,或者是isReplication=true则不要集群同步
              //isReplication=true是这种情况,比如有两台eureka-server server1和server2组成了server集群
              //此时有一个eureka-client user要注册到集群中,首先user注册到server1上(isReplication=false),
              //server1会把user注册进registry这个map中然后集群同步将user注册给server2,此时
              //isReplication就是true,这样server2注册好user后发现isReplication是true就不会再做集群同步,
              //即不会给server1发送注册user的指令,避免了无限注册
            if (peerEurekaNodes == Collections.EMPTY_LIST || isReplication) {
                return;
            }
            for (final PeerEurekaNode node : peerEurekaNodes.getPeerEurekaNodes()) {
            	//跳过自己
                if (peerEurekaNodes.isThisMyUrl(node.getServiceUrl())) {
                    continue;
                }
                replicateInstanceActionsToPeers(action, appName, id, info, newStatus, node);
            }
    }
private void replicateInstanceActionsToPeers(Action action, String appName,
                                                 String id, InstanceInfo info, InstanceStatus newStatus,
                                                 PeerEurekaNode node) {
			
            switch (action) {
                case Cancel:
                    node.cancel(appName, id);
                    break;
                case Heartbeat:
                    InstanceStatus overriddenStatus = overriddenInstanceStatusMap.get(id);
                    infoFromRegistry = getInstanceByAppAndId(appName, id, false);
                    node.heartbeat(appName, id, infoFromRegistry, overriddenStatus, false);
                    break;
                case Register:
                    node.register(info);
                    break;
                case StatusUpdate:
                    infoFromRegistry = getInstanceByAppAndId(appName, id, false);
                    node.statusUpdate(appName, id, newStatus, infoFromRegistry);
                    break;
                case DeleteStatusOverride:
                    infoFromRegistry = getInstanceByAppAndId(appName, id, false);
                    node.deleteStatusOverride(appName, id, infoFromRegistry);
                    break;
            }
    }

sprincloud源码之eureka服务注册_集群同步_05
从集群同步的代码大致可以推断出流程如上图
集群中有三个server,当user模块注册到集群中的时候会先注册到一台server上,然后该server会遍历其他集群节点把user注册上去

总结

1:@EnableEurekaServer注入一个Marker类,说明是一个注册中心
2:EurekaServerAutoConfiguration注入一个filter,来拦截jersey请求转发给resource
3:服务注册,就是把信息存到一个ConcurrentHashMap<name,Map<id,Lease>>
4:对于注册冲突拿最新的微服务实例
5:server每分钟内收到的心跳数低于理应收到的85%就会触发自我保护机制
6:Lease的renew bug, duration多加了一次,理应加一个expireTime表示过期时间
7:集群同步:先注册到一台server,然后遍历其他的集群的其他server节点调用register注册到其他server,
isReplication=true代表此次注册来源于集群同步的注册,代表此次注册不要再进行集群同步,避免无限注册

    本站是提供个人知识管理的网络存储空间,所有内容均由用户发布,不代表本站观点。请注意甄别内容中的联系方式、诱导购买等信息,谨防诈骗。如发现有害或侵权内容,请点击一键举报。
    转藏 分享 献花(0

    0条评论

    发表

    请遵守用户 评论公约

    类似文章 更多