版本修改人修改记录修改时间V1.0王守奎编写2018/2/8目录Hadoop文件类型及序列化1背景4文件类型4SequenceFile4写S equenceFile5读SequenceFile8Position9Seek10写文件时写入同步点11通过同步点读文件12写压缩 文件(不压缩)13写压缩文件(BZip2)14读压缩文件(BZip2)15MapFile15写Map文件16写Map文件(带索引间 隔)18读Map文件18Seek19查找key最近的值20修复seq文件21ArrayFile22写ArrayFile22读Arr ayFile23SetFile24写SetFile24读SetFile24序列化、反序列化25hadoop自己的序列化格式Writ able25Hadoop序列化,反序列化27IntWritableCompare27IntWritablecomparator 28BooleanWritablecompare28Text29MapWritable29ObjectWritable30对象序 列、反序列实例311、定义Person对象312、定义测试类32背景Hadoop在业务开发过程中会使用到自定义的对象类,本文重在 介绍对象类的数据结构及常用的方法体。文件类型SequenceFile日志文件的每一行代表一条日志记录,如果想记录二进制类型,纯文本 并不是一个合适的格式,SequenceFile类适合这个情况。SequenceFile为二进制键/值对提供一个持久化的数据结构,序 列文件的好处是可以切割。SequenceFile包括:header和records根据是否压缩,以及采用记录压缩还是块压缩,存 储格式有所不同:不压缩:按照记录长度、Key长度、Value程度、Key值、Value值依次存储。长度是指字节数。采用指定的Se rialization进行序列化。Record压缩:只有value被压缩,压缩的codec保存在Header中。Block压缩: 多条记录被压缩在一起,可以利用记录之间的相似性,更节省空间。Block前后都加入了同步标识。Block的最小值由io.seqfi le.compress.blocksize属性设置。记录格式为:SEQ+version+keyclass+valueclas s//用hdfsdfs-cat看写SequenceFile注:在hadoop2.x之后,hadoop中的SequenceF ile.Writer将会逐渐摒弃大量的createWriter()重载方法,而整合为更为简洁的createWriter()方法,除 了配置参数外,其他的参数统统使用SequenceFile.Writer.Option来替代,具体有:新的API里提供的option 参数:FileOptionFileSystemOptionStreamOptionBufferSizeOptionBlockSiz eOptionReplicationOptionKeyClassOptionValueClassOptionMetadataOpt ionProgressableOptionCompressionOption这些参数能够满足各种不同的需要,参数之间不存在顺序关系 ,这样减少了代码编写工作量,更为直观SequenceFile文件的新旧两种写入方法,没有指定压缩算法默认为:deflate@Tes tpublicvoidwrite_1()throwsIOException{Configurationconf=n ewConfiguration();FileSystemfs=FileSystem.get(conf);Pathpath =newPath("hdfs://namenode:8020/user/hadoop/data/mySequenceFile .seq");Writerwriter=null;try{该方法已经被新的方法所替代writer=SequenceFil e.createWriter(fs,conf,path,IntWritable.class,Text.class);Int Writablekey=newIntWritable();Textval=newText();for(inti =0;i<100;i++){key.set(i);val.set("tom"+i);writer.append( key,val);}}catch(Exceptione){e.printStackTrace();IOUtils.clo seStream(writer);}System.out.println("over!");}@Testpublicvoidw rite_2()throwsException{SequenceFile.Writerwriter=null;try{C onfigurationconf=newConfiguration();conf.set("dfs.support.app end","true");conf.set("dfs.client.block.write.replace-datanode-o n-failure.policy","NEVER");conf.set("dfs.client.block.write.repl ace-datanode-on-failure.enable","true");FileSystemfs=FileSyst em.get(conf);IntWritablekey=newIntWritable();Textvalue=new Text();Pathpath=newPath("hdfs://namenode:8020/user/hadoop/da ta/mySequenceFile1.seq");OptionkeyClass=SequenceFile.Writer.ke yClass(key.getClass());OptionvalueClass=SequenceFile.Writer.va lueClass(value.getClass());Optionfile=SequenceFile.Writer.file (path);//指定SequenceFile的输出流(指定了fileoption就不能指定streamoption,二 选一)Optionstream=SequenceFile.Writer.stream(fs.create(path));// 设置缓存大小OptionbufferSize=SequenceFile.Writer.bufferSize(1024);// 文件存在是否追加内容,默认为覆盖原有内容OptionappendIfExists=SequenceFile.Writer.a ppendIfExists(true);//指定文件的压缩类型和压缩算法(不再需要单独指定压缩类型)CompressionCode ccodec=ReflectionUtils.newInstance(BZip2Codec.class,conf);Opt ioncompression2=SequenceFile.Writer.compression(CompressionTyp e.RECORD,codec);//指定文件的压缩类型/Donotcompressrecords.///NON E,/Compressvaluesonly,eachseparately.///RECORD,/C ompresssequencesofrecordstogetherinblocks.///BLOCKOption compression=SequenceFile.Writer.compression(CompressionType.NO NE);writer=SequenceFile.createWriter(conf,file,keyClass,append IfExists,valueClass,bufferSize,compression2);for(inti=0;i< 100;i++){key.set(i);value.set("lucy"+i);System.out.printf("[% s]\t%s\t%s\n",writer.getLength(),key,value);writer.append(key, value);}}catch(Exceptione){e.printStackTrace();}finally{IOUti ls.closeStream(writer);}System.out.println("over!");}读SequenceFil eFileOption-表示读哪个文件InputStreamOptionStartOptionLengthOption-按照设 置的长度变量来决定读取的字节BufferSizeOptionSequenceFile文件的新旧两种读方法,新的方法为建议@Test publicvoidread_1()throwsIOException{Configurationconf=new Configuration();FileSystemfs=FileSystem.get(conf);Pathpath= newPath("hdfs://namenode:8020/user/hadoop/data/mySequenceFile.s eq");Readerreader=newSequenceFile.Reader(fs,path,conf);IntW ritablekey=newIntWritable();Textval=newText();longpositi on=reader.getPosition();while(reader.next(key,val)){Strings yncSeen=reader.syncSeen()?"":"";System.out.printf("[%s%s]\ t%s\t%s\n",position,syncSeen,key,val);position=reader.getPo sition();}IOUtils.closeStream(reader);}@Testpublicvoidread_2() throwsIOException{Configurationconf=newConfiguration();File Systemfs=FileSystem.get(conf);Pathpath=newPath("hdfs://nam enode:8020/user/hadoop/data/mySequenceFile.seq");//设置读取的文件Option file=SequenceFile.Reader.file(path);//读取文件的缓存区大小OptionbufferSi ze=SequenceFile.Reader.bufferSize(1024);//按照设置的长度变量来决定读取的字节Opti onlength=SequenceFile.Reader.length(1000);//开始读取的字节数Optionsta rt=SequenceFile.Reader.start(20);//指定SequenceFile的输入流(指定了file option就不能指定streamoption,二选一)Optionstream=SequenceFile.Reade r.stream(fs.open(path));//开始读取数据,option可以任意多个Readerreader=new SequenceFile.Reader(conf,file,bufferSize);//自动获取键值对的数据类型Writable key=(Writable)ReflectionUtils.newInstance(reader.getKeyClass() ,conf);Writablevalue=(Writable)ReflectionUtils.newInstance(r eader.getValueClass(),conf);//下一条记录的位置longposition=reader.get Position();while(reader.next(key,value)){//调用reader.next获取下一条记 录时,是否通过一个同步点StringsyncSeen=reader.syncSeen()?"":"";//格式化输 出System.out.printf("[%s%s]\t%s\t%s\n",position,syncSeen,key,v alue);position=reader.getPosition();//beginningofnextreco rd}IOUtils.closeStream(reader);}Position读取记录的字节位置信息@Testpublicvo idreadSeqPosition()throwsIOException{Configurationconf=new Configuration();FileSystemfs=FileSystem.get(conf);Pathpath= newPath("hdfs://namenode:8020/user/hadoop/data/mySequenceFile1. seq");Readerreader=newSequenceFile.Reader(fs,path,conf);Int Writablekey=newIntWritable();Textval=newText();//Returnt hecurrentbytepositionintheinputfile.longposition=reade r.getPosition();System.out.println(position);while(reader.next(k ey,val)){System.out.println(key+":"+val);//beginningofne xtrecordSystem.out.println(reader.getPosition());}reader.close() ;}Seek从输入文件跳过指定的字节数,开始读取下一条记录。@TestpublicvoidseekSeq()throwsI OException{Configurationconf=newConfiguration();FileSystemf s=FileSystem.get(conf);Pathpath=newPath("hdfs://namenode:80 20/user/hadoop/data/mySequenceFile.seq");Readerreader=newSequ enceFile.Reader(fs,path,conf);IntWritablekey=newIntWritable ();Textval=newText();reader.seek(278);reader.next(key,val);S ystem.out.println(key+":"+val);reader.close();}写文件时写入同步点在写sequenc e文件时通过writer.sync();插入一个同步点@TestpublicvoidwriteWithSync()throw sIOException{Configurationconf=newConfiguration();FileSyste mfs=FileSystem.get(conf);Pathpath=newPath("hdfs://namenode :8020/user/hadoop/data/mySequenceFile2.seq");Writerwriter=Sequ enceFile.createWriter(fs,conf,path,IntWritable.class,Text.cla ss);IntWritablekey=newIntWritable();Textval=newText();int count=5;for(inti=0;i<100;i++){key.set(i);val.set("jer ry"+i);writer.append(key,val);if(i%count==0){writer.sync();} }writer.close();System.out.println("over!");}读取文件,同时打印同步点信息@Testp ublicvoidread_1()throwsIOException{Configurationconf=new Configuration();FileSystemfs=FileSystem.get(conf);Pathpath= newPath("hdfs://namenode:8020/user/hadoop/data/mySequenceFile2.s eq");Readerreader=newSequenceFile.Reader(fs,path,conf);IntW ritablekey=newIntWritable();Textval=newText();//reader.se ek(200);longposition=reader.getPosition();while(reader.next(k ey,val)){if(reader.syncSeen()){System.out.println(""); }System.out.printf("[%s]\t%s\t%s\n",position,key,val);position =reader.getPosition();}IOUtils.closeStream(reader);}通过同步点读文件读取文 件同步点@Testpublicvoidsync()throwsIOException{Configurationcon f=newConfiguration();FileSystemfs=FileSystem.get(conf);Path path=newPath("hdfs://namenode:8020/user/hadoop/data/mySequenc eFile2.seq");Readerreader=newSequenceFile.Reader(fs,path,co nf);intsyncPos=200;//Seektothenextsyncmarkpastagiven position.reader.sync(syncPos);System.out.println(reader.getPositi on());//从指定的position位置对应同步点开始,后续紧挨的一个同步点开始读取数据IntWritablekey=n ewIntWritable();Textval=newText();reader.next(key,val);Syst em.out.println(key+":"+val);reader.close();}写压缩文件(不压缩)不指定压缩类型默认压缩 类型:NONE,压缩算法:deflate@TestpublicvoidwriteWithoutCompress()thro wsIOException{Configurationconf=newConfiguration();FileSyst emfs=FileSystem.get(conf);Pathpath=newPath("hdfs://namenod e:8020/user/hadoop/data/mySequenceFile3.seq");Writerwriter=Seq uenceFile.createWriter(conf,fs.create(path),IntWritable.class, Text.class,CompressionType.NONE,null);IntWritablekey=newIntWr itable();Textval=newText();intcount=5;for(inti=0;i< 100;i++){key.set(i);val.set("tom"+i);writer.append(key,val); if(i%count==0){writer.sync();}}writer.close();System.out.printl n("over!");}写压缩文件(BZip2)指定压缩类型和压缩算法,可以根据需要变更压缩算法@Testpublicvoid writeInBZip2Compress()throwsIOException{Configurationconf=n ewConfiguration();FileSystemfs=FileSystem.get(conf);Pathpath =newPath("hdfs://namenode:8020/user/hadoop/data/mySequenceFile 4.seq");CompressionCodeccodec=ReflectionUtils.newInstance(BZip 2Codec.class,conf);Writerwriter=SequenceFile.createWriter(con f,fs.create(path),IntWritable.class,Text.class,CompressionType .BLOCK,codec);IntWritablekey=newIntWritable();Textval=new Text();intcount=5;for(inti=0;i<100;i++){key.set(i);va l.set("tom"+i);writer.append(key,val);if(i%count==0){writer. sync();}}writer.close();System.out.println("over!");}读压缩文件(BZip2) 正常的读取文件即可@TestpublicvoidreadInBZip2Compress()throwsIOExceptio n{Configurationconf=newConfiguration();FileSystemfs=FileS ystem.get(conf);Pathpath=newPath("hdfs://namenode:8020/user/h adoop/data/mySequenceFile4.seq");Readerreader=newSequenceFile .Reader(fs,path,conf);IntWritablekey=newIntWritable();Text val=newText();while(reader.next(key,val)){System.out.printl n(key+":"+val);}reader.close();}MapFilemapfile是排序的SequnceFile ,具有索引。包含index(seq)和data(seq)文件要求key按照顺序添加,方便查找index文件:包含key和偏移量值的 映射,可以通过io.map.index.interval设置间隔值,默认128,方便折半查找data文件:key-value可以寻 找与指定key最近的key,可以设置是否向前寻找MapFile是SequenceFile的变种,在SequenceFile中加入索 引并排序后就是MapFile。索引作为一个单独的文件存储,一般每个128个记录存储一个索引。索引可以被载入内存,用于快速查找。存放 数据的文件根据Key定义的顺序排列。MapFile的记录必须按照顺序写入,否则抛出IOException。MapFile的衍生类 型:SetFile:特殊的MapFile,用于存储一序列Writable类型的Key。Key按照顺序写入。ArrayFile:Ke y为整数,代表在数组中的位置,value为Writable类型。写Map文件MapFile写入的时候产生两个文件,一个index序 列索引文件,一个data数据序列文件index文件定义key的区间范围便于快速查找和定位@Testpublicvoidwrit e_1()throwsIOException{Configurationconf=newConfiguration( );FileSystemfs=FileSystem.get(conf);MapFile.Writerwriter=ne wWriter(conf,fs,"hdfs://namenode:8020/user/hadoop/data/myMapFi le.map",IntWritable.class,Text.class);writer.append(newIntWrit able(1),newText("tony1"));writer.append(newIntWritable(2),new Text("tony2"));writer.append(newIntWritable(3),newText("tony3 "));writer.close();System.out.println("over!");}@Testpublicvoid write_2()throwsIOException{Configurationconf=newConfigurat ion();Pathpath=newPath("hdfs://namenode:8020/user/hadoop/data /myMapFile1.map");IntWritablekey=newIntWritable();Textvalue =newText();//指定文件的压缩类型和压缩算法(不再需要单独指定压缩类型)CompressionCodeccodec =ReflectionUtils.newInstance(BZip2Codec.class,conf);Optioncom pression2=MapFile.Writer.compression(CompressionType.RECORD,co dec);Optioncompression=MapFile.Writer.compression(CompressionT ype.NONE);OptionkeyClass=MapFile.Writer.keyClass(key.getClass( ));OptionvalueClass=MapFile.Writer.valueClass(value.getClass() );MapFile.Writerwriter=null;try{writer=newWriter(conf,path,k eyClass,valueClass,compression);intindexInterval=writer.getInd exInterval();System.out.println("---默认索引间隔----"+indexInterval);// 设置索引的新间隔步长writer.setIndexInterval(5);for(inti=0;i<100;i++ ){key.set(i);value.set("tom"+i);writer.append(key,value);}}c atch(Exceptione){e.printStackTrace();}finally{IOUtils.closeStr eam(writer);}System.out.println("over!");}写Map文件(带索引间隔)@Testpubli cvoidwriteIndexInterval()throwsIOException{Configurationcon f=newConfiguration();FileSystemfs=FileSystem.get(conf);MapF ile.Writerwriter=newWriter(conf,fs,"hdfs://namenode:8020/us er/hadoop/data/myMapFile2.map",IntWritable.class,Text.class);in tindexInterval=writer.getIndexInterval();System.out.println(in dexInterval);//map文件对应的索引文件的间隔大小值writer.setIndexInterval(10);IntW ritablekey=newIntWritable();Textval=newText();for(inti =0;i<100;i=i+3){key.set(i);val.set("tom"+i);writer.ap pend(key,val);}writer.close();System.out.println("over!");}读Map文 件@Testpublicvoidread_1()throwsIOException{Configurationconf =newConfiguration();FileSystemfs=FileSystem.get(conf);MapFi le.Readerreader=newReader(fs,"hdfs://namenode:8020/user/hado op/data/myMapFile1.map",conf);IntWritablekey=newIntWritable( );Textval=newText();while(reader.next(key,val)){System.out .println(key+":"+val);}//获得指定KEY值对应的Valueval=(Text)reader. get(newIntWritable(6),val);System.out.println(val);reader.close ();}@Testpublicvoidread_2()throwsIOException{Configurationc onf=newConfiguration();FileSystemfs=FileSystem.get(conf);Pa thpath=newPath("hdfs://namenode:8020/user/hadoop/data/myMapFile 1.map");//读取文件的缓存区大小OptionbufferSize=SequenceFile.Reader.buffe rSize(1024);MapFile.Readerreader=newReader(path,conf);IntWrit ablekey=newIntWritable();Textval=newText();while(reader. next(key,val)){System.out.println(key+":"+val);}//获得指定KEY值对 应的Valueval=(Text)reader.get(newIntWritable(6),val);System.ou t.println(val);reader.close();}Seek按照key值对应的位置进行跳转@Testpublicvoi dseek()throwsIOException{Configurationconf=newConfigurati on();Pathpath=newPath("hdfs://namenode:8020/user/hadoop/data/my MapFile1.map");MapFile.Readerreader=newReader(path,conf);IntW ritablekey=newIntWritable();Textval=newText();//查找key对应的位 置,找到返回true,找不到则查找当前key值后续的第一个实体的位置,返回false。booleanseek=reader. seek(newIntWritable(67));System.out.println(seek);//查找key值位置后的实体 reader.next(key,val);System.out.println(key+":"+val);reader. close();}查找key最近的值@TestpublicvoidgetClosest()throwsIOExceptio n{Configurationconf=newConfiguration();Pathpath=newPath("h dfs://namenode:8020/user/hadoop/data/myMapFile1.map");MapFile.Rea derreader=newReader(path,conf);IntWritablekey=newIntWrita ble();Textval=newText();key.set(18);//key查找的key值//如果存在对应的key 值,则返回key值对应的信息,并将data赋值给val。//如果不存在对应的key值,则观察第三个参数(不写默认为false)// 如果为true则查找key前面的最近的一个key返回,并将data赋值给val。//如果为false则查找key后面的最近的一 个key返回,并将data赋值给val。WritableComparableclosest=reader.getCloses t(key,val,false);System.out.println(closest+":"+val);reader .close();}修复seq文件1、准备seq文件@TestpublicvoidprepareSeqFile()throw sIOException{Configurationconf=newConfiguration();Pathpath =newPath("hdfs://namenode:8020/user/hadoop/data/seqFile.seq"); IntWritablekey=newIntWritable();Textvalue=newText();Seque nceFile.Writerwriter=null;try{OptionkeyClass=SequenceFile.Wr iter.keyClass(key.getClass());OptionvalueClass=SequenceFile.Wr iter.valueClass(value.getClass());Optionfile=SequenceFile.Writ er.file(path);writer=SequenceFile.createWriter(conf,file,keyCl ass,valueClass);for(inti=0;i<100;i++){key.set(i);value.s et("lucy"+i);System.out.printf("[%s]\t%s\t%s\n",writer.getLeng th(),key,value);writer.append(key,value);}}catch(Exceptione) {e.printStackTrace();}finally{IOUtils.closeStream(writer);}}@Tes tpublicvoidprepareSeqFile1()throwsIOException{Configuration conf=newConfiguration();FileSystemfs=FileSystem.get(conf);S equenceFile.Writerwriter=SequenceFile.createWriter(fs,conf,n ewPath("hdfs://namenode:8020/user/hadoop/data1/seqFile1.seq"),I ntWritable.class,Text.class);for(inti=0;i<100;i++){write r.append(newIntWritable(i),newText("nini"+i));}writer.close( );System.out.println("over!");}2、注意手动修改seq文件成data$>hdfsdfs-mv/ user/hadoop/data1/seqFile.seq/user/hadoop/data1/data修复sequence文件 ,重建索引,生成mapfile@TestpublicvoidfixSeqFile()throwsIllegalArgume ntException,Exception{Configurationconf=newConfiguration(); FileSystemfs=FileSystem.get(conf);Pathpath=newPath("hdfs:/ /namenode:8020/user/hadoop/data1");MapFile.fix(fs,path,IntWrita ble.class,Text.class,false,conf);System.out.println("over!");} ArrayFile文件结构是key-value格式key:indexvalue:value写ArrayFileimportorg .apache.hadoop.io.ArrayFile.Writer;@Testpublicvoidwrite()throw sIOException{Configurationconf=newConfiguration();FileSyste mfs=FileSystem.get(conf);Stringfile="hdfs://namenode:8020/use r/hadoop/data/myArrayFile.array";Writerwriter=newWriter(conf, fs,file,Text.class);writer.append(newText("tony1"));writer.app end(newText("tony2"));writer.append(newText("tony3"));writer.cl ose();System.out.println("over!");}读ArrayFile@Testpublicvoidrea d()throwsIOException{Configurationconf=newConfiguration(); FileSystemfs=FileSystem.get(conf);Stringfile="hdfs://namenode :8020/user/hadoop/data/myArrayFile.array";Readerreader=newRea der(fs,file,conf);Textval=newText();while(reader.next(val) !=null){System.out.println(val);}reader.close();}SetFile文件结构如下 :key:WritableComparablevalue:null写SetFile@Testpublicvoidwrite() throwsIOException{Configurationconf=newConfiguration();Fil eSystemfs=FileSystem.get(conf);StringdirName="hdfs://namenode :8020/user/hadoop/data/mySetFile.set";Writerwriter=newWriter( conf,fs,dirName,IntWritable.class,CompressionType.NONE);write r.append(newIntWritable(1));writer.append(newIntWritable(2));wr iter.append(newIntWritable(3));writer.close();System.out.println ("over!");}读SetFile@Testpublicvoidread()throwsIOException{Co nfigurationconf=newConfiguration();FileSystemfs=FileSystem .get(conf);StringdirName="hdfs://namenode:8020/user/hadoop/data/ mySetFile.set";Readerreader=newReader(fs,dirName,conf);IntW ritablekey=newIntWritable();NullWritableval=NullWritable. get();while(reader.next(key,val)){System.out.println(key+":" +val);}reader.close();}序列化、反序列化将对象转换成byte[]//反序列化相反Java中通过下面方 式实现:java.io.SerializableObjectOutputStream/ObjectInputStreamByteA rrayOutputStream/ByteArrayInputStream常用于进程间通信(interprocesscommu nication)、远程过程调用(Remoteprocedurecall)hadoop自己的序列化格式WritableHado oopWritable类层次结构如下图://序列化接口publicinterfaceWritable{//序列化v oidwrite(DataOutputout)throwsIOException;//反序列化voidreadFi elds(DataInputin)throwsIOException;hadoop与java中操作相反,从对象出发,给对象传 流。而java是把对象传递给流。Hadoop序列化,反序列化@Testpublicvoidserialize2()throw sIOException{ByteArrayOutputStreambaos=newByteArrayOutputSt ream();DataOutputStreamdos=newDataOutputStream(baos);IntWrita blei=newIntWritable(258);//serializei.write(dos);dos.close() ;baos.close();byte[]buf=baos.toByteArray();System.out.println( buf.length);//deserializeIntWritablei2=newIntWritable();i2.re adFields(newDataInputStream(newByteArrayInputStream(baos.toByte Array())));System.out.println(i2.get());}IntWritableCompare比较两个i nt数值,小于为-1、等于为0,大于为1@Testpublicvoidcompare()throwsIOException {IntWritablei1=newIntWritable(1);IntWritablei2=newIntWri table(2);System.out.println(i1.compareTo(i2));}IntWritablecompar ator通过比较器比较两个int数值,小于为-1、等于为0,大于为1@Testpublicvoidcomparator()t hrowsIOException{IntWritable.Comparatorc=newIntWritable.Com parator();IntWritablei1=newIntWritable(2);IntWritablei2=ne wIntWritable(1);System.out.println(c.compare(i1,i2));}BooleanWri tablecompare布尔值进行比较,((a==b)?0:(a==false)?-1:1);@Test publicvoidbooleanWritable()throwsIOException{BooleanWritable b1=newBooleanWritable(true);BooleanWritableb2=newBooleanW ritable(false);System.out.println(b1.compareTo(b2));}@Testpublic voidbooleanWritable1()throwsIOException{Comparatorcomparator =newBooleanWritable.Comparator();BooleanWritableb1=newBool eanWritable(true);BooleanWritableb2=newBooleanWritable(false) ;System.out.println(comparator.compare(b1,b2));}TextText相当于java. lang.String,示例代码如下:@Testpublicvoidtest1(){Texttxt=newText( "hadoopoo");//在字符串中查找指定下标位置的字符,从0开始charc=(char)txt.charAt(2); System.out.println(c);//从指定的位置开始查找对应的字符串,返回第一个查找到的位置下标//查找不到返回-1, 查找的开始位置超过字符长度报异常intpos=txt.find("oo",1);System.out.println(pos );}MapWritable@TestpublicvoidmapSerialize()throwsIOException {MapWritablemap=newMapWritable();map.put(newIntWritable(100) ,newText("tom"));map.put(newIntWritable(200),newText("tomas" ));map.put(newIntWritable(300),newText("tomlee"));//serialize ByteArrayOutputStreambaos=newByteArrayOutputStream();DataOutp utStreamdataout=newDataOutputStream(baos);map.write(dataout); dataout.close();baos.close();//deserializeinhadoopMapWritable map2=newMapWritable();map2.readFields(newDataInputStream(new ByteArrayInputStream(baos.toByteArray())));System.out.println(map 2.values());}ObjectWritable@TestpublicvoidObjectSerialize()thr owsIOException{Stringname="tom";String[]names={"abc","ddd ","dddfg"};ObjectWritableobj=newObjectWritable(names);//seri alizeByteArrayOutputStreambaos=newByteArrayOutputStream();Dat aOutputStreamdataout=newDataOutputStream(baos);obj.write(data out);dataout.close();//deserializeinhadoopObjectWritableobj2 =newObjectWritable();obj2.readFields(newDataInputStream(newBy teArrayInputStream(baos.toByteArray())));System.out.println(obj2. get());}对象序列、反序列实例定义Person对象packagecom.bm.serialize;importjava. io.DataInput;importjava.io.DataOutput;importjava.io.IOException ;importorg.apache.hadoop.io.BooleanWritable;importorg.apache.ha doop.io.IntWritable;importorg.apache.hadoop.io.Text;importorg.a pache.hadoop.io.Writable;/Copyright(C)2018编码界的小菜鸟作者:王守奎 2018年10月18日下午2:48:29/publicclassPersonimplementsWritable{ privateTextname;privateIntWritableage;privateBooleanWritable male;//serializepublicvoidwrite(DataOutputout)throwsIOException{name.write(out);age.write(out);male.write(out);}//deserializepublicvoidreadFields(DataInputin)throwsIOException{name=newText();age=newIntWritable();male=newBooleanWritable();name.readFields(in);age.readFields(in);male.readFields(in);}publicTextgetName(){returnname;}publicvoidsetName(Textname){this.name=name;}publicIntWritablegetAge(){returnage;}publicvoidsetAge(IntWritableage){this.age=age;}publicBooleanWritablegetMale(){returnmale;}publicvoidsetMale(BooleanWritablemale){this.male=male;}}2、定义测试类packagecom.bm.serialize;importjava.io.ByteArrayInputStream;importjava.io.DataInputStream;importjava.io.DataOutputStream;importjava.io.IOException;importorg.apache.commons.io.output.ByteArrayOutputStream;importorg.apache.hadoop.io.BooleanWritable;importorg.apache.hadoop.io.IntWritable;importorg.apache.hadoop.io.Text;importorg.junit.Test;/Copyright(C)2018编码界的小菜鸟作者:王守奎2018年10月18日下午2:49:25/publicclassTestPerson{@TestpublicvoidtestSeria()throwsIOException{//newPersonPersonp=newPerson();p.setName(newText("tomas"));p.setAge(newIntWritable(12));p.setMale(newBooleanWritable(false));//serializeByteArrayOutputStreambaos=newByteArrayOutputStream();DataOutputStreamdataout=newDataOutputStream(baos);p.write(dataout);dataout.close();//deserializeinhadoopPersonnewPerson=newPerson();newPerson.readFields(newDataInputStream(newByteArrayInputStream(baos.toByteArray())));System.out.println(newPerson.getName());System.out.println(newPerson.getAge().get());}}魁魁语录:路堵啊江湖一哥版权所有 |
|