
Hadoop File Input and File Output | 学步园

 saluteLiu 2014-07-04

This post shows how to control the way Hadoop reads input files and writes output files. Two things are implemented:

1. Change the record format handed to map: the default <file offset, line content> becomes <file name, file content>.

2. Change the output format so that each input file gets its own output file, named after the input file.
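
For example (the path here is only an illustration): given an input file hdfs://namenode/user/src/hello.c, the map function is called exactly once, with the key set to that full path and the value set to the entire file content, and the job writes a single output file named hello.c_its4 (the _its4 suffix comes from coAuOutputFormat below).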

Straight to the code.

coAuInputFormat

package an.hadoop.code.audit;

/**
 * Custom InputFormat: changes the <key, value> pair handed to map
 * from the default <offset, line> to <file path, file content>.
 */

import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;





public class coAuInputFormat extends FileInputFormat<Text, Text> {

	/**
	 * Never split a file: each file becomes exactly one split, even when it is
	 * larger than one HDFS block (64 MB by default), because the whole file
	 * has to be handed to the mapper as a single record.
	 */
	@Override
	protected boolean isSplitable(JobContext context, Path file) {
		return false;
	}

	@Override
	public RecordReader<Text, Text> createRecordReader(InputSplit split,
			TaskAttemptContext context) throws IOException, InterruptedException {
		return new coAuRecordReader(context, split);
	}

}

coAuRecordReader

package an.hadoop.code.audit;

import java.io.IOException;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;


public class coAuRecordReader extends RecordReader<Text, Text> {
	
	private static final Log LOG = LogFactory.getLog(coAuRecordReader.class.getName());
	private long start;
	private long pos;
	private long end;
	private byte[] buffer;
	private String keyName;
	private FSDataInputStream fileIn;
	private Text key = null;
    private Text value = null;

	public coAuRecordReader(TaskAttemptContext context, InputSplit genericSplit) throws IOException {
		// Nothing to do here: the framework calls initialize() before the first
		// nextKeyValue(), so opening the file in the constructor as well would
		// only leak a second input stream.
	}

	

	@Override
	public void initialize(InputSplit genericSplit, TaskAttemptContext context)
			throws IOException, InterruptedException {
		FileSplit split = (FileSplit) genericSplit;
		Configuration job = context.getConfiguration();
		start = split.getStart();            // each file is its own split, so this is normally 0
		end = start + split.getLength();
		final Path file = split.getPath();
		keyName = file.toString();           // the key is the full file path
		LOG.info("filename in hdfs is : " + keyName); // shows up in the task attempt logs
		final FileSystem fs = file.getFileSystem(job);
		fileIn = fs.open(file);
		fileIn.seek(start);
		buffer = new byte[(int) (end - start)];
		this.pos = start;
	}

	@Override
	public boolean nextKeyValue() throws IOException, InterruptedException {
		if (key == null) {
			key = new Text();
		}
		if (value == null) {
			value = new Text();
		}
		key.set(keyName); // the key is the file path
		value.clear();
		// The whole file is emitted as a single record; the second call falls
		// through and returns false to end the input.
		if (pos < end) {
			fileIn.readFully(pos, buffer);
			value.set(buffer);
			pos += buffer.length;
			LOG.info("end is : " + end + " pos is : " + pos);
			return true;
		}
		return false;
	}

	@Override
	public Text getCurrentKey() throws IOException, InterruptedException {
		return key;
	}

	@Override
	public Text getCurrentValue() throws IOException, InterruptedException {
		return value;
	}

	@Override
	public float getProgress() throws IOException, InterruptedException {
		if (start == end) {
			return 0.0f;
		} else {
			return Math.min(1.0f, (pos - start) / (float)(end - start));
		}
	}

	@Override
	public void close() throws IOException {
		if (fileIn != null) {
	        fileIn.close(); 
	    }
	}

	

}

coAuOutputFormat

package an.hadoop.code.audit;

/**
 * Derives the output file name from the key (the input file path): each output
 * file keeps the input file's name, with the "_its4" suffix appended.
 */

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;


public class coAuOutputFormat extends MultipleOutputFormat<Text, Text> {
	
	private final static String suffix = "_its4";
	

	@Override
	protected String generateFileNameForKeyValue(Text key, Text value,
			Configuration conf) {
		String path = key.toString();        // full path of the input file
		String[] dir = path.split("/");
		String filename = dir[dir.length - 1];
		return filename + suffix;            // output file name: <input file name>_its4
	}

	
}
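
To see what generateFileNameForKeyValue produces, the same string logic can be run on a sample key outside Hadoop. The path below is only an illustration, not one from the original job:

public class FileNameDemo {
	public static void main(String[] args) {
		String path = "hdfs://namenode:9000/user/test/hello.c"; // sample key, as produced by coAuRecordReader
		String[] dir = path.split("/");
		String filename = dir[dir.length - 1] + "_its4";        // same suffix as coAuOutputFormat
		System.out.println(filename);                           // prints: hello.c_its4
	}
}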

MultipleOutputFormat

package an.hadoop.code.audit;

/**
 * An OutputFormat that writes to multiple output files, one per generated
 * file name; concrete subclasses decide the file name for each record.
 */

import java.io.DataOutputStream;
import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.ReflectionUtils;
public abstract class MultipleOutputFormat<K extends WritableComparable<?>, V extends Writable>
		extends FileOutputFormat<K, V> { // the default would be TextOutputFormat
	private MultiRecordWriter writer = null;
	public RecordWriter<K, V> getRecordWriter(TaskAttemptContext job) throws IOException,
			InterruptedException {
		if (writer == null) {
			writer = new MultiRecordWriter(job, getTaskOutputPath(job)); // job and output path
		}
		return writer;
	}
	private Path getTaskOutputPath(TaskAttemptContext conf) throws IOException { // resolve the output directory
		Path workPath = null;
		OutputCommitter committer = super.getOutputCommitter(conf);
		if (committer instanceof FileOutputCommitter) {
			workPath = ((FileOutputCommitter) committer).getWorkPath(); // task working directory
		} else {
			Path outputPath = super.getOutputPath(conf); // configured job output path
			if (outputPath == null) {
				throw new IOException("Undefined job output-path");
			}
			workPath = outputPath;
		}
		return workPath;
	}
	/** Determines the output file name (including any extension) from key, value and conf. */
	protected abstract String generateFileNameForKeyValue(K key, V value, Configuration conf); // abstract; implemented by subclasses such as coAuOutputFormat
	public class MultiRecordWriter extends RecordWriter<K, V> {
		/** Cache of RecordWriters, one per output file name. */
		private HashMap<String, RecordWriter<K, V>> recordWriters = null;
		private TaskAttemptContext job = null;
		/** Output directory. */
		private Path workPath = null;
		public MultiRecordWriter(TaskAttemptContext job, Path workPath) {
			super();
			this.job = job;
			this.workPath = workPath;
			recordWriters = new HashMap<String, RecordWriter<K, V>>();
		}
		@Override
		public void close(TaskAttemptContext context) throws IOException, InterruptedException {
			// Every cached writer has to be closed.
			Iterator<RecordWriter<K, V>> values = this.recordWriters.values().iterator();
			while (values.hasNext()) {
				values.next().close(context);
			}
			this.recordWriters.clear();
		}
		@Override
		public void write(K key, V value) throws IOException, InterruptedException {
			// Derive the output file name for this record and reuse (or create) its writer.
			String baseName = generateFileNameForKeyValue(key, value, job.getConfiguration());
			RecordWriter<K, V> rw = this.recordWriters.get(baseName);
			if (rw == null) {
				rw = getBaseRecordWriter(job, baseName);
				this.recordWriters.put(baseName, rw);
			}
			rw.write(key, value);
		}
		// ${mapred.out.dir}/_temporary/_${taskid}/${nameWithExtension}
		private RecordWriter<K, V> getBaseRecordWriter(TaskAttemptContext job, String baseName)
				throws IOException, InterruptedException {
			Configuration conf = job.getConfiguration();
			boolean isCompressed = getCompressOutput(job);
			String keyValueSeparator = ",";
			RecordWriter<K, V> recordWriter = null;
			if (isCompressed) {
				Class<? extends CompressionCodec> codecClass = getOutputCompressorClass(job,
						GzipCodec.class);
				CompressionCodec codec = ReflectionUtils.newInstance(codecClass, conf);
				Path file = new Path(workPath, baseName + codec.getDefaultExtension());
				FSDataOutputStream fileOut = file.getFileSystem(conf).create(file, false);
				recordWriter = new LineRecordWriter<K, V>(new DataOutputStream(codec
						.createOutputStream(fileOut)), keyValueSeparator);
			} else {
				Path file = new Path(workPath, baseName);
				FSDataOutputStream fileOut = file.getFileSystem(conf).create(file, false); // file is the output file path (workPath/baseName)
				recordWriter = new LineRecordWriter<K, V>(fileOut, keyValueSeparator); // the LineRecordWriter below does the actual writing
			}
			return recordWriter;
		}
	}
}

LineRecordWriter

package an.hadoop.code.audit;

/* An implementation of RecordWriter that turns a <key, value> pair into one line of text.
 * In Hadoop this class exists as an inner class of TextOutputFormat with protected access,
 * so ordinary code cannot use it. It is simply extracted here from TextOutputFormat as a
 * standalone public class. */

import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
/** Taken from the LineRecordWriter in {@link TextOutputFormat}. */
public class LineRecordWriter<K, V> extends RecordWriter<K, V> {
	private static final String utf8 = "UTF-8";
	private static final byte[] newline;
	static {
		try {
			newline = "\n".getBytes(utf8); // line separator between records
		} catch (UnsupportedEncodingException uee) {
			throw new IllegalArgumentException("can't find " + utf8 + " encoding");
		}
	}
	protected DataOutputStream out;
	private final byte[] keyValueSeparator;
	public LineRecordWriter(DataOutputStream out, String keyValueSeparator) {
		this.out = out;
		try {
			this.keyValueSeparator = keyValueSeparator.getBytes(utf8);
		} catch (UnsupportedEncodingException uee) {
			throw new IllegalArgumentException("can't find " + utf8 + " encoding");
		}
	}
	public LineRecordWriter(DataOutputStream out) {
		this(out, "\t"); // tab is the default key/value separator
	}
	private void writeObject(Object o) throws IOException { // called by write()
		if (o instanceof Text) {
			Text to = (Text) o;
			out.write(to.getBytes(), 0, to.getLength()); // write the Text's bytes directly to the underlying stream
		} else {
			out.write(o.toString().getBytes(utf8));
		}
	}
	public synchronized void write(K key, V value) throws IOException { // modified to write only the value
		boolean nullKey = key == null || key instanceof NullWritable;
		boolean nullValue = value == null || value instanceof NullWritable; // here the key is the file path and the value is the file content, so the key is not written out
		if (nullKey && nullValue) {
			return;
		}
		/*if (!nullKey) { // uncomment to also write the key and the separator
			writeObject(key);
		}
		if (!(nullKey || nullValue)) {
			out.write(keyValueSeparator);
		}*/
		if (!nullValue) {
			writeObject(value);
		}
		out.write(newline);
	}
	public synchronized void close(TaskAttemptContext context) throws IOException {
		out.close();
	}
}

CodeAudit

package an.hadoop.code.audit;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;





public class CodeAudit {
	public static void main(String[] args) throws Exception {
	    Configuration conf = new Configuration();
	    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
	    if (otherArgs.length != 2) {
	      System.err.println("Usage: code audit <in> <out>");
	      System.exit(2);
	    }
	    Job job = new Job(conf, "code audit");

	    job.setJarByClass(CodeAudit.class);
	    job.setMapperClass(coAuMapper.class);
	    job.setInputFormatClass(coAuInputFormat.class);
	    //job.setOutputKeyClass(NullWritable.class);
	    job.setOutputKeyClass(Text.class);
	    job.setOutputValueClass(Text.class);
	    
	    job.setOutputFormatClass(coAuOutputFormat.class);
	    FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
	    FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
	    System.exit(job.waitForCompletion(true) ? 0 : 1);
	  }


}
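
The driver above references coAuMapper, which is not shown in this post. A minimal pass-through mapper matching the formats above might look like this (a sketch, not the original class):

package an.hadoop.code.audit;

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// Hypothetical stand-in for the mapper the driver refers to: it simply forwards
// <file path, file content> so coAuOutputFormat can name the output file after the key.
// Any real "code audit" logic would transform the value here.
public class coAuMapper extends Mapper<Text, Text, Text, Text> {

	@Override
	protected void map(Text key, Text value, Context context)
			throws IOException, InterruptedException {
		context.write(key, value);
	}
}

With a job jar built from these classes (the jar name below is only an example), the job is started in the usual way:

hadoop jar code-audit.jar an.hadoop.code.audit.CodeAudit <input dir> <output dir>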
