06 - Hadoop File Types and Serialization
2022-09-15
Revision history: V1.0 | Author: 王守奎 | Change: initial draft | Date: 2018/2/8

Contents
- Background
- File types
  - SequenceFile: writing, reading, Position, Seek, writing sync points, reading a file via sync points, writing a compressed file (none / BZip2), reading a compressed file (BZip2)
  - MapFile: writing, writing with an index interval, reading, Seek, finding the value closest to a key, repairing a seq file
  - ArrayFile: writing, reading
  - SetFile: writing, reading
- Serialization and deserialization
  - Hadoop's own serialization format: Writable
  - Hadoop serialization and deserialization: IntWritable compare, IntWritable comparator, BooleanWritable compare, Text, MapWritable, ObjectWritable
  - Object serialization and deserialization example: 1. define the Person class; 2. define the test class

Background

Business development on Hadoop frequently involves custom object classes. This document focuses on the data structures of Hadoop's file and object types and their commonly used methods.

File types

SequenceFile

In a log file each line represents one record; if you need to store binary data, plain text is no longer a suitable format, and the SequenceFile class is designed for exactly this case. A SequenceFile provides a persistent data structure for binary key/value pairs, and a major benefit of sequence files is that they are splittable.

A SequenceFile consists of a header and records. The storage format differs depending on whether compression is used and whether it is record or block compression:
- No compression: each record is stored as record length, key length, key, value, where the lengths are byte counts and the data is serialized with the configured Serialization.
- Record compression: only the value is compressed; the codec used is stored in the header.
- Block compression: multiple records are compressed together, which exploits the similarity between records and saves more space. A sync marker is written before and after each block. The minimum block size is set by the io.seqfile.compress.blocksize property.

The file header starts with: SEQ + version + key class + value class (it can be viewed with hdfs dfs -cat).
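Building on the header layout just described, SequenceFile.Reader can report what is actually recorded in a file's header. A minimal sketch, assuming the mySequenceFile.seq path used in the examples below and the usual org.apache.hadoop.io imports:

@Test
public void readHeaderInfo() throws IOException {
    Configuration conf = new Configuration();
    Path path = new Path("hdfs://namenode:8020/user/hadoop/data/mySequenceFile.seq");
    SequenceFile.Reader reader = new SequenceFile.Reader(conf, SequenceFile.Reader.file(path));
    try {
        System.out.println(reader.getKeyClassName());     // e.g. org.apache.hadoop.io.IntWritable
        System.out.println(reader.getValueClassName());   // e.g. org.apache.hadoop.io.Text
        System.out.println(reader.getCompressionType());  // NONE / RECORD / BLOCK
        System.out.println(reader.getCompressionCodec()); // null when no codec is in use
    } finally {
        IOUtils.closeStream(reader);
    }
}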
Writing a SequenceFile

Note: since Hadoop 2.x, SequenceFile.Writer has been phasing out its many createWriter() overloads in favour of a single, cleaner createWriter() method. Apart from the Configuration, every argument is now passed as a SequenceFile.Writer.Option. The options provided by the new API are: FileOption, FileSystemOption, StreamOption, BufferSizeOption, BlockSizeOption, ReplicationOption, KeyClassOption, ValueClassOption, MetadataOption, ProgressableOption and CompressionOption. They cover a wide range of needs, can be passed in any order, and make the calling code shorter and more readable.

The two examples below show the old and the new write API. When no compression algorithm is specified, the default is deflate.

@Test
public void write_1() throws IOException {
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(conf);
    Path path = new Path("hdfs://namenode:8020/user/hadoop/data/mySequenceFile.seq");
    Writer writer = null;
    try {
        // This overload has been superseded by the Option-based createWriter()
        writer = SequenceFile.createWriter(fs, conf, path, IntWritable.class, Text.class);
        IntWritable key = new IntWritable();
        Text val = new Text();
        for (int i = 0; i < 100; i++) {
            key.set(i);
            val.set("tom" + i);
            writer.append(key, val);
        }
    } catch (Exception e) {
        e.printStackTrace();
    } finally {
        IOUtils.closeStream(writer);
    }
    System.out.println("over!");
}
@Test
public void write_2() throws Exception {
    SequenceFile.Writer writer = null;
    try {
        Configuration conf = new Configuration();
        conf.set("dfs.support.append", "true");
        conf.set("dfs.client.block.write.replace-datanode-on-failure.policy", "NEVER");
        conf.set("dfs.client.block.write.replace-datanode-on-failure.enable", "true");
        FileSystem fs = FileSystem.get(conf);
        IntWritable key = new IntWritable();
        Text value = new Text();
        Path path = new Path("hdfs://namenode:8020/user/hadoop/data/mySequenceFile1.seq");
        Option keyClass = SequenceFile.Writer.keyClass(key.getClass());
        Option valueClass = SequenceFile.Writer.valueClass(value.getClass());
        Option file = SequenceFile.Writer.file(path);
        // Output stream for the SequenceFile (use either the file option or the stream option, not both)
        Option stream = SequenceFile.Writer.stream(fs.create(path));
        // Buffer size
        Option bufferSize = SequenceFile.Writer.bufferSize(1024);
        // Whether to append when the file already exists; the default is to overwrite
        Option appendIfExists = SequenceFile.Writer.appendIfExists(true);
        // Compression type and codec in a single option (no separate compression-type call needed)
        CompressionCodec codec = ReflectionUtils.newInstance(BZip2Codec.class, conf);
        Option compression2 = SequenceFile.Writer.compression(CompressionType.RECORD, codec);
        // Compression type only:
        //   NONE   - do not compress records
        //   RECORD - compress values only, each separately
        //   BLOCK  - compress sequences of records together in blocks
        Option compression = SequenceFile.Writer.compression(CompressionType.NONE);
        writer = SequenceFile.createWriter(conf, file, keyClass, appendIfExists, valueClass, bufferSize, compression2);
        for (int i = 0; i < 100; i++) {
            key.set(i);
            value.set("lucy" + i);
            System.out.printf("[%s]\t%s\t%s\n", writer.getLength(), key, value);
            writer.append(key, value);
        }
    } catch (Exception e) {
        e.printStackTrace();
    } finally {
        IOUtils.closeStream(writer);
    }
    System.out.println("over!");
}
Reading a SequenceFile

The reader options are: FileOption (which file to read), InputStreamOption, StartOption, LengthOption (read only the given number of bytes) and BufferSizeOption. The two examples below show the old and the new read API; the new one is recommended.

@Test
public void read_1() throws IOException {
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(conf);
    Path path = new Path("hdfs://namenode:8020/user/hadoop/data/mySequenceFile.seq");
    Reader reader = new SequenceFile.Reader(fs, path, conf);
    IntWritable key = new IntWritable();
    Text val = new Text();
    long position = reader.getPosition();
    while (reader.next(key, val)) {
        String syncSeen = reader.syncSeen() ? "*" : "";
        System.out.printf("[%s%s]\t%s\t%s\n", position, syncSeen, key, val);
        position = reader.getPosition();
    }
    IOUtils.closeStream(reader);
}
@Test
public void read_2() throws IOException {
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(conf);
    Path path = new Path("hdfs://namenode:8020/user/hadoop/data/mySequenceFile.seq");
    // File to read
    Option file = SequenceFile.Reader.file(path);
    // Read buffer size
    Option bufferSize = SequenceFile.Reader.bufferSize(1024);
    // Number of bytes to read
    Option length = SequenceFile.Reader.length(1000);
    // Byte offset to start reading from
    Option start = SequenceFile.Reader.start(20);
    // Input stream for the SequenceFile (use either the file option or the stream option, not both)
    Option stream = SequenceFile.Reader.stream(fs.open(path));
    // Open the reader; any number of options may be passed
    Reader reader = new SequenceFile.Reader(conf, file, bufferSize);
    // Instantiate the key/value types recorded in the file header
    Writable key = (Writable) ReflectionUtils.newInstance(reader.getKeyClass(), conf);
    Writable value = (Writable) ReflectionUtils.newInstance(reader.getValueClass(), conf);
    // Position of the next record
    long position = reader.getPosition();
    while (reader.next(key, value)) {
        // Was a sync point passed while fetching this record?
        String syncSeen = reader.syncSeen() ? "*" : "";
        System.out.printf("[%s%s]\t%s\t%s\n", position, syncSeen, key, value);
        position = reader.getPosition();  // beginning of next record
    }
    IOUtils.closeStream(reader);
}

Position

Report the byte position of each record as it is read.
@Test
public void readSeqPosition() throws IOException {
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(conf);
    Path path = new Path("hdfs://namenode:8020/user/hadoop/data/mySequenceFile1.seq");
    Reader reader = new SequenceFile.Reader(fs, path, conf);
    IntWritable key = new IntWritable();
    Text val = new Text();
    // Return the current byte position in the input file
    long position = reader.getPosition();
    System.out.println(position);
    while (reader.next(key, val)) {
        System.out.println(key + ":" + val);
        // beginning of next record
        System.out.println(reader.getPosition());
    }
    reader.close();
}

Seek

Jump to the given byte offset in the input file and read the next record from there.
@Test
public void seekSeq() throws IOException {
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(conf);
    Path path = new Path("hdfs://namenode:8020/user/hadoop/data/mySequenceFile.seq");
    Reader reader = new SequenceFile.Reader(fs, path, conf);
    IntWritable key = new IntWritable();
    Text val = new Text();
    reader.seek(278);
    reader.next(key, val);
    System.out.println(key + ":" + val);
    reader.close();
}

Writing sync points while writing a file

While writing a sequence file, call writer.sync() to insert a sync point.
@Test
public void writeWithSync() throws IOException {
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(conf);
    Path path = new Path("hdfs://namenode:8020/user/hadoop/data/mySequenceFile2.seq");
    Writer writer = SequenceFile.createWriter(fs, conf, path, IntWritable.class, Text.class);
    IntWritable key = new IntWritable();
    Text val = new Text();
    int count = 5;
    for (int i = 0; i < 100; i++) {
        key.set(i);
        val.set("jerry" + i);
        writer.append(key, val);
        if (i % count == 0) {
            writer.sync();
        }
    }
    writer.close();
    System.out.println("over!");
}

Read the file back and print the sync point information at the same time.
@Test
public void read_1() throws IOException {
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(conf);
    Path path = new Path("hdfs://namenode:8020/user/hadoop/data/mySequenceFile2.seq");
    Reader reader = new SequenceFile.Reader(fs, path, conf);
    IntWritable key = new IntWritable();
    Text val = new Text();
    // reader.seek(200);
    long position = reader.getPosition();
    while (reader.next(key, val)) {
        if (reader.syncSeen()) {
            System.out.println("----sync----");  // a sync point was passed before this record
        }
        System.out.printf("[%s]\t%s\t%s\n", position, key, val);
        position = reader.getPosition();
    }
    IOUtils.closeStream(reader);
}

Reading a file via sync points

Position the reader at a sync point in the file and read from there.
@Test
public void sync() throws IOException {
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(conf);
    Path path = new Path("hdfs://namenode:8020/user/hadoop/data/mySequenceFile2.seq");
    Reader reader = new SequenceFile.Reader(fs, path, conf);
    int syncPos = 200;
    // Seek to the next sync mark past the given position
    reader.sync(syncPos);
    System.out.println(reader.getPosition());
    // Reading continues from the first sync point at or after the given position
    IntWritable key = new IntWritable();
    Text val = new Text();
    reader.next(key, val);
    System.out.println(key + ":" + val);
    reader.close();
}

Writing a compressed file (no compression)

If no compression type is specified, the default compression type is NONE and the default algorithm is deflate.
@Test
public void writeWithoutCompress() throws IOException {
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(conf);
    Path path = new Path("hdfs://namenode:8020/user/hadoop/data/mySequenceFile3.seq");
    Writer writer = SequenceFile.createWriter(conf, fs.create(path), IntWritable.class, Text.class, CompressionType.NONE, null);
    IntWritable key = new IntWritable();
    Text val = new Text();
    int count = 5;
    for (int i = 0; i < 100; i++) {
        key.set(i);
        val.set("tom" + i);
        writer.append(key, val);
        if (i % count == 0) {
            writer.sync();
        }
    }
    writer.close();
    System.out.println("over!");
}

Writing a compressed file (BZip2)

Specify the compression type and the codec; the codec can be changed as needed.
@Test
public void writeInBZip2Compress() throws IOException {
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(conf);
    Path path = new Path("hdfs://namenode:8020/user/hadoop/data/mySequenceFile4.seq");
    CompressionCodec codec = ReflectionUtils.newInstance(BZip2Codec.class, conf);
    Writer writer = SequenceFile.createWriter(conf, fs.create(path), IntWritable.class, Text.class, CompressionType.BLOCK, codec);
    IntWritable key = new IntWritable();
    Text val = new Text();
    int count = 5;
    for (int i = 0; i < 100; i++) {
        key.set(i);
        val.set("tom" + i);
        writer.append(key, val);
        if (i % count == 0) {
            writer.sync();
        }
    }
    writer.close();
    System.out.println("over!");
}

Reading a compressed file (BZip2)

Just read the file as usual.
@Test
public void readInBZip2Compress() throws IOException {
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(conf);
    Path path = new Path("hdfs://namenode:8020/user/hadoop/data/mySequenceFile4.seq");
    Reader reader = new SequenceFile.Reader(fs, path, conf);
    IntWritable key = new IntWritable();
    Text val = new Text();
    while (reader.next(key, val)) {
        System.out.println(key + ":" + val);
    }
    reader.close();
}

MapFile

A MapFile is a sorted SequenceFile with an index. It contains an index file and a data file (both SequenceFiles) and requires keys to be appended in order, which makes lookups easy.
- index file: a mapping from keys to offsets in the data file. The index interval can be set with io.map.index.interval (default 128), which enables binary search (see the sketch below).
- data file: the key-value records themselves. You can look up the key closest to a given key and choose whether to search before or after it.

A MapFile is a variant of SequenceFile: take a SequenceFile, sort it and add an index, and you have a MapFile. The index is stored as a separate file, typically with one entry for every 128 records, and can be loaded into memory for fast lookups. The data file stores the records in key order, so records must be appended to a MapFile in order, otherwise an IOException is thrown.

MapFile has two derived types:
- SetFile: a special MapFile that stores a sequence of Writable keys, appended in order.
- ArrayFile: the key is an integer giving the position in the array and the value is a Writable.
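The io.map.index.interval property mentioned above can also be set on the Configuration before the writer is created; the writer takes its default index interval from this property. A minimal sketch, assuming a hypothetical myMapFileInterval.map path:

@Test
public void writeWithConfiguredInterval() throws IOException {
    Configuration conf = new Configuration();
    // One index entry for every 16 records instead of the default 128
    conf.setInt("io.map.index.interval", 16);
    Path path = new Path("hdfs://namenode:8020/user/hadoop/data/myMapFileInterval.map");
    MapFile.Writer writer = null;
    try {
        writer = new MapFile.Writer(conf, path,
                MapFile.Writer.keyClass(IntWritable.class),
                MapFile.Writer.valueClass(Text.class));
        for (int i = 0; i < 100; i++) {
            writer.append(new IntWritable(i), new Text("val" + i));
        }
    } finally {
        IOUtils.closeStream(writer);
    }
}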
Writing a MapFile

Writing a MapFile produces two files: an index SequenceFile and a data SequenceFile. The index file records key ranges so that entries can be found and located quickly.

@Test
public void write_1() throws IOException {
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(conf);
    MapFile.Writer writer = new Writer(conf, fs, "hdfs://namenode:8020/user/hadoop/data/myMapFile.map", IntWritable.class, Text.class);
    writer.append(new IntWritable(1), new Text("tony1"));
    writer.append(new IntWritable(2), new Text("tony2"));
    writer.append(new IntWritable(3), new Text("tony3"));
    writer.close();
    System.out.println("over!");
}

@Test
public void write_2() throws IOException {
    Configuration conf = new Configuration();
    Path path = new Path("hdfs://namenode:8020/user/hadoop/data/myMapFile1.map");
    IntWritable key = new IntWritable();
    Text value = new Text();
    // Compression type and codec in a single option (no separate compression-type call needed)
    CompressionCodec codec = ReflectionUtils.newInstance(BZip2Codec.class, conf);
    Option compression2 = MapFile.Writer.compression(CompressionType.RECORD, codec);
    Option compression = MapFile.Writer.compression(CompressionType.NONE);
    Option keyClass = MapFile.Writer.keyClass(key.getClass());
    Option valueClass = MapFile.Writer.valueClass(value.getClass());
    MapFile.Writer writer = null;
    try {
        writer = new Writer(conf, path, keyClass, valueClass, compression);
        int indexInterval = writer.getIndexInterval();
        System.out.println("---default index interval----" + indexInterval);
        // Change the index interval
        writer.setIndexInterval(5);
        for (int i = 0; i < 100; i++) {
            key.set(i);
            value.set("tom" + i);
            writer.append(key, value);
        }
    } catch (Exception e) {
        e.printStackTrace();
    } finally {
        IOUtils.closeStream(writer);
    }
    System.out.println("over!");
}

Writing a MapFile (with an index interval)
@Test
public void writeIndexInterval() throws IOException {
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(conf);
    MapFile.Writer writer = new Writer(conf, fs, "hdfs://namenode:8020/user/hadoop/data/myMapFile2.map", IntWritable.class, Text.class);
    int indexInterval = writer.getIndexInterval();
    System.out.println(indexInterval);
    // Interval between entries in the map file's index
    writer.setIndexInterval(10);
    IntWritable key = new IntWritable();
    Text val = new Text();
    for (int i = 0; i < 100; i = i + 3) {
        key.set(i);
        val.set("tom" + i);
        writer.append(key, val);
    }
    writer.close();
    System.out.println("over!");
}

Reading a MapFile
@Test
public void read_1() throws IOException {
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(conf);
    MapFile.Reader reader = new Reader(fs, "hdfs://namenode:8020/user/hadoop/data/myMapFile1.map", conf);
    IntWritable key = new IntWritable();
    Text val = new Text();
    while (reader.next(key, val)) {
        System.out.println(key + ":" + val);
    }
    // Fetch the value for a specific key
    val = (Text) reader.get(new IntWritable(6), val);
    System.out.println(val);
    reader.close();
}

@Test
public void read_2() throws IOException {
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(conf);
    Path path = new Path("hdfs://namenode:8020/user/hadoop/data/myMapFile1.map");
    // Read buffer size
    Option bufferSize = SequenceFile.Reader.bufferSize(1024);
    MapFile.Reader reader = new Reader(path, conf);
    IntWritable key = new IntWritable();
    Text val = new Text();
    while (reader.next(key, val)) {
        System.out.println(key + ":" + val);
    }
    // Fetch the value for a specific key
    val = (Text) reader.get(new IntWritable(6), val);
    System.out.println(val);
    reader.close();
}

Seek

Jump to the position corresponding to a key.
@Test
public void seek() throws IOException {
    Configuration conf = new Configuration();
    Path path = new Path("hdfs://namenode:8020/user/hadoop/data/myMapFile1.map");
    MapFile.Reader reader = new Reader(path, conf);
    IntWritable key = new IntWritable();
    Text val = new Text();
    // Look up the position of the key; returns true if found.
    // If not found, the reader is positioned at the first entry after that key and false is returned.
    boolean seek = reader.seek(new IntWritable(67));
    System.out.println(seek);
    // Read the entry at (or just after) that key
    reader.next(key, val);
    System.out.println(key + ":" + val);
    reader.close();
}

Finding the value closest to a key
@Test
public void getClosest() throws IOException {
    Configuration conf = new Configuration();
    Path path = new Path("hdfs://namenode:8020/user/hadoop/data/myMapFile1.map");
    MapFile.Reader reader = new Reader(path, conf);
    IntWritable key = new IntWritable();
    Text val = new Text();
    key.set(18);  // the key to look up
    // If the key exists, it is returned and val is filled with its data.
    // If it does not exist, the third argument decides the direction (false if omitted):
    //   true  - return the closest key before the given key and fill val with its data
    //   false - return the closest key after the given key and fill val with its data
    WritableComparable closest = reader.getClosest(key, val, false);
    System.out.println(closest + ":" + val);
    reader.close();
}

Repairing a seq file

1. Prepare a seq file
@Test
public void prepareSeqFile() throws IOException {
    Configuration conf = new Configuration();
    Path path = new Path("hdfs://namenode:8020/user/hadoop/data/seqFile.seq");
    IntWritable key = new IntWritable();
    Text value = new Text();
    SequenceFile.Writer writer = null;
    try {
        Option keyClass = SequenceFile.Writer.keyClass(key.getClass());
        Option valueClass = SequenceFile.Writer.valueClass(value.getClass());
        Option file = SequenceFile.Writer.file(path);
        writer = SequenceFile.createWriter(conf, file, keyClass, valueClass);
        for (int i = 0; i < 100; i++) {
            key.set(i);
            value.set("lucy" + i);
            System.out.printf("[%s]\t%s\t%s\n", writer.getLength(), key, value);
            writer.append(key, value);
        }
    } catch (Exception e) {
        e.printStackTrace();
    } finally {
        IOUtils.closeStream(writer);
    }
}

@Test
public void prepareSeqFile1() throws IOException {
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(conf);
    SequenceFile.Writer writer = SequenceFile.createWriter(fs, conf,
            new Path("hdfs://namenode:8020/user/hadoop/data1/seqFile1.seq"), IntWritable.class, Text.class);
    for (int i = 0; i < 100; i++) {
        writer.append(new IntWritable(i), new Text("nini" + i));
    }
    writer.close();
    System.out.println("over!");
}

2. Note: manually rename the seq file to data:

$> hdfs dfs -mv /user/hadoop/data1/seqFile.seq /user/hadoop/data1/data

Repair the sequence file, rebuilding the index to produce a MapFile:

@Test
public void fixSeqFile() throws IllegalArgumentException, Exception {
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(conf);
    Path path = new Path("hdfs://namenode:8020/user/hadoop/data1");
    MapFile.fix(fs, path, IntWritable.class, Text.class, false, conf);
    System.out.println("over!");
}
ArrayFile

The file structure is key-value, where the key is the array index and the value is the stored value.

Writing an ArrayFile

import org.apache.hadoop.io.ArrayFile.Writer;

@Test
public void write() throws IOException {
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(conf);
    String file = "hdfs://namenode:8020/user/hadoop/data/myArrayFile.array";
    Writer writer = new Writer(conf, fs, file, Text.class);
    writer.append(new Text("tony1"));
    writer.append(new Text("tony2"));
    writer.append(new Text("tony3"));
    writer.close();
    System.out.println("over!");
}

Reading an ArrayFile
@Test
public void read() throws IOException {
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(conf);
    String file = "hdfs://namenode:8020/user/hadoop/data/myArrayFile.array";
    Reader reader = new Reader(fs, file, conf);
    Text val = new Text();
    while (reader.next(val) != null) {
        System.out.println(val);
    }
    reader.close();
}

SetFile

The file structure is: key: WritableComparable, value: null.

Writing a SetFile
@Test
public void write() throws IOException {
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(conf);
    String dirName = "hdfs://namenode:8020/user/hadoop/data/mySetFile.set";
    Writer writer = new Writer(conf, fs, dirName, IntWritable.class, CompressionType.NONE);
    writer.append(new IntWritable(1));
    writer.append(new IntWritable(2));
    writer.append(new IntWritable(3));
    writer.close();
    System.out.println("over!");
}

Reading a SetFile
@Test
public void read() throws IOException {
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(conf);
    String dirName = "hdfs://namenode:8020/user/hadoop/data/mySetFile.set";
    Reader reader = new Reader(fs, dirName, conf);
    IntWritable key = new IntWritable();
    NullWritable val = NullWritable.get();
    while (reader.next(key, val)) {
        System.out.println(key + ":" + val);
    }
    reader.close();
}

Serialization and deserialization

Serialization converts an object into a byte[]; deserialization is the reverse. In plain Java this is done with java.io.Serializable, ObjectOutputStream/ObjectInputStream and ByteArrayOutputStream/ByteArrayInputStream. It is commonly used for inter-process communication and remote procedure calls.
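For contrast with the Writable mechanism described next, here is a minimal sketch of the plain-Java route just listed, serializing a String through ObjectOutputStream; the stream carries extra header and class metadata, which is part of why Hadoop defines its own compact Writable format:

@Test
public void javaSerialize() throws Exception {
    // serialize with plain Java serialization
    ByteArrayOutputStream baos = new ByteArrayOutputStream();
    ObjectOutputStream oos = new ObjectOutputStream(baos);
    oos.writeObject("tom");
    oos.close();
    System.out.println(baos.toByteArray().length);  // includes stream header and class metadata
    // deserialize
    ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(baos.toByteArray()));
    String s = (String) ois.readObject();
    System.out.println(s);
}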
oopWritable类层次结构如下图://序列化接口publicinterfaceWritable{//序列化v
oidwrite(DataOutputout)throwsIOException;//反序列化voidreadFi
elds(DataInputin)throwsIOException;hadoop与java中操作相反,从对象出发,给对象传
流。而java是把对象传递给流。Hadoop序列化,反序列化@Testpublicvoidserialize2()throw
@Test
public void serialize2() throws IOException {
    ByteArrayOutputStream baos = new ByteArrayOutputStream();
    DataOutputStream dos = new DataOutputStream(baos);
    IntWritable i = new IntWritable(258);
    // serialize
    i.write(dos);
    dos.close();
    baos.close();
    byte[] buf = baos.toByteArray();
    System.out.println(buf.length);
    // deserialize
    IntWritable i2 = new IntWritable();
    i2.readFields(new DataInputStream(new ByteArrayInputStream(baos.toByteArray())));
    System.out.println(i2.get());
}

IntWritable compare

Compares two int values: -1 if less than, 0 if equal, 1 if greater than.
@Test
public void compare() throws IOException {
    IntWritable i1 = new IntWritable(1);
    IntWritable i2 = new IntWritable(2);
    System.out.println(i1.compareTo(i2));
}

IntWritable comparator

Compares two int values through a comparator: -1 if less than, 0 if equal, 1 if greater than.

@Test
public void comparator() throws IOException {
    IntWritable.Comparator c = new IntWritable.Comparator();
    IntWritable i1 = new IntWritable(2);
    IntWritable i2 = new IntWritable(1);
    System.out.println(c.compare(i1, i2));
}

BooleanWritable compare

Compares two boolean values as ((a == b) ? 0 : (a == false) ? -1 : 1).
@Test
public void booleanWritable() throws IOException {
    BooleanWritable b1 = new BooleanWritable(true);
    BooleanWritable b2 = new BooleanWritable(false);
    System.out.println(b1.compareTo(b2));
}

@Test
public void booleanWritable1() throws IOException {
    BooleanWritable.Comparator comparator = new BooleanWritable.Comparator();
    BooleanWritable b1 = new BooleanWritable(true);
    BooleanWritable b2 = new BooleanWritable(false);
    System.out.println(comparator.compare(b1, b2));
}

Text

Text is roughly the Writable equivalent of java.lang.String. Example code:
"hadoopoo");//在字符串中查找指定下标位置的字符,从0开始charc=(char)txt.charAt(2);
System.out.println(c);//从指定的位置开始查找对应的字符串,返回第一个查找到的位置下标//查找不到返回-1,
查找的开始位置超过字符长度报异常intpos=txt.find("oo",1);System.out.println(pos
);}MapWritable@TestpublicvoidmapSerialize()throwsIOException
@Test
public void mapSerialize() throws IOException {
    MapWritable map = new MapWritable();
    map.put(new IntWritable(100), new Text("tom"));
    map.put(new IntWritable(200), new Text("tomas"));
    map.put(new IntWritable(300), new Text("tomlee"));
    // serialize
    ByteArrayOutputStream baos = new ByteArrayOutputStream();
    DataOutputStream dataout = new DataOutputStream(baos);
    map.write(dataout);
    dataout.close();
    baos.close();
    // deserialize in hadoop
    MapWritable map2 = new MapWritable();
    map2.readFields(new DataInputStream(new ByteArrayInputStream(baos.toByteArray())));
    System.out.println(map2.values());
}

ObjectWritable
@Test
public void ObjectSerialize() throws IOException {
    String name = "tom";
    String[] names = {"abc", "ddd", "dddfg"};
    ObjectWritable obj = new ObjectWritable(names);
    // serialize
    ByteArrayOutputStream baos = new ByteArrayOutputStream();
    DataOutputStream dataout = new DataOutputStream(baos);
    obj.write(dataout);
    dataout.close();
    // deserialize in hadoop
    ObjectWritable obj2 = new ObjectWritable();
    obj2.readFields(new DataInputStream(new ByteArrayInputStream(baos.toByteArray())));
    System.out.println(obj2.get());
}

Object serialization and deserialization example

1. Define the Person class
package com.bm.serialize;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.BooleanWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;

/* Copyright (C) 2018 编码界的小菜鸟  Author: 王守奎  2018-10-18 14:48 */
public class Person implements Writable {
    private Text name;
    private IntWritable age;
    private BooleanWritable male;

    // serialize
    public void write(DataOutput out) throws IOException {
        name.write(out);
        age.write(out);
        male.write(out);
    }

    // deserialize
    public void readFields(DataInput in) throws IOException {
        name = new Text();
        age = new IntWritable();
        male = new BooleanWritable();
        name.readFields(in);
        age.readFields(in);
        male.readFields(in);
    }

    public Text getName() { return name; }
    public void setName(Text name) { this.name = name; }
    public IntWritable getAge() { return age; }
    public void setAge(IntWritable age) { this.age = age; }
    public BooleanWritable getMale() { return male; }
    public void setMale(BooleanWritable male) { this.male = male; }
}

2. Define the test class

package com.bm.serialize;

import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import org.apache.commons.io.output.ByteArrayOutputStream;
import org.apache.hadoop.io.BooleanWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.junit.Test;

/* Copyright (C) 2018 编码界的小菜鸟  Author: 王守奎  2018-10-18 14:49 */
public class TestPerson {
    @Test
    public void testSeria() throws IOException {
        // new Person
        Person p = new Person();
        p.setName(new Text("tomas"));
        p.setAge(new IntWritable(12));
        p.setMale(new BooleanWritable(false));
        // serialize
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        DataOutputStream dataout = new DataOutputStream(baos);
        p.write(dataout);
        dataout.close();
        // deserialize in hadoop
        Person newPerson = new Person();
        newPerson.readFields(new DataInputStream(new ByteArrayInputStream(baos.toByteArray())));
        System.out.println(newPerson.getName());
        System.out.println(newPerson.getAge().get());
    }
}