MapReduce源码分析--Shuffle阶段

一、收集

执行NewOutputCollector对象的write方法就开启了对map输出数据的收集过程:

? collector是一个MapOutputCollector类型的变量,它是实际执行收集数据功能的对象载体:

private final MapOutputCollector<K,V> collector;

? 但是它的实际类型是一个MapOutputBuffer类型的对象,可以从debug时变量的类型信息里看到:

MapReduce源码分析--Shuffle阶段

? 这是因为MapOutputBuffer实现了MapOutputCollector这个接口,收集器类必须实现collect、write、flush方法:

public static class MapOutputBuffer<K extends Object, V extends Object>
    implements MapOutputCollector<K, V>, IndexedSortable {
		……
}

? NewOutputCollector中的write方法,可以看出NewOutputCollector相对于是对MapOutputBuffer的一种封装。

@Override
public void write(K key, V value) throws IOException, InterruptedException {
  collector.collect(key, value,
                    partitioner.getPartition(key, value, partitions));
}

? 收集map输出的key-value对的时候,还要获取这个键值对的分区号,以便将不同的key/value数据划分到不同的分区,从而输出到不同的reducer中区。其中,默认的分区只有一个,所以默认也只有一个分区号。当我们想要多个分区的时候,可以在驱动类里手动设置分区的数量:

job.setNumReduceTasks(5);

? 默认分配区号的方式,是根据数据的key值得hashcode值进行分配得,当然我们也可以自己自定义分区得方式,前提是自己定义得分区类必须继承Partitioner,最后也要在驱动类里面设置。

job.setPartitionerClass(ProvincePartitioner.class);
// 设置好使用自定义的Partitioner,必须设置NumReduceTasks,否则还会按照默认的1来只开启一个
// reducetask进程,输出的分区文件也只有一个
// 如果自定义分区的个数小于设置的NumReduceTasks值,会抛出异常
job.setNumReduceTasks(5);

? 再来看在collect方法内部,此时这个collect方法就是MapOutputBuffer对象实现得collect方法,此方法通过其注释可知道就是序列化key、value到环形缓冲区。

private Class<K> keyClass;
private Class<V> valClass;

keyClass = (Class<K>)job.getMapOutputKeyClass();
valClass = (Class<V>)job.getMapOutputValueClass();

? 都会判断一下,传进来的参数key和value是否和成员变量keyClass、valClass的类型一致

if (key.getClass() != keyClass) {
  throw new IOException("Type mismatch in key from map: expected "
                        + keyClass.getName() + ", received "
                        + key.getClass().getName());
}
if (value.getClass() != valClass) {
  throw new IOException("Type mismatch in value from map: expected "
                        + valClass.getName() + ", received "
                        + value.getClass().getName());
}

? 也会判断一下,当前传进collect方法的分区号是否合法,即分区号不能为负,也不能大于等于分区总数

if (partition < 0 || partition >= partitions) {
  throw new IOException("Illegal partition for " + key + " (" +
      partition + ")");
}

? 对数据进行collect的时候,会先进行一个判断即判断当前缓冲区的剩余空间减去元数据的大小以后是否大于0,如果不大于0就要进行溢写操作,将环形缓冲区里面得内容写出去:

bufferRemaining -= METASIZE;
if (bufferRemaining <= 0) {
    ……
}

? bufferremaining:buffer剩余空间,字节为单位

? 如果bufferRemaining减去元数据的大小之后,大于0,那么就将当前传入的key/value以及对应的元数据写入环形缓冲区。

? 先是将key序列化写入环形缓冲区,序列化之后,会判断bufindex是否小于keystart

// serialize key bytes into buffer
int keystart = bufindex;
keySerializer.serialize(key);
if (bufindex < keystart) {
  // wrapped the key; must make contiguous
  bb.shiftBufferedKey();
  keystart = 0;
}

? 如果bufindex < keystart,那么说明key并不是连续存入缓冲区的,而是分开存储的;或者是key未完全存入到缓冲区的时候就满了,导致只写了一部分的key,所以此时就要执行bb.shiftBufferedKey()方法。要保证key是连续存储的原因是要对key进行排序,所以不能分开。

? 而value则是直接写入,而不用管是否分开存储了。

final int valstart = bufindex;
valSerializer.serialize(value);

写入对应key/value的元数据:

