配色: 字号:
04-Kafka管理工具、flume整合
2022-09-15 | 阅:  转:  |  分享 
  
Kafka管理工具Flume-Kafka整合2019/4/18目录Kafka生产者和消费者编程21、在eclipse中创建maven项目b
igdata-kafka-2.1122、编辑pom.xml23、API介绍34、编写生产者及写分区74、编写消费者117、分
别运行生产者和消费者17Flume-Kafka整合18flume整合kafka(kafkasource)18flume整合kaf
ka(kafkasink)19flume整合kafka(kafkachannel)20其它常用命令21Kafka管理工具Kaf
kamanager配置manager1、下载kafkamanager:https://github.com/yahoo/ka
fka-managerhttps://github.com/yahoo/kafka-manager2、解压,进入conf目录配置:
kafka-manager.zkhosts="localhost:2181"3、运行./sbtcleandist命令将创建一个
zip文件,可用于部署应用程序这个命令执行的会很慢,而且可能会失败,要反复执行几次编译成功后,会在target/universal
下生成一个zip包。或直接使用已经编译好的zip包4、nohupbin/kafka-manager-Dconfig.file=
./conf/application.conf-Dhttp.port=8080&//端口自定义也可以指定jdk:bin/
kafka-manager-java-home/usr/local/oracle-java-85、访问web界面增加一个kaf
ka集群配置查看borker信息查看topic信息创建topci信息KafkaOffsetMonitor配置monitor下载:
https://github.com/quantifind/KafkaOffsetMonitor/releaseshttps://
github.com/quantifind/KafkaOffsetMonitor/releases运行:java-cpKafk
aOffsetMonitor-assembly-0.2.0.jarcom.quantifind.kafka.offsetapp.
OffsetGetterWeb--zklocalhost--port8088--refresh5.seconds-
-retain1.days参数说明:zk:zookeeper主机地址,如果有多个,用逗号隔开port:应用程序端口refre
sh:应用程序在数据库中刷新和存储点的频率retain:在db中保留多长时间dbName:保存的数据库文件名,默认为offs
etapp访问web界面Flume-Kafka整合flume整合kafka(kafkasource)在flume下创建kafka
[kafka-r.conf]Kafka的消费当flume的source。a1.sources=r1a1.sinks=k1a
1.channels=c1#Describe/configurethesourcea1.sources.r1.type=
org.apache.flume.source.kafka.KafkaSourcea1.sources.r1.zookeeperC
onnect=datanode1:2181a1.sources.r1.topic=testa1.sources.r1.groupI
d=g1a1.sources.r1.kafka.consumer.timeout.ms=100#Describethesin
ka1.sinks.k1.type=logger#Useachannelwhichbufferseventsin
memorya1.channels.c1.type=memorya1.channels.c1.capacity=1000
a1.channels.c1.transactionCapacity=100#Bindthesourceandsin
ktothechannela1.sources.r1.channels=c1a1.sinks.k1.channel=
c12、开启flume$>flume-ngagent--conf-filekafka-r.conf-na1-Dflum
e.root.logger=INFO,console3、启动kafka生产者$>kafka-console-producer.sh
--broker-listdatanode1:9092--topictestflume整合kafka(kafkasink
)在flume下创建kafka(kafka-s.conf)a1.sources=r1a1.sinks=k1a1.chann
els=c1#Describe/configurethesourcea1.sources.r1.type=netca
ta1.sources.r1.bind=localhosta1.sources.r1.port=8888#Descri
bethesinka1.sinks.k1.type=org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.topic=testa1.sinks.k1.brokerList=datanode1:9092a1.sin
ks.k1.requiredAcks=1a1.sinks.k1.batchSize=20#Useachannelwhich
bufferseventsinmemorya1.channels.c1.type=memorya1.channels.
c1.capacity=1000a1.channels.c1.transactionCapacity=100#Bind
thesourceandsinktothechannela1.sources.r1.channels=c1a1.s
inks.k1.channel=c12、启动kafka消费者$>kafka-console-consumer.sh--zoo
keeperdatanode1:2181--topictest3、开启flume$>flume-ngagent--con
f-filekafka-s.conf-na14、$>nclocalhost8888flume整合kafka(kafka
channel)1、在flume下创建kafka(kafka-c.conf)a1.sources=r1a1.sinks=k
1a1.channels=c1#Describe/configurethesourcea1.sources.r1.typ
e=netcata1.sources.r1.bind=localhosta1.sources.r1.port=8888
#Describethesinka1.sinks.k1.type=logger#channela1.channels
.c1.type=org.apache.flume.channel.kafka.KafkaChannela1.channels.c
1.capacity=10000a1.channels.c1.transactionCapacity=1000a1.channel
s.c1.brokerList=datanode1:9092,datanode2:9092,datanode3:9092a1.ch
annels.c1.topic=testa1.channels.c1.zookeeperConnect=datanode1:218
1a1.channels.c1.parseAsFlumeEvent=false#Bindthesourceandsink
tothechannela1.sources.r1.channels=c1a1.sinks.k1.channel=c
12、开启flume$>flume-ngagent--conf-filekafka-c.conf-na1-Dflume
.root.logger=INFO,console3、$>nclocahost8888其它常用命令$>kafka-consum
er-groups.sh--zookeeperdatanode1:2181-list//列出消费者组信息$>kafka-
consumer-offset-checker.sh--zookeeperdatanode1:2181--groupg1
--topictest//列出消费者偏移量信息$>kafka-topics.sh--zookeeperdatanode1:2181--delete--topictest//删除主题$>kafka-topics.sh--zookeeperdatanode1:2181--describe--topictest//查看主题魁魁语录:打起黄莺儿,莫教枝上啼。啼时惊妾梦,不得到辽西。魁魁语录:绿蚁新醅酒,红泥小火炉。晚来天欲雪,能饮一杯无江湖一哥版权所有江湖一哥版权所有
献花(0)
+1
(本文系王守奎的图...原创)