分享

使用java api操作HDFS文件

 melodyjian 2017-05-08

全部程序如下:

[java] view plain copy
  1. import java.io.IOException;  
  2. import java.net.URI;  
  3. import java.net.URISyntaxException;  
  4.   
  5. import org.apache.hadoop.conf.Configuration;  
  6. import org.apache.hadoop.fs.FSDataInputStream;  
  7. import org.apache.hadoop.fs.FSDataOutputStream;  
  8. import org.apache.hadoop.fs.FileStatus;  
  9. import org.apache.hadoop.fs.FileSystem;  
  10. import org.apache.hadoop.fs.FileUtil;  
  11. import org.apache.hadoop.fs.Path;  
  12. import org.apache.hadoop.io.IOUtils;  
  13.   
  14.   
  15. public class HDFSTest {  
  16.       
  17.     //在指定位置新建一个文件,并写入字符  
  18.     public static void WriteToHDFS(String file, String words) throws IOException, URISyntaxException  
  19.     {  
  20.         Configuration conf = new Configuration();  
  21.         FileSystem fs = FileSystem.get(URI.create(file), conf);  
  22.         Path path = new Path(file);  
  23.         FSDataOutputStream out = fs.create(path);   //创建文件  
  24.   
  25.         //两个方法都用于文件写入,好像一般多使用后者  
  26.         out.writeBytes(words);    
  27.         out.write(words.getBytes("UTF-8"));  
  28.           
  29.         out.close();  
  30.         //如果是要从输入流中写入,或是从一个文件写到另一个文件(此时用输入流打开已有内容的文件)  
  31.         //可以使用如下IOUtils.copyBytes方法。  
  32.         //FSDataInputStream in = fs.open(new Path(args[0]));  
  33.         //IOUtils.copyBytes(in, out, 4096, true)        //4096为一次复制块大小,true表示复制完成后关闭流  
  34.     }  
  35.       
  36.     public static void ReadFromHDFS(String file) throws IOException  
  37.     {  
  38.         Configuration conf = new Configuration();  
  39.         FileSystem fs = FileSystem.get(URI.create(file), conf);  
  40.         Path path = new Path(file);  
  41.         FSDataInputStream in = fs.open(path);  
  42.           
  43.         IOUtils.copyBytes(in, System.out, 4096true);  
  44.         //使用FSDataInoutStream的read方法会将文件内容读取到字节流中并返回  
  45.         /** 
  46.          * FileStatus stat = fs.getFileStatus(path); 
  47.       // create the buffer 
  48.        byte[] buffer = new byte[Integer.parseInt(String.valueOf(stat.getLen()))]; 
  49.        is.readFully(0, buffer); 
  50.        is.close(); 
  51.              fs.close(); 
  52.        return buffer; 
  53.          */  
  54.     }  
  55.       
  56.     public static void DeleteHDFSFile(String file) throws IOException  
  57.     {  
  58.         Configuration conf = new Configuration();  
  59.         FileSystem fs = FileSystem.get(URI.create(file), conf);  
  60.         Path path = new Path(file);  
  61.         //查看fs的delete API可以看到三个方法。deleteonExit实在退出JVM时删除,下面的方法是在指定为目录是递归删除  
  62.         fs.delete(path,true);  
  63.         fs.close();  
  64.     }  
  65.       
  66.     public static void UploadLocalFileHDFS(String src, String dst) throws IOException  
  67.     {  
  68.         Configuration conf = new Configuration();  
  69.         FileSystem fs = FileSystem.get(URI.create(dst), conf);  
  70.         Path pathDst = new Path(dst);  
  71.         Path pathSrc = new Path(src);  
  72.           
  73.         fs.copyFromLocalFile(pathSrc, pathDst);  
  74.         fs.close();  
  75.     }  
  76.       
  77.     public static void ListDirAll(String DirFile) throws IOException  
  78.     {  
  79.         Configuration conf = new Configuration();  
  80.         FileSystem fs = FileSystem.get(URI.create(DirFile), conf);  
  81.         Path path = new Path(DirFile);  
  82.           
  83.         FileStatus[] status = fs.listStatus(path);  
  84.         //方法1    
  85.         for(FileStatus f: status)  
  86.         {  
  87.             System.out.println(f.getPath().toString());    
  88.         }  
  89.         //方法2    
  90.         Path[] listedPaths = FileUtil.stat2Paths(status);    
  91.         for (Path p : listedPaths){   
  92.           System.out.println(p.toString());  
  93.         }  
  94.     }  
  95.       
  96.     public static void main(String [] args) throws IOException, URISyntaxException  
  97.     {  
  98.         //下面做的是显示目录下所有文件  
  99.         ListDirAll("hdfs://ubuntu:9000/user/kqiao");  
  100.           
  101.         String fileWrite = "hdfs://ubuntu:9000/user/kqiao/test/FileWrite";  
  102.         String words = "This words is to write into file!\n";  
  103.         WriteToHDFS(fileWrite, words);  
  104.         //这里我们读取fileWrite的内容并显示在终端  
  105.         ReadFromHDFS(fileWrite);  
  106.         //这里删除上面的fileWrite文件  
  107.         DeleteHDFSFile(fileWrite);  
  108.         //假设本地有一个uploadFile,这里上传该文件到HDFS  
  109. //      String LocalFile = "file:///home/kqiao/hadoop/MyHadoopCodes/uploadFile";  
  110. //      UploadLocalFileHDFS(LocalFile, fileWrite    );  
  111.     }  
  112. }  


FSDataOutputStream os = hdfs.create(new Path(args[0])); 

注意:在os.flush()   刷新数据流;

有时写入的文件不能立即被其他读者看见,只有大于一个块时其他读者才能看见第一个块,但还是不能看见当前块。可以使用out.sync()  强制所有缓存与数据节点同步。其实在每一个os.close()中隐含了一个sync()的调用

    本站是提供个人知识管理的网络存储空间,所有内容均由用户发布,不代表本站观点。请注意甄别内容中的联系方式、诱导购买等信息,谨防诈骗。如发现有害或侵权内容,请点击一键举报。
    转藏 分享 献花(0

    0条评论

    发表

    请遵守用户 评论公约

    类似文章 更多