hadoop中mapreduce的shuffle过程详细解释
1.mapreduce整体执行流程
input – split切片 – map – map shuffle – 分成多个partition – reduce shuffle – reduce拉去对应的partition
到相应的reduce上 – reduce
2.map shuffle
partition
而可以通过自定义partitoner实现自定义分区,是缓解数据倾斜的一种手段。
public class HashPartitioner<K, V> extends Partitioner<K, V> {
/** Use {@link Object#hashCode()} to partition. */
public int getPartition(K key, V value,
int numReduceTasks) {
return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
}
}
collector
Map的输出结果是由collector处理,每个Map任务不断地将键值对输出到在内存中构造的一个环形数据结构中,是个字节数组,叫Kvbuffer,Kvbuffer中的数据刷到磁盘上的过程就叫Spill。
- Kvbuffer的大小可以通过io.sort.mb设置,默认大小为100M
- Spill的门限可以通过io.sort.spill.percent,默认是0.8
sort
当Spill触发后,SortAndSpill先把Kvbuffer中的数据按照partition值和key两个关键字升序排序,同一partition内的按照key有序。
spill
spill阶段,数据从kvbuffer的内存中到了磁盘上。Spill线程根据排过序的Kvmeta挨个partition的把数据吐到这个文件中。
在这个过程中如果用户配置了combiner类,那么在写之前会先调用combineAndSpill(),对结果进行进一步合并后再写出。减少数据倾斜的又一种方式。Combiner的使用一定得慎重,如果用好,它对job执行效率有帮助,反之会影响reduce的最终结果
为了加快查找速度spill溢出的文件spill.out.index文件存放文件索引,如图:
- 每个partition对应一个inde_p文件index_p文件记录了<起始数据,原始数据,压缩数据>
下面你是sortAndSpill源码
private void sortAndSpill() throws IOException, ClassNotFoundException,
InterruptedException {
//approximate the length of the output file to be the length of the
//buffer + header lengths for the partitions
long size = (bufend >= bufstart ? bufend - bufstart : (bufvoid - bufend) + bufstart) +
partitions * APPROX_HEADER_LENGTH;
FSDataOutputStream out = null;
try {
// create spill file
final SpillRecord spillRec = new SpillRecord(partitions); //每个partiiton定义一个索引
final Path filename = mapOutputFile.getSpillFileForWrite(numSpills, size);
out = rfs.create(filename);
final int endPosition = (kvend > kvstart) ? kvend : kvoffsets.length + kvend;
// 使用快速排序算法
sorter.sort(MapOutputBuffer.this, kvstart, endPosition, reporter);
int spindex = kvstart;
// Spill文件的索引
IndexRecord rec = new IndexRecord();
InMemValBytes value = new InMemValBytes();
for (int i = 0; i < partitions; ++i) { // 循环访问各个分区
IFile.Writer<K, V> writer = null;
try {
long segmentStart = out.getPos();
writer = new Writer<K, V>(job, out, keyClass, valClass, codec,
spilledRecordsCounter);
if (combinerRunner == null) { //没有定义combiner
// spill directly
DataInputBuffer key = new DataInputBuffer();
while (spindex < endPosition &&
kvindices[kvoffsets[spindex % kvoffsets.length] + PARTITION] == i) {
final int kvoff = kvoffsets[spindex % kvoffsets.length];
getVBytesForOffset(kvoff, value);
key.reset(kvbuffer, kvindices[kvoff + KEYSTART],
(kvindices[kvoff + VALSTART] - kvindices[kvoff + KEYSTART]));
writer.append(key, value);
++spindex;
}
} else { //定义了combiner,使用combiner合并数据
int spstart = spindex;
while (spindex < endPosition &&
kvindices[kvoffsets[spindex % kvoffsets.length] + PARTITION] == i) {
++spindex;
}
// Note: we would like to avoid the combiner if we've fewer
// than some threshold of records for a partition
if (spstart != spindex) {
combineCollector.setWriter(writer);
RawKeyValueIterator kvIter = new MRResultIterator(spstart, spindex);
combinerRunner.combine(kvIter, combineCollector);
}
}
// close the writer
writer.close();
// record offsets
rec.startOffset = segmentStart; //分区键值起始位置
rec.rawLength = writer.getRawLength();//数据原始长度
rec.partLength = writer.getCompressedLength();//数据压缩后的长度
spillRec.putIndex(rec, i);
writer = null;
} finally {
if (null != writer) writer.close();
}
}
// 处理spill文件的索引,如果内存索引大小超过限制,则写入到文件中。
if (totalIndexCacheMemory >= INDEX_CACHE_MEMORY_LIMIT) {
// create spill index file
Path indexFilename =
mapOutputFile.getSpillIndexFileForWrite(numSpills, partitions
* MAP_OUTPUT_INDEX_RECORD_LENGTH);
spillRec.writeToFile(indexFilename, job);
} else {
indexCacheList.add(spillRec);
totalIndexCacheMemory +=
spillRec.size() * MAP_OUTPUT_INDEX_RECORD_LENGTH;
}
LOG.info("Finished spill " + numSpills);
++numSpills;
} finally {
if (out != null) out.close();
}
}
map输出的数据和sort的数据公用kvbuffer
从原点开始bufindex存放的是map溢出的数据,kvmeta是sort的数据,互相朝着相反的方向运动,当运动到快要碰头的地方时,我们的指针方向发生反转(如上图变化到下图,环形缓冲区的本质就是一个字节数组)
merge
merge会将每个partition对应的所有的segment进行合并成为一个segment
如果是多批segment:
(1)先拿出第一批segment,以key为关键字形成最小堆,使用堆排序,
(2)拿到第二批segment,合并成为一个临时segment
(3)重复(2),直到所有的segment全部合并完成
(4)将所有的索引都放入index文件中
3.Reduce shuffle
copy
从网络中拉取数据,reduce默认开启5个线程拉取数据,但这个默认值可以通过mapreduce.reduce.shuffle.parallelcopies属性进行设置
merge
merge过程和reduce的有点像,也是溢写的过程,这里的缓冲区大小要比 map 端的更为灵活,它基于 JVM 的 heap size 设置,reduce是一边copy一边sort,即copy和sort两个阶段是重叠而不是完全分开的
merge 有三种形式:1)内存到内存 2)内存到磁盘 3)磁盘到磁盘
在将buffer中多个map输出合并写入磁盘之前,如果设置了Combiner,则会化简压缩合并的map输出
Reduce的内存缓冲区可通过mapred.job.shuffle.input.buffer.percent配置,默认是JVM的heap size的70%。内存到磁盘merge的启动门限可以通过mapred.job.shuffle.merge.percent配置,默认是66%。
使用的是多路归并。
reduce
这时就要用到分组,默认的根据键分组,自定义的可是使用 job.setGroupingComparatorClass()方法设置分组函数类。这时会再次对其分组。只要这个比较器比较的两个Key相同,它们就属于同一组。如果是自定义javaBean,可以通过自定实现WritableComparator的cpmpare方法来实现将某一类数据认为是相同的key为同一组,同时进入到同一个reduce中。
参考文献:
https://blog.csdn.net/u014374284/article/details/49205885
http://matt33.com/2016/03/02/hadoop-shuffle/
https://blog.csdn.net/ASN_forever/article/details/81233547
https://blog.csdn.net/dianacody/article/details/39502917