准备工作 1. 下载zookeeper-3.4.7 2. 下载Storm apache-storm-0.9.3 3. 安装JDK 1.7 注: Storm0.9之前的版本,底层通讯用的是ZeroMQ,所以在安装0.9之前的版本需要安装0MQ,但是在0.9之后 我们直接安装就可以了。 因为在storm被移交到apache之后,这块用java的NIO框架Netty代替了。 首先建立hadoop用户,我把和大数据相关的框架和软件都放在hadoop用户中。 安装ZK 1. 登陆到10.10.113.41并解压tar包 2. 建立zookeeper的data目录,/home/hadoop/zookeeper/data mkdir -p /home/hadoop/zookeeper/data 3. 建立zk集群的myid文件 (单机版可以跳过该步) cd /home/hadoop/zookeeper/data echo 1 > myid 4. 拷贝zookeeper的conf/zoo_sample.cfg并重命名为zoo.cfg,修改如下: dataDir=/home/hadoop/zookeeper/data server.1=10.10.113.41:2888:3888 server.2=10.10.113.42:2888:3888 server.3=10.10.113.43:2888:3888 dataDir是配置zk的数据目录的 server.A=B:C:D是集群zk使用的。如果你只想用单个zk,可以不配置。 A - 是一个数字,表示这是第几号服务器。与dataDir(即/home/hadoop/zookeeper/data)下的myid文件内容一致 B - 是该服务器的IP地址 C - 表示该服务器与集群中的Leader服务器交换信息的端口 D - 表示如果万一集群中的Leader服务器挂了,需要各服务器重新选举时所用的通讯端口 5. (Optional)将zk的bin目录路径加入环境变量 修改/etc/profile文件,在尾部添加如下: #zookeeper export ZOOKEEPER=/home/hadoop/zookeeper PATH=$PATH:$ZOOKEEPER/bin 6. 启动zk zkServer.sh start 在剩下两台机器重复以上步骤,注意myid要对应 7. 查看zk的运行状态 zkServer.sh status 安装Storm 1. 解压tar包并赋予执行权限 2. 将Storm的bin目录加入系统路径 修改/etc/profile文件,在尾部加入如下: PATH=$PATH:/home/hadoop/storm/bin 使其生效 3. 创建一个Storm的本地数据目录 mkdir -p /home/hadoop/storm/data 以上步骤在Storm的集群上的其他机器上重复执行,然后进行配置: a. 配置storm.yaml 修改storm的conf/storm.yaml文件如下: storm.zookeeper.servers: #zk地址 - '10.10.113.41' - '10.10.113.42' - '10.10.113.43' nimbus.host: '10.10.113.41' #master 节点地址 supervisor.slots.ports: - 6700 - 6701 - 6702 - 6703 storm.local.dir: '/home/hadoop/storm/data' #数据存放地址 注意: 在每个配置项前面必须留有空格,否则会无法识别。 启动集群 1. 启动nimbus 在nimbus机器的Storm的bin目录下执行 nohup bin/storm nimbus >/dev/null 2>&1 & #启动主节点 nohup bin/storm ui >/dev/null 2>&1 & #启动stormUI nohup bin/storm logviewer >/dev/null 2>&1 & #启动logviewer 功能 2. 启动supervisor 在supervisor机器的Storm的bin目录下执行,所有supervisor节点都使用如下命令 nohup bin/storm supervisor >/dev/null 2>&1 & nohup bin/storm logviewer >/dev/null 2>&1 & 3. 
检查 打开Storm UI 页面。http://10.10.113.41:8080/index.html 默认是启在8080端口上,如果你想改成其他的,如8089,直接修改nimbus的storm.yaml文件,添加 ui.port=8089 部署程序 1. 这里我使用 Intellij IDEA + maven来开发一个wordcount的Demo 2. 添加maven依赖 <dependency> <groupId>org.apache.storm</groupId> <artifactId>storm-core</artifactId> <version>0.9.3< 1. 这里我使用 Intellij IDEA + maven来开发一个wordcount的Demo部署程序 2. 添加maven依赖 <dependency> <groupId>org.apache.storm</groupId> <artifactId>storm-core</artifactId> <version>0.9.3</version> </dependency> 3. 新建项目,编写程序 package cn.oraclers.storm; import backtype.storm.Config; import backtype.storm.StormSubmitter; import backtype.storm.generated.AlreadyAliveException; import backtype.storm.generated.InvalidTopologyException; import backtype.storm.spout.SpoutOutputCollector; import backtype.storm.task.OutputCollector; import backtype.storm.task.TopologyContext; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.topology.TopologyBuilder; import backtype.storm.topology.base.BaseRichBolt; import backtype.storm.topology.base.BaseRichSpout; import backtype.storm.tuple.Fields; import backtype.storm.tuple.Tuple; import backtype.storm.tuple.Values; import backtype.storm.utils.Utils; import java.util.HashMap; import java.util.Map; import java.util.Random; public class WordCount { public static class SpoutSource extends BaseRichSpout { Map map; TopologyContext topologyContext; SpoutOutputCollector spoutOutputCollector; Random random; @Override public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) { map = map; topologyContext = topologyContext; spoutOutputCollector = spoutOutputCollector; random = random; } String[] sentences = new String[]{ 'the cow jumped over the moon', 'an apple a day keeps the doctor away', 'four score and seven years ago', 'snow white and the seven dwarfs', 'i am at two with nature' }; @Override public void nextTuple() { Utils.sleep(1000); for (String sentence:sentences){ spoutOutputCollector.emit(new 
Values(sentence)); } } @Override public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) { outputFieldsDeclarer.declare(new Fields('sentence')); } } public static class SplitBoltSource extends BaseRichBolt{ Map map; TopologyContext topologyContext; OutputCollector outputCollector; @Override public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) { map = map; topologyContext = topologyContext; outputCollector = outputCollector; } @Override public void execute(Tuple tuple) { String sentence = tuple.getStringByField('sentence'); String[] words = sentence.split(' '); for (String word:words){ this.outputCollector.emit(new Values(word)); } } @Override public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) { outputFieldsDeclarer.declare(new Fields('word')); } } public static class SumBoltSource extends BaseRichBolt{ Map map; TopologyContext topologyContext; OutputCollector outputCollector; @Override public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) { this.map = map; this.topologyContext = topologyContext; this.outputCollector = outputCollector; } Map<String,Integer> mapCount = new HashMap<String, Integer>(); @Override public void execute(Tuple tuple) { String word = tuple.getStringByField('word'); Integer count = mapCount.get(word); if(count == null){ count=0; } count++; mapCount.put(word,count); outputCollector.emit(new Values(word,count)); } @Override public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) { outputFieldsDeclarer.declare(new Fields('word', 'count')); } } public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException { TopologyBuilder builder = new TopologyBuilder(); builder.setSpout('data_source',new SpoutSource()); builder.setBolt('bolt_split',new SplitBoltSource()).shuffleGrouping('data_source'); builder.setBolt('bolt_sum',new SplitBoltSource()).fieldsGrouping('bolt_split',new 
Fields('word')); try { Config stormConf = new Config(); stormConf.setDebug(true); StormSubmitter.submitTopology('Clustertopology', stormConf,builder.createTopology()); } catch (AlreadyAliveException e) { e.printStackTrace(); } catch (InvalidTopologyException e) { e.printStackTrace(); } } } 4. 打包部署topology ./storm jar storm jar sd-1.0-SNAPSHOT.jar cn.oraclers.storm.WordCount 5. 查看结果 两种方式, a. 查看StormUI 注意:一定不要使用IE内核的浏览器,否则看不到Topology Summary 下面的东西!!! b. storm的bin目录下运行 Topology_name Status Num_tasks Num_workers Uptime_secs ------------------------------------------------------------------- test ACTIVE 28 3 5254 Clustertopology ACTIVE 4 1 83 mytopo ACTIVE 6 3 555 6. 关闭topology a. StormUI上面点选要关闭的topology,如test,然后在新页面的Topology actions中选kill b. 运行./storm kill test |
|
来自: richard_168 > 《待分类》