hadoop在执行MapReduce任务时,在map阶段,map函数产生的输出,并不是直接写入磁盘的。为了提高效率,它将输出结果先写入到内存中(即环形内存缓冲区,默认大小100M),再从缓冲区(溢)写入磁盘。
下面我们就来看看这段代码。
1、找到环形内存缓冲区
在运行job时,有条输出:
09/04/07 12:34:35 INFO mapred.MapTask: io.sort.mb = 100
上面的io.sort.mb,即map环形内存缓冲区的大小。
在org.apache.hadoop.mapred.MapTask中的第764行找到“io.sort.mb”
第781行:
- kvbuffer = new byte[maxMemUsage - recordCapacity];
kvbuffer是在第715行定义的:
看,这个内存缓冲区竟然是个byte数组!!
2、什么时候溢写到磁盘的?
- 第762行:
- final float spillper = job.getFloat("io.sort.spill.percent",(float)0.8);
-
-
-
- 这100M,还分成2块:数据缓存和记录缓存
- 第707行:
- private final int[] kvoffsets;
-
-
-
- 第941行:
-
-
- return comparator.compare(kvbuffer,
- kvindices[ii + KEYSTART],
- kvindices[ii + VALSTART] - kvindices[ii + KEYSTART],
- kvbuffer,
- kvindices[ij + KEYSTART],
- kvindices[ij + VALSTART] - kvindices[ij + KEYSTART]);
-
综上,溢写发生在:
1) 溢写比设置了<1的值,并且该值到了的时候
2) 溢写比为1,缓存满了的时候
3、缓冲区怎么成环形的?
答:通过折行。
----------------------------------
“折行写”
- 第1038行:
- boolean buffull = false;
-
- 1) bufindex + len > bufvoid
-
-
-
- 2) bufindex + len > bufstart
-
- 第1039行:
- boolean wrap = false;
-
-
- 1) bufstart <= bufend && bufend <= bufindex
-
- 2) (bufvoid - bufindex) + bufstart > len
-
-
-
-
- if (buffull) {
-
- final int gaplen = bufvoid - bufindex;
- System.arraycopy(b, off, kvbuffer, bufindex, gaplen);
- len -= gaplen;
- off += gaplen;
- bufindex = 0;
- }
-
----------------------------------
“折行写”后的reset
- Line996-1014的reset()方法:
-
- 这个方法被调用的地点:第895行,
-
- 第893行,在collect方法中:
- if (bufindex < keystart) {
-
- bb.reset();
- keystart = 0;
- }
-
-
-
-
-
- 这个方法被调用的时候(bufindex < keystart == true):
-
- 一定是,序列化后的key被写入缓存区,而且是被wrap(折行)写入的!
-
-
- 这个方法里的解释:
-
- protected synchronized void reset() throws IOException {
-
-
-
-
-
- int headbytelen = bufvoid - bufmark;
-
-
-
-
- bufvoid = bufmark;
- if (bufindex + headbytelen < bufstart) {
-
-
-
- System.arraycopy(kvbuffer, 0, kvbuffer, headbytelen, bufindex);
- System.arraycopy(kvbuffer, bufvoid, kvbuffer, 0, headbytelen);
- bufindex += headbytelen;
- } else {
-
-
- byte[] keytmp = new byte[bufindex];
- System.arraycopy(kvbuffer, 0, keytmp, 0, bufindex);
- bufindex = 0;
-
-
-
-
-
-
- out.write(kvbuffer, bufmark, headbytelen);
- out.write(keytmp);
- }
- }
- }
4、“成环”示意图
上面的代码一定看的眼花缭乱吧?呵呵,我一开始看的时候,也被弄得很糊涂。请看下面的示意图,就会对这个环形缓冲有个好的理解了。
5、“溢写”过程
- bufend = bufmark;
- sortAndSpill();
- bufstart = bufend;
即:
溢写完毕后,原来的bufmark变成了bufstart
6、缓存为什么要设计成环形的?有什么好处?
答:使输入输出并行工作,即“写缓冲”可以和“溢写”并行。“溢写”工作由单独的线程来做。
- 解读“溢写”代码:
- bufend = bufmark;
- sortAndSpill();
- bufstart = bufend;
-
- 1)溢写前:
- bufend = bufmark;
-
- 则溢写的范围是:从bufstart到bufend。
-
- 2)在溢写的过程中,bufmark还是有可能增长的!
- 3)溢写完毕,bufstart = bufend;
*** THE END***