分享

数仓1.1 | 概述| 集群环境搭建

 jasonbetter 2019-07-24

数仓1.1 | 概述| 集群环境搭建

宏观上; 模块调用关系图
复杂问题简单化; 清晰数据结构; 1天数据1层

1. 数据仓库DW

数据来源:
  爬虫 日志采集系统 业务数据 财务系统

数据去向:
  报表系统、用户画像、推荐系统、机器学习、风控系统

项目需求分析

  ① 数据采集平台搭建
  ② 实现用户行为数据仓库的分层搭建
  ③ 实现业务数据仓库的分层搭建
  ④ 针对数据仓库中的数据进行,留存、转化率、GMV(每天交易额)、复购率、活跃等报表行为;

项目框架

技术选型

采集:
方式一: log日志--->flume--->kafka(API)--->hdfs; 方式二: Logstash(读取日志)-->ELK(存储查询)全文检索引擎-sqoop
DataX导数据;  mysql->sqoop

存储:mysql(存储业务--分析结果) ;ES(存、查都很快)<---->HBase(存快,分析慢); S3

计算:Tez(分析hive中指标)&hive; Flink--Spark

查询:Presto,Impala,Kylin

系统架构图:

日志文件| mysql数据表--->分别由flume| sqoop处理--> 分别交给-->kafka| HDFS
由Yarn统一调度
Hive| Presto负责数据查询;
Azkaban任务调度器
最后可视化展示;

系统数据流程:

Web/App埋点行为数据--->log日志服务器(友盟-第三方日志服务器)--->logFile格式->Flume生产-->kafka(kafka(相当于路由池)可以接实时数据、es等)--flume消费-->HDFS
业务交互-->mysql(业务服务器-->Nginx实现负载均衡)->sqoop-->>hdfs--->hive数仓-->把结果存储到mysql

复制代码

框架版本选型
产品            版本
Hadoop          2.7.2Flume           1.7.0Kafka           0.11.0.2Kafka Manager      1.3.3.22Hive            1.2.1Sqoop           1.4.6MySQL           5.6.24Azkaban         2.5.0Java           1.8Zookeeper        3.4.10Presto          0.189集群资源规划设计
            服务器hadoop101    服务器hadoop102    服务器hadoop103
HDFS           NameNode      DataNode          DataNode        
            DataNode                SecondaryNameNode Yarn        NodeManager Resourcemanager   NodeManager
                        NodeManager Zookeeper      Zookeeper Zookeeper      Zookeeper Flume(采集日志)    Flume      Flume Kafka         Kafka     Kafka         Kafka Flume(消费Kafka)                    Flume Hive          Hive MySQL         MySQL Presto         Presto

复制代码

2. 数据生成模块

埋点数据--想记录的数据(web端、app端):
产品字段ap(产品字段可以有多个app)
①公共字段 所有的事件都需要记录的字段,公共的; <<-cm-->>AppBase 
  cm(公共字段基本所有安卓手机都包含的字段); cm公共字段;json对象
  et事件; et事件字段:json数组

②业务字段(埋点上报的字段,有具体的业务类型, 有(用户)具体的行为;)

日志格式:
  时间戳|json字符串
  cm:公共字段
  et:事件(日志)字段(用户行为--针对每一个事件)

 事件日志的设计:

 ①商品列表页(loading)

 View Code

②商品点击(display)

 View Code

③商品详情页(newsdetail)详情页从哪来

 View Code

④广告(ad)

 View Code

⑤消息通知(notification)

 View Code

⑥用户前台活跃(active_foreground)

 View Code

⑦用户后台活跃(active_background)

 View Code

⑧ 评论(comment)

 View Code

⑨收藏(favorites)

 View Code

10 点赞(praise)

 View Code

11 错误日

 View Code

12启动日志数据start  action=1可以算成前台活跃

 View Code

sdk软件开发工具
12个主题(1个appbase公共日志)对应12张表(12张(12个bean对象,再加一个公共的即共13个bean对象)用户行为表), 1张启动日志表;  8张业务表; 数仓分4层; 20*4=80张表;
启动日志1张表-->离线和实时; 需要写flume的拦截器
事件日志kafka的事件event主题 11个; 分的越细越灵活,

启动日志-1类
事件日志-11类

复制代码

