SOFAStack (Scalable Open Financial Architecture Stack)是蚂蚁金服自主研发的金融级分布式架构,包含了构建金融级云原生架构所需的各个组件,是在金融场景里锤炼出来的最佳实践。SOFARegistry 是蚂蚁金服开源的具有承载海量服务注册和订阅能力的、高可用的服务注册中心,在支付宝/蚂蚁金服的业务发展驱动下,近十年间已经演进至第五代。本文为《剖析 | SOFARegistry 框架》第三篇,本篇作者 Yavin,来自考拉海购。《剖析 | SOFARegistry 框架》系列由 SOFA 团队和源码爱好者们出品,项目代号:<SOFARegistry:Lab/>,文末包含往期系列文章。SOFARegistry:https://github.com/sofastack/sofa-registry集群成员管理是分布式系统中绕不开的话题。MetaServer 在 SOFARegistry 中,承担着集群元数据管理的角色,用来维护集群成员列表。本文希望从 MetaServer 的功能和部分源码切入剖析,为学习研究、或者项目中使用SOFARegistry 的开发者带来一些启发,分为三个部分:MetaServer 作为 SOFARegistry 的元数据中心,其核心功能可以概括为集群成员管理。分布式系统中,如何知道集群中有哪些节点列表,如何处理集群扩容,如何处理集群节点异常,都是不得不考虑的问题。MetaServer 的存在就是解决这些问题,其在 SOFARegistry 中位置如图所示:MetaServer 通过 SOFAJRaft 保证高可用和一致性,类似于注册中心,管理着集群内部的成员列表:MetaServer 基于 Bolt,通过 TCP 私有协议的形式对外提供服务,包括 DataServer,SessionServer 等,处理节点的注册,续约和列表查询等请求。同时也基于 Http 协议提供控制接口,比如可以控制 session 节点是否开启变更通知,健康检查接口等。成员列表数据存储在 Repository 中,Repository 被一致性协议层进行包装,作为 SOFAJRaft 的状态机实现,所有对 Repository 的操作都会同步到其他节点,通过 Rgistry 来操作存储层。MetaServer 使用 Raft 协议保证数据一致性, 同时也会保持与注册的节点的心跳,对于心跳超时没有续约的节点进行驱逐,来保证数据的有效性。在可用性方面,只要未超过半数节点挂掉,集群都可以正常对外提供服务,半数以上挂掉,Raft 协议无法选主和日志复制,因此无法保证注册的成员数据的一致性和有效性。整个集群不可用不会影响 Data 和 Session 节点的正常功能,只是无法感知节点列表变化。MetaServer 在启动时,会启动三个 Bolt Server,并且注册 Processor Handler,处理对应的请求, 如下图所示:- DataServer:处理 DataNode 相关的请求;
- SessionServer:处理 SessionNode 相关的请求;
- MetaServer:处理MetaNode相关的请求;
然后启动 HttpServer,用于处理 Admin 请求,提供推送开关,集群数据查询等 Http 接口,最后启动 Raft 服务,每个节点同时作为 RaftClient 和 RaftServer,用于集群间的变更和数据同步。meta.server.sessionServerPort=9610 meta.server.dataServerPort=9611 meta.server.metaServerPort=9612 meta.server.raftServerPort=9614 meta.server.httpServerPort=9615 由上节可知,DataServer 和 SessionServer 都有处理节点注册请求的 Handler。注册行为由 Registry 完成。实现为:@Override public NodeChangeResult register(Node node) { StoreService storeService = ServiceFactory.getStoreService(node.getNodeType()); return storeService.addNode(node); } Regitsry 根据不同的节点类型,获取对应的 StoreService ,比如 DataNode ,其实现为 DataStoreService 然后由 StoreService 存储到 Repository 中,具体实现为:// 存储节点信息 dataRepositoryService.put(ipAddress, new RenewDecorate(dataNode, RenewDecorate.DEFAULT_DURATION_SECS)); //... // 存储变更事件 dataConfirmStatusService.putConfirmNode(dataNode, DataOperator.ADD); 调用 RepositoryService#put 接口存储后,同时会存储一个变更事件到队列中,主要用于数据推送,消费处理。节点数据的存储,其本质上是存储在内存的哈希表中,其存储结构为:// RepositoryService 底层存储Map<String/*dataCenter*/, NodeRepository> registry;// NodeRepository 底层存储Map<String/*ipAddress*/, RenewDecorate<T>> nodeMap; 将 RenewDecorate 存储到该 Map 中,整个节点注册的流程就完成了,至于如何和 Raft 协议进行结合和数据同步,下文介绍。节点移除的逻辑类似,将节点信息从该 Map 中删除,也会存储一个变更事件到队列。不知道有没有注意到,节点注册的时候,节点信息被 RenewDecorate 包装起来了,这个就是实现注册信息续约和驱逐的关键:private T renewal; // 节点对象封装 private long beginTimestamp; // 注册事件 private volatile long lastUpdateTimestamp; // 续约时间 private long duration; // 超时时间 该对象为注册节点信息,附加了注册时间、上次续约时间、过期时间。那么续约操作就是修改 lastUpdateTimestamp ,是否过期就是判断System.currentTimeMillis()-lastUpdateTimestamp>duration 是否成立,成立则认为节点超时进行驱逐。和注册一样,续约请求的处理 Handler 为 ReNewNodesRequestHandler ,最终交由 StoreService 进行续约操作。另外一点,续约的时候如果没有查询到注册节点,会触发节点注册的操作。驱出的操作是由定时任务完成,MetaServer 在启动时会启动多个定时任务,详见 ExecutorManager#startScheduler ,其中一个任务会调用 Registry#evict ,其实现为遍历存储的 Map,获得过期的列表,调用 StoreService#removeNodes 方法,将他们从 Repository 中移除,这个操作也会触发变更通知。该任务默认每3秒执行一次。上文有介绍到,在处理节点注册请求后,也会存储一个节点变更事件,即:dataConfirmStatusService.putConfirmNode(dataNode, DataOperator.ADD); DataConfirmStatusService 也是一个由 Raft 协议进行同步的存储,其存储结构为:BlockingQueue<NodeOperator> expectNodesOrders = new LinkedBlockingQueue(); ConcurrentHashMap<DataNode/*node*/, Map<String/*ipAddress*/, DataNode>> expectNodes = new ConcurrentHashMap<>();
expectNodesOrders 用来存储节点变更事件;expectNodes 用来存储变更事件需要确认的节点,也就是说 NodeOperator 只有得到了其他节点的确认,才会从expectNodesOrders移除; 那么事件存储到 BlockingQueue 里,哪里去消费呢?看源码发现,并不是想象中的使用一个线程阻塞的读。在 ExecutorManager 中会启动一个定时任务,轮询该队列有没有数据。即周期性的调用 Registry#pushNodeListChange 方法,获取队列的头节点并消费。Data 和 Session 各对应一个任务。具体流程如下图所示:- 首先获取队列(expectNodesOrders)头节点,如果为Null直接返回;
- 获取当前数据中心的节点列表,并存储到确认表(expectNodes);
- 提交节点变更推送任务(firePushXxListTask);
- 处理任务,即调用 XxNodeService 的 pushXxxNode 方法,即通过 ConnectionHandler 获取所有的节点连接,发送节点列表;
- 收到回复后,如果需要确认,则会调用
StroeService#confirmNodeStatus 方法,将该节点从expectNodes中移除; - 待所有的节点从 expectNodes 中移除,则将此次操作从 expectNodesOrders 移除,处理完毕;
Data,Meta,Session Server 都提供 getNodesRequestHandler,用于处理查询当前节点列表的请求,其本质上从底层存储 Repository 读取数据返回,这里不在赘述。返回的结果的具体结构见 NodeChangeResult 类,包含各个数据中心的节点列表以及版本号。后端 Repository 可以看作SOFAJRaft 的状态机,任何对 Map 的操作都会在集群内部,交由 Raft 协议进行同步,从而达到集群内部的一致。从源码上看,所有的操作都是直接调用的 RepositoryService 等接口,那么是如何和 Raft 服务结合起来的呢?看源码会发现,凡是引用 RepositoryService 的地方,都加了 @RaftReference , RepositoryService 的具体实现类都加了 @RaftService 注解。其关键就在这里,其处理类为 RaftAnnotationBeanPostProcessor 。具体流程如下:在 processRaftReference 方法中,凡是加了 @RaftReference 注解的属性,都会被动态代理类替换,其代理实现 ProxyHandler 类,即将方法调用,封装为 ProcessRequest ,通过 RaftClient 发送给 RaftServer。而被加了 @RaftService 的类会被添加到 Procssor类 中,通过 serviceId (interfaceName + uniqueId) 进行区分。RaftServer 收到请求后,会把它生效到 SOFAJRaft 的状态机,具体实现类为 ServiceStateMachine ,即会调用 Procssor 方法,通过 serviceId 找到这个实现类,执行对应的方法调用。当然如果本机就是主节点, 对于一些查询请求不需要走 Raft 协议而直接调用本地实现方法。这个过程其实和 RPC 调用非常类似,在引用方发起的方法调用,并不会真正的执行方法,而是封装成请求发送到 Raft 服务,由 Raft 状态机进行真正的方法调用,比如把节点信息存储到 Map 中。在分布式系统中,集群成员管理是避不开的问题,有些集群直接把列表信息写到配置文件或者配置中心,也有的集群选择使用 zookeeper 或者 etcd 等维护集群元数据,SOFARegistry 选择基于一致性协议 Raft,开发独立的MetaServer,来实现集群列表维护和变更实时推送,以提高集群管理的灵活性和集群的健壮性。
|