Sep 25, 2022
Apache Hadoop MapReduce
Apache Hadoop MapReduce 是一套在计算机集群上并行处理大型数据的开源框架,在工业界用得挺多的。
这次我们邀请了资深软件工程师猴哥来聊聊 MapReduce 的原理和代码实现。以下是活动笔记整理。
MapReduce
概述
先说个问题:为什么要有 GFS/HDFS 这种分布式文件系统?
很简单,数据量越来越大,存不下。但存下来不是为了当网盘用,而是为了后续的计算,而且是那种需要大量数据参与的计算。
那处理海量数据最有效的办法是什么?拆,分而治之。
MapReduce 就是一种在分布式文件系统上跑的分治计算方式。Map 是"映射",把每条记录作为输入,输出若干条处理后的记录。Reduce 是"归并"或"归约",把一组记录作为输入,输出最终结果。
计算程序和数据文件都在分布式集群里,那它们怎么协调和分配?核心原则就一条:让计算往数据那边跑,而不是反过来搬数据。
大概流程

map reduce high level
- 客户端负责计算切片,然后把 Jar 包和配置上传到 HDFS。
- MapReduce 程序启动后,把上传的 Jar 包拉到各个计算节点。调度这块由 Yarn 负责,这里先略过。
- MapReduce 集群通过反射机制加载各个 Mapper 和 Reducer 类。
- 在最合适的节点上启动各个 MapTask,按照预先定义的"一条条"数据作为输入开始计算。
- MapTask 的数量取决于 split 的个数。
- MapTask 跑完后,生成键值对形式的中间数据。每个键值对都会经过一次分区计算,决定它该去哪个 Reducer 处理。中间数据存在 Map 节点的本地磁盘上。这里用到了 Partitioner,它是基于 Hash 实现的。
- 数据被拉到对应的 Reducer 进行计算。ReduceTask 的个数可以手动配置,默认是 1 个。
- Reducer 把计算结果写到 HDFS 里。
来看下客户端代码:
public class MyWordCount {
public static void main(String[] args) throws Exception{
// 加载 resources 里的 xml 文件
Configuration conf = new Configuration(true);
GenericOptionsParser parser = new GenericOptionsParser(conf, args); // 接收命令行参数
String otherArgs[] = parser.getRemainingArgs(); // 解析参数
// 让程序在 Mac 和 Windows 上也能跑
conf.set("mapreduce.app-submission.cross-platform", "true");
conf.set("mapreduce.framework.name", "yarn");
Job job = Job.getInstance(conf);
// 上传这个 jar 文件
//job.setJar("/Users/shuzheng/IdeaProjects/hadoop-hdfs/target/hadoop-hdfs-1.0-SNAPSHOT.jar");
// 主类
job.setJarByClass(MyWordCount.class);
job.setJobName("myJob");
Path infile = new Path(otherArgs[0]);
TextInputFormat.addInputPath(job, infile);
Path outfile = new Path(otherArgs[1]);
if (outfile.getFileSystem(conf).exists(outfile)) {
outfile.getFileSystem(conf).delete(outfile, true);
}
TextOutputFormat.setOutputPath(job, outfile);
job.setMapperClass(MyMapper.class);
job.setMapOutputKeyClass(Text.class); // 反射
job.setMapOutputValueClass(IntWritable.class);
job.setReducerClass(MyReducer.class);
job.setNumReduceTasks(2); // 设置 reduce 任务数,默认是 1
job.waitForCompletion(true);
}
}
public class MyMapper extends Mapper<Object, Text, Text, IntWritable> {
private final static IntWritable ONE = new IntWritable(1);
private final static Text WORD = new Text();
// Word count 中,key 是当前这一行在文件中的偏移量,value 是内容
public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
// StringTokenizer 按空格、制表符等分割字符串
StringTokenizer itr = new StringTokenizer(value.toString());
while (itr.hasMoreTokens()) {
WORD.set(itr.nextToken());
context.write(WORD, ONE); // MapTask 第 1156 行:keySerializer.serialize(key);
}
}
}
public class MyReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
private IntWritable result = new IntWritable();
// Word Count:key 是单词,value 是 1 或者 combine 后的出现次数,不是最终结果
public void reduce(Text key, Iterable<IntWritable> values, Context context)
throws IOException, InterruptedException { // 其实是 List
int sum = 0;
for (IntWritable val : values) {
sum += val.get();
}
result.set(sum);
context.write(key, result);
}
}
详细流程和底层原理

