最近正好在做一个OEP + Coherence的实时数据流分析PoC。突然想到开源世界里还有Storm这么个东东,于是心血来潮想要研究一下。 开源东西好的一点就是源码易得,但是Src拿的到不代表看的懂。这里就把每日抽空理解到的内容记录一下,也便于日后自己复习。 本次源码分析的版本是最新的0.9.3。源码分析是从一个本地例子开始的,源码如下: public class TopologyMain { public static void main(String[] args) throws InterruptedException { //Topology definition TopologyBuilder builder = new TopologyBuilder(); builder.setSpout("word-reader",new WordReader()); builder.setBolt("word-normalizer", new WordNormalizer()) .shuffleGrouping("word-reader"); builder.setBolt("word-counter", new WordCounter(),1) .fieldsGrouping("word-normalizer", new Fields("word")); //Configuration Config conf = new Config(); conf.put("wordsFile", args[0]); conf.setDebug(false); //Topology run conf.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 1); LocalCluster cluster = new LocalCluster(); cluster.submitTopology("Getting-Started-Toplogie", conf, builder.createTopology()); Thread.sleep(1000); cluster.shutdown(); } } 从这个例子可以看出Storm的主程序还是很简洁的,定义好topologybuilder以后,相关的Spout,Bolt定义好,在声明好拓扑结构,往LocalCluster里一扔就好了。 这里的LocalCluster是为了本地开发方便而使用的一个开发类,但是其代码调用过程和核心内容与实际生产中使用的类似,于是就先从它下手。 我们可以看到,该类中具体调用时是通过cluster.submitTopology来调用的,参数是 cluster的名字,参数配置cof以及我们创建好的topology。 这段代码运行时,会调用\storm-core\src\clj\backtype\storm\LocalCluster.clj。LocalCluster.clj是Clojure编写的代码,本人对Clojure完全不了解,为了阅读也是硬着头皮硬生生地查了半天。(话说讨厌开源就在这点,就不能用统一的语言来编写啊,非搞这么小众!)。 打开LocalCluster.clj我们能够看到开始两段代码: (ns backtype.storm.LocalCluster (:use [backtype.storm testing config]) (:import [java.util Map]) (:gen-class :init init :implements [backtype.storm.ILocalCluster] :constructors {[] [] [java.util.Map] [] [String Long] []} :state state)) (defn -init ([] (let [ret (mk-local-storm-cluster :daemon-conf {TOPOLOGY-ENABLE-MESSAGE-TIMEOUTS true})] [[] ret])) ([^String zk-host ^Long zk-port] (let [ret (mk-local-storm-cluster :daemon-conf {TOPOLOGY-ENABLE-MESSAGE-TIMEOUTS true STORM-ZOOKEEPER-SERVERS (list zk-host) STORM-ZOOKEEPER-PORT zk-port})] [[] ret])) ([^Map stateMap] [[] stateMap])) 其中第一段有:use [backtype.storm testing config] 这样一句声明。这句声明就意味着该clj中会使用相同目录下testing.clj和config.clj这两个文件。 下一段代码 (:gen-class :init init 就类似于Java中的构造类,一位着该代码运行时,需要先运行一下init方法,这个init方法的名字就叫"init"(太糙了。。)。 在第二段代码中,我们看到(defn -init 这个声明就是init方法了。在Init方法中会调用mk-local-storm-cluster 这个方法在testing.clj里定义,我们看看代码 (defnk mk-local-storm-cluster [:supervisors 2 :ports-per-supervisor 3 :daemon-conf {} :inimbus nil :supervisor-slot-port-min 1024] (let [zk-tmp (local-temp-path) [zk-port zk-handle] (if-not (contains? daemon-conf STORM-ZOOKEEPER-SERVERS) (zk/mk-inprocess-zookeeper zk-tmp)) daemon-conf (merge (read-storm-config) {TOPOLOGY-SKIP-MISSING-KRYO-REGISTRATIONS true ZMQ-LINGER-MILLIS 0 TOPOLOGY-ENABLE-MESSAGE-TIMEOUTS false TOPOLOGY-TRIDENT-BATCH-EMIT-INTERVAL-MILLIS 50 STORM-CLUSTER-MODE "local"} (if-not (contains? daemon-conf STORM-ZOOKEEPER-SERVERS) {STORM-ZOOKEEPER-PORT zk-port STORM-ZOOKEEPER-SERVERS ["localhost"]}) daemon-conf) nimbus-tmp (local-temp-path) port-counter (mk-counter supervisor-slot-port-min) nimbus (nimbus/service-handler (assoc daemon-conf STORM-LOCAL-DIR nimbus-tmp) (if inimbus inimbus (nimbus/standalone-nimbus))) context (mk-shared-context daemon-conf) 其中,关于配置信息,我们看到调用了read-storm-config这个方法在config.clj里定义,具体代码如下 (defn read-storm-config [] (let [conf (clojurify-structure (Utils/readStormConfig))] (validate-configs-with-schemas conf) conf)) 这个定义意味着调用了java backtype.storm.utils.Utils.java的方法,我们具体看一下代码 public static Map readStormConfig() { Map ret = readDefaultConfig(); String confFile = System.getProperty("storm.conf.file"); Map storm; if (confFile==null || confFile.equals("")) { storm = findAndReadConfigFile("storm.yaml", false); } else { storm = findAndReadConfigFile(confFile, true); } ret.putAll(storm); ret.putAll(readCommandLineOpts()); return ret; } 这个方法就是真正去解析配置的入口,解析过程:先读取默认的defualts.yaml的配置,对于源码来说该文件是在conf目录下,对于release版本则是该文件打到了storm.jar内。 其次,再解析用户配置的storm.yaml中的配置项,如果strom.yaml中有配置项与默认配置文件的配置项有冲突,则会覆盖掉默认配置项。最后,取系统环境变量中设置的storm.options的值,这一般都是没有的,因此这步可以跳过。 注:storm的配置文件用到了yaml这种配置格式,可参考其官方http://www./ |
|
来自: 昵称20874412 > 《Storm》