分享

Storm源码解析1

 昵称20874412 2014-12-18
最近正好在做一个OEP + Coherence的实时数据流分析PoC。突然想到开源世界里还有Storm这么个东东,于是心血来潮想要研究一下。
开源东西好的一点就是源码易得,但是Src拿的到不代表看的懂。这里就把每日抽空理解到的内容记录一下,也便于日后自己复习。

本次源码分析的版本是最新的0.9.3。源码分析是从一个本地例子开始的,源码如下:
public class TopologyMain {
    public static void main(String[] args) throws InterruptedException {
        
        //Topology definition
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("word-reader",new WordReader());
        builder.setBolt("word-normalizer", new WordNormalizer())
            .shuffleGrouping("word-reader");
        builder.setBolt("word-counter", new WordCounter(),1)
            .fieldsGrouping("word-normalizer", new Fields("word"));
       
        //Configuration
        Config conf = new Config();
        conf.put("wordsFile", args[0]);
        conf.setDebug(false);
        //Topology run
        conf.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 1);
        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("Getting-Started-Toplogie", conf, builder.createTopology());
        Thread.sleep(1000);
        cluster.shutdown();
    }
}


从这个例子可以看出Storm的主程序还是很简洁的,定义好topologybuilder以后,相关的Spout,Bolt定义好,在声明好拓扑结构,往LocalCluster里一扔就好了。

这里的LocalCluster是为了本地开发方便而使用的一个开发类,但是其代码调用过程和核心内容与实际生产中使用的类似,于是就先从它下手。

我们可以看到,该类中具体调用时是通过cluster.submitTopology来调用的,参数是 cluster的名字,参数配置cof以及我们创建好的topology。

这段代码运行时,会调用\storm-core\src\clj\backtype\storm\LocalCluster.clj。LocalCluster.clj是Clojure编写的代码,本人对Clojure完全不了解,为了阅读也是硬着头皮硬生生地查了半天。(话说讨厌开源就在这点,就不能用统一的语言来编写啊,非搞这么小众!)。

打开LocalCluster.clj我们能够看到开始两段代码:
(ns backtype.storm.LocalCluster
  (:use [backtype.storm testing config])
  (:import [java.util Map])
  (:gen-class
    :init init
    :implements [backtype.storm.ILocalCluster]
    :constructors {[] [] [java.util.Map] [] [String Long] []}
    :state state))

(defn -init
  ([]
   (let [ret (mk-local-storm-cluster
               :daemon-conf
               {TOPOLOGY-ENABLE-MESSAGE-TIMEOUTS true})]
     [[] ret]))
  ([^String zk-host ^Long zk-port]
   (let [ret (mk-local-storm-cluster :daemon-conf {TOPOLOGY-ENABLE-MESSAGE-TIMEOUTS true
                                                     STORM-ZOOKEEPER-SERVERS (list zk-host)
                                                     STORM-ZOOKEEPER-PORT zk-port})]
     [[] ret]))
  ([^Map stateMap]
   [[] stateMap]))

其中第一段有
:use [backtype.storm testing config] 这样一句声明。这句声明就意味着该clj中会使用相同目录下testing.clj和config.clj这两个文件。
下一段代码
  (:gen-class
    :init init
就类似于Java中的构造类,一位着该代码运行时,需要先运行一下init方法,这个init方法的名字就叫"init"(太糙了。。)。

在第二段代码中,我们看到
(defn -init 这个声明就是init方法了。在Init方法中会调用mk-local-storm-cluster 这个方法在testing.clj里定义,我们看看代码
(defnk mk-local-storm-cluster [:supervisors 2 :ports-per-supervisor 3 :daemon-conf {} :inimbus nil :supervisor-slot-port-min 1024]
  (let [zk-tmp (local-temp-path)
        [zk-port zk-handle] (if-not (contains? daemon-conf STORM-ZOOKEEPER-SERVERS)
                              (zk/mk-inprocess-zookeeper zk-tmp))
        daemon-conf (merge (read-storm-config)
                           {TOPOLOGY-SKIP-MISSING-KRYO-REGISTRATIONS true
                            ZMQ-LINGER-MILLIS 0
                            TOPOLOGY-ENABLE-MESSAGE-TIMEOUTS false
                            TOPOLOGY-TRIDENT-BATCH-EMIT-INTERVAL-MILLIS 50
                            STORM-CLUSTER-MODE "local"}
                           (if-not (contains? daemon-conf STORM-ZOOKEEPER-SERVERS)
                             {STORM-ZOOKEEPER-PORT zk-port
                              STORM-ZOOKEEPER-SERVERS ["localhost"]})
                           daemon-conf)
        nimbus-tmp (local-temp-path)
        port-counter (mk-counter supervisor-slot-port-min)
        nimbus (nimbus/service-handler
                 (assoc daemon-conf STORM-LOCAL-DIR nimbus-tmp)
                 (if inimbus inimbus (nimbus/standalone-nimbus)))
        context (mk-shared-context daemon-conf)

其中,关于配置信息,我们看到调用了
read-storm-config这个方法在config.clj里定义,具体代码如下
(defn read-storm-config
  []
  (let [conf (clojurify-structure (Utils/readStormConfig))]
    (validate-configs-with-schemas conf)
    conf))

这个定义意味着调用了java backtype.storm.utils.Utils.java的方法,我们具体看一下代码
    public static Map readStormConfig() {
        Map ret = readDefaultConfig();
        String confFile = System.getProperty("storm.conf.file");
        Map storm;
        if (confFile==null || confFile.equals("")) {
            storm = findAndReadConfigFile("storm.yaml", false);
        } else {
            storm = findAndReadConfigFile(confFile, true);
        }
        ret.putAll(storm);
        ret.putAll(readCommandLineOpts());
        return ret;
    }


这个方法就是真正去解析配置的入口,解析过程:先读取默认的defualts.yaml的配置,对于源码来说该文件是在conf目录下,对于release版本则是该文件打到了storm.jar内。

其次,再解析用户配置的storm.yaml中的配置项,如果strom.yaml中有配置项与默认配置文件的配置项有冲突,则会覆盖掉默认配置项。最后,取系统环境变量中设置的storm.options的值,这一般都是没有的,因此这步可以跳过。

         :storm的配置文件用到了yaml这种配置格式,可参考其官方http://www./

        

    本站是提供个人知识管理的网络存储空间,所有内容均由用户发布,不代表本站观点。请注意甄别内容中的联系方式、诱导购买等信息,谨防诈骗。如发现有害或侵权内容,请点击一键举报。
    转藏 分享 献花(0

    0条评论

    发表

    请遵守用户 评论公约

    类似文章 更多