hadoop中mapreduce的shuffle过程详细解释

1.mapreduce整体执行流程

input – split切片 – map – map shuffle – 分成多个partition – reduce shuffle – reduce拉去对应的partition
到相应的reduce上 – reduce

2.map shuffle


partition

而可以通过自定义partitoner实现自定义分区,是缓解数据倾斜的一种手段。

public class HashPartitioner<K, V> extends Partitioner<K, V> {
 
  /** Use {@link Object#hashCode()} to partition. */
  public int getPartition(K key, V value,
                          int numReduceTasks) {
    return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
  }
 
}

collector

Map的输出结果是由collector处理,每个Map任务不断地将键值对输出到在内存中构造的一个环形数据结构中,是个字节数组,叫Kvbuffer,Kvbuffer中的数据刷到磁盘上的过程就叫Spill。

  • Kvbuffer的大小可以通过io.sort.mb设置,默认大小为100M
  • Spill的门限可以通过io.sort.spill.percent,默认是0.8

sort

当Spill触发后,SortAndSpill先把Kvbuffer中的数据按照partition值和key两个关键字升序排序,同一partition内的按照key有序。

spill

spill阶段,数据从kvbuffer的内存中到了磁盘上。Spill线程根据排过序的Kvmeta挨个partition的把数据吐到这个文件中。

在这个过程中如果用户配置了combiner类,那么在写之前会先调用combineAndSpill(),对结果进行进一步合并后再写出。减少数据倾斜的又一种方式。Combiner的使用一定得慎重,如果用好,它对job执行效率有帮助,反之会影响reduce的最终结果

为了加快查找速度spill溢出的文件spill.out.index文件存放文件索引,如图:

  • 每个partition对应一个inde_p文件index_p文件记录了<起始数据,原始数据,压缩数据>

    下面你是sortAndSpill源码
private void sortAndSpill() throws IOException, ClassNotFoundException,
                                       InterruptedException {

      //approximate the length of the output file to be the length of the
      //buffer + header lengths for the partitions

      long size = (bufend >= bufstart ? bufend - bufstart : (bufvoid - bufend) + bufstart) +
                   partitions * APPROX_HEADER_LENGTH;

      FSDataOutputStream out = null;

      try {
        // create spill file
        final SpillRecord spillRec = new SpillRecord(partitions); //每个partiiton定义一个索引
        final Path filename =  mapOutputFile.getSpillFileForWrite(numSpills, size);
        out = rfs.create(filename);
        final int endPosition = (kvend > kvstart) ? kvend : kvoffsets.length + kvend;
        // 使用快速排序算法
        sorter.sort(MapOutputBuffer.this, kvstart, endPosition, reporter);

        int spindex = kvstart;
        // Spill文件的索引
        IndexRecord rec = new IndexRecord();
        InMemValBytes value = new InMemValBytes();

        for (int i = 0; i < partitions; ++i) {  // 循环访问各个分区
          IFile.Writer<K, V> writer = null;
          try {
            long segmentStart = out.getPos();
            writer = new Writer<K, V>(job, out, keyClass, valClass, codec,
                                      spilledRecordsCounter);

            if (combinerRunner == null) {  //没有定义combiner
              // spill directly
              DataInputBuffer key = new DataInputBuffer();
              while (spindex < endPosition &&
                  kvindices[kvoffsets[spindex % kvoffsets.length] + PARTITION] == i) {

                final int kvoff = kvoffsets[spindex % kvoffsets.length];
                getVBytesForOffset(kvoff, value);
                key.reset(kvbuffer, kvindices[kvoff + KEYSTART],
                          (kvindices[kvoff + VALSTART] - kvindices[kvoff + KEYSTART]));
                writer.append(key, value);
                ++spindex;

              }

            } else { //定义了combiner,使用combiner合并数据
              int spstart = spindex;
              while (spindex < endPosition &&
                       kvindices[kvoffsets[spindex % kvoffsets.length] + PARTITION] == i) {
                ++spindex;
              }

              // Note: we would like to avoid the combiner if we've fewer
              // than some threshold of records for a partition
              if (spstart != spindex) {
                combineCollector.setWriter(writer);
                RawKeyValueIterator kvIter = new MRResultIterator(spstart, spindex);
                combinerRunner.combine(kvIter, combineCollector);
              }

            }

            // close the writer
            writer.close();
            // record offsets
            rec.startOffset = segmentStart; //分区键值起始位置
            rec.rawLength = writer.getRawLength();//数据原始长度
            rec.partLength = writer.getCompressedLength();//数据压缩后的长度
            spillRec.putIndex(rec, i);

            writer = null;

          } finally {
            if (null != writer) writer.close();
          }

        }

        // 处理spill文件的索引,如果内存索引大小超过限制,则写入到文件中。

        if (totalIndexCacheMemory >= INDEX_CACHE_MEMORY_LIMIT) {
          // create spill index file

          Path indexFilename =
              mapOutputFile.getSpillIndexFileForWrite(numSpills, partitions
                  * MAP_OUTPUT_INDEX_RECORD_LENGTH);

          spillRec.writeToFile(indexFilename, job);

        } else {
          indexCacheList.add(spillRec);

          totalIndexCacheMemory +=
            spillRec.size() * MAP_OUTPUT_INDEX_RECORD_LENGTH;
        }
        LOG.info("Finished spill " + numSpills);
        ++numSpills;

      } finally {

        if (out != null) out.close();

      }

    }

