分享

Spark1.6.0

 开心豆豆2010 2018-07-23

前言

几年前,我在刚刚进入大数据领域的时候,很快就了解到Hive所提供的一种另类的SQL。最初使用Hive的命令行提交任务,后来便用上了HiveServer和HiveServer2。半年前第一次注意到Spark的Thrift服务,当时心中就笃定它肯定与HiveServer2有着某种联系,直到在工作中真正使用它。

在使用HiveThriftServer2的过程中,通过故障排查、源码分析和功能优化,HiveThriftServer2的实现及其原理就浮上水面。等我了解了HiveThriftServer2,情不自禁的就会和Tomcat进行一番比较,尤其是在组件的生命周期管理方面。有兴趣的同学可以先阅读下我之前写得《Tomcat7.0源码分析——生命周期管理》一文。Tomcat将内部组件都抽象为容器,而HiveThriftServer2的内部组件都是服务。HiveThriftServer2对服务的抽象不是本来固有的,而是继承自HiveServer2的。根据HiveServer2的设计,所有的服务都需要实现Service接口。

本文将从HiveThriftServer2的Service接口设计、生命周期管理、启动过程分析三个角度深入分析。

一切都是服务

早年间火热的SOA技术,有一个关键的修饰语——“一切都是服务”,今天拿来形容HiveThriftServer2及HiveServer2中的组件抽象却恰如其分。HiveServer2定义了Service接口,其中包含的接口方法见代码清单1。

代码清单1 Service的定义

  1. void init(HiveConf conf);
  2. void start();
  3. void stop();
  4. void register(ServiceStateChangeListener listener);
  5. void unregister(ServiceStateChangeListener listener);
  6. String getName();
  7. HiveConf getHiveConf();
  8. STATE getServiceState();
  9. long getStartTime();
代码清单1展示了Service定义的各个接口方法,它们分别是:

  • init:对服务组件进行初始化
  • start:启动服务组件
  • stop:停止服务组件
  • register:注册对服务组件的状态感兴趣的监听器,监听器由ServiceStateChangeListener接口定义。ServiceStateChangeListener只有stateChanged一个方法用来处理服务组件的状态。
  • unregister:撤销对服务组件的状态感兴趣的监听器
  • getName:获取服务组件的名称
  • getHiveConf:获取初始化服务组件时,设置的HiveConf
  • getServiceState:获取服务组件的状态。方法返回的枚举类型STATE定义了服务组件所能拥有的所有可能状态,包括:未初始化(NOTINITED)、初始化(INITED)、已启动(STARTED)、已停止(STOPPED)。
  • getStartTime:获取服务组件的启动时间。
HiveServer2的抽象类AbstractService中实现了Service的所有接口,AbstractService中有以下属性:

  • state:服务组件的初始状态,默认为NOTINITED。
  • name:服务组件的名称。
  • startTime:服务组件的启动时间。
  • hiveConf:服务组件初始化时设置的hiveConf。
  • listeners:用于缓存对服务组件的状态感兴趣的所有ServiceStateChangeListener的列表。
有了对这些属性的了解,AbstractService实现的方法如下:

服务初始化

AbstractService实现的初始化方法见代码清单2.
代码清单2
  1. public synchronized void init(HiveConf hiveConf) {
  2. ensureCurrentState(STATE.NOTINITED); //确认服务组件的当前状态是否一致
  3. this.hiveConf = hiveConf;
  4. changeState(STATE.INITED); //将服务组件的状态修改为INITED
  5. LOG.info("Service:" + getName() + " is inited.");
  6. }
changeState方法除了修改服务组件的状态外,还会触发所有监听器,见代码清单3.
代码清单3
  1. private void changeState(STATE newState) {
  2. state = newState;
  3. // notify listeners
  4. for (ServiceStateChangeListener l : listeners) {
  5. l.stateChanged(this);
  6. }
  7. }

服务启动

AbstractService实现的启动方法见代码清单4.
代码清单4
  1. public synchronized void start() {
  2. startTime = System.currentTimeMillis();
  3. ensureCurrentState(STATE.INITED);
  4. changeState(STATE.STARTED); //将服务组件的状态修改为STARTED
  5. LOG.info("Service:" + getName() + " is started.");
  6. }