map reduce low level
现在聚焦单个 MapTask 和 ReduceTask。k-v-partition 这个组合是核心。
- 客户端计算 split 时,默认大小是 block size,但也可以设成比 block 大或小。默认是 128MB。为什么能调整?因为不同计算任务特性不一样:有的是 I/O 密集型,有的是 CPU 密集型。
- 计算程序通过反射加载,后面才能执行里面的计算逻辑。
- map 方法每处理一条记录,默认是一行文本,输出的 key-value-Partition 会根据 key 计算出对应的分区号,也就是该去哪个 Reducer。算完分区号后,数据不会立刻落盘,而是先写到环形缓冲区里。分区计算很关键,它承上启下,关系到后面的分治计算。Spark 里的 join,也就是 cogroup 算子,还有 Hive 和 Spark SQL 里的 group by,都依赖 Partitioner。
- 环形缓冲区默认 100MB,用到 80% 时就会触发 spill,这个逻辑在 MapOutputBuffer.init 里。Spill 由另一个线程写磁盘,避免阻塞主计算线程。落盘前会根据 key 做排序和 combine。Combine 是可选的,它相当于 map 端的"小 reduce"。
- Combine 的逻辑和 reduce 是一样的。
- 环形缓冲区经过多次 spill 会生成多个小文件,每个都不到 100MB,最后把它们合并成一个大文件。这个合并过程 I/O 成本比较高。
- Map 计算结束后,各个 Reducer 通过 HTTP 协议拉取属于自己的数据,这就是 Shuffle。这里有个关键点:相同的 key 必须去同一个 Partition,也就是同一个 Reducer。Yarn 负责调度 Reducer 来拉取数据。
- Reducer 一边归并拉取过来的文件,一边把相同 key 的一组数据交给用户自定义的 reduce() 方法处理,不需要先合并成完整的中间文件。这里用到了两个设计模式:模板方法和迭代器。
最终结果也是以 HDFS 文件的形式输出,自然有多个副本,数据可靠性有保障。
举例

实际运行结果:
(base) [root@hadoop-04 ~]# hdfs dfs -text /data/wc/output2/part-r-00000
aaa 1
hi 3
mr 2
(base) [root@hadoop-04 ~]# hdfs dfs -text /data/wc/output2/part-r-00001
bbb 1
bigdata 2
gfs 2
hello 6
world 1
图中数据是假设了各个 key 的 Hash 值,所以结果在最终文件中的分布会变化,但不影响正确性和相对顺序。
问题来了:词频统计怎么做?
总结:
block : split
1:1
N:1
1:N
split : map
1:1
map : reduce
N:1 默认
N:N (相同 key 必须去同一个 reduce)
1:1
~~1:N~~ 不允许
key:partion N:1
相关源码(hadoop-2.6.5,只列了一些要点):
- 客户端定义 split 计算:Job.waitForCompletion → JobSubmitter.writeNewSplits() → FileInputFormat.getSplits。不特别设置的话,默认 split 大小就是 block size。
- MapTask 的调用:MapTask.run,其中每一行文本调用一次 map 方法。这是默认行为,由 TextInputFormat 和 LineRecordReader 实现。
- map 方法里调用 context.write 时会做这些事:计算分区号,分区计算很重要,Hive 和 Spark SQL 的 group by 都靠它。然后是排序,combine,序列化。序列化不会覆盖相同 key,最后是 spill。
- Shuffle 部分:Shuffle → Fetcher.run → copyFromHost → Fetcher.copyMapOutput → mapOutput.shuffle。
- Fetcher.copyFromHost 是拉模型,Reducer 去 Mapper 拉数据。Reducer 端负责发起 Shuffle,用 HTTP 方式,见 Fetcher.connection 成员。
- ReduceTask.run:封装迭代器传给 context,再传给用户定义的 reduce 方法。MergeQueue 才是真正的迭代器,其 next 方法实现有序迭代已排序的大文件。

- 总结:算法、数据结构、设计模式、线程模型、I/O 模型是基础。
延伸
MapReduce 的缺点是什么?Spark 怎么解决的?
- Spark 缓存中间数据,可以存在内存、磁盘或分布式文件系统。RDD 的 iterator、cache 和 checkpoint 都支持这个功能,方便重复使用和失败重试。
- 多个算子搭配,相当于多个 MapReduce 顺序执行,不用每个 MR 程序单独启动。

spark word count
- 读取效率方面,Spark 有的算子比如 sortByKey 会做数据抽样,计算数据集的步进,减少 I/O,比 MapReduce 优化了。
- 从 MR 程序可以看出,除了配置,用户需要上传的就是 map() 和 reduce() 两个方法,也就是两个函数。Spark 和 Scala 的函数式编程正好优化了代码量,让程序更简洁。
- Spark 通过构建 DAG 和划分 Stage,达到更细粒度的任务调度。一旦某个任务失败,只重试该 Stage,不用像 MapReduce 那样,中间某步失败就得从头来一遍。
- MapReduce 是"冷程序",提交后才分配资源启动执行,执行完就结束进程。Spark 是"热服务",资源一开始就抢占好,任务提交上来直接开始执行,执行完后 Spark 进程还在,继续用抢占的资源接新任务。
- 内存优化。更细粒度划分内存:on/off heap memory,堆外内存节省使用量,还不用麻烦 GC,但读写需要序列化、反序列化。执行内存和存储内存之间可以弹性伸缩、挤压,靠的是 BlockManager 和钨丝计划。
- 更频繁且优雅地使用迭代器模式。用迭代器串联多个算子的类似递归调用,比如 word count:foreach,println(reduceByKey(map(flatMap())))。
- IFile.Reader.nextRawKey/Value() 每次只读一个 K 或 V。Spark 有的算子比如 sortByKey 会抽样,计算数据集的步进,减少 I/O,比 MapReduce 优化了。
还有其他维度的解决办法吗?多个 MapReduce 任务怎么平滑地依次执行?MapReduce 集群是不是功能太杂了?业务上有没有改进空间?
map reduce shuffle combine 这个流程,可以把 MR 集群拆成微服务。这是 Spark 之外的另一个拓展方向。