map输出的数据和sort的数据公用kvbuffer

从原点开始bufindex存放的是map溢出的数据,kvmeta是sort的数据,互相朝着相反的方向运动,当运动到快要碰头的地方时,我们的指针方向发生反转(如上图变化到下图,环形缓冲区的本质就是一个字节数组)

merge


merge会将每个partition对应的所有的segment进行合并成为一个segment

如果是多批segment:
(1)先拿出第一批segment,以key为关键字形成最小堆,使用堆排序,
(2)拿到第二批segment,合并成为一个临时segment
(3)重复(2),直到所有的segment全部合并完成
(4)将所有的索引都放入index文件中

3.Reduce shuffle

copy

从网络中拉取数据,reduce默认开启5个线程拉取数据,但这个默认值可以通过mapreduce.reduce.shuffle.parallelcopies属性进行设置

merge

merge过程和reduce的有点像,也是溢写的过程,这里的缓冲区大小要比 map 端的更为灵活,它基于 JVM 的 heap size 设置,reduce是一边copy一边sort,即copy和sort两个阶段是重叠而不是完全分开的

merge 有三种形式:1)内存到内存 2)内存到磁盘 3)磁盘到磁盘

在将buffer中多个map输出合并写入磁盘之前,如果设置了Combiner,则会化简压缩合并的map输出

Reduce的内存缓冲区可通过mapred.job.shuffle.input.buffer.percent配置,默认是JVM的heap size的70%。内存到磁盘merge的启动门限可以通过mapred.job.shuffle.merge.percent配置,默认是66%。

使用的是多路归并。

reduce

这时就要用到分组,默认的根据键分组,自定义的可是使用 job.setGroupingComparatorClass()方法设置分组函数类。这时会再次对其分组。只要这个比较器比较的两个Key相同,它们就属于同一组。如果是自定义javaBean,可以通过自定实现WritableComparator的cpmpare方法来实现将某一类数据认为是相同的key为同一组,同时进入到同一个reduce中。

参考文献:
https://blog.csdn.net/u014374284/article/details/49205885
http://matt33.com/2016/03/02/hadoop-shuffle/
https://blog.csdn.net/ASN_forever/article/details/81233547
https://blog.csdn.net/dianacody/article/details/39502917

全部评论

相关推荐

10-17 12:16
同济大学 Java
7182oat:快快放弃了然后发给我,然后让我也泡他七天最后再拒掉,狠狠羞辱他一把😋
点赞 评论 收藏
分享
1 2 评论
分享
牛客网
牛客企业服务