服务停止

AbstractService实现的启动方法见代码清单5.
代码清单5

  1. public synchronized void stop() {
  2. if (state == STATE.STOPPED ||
  3. state == STATE.INITED ||
  4. state == STATE.NOTINITED) {
  5. // already stopped, or else it was never
  6. // started (eg another service failing canceled startup)
  7. return;
  8. }
  9. ensureCurrentState(STATE.STARTED);
  10. changeState(STATE.STOPPED); //将服务组件的状态修改为STOPPED
  11. LOG.info("Service:" + getName() + " is stopped.");
  12. }

监听器的注册与注销

AbstractService实现的监听器注册和注销方法见代码清单6.

代码清单6

  1. @Override
  2. public synchronized void register(ServiceStateChangeListener l) {
  3. listeners.add(l);
  4. }
  5. @Override
  6. public synchronized void unregister(ServiceStateChangeListener l) {
  7. listeners.remove(l);
  8. }

获取服务信息

AbstractService实现的获取服务信息的方法见代码清单7.

代码清单7

  1. @Override
  2. public String getName() {
  3. return name;
  4. }
  5. @Override
  6. public synchronized HiveConf getHiveConf() {
  7. return hiveConf;
  8. }
  9. @Override
  10. public long getStartTime() {
  11. return startTime;
  12. }

有了Service的基础定义和AbstractService的基础实现,HiveServer2中的所有服务就有了依托。像OperationManager和ThriftCliService就直接继承了AbstractService。在整个设计中,HiveServer2还提供了一个对AbstractService的行为进行了重写的复合组件(CompositeService)用于表示由多种服务组件组合构成的服务组件。HiveServer2中的所有复合组件,比如CLIService、HiveServer2、SessionManager等都继承自CompositeService。HiveThriftServer2通过反射加继承的方式间接的使用了AbstractService和CompositeService。在介绍复合组件之前,我们先展示整个Service的继承体系,如图1所示。



图1 Service的继承体系

复合服务

从图1可以看出CompositeService直接继承了AbstractService。CompositeService内部定义了一个用于管理所有子Service的列表:

  private final List<Service> serviceList = new ArrayList<Service>();
所有组成CompositeService的子Service,都将被serviceList所持有。CompositeService借助List的add或remove方法实现了子Service的添加与删除。

CompositeService重写了AbstractService实现的服务初始化、服务启动、服务停止等方法。

复合服务的初始化

CompositeService重写的初始化方法见代码清单8.

代码清单8

  1. @Override
  2. public synchronized void init(HiveConf hiveConf) {
  3. for (Service service : serviceList) {
  4. service.init(hiveConf);
  5. }
  6. super.init(hiveConf);
  7. }
根据代码清单8,我们知道CompositeService的初始化实际就是对所有子Service的初始化。

复合服务的启动

CompositeService重写的启动方法见代码清单9.

代码清单9

  1. @Override
  2. public synchronized void start() {
  3. int i = 0;
  4. try {
  5. for (int n = serviceList.size(); i < n; i++) {
  6. Service service = serviceList.get(i);
  7. service.start();
  8. }
  9. super.start();
  10. } catch (Throwable e) {
  11. LOG.error("Error starting services " + getName(), e);
  12. stop(i);
  13. throw new ServiceException("Failed to Start " + getName(), e);
  14. }
  15. }
根据代码清单9,我们知道CompositeService的启动实际就是对所有子Service的启动。

复合服务的停止

CompositeService重写的停止方法见代码清单10.
代码清单10

  1. @Override
  2. public synchronized void stop() {
  3. if (this.getServiceState() == STATE.STOPPED) {
  4. // The base composite-service is already stopped, don't do anything again.
  5. return;
  6. }
  7. if (serviceList.size() > 0) {
  8. stop(serviceList.size() - 1);
  9. }
  10. super.stop();
  11. }
根据代码清单10,我们知道CompositeService的停止实际就是对所有子Service的停止。
图1中所示的CLIService、HiveServer2、SessionManager由于继承了CompositeService,所以他们都是复合组件。

