- 解压缩 : tar -zxvf apache-flume-1.6.0-bin.tar.gz -C /opt/ ;
- 重命名 : mv /opt/apache-flume-1.6.0-bin/ /opt/flume(可省略) ;
- 复制配置文件 : cp conf/flume-env.sh.template conf/flume-env.sh ;
- 修改conf/flume-env.sh : JAVA_HOME ;
- 复制flume到其他节点 : scp -r …… 。
常见架构
实战需求
- A、B两台机器实时生产日志主要类型为
access.log 、ugchead.log 、ugctail.log , 要求:
- 把A、B 机器中的access.log、ugchead.log、ugctail.log 汇总到C机器上然后统一收集到hdfs和Kafka中。
- 在hdfs中要求的目录为:用作离线统计。
/source/access/2016-01-01/ /source/ugcheader/2016-01-01/ /source/ugctail/2016-01-01/
- Kafka分topic , 用作实时分析。
画图
准备
- 启动zookeeper : /opt/zookeeper/bin/zkServer.sh start
- 启动kafka : nohup /opt/kafka/bin/kafka-server-start.sh /opt/kafka/config/server.properties >>/opt/logs/kafka-server.log 2>&1 &
- 启动hdfs : start-dfs.sh
- 这里查看一下xxo10的进程 :
1 2 3 4 5 6
| [root@xxo10 flume]# jps 1305 Kafka 1252 QuorumPeerMain 1786 Jps 1542 DataNode 1454 NameNode
|
- 创建topic
1 2 3 4 5 6 7
| [root@xxo08 kafka]# bin/kafka-topics.sh --create --zookeeper xxo08:2181,xxo09:2181,xxo10:2181 --replication-factor 3 --partitions 3 --topic access [root@xxo08 kafka]# bin/kafka-topics.sh --create --zookeeper xxo08:2181,xxo09:2181,xxo10:2181 --replication-factor 3 --partitions 3 --topic ugchead [root@xxo08 kafka]# bin/kafka-topics.sh --create --zookeeper xxo08:2181,xxo09:2181,xxo10:2181 --replication-factor 3 --partitions 3 --topic ugctail [root@xxo08 kafka]# bin/kafka-topics.sh --list --zookeeper xxo08:2181,xxo09:2181,xxo10:2181 ###查看 access ugchead ugctail
|
C机器1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52
| [root@xxo10 flume]# vim conf/hdfs_kafka.conf
#################################### C机器 ######################################### #################################### 两个channel、两个sink ##########################
# Name the components on this agent a1.sources = r1 a1.sinks = kfk fs a1.channels = c1 c2
# avro source a1.sources.r1.type = avro a1.sources.r1.bind = 0.0.0.0 a1.sources.r1.port = 44444
# source r1定义拦截器,为消息添加时间戳 a1.sources.r1.interceptors = i1 a1.sources.r1.interceptors.i1.type = org.apache.flume.interceptor.TimestampInterceptor$Builder
# kfk sink a1.sinks.kfk.type = org.apache.flume.sink.kafka.KafkaSink #a1.sinks.kfk.topic = mytopic a1.sinks.kfk.brokerList = xxo08:9092,xxo09:9092,xxo10:9092
# hdfs sink a1.sinks.fs.type = hdfs a1.sinks.fs.hdfs.path = hdfs://xxo10:9000/source/%{type}/%Y%m%d a1.sinks.fs.hdfs.filePrefix = events- a1.sinks.fs.hdfs.fileType = DataStream #a1.sinks.fs.hdfs.fileType = CompressedStream #a1.sinks.fs.hdfs.codeC = gzip #不按照条数生成文件 a1.sinks.fs.hdfs.rollCount = 0 #如果压缩存储的话,HDFS上的文件达到64M时生成一个文件;注意是按压缩前大小达到64M生成一个文件,然后压缩存储。 a1.sinks.fs.hdfs.rollSize = 67108864 a1.sinks.fs.hdfs.rollInterval = 0
# Use a channel which buffers events in memory a1.channels.c1.type = memory a1.channels.c1.capacity = 10000 a1.channels.c1.transactionCapacity = 1000
a1.channels.c2.type = memory a1.channels.c2.capacity = 10000 a1.channels.c2.transactionCapacity = 1000
# Bind the source and sink to the channel a1.sources.r1.channels = c1 c2 a1.sinks.kfk.channel = c1 a1.sinks.fs.channel = c2
|
- 启动
1 2 3 4 5
| [root@xxo10 ~]# cd /opt/flume/ [root@xxo10 flume]# bin/flume-ng agent --conf conf/ --conf-file conf/hdfs_kafka.conf --name a1 -Dflume.root.logger=INFO,console & ...... ...... ......Component type: SINK, name: kfk started ##启动成功
|
A机器1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60
| [root@xxo08 flume]# vim conf/hdfs_kafka.conf
#################################### A机器 ######################################### #################################### 3个source ##################################### #################################### 2个拦截器 ###################################### # Name the components on this agent a1.sources = access ugchead ugctail a1.sinks = k1 a1.channels = c1
# 三个sources a1.sources.access.type = exec a1.sources.access.command = tail -F /root/data/access.log
a1.sources.ugchead.type = exec a1.sources.ugchead.command = tail -F /root/data/ugchead.log
a1.sources.ugctail.type = exec a1.sources.ugctail.command = tail -F /root/data/ugctail.log
# sink a1.sinks.k1.type = avro a1.sinks.k1.hostname = xxo10 a1.sinks.k1.port = 44444
# channel a1.channels.c1.type = memory a1.channels.c1.capacity = 10000 a1.channels.c1.transactionCapacity = 1000
# interceptor a1.sources.access.interceptors = i1 i2 a1.sources.access.interceptors.i1.type=static a1.sources.access.interceptors.i1.key = type a1.sources.access.interceptors.i1.value = access a1.sources.access.interceptors.i2.type=static a1.sources.access.interceptors.i2.key = topic a1.sources.access.interceptors.i2.value = access
a1.sources.ugchead.interceptors = i1 i2 a1.sources.ugchead.interceptors.i1.type=static a1.sources.ugchead.interceptors.i1.key = type a1.sources.ugchead.interceptors.i1.value = ugchead a1.sources.ugchead.interceptors.i2.type=static a1.sources.ugchead.interceptors.i2.key = topic a1.sources.ugchead.interceptors.i2.value = ugchead
a1.sources.ugctail.interceptors = i1 i2 a1.sources.ugctail.interceptors.i1.type=static a1.sources.ugctail.interceptors.i1.key = type a1.sources.ugctail.interceptors.i1.value = ugctail a1.sources.ugctail.interceptors.i2.type=static a1.sources.ugctail.interceptors.i2.key = topic a1.sources.ugctail.interceptors.i2.value = ugctail
# Bind the source and sink to the channel a1.sources.access.channels = c1 a1.sources.ugchead.channels = c1 a1.sources.ugctail.channels = c1 a1.sinks.k1.channel = c1
|
- 启动A机器
1
| [root@xxo08 flume]# bin/flume-ng agent --conf conf/ --conf-file conf/hdfs_kafka.conf --name a1 -Dflume.root.logger=INFO,console &
|
验证功能
- 我这里启动了一个向
access.log 、ugchead.log 、ugctail.log 添加数据的java程序:1
| [root@xxo08 ~]# java -cp KafkaFlumeProject_20160519-1.0-SNAPSHOT-jar-with-dependencies.jar com.xxo.utils.Creator
|
查看hdfs的情况
1 2 3 4 5 6
| [root@xxo10 ~]# hdfs dfs -text /source/ugchead/20160523/* | more 1001 221.8.9.6 80 be83f3fd-a218-4f98-91d8-6b4f0bb4558b 750b6203-4a7d-42d5-82e4-906415b70f63 10207 {"ugctype":"consumer", "userId":"40604","coin":"10","number":"2"} 1463685721663 1003 218.75.100.114 ea11f1d2-680d-4645-a52e-74d5f2317dfd 8109eda1-aeac-43fe-94b1-85d2d1934913 20101 {"ugctype":"fav","userId":"40604","item":"13"} 1463685722666 ......
|
kafka消费者
1 2 3 4
| ########################## 这里查看一下access ############################### [root@xxo09 ~]# /opt/kafka/bin/kafka-console-consumer.sh --zookeeper xxo08:2181,xxo09:2181,xxo10:2181 --topic access --from-beginning 1001 218.26.219.186 070c8525-b857-414d-98b6-13134da08401 10201 0 GET /tologin HTTP/1.1 408 /update/pass Mozilla/5.0 (Windows; U; Windows NT 5.1)Gecko/20070803 Firefox/1.5.0.12 1463676319717 ......
|
查看日志: tac /opt/flume/logs/flume.log | more
同步节点
- 【B机器】
1 2 3 4 5 6
| ####################### 拷贝 ################################## [root@xxo08 flume]# scp /opt/flume/conf/hdfs_kafka.conf root@xxo09:/opt/flume/conf/ hdfs_kafka.conf 100% 1803 1.8KB/s 00:00
####################### 远程启动 ############################### [root@xxo08 flume]# ssh root@xxo09 "/opt/flume/bin/flume-ng agent --conf /opt/flume/conf/ --conf-file /opt/flume/conf/hdfs_kafka.conf --name a1" &
|
|