概览
在实现MapReduce的过程中,主要是实现一个MapReduce框架。这个框架是干嘛的呢?具体来说,就是使用者只需要定义自己的Map和Reduce方法,即可完成MapReduce功能。如下所示:
如下所示:
input is divided into "splits"
Input Map -> a,1 b,1 c,1
Input Map -> b,1
Input Map -> a,1 c,1
| | |
| -> Reduce -> c,2
-----> Reduce -> b,2
主要逻辑如下:
- MR会对每个split来调用Map(),并产生中间结果集k2,v2
- 对于每个k2,MR会聚合所有与之相对应的中间值v2。然后调用Reduce()
- 从Reduce()会产生最终的结果集
下面看一个具体的例子:word count
input is thousands of text files
Map(k, v)
split v into words
for each word w
emit(w, "1")
Reduce(k, v)
emit(len(v))
如上图:
- 输入:上千个文本文件
- Map(k, v):将输入v切割成具体的word,对于每个word,发出(w,”1”)
- Reduce(k, v):计算v的长度,然后返回给调用者。
MR框架实现
上面给出了word count的功能,以及用户定义的Map和Reduce方法的作用,从下面开始,我们开始用代码来实现具体的功能。
根据论文描述,MapReduce主要分为两个阶段:
Map阶段:
- 分配到map任务的worker会读取对应的输入文件的内容。
- 将文件内容传递给Map方法。
- map方法生成的K/V中间结果会被保存在本地文件中。
Reduce阶段:
- 当master通知一个reduce worker这些数据保存的位置时,该worker使用RPC调用来读取本地磁盘保存的数据。
- 将中间结果按照key进行归类。
- 将归类后的k/v集合传递给Reduce方法。
- 得到最终结果。
为了实现word count功能。可以由浅入深,分为四个步骤进行:
- 单线程情况下,完成MR最核心的功能,即MR的doMap()和doReduce()方法。
- 单线程下,完成用户自定义的Map()和Reduce()方法,实现word count功能。
- 多线程下,会新增master,完成master的schedule()方法,实现word count功能。
- 实现故障容错。
单线程MapReduce
在单线程情况下,MR首先会执行doMap,然后执行doReduce。
doMap():被map worker调用,读取一个输入文件,得到文件内容,然后调用用户自定义的map方法,并把map方法返回的结果保存在本地文件中。
doReduce():被reduce worker调用,读取在map阶段产生的中间结果,并对key进行归类,然后调用用户自定义的reduce方法,得到最终结果。
所以先实现MR框架的doMap()方法,如下:
代码如下:
|
|
然后再实现doReduce方法,如下:
代码如下:
|
|
单线程的word count
经过上面的步骤,我们已经完成的MR最核心的两个方法,现在再来自定义具体的map和reduce方法,实现word count功能。
map():完成文本切分,并返回KeyValue的集合。
reduce():对每个key的value进行计算。得到最终结果。
map()和reduce()方法如下:
|
|
多线程MapReduce
经过上面的步骤,已经完成的单线程版的word count功能。
但是在多线程的情况下,会有多个worker同时工作,这时候会有一个master来分配和管理。
在不考虑故障容错的情况下,根据论文描述,master主要的职责如下:
- worker启动时候会注册到master上。
- master为空闲的worker分配doMap或者doReduce任务。
master的调度主要体现在schedule()方法中:
实现如下:
|
|
故障转移
在上面多线程实现中,我们没有考虑worker失败的情况,那么当worker失败该如何处理呢?
在worker执行的过程中,每个worker都是无状态的节点。所以当worker失败的时候,master可以简单的将这个处理失败的任务分配给其他的worker进行处理。
改进schedule()方法,RPC调用每个worker进行处理的时候,如果rpc失败,则将任务分配给其他的worker进行处理,一直到rpc返回ok。
最简单粗暴的方法来实现:
|
|