启动日志页面:1552739869506|{"cm":
{"ln":"-62.5","sv":"V2.8.9","os":"8.2.7","g":"6N617W86@gmail.com","mid":"999","nw":"3G","l":"en","vc":"18","hw":"640*960","ar":"MX","uid":"999","t":"1552692232488","la":"-4.9","md":"HTC-8","vn":"1.0.3","ba":"HTC","sr":"I"},"ap":"gmall","et":[{"ett":"1552655708510",        "en":"display",        "kv":{"goodsid":"245","action":"1","extend1":"2","place":"4","category":"20"}},
      {"ett":"1552683751477",        "en":"ad",        "kv":{"entry":"3","show_style":"3","action":"5","detail":"325","source":"2","behavior":"1","content":"1","newstype":"9"}},
      {"ett":"1552670223504",        "en":"active_foreground",        "kv":{"access":"","push_id":"3"}},
     {"ett":"1552735759451",        "en":"active_background","kv":{"active_source":"2"}}]}

复制代码

复制代码

将生成的jar包log-collector-0.0.1-SNAPSHOT-jar-with-dependencies.jar拷贝到hadoop101服务器上,
并同步到hadoop102的/opt/module路径下 [kris@hadoop101 module]$ xsync log-collector-1.0-SNAPSHOT-jar-with-dependencies.jar 在hadoop102上执行jar程序 [kris@hadoop101 module]$ java -classpath log-collector-1.0-SNAPSHOT-jar-with-dependencies.jar com.atguigu.appclient.AppMain >/opt/module/test.log 在/tmp/logs路径下查看生成的日志文件 [kris@hadoop101 module]$ cd /tmp/logs/[kris@hadoop101 logs]$ ls

复制代码

Linux环境变量配置:

(1)修改/etc/profile文件:所有用户的Shell都有权使用这些环境变量。
(2)修改~/.bashrc文件:针对某一个特定的用户,如果你需要给某个用户权限使用这些环境变量,你只需要修改其个人用户主目录下的.bashrc文件就可以了。
(3)配置登录远程服务器立即source一下环境变量

[kris@hadoop101 ~]$ cat /etc/profile >> .bashrc
[kris@hadoop102 ~]$ cat /etc/profile >> .bashrc
[kris@hadoop103 ~]$ cat /etc/profile >> .bashrc

日志生成集群启动脚本

 View Code

集群时间同步修改脚本

 View Code

集群所有进程查看脚本; 在/home/kris/bin目录下创建脚本xcall.sh

 View Code

3. 集群的搭建

Hadoop安装

            服务器hadoop101    服务器hadoop102    服务器hadoop103
HDFS           NameNode      DataNode          DataNode        
            DataNode                SecondaryNameNode Yarn        NodeManager Resourcemanager   NodeManager
                       NodeManager

https://www.cnblogs.com/shengyang17/p/10274391.html

添加LZO支持包

输入端采用压缩DEFLATE(deflate)压缩
mapper输出之后采用LZO或snappy
reducer输出之后gzip或bzip2

复制代码

1)下载后的文件名是hadoop-lzo-master,它是一个zip格式的压缩包,先进行解压,然后用maven编译。生成hadoop-lzo-0.4.202)将编译好后的hadoop-lzo-0.4.20.jar 放入hadoop-2.7.2/share/hadoop/common/
[kris@hadoop101 software]$ mv hadoop-lzo-0.4.20.jar /opt/module/hadoop-2.7.2/share/hadoop/common/ [kris@hadoop101 common]$ lshadoop-lzo-0.4.20.jar3)同步hadoop-lzo-0.4.20.jar到hadoop103、hadoop104[kris@hadoop101 common]$ xsync hadoop-lzo-0.4.20.jar

复制代码

2 添加配置
1)core-site.xml增加配置支持LZO压缩

 View Code

2)同步core-site.xml到hadoop102、hadoop103
[kris@hadoop101 hadoop]$ xsync core-site.xml

两种压缩方式配置一种即可

配置Hadoop支持Snappy压缩

1)将编译后支持Snappy压缩的Hadoop jar包解压缩,并将lib/native目录中所有文件上传到hadoop102的/opt/module/hadoop-2.7.2/lib/native目录。

2)重新启动Hadoop。

3)检查支持的压缩方式

[kris@hadoop101 native]$ hadoop checknativehadoop:  true /opt/module/hadoop-2.7.2/lib/native/libhadoop.so
zlib:    true /lib64/libz.so.1snappy:  true /opt/module/hadoop-2.7.2/lib/native/libsnappy.so.1lz4:     true revision:99bzip2:   false

Zookeeper安装

              服务器hadoop101    服务器hadoop102    服务器hadoop103
