该文章是一个实验内容,采取的是单机实现将图片存储到hbase中,有兴趣的同学可以改成mapreduce版本。
思路大体如下:1.将所有图片上传到HDFS后,将这些小文件写成SequenFile文件。
2.读取SequenFile文件,解析后的key和value值分别作为HBase表中的rowkey与列值。
(HBase中rowkey的设计可自行设计)
下面是此次试验内容具体代码的实现:
public class SequenceFileTest {
static String PATH = "hdfs://server2:9000/out";
static SequenceFile.Writer writer = null;
public static void main(String[] args) throws Exception{
// Configuration conf = new Configuration();
// String path = "hdfs://server2:9000/usr/World_TMS_Image/5";
// URI uri = new URI(path);
// FileSystem fileSystem = FileSystem.get(uri, conf);
// writer = SequenceFile.createWriter(fileSystem, conf, new Path(PATH), Text.class, BytesWritable.class);
// listFileAndWriteToSequenceFile(fileSystem,path);
readSequenceFileAndWriteToHBase(new Path(PATH));
}
/****
* 递归文件;并将文件写成SequenceFile文件
* @param fileSystem
* @param path
* @throws Exception
*/
public static void listFileAndWriteToSequenceFile(FileSystem fileSystem,String path) throws Exception{
final FileStatus[] listStatuses = fileSystem.listStatus(new Path(path));
for (FileStatus fileStatus : listStatuses) {
if(fileStatus.isFile()){
Text fileText = new Text(fileStatus.getPath().toString());
System.out.println(fileText.toString());
FSDataInputStream in = fileSystem.open(new Path(fileText.toString()));
byte[] buffer = IOUtils.toByteArray(in);
in.read(buffer);
BytesWritable value = new BytesWritable(buffer);
//写成SequenceFile文件
writer.append(fileText, value);
}
if(fileStatus.isDirectory()){
listFileAndWriteToSequenceFile(fileSystem,fileStatus.getPath().toString());
}
}
}
//读取sequenceFile文件,并将文件写入HBase
public static void readSequenceFileAndWriteToHBase(Path path1)throws Exception{
Configuration conf = new Configuration();
FileSystem fileSystem = FileSystem.get(conf);
//读取sequenceFile文件,创建reader对象
SequenceFile.Reader reader = new SequenceFile.Reader(fileSystem, path1, conf);
Text key = new Text();
BytesWritable val = new BytesWritable();
key = (Text) ReflectionUtils.newInstance(reader.getKeyClass(), conf);
val = (BytesWritable) ReflectionUtils.newInstance(reader.getValueClass(), conf);
int i = 0;
while(reader.next(key, val)){
String temp = key.toString();
temp = temp.substring(temp.indexOf("Image")+6, temp.indexOf("."));
String[] tmp = temp.split("/");
//rowKey 设计
String rowKey = Integer.valueOf(tmp[0])-1+"_"+Integer.valueOf(tmp[1])/2+"_"+Integer.valueOf(tmp[2])/2;
System.out.println(rowKey);
//读取图片
// i++;
// BufferedImage imag = ImageIO.read(new ByteArrayInputStream(val.get()));
// BufferedImage inputbig = new BufferedImage(256, 256,BufferedImage.TYPE_INT_BGR);
// inputbig.getGraphics().drawImage(imag, 0, 0, 256, 256, null); //画图
// File file2 =new File("d:/"+i+".jpg");
// ImageIO.write(inputbig, "jpg", file2);
//写入HBase
Configuration hbaseConf = HBaseConfiguration.create();
hbaseConf.setStrings("hbase.zookeeper.quorum", "server2");
//指定表名
HTable htable = new HTable(hbaseConf,"picHbase");
//指定ROWKEY的值
Put put = new Put(Bytes.toBytes(rowKey));
//指定列簇名称、列修饰符、列值
put.add("picinfo".getBytes(), temp.getBytes(), val.getBytes());
htable.put(put);
}
org.apache.hadoop.io.IOUtils.closeStream(reader);
}
}
|