Sink & Channel                                2018-05-15

  Version   Modified by   Change log      Date
  V1.0      王守奎         Initial draft   2018/5/15

Contents
  Sink
    HDFS sink
    Avro sink cascade (relay hop)
    File_roll sink
    HBase sink
    AsyncHBase sink
    Custom sink
  Channel
    JDBC channel
    File channel
    Spillable memory channel
    Channel selector


Sink

HDFS sink

1. Write hdfs-s.conf:

    # components
    a1.sources = r1
    a1.channels = c1
    a1.sinks = k1

    # source
    a1.sources.r1.type = netcat
    a1.sources.r1.bind = 0.0.0.0
    a1.sources.r1.port = 8888

    # sink
    a1.sinks.k1.type = hdfs
    a1.sinks.k1.channel = c1
    a1.sinks.k1.hdfs.path = /home/hadoop/a/%y-%m-%d/%H/%M/%S
    #a1.sinks.k1.hdfs.path = hdfs://namenode/flume/webdata
    a1.sinks.k1.hdfs.filePrefix = events-
    a1.sinks.k1.hdfs.round = true
    a1.sinks.k1.hdfs.roundValue = 10
    a1.sinks.k1.hdfs.roundUnit = minute
    a1.sinks.k1.hdfs.useLocalTimeStamp = true

    # channel
    a1.channels.c1.type = memory

    # bind
    a1.sources.r1.channels = c1

2. Start the agent:

    $> flume-ng agent --conf-file hdfs-s.conf -n a1 -Dflume.root.logger=INFO,console

3. Send data:

    $> nc localhost 8888

4. View the generated files on HDFS:

    $> hdfs dfs -ls -R /

Avro sink cascade (relay hop)

1. Write avro-r.conf:

    # components
    a1.sources = r1
    a1.channels = c1
    a1.sinks = k1

    a1.sources.r1.type = avro
    a1.sources.r1.bind = 0.0.0.0
    a1.sources.r1.port = 4444

    a1.sinks.k1.type = logger

    # channel
    a1.channels.c1.type = memory

    # bind
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1

2. Write avro-s.conf:

    # components
    a1.sources = r1
    a1.channels = c1
    a1.sinks = k1

    a1.sources.r1.type = netcat
    a1.sources.r1.bind = 0.0.0.0
    a1.sources.r1.port = 8888

    a1.sinks.k1.type = avro
    a1.sinks.k1.hostname = localhost
    a1.sinks.k1.port = 4444

    # channel
    a1.channels.c1.type = memory

    # bind
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1

3. Start both agents (the receiver first, so the avro sink has something to connect to):

    $> flume-ng agent --conf-file avro-r.conf -n a1 -Dflume.root.logger=INFO,console
    $> flume-ng agent --conf-file avro-s.conf -n a1

4. Send data:

    $> nc localhost 8888
    $flume> hello world

File_roll sink

Rolls events into files on the local filesystem: a new file is started at a fixed interval (30 seconds by default), and data is recorded as it arrives.

1. Write file_roll_s.conf:

    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1

    # Describe/configure the source
    a1.sources.r1.type = netcat
    a1.sources.r1.bind = 0.0.0.0
    a1.sources.r1.port = 8888

    # Describe the sink
    a1.sinks.k1.type = file_roll
    a1.sinks.k1.sink.directory = /home/hadoop/a

    # Use a channel which buffers events in memory
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 100

    # Bind the source and sink to the channel
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1

2. Start:

    $> flume-ng agent --conf-file file_roll_s.conf -n a1

3. Send data:

    $> nc localhost 8888
    $flume> hello world

HBase sink

0. Create table ns1:t1 with column family cf1:

    $hbase> create 'ns1:t1', 'cf1'

1. Write hbase_s.conf:

    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1

    # Describe/configure the source
    a1.sources.r1.type = netcat
    a1.sources.r1.bind = 0.0.0.0
    a1.sources.r1.port = 8888

    # Describe the sink
    a1.sinks.k1.type = hbase
    a1.sinks.k1.table = ns1:t1
    a1.sinks.k1.columnFamily = cf1
    a1.sinks.k1.serializer = org.apache.flume.sink.hbase.SimpleHbaseEventSerializer

    # Use a channel which buffers events in memory
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 100

    # Bind the source and sink to the channel
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1

2. Copy the HBase client jar into Flume's lib directory:

    $> cp /soft/hbase/lib/hbase-client-1.2.4.jar /soft/flume/lib

3. Start:

    $> flume-ng agent --conf-file hbase_s.conf -n a1

4. Send data:

    $> nc localhost 8888

5. Verify in HBase:

    $hbase> scan 'ns1:t1'

AsyncHBase sink

Writes to the HBase database in asynchronous mode.

0. Create table t1 with column family cf1:

    $hbase> create 't1', 'cf1'

1. Write asynchbase_s.conf:

    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1

    # Describe/configure the source
    a1.sources.r1.type = netcat
    a1.sources.r1.bind = 0.0.0.0
    a1.sources.r1.port = 8888

    # Describe the sink
    a1.sinks.k1.type = asynchbase
    a1.sinks.k1.table = t1
    a1.sinks.k1.columnFamily = cf1
    a1.sinks.k1.serializer = org.apache.flume.sink.hbase.SimpleAsyncHbaseEventSerializer

    # Use a channel which buffers events in memory
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 100

    # Bind the source and sink to the channel
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1

2. Start:

    $> flume-ng agent --conf-file asynchbase_s.conf -n a1

3. Send data:

    $> nc localhost 8888
    $flume> hello world

4. Verify in HBase (a pCol column is produced):

    $hbase> scan 't1'

Custom sink

1. Create the MySink class:

    package com.bm.sink;

    import org.apache.flume.Channel;
    import org.apache.flume.Event;
    import org.apache.flume.EventDeliveryException;
    import org.apache.flume.Transaction;
    import org.apache.flume.sink.AbstractSink;

    /* Copyright (C) 2018  Author: 王守奎  2018-11-21 17:30:42 */
    public class MySink extends AbstractSink {
        public Status process() throws EventDeliveryException {
            Channel c = getChannel();
            Transaction tx = c.getTransaction();
            tx.begin();
            try {
                Event e = c.take();
                if (e != null) {
                    String str = new String(e.getBody());
                    System.out.println(str);
                }
                tx.commit();
                return Status.READY;
            } catch (Exception e) {
                tx.rollback();
            } finally {
                tx.close();
            }
            return Status.BACKOFF;
        }
    }

2. Export a jar and copy it to flume/lib/.

3. Write custom_s.conf:

    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1

    # Describe/configure the source
    a1.sources.r1.type = netcat
    a1.sources.r1.bind = 0.0.0.0
    a1.sources.r1.port = 8888

    # Describe the sink
    a1.sinks.k1.type = com.bm.sink.MySink

    # Use a channel which buffers events in memory
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 100

    # Bind the source and sink to the channel
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1

4. Start:

    $> flume-ng agent --conf-file custom_s.conf -n a1

5. Send data:

    $> nc localhost 8888


Channel

JDBC channel

This example plugs in a custom MySQL-backed channel.

1. Write mysql_c.conf:

    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1

    # Describe/configure the source
    a1.sources.r1.type = netcat
    a1.sources.r1.bind = localhost
    a1.sources.r1.port = 44444

    # Describe the sink
    a1.sinks.k1.type = logger

    # Use a custom channel
    a1.channels.c1.type = com.bm.channel.MyMySqlChannel

    # Bind the source and sink to the channel
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1

2. Create the database and table:

    $> mysql
    $mysql> create database test;
    $mysql> create table t1(msg varchar(50));
    $mysql> alter table t1 add column id int primary key auto_increment;

3. Create the MySQL channel class:

    package com.bm.channel;

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.PreparedStatement;
    import java.sql.ResultSet;

    import org.apache.flume.ChannelException;
    import org.apache.flume.Event;
    import org.apache.flume.Transaction;
    import org.apache.flume.channel.AbstractChannel;
    import org.apache.flume.event.SimpleEvent;

    /* Copyright (C) 2018  Author: 王守奎  2018-11-21 17:50:55 */
    public class MyMySqlChannel extends AbstractChannel {
        static {
            try {
                Class.forName("com.mysql.jdbc.Driver");
            } catch (ClassNotFoundException e) {
                e.printStackTrace();
            }
        }

        @Override
        public synchronized void start() {
            super.start();
        }

        public void put(Event event) throws ChannelException {
            Transaction tx = getTransaction();
            try {
                tx.begin();
                Connection conn = DriverManager.getConnection(
                        "jdbc:mysql://localhost:3306/test", "root", "p@ssw0rd");
                String msg = new String(event.getBody());
                // parameterized statement avoids quoting problems in msg
                PreparedStatement ppst = conn.prepareStatement("insert into t1(msg) values(?)");
                ppst.setString(1, msg);
                ppst.executeUpdate();
                ppst.close();
                conn.close();
                tx.commit();
            } catch (Exception e) {
                tx.rollback();
            } finally {
                tx.close();
            }
        }

        public Event take() throws ChannelException {
            Transaction tx = getTransaction();
            try {
                tx.begin();
                Connection conn = DriverManager.getConnection(
                        "jdbc:mysql://localhost:3306/test", "root", "p@ssw0rd");
                conn.setAutoCommit(false);
                PreparedStatement ppst = conn.prepareStatement("select id, msg from t1 order by id");
                ResultSet rs = ppst.executeQuery();
                Event e = null;
                if (rs.next()) {
                    int id = rs.getInt(1);
                    String msg = rs.getString(2);
                    // delete the row that was just consumed
                    conn.prepareStatement("delete from t1 where id = " + id).executeUpdate();
                    e = new SimpleEvent();
                    e.setBody(msg.getBytes());
                }
                conn.commit();
                rs.close();
                ppst.close();
                conn.close();
                tx.commit();
                return e;
            } catch (Exception ex) {
                tx.rollback();
            } finally {
                tx.close();
            }
            return null;
        }

        public Transaction getTransaction() {
            // no-op transaction: each put/take is its own JDBC operation
            return new Transaction() {
                public void begin() {}
                public void commit() {}
                public void rollback() {}
                public void close() {}
            };
        }
    }

4. Export a jar and copy it to /soft/flume/lib.

5. Start:

    $> flume-ng agent --conf-file mysql_c.conf -n a1

6. Send data:

    $> nc localhost 44444

File channel

1. Write file_c.conf:

    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1

    # Describe/configure the source
    a1.sources.r1.type = netcat
    a1.sources.r1.bind = localhost
    a1.sources.r1.port = 44444

    # Describe the sink
    a1.sinks.k1.type = logger

    # Use a channel
    a1.channels.c1.type = file
    a1.channels.c1.checkpointDir = /home/hadoop/flume/checkpoint
    a1.channels.c1.dataDirs = /home/hadoop/flume/data

    # Bind the source and sink to the channel
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1

2. Start:

    $> flume-ng agent --conf-file file_c.conf -n a1

3. Send data:

    $> nc localhost 44444

Spillable memory channel

1. Write spillablemem_c.conf:

    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1

    # Describe/configure the source
    a1.sources.r1.type = netcat
    a1.sources.r1.bind = localhost
    a1.sources.r1.port = 44444

    # Describe the sink
    a1.sinks.k1.type = logger

    # Use a channel
    a1.channels.c1.type = SPILLABLEMEMORY
    a1.channels.c1.memoryCapacity = 10000
    a1.channels.c1.overflowCapacity = 1000000
    a1.channels.c1.byteCapacity = 800000
    a1.channels.c1.checkpointDir = /home/hadoop/flume/checkpoint
    a1.channels.c1.dataDirs = /home/hadoop/flume/data

    # Bind the source and sink to the channel
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1

2. Start:

    $> flume-ng agent --conf-file spillablemem_c.conf -n a1

3. Send data:

    $> nc localhost 44444
    $flume> hello world

Channel selector

Replicating channel selector (ReplicatingChannelSelector)

Every channel stores its own copy of each event: as shown in the figure below, the replicating selector sends events coming from the source to all channels. This is the default ChannelSelector.

1. Write rep_selector.conf:

    a1.sources = r1
    a1.sinks = k1 k2 k3
    a1.channels = c1 c2 c3

    a1.sources.r1.type = netcat
    a1.sources.r1.bind = 0.0.0.0
    a1.sources.r1.port = 8888
    #a1.sources.r1.selector.type = replicating
    #a1.sources.r1.selector.optional = c3

    a1.sinks.k1.type = logger
    a1.sinks.k2.type = logger
    a1.sinks.k3.type = logger

    a1.channels.c1.type = memory
    a1.channels.c2.type = memory
    a1.channels.c3.type = memory

    a1.sources.r1.channels = c1 c2 c3
    a1.sinks.k1.channel = c1
    a1.sinks.k2.channel = c2
    a1.sinks.k3.channel = c3

2. Start:

    $> flume-ng agent --conf-file rep_selector.conf -n a1

3. Send data:

    $> nc localhost 8888
    $flume> hello world

Multiplexing channel selector

Chooses different channels for different cases: a multiplexing ChannelSelector decides which channel an event is dispatched to based on an attribute in the event's header.

1. Write mul-selector.conf:

    a1.sources = r1
    a1.sinks = k1 k2 k3 k4
    a1.channels = c1 c2 c3 c4

    #a1.sources.r1.type = netcat
    a1.sources.r1.type = avro
    a1.sources.r1.bind = 0.0.0.0
    a1.sources.r1.port = 8888
    a1.sources.r1.selector.type = multiplexing
    a1.sources.r1.selector.header = state
    a1.sources.r1.selector.mapping.CZ = c1
    a1.sources.r1.selector.mapping.US = c2 c3
    a1.sources.r1.selector.default = c4

    a1.sinks.k1.type = logger
    a1.sinks.k2.type = logger
    a1.sinks.k3.type = logger
    a1.sinks.k4.type = logger

    a1.channels.c1.type = memory
    a1.channels.c2.type = memory
    a1.channels.c3.type = memory
    a1.channels.c4.type = memory

    a1.sources.r1.channels = c1 c2 c3 c4
    a1.sinks.k1.channel = c1
    a1.sinks.k2.channel = c2
    a1.sinks.k3.channel = c3
    a1.sinks.k4.channel = c4

a) Create the header files and a data file:

    $> echo state=US > header.txt
    $> echo state=CZ > header1.txt
    $> echo hello > f.txt

a') Modify mul-selector.conf so it can read multiple files:

    a1.sources.r1.type = avro

b) Start the flume agent:

    $> flume-ng agent --conf-file mul-selector.conf -n a1

c) Use flume-ng avro-client:

    $> flume-ng avro-client -H localhost -p 8888 --headerFile header.txt --filename f.txt
    $> flume-ng avro-client -H localhost -p 8888 -F f.txt
    $> flume-ng avro-client -H localhost -p 8888 --headerFile header1.txt --filename f.txt

The selection can also be configured as follows:
1. Extra information can be added to the event header, e.g. type=1.
2. With the corresponding configuration, the matching channel is then selected.

4A项目组 | Copyright © 江湖一哥
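As a side note on the hdfs.path escapes used in hdfs-s.conf: the %y-%m-%d/%H/%M/%S placeholders resolve against the event timestamp (here the local clock, because useLocalTimeStamp=true). The sketch below is not Flume's implementation; EscapeDemo and its escape table are illustrative only and cover just the six escapes that appear in this config.

```java
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.LinkedHashMap;
import java.util.Map;

// Simplified illustration (not Flume's actual code) of how %-escapes in
// hdfs.path expand against a timestamp.
class EscapeDemo {
    // maps a subset of Flume's escapes to SimpleDateFormat patterns
    static final Map<Character, String> ESCAPES = new LinkedHashMap<>();
    static {
        ESCAPES.put('y', "yy");   // two-digit year
        ESCAPES.put('m', "MM");   // month
        ESCAPES.put('d', "dd");   // day of month
        ESCAPES.put('H', "HH");   // hour
        ESCAPES.put('M', "mm");   // minute
        ESCAPES.put('S', "ss");   // second
    }

    static String expand(String path, long timestampMillis) {
        StringBuilder out = new StringBuilder();
        Date ts = new Date(timestampMillis);
        for (int i = 0; i < path.length(); i++) {
            char ch = path.charAt(i);
            if (ch == '%' && i + 1 < path.length() && ESCAPES.containsKey(path.charAt(i + 1))) {
                out.append(new SimpleDateFormat(ESCAPES.get(path.charAt(++i))).format(ts));
            } else {
                out.append(ch);
            }
        }
        return out.toString();
    }

    public static void main(String[] args) {
        System.out.println(expand("/home/hadoop/a/%y-%m-%d/%H/%M/%S", System.currentTimeMillis()));
    }
}
```

Because hdfs.round=true with roundValue=10 and roundUnit=minute, the real sink additionally rounds the timestamp down to the nearest 10 minutes before expanding the path, so events bucket into 10-minute directories.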
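The contract in MySink.process() — take an event, commit, return READY while events keep coming, and BACKOFF once the channel is empty — can be sketched without Flume at all. ProcessSketch and its Status enum below are stand-ins, not Flume classes; the real sink also wraps the take in a channel transaction.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Stand-alone sketch of the take -> commit -> READY / empty -> BACKOFF
// contract that MySink.process() follows (hypothetical classes, not Flume's API).
class ProcessSketch {
    enum Status { READY, BACKOFF }

    static final Queue<String> channel = new ArrayDeque<>();

    static Status process() {
        String event = channel.poll();     // plays the role of Channel.take()
        if (event != null) {
            System.out.println(event);     // the sink's side effect
            return Status.READY;           // more events may be waiting
        }
        return Status.BACKOFF;             // channel empty: back off before retrying
    }

    public static void main(String[] args) {
        channel.add("hello world");
        System.out.println(process());     // READY
        System.out.println(process());     // BACKOFF
    }
}
```

Returning BACKOFF matters in practice: it tells the Flume runner to pause before polling again instead of spinning on an empty channel.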
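One assumption in MyMySqlChannel is worth making explicit: FIFO order relies on the auto_increment id column, and the select in take() should order by id, because without an explicit ORDER BY MySQL does not guarantee which row comes back first. The in-memory stand-in below (FifoTableSketch, a hypothetical class, not part of the channel) shows the queue semantics the table is meant to provide.

```java
import java.util.TreeMap;

// In-memory stand-in for MyMySqlChannel's table: the auto_increment id makes
// t1 behave like a FIFO queue -- put() appends, take() removes the lowest id.
class FifoTableSketch {
    static final TreeMap<Integer, String> t1 = new TreeMap<>();
    static int nextId = 1;

    static void put(String msg) {          // insert into t1(msg) values(?)
        t1.put(nextId++, msg);
    }

    static String take() {                 // select lowest id, then delete that row
        if (t1.isEmpty()) return null;     // empty table: no event available
        return t1.remove(t1.firstKey());
    }

    public static void main(String[] args) {
        put("first");
        put("second");
        System.out.println(take());        // first
        System.out.println(take());        // second
        System.out.println(take());        // null
    }
}
```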
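The routing rule that mul-selector.conf expresses — the state header picks a channel list, with c4 as the fallback — amounts to a map lookup. MultiplexDemo below is illustrative only, not Flume's selector implementation; channel names are plain strings here rather than Channel objects.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Sketch of the multiplexing selector's routing rule from mul-selector.conf:
// the "state" header selects a channel list; anything else falls through to default.
class MultiplexDemo {
    static final Map<String, List<String>> MAPPING = new HashMap<>();
    static final List<String> DEFAULT = Arrays.asList("c4");
    static {
        MAPPING.put("CZ", Arrays.asList("c1"));       // selector.mapping.CZ = c1
        MAPPING.put("US", Arrays.asList("c2", "c3")); // selector.mapping.US = c2 c3
    }

    static List<String> route(Map<String, String> headers) {
        // missing or unmapped "state" values go to the default channel
        return MAPPING.getOrDefault(headers.get("state"), DEFAULT);
    }

    public static void main(String[] args) {
        Map<String, String> h = new HashMap<>();
        h.put("state", "US");
        System.out.println(route(h));    // [c2, c3]
        h.put("state", "JP");
        System.out.println(route(h));    // [c4]
    }
}
```

This is why the avro-client runs above behave differently: header.txt (state=US) lands in c2 and c3, header1.txt (state=CZ) in c1, and the run with no header file in c4.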