Zookeeper            Zookeeper        Zookeeper         Zookeeper

详细安装见:

https://www.cnblogs.com/shengyang17/p/10325484.html

zookeeper集群启动脚本;

chmod 777 zk.sh

 View Code

Flume安装

https://flume./releases/content/1.7.0/FlumeUserGuide.html  可使用ctrl+F搜索

            服务器hadoop101    服务器hadoop102   服务器hadoop103
Flume(采集日志)    Flume          Flume    

详细安装见:

https://www.cnblogs.com/shengyang17/p/10405979.html

TailDirSource是Flume 1.7提供的Source组件,在1.6中并没有。

Flume直接读log日志的数据,log日志的格式是app-yyyy-mm-dd.log。

Flume的具体配置如下:

       (1)在/opt/module/flume/conf目录下创建file-flume-kafka.conf文件

[kris@hadoop101 conf]$ vim file-flume-kafka.conf

 View Code

Flume拦截器

本项目中自定义了两个拦截器,分别是:ETL拦截器、日志类型区分拦截器。

ETL拦截器主要用于,过滤时间戳不合法和json数据不完整的日志

日志类型区分拦截器主要用于,将启动日志和事件日志区分开来,方便发往Kafka的不同topic。

拦截器打包之后,只需要单独包,不需要将依赖的包上传。依赖包在flume的lib目录下面已经存在了。打包之后要放入flume的lib文件夹下面。

需要先将打好的包放入到hadoop101的/opt/module/flume/lib文件夹下面。

[kris@hadoop101 lib]$ ls | grep interceptor

  flume-interceptor-1.0-SNAPSHOT.jar

分发flume到hadoop102、hadoop103

[kris@hadoop101 module]$ xsync flume/[kris@hadoop101 flume]$ bin/flume-ng agent --conf conf/ --name a1 --conf-file conf/file-flume-kafka.conf &

日志采集Flume启动停止脚本

roundValue:30s数据滚动一次;开发中一般1/h滚动一次 ;  logFile日志保存30天;

 在/home/kris/bin目录下创建脚本f1.sh;并添加执行权限;chmod +x f1.sh

 View Code

nohup,该命令可以在你退出帐户/关闭终端之后继续运行相应的进程。nohup就是不挂起的意思,不挂断地运行命令。

/dev/null代表linux的空设备文件,所有往这个文件里面写入的内容都会丢失,俗称“黑洞”。

Flume(hadoop103)消费Kafka数据写到HDFS

1)在hadoop103的/opt/module/flume/conf目录下创建kafka-flume-hdfs.conf文件

[kris@hadoop103 conf]$ vim kafka-flume-hdfs.conf  ;配置了不产生大量小文件!

 View Code

日志消费Flume启动停止脚本

2)在/home/kris/bin目录下创建脚本f2.sh;并chmod +x f2.sh

[kris@hadoop101 bin]$ vim f2.sh

 View Code

最快消费(最大吞吐量),消费> 生产;kafka可对接ES等

 Kafka安装

详细安装见:

https://www.cnblogs.com/shengyang17/p/10443115.html

kafka启动关闭脚本:

 View Code

注意:启动Kafka时要先开启JMX端口,是用于后续KafkaManager监控。

Kafka Manager安装

详细见:

https://www.cnblogs.com/shengyang17/p/10459101.html

启动KafkaManager

[kris@hadoop101 kafka-manager-1.3.3.22]$ 
nohup bin/kafka-manager   -Dhttp.port=7456 >/opt/module/kafka-manager-1.3.3.22/start.log 2>&1 &

在浏览器中打开 http://hadoop101:7456

至此,就可以查看整个Kafka集群的状态,包括:Topic的状态、Brokers的状态、Cosumer的状态。

在kafka的/opt/module/kafka-manager-1.3.3.22/application.home_IS_UNDEFINED 目录下面,可以看到kafka-manager的日志。

Kafka Manager启动停止脚本

1)在/home/kris/bin目录下创建脚本km.sh; chmod +x km.sh

[kris@hadoop101 bin]$ vim km.sh

 View Code

 

复制代码

查看所有Kafka Topic
[kris@hadoop101 kafka]$ bin/kafka-topics.sh --zookeeper hadoop101:2181 --list
bin/kafka-topics.sh --delete --zookeeper hadoop101:2181,hadoop102:2181,hadoop103:2181 --topic topic_start  ##删除主题
生产消息
[kris@hadoop101 kafka]$ bin/kafka-console-producer.sh \--broker-list hadoop101:9092 --topic topic_start>hello world>kris  kris

