01-Flume2_Sink、Channel
2022-09-15
  
Sink, Channel
2018-05-15

Version   Modified by   Change record   Date
V1.0      王守奎         Written         2018/5/15

Contents
  Sink
    hdfs sink
    avro sink cascade (relay hop)
    File_roll sink
    HBase sink
    AsyncHbase sink
    Custom sink
  Channel
    Jdbc channel
    File channel
    Spillable memory channel
    Channel selector

Sink

hdfs sink

1. Write hdfs-s.conf:

    # components
    a1.sources = r1
    a1.channels = c1
    a1.sinks = k1

    # source
    a1.sources.r1.type = netcat
    a1.sources.r1.bind = 0.0.0.0
    a1.sources.r1.port = 8888

    # sink
    a1.sinks = k1
    a1.sinks.k1.type = hdfs
    a1.sinks.k1.channel = c1
    a1.sinks.k1.hdfs.path = /home/hadoop/a/%y-%m-%d/%H/%M/%S
    #a1.sinks.k1.hdfs.path = hdfs://namenode/flume/webdata
    a1.sinks.k1.hdfs.filePrefix = events-
    a1.sinks.k1.hdfs.round = true
    a1.sinks.k1.hdfs.roundValue = 10
    a1.sinks.k1.hdfs.roundUnit = minute
    a1.sinks.k1.hdfs.useLocalTimeStamp = true

    # channel
    a1.channels.c1.type = memory

    # bind
    a1.sources.r1.channels = c1

   With hdfs.round enabled, the timestamp used to expand the path escapes is rounded down to a multiple of 10 minutes. hdfs.useLocalTimeStamp makes the sink use the agent's local time instead of a timestamp header on the event.

2. Start the agent:

    $> flume-ng agent --conf-file hdfs-s.conf -n a1 -Dflume.root.logger=INFO,console

3. Send test data:

    $> nc localhost 8888

4. Check the generated files on HDFS:

    $> hdfs dfs -ls -R /

avro sink cascade (relay hop)

Two agents are chained: the sending agent forwards events through an avro sink to the avro source of the receiving agent.

1. Write avro-r.conf (the receiving agent):

    # components
    a1.sources = r1
    a1.channels = c1
    a1.sinks = k1

    a1.sources.r1.type = avro
    a1.sources.r1.bind = 0.0.0.0
    a1.sources.r1.port = 4444

    a1.sinks.k1.type = logger

    # channel
    a1.channels.c1.type = memory

    # bind
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1

2. Write avro-s.conf (the sending agent):

    # components
    a1.sources = r1
    a1.channels = c1
    a1.sinks = k1

    a1.sources.r1.type = netcat
    a1.sources.r1.bind = 0.0.0.0
    a1.sources.r1.port = 8888

    a1.sinks.k1.type = avro
    a1.sinks.k1.hostname = localhost
    a1.sinks.k1.port = 4444

    # channel
    a1.channels.c1.type = memory

    # bind
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1
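
3. Start the receiving agent first, then the sending agent:

    $> flume-ng agent --conf-file avro-r.conf -n a1 -Dflume.root.logger=INFO,console
    $> flume-ng agent --conf-file avro-s.conf -n a1

4. Send test data:

    $> nc localhost 8888
    $flume> hello world

File_roll sink

Stores events on the local file system in rolling log files. A new file is created at a fixed interval and can record data as soon as it exists. (The notes state one file per minute; the interval is controlled by sink.rollInterval, whose default is 30 seconds, and the configuration here does not set it.)

1. Write file_roll_s.conf:

    a1.sources = r1
    a1.sinks = k1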
    a1.channels = c1

    # Describe/configure the source
    a1.sources.r1.type = netcat
    a1.sources.r1.bind = 0.0.0.0
    a1.sources.r1.port = 8888

    # Describe the sink
    a1.sinks.k1.type = file_roll
    a1.sinks.k1.sink.directory = /home/hadoop/a

    # Use a channel which buffers events in memory
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 100

    # Bind the source and sink to the channel
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1

2. Start the agent:

    $> flume-ng agent --conf-file file_roll_s.conf -n a1

3. Send test data:

    $> nc localhost 8888
    $flume> hello world

HBase sink

The table name must be the same in the create command, in a1.sinks.k1.table, and in the scan command; ns1:t1 is used here, and the namespace ns1 must already exist.

0. Create table t1 with column family cf1:

    $hbase> create 'ns1:t1', 'cf1'

1. Write hbase_s.conf:

    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1

    # Describe/configure the source
    a1.sources.r1.type = netcat
    a1.sources.r1.bind = 0.0.0.0
    a1.sources.r1.port = 8888

    # Describe the sink
    a1.sinks.k1.type = hbase
    a1.sinks.k1.table = ns1:t1
    a1.sinks.k1.columnFamily = cf1
    a1.sinks.k1.serializer = org.apache.flume.sink.hbase.SimpleHbaseEventSerializer

    # Use a channel which buffers events in memory
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 100

    # Bind the source and sink to the channel
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1

2. Copy the HBase client jar into Flume's lib directory:

    $> cp /soft/hbase/lib/hbase-client-1.2.4.jar /soft/flume/lib

3. Start the agent:

    $> flume-ng agent --conf-file hbase_s.conf -n a1

4. Send test data:

    $> nc localhost 8888

5. Verify in HBase:

    $hbase> scan 'ns1:t1'

AsyncHbase sink

Writes to the HBase database in asynchronous mode.

0. Create table t1 with column family cf1:

    $hbase> create 't1', 'cf1'

1. Write asynchbase_s.conf:

    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1

    # Describe/configure the source
    a1.sources.r1.type = netcat
    a1.sources.r1.bind = 0.0.0.0
    a1.sources.r1.port = 8888

    # Describe the sink
    a1.sinks.k1.type = asynchbase
    a1.sinks.k1.table = t1
    a1.sinks.k1.columnFamily = cf1
    a1.sinks.k1.serializer = org.apache.flume.sink.hbase.SimpleAsyncHbaseEventSerializer

    # Use a channel which buffers events in memory
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 100

    # Bind the source and sink to the channel
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1

2. Start the agent:

    $> flume-ng agent --conf-file asynchbase_s.conf -n a1

3. Send test data:

    $> nc localhost 8888
    $flume> hello world

4. Verify in HBase (a pCol column is produced):

    $hbase> scan 't1'

Custom sink

1. Create the MySink class:

    package com.bm.sink;

    import org.apache.flume.Channel;
    import org.apache.flume.Event;
    import org.apache.flume.EventDeliveryException;
    import org.apache.flume.Sink.Status;
    import org.apache.flume.Transaction;
    import org.apache.flume.sink.AbstractSink;

    /**
     * Copyright (C) 2018 编码界的小菜鸟
     * Author: 王守奎
     * 2018-11-21 5:30:42 PM
     */
    public class MySink extends AbstractSink {
        public Status process() throws EventDeliveryException {
            Channel c = getChannel();
            Transaction tx = c.getTransaction();
            tx.begin();
            try {
                // Take one event from the channel; null means the channel is empty
                Event e = c.take();
                if (e != null) {
                    // Print the event body to standard output
                    String str = new String(e.getBody());
                    System.out.println(str);
                }
                tx.commit();
                return Status.READY;
            } catch (Exception e) {
                // Undo the take so the event stays in the channel
                tx.rollback();
            } finally {
                tx.close();
            }
            // Reached only after a failure: ask the runner to back off
            return Status.BACKOFF;
        }
    }

2. Export the class as a jar and copy it to flume/lib/.

3. Write custom_s.conf:

    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1

    # Describe/configure the source
    a1.sources.r1.type = netcat
    a1.sources.r1.bind = 0.0.0.0
    a1.sources.r1.port = 8888

    # Describe the sink
    a1.sinks.k1.type = com.bm.sink.MySink

    # Use a channel which buffers events in memory
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 100

    # Bind the source and sink to the channel
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1

4. Start the agent:

    $> flume-ng agent --conf-file custom_s.conf -n a1

5. Send test data:

    $> nc localhost 8888

Channel

Jdbc channel

This example uses a custom channel that stores events in a MySQL table.

1. Write mysql_c.conf:

    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1

    # Describe/configure the source
    a1.sources.r1.type = netcat
    a1.sources.r1.bind = localhost
    a1.sources.r1.port = 44444

    # Describe the sink
    a1.sinks.k1.type = logger

    # Use the custom MySQL-backed channel
    a1.channels.c1.type = com.bm.channel.MyMySqlChannel

    # Bind the source and sink to the channel
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1

2. Create the database and table:

    $> mysql
    $mysql> create database test;
    $mysql> use test;
    $mysql> create table t1(msg varchar(50));
    $mysql> alter table t1 add column id int primary key auto_increment;

3. Create the MySQL channel:

    package com.bm.channel;

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.PreparedStatement;
    import java.sql.ResultSet;
    import org.apache.flume.ChannelException;
    import org.apache.flume.Event;
    import org.apache.flume.Transaction;
    import org.apache.flume.channel.AbstractChannel;
    import org.apache.flume.event.SimpleEvent;

    /**
     * Copyright (C) 2018 编码界的小菜鸟
     * Author: 王守奎
     * 2018-11-21 5:50:55 PM
     */
    public class MyMySqlChannel extends AbstractChannel {
        private static final String URL = "jdbc:mysql://localhost:3306/test";
        private static final String USER = "root";
        private static final String PASSWORD = "p@ssw0rd";

        static {
            try {
                // Load the MySQL JDBC driver once
                Class.forName("com.mysql.jdbc.Driver");
            } catch (ClassNotFoundException e) {
                e.printStackTrace();
            }
        }

        public synchronized void start() {
            super.start();
        }

        // Store the event body as one row of t1
        public void put(Event event) throws ChannelException {
            Transaction tx = getTransaction();
            // A bound parameter keeps quotes in the message from breaking the SQL;
            // try-with-resources closes the statement and the connection
            try (Connection conn = DriverManager.getConnection(URL, USER, PASSWORD);
                 PreparedStatement ppst = conn.prepareStatement("insert into t1(msg) values(?)")) {
                tx.begin();
                ppst.setString(1, new String(event.getBody()));
                ppst.executeUpdate();
                tx.commit();
            } catch (Exception e) {
                tx.rollback();
            } finally {
                tx.close();
            }
        }

        // Read the oldest row, delete it, and return it as an event
        public Event take() throws ChannelException {
            Transaction tx = getTransaction();
            try (Connection conn = DriverManager.getConnection(URL, USER, PASSWORD)) {
                tx.begin();
                conn.setAutoCommit(false);
                int id = 0;
                String msg = null;
                try (PreparedStatement ppst = conn.prepareStatement("select id, msg from t1 order by id limit 1");
                     ResultSet rs = ppst.executeQuery()) {
                    if (rs.next()) {
                        id = rs.getInt(1);
                        msg = rs.getString(2);
                    }
                }
                if (msg == null) {
                    // Nothing to take: the table is empty
                    return null;
                }
                try (PreparedStatement del = conn.prepareStatement("delete from t1 where id = ?")) {
                    del.setInt(1, id);
                    del.executeUpdate();
                }
                conn.commit();
                SimpleEvent event = new SimpleEvent();
                event.setBody(msg.getBytes());
                return event;
            } catch (Exception e) {
                tx.rollback();
            } finally {
                tx.close();
            }
            return null;
        }

        // Placeholder transaction: every method is a no-op
        public Transaction getTransaction() {
            return new Transaction() {
                public void begin() {}
                public void commit() {}
                public void rollback() {}
                public void close() {}
            };
        }
    }

4. Export the class as a jar and copy it to /soft/flume/lib. The MySQL JDBC driver jar must also be on Flume's classpath.

5. Start the agent:

    $> flume-ng agent --conf-file mysql_c.conf -n a1

6. Send test data:

    $> nc localhost 44444

File channel

1. Write file_c.conf:

    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1

    # Describe/configure the source
    a1.sources.r1.type = netcat
    a1.sources.r1.bind = localhost
    a1.sources.r1.port = 44444

    # Describe the sink
    a1.sinks.k1.type = logger

    # Use a channel
    a1.channels.c1.type = file
    a1.channels.c1.checkpointDir = /home/hadoop/flume/checkpoint
    a1.channels.c1.dataDirs = /home/hadoop/flume/data

    # Bind the source and sink to the channel
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1

2. Start the agent:

    $> flume-ng agent --conf-file file_c.conf -n a1

3. Send test data:

    $> nc localhost 44444

Spillable memory channel

Events are held in memory first and overflow to disk when the in-memory queue is full.

1. Write spillablemem_c.conf:

    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1

    # Describe/configure the source
    a1.sources.r1.type = netcat
    a1.sources.r1.bind = localhost
    a1.sources.r1.port = 44444

    # Describe the sink
    a1.sinks.k1.type = logger

    # Use a channel
    a1.channels.c1.type = SPILLABLEMEMORY
    a1.channels.c1.memoryCapacity = 10000
    a1.channels.c1.overflowCapacity = 1000000
    a1.channels.c1.byteCapacity = 800000
    a1.channels.c1.checkpointDir = /home/hadoop/flume/checkpoint
    a1.channels.c1.dataDirs = /home/hadoop/flume/data

    # Bind the source and sink to the channel
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1

2. Start the agent:

    $> flume-ng agent --conf-file spillablemem_c.conf -n a1

3. Send test data:

    $> nc localhost 44444
    $flume> hello world

Channel selector

Replicating Channel Selector

Every channel receives a copy: the replicating selector sends each event coming from the source to all configured channels. This is the default channel selector, which is why the selector lines in the configuration can stay commented out.

1. Write rep_selector.conf:

    a1.sources = r1
    a1.sinks = k1 k2 k3
    a1.channels = c1 c2 c3

    a1.sources.r1.type = netcat
    a1.sources.r1.bind = 0.0.0.0
    a1.sources.r1.port = 8888
    #a1.sources.r1.selector.type = replicating
    #a1.sources.r1.selector.optional = c3

    a1.sinks.k1.type = logger
    a1.sinks.k2.type = logger
    a1.sinks.k3.type = logger

    a1.channels.c1.type = memory
    a1.channels.c2.type = memory
    a1.channels.c3.type = memory

    a1.sources.r1.channels = c1 c2 c3
    a1.sinks.k1.channel = c1
    a1.sinks.k2.channel = c2
    a1.sinks.k3.channel = c3

2. Start the agent:

    $> flume-ng agent --conf-file rep_selector.conf -n a1

3. Send test data:

    $> nc localhost 8888
    $flume> hello world

Multiplexing channel selector
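
The multiplexing selector picks different channels depending on the event: it reads one attribute from the event headers and uses its value to decide which channel or channels receive the event.

1. Write mul-selector.conf:

    a1.sources = r1
    a1.sinks = k1 k2 k3 k4
    a1.channels = c1 c2 c3 c4

    #a1.sources.r1.type = netcat
    a1.sources.r1.type = avro
    a1.sources.r1.bind = 0.0.0.0
    a1.sources.r1.port = 8888
    a1.sources.r1.selector.type = multiplexing
    a1.sources.r1.selector.header = state
    a1.sources.r1.selector.mapping.CZ = c1
    a1.sources.r1.selector.mapping.US = c2 c3
    a1.sources.r1.selector.default = c4

    a1.sinks.k1.type = logger
    a1.sinks.k2.type = logger
    a1.sinks.k3.type = logger
    a1.sinks.k4.type = logger

    a1.channels.c1.type = memory
    a1.channels.c2.type = memory
    a1.channels.c3.type = memory
    a1.channels.c4.type = memory

    a1.sources.r1.channels = c1 c2 c3 c4
    a1.sinks.k1.channel = c1
    a1.sinks.k2.channel = c2
    a1.sinks.k3.channel = c3
    a1.sinks.k4.channel = c4

a) Create the header files and the data file f.txt:

    $> echo state=US > header.txt
    $> echo state=CZ > header1.txt
    $> echo hello > f.txt

a') Make sure mul-selector.conf uses the avro source, so that files with headers can be sent to it:

    a1.sources.r1.type = avro

b) Start the flume agent:

    $> flume-ng agent --conf-file mul-selector.conf -n a1

c) Send the file with flume-ng avro-client:

    $> flume-ng avro-client -H localhost -p 8888 --headerFile header.txt --filename f.txt
    $> flume-ng avro-client -H localhost -p 8888 -F f.txt
    $> flume-ng avro-client -H localhost -p 8888 --headerFile header1.txt --filename f.txt

   With the mappings above, the first command (state=US) delivers the event to c2 and c3, the second (no state header) falls through to the default channel c4, and the third (state=CZ) delivers it to c1.

The routing can also be set up in another way:

1. Extra information, for example type=1, is added to the event headers.
2. The channel is then selected through configuration based on that header.

4A Project Team
Kuikui's quote: His achievements towered over the three divided kingdoms; his fame was sealed by the Eight Formations. The river flows on but the stones do not turn; his lasting regret was the failure to swallow Wu.
Copyright 江湖一哥. All rights reserved.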
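
The alternative setup mentioned at the end of the Multiplexing channel selector section (add type=1 to the headers, then select the channel by configuration) might look like the sketch below. It is not the author's original configuration. It assumes Flume's built-in static interceptor is what stamps the header, and the interceptor name i1, the two channels c1 and c2, and the mapping value are all chosen here purely for illustration.

```properties
# Hypothetical agent a1: netcat source -> static interceptor -> multiplexing selector
a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1 c2

a1.sources.r1.type = netcat
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 8888

# Assumption: a static interceptor adds the header type=1 to every event
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = static
a1.sources.r1.interceptors.i1.key = type
a1.sources.r1.interceptors.i1.value = 1

# Route on the "type" header: value 1 goes to c1, anything else to c2
a1.sources.r1.selector.type = multiplexing
a1.sources.r1.selector.header = type
a1.sources.r1.selector.mapping.1 = c1
a1.sources.r1.selector.default = c2

a1.sinks.k1.type = logger
a1.sinks.k2.type = logger

a1.channels.c1.type = memory
a1.channels.c2.type = memory

a1.sources.r1.channels = c1 c2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2
```

Because the interceptor runs before the selector, a plain netcat source can be used here and no header file is needed on the client side. With a single static value every event lands in c1, so real routing would need a header whose value varies per event.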
(Original article from 王守奎的图...)