Sep 25, 2022


Apache Hadoop MapReduce

Apache Hadoop MapReduce 是一套在计算机集群上并行处理大型数据的开源框架,在工业界用得挺多的。

这次我们邀请了资深软件工程师猴哥来聊聊 MapReduce 的原理和代码实现。以下是活动笔记整理。

MapReduce

概述

先说个问题:为什么要有 GFS/HDFS 这种分布式文件系统?

很简单,数据量越来越大,存不下。但存下来不是为了当网盘用,而是为了后续的计算,而且是那种需要大量数据参与的计算。

那处理海量数据最有效的办法是什么?拆,分而治之。

MapReduce 就是一种在分布式文件系统上跑的分治计算方式。Map 是"映射",把每条记录作为输入,输出若干条处理后的记录。Reduce 是"归并"或"归约",把一组记录作为输入,输出最终结果。

计算程序和数据文件都在分布式集群里,那它们怎么协调和分配?核心原则就一条:让计算往数据那边跑,而不是反过来搬数据。

大概流程

map reduce high level

map reduce high level

  1. 客户端负责计算切片,然后把 Jar 包和配置上传到 HDFS。
  2. MapReduce 程序启动后,把上传的 Jar 包拉到各个计算节点。调度这块由 Yarn 负责,这里先略过。
  3. MapReduce 集群通过反射机制加载各个 Mapper 和 Reducer 类。
  4. 在最合适的节点上启动各个 MapTask,按照预先定义的"一条条"数据作为输入开始计算。
  5. MapTask 的数量取决于 split 的个数。
  6. MapTask 跑完后,生成键值对形式的中间数据。每个键值对都会经过一次分区计算,决定它该去哪个 Reducer 处理。中间数据存在 Map 节点的本地磁盘上。这里用到了 Partitioner,它是基于 Hash 实现的。
  7. 数据被拉到对应的 Reducer 进行计算。ReduceTask 的个数可以手动配置,默认是 1 个。
  8. Reducer 把计算结果写到 HDFS 里。

来看下客户端代码:

public class MyWordCount {
	public static void main(String[] args) throws Exception{
		// 加载 resources 里的 xml 文件
		Configuration conf = new Configuration(true);

		GenericOptionsParser parser = new GenericOptionsParser(conf, args); // 接收命令行参数
		String otherArgs[] = parser.getRemainingArgs(); // 解析参数

		// 让程序在 Mac 和 Windows 上也能跑
		conf.set("mapreduce.app-submission.cross-platform", "true");
		conf.set("mapreduce.framework.name", "yarn");
		Job job = Job.getInstance(conf);
		// 上传这个 jar 文件
		//job.setJar("/Users/shuzheng/IdeaProjects/hadoop-hdfs/target/hadoop-hdfs-1.0-SNAPSHOT.jar");
		// 主类
		job.setJarByClass(MyWordCount.class);
		job.setJobName("myJob");
		
		Path infile = new Path(otherArgs[0]);
		TextInputFormat.addInputPath(job, infile);
		Path outfile = new Path(otherArgs[1]);
		if (outfile.getFileSystem(conf).exists(outfile)) {
			outfile.getFileSystem(conf).delete(outfile, true);
		}
		TextOutputFormat.setOutputPath(job, outfile);
		job.setMapperClass(MyMapper.class);
		job.setMapOutputKeyClass(Text.class);  // 反射
		job.setMapOutputValueClass(IntWritable.class);
		job.setReducerClass(MyReducer.class);
		job.setNumReduceTasks(2);  // 设置 reduce 任务数,默认是 1
		job.waitForCompletion(true);
	}
}
public class MyMapper extends Mapper<Object, Text, Text, IntWritable> {
	private final static IntWritable ONE = new IntWritable(1);
  private final static Text WORD = new Text();

  // Word count 中,key 是当前这一行在文件中的偏移量,value 是内容
	public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
		// StringTokenizer 按空格、制表符等分割字符串
		StringTokenizer itr = new StringTokenizer(value.toString());
        while (itr.hasMoreTokens()) {
	        WORD.set(itr.nextToken());
          context.write(WORD, ONE);  // MapTask 第 1156 行:keySerializer.serialize(key);
        }
    }
}
public class MyReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
	private IntWritable result = new IntWritable();
  // Word Count:key 是单词,value 是 1 或者 combine 后的出现次数,不是最终结果
	public void reduce(Text key, Iterable<IntWritable> values, Context context) 
							throws IOException, InterruptedException { // 其实是 List
		int sum = 0;
		for (IntWritable val : values) {
            sum += val.get();
        }
        result.set(sum);
        context.write(key, result);
    }
}

详细流程和底层原理

map reduce low level

map reduce low level