HiveThriftServer2与HiveServer2的区别

从图1看到HiveThriftServer2、SparkSQLSessionManager及SparkSQLCLIService在分别继承了HiveServer2、SessionManager及CLIService的同时,又分别实现了ReflectedCompositeService。这种实现貌似很奇怪,但是细细想来其实非常自然。HiveThriftServer2从设计之初就是要重用HiveServer2中的各种功能,这可以大大简化开发的工作量,并且可以直接将HiveServer2已经实现的各种特性继承过来。那么为什么又要实现ReflectedCompositeService呢?在回答这个问题之前,我们先来看看HiveThriftServer2与HiveServer2内部Service的区别,如图2所示。

图2 HiveServer2与HiveThriftServer2的对比
从图2看到,HiveServer2的子Service包括CLIService、ThriftHttpCLIService及ThriftBinaryCLIService。而HiveThriftServer2的子Service包括SparkSQLCLIService、ThriftHttpCLIService及ThriftBinaryCLIService。CLIService和SparkSQLCLIService内部也有不同的子Service。由于HiveThriftServer2内部需要使用Spark的一些API,所以HiveThriftServer2不可能单纯的继承HiveServer2。另一方面,由于HiveThriftServer2继承了HiveServer2,所以默认情况下HiveThriftServer2内部的子Service即为HiveServer2内部的子服务,为了给HiveThriftServer2的子Service做一些定制化的扩展,只能通过反射的方式替换HiveServer2内部的子Service。ReflectedCompositeService提供了反射初始化HiveThriftServer2的子Service的实现,见代码清单11.
代码清单11

  1. private[thriftserver] trait ReflectedCompositeService { this: AbstractService =>
  2. def initCompositeService(hiveConf: HiveConf) {
  3. // Emulating `CompositeService.init(hiveConf)`
  4. val serviceList = getAncestorField[JList[Service]](this, 2, "serviceList")
  5. serviceList.asScala.foreach(_.init(hiveConf))
  6. // Emulating `AbstractService.init(hiveConf)`
  7. invoke(classOf[AbstractService], this, "ensureCurrentState", classOf[STATE] -> STATE.NOTINITED)
  8. setAncestorField(this, 3, "hiveConf", hiveConf)
  9. invoke(classOf[AbstractService], this, "changeState", classOf[STATE] -> STATE.INITED)
  10. getAncestorField[Log](this, 3, "LOG").info(s"Service: $getName is inited.")
  11. }
  12. }
Spark中每个实现了ReflectedCompositeService的服务,内部都重写了init方法,以便于创建那些扩展了Spark相关特性的子Service,并将它们添加到serviceList中,最后调用ReflectedCompositeService提供的initCompositeService方法完成子Service的初始化。

初始化、启动、停止

无论是HiveThriftServer2还是HiveServer2,除了初始化过程,它们的启动过程、停止过程都是类似的。本节将先分别展示HiveServer2和HiveThriftServer2的初始化过程,然后介绍它们共同的启动过程,至于停止过程和启动过程十分相似,就不过多介绍了。
HiveServer2的初始化过程,如图3所示。

图3 HiveServer2的初始化过程
HiveThriftServer2的初始化过程,如图4所示。

图4 HiveThriftServer2的初始化过程
Service的启动过程,如图5所示。

图5 Service的启动过程

由于Service的停止过程与启动过程相类似,因此不再赘述。

最后再附上本文的姊妹篇——《Spark1.6.0功能扩展——为HiveThriftServer2增加HA》


关于Spark内核设计的艺术 架构设计与实现

经过近一年的准备,基于Spark2.1.0版本的《Spark内核设计的艺术 架构设计与实现》一书现已出版发行,图书如图:


纸质版售卖链接如下:
电子版售卖链接如下:

    本站是提供个人知识管理的网络存储空间,所有内容均由用户发布,不代表本站观点。请注意甄别内容中的联系方式、诱导购买等信息,谨防诈骗。如发现有害或侵权内容,请点击一键举报。
    转藏 分享 献花(0

    0条评论

    发表

    请遵守用户 评论公约

    类似文章 更多