This post shows how to take control of the way Hadoop reads its input files and writes its output files. Two things are implemented:

1. The record format the map reads is changed from the default <file offset, line contents> to <file name, file contents>.
2. The output format is changed so that each input file gets exactly one output file, named after the input file.

Straight to the code (a sketch of the mapper referenced by the driver is given after the last listing):

coAuInputFormat

package an.hadoop.code.audit;

/**
 * Custom input format: instead of the default <offset, line> records,
 * each map call receives <file path, whole file contents>.
 */
import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

public class coAuInputFormat extends FileInputFormat<Text, Text> {

    /**
     * Never split a file: each file must be processed as a whole, so every file
     * becomes exactly one split, even if it is larger than one HDFS block
     * (64 MB by default).
     */
    @Override
    protected boolean isSplitable(JobContext context, Path file) {
        return false;
    }

    @Override
    public RecordReader<Text, Text> createRecordReader(InputSplit split,
            TaskAttemptContext context) throws IOException, InterruptedException {
        return new coAuRecordReader(context, split);
    }
}

coAuRecordReader

package an.hadoop.code.audit;

import java.io.IOException;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

public class coAuRecordReader extends RecordReader<Text, Text> {

    private static final Log LOG = LogFactory.getLog(coAuRecordReader.class.getName());

    private long start;
    private long pos;
    private long end;
    private byte[] buffer;
    private String keyName;
    private FSDataInputStream fileIn;
    private Text key = null;
    private Text value = null;

    public coAuRecordReader(TaskAttemptContext context, InputSplit genericSplit) throws IOException {
        // The framework repeats this setup when it calls initialize().
        Configuration job = context.getConfiguration();
        FileSplit split = (FileSplit) genericSplit;
        start = split.getStart();            // each whole file is one split
        end = start + split.getLength();
        final Path path = split.getPath();
        keyName = path.toString();           // the key is the file's full path
        LOG.info("filename in hdfs is : " + keyName); // ends up in the task attempt's logs (userlogs / web UI)
        final FileSystem fs = path.getFileSystem(job);
        fileIn = fs.open(path);
        fileIn.seek(start);
        buffer = new byte[(int) (end - start)];
        this.pos = start;
    }

    @Override
    public void initialize(InputSplit genericSplit, TaskAttemptContext context)
            throws IOException, InterruptedException {
        FileSplit split = (FileSplit) genericSplit;
        Configuration job = context.getConfiguration();
        start = split.getStart();
        end = start + split.getLength();
        final Path file = split.getPath();
        keyName = file.toString();           // the key is the file's full path
        LOG.info("filename in hdfs is : " + keyName);
        final FileSystem fs = file.getFileSystem(job);
        fileIn = fs.open(file);
        fileIn.seek(start);
        buffer = new byte[(int) (end - start)];
        this.pos = start;
    }

    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {
        if (key == null) {
            key = new Text();
        }
        if (value == null) {
            value = new Text();
        }
        if (pos >= end) {
            return false;                    // the single <path, contents> record was already emitted
        }
        key.set(keyName);                    // key: path of the input file
        fileIn.readFully(pos, buffer);       // read the whole file into the buffer
        value.set(buffer);                   // value: the entire file contents
        pos += buffer.length;
        LOG.info("end is : " + end + " pos is : " + pos);
        return true;
    }

    @Override
    public Text getCurrentKey() throws IOException, InterruptedException {
        return key;
    }

    @Override
    public Text getCurrentValue() throws IOException, InterruptedException {
        return value;
    }

    @Override
    public float getProgress() throws IOException, InterruptedException {
        if (start == end) {
            return 0.0f;
        } else {
            return Math.min(1.0f, (pos - start) / (float) (end - start));
        }
    }

    @Override
    public void close() throws IOException {
        if (fileIn != null) {
            fileIn.close();
        }
    }
}

coAuOutputFormat

package an.hadoop.code.audit;

/**
 * Derives the output file name from the key (the input file's path).
 */
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;

public class coAuOutputFormat extends MultipleOutputFormat<Text, Text> {

    private final static String suffix = "_its4";

    @Override
    protected String generateFileNameForKeyValue(Text key, Text value, Configuration conf) {
        String path = key.toString();        // the key holds the input file's full path
        String[] dir = path.split("/");
        String filename = dir[dir.length - 1];
        return filename + suffix;            // output file name: input file name plus the suffix
    }
}

MultipleOutputFormat

package an.hadoop.code.audit;

/**
 * An output format that writes to multiple files, one per file name produced
 * by generateFileNameForKeyValue().
 */
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.ReflectionUtils;

public abstract class MultipleOutputFormat<K extends WritableComparable<?>, V extends Writable>
        extends FileOutputFormat<K, V> { // the stock default would be TextOutputFormat

    private MultiRecordWriter writer = null;

    @Override
    public RecordWriter<K, V> getRecordWriter(TaskAttemptContext job) throws IOException, InterruptedException {
        if (writer == null) {
            writer = new MultiRecordWriter(job, getTaskOutputPath(job)); // job and output path
        }
        return writer;
    }

    /** Determine the task's output directory. */
    private Path getTaskOutputPath(TaskAttemptContext conf) throws IOException {
        Path workPath = null;
        OutputCommitter committer = super.getOutputCommitter(conf);
        if (committer instanceof FileOutputCommitter) {
            workPath = ((FileOutputCommitter) committer).getWorkPath(); // the task's work directory
        } else {
            Path outputPath = super.getOutputPath(conf); // job output directory from the configuration
            if (outputPath == null) {
                throw new IOException("Undefined job output-path");
            }
            workPath = outputPath;
        }
        return workPath;
    }

    /** Derive the output file name (including extension) from key, value and conf. */
    protected abstract String generateFileNameForKeyValue(K key, V value, Configuration conf); // overridden by coAuOutputFormat

    public class MultiRecordWriter extends RecordWriter<K, V> {

        /** Cache of RecordWriters, one per output file name. */
        private HashMap<String, RecordWriter<K, V>> recordWriters = null;
        private TaskAttemptContext job = null;
        /** Output directory. */
        private Path workPath = null;

        public MultiRecordWriter(TaskAttemptContext job, Path workPath) {
            super();
            this.job = job;
            this.workPath = workPath;
            recordWriters = new HashMap<String, RecordWriter<K, V>>();
        }

        @Override
        public void close(TaskAttemptContext context) throws IOException, InterruptedException {
            // every cached writer has to be closed
            Iterator<RecordWriter<K, V>> values = this.recordWriters.values().iterator();
            while (values.hasNext()) {
                values.next().close(context);
            }
            this.recordWriters.clear();
        }

        @Override
        public void write(K key, V value) throws IOException, InterruptedException {
            // work out which output file this record belongs to
            String baseName = generateFileNameForKeyValue(key, value, job.getConfiguration());
            RecordWriter<K, V> rw = this.recordWriters.get(baseName);
            if (rw == null) {
                rw = getBaseRecordWriter(job, baseName);
                this.recordWriters.put(baseName, rw);
            }
            rw.write(key, value);
        }

        // ${mapred.out.dir}/_temporary/_${taskid}/${nameWithExtension}
        private RecordWriter<K, V> getBaseRecordWriter(TaskAttemptContext job, String baseName)
                throws IOException, InterruptedException {
            Configuration conf = job.getConfiguration();
            boolean isCompressed = getCompressOutput(job);
            String keyValueSeparator = ",";
            RecordWriter<K, V> recordWriter = null;
            if (isCompressed) {
                Class<? extends CompressionCodec> codecClass = getOutputCompressorClass(job, GzipCodec.class);
                CompressionCodec codec = ReflectionUtils.newInstance(codecClass, conf);
                Path file = new Path(workPath, baseName + codec.getDefaultExtension());
                FSDataOutputStream fileOut = file.getFileSystem(conf).create(file, false);
                recordWriter = new LineRecordWriter<K, V>(new DataOutputStream(
                        codec.createOutputStream(fileOut)), keyValueSeparator);
            } else {
                Path file = new Path(workPath, baseName); // "file" is the name of the output file
                FSDataOutputStream fileOut = file.getFileSystem(conf).create(file, false);
                recordWriter = new LineRecordWriter<K, V>(fileOut, keyValueSeparator); // delegates to LineRecordWriter
            }
            return recordWriter;
        }
    }
}

LineRecordWriter

package an.hadoop.code.audit;
/* An implementation of RecordWriter that turns a <Key, Value> pair into one line of text.
 * In Hadoop this class exists as a protected nested class inside TextOutputFormat, so ordinary
 * programs cannot reach it. It is simply extracted from TextOutputFormat here and made an
 * independent public class. */
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UnsupportedEncodingException;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

/** Taken from the LineRecordWriter inside {@link TextOutputFormat}. */
public class LineRecordWriter<K, V> extends RecordWriter<K, V> {

    private static final String utf8 = "UTF-8";
    private static final byte[] newline;
    static {
        try {
            newline = "\n".getBytes(utf8);   // record (line) separator
        } catch (UnsupportedEncodingException uee) {
            throw new IllegalArgumentException("can't find " + utf8 + " encoding");
        }
    }

    protected DataOutputStream out;
    private final byte[] keyValueSeparator;

    public LineRecordWriter(DataOutputStream out, String keyValueSeparator) {
        this.out = out;
        try {
            this.keyValueSeparator = keyValueSeparator.getBytes(utf8);
        } catch (UnsupportedEncodingException uee) {
            throw new IllegalArgumentException("can't find " + utf8 + " encoding");
        }
    }

    public LineRecordWriter(DataOutputStream out) {
        this(out, "\t");                     // tab is the default key/value separator
    }

    private void writeObject(Object o) throws IOException { // called by write()
        if (o instanceof Text) {
            Text to = (Text) o;
            out.write(to.getBytes(), 0, to.getLength()); // write the Text's bytes straight to the underlying stream
        } else {
            out.write(o.toString().getBytes(utf8));
        }
    }

    public synchronized void write(K key, V value) throws IOException {
        // Modified so that only the value (the file contents) is written; the key (the file path)
        // is already reflected in the output file's name, so it is not written into the file.
        boolean nullKey = key == null || key instanceof NullWritable;
        boolean nullValue = value == null || value instanceof NullWritable;
        if (nullKey && nullValue) {
            return;
        }
        /*if (!nullKey) {                    // uncomment to also write the key and the separator
            writeObject(key);
        }
        if (!(nullKey || nullValue)) {
            out.write(keyValueSeparator);
        }*/
        if (!nullValue) {
            writeObject(value);
        }
        out.write(newline);
    }

    public synchronized void close(TaskAttemptContext context) throws IOException {
        out.close();
    }
}
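Before the driver, a quick note on how the pieces choose the output file name: MultiRecordWriter.write() asks coAuOutputFormat.generateFileNameForKeyValue() for a name, which is simply the last component of the key's path plus the "_its4" suffix, and LineRecordWriter then writes only the value into that file. A tiny standalone check of the naming rule (the HDFS path below is a made-up example, not taken from the post):

// Hypothetical, self-contained check of the filename rule used by coAuOutputFormat.
public class NameRuleDemo {
    public static void main(String[] args) {
        String path = "hdfs://namenode:9000/user/an/input/parser.c"; // example key value (made up)
        String[] dir = path.split("/");
        String filename = dir[dir.length - 1];   // "parser.c"
        System.out.println(filename + "_its4");  // prints "parser.c_its4", the output file name
    }
}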
CodeAudit

package an.hadoop.code.audit;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class CodeAudit {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length != 2) {
            System.err.println("Usage: code audit <in> <out>");
            System.exit(2);
        }
        Job job = new Job(conf, "code audit");
        job.setJarByClass(CodeAudit.class);
        job.setMapperClass(coAuMapper.class);
        job.setInputFormatClass(coAuInputFormat.class);
        //job.setOutputKeyClass(NullWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        job.setOutputFormatClass(coAuOutputFormat.class);
        // No reducer is set, so the default identity Reducer passes <path, contents> straight to coAuOutputFormat.
        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
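The driver references coAuMapper, which is not shown above. With the <path, contents> input format there is very little for the mapper to do; here is a minimal sketch, assuming it simply forwards each pair unchanged (only the class name and type signature come from the driver, the body is an assumption):

package an.hadoop.code.audit;

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

/** Minimal sketch (assumption): forwards <file path, file contents> unchanged. */
public class coAuMapper extends Mapper<Text, Text, Text, Text> {

    @Override
    protected void map(Text key, Text value, Context context)
            throws IOException, InterruptedException {
        // key   = full HDFS path of the input file, set by coAuRecordReader
        // value = the entire contents of that file
        // Any real per-file processing (the actual code audit) would go here.
        context.write(key, value);
    }
}

Any real analysis of the file contents would replace the pass-through write; each <path, text> pair then flows through the default identity Reducer into coAuOutputFormat, which writes it to a file named <input file name>_its4.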