现在聚焦单个 MapTask 和 ReduceTask。k-v-partition 这个组合是核心。

  1. 客户端计算 split 时,默认大小是 block size,但也可以设成比 block 大或小。默认是 128MB。为什么能调整?因为不同计算任务特性不一样:有的是 I/O 密集型,有的是 CPU 密集型。
  2. 计算程序通过反射加载,后面才能执行里面的计算逻辑。
  3. map 方法每处理一条记录,默认是一行文本,输出的 key-value-Partition 会根据 key 计算出对应的分区号,也就是该去哪个 Reducer。算完分区号后,数据不会立刻落盘,而是先写到环形缓冲区里。分区计算很关键,它承上启下,关系到后面的分治计算。Spark 里的 join,也就是 cogroup 算子,还有 Hive 和 Spark SQL 里的 group by,都依赖 Partitioner。
  4. 环形缓冲区默认 100MB,用到 80% 时就会触发 spill,这个逻辑在 MapOutputBuffer.init 里。Spill 由另一个线程写磁盘,避免阻塞主计算线程。落盘前会根据 key 做排序和 combine。Combine 是可选的,它相当于 map 端的"小 reduce"。
  5. Combine 的逻辑和 reduce 是一样的。
  6. 环形缓冲区经过多次 spill 会生成多个小文件,每个都不到 100MB,最后把它们合并成一个大文件。这个合并过程 I/O 成本比较高。
  7. Map 计算结束后,各个 Reducer 通过 HTTP 协议拉取属于自己的数据,这就是 Shuffle。这里有个关键点:相同的 key 必须去同一个 Partition,也就是同一个 Reducer。Yarn 负责调度 Reducer 来拉取数据。
  8. Reducer 一边归并拉取过来的文件,一边把相同 key 的一组数据交给用户自定义的 reduce() 方法处理,不需要先合并成完整的中间文件。这里用到了两个设计模式:模板方法和迭代器。

最终结果也是以 HDFS 文件的形式输出,自然有多个副本,数据可靠性有保障。

举例

MapReduce-3.png

实际运行结果:

(base) [root@hadoop-04 ~]# hdfs dfs -text /data/wc/output2/part-r-00000
aaa	1
hi	3
mr	2
(base) [root@hadoop-04 ~]# hdfs dfs -text /data/wc/output2/part-r-00001
bbb	1
bigdata	2
gfs	2
hello	6
world	1

图中数据是假设了各个 key 的 Hash 值,所以结果在最终文件中的分布会变化,但不影响正确性和相对顺序。

问题来了:词频统计怎么做?

总结

block : split
		 1:1
		 N:1
		 1:N
split : map
		 1:1
	map : reduce
		 N:1  默认
		 N:N (相同 key 必须去同一个 reduce)
		 1:1
		 ~~1:N~~ 不允许
key:partion N:1

相关源码(hadoop-2.6.5,只列了一些要点):

  1. 客户端定义 split 计算:Job.waitForCompletion → JobSubmitter.writeNewSplits() → FileInputFormat.getSplits。不特别设置的话,默认 split 大小就是 block size。
  2. MapTask 的调用:MapTask.run,其中每一行文本调用一次 map 方法。这是默认行为,由 TextInputFormat 和 LineRecordReader 实现。
  3. map 方法里调用 context.write 时会做这些事:计算分区号,分区计算很重要,Hive 和 Spark SQL 的 group by 都靠它。然后是排序,combine,序列化。序列化不会覆盖相同 key,最后是 spill。
  4. Shuffle 部分:Shuffle → Fetcher.run → copyFromHost → Fetcher.copyMapOutput → mapOutput.shuffle。
  5. Fetcher.copyFromHost 是拉模型,Reducer 去 Mapper 拉数据。Reducer 端负责发起 Shuffle,用 HTTP 方式,见 Fetcher.connection 成员。
  6. ReduceTask.run:封装迭代器传给 context,再传给用户定义的 reduce 方法。MergeQueue 才是真正的迭代器,其 next 方法实现有序迭代已排序的大文件。

Screen Shot 2022-09-28 at 12.32.41 PM.png

  1. 总结:算法、数据结构、设计模式、线程模型、I/O 模型是基础。

延伸

MapReduce 的缺点是什么?Spark 怎么解决的?

  1. Spark 缓存中间数据,可以存在内存、磁盘或分布式文件系统。RDD 的 iterator、cache 和 checkpoint 都支持这个功能,方便重复使用和失败重试。
  2. 多个算子搭配,相当于多个 MapReduce 顺序执行,不用每个 MR 程序单独启动。

spark word count

spark word count

  1. 读取效率方面,Spark 有的算子比如 sortByKey 会做数据抽样,计算数据集的步进,减少 I/O,比 MapReduce 优化了。
  2. 从 MR 程序可以看出,除了配置,用户需要上传的就是 map() 和 reduce() 两个方法,也就是两个函数。Spark 和 Scala 的函数式编程正好优化了代码量,让程序更简洁。
  3. Spark 通过构建 DAG 和划分 Stage,达到更细粒度的任务调度。一旦某个任务失败,只重试该 Stage,不用像 MapReduce 那样,中间某步失败就得从头来一遍。
  4. MapReduce 是"冷程序",提交后才分配资源启动执行,执行完就结束进程。Spark 是"热服务",资源一开始就抢占好,任务提交上来直接开始执行,执行完后 Spark 进程还在,继续用抢占的资源接新任务。
  5. 内存优化。更细粒度划分内存:on/off heap memory,堆外内存节省使用量,还不用麻烦 GC,但读写需要序列化、反序列化。执行内存和存储内存之间可以弹性伸缩、挤压,靠的是 BlockManager 和钨丝计划。
  6. 更频繁且优雅地使用迭代器模式。用迭代器串联多个算子的类似递归调用,比如 word count:foreach,println(reduceByKey(map(flatMap())))。
  7. IFile.Reader.nextRawKey/Value() 每次只读一个 K 或 V。Spark 有的算子比如 sortByKey 会抽样,计算数据集的步进,减少 I/O,比 MapReduce 优化了。

还有其他维度的解决办法吗?多个 MapReduce 任务怎么平滑地依次执行?MapReduce 集群是不是功能太杂了?业务上有没有改进空间?

map reduce shuffle combine 这个流程,可以把 MR 集群拆成微服务。这是 Spark 之外的另一个拓展方向。