背景介绍ZooKeeper可以以standalone ,伪分布式和分布式三种方式部署.standalone 模式下只有一台机器作为服务器,丧失了ZooKeeper高可用的特点.伪分布式是在一台电脑上使用不同端口启动多个ZooKeeper服务器.分布式是使用多个机器,每台机器上部署一个ZooKeeper服务器,即使有服务器宕机,只要少于半数,ZooKeeper集群依然可以正常对外提供服务. ZooKeeper以standalone 模式启动只需启动对客户端提供服务的组件,无需启动集群内部通信组件,较为简单,因此先从standalone 模式开始介绍. 整体架构 Zookeeper整体架构如上图,其中包括ServerCnxnFactory ,SessionTracker ,RequestProcessor ,FileTxnSnapLog 等众多组件,这些组件都会在日后一一介绍.
启动流程概述standalone 模式启动主要包括如下几个步骤:
配置文件解析 创建并启动历史文件清理器 初始化数据管理器 注册shutdownHandler 启动Admin server 创建并启动网络IO管理器 启动ZooKeeperServer 创建并启动secureCnxnFactory 创建并启动ContainerManager
源码如下: protected void initializeAndRun(String[] args)throws ConfigException, IOException, AdminServerException {//1.解析配置文件QuorumPeerConfig config = new QuorumPeerConfig();if (args.length == 1) {
config.parse(args[0]);
}// Start and schedule the the purge task//2.创建并启动历史文件清理器(对事务日志和快照数据文件进行定时清理)DatadirCleanupManager purgeMgr = new DatadirCleanupManager(config
.getDataDir(), config.getDataLogDir(), config
.getSnapRetainCount(), config.getPurgeInterval());
purgeMgr.start();if (args.length == 1 && config.isDistributed()) {//集群启动runFromConfig(config);
} else {//单机启动LOG.warn('Either no config or no quorum defined in config, running ' ' in standalone mode');// there is only server in the quorum -- run as standaloneZooKeeperServerMain.main(args);
}
} 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
/**
* Run from a ServerConfig.
*
* @param config ServerConfig to use.
* @throws IOException
* @throws AdminServerException
*/public void runFromConfig(ServerConfig config)throws IOException, AdminServerException {
LOG.info('Starting server');
FileTxnSnapLog txnLog = null;try {//3.创建ZooKeeper数据管理器txnLog = new FileTxnSnapLog(config.dataLogDir, config.dataDir);final ZooKeeperServer zkServer = new ZooKeeperServer(txnLog,
config.tickTime, config.minSessionTimeout, config.maxSessionTimeout, null);
txnLog.setServerStats(zkServer.serverStats());//4.注册shutdownHandler,在ZooKeeperServer的状态变化时调用shutdownHandler的handle()final CountDownLatch shutdownLatch = new CountDownLatch(1);
zkServer.registerServerShutdownHandler(new ZooKeeperServerShutdownHandler(shutdownLatch));//5.启动Admin serveradminServer = AdminServerFactory.createAdminServer();
adminServer.setZooKeeperServer(zkServer);
adminServer.start();//6.创建并启动网络IO管理器boolean needStartZKServer = true;if (config.getClientPortAddress() != null) {
cnxnFactory = ServerCnxnFactory.createFactory();
cnxnFactory.configure(config.getClientPortAddress(), config.getMaxClientCnxns(), false);//7.此方法除了启动ServerCnxnFactory,还会启动ZooKeepercnxnFactory.startup(zkServer);// zkServer has been started. So we don't need to start it again in secureCnxnFactory.needStartZKServer = false;
}//8.创建并启动secureCnxnFactory if (config.getSecureClientPortAddress() != null) {
secureCnxnFactory = ServerCnxnFactory.createFactory();
secureCnxnFactory.configure(config.getSecureClientPortAddress(), config.getMaxClientCnxns(), true);
secureCnxnFactory.startup(zkServer, needStartZKServer);
}//9.创建并启动ContainerManagercontainerManager = new ContainerManager(zkServer.getZKDatabase(), zkServer.firstProcessor,
Integer.getInteger('znode.container.checkIntervalMs', (int) TimeUnit.MINUTES.toMillis(1)),
Integer.getInteger('znode.container.maxPerMinute', 10000)
);
containerManager.start();// Watch status of ZooKeeper server. It will do a graceful shutdown// if the server is not running or hits an internal error.//服务器正常启动时,运行到此处阻塞,只有server的state变为ERROR或SHUTDOWN时继续运行后面的代码shutdownLatch.await();
shutdown();if (cnxnFactory != null) {
cnxnFactory.join();
}if (secureCnxnFactory != null) {
secureCnxnFactory.join();
}if (zkServer.canShutdown()) {
zkServer.shutdown(true);
}
} catch (InterruptedException e) {// warn, but generally this is okLOG.warn('Server interrupted', e);
} finally {if (txnLog != null) {
txnLog.close();
}
}
}
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76
ZooKeeperstandalone 启动共有9个步骤,其中有些步骤还有子步骤,有些步骤还会启动线程.因此下文中只会对一些简单的步骤进行介绍,复杂的步骤留作日后补充,接下来我们就分别看下这9个步骤. 解析配置文件ZooKeeper启动时会读取配置文件,默认读取$ZK_HOME/conf/zoo.cfg ,其将文件解析为java.util.Properties ,根据Properties 中键值对的key 设置相应value . 创建并启动历史文件清理器ZooKeeper虽然是一个内存数据库,但是其通过快照和事务日志提供了持久化的功能.在ZooKeeper启动时,会根据快照和事务日志恢复数据,重建内存数据库;每次写操作提交时,会在事务日志中增加一条记录,表明此次写操作更改了哪些数据;在进行snapCount 次事务之后,将内存数据库所有节点的数据和当前会话信息生成一个快照. ZooKeeper的事务日志类似于MySQL的redolog,若每次写操作后直接将数据写到磁盘上,则会存在大量的磁盘随机读写,若是写事务日志,则将磁盘随机读写转换为顺序读写.保证了数据的持久性的同时也兼顾了性能. 随着时间的推移,会生成越来越多的快照和事务日志文件,为了定时清理无效日志,DatadirCleanupManager 启动定时任务完成日志文件的清理. 相关配置属性名 | 对应配置 | 配置方式 | 默认值 | 含义 |
---|
snapRetainCount | autopurge.snapRetainCount | 配置文件 | 3 | 清理后保留的快照文件个数,最小值为3,若设置为<3的数,则修改为3 | purgeInterval | autopurge.purgeInterval | 配置文件 | 0 | 清理任务TimeTask的执行周期,即几小时执行一次,单位:小时,若设置为<=0的值,则不会设置定时任务,默认不设置. |
思考配置项中只有snapRetainCount 用于设置清理后保留的快照文件个数,那清理快照文件时会同时清理事务日志文件吗?若会清理,清理之后会保留几个事务日志文件呢? 答案:清理快照文件时会同时清理事务日志文件,假如保留了3个快照文件,其后缀名分别为100,200,300,则若事务日志文件中包含事务id>100的事务,则该事务日志文件被保留.则事务日志文件后缀>100的都会被保留,此外,后缀名<=100的事务日志文件中最新的事务日志也被保留.因为即使该事务日志文件后缀<=100,但是可能其包含的事务中一部分id<=100,一部分>100,此时也需保留该文件 事务日志和快照文件后缀名的含义见Zookeeper-持久化 创建ZooKeeper数据管理器即初始化FileTxnSnapLog ,FileTxnSnapLog 组合了TxnLog 和SnapShot ,根据类名也可以看出,TxnLog 负责处理事务日志,SnapShot 负责处理快照.FileTxnSnapLog 是Zookeeper上层服务器和底层数据存储之间的对接层,提供一系列操作数据文件的方法,如: restore(DataTree, Map, PlayBackListener) 启动ZookeeperServer 时调用此方法从磁盘上的快照和事务日志中恢复数据 getLastLoggedZxid() 获取日志中记载的最新的zxid save(DataTree,ConcurrentHashMap, boolean syncSnap) 将内存中的数据持久化到磁盘中
除此之外还有大量方法便于操作快照和事务日志. 注册shutdownhandler在服务器单机启动结束后有一句shutdownLatch.await() ,服务器运行到此已经启动完毕,主线程阻塞在此处.但服务器退出时还需要做一些清理工作,因此注册shutdownhandler ,在ZooKeeperServer#setState(State) 中调用此方法. /**
* 当服务器状态变为`ERROR`或`SHUTDOWN`时唤醒shutdownLatch,执行后续的清理代码.
* @param state new server state
*/void handle(State state) {if (state == State.ERROR || state == State.SHUTDOWN) {
shutdownLatch.countDown();
}
} 启动Admin serverAdminServer是3.5.0之后支持的特性,启动了一个jettyserver,默认端口是8080,访问此端口可以获取Zookeeper运行时的相关信息:
如服务器的相关配置,统计信息等. 相关配置其配置如下 参数名 | 默认 | 描述 |
---|
admin.enableServer | true | 设置为“false”禁用AdminServer。默认情况下,AdminServer是启用的。对应java系统属性是:zookeeper.admin.enableServer | admin.serverPort | 8080 | Jetty服务的监听端口,默认是8080。对应java系统属性是:zookeeper.admin.serverPort | admin.commandURL | “/commands” | 访问路径 |
如果在启动Zookeeper时提示Unable to start AdminServer, exiting abnormally ,可能就是tomcat或其他软件占用了8080端口,需要修改AdminServer 的默认端口. 创建并启动网络IO管理器ServerCnxnFactory 是Zookeeper中的重要组件,负责处理客户端与服务器的连接.主要有两个实现,一个是NIOServerCnxnFactory ,使用Java原生NIO处理网络IO事件;另一个是NettyServerCnxnFactory ,使用Netty处理网络IO事件.作为处理客户端连接的组件,其会启动若干线程监听客户端连接端口(即默认的9876端口).由于此组件非常复杂,日后单写一篇博客讲解
启动ZooKeeperServer启动Zookeeper会完成两件事情,一是从磁盘上快照和事务日志文件将数据恢复到内存中,二是启动会话管理器 恢复数据 /**
* 初始化ZkDatabase
*/public void startdata()throws IOException, InterruptedException {//check to see if zkDb is not nullif (zkDb == null) {
zkDb = new ZKDatabase(this.txnLogFactory);
}if (!zkDb.isInitialized()) {//从快照和事务日志中恢复数据loadData();
}
}
/**
* Restore sessions and data
*/public void loadData() throws IOException, InterruptedException {if (zkDb.isInitialized()) {
setZxid(zkDb.getDataTreeLastProcessedZxid());
} else {//1.由于zkDatabase尚未初始化,进入此分支(通过快照和事务日志恢复数据)setZxid(zkDb.loadDataBase());
}// 2.清理过期session,删除其对应的nodeList<Long> deadSessions = new LinkedList<Long>();for (Long session : zkDb.getSessions()) {if (zkDb.getSessionWithTimeOuts().get(session) == null) {
deadSessions.add(session);
}
}for (long session : deadSessions) {// XXX: Is lastProcessedZxid really the best thing to use?killSession(session, zkDb.getDataTreeLastProcessedZxid());
}// 3.做一次快照takeSnapshot();
} 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27
启动会话管理器在介绍Zookeeper的回话之前,我们先回忆下Http中的会话. 由于HTTP协议是无状态的协议,所以服务端需要记录用户的状态时,就需要用某种机制来识具体的用户,这个机制就是Session.典型的场景比如购物车,当你点击下单按钮时,由于HTTP协议无状态,所以并不知道是哪个用户操作的,所以服务端要为特定的用户创建了特定的Session,用于标识这个用户,并且跟踪用户,这样才知道购物车里面有几本书。这个Session是保存在服务端的,有一个唯一标识。在服务端保存Session的方法很多,内存、数据库、文件都有。集群的时候也要考虑Session的转移,在大型的网站,一般会有专门的Session服务器集群,用来保存用户会话,这个时候 Session 信息都是放在内存的,使用一些缓存服务比如Memcached之类的来放 Session。
session是一个抽象概念,开发者为了实现中断和继续等操作,将 user agent 和 server 之间一对一的交互,抽象为”会话”,进而衍生出”会话状态”,也就是 session 的概念. 而session的实现一般是在服务端保存的一个数据结构,用来跟踪用户的状态,这个数据可以保存在集群,数据库,文件中,每一个session都有一个sessionId用来唯一标识session.客户端在发送请求时,将sessionId作为请求参数发送给服务器,服务器就可根据sessionId找到保存在服务器中的session. 由于Zookeeper提供了临时节点,Watcher通知等功能.自然需要保存客户端的状态,会话管理器就是Zookeeper中用于管理会话的组件.由于此组件过于复杂,单独介绍. 初始化zookeeper的请求处理链类比于tomcat,tomcat处理请求时会构造pipeline 和value ,filter 和filterChain 两个拦截过滤器处理请求,便于职责的解耦.Zookeeper会构造一个请求处理链用于处理客户端发送的请求.此组件过于复杂,单独介绍. 注册JMXJMX的全称为Java Management Extensions. 顾名思义,是管理Java的一种扩展。这种机制可以方便的管理、监控正在运行中的Java程序。常用于管理线程,内存,日志Level,服务重启,系统环境等
Zookeeper内部封装了注册JMX的逻辑,JMX注册成功后,可以通过visualVM查看和修改运行时属性.JMX相关知识请查阅资料. 创建并启动secureCnxnFactory个人推测此组件应该和ServerCnxnFactory 提供的功能类似,可能增加了认证的逻辑,目前没有在网上关于此组件源码的资料,有时间查看源码后补充. 创建并启动ContainerManager容器节点Zookeeper中的节点类型有持久节点,持久顺序节点,临时节点,临时顺序节点,通过创建临时顺序节点,我们可以实现leader选举,分布式锁等功能,比如实现一个分布式锁,我们的思路一般如下: 创建一个持久节点,如”/lock” 每一个想获取锁的进程在该节点下创建子节点,子节点类型为临时顺序节点 创建了若干临时顺序节点中顺序号最小的节点的线程获得锁;若进程未获得锁,则在顺序号最小的节点上注册监听事件,监听事件中包括竞争锁的相关逻辑.当获取锁的进程释放锁(即删除顺序号最小的节点)时将回调监听事件竞争锁.(简单介绍分布式锁的实现思路,未解决羊群效应)
问题出现在第一步,为了实现分布式锁的逻辑,我们必须建立一个父节点,且其类型为持久节点,但是当不需要分布式锁时谁来删除/lock 节点呢? 为了解决这个问题,Zookeeper在3.6.0 版本新增一种节点类型,即容器节点.其特点为:当容器节点的最后一个孩子节点被删除之后,容器节点将被标注并在一段时间后删除. 那么在实现分布式锁时,可以将/lock 类型设置为容器节点,当没有线程竞争分布式锁时,/lock 节点会被Zookeeper自动删除. 属性ContainerManager 中有两个重要参数控制其行为:
属性名 | 对应配置 | 配置方式 | 默认值 | 含义 |
---|
checkIntervalMs | znode.container.checkIntervalMs | 系统属性 | 60_000 | 执行两次检查任务之间的时间间隔,单位:ms,默认1min | maxPerMinute | znode.container.maxPerMinute | 系统属性 | 10_000 | 一分钟内最多删除多少个容器节点,即删除两个容器节点之间的最少时间间隔为60000/10000=6ms |
注:上述属性通过设置系统属性配置,即在启动QuorumPeerMain 时添加-Dznode.container.checkIntervalMs=XXX 实现为了能够及时清理容器节点,通过Timer 来执行定时任务,实现代码如下: /**
* Manually check the containers. Not normally used directly
*/public void checkContainers()throws InterruptedException {//删除两个容器节点之间的最小间隔,默认:6mslong minIntervalMs = getMinIntervalMs();//遍历待删除的容器节点(同时会删除过期的TTL节点)for (String containerPath : getCandidates()) {long startMs = Time.currentElapsedTime();
ByteBuffer path = ByteBuffer.wrap(containerPath.getBytes());
Request request = new Request(null, 0, 0,
ZooDefs.OpCode.deleteContainer, path, null);try {
LOG.info('Attempting to delete candidate container: {}',
containerPath);//只是将删除节点的请求发送给PrepRequestProcessor,并未真正删除该节点requestProcessor.processRequest(request);
} catch (Exception e) {
LOG.error('Could not delete container: {}',
containerPath, e);
}//删除一个容器节点所需时间long elapsedMs = Time.currentElapsedTime() - startMs;long waitMs = minIntervalMs - elapsedMs;//若删除一个容器节点所需时间小于minIntervalMs,线程sleep.// 由于Timer内部只有一个线程,因此可以保证删除两个容器节点之间的时间间隔至少是minIntervalMsif (waitMs > 0) {
Thread.sleep(waitMs);
}
}
}
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33
总结作为一个服务器,除了在主线程中进行初始化工作,还会开启若干线程,为客户端提供服务,在这里,我们总结下Zookeeper单机启动时启动了多少线程: 历史文件清理线程:通过Timer 定时执行,执行周期为小时级别 Admin server:通过内置的jetty监听8080端口,但由于对jetty不了解,不知一共启动了多少个线程 ServerCnxnFactory:此组件负责管理客户端的TCP连接,其有两种实现,分别是原生NIO和基于Netty的实现 会话管理器:启动若干线程 请求处理链:启动若干线程 SecureServerCnxnFactory:与ServerCnxnFactory类似 ContainerManager:通过Timer 定时执行,清理过期的容器节点和TTL节点,执行周期为分钟级别
可以看出,有很多启动线程的组件在此都未做介绍,正式因为启动线程的组件是服务器的重点,内容繁多,另开博客介绍
|