消费消息;可以检测下
[kris@hadoop102 kafka]$ bin/kafka-console-consumer.sh \--zookeeper hadoop101:2181 --from-beginning --topic topic_start

复制代码

1)Kafka压测

用Kafka官方自带的脚本,对Kafka进行压测。Kafka压测时,可以查看到哪个地方出现了瓶颈(CPU,内存,网络IO)。一般都是网络IO达到瓶颈。

kafka-consumer-perf-test.sh

kafka-producer-perf-test.sh

2)Kafka Producer压力测试

(1)在/opt/module/kafka/bin目录下面有这两个文件。我们来测试一下

复制代码

[kris@hadoop101 kafka]$ bin/kafka-producer-perf-test.sh  --topic test --record-size 100 --num-recor 100000 --throughput 1000 --producer-props bootstrap.servers=hadoop101:9092,hadoop102:9092,hadoop103:90925000 records sent, 1000.0 records/sec (0.10 MB/sec), 2.6 ms avg latency, 183.0 max latency.5012 records sent, 1002.4 records/sec (0.10 MB/sec), 1.0 ms avg latency, 36.0 max latency.5001 records sent, 1000.2 records/sec (0.10 MB/sec), 0.6 ms avg latency, 8.0 max latency.5001 records sent, 1000.2 records/sec (0.10 MB/sec), 0.4 ms avg latency, 22.0 max latency.5001 records sent, 1000.0 records/sec (0.10 MB/sec), 0.6 ms avg latency, 45.0 max latency.5002 records sent, 1000.2 records/sec (0.10 MB/sec), 0.3 ms avg latency, 3.0 max latency.5001 records sent, 1000.2 records/sec (0.10 MB/sec), 0.8 ms avg latency, 27.0 max latency.5001 records sent, 1000.0 records/sec (0.10 MB/sec), 0.5 ms avg latency, 54.0 max latency.5001 records sent, 1000.0 records/sec (0.10 MB/sec), 0.7 ms avg latency, 60.0 max latency.5003 records sent, 1000.4 records/sec (0.10 MB/sec), 0.4 ms avg latency, 29.0 max latency.5000 records sent, 1000.0 records/sec (0.10 MB/sec), 0.7 ms avg latency, 50.0 max latency.5001 records sent, 1000.2 records/sec (0.10 MB/sec), 0.9 ms avg latency, 82.0 max latency.5003 records sent, 1000.2 records/sec (0.10 MB/sec), 0.4 ms avg latency, 32.0 max latency.5000 records sent, 1000.0 records/sec (0.10 MB/sec), 0.8 ms avg latency, 67.0 max latency.5002 records sent, 1000.2 records/sec (0.10 MB/sec), 0.9 ms avg latency, 80.0 max latency.5002 records sent, 1000.0 records/sec (0.10 MB/sec), 0.4 ms avg latency, 18.0 max latency.5000 records sent, 1000.0 records/sec (0.10 MB/sec), 0.9 ms avg latency, 75.0 max latency.5001 records sent, 1000.2 records/sec (0.10 MB/sec), 0.5 ms avg latency, 23.0 max latency.5003 records sent, 1000.2 records/sec (0.10 MB/sec), 0.5 ms avg latency, 26.0 max latency.100000 records sent, 999.950002 records/sec (0.10 MB/sec), 0.72 ms avg latency, 183.00 ms max latency, 0 ms 50th, 1 ms 95th, 3 ms 99th, 44 ms 99.9th

复制代码

 测试生成了多少数据,消费了多少数据;每条信息大小,总共发送的条数;每秒多少条数据;

说明:record-size是一条信息有多大,单位是字节。num-records是总共发送多少条信息。throughput 是每秒多少条信息。

参数解析:本例中一共写入10w条消息,每秒向Kafka写入了0.10MB的数据,平均是1000条消息/秒,每次写入的平均延迟为0.72毫秒,最大的延迟为183毫秒。

Kafka Consumer压力测试

Consumer的测试,如果这四个指标(IO,CPU,内存,网络)都不能改变,考虑增加分区数来提升性能。

[kris@hadoop103 kafka]$ bin/kafka-consumer-perf-test.sh --zookeeper hadoop101:2181,hadoop102:2181,hadoop103:2181 --topic topic_event --fetch-size 10000 --messages 10000000 --threads 1     start.time, end.time, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, nMsg.sec2019-03-15 00:04:21:474, 2019-03-15 00:04:21:740, 1.1851, 4.4551, 1492, 5609.0226

