Kafka + Storm + HBase Architecture Design
Kafka serves as the distributed, real-time messaging layer, with producers and consumers; Storm is the real-time processing engine for big data; HBase is the Apache Hadoop database, offering efficient random reads and writes. In this design, messages produced into Kafka are consumed by a Storm spout as the source of the topology, processed by bolts, and the results are written to HBase.

Prerequisites (their setup is not covered here): a Hadoop cluster (with ZooKeeper), a Kafka cluster, and a Storm cluster.

1、Kafka test API (producer and consumer)

The producer:

import java.util.Properties;

import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class Producer extends Thread {

    private final kafka.javaapi.producer.Producer<Integer, String> producer;
    private final String topic;
    private final Properties props = new Properties();

    public Producer(String topic) {
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        props.put("metadata.broker.list", "192.168.80.20:9092,192.168.80.21:9092,192.168.80.22:9092");
        producer = new kafka.javaapi.producer.Producer<Integer, String>(new ProducerConfig(props));
        this.topic = topic;
    }

    public void run() {
        for (int i = 0; i < 2000; i++) {
            String messageStr = "Message_" + i;
            System.out.println("produce: " + messageStr);
            producer.send(new KeyedMessage<Integer, String>(topic, messageStr));
        }
    }

    public static void main(String[] args) {
        Producer producerThread = new Producer(KafkaProperties.topic);
        producerThread.start();
    }
}

2、The consumer test API:

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

public class Consumer extends Thread {

    private final ConsumerConnector consumer;
    private final String topic;

    public Consumer(String topic) {
        consumer = kafka.consumer.Consumer.createJavaConsumerConnector(createConsumerConfig());
        this.topic = topic;
    }

    private static ConsumerConfig createConsumerConfig() {
        Properties props = new Properties();
        props.put("zookeeper.connect", KafkaProperties.zkConnect);
        props.put("group.id", KafkaProperties.groupId);
        // props.put("zookeeper.session.timeout.ms", "400");
        // props.put("zookeeper.sync.time.ms", "200");
        props.put("auto.commit.interval.ms", "60000"); // how often offsets are committed to ZooKeeper
        return new ConsumerConfig(props);
    }

    // The high-level consumer pushes messages to us; the alternative is to pull actively.
    public void run() {
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(topic, new Integer(1));
        Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
        KafkaStream<byte[], byte[]> stream = consumerMap.get(topic).get(0);
        ConsumerIterator<byte[], byte[]> it = stream.iterator();
        while (it.hasNext()) {
            // business logic goes here
            System.out.println("consume: " + new String(it.next().message()));
        }
    }

    public static void main(String[] args) {
        Consumer consumerThread = new Consumer(KafkaProperties.topic);
        consumerThread.start();
    }
}

3、Constants shared by the Kafka producers and consumers:

public interface KafkaProperties {
    final static String zkConnect = "192.168.80.20:2181,192.168.80.21:2181,192.168.80.22:2181";
    final static String groupId = "group";
    final static String topic = "test";
    // topic name for the order stream consumed later by OrderConsumer
    // (value assumed; the original post references the constant but never defines it)
    final static String Order_topic = "order";
}

4、Before starting on the project itself, prepare some HBase utility classes:

import java.util.List;

import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;

public interface HBaseDAO {

    public void save(Put put, String tableName);

    public void insert(String tableName, String rowKey, String family, String qualifier, String value);

    public void save(List<Put> puts, String tableName);

    public Result getOneRow(String tableName, String rowKey);

    public List<Result> getRows(String tableName, String rowKeyLike);

    public List<Result> getRows(String tableName, String rowKeyLike, String cols[]);

    public List<Result> getRows(String tableName, String startRow, String stopRow);
}
The implementation:

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HConnectionManager;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.PrefixFilter;

public class HBaseDAOImp implements HBaseDAO {

    HConnection hTablePool = null;

    public HBaseDAOImp() {
        // HBaseConfiguration.create() loads hbase-site.xml if present; the quorum is set explicitly here
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "192.168.80.20,192.168.80.21,192.168.80.22");
        conf.set("hbase.rootdir", "hdfs://cluster/hbase");
        try {
            hTablePool = HConnectionManager.createConnection(conf);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    @Override
    public void save(Put put, String tableName) {
        HTableInterface table = null;
        try {
            table = hTablePool.getTable(tableName);
            table.put(put);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            closeQuietly(table);
        }
    }

    @Override
    public void insert(String tableName, String rowKey, String family, String qualifier, String value) {
        HTableInterface table = null;
        try {
            table = hTablePool.getTable(tableName);
            Put put = new Put(rowKey.getBytes());
            put.add(family.getBytes(), qualifier.getBytes(), value.getBytes());
            table.put(put);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            closeQuietly(table);
        }
    }

    @Override
    public void save(List<Put> puts, String tableName) {
        HTableInterface table = null;
        try {
            table = hTablePool.getTable(tableName);
            table.put(puts);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            closeQuietly(table);
        }
    }

    @Override
    public Result getOneRow(String tableName, String rowKey) {
        HTableInterface table = null;
        Result rsResult = null;
        try {
            table = hTablePool.getTable(tableName);
            Get get = new Get(rowKey.getBytes());
            rsResult = table.get(get);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            closeQuietly(table);
        }
        return rsResult;
    }

    @Override
    public List<Result> getRows(String tableName, String rowKeyLike) {
        HTableInterface table = null;
        List<Result> list = null;
        try {
            table = hTablePool.getTable(tableName);
            PrefixFilter filter = new PrefixFilter(rowKeyLike.getBytes());
            Scan scan = new Scan();
            scan.setFilter(filter);
            ResultScanner scanner = table.getScanner(scan);
            list = new ArrayList<Result>();
            for (Result rs : scanner) {
                list.add(rs);
            }
            scanner.close();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            closeQuietly(table);
        }
        return list;
    }

    @Override
    public List<Result> getRows(String tableName, String rowKeyLike, String cols[]) {
        HTableInterface table = null;
        List<Result> list = null;
        try {
            table = hTablePool.getTable(tableName);
            PrefixFilter filter = new PrefixFilter(rowKeyLike.getBytes());
            Scan scan = new Scan();
            for (int i = 0; i < cols.length; i++) {
                // the column family is hard-coded to "cf", the family used throughout this post
                scan.addColumn("cf".getBytes(), cols[i].getBytes());
            }
            scan.setFilter(filter);
            ResultScanner scanner = table.getScanner(scan);
            list = new ArrayList<Result>();
            for (Result rs : scanner) {
                list.add(rs);
            }
            scanner.close();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            closeQuietly(table);
        }
        return list;
    }

    @Override
    public List<Result> getRows(String tableName, String startRow, String stopRow) {
        HTableInterface table = null;
        List<Result> list = null;
        try {
            table = hTablePool.getTable(tableName);
            Scan scan = new Scan();
            scan.setStartRow(startRow.getBytes());
            scan.setStopRow(stopRow.getBytes());
            ResultScanner scanner = table.getScanner(scan);
            list = new ArrayList<Result>();
            for (Result rsResult : scanner) {
                list.add(rsResult);
            }
            scanner.close();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            closeQuietly(table);
        }
        return list;
    }

    private void closeQuietly(HTableInterface table) {
        if (table != null) {
            try {
                table.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    public static void main(String[] args) {
        HBaseDAO dao = new HBaseDAOImp();
        List<Put> list = new ArrayList<Put>();
        Put put = new Put("aa".getBytes());
        put.add("cf".getBytes(), "name".getBytes(), "zhangsan".getBytes());
        put.add("cf".getBytes(), "addr".getBytes(), "beijing".getBytes());
        put.add("cf".getBytes(), "age".getBytes(), "30".getBytes());
        put.add("cf".getBytes(), "tel".getBytes(), "13567882341".getBytes());
        list.add(put);
        dao.save(list, "test");
        // dao.save(put, "test");
        // dao.insert("test", "testrow", "cf", "age", "35");
        // dao.insert("test", "testrow", "cf", "cardid", "12312312335");
        // dao.insert("test", "testrow", "cf", "tel", "13512312345");
    }
}
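The HBase tables used in this post ("test" for the smoke test above and "area_order" for the Storm results later) have to exist before any of this code runs. Below is a minimal sketch of creating them with the admin API from the same HBase generation as the HConnection code above; the table and column-family names come from the code, everything else (class name, settings) is an assumption:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.HBaseAdmin;

// hypothetical one-off setup class, not part of the original post
public class CreateTables {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "192.168.80.20,192.168.80.21,192.168.80.22");
        HBaseAdmin admin = new HBaseAdmin(conf);
        for (String name : new String[] { "test", "area_order" }) {
            if (!admin.tableExists(name)) {
                // both tables use the single column family "cf" referenced throughout the code
                HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(name));
                desc.addFamily(new HColumnDescriptor("cf"));
                admin.createTable(desc);
            }
        }
        admin.close();
    }
}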
With the groundwork in place, on to the actual project code.

1) The project's Kafka producer:

import java.util.Properties;
import java.util.Random;

import backtype.storm.utils.Utils;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class Producer extends Thread {

    private final kafka.javaapi.producer.Producer<Integer, String> producer;
    private final String topic;
    private final Properties props = new Properties();

    public Producer(String topic) {
        props.put("serializer.class", "kafka.serializer.StringEncoder"); // plain string messages
        props.put("metadata.broker.list", "192.168.80.20:9092,192.168.80.21:9092,192.168.80.22:9092");
        producer = new kafka.javaapi.producer.Producer<Integer, String>(new ProducerConfig(props));
        this.topic = topic;
    }

    public void run() {
        // message layout: order_id \t order_amt \t create_time \t area_id
        Random random = new Random();
        String[] order_amt = { "10.10", "20.10", "30.10", "40.0", "60.10" };
        String[] area_id = { "1", "2", "3", "4", "5" };
        int i = 0;
        while (true) {
            i++;
            String messageStr = i + "\t" + order_amt[random.nextInt(5)] + "\t"
                    + DateFmt.getCountDate(null, DateFmt.date_long) + "\t" + area_id[random.nextInt(5)];
            System.out.println("produce: " + messageStr);
            producer.send(new KeyedMessage<Integer, String>(topic, messageStr));
            // Utils.sleep(1000); // uncomment to throttle the producer
        }
    }

    public static void main(String[] args) {
        Producer producerThread = new Producer(KafkaProperties.topic);
        producerThread.start();
    }
}

2) It relies on a small date/time helper class:

import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.Date;

public class DateFmt {

    public static final String date_long = "yyyy-MM-dd HH:mm:ss";
    public static final String date_short = "yyyy-MM-dd";

    public static SimpleDateFormat sdf = new SimpleDateFormat(date_short);

    public static String getCountDate(String date, String pattern) {
        SimpleDateFormat sdf = new SimpleDateFormat(pattern);
        Calendar cal = Calendar.getInstance();
        if (date != null) {
            try {
                cal.setTime(sdf.parse(date));
            } catch (ParseException e) {
                e.printStackTrace();
            }
        }
        return sdf.format(cal.getTime());
    }

    public static String getCountDate(String date, String pattern, int step) {
        SimpleDateFormat sdf = new SimpleDateFormat(pattern);
        Calendar cal = Calendar.getInstance();
        if (date != null) {
            try {
                cal.setTime(sdf.parse(date));
            } catch (ParseException e) {
                e.printStackTrace();
            }
        }
        cal.add(Calendar.DAY_OF_MONTH, step);
        return sdf.format(cal.getTime());
    }

    public static Date parseDate(String dateStr) throws Exception {
        return sdf.parse(dateStr);
    }

    public static void main(String[] args) throws Exception {
        System.out.println(DateFmt.getCountDate(null, DateFmt.date_short));
        // System.out.println(DateFmt.getCountDate("2014-03-01 12:13:14", DateFmt.date_short));
        // System.out.println(parseDate("2014-05-02").after(parseDate("2014-05-01")));
    }
}
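One caveat worth flagging: the static SimpleDateFormat in DateFmt is not thread-safe, and the bolts below may format dates from several executor threads. A minimal, hypothetical addition to DateFmt (not in the original post) that gives each thread its own formatter via ThreadLocal could look like this:

    // hypothetical helper, not part of the original DateFmt
    private static final ThreadLocal<SimpleDateFormat> SHORT_FMT =
            new ThreadLocal<SimpleDateFormat>() {
                @Override
                protected SimpleDateFormat initialValue() {
                    // one formatter per thread avoids corrupted parses under concurrency
                    return new SimpleDateFormat(date_short);
                }
            };

    public static Date parseDateSafely(String dateStr) throws ParseException {
        return SHORT_FMT.get().parse(dateStr);
    }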
3) The project's Kafka consumer. The consumed messages are pushed into an ordered queue so that the Storm spout can later use that queue as its data source; this hand-off is the crucial piece of the whole design!

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.productor.KafkaProperties;

public class OrderConsumer extends Thread {

    private final ConsumerConnector consumer;
    private final String topic;
    private Queue<String> queue = new ConcurrentLinkedQueue<String>(); // FIFO queue shared with the spout

    public OrderConsumer(String topic) {
        consumer = kafka.consumer.Consumer.createJavaConsumerConnector(createConsumerConfig());
        this.topic = topic;
    }

    private static ConsumerConfig createConsumerConfig() {
        Properties props = new Properties();
        props.put("zookeeper.connect", KafkaProperties.zkConnect);
        props.put("group.id", KafkaProperties.groupId);
        props.put("zookeeper.session.timeout.ms", "400");
        props.put("zookeeper.sync.time.ms", "200");
        props.put("auto.commit.interval.ms", "1000"); // how often the offset is committed to ZooKeeper
        return new ConsumerConfig(props);
    }

    // The high-level consumer pushes messages to us; the alternative is to pull actively.
    public void run() {
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(topic, new Integer(1));
        Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
        KafkaStream<byte[], byte[]> stream = consumerMap.get(topic).get(0);
        ConsumerIterator<byte[], byte[]> it = stream.iterator();
        while (it.hasNext()) {
            // read each message exactly once, then enqueue it for the spout
            String message = new String(it.next().message());
            System.out.println("consume: " + message);
            queue.add(message);
            System.err.println("queue ----> " + queue);
        }
    }

    public Queue<String> getQueue() {
        return queue;
    }

    public static void main(String[] args) {
        OrderConsumer consumerThread = new OrderConsumer(KafkaProperties.Order_topic);
        consumerThread.start();
    }
}
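A note on this hand-off: ConcurrentLinkedQueue is unbounded, so if the spout falls behind the consumer the queue simply grows until memory runs out. A hypothetical variant (not in the original post) that applies backpressure by blocking the consumer thread when the buffer is full could use a LinkedBlockingQueue instead:

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// hypothetical bounded hand-off between OrderConsumer and the spout
public class BoundedHandoff {

    // capacity chosen arbitrarily for illustration
    private final BlockingQueue<String> queue = new LinkedBlockingQueue<String>(10000);

    public void put(String message) throws InterruptedException {
        queue.put(message); // blocks the Kafka consumer thread while the buffer is full
    }

    public String poll() {
        return queue.poll(); // returns null immediately when empty, as the spout expects
    }
}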
4) Now the Storm side: the spout and the bolts.

import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

import kafka.consumers.OrderConsumer;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichSpout;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

/**
 * Base spout: drains the queue filled by OrderConsumer and emits each message as a tuple.
 */
public class OrderBaseSpout implements IRichSpout {

    private static final long serialVersionUID = 1L;

    String topic = null;
    Integer TaskId = null;
    SpoutOutputCollector collector = null;
    Queue<String> queue = new ConcurrentLinkedQueue<String>();

    public OrderBaseSpout(String topic) {
        this.topic = topic;
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("order"));
    }

    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        this.collector = collector;
        TaskId = context.getThisTaskId();
        // start the Kafka consumer and share its queue with this spout
        OrderConsumer consumer = new OrderConsumer(topic);
        consumer.start();
        queue = consumer.getQueue();
    }

    public void nextTuple() {
        if (queue.size() > 0) {
            String str = queue.poll();
            // any filtering of raw messages could be done here
            System.err.println("TaskId:" + TaskId + "; str=" + str);
            collector.emit(new Values(str));
        }
    }

    public void ack(Object msgId) {
    }

    public void fail(Object msgId) {
    }

    public void activate() {
    }

    public void deactivate() {
    }

    public void close() {
    }

    public Map<String, Object> getComponentConfiguration() {
        return null;
    }
}

Now that Storm has source data, how should it be processed? That depends entirely on your own business logic; the processing here is kept deliberately simple, just enough to walk the pipeline end to end. There are three bolts:

import java.util.Map;

import backtype.storm.task.TopologyContext;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.IBasicBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

/**
 * Splits the raw order line and emits (area_id, order_amt, order_date).
 */
public class AreaFilterBolt implements IBasicBolt {

    private static final long serialVersionUID = 1L;

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("area_id", "order_amt", "order_date"));
    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }

    @Override
    public void prepare(Map stormConf, TopologyContext context) {
    }

    @Override
    public void execute(Tuple input, BasicOutputCollector collector) {
        String order = input.getString(0);
        if (order != null) {
            // raw line layout: order_id \t order_amt \t create_time \t area_id
            String[] orderArr = order.split("\\t");
            collector.emit(new Values(orderArr[3], orderArr[1],
                    DateFmt.getCountDate(orderArr[2], DateFmt.date_short)));
            System.out.println("--------------> " + orderArr[3] + " " + orderArr[1]);
        }
    }

    @Override
    public void cleanup() {
    }
}

import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Result;

import backtype.storm.task.TopologyContext;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.IBasicBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

/**
 * Accumulates the order amount per "date_areaId" key and emits the running total.
 */
public class AreaAmtBolt implements IBasicBolt {

    private static final long serialVersionUID = 1L;

    Map<String, Double> countsMap = null;
    String today = null;
    HBaseDAO dao = null;

    @Override
    public void cleanup() {
        countsMap.clear();
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("date_area", "amt"));
    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }

    @Override
    public void prepare(Map stormConf, TopologyContext context) {
        countsMap = new HashMap<String, Double>();
        dao = new HBaseDAOImp();
        // bootstrap countsMap from the totals already stored in HBase for today
        today = DateFmt.getCountDate(null, DateFmt.date_short);
        countsMap = this.initMap(today, dao);
        for (String key : countsMap.keySet()) {
            System.err.println("key:" + key + "; value:" + countsMap.get(key));
        }
    }

    @Override
    public void execute(Tuple input, BasicOutputCollector collector) {
        if (input != null) {
            String area_id = input.getString(0);
            double order_amt = 0.0;
            try {
                order_amt = Double.parseDouble(input.getString(1));
            } catch (Exception e) {
                System.out.println(input.getString(1) + ": not a valid amount");
                e.printStackTrace();
            }
            String order_date = input.getStringByField("order_date");
            if (!order_date.equals(today)) {
                // day rollover: drop yesterday's counters and start counting for the new day
                countsMap.clear();
                today = order_date;
            }
            Double count = countsMap.get(order_date + "_" + area_id);
            if (count == null) {
                count = 0.0;
            }
            count += order_amt;
            countsMap.put(order_date + "_" + area_id, count);
            System.err.println("areaAmtBolt:" + order_date + "_" + area_id + "=" + count);
            collector.emit(new Values(order_date + "_" + area_id, count));
        }
    }

    public Map<String, Double> initMap(String rowKeyDate, HBaseDAO dao) {
        Map<String, Double> countsMap = new HashMap<String, Double>();
        List<Result> list = dao.getRows("area_order", rowKeyDate, new String[] { "order_amt" });
        for (Result rsResult : list) {
            String rowKey = new String(rsResult.getRow());
            for (KeyValue keyValue : rsResult.raw()) {
                if ("order_amt".equals(new String(keyValue.getQualifier()))) {
                    countsMap.put(rowKey, Double.parseDouble(new String(keyValue.getValue())));
                    break;
                }
            }
        }
        return countsMap;
    }
}
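To make the data flow concrete, here is one made-up message traced through the two bolts above:

    raw Kafka message:    "37\t20.10\t2014-05-02 12:13:14\t3"
    AreaFilterBolt emits: area_id="3", order_amt="20.10", order_date="2014-05-02"
    AreaAmtBolt keeps a running total under the key "2014-05-02_3" and emits
    ("2014-05-02_3", 20.10), then ("2014-05-02_3", 40.20) if another 20.10 order
    for area 3 arrives the same day, and so on.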
import java.util.HashMap;
import java.util.Map;

import backtype.storm.task.TopologyContext;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.IBasicBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Tuple;

/**
 * Buffers the latest total per "date_areaId" key and flushes the buffer to HBase every 5 seconds.
 */
public class AreaRsltBolt implements IBasicBolt {

    private static final long serialVersionUID = 1L;

    Map<String, Double> countsMap = null;
    HBaseDAO dao = null;
    long beginTime = System.currentTimeMillis();
    long endTime = 0L;

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }

    @Override
    public void prepare(Map stormConf, TopologyContext context) {
        dao = new HBaseDAOImp();
        countsMap = new HashMap<String, Double>();
    }

    @Override
    public void execute(Tuple input, BasicOutputCollector collector) {
        String date_areaid = input.getString(0);
        double order_amt = input.getDouble(1);
        countsMap.put(date_areaid, order_amt);
        endTime = System.currentTimeMillis();
        if (endTime - beginTime >= 5 * 1000) {
            for (String key : countsMap.keySet()) {
                // write the current totals into HBase
                dao.insert("area_order", key, "cf", "order_amt", countsMap.get(key) + "");
                System.err.println("rsltBolt put hbase: key=" + key + "; order_amt=" + countsMap.get(key));
            }
            beginTime = endTime; // restart the 5-second window after each flush
        }
    }

    @Override
    public void cleanup() {
    }
}
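Each flush above issues one HBase RPC per key. Since HBaseDAO already exposes save(List<Put>, String), the flush could just as well batch all pending counters into a single call; the sketch below is an alternative, not what the original bolt does, and the helper name flushBatch is made up:

    // hypothetical helper inside AreaRsltBolt; assumes java.util.ArrayList, java.util.List
    // and org.apache.hadoop.hbase.client.Put are imported
    private void flushBatch() {
        List<Put> puts = new ArrayList<Put>();
        for (Map.Entry<String, Double> entry : countsMap.entrySet()) {
            Put put = new Put(entry.getKey().getBytes());
            put.add("cf".getBytes(), "order_amt".getBytes(), String.valueOf(entry.getValue()).getBytes());
            puts.add(put);
        }
        dao.save(puts, "area_order"); // one batched write instead of one RPC per key
    }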
Finally, the main method that wires the topology together:

import kafka.productor.KafkaProperties;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;

public class MYTopology {

    public static void main(String[] args) {
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("spout", new OrderBaseSpout(KafkaProperties.topic), 5);
        builder.setBolt("filterbolt", new AreaFilterBolt(), 5).shuffleGrouping("spout");
        builder.setBolt("amtbolt", new AreaAmtBolt(), 2).fieldsGrouping("filterbolt", new Fields("area_id"));
        builder.setBolt("rsltbolt", new AreaRsltBolt(), 1).shuffleGrouping("amtbolt");

        Config conf = new Config();
        conf.setDebug(false);
        if (args.length > 0) {
            // submit to the cluster
            try {
                StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
            } catch (AlreadyAliveException e) {
                e.printStackTrace();
            } catch (InvalidTopologyException e) {
                e.printStackTrace();
            }
        } else {
            // local test mode
            LocalCluster localCluster = new LocalCluster();
            localCluster.submitTopology("mytopology", conf, builder.createTopology());
        }
    }
}

That completes the basic architecture. Learning Kafka, Storm, or HBase on its own is not hard; the real difference comes from knowing how to integrate them into one pipeline. My blog also covers the basics of Kafka, Storm, and HBase if you want to read more.