kvmeta.put(kvindex + PARTITION, partition);
kvmeta.put(kvindex + KEYSTART, keystart);
kvmeta.put(kvindex + VALSTART, valstart);
kvmeta.put(kvindex + VALLEN, distanceTo(valstart, valend));

? 元数据由四个int型数据组成,分别是分区号、key起始位置、value起始位置、value的长度

? 环形缓冲区其实是一个数组,数组中存放着key、value的序列化数据和key、value的元数据信息,key/value的元数据存储的格式是int类型,每个key/value对应一个元数据。

? key/value序列化的数据和元数据在环形缓冲区中的存储是由equator分隔的,equator是缓冲区的分割线,专门用来分割数据和数据的元数据;key/value按照索引递增的方向存储,meta则按照索引递减的方向存储,将其数组抽象为一个环形结构之后,以equator为界,key/value顺时针存储,meta逆时针存储。

二、溢写并分区排序

? map将数据进行读取、处理并向外输出完成之后,程序会退出mapper.run方法,然后继续向下执行,依次关闭input对象和output对象

mapper.run(mapperContext);
  mapPhase.complete();
  setPhase(TaskStatus.Phase.SORT);
  statusUpdate(umbilical);
  input.close();
  input = null;
  output.close(mapperContext);

? 关闭output对象的时候,本质上执行的也是MapOutputBuffer对象collector的flush方法,从close方法实现就可以看出来:

@Override
public void close(TaskAttemptContext context
                  ) throws IOException,InterruptedException {
  try {
    collector.flush();
  } catch (ClassNotFoundException cnf) {
    throw new IOException("can‘t find class ", cnf);
  }
  collector.close();
}

? 用户结束map阶段的数据处理和输出工作之后,已经没有数据再输出到缓冲区了,但是缓冲区中还有一些数据没有溢写到磁盘中,所以需要将最后遗留的一些数据溢出到磁盘上,这就是由flush完成的。

? 在溢写到磁盘之前,先要对数据进行一次缓冲区内的排序,并在必要时对数据进行合并、压缩等操作。详情步骤是:

? 步骤1:利用快速排序算法对缓存区内(内存中)的数据进行排序,排序方式是,先按照分区编号Partition进行排序,然后按照key进行排序。这样,经过排序后,数据以分区为单位聚集在一起,且同一分区内所有数据按照key有序。

? 步骤2:按照分区编号由小到大依次将每个分区中的数据写入任务工作目录下的临时文件output/spillN.out(N表示当前溢写次数)中。如果用户设置了Combiner,则写入文件之前,对每个分区中的数据进行一次聚集操作。

sorter.sort(MapOutputBuffer.this, mstart, mend, reporter);

? 这段代码就是以partition和key来排序的,目的是聚合相同partition的key-value,并以key的顺序排列。溢写到磁盘以后会生成一个对应的临时文件。这些临时文件将会在合并阶段被合并成为一个大的数据文件。

三、合并相同的分区多个溢写文件

? 分区排序并溢写完成之后,开始进行mergePart操作,merge阶段的目的是将多个spill生成的中间文件合并为一个输出文件,保存到文件output/file.out中,同时生成相应的索引文件output/file.out.index。

? 在进行文件合并过程中,MapTask以分区为单位进行合并。对于某个分区,它将采用多轮递归合并的方式。每轮合并io.sort.factor(默认10)个文件,并将产生的文件重新加入待合并列表中,对文件排序后,重复以上过程,直到最终得到一个大文件。

? 观察源码可以发现,在mergePart方法内部定义了一个Path类型的数组,它是用于存储临时spill文件路径的。其中变量numSpills的值就是spill文件的总数量。

final Path[] filename = new Path[numSpills];
下面这段代码是依次获取spill文件路径和文件大小的过程,finalOutFileSize用于累加所有spill文件的大小,以此作为最终输出文件的大小:
for(int i = 0; i < numSpills; i++) {
  filename[i] = mapOutputFile.getSpillFile(i);
  finalOutFileSize += rfs.getFileStatus(filename[i]).getLen();
}

? 让每个MapTask最终只生成一个数据文件,可避免同时打开大量文件和同时读取大量小文件产生的随机读取带来的开销。

参考文档:

https://www.cnblogs.com/pricks/p/3875026.html

相关推荐