参数说明:

--zookeeper 指定zookeeper的链接信息

--topic 指定topic的名称

--fetch-size 指定每次fetch的数据的大小

--messages 总共要消费的消息个数

测试结果说明:

开始测试时间,结束测试时间;最大吞吐率1.1851MB/S;最近每秒消费4.4551MB/S;最大每秒消费1492条;平均每秒消费5609.0226条;

Kafka机器数量计算

Kafka机器数量(经验公式)=2*(峰值生产速度*副本数/100)+1

先要预估一天大概产生多少数据,然后用Kafka自带的生产压测(只测试Kafka的写入速度,保证数据不积压),计算出峰值生产速度。再根据设定的副本数,就能预估出需要部署Kafka的数量。

比如我们采用压力测试测出写入的速度是10M/s一台,峰值的业务数据的速度是50M/s。副本数为2。

Kafka机器数量=2*(50*2/100)+1=3台

采集通道启动/停止脚本

1)在/home/kris/bin目录下创建脚本cluster.sh

[kris@hadoop101 bin]$ vim cluster.sh

 View Code

改时间重新启动集群,因为flume和kafka会去通信看时间,时间偏差大就会挂掉

 Hive&Mysql的安装

详细安装:只在1台节点hadoop101上安装即可

https://www.cnblogs.com/shengyang17/p/10372242.html

Hive运行引擎Tez的安装配置

复制代码

1)下载tez的依赖包:http://tez.2)拷贝apache-tez-0.9.1-bin.tar.gz到hadoop102的/opt/module目录
[kris@hadoop101 module]$ lsapache-tez-0.9.1-bin.tar.gz3)解压缩apache-tez-0.9.1-bin.tar.gz
[kris@hadoop101 module]$ tar -zxvf apache-tez-0.9.1-bin.tar.gz4)修改名称
[kris@hadoop101 module]$ mv apache-tez-0.9.1-bin/ tez-0.9.1

复制代码

在Hive中配置Tez 

1)进入到Hive的配置目录:/opt/module/hive/conf
[kris@hadoop101 conf]$ pwd/opt/module/hive/conf2)在hive-env.sh文件中添加tez环境变量配置和依赖包环境变量配置
[kris@hadoop101 conf]$ vim hive-env.sh添加如下配置
 View Code

3)在hive-site.xml文件中添加如下配置,更改hive计算引擎

<property>
  <name>hive.execution.engine</name>
  <value>tez</value>
</property>

配置Tez

1)在Hive 的/opt/module/hive/conf下面创建一个tez-site.xml文件
[kris@hadoop101 conf]$ pwd/opt/module/hive/conf
[kris@hadoop101 conf]$ vim tez-site.xml
添加如下内容
 View Code

上传Tez到集群

1)将/opt/module/tez-0.9.1上传到HDFS的/tez路径
[kris@hadoop101 conf]$ hadoop fs -mkdir /tez
[kris@hadoop101 conf]$ hadoop fs -put /opt/module/tez-0.9.1/ /tez
[kris@hadoop101 conf]$ hadoop fs -ls /tez/tez/tez-0.9.1

测试

复制代码

1)启动Hive
[kris@hadoop101 hive]$ bin/hive2)创建LZO表
hive (default)> create table student(id int,
name string);3)向表中插入数据
hive (default)> insert into student values(1,"zhangsan");4)如果没有报错就表示成功了
hive (default)> select * from student;1       zhangsan

复制代码

小结
1)运行Tez时检查到用过多内存而被NodeManager杀死进程问题:

这种问题是从机上运行的Container试图使用过多的内存,而被NodeManager kill掉了。

解决方法:

方案一:或者是关掉虚拟内存检查。我们选这个,修改yarn-site.xml;修改完之后要分发

<property>
     <name>yarn.nodemanager.vmem-check-enabled</name>
     <value>false</value>
</property>

方案二:mapred-site.xml中设置Map和Reduce任务的内存配置如下:(value中实际配置的内存需要根据自己机器内存大小及应用情况进行修改)

 View Code

分类: BDwarehouse

    本站是提供个人知识管理的网络存储空间,所有内容均由用户发布,不代表本站观点。请注意甄别内容中的联系方式、诱导购买等信息,谨防诈骗。如发现有害或侵权内容,请点击一键举报。
    转藏 分享 献花(0

    0条评论

    发表

    请遵守用户 评论公约

    类似文章 更多