Kafka Management Tools and Flume-Kafka Integration
2019/4/18

Contents

Kafka producer and consumer programming
    1. Create the Maven project bigdata-kafka-2.11 in Eclipse
    2. Edit pom.xml
    3. API introduction
    4. Write the producer and a custom partitioner
    5. Write the consumer
    7. Run the producer and the consumer separately
Flume-Kafka integration
    Flume with Kafka (Kafka source)
    Flume with Kafka (Kafka sink)
    Flume with Kafka (Kafka channel)
    Other common commands

Kafka management tools

KafkaManager

Configuring the manager:

1. Download kafka-manager: https://github.com/yahoo/kafka-manager

2. Unzip it, then edit the configuration under the conf directory:

    kafka-manager.zkhosts="localhost:2181"

3. Run ./sbt clean dist to build a zip file that can be used to deploy the application. This command runs very slowly and may fail; rerun it a few times if necessary. Once the build succeeds, the zip package is generated under target/universal. Alternatively, use a pre-built zip package.

4. Start the manager (choose any port you like):

    $> nohup bin/kafka-manager -Dconfig.file=./conf/application.conf -Dhttp.port=8080 &

   A specific JDK can also be selected:

    $> bin/kafka-manager -java-home /usr/local/oracle-java-8

5. Open the web UI, then:
   - add a Kafka cluster configuration
   - view broker information
   - view topic information
   - create a topic

KafkaOffsetMonitor

Configuring the monitor:

Download: https://github.com/quantifind/KafkaOffsetMonitor/releases

Run:

    $> java -cp KafkaOffsetMonitor-assembly-0.2.0.jar com.quantifind.kafka.offsetapp.OffsetGetterWeb --zk localhost --port 8088 --refresh 5.seconds --retain 1.days

Parameters:
    zk:      ZooKeeper host addresses; separate multiple hosts with commas
    port:    application port
    refresh: how often the application refreshes and stores data points in the database
    retain:  how long data is kept in the database
    dbName:  name of the database file, offsetapp by default

Then open the web UI.

Flume-Kafka integration

Flume with Kafka (Kafka source)

1. Create kafka-r.conf under the flume directory. Here Flume consumes from Kafka, i.e. Kafka acts as the Flume source.

    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1

    # Describe/configure the source
    a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
    a1.sources.r1.zookeeperConnect = datanode1:2181
    a1.sources.r1.topic = test
    a1.sources.r1.groupId = g1
    a1.sources.r1.kafka.consumer.timeout.ms = 100

    # Describe the sink
    a1.sinks.k1.type = logger

    # Use a channel which buffers events in memory
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 100

    # Bind the source and sink to the channel
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1

2. Start Flume:

    $> flume-ng agent --conf-file kafka-r.conf -n a1 -Dflume.root.logger=INFO,console

3. Start a Kafka console producer:

    $> kafka-console-producer.sh --broker-list datanode1:9092 --topic test
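The agent files in these steps are plain Java-properties text, and a typo in the source/sink-to-channel bindings is a common reason an agent silently does nothing. As a sanity check, the sketch below (illustrative Python, not part of Flume) parses such a file and verifies that every declared source and sink is bound to a declared channel:

```python
# Illustrative helper (not part of Flume): parse a properties-style
# agent file and check the source/sink-to-channel wiring.
def parse_props(text):
    props = {}
    for line in text.splitlines():
        line = line.strip()
        if line and not line.startswith("#"):
            key, _, value = line.partition("=")
            props[key.strip()] = value.strip()
    return props

def check_wiring(props, agent="a1"):
    channels = set(props.get(agent + ".channels", "").split())
    for src in props.get(agent + ".sources", "").split():
        # a source may fan out to several channels
        assert set(props[agent + ".sources." + src + ".channels"].split()) <= channels
    for sink in props.get(agent + ".sinks", "").split():
        # a sink drains exactly one channel
        assert props[agent + ".sinks." + sink + ".channel"] in channels
    return True

conf = """
a1.sources = r1
a1.sinks = k1
a1.channels = c1
a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
a1.sinks.k1.type = logger
a1.channels.c1.type = memory
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
"""
props = parse_props(conf)
print(check_wiring(props))  # True
```

If a binding points at an undeclared channel, the assertion fails before any cluster is touched, which is cheaper than debugging a running agent.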
Flume with Kafka (Kafka sink)

1. Create kafka-s.conf under the flume directory:

    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1

    # Describe/configure the source
    a1.sources.r1.type = netcat
    a1.sources.r1.bind = localhost
    a1.sources.r1.port = 8888

    # Describe the sink
    a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
    a1.sinks.k1.topic = test
    a1.sinks.k1.brokerList = datanode1:9092
    a1.sinks.k1.requiredAcks = 1
    a1.sinks.k1.batchSize = 20

    # Use a channel which buffers events in memory
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 100

    # Bind the source and sink to the channel
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1

2. Start a Kafka console consumer:

    $> kafka-console-consumer.sh --zookeeper datanode1:2181 --topic test

3. Start Flume:

    $> flume-ng agent --conf-file kafka-s.conf -n a1

4. Send some lines:

    $> nc localhost 8888

Flume with Kafka (Kafka channel)

1. Create kafka-c.conf under the flume directory:

    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1

    # Describe/configure the source
    a1.sources.r1.type = netcat
    a1.sources.r1.bind = localhost
    a1.sources.r1.port = 8888

    # Describe the sink
    a1.sinks.k1.type = logger

    # channel
    a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
    a1.channels.c1.capacity = 10000
    a1.channels.c1.transactionCapacity = 1000
    a1.channels.c1.brokerList = datanode1:9092,datanode2:9092,datanode3:9092
    a1.channels.c1.topic = test
    a1.channels.c1.zookeeperConnect = datanode1:2181
    a1.channels.c1.parseAsFlumeEvent = false

    # Bind the source and sink to the channel
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1

2. Start Flume:

    $> flume-ng agent --conf-file kafka-c.conf -n a1 -Dflume.root.logger=INFO,console

3. Send some lines:

    $> nc localhost 8888

Other common commands

    $> kafka-consumer-groups.sh --zookeeper datanode1:2181 --list                           // list consumer groups
    $> kafka-consumer-offset-checker.sh --zookeeper datanode1:2181 --group g1 --topic test  // show consumer offsets
    $> kafka-topics.sh --zookeeper datanode1:2181 --delete --topic test                     // delete a topic
    $> kafka-topics.sh --zookeeper datanode1:2181 --describe --topic test                   // describe a topic

Copyright 江湖一哥
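kafka-consumer-offset-checker reports, per partition, the group's committed offset, the broker's log-end offset ("logSize"), and their difference ("lag"). The arithmetic behind the lag column is simple; the sketch below illustrates it with made-up numbers:

```python
# Made-up offsets for topic "test" and consumer group "g1" (illustration only).
log_end   = {0: 1500, 1: 1490, 2: 1510}  # logSize: next offset to be written, per partition
committed = {0: 1500, 1: 1450, 2: 1507}  # offset: last position committed by g1

# Lag per partition is how far the group trails the end of the log.
lag = {p: log_end[p] - committed[p] for p in log_end}
print(lag)                # {0: 0, 1: 40, 2: 3}
print(sum(lag.values()))  # total lag across partitions: 43
```

A total lag that keeps growing means the group consumes slower than producers write; a lag of zero on every partition means the group is fully caught up.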
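The nc steps above simply push newline-terminated lines at Flume's netcat source, which turns each line into one event. The self-contained loopback sketch below (plain Python sockets; no Flume or Kafka involved, port chosen automatically rather than 8888) mimics that framing:

```python
import socket
import threading

# Toy stand-in for Flume's netcat source: accept one connection and
# split the incoming byte stream into one "event" per newline-terminated line.
def netcat_like_listener(events):
    srv = socket.socket()
    srv.bind(("127.0.0.1", 0))  # 0 = pick any free port
    srv.listen(1)

    def handle():
        conn, _ = srv.accept()
        buf = b""
        while True:
            chunk = conn.recv(1024)
            if not chunk:
                break
            buf += chunk
        conn.close()
        srv.close()
        events.extend(line.decode() for line in buf.split(b"\n") if line)

    t = threading.Thread(target=handle)
    t.start()
    return srv.getsockname()[1], t

events = []
port, t = netcat_like_listener(events)

# Client side: what `nc localhost 8888` does when you type two lines.
cli = socket.create_connection(("127.0.0.1", port))
cli.sendall(b"hello flume\nsecond event\n")
cli.close()
t.join()
print(events)  # ['hello flume', 'second event']
```

Each line becomes a separate event, which is why typing into nc while the agent runs with the logger sink prints one log record per line.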