Map-Reduce
2018-01-11

http://www.justabug.net/mind-map-reduce/ (左右滚动条在最下方)
3.2 容错 java实现 例如,计算一个大的文档集合中每个单词出现的次数 map(String key, String value): // key: document name // value: document contents for each word w in value: EmitIntermediate(w, "1"); reduce(String key, Iterator values): // key: a word // values: a list of counts int result = 0; for each v in values: result += ParseInt(v); Emit(AsString(result)); Map函数输出文档中的每个词、以及这个词的出现次数(在这个简单的例子里就是1)。 Reduce函数把Map函数产生的每一个特定的词的计数累加起来。 例子 Master Worker故障 master失败 原理 执行过程 在失效方面的处理机制 1.切片 2.Master分配任务给work 3.work解析K-V,使用Map函数处理 4.将缓存的K-V写入磁盘 5.worker获取数据并排序 6.Reduce 7.完成并唤醒用户。 用户程序首先调用的MapReduce库将输入文件分成M个数据片度 每个数据片段的大小一般从 16MB到64MB(可以通过可选的参数来控制每个数据片段的大小)。然后用户程序在机群中创建大量的程序副本。 这些程序副本中的有一个特殊的程序--master。副本中其它的程序都是worker程序,由master分配任务。 有M个Map任务和R个Reduce任务将被分配,master将一个Map任务或Reduce任务分配给一个空闲的worker。 被分配了map任务的worker程序读取相关的输入数据片段,从输入的数据片段中解析出K-V, 然后把K-V传递给用户自定义的Map函数,由Map函数生成并输出的中间K-V,并缓存在内存中。 缓存的K-V通过分区函数分成R个区域,之后周期性的写入到本地磁盘上。 缓存的K-V在本地磁盘上的存储位置将被回传给master,由master负责把这些存储位置再传送给Reduce worker。 当Reduce worker程序接收到master程序发来的数据存储位置信息后,使用RPC从Map worker所在主机的磁盘上读取这些缓存数据。 当Reduce worker读取了所有的中间数据后,通过对key进行排序后使得具有相同key值的数据聚合在一起。 由于许多不同的key值会映射到相同的Reduce任务上,因此必须进行排序。如果中间数据太大无法在内存中完成排序,那么就要在外部进行排序。 Reduce worker程序遍历排序后的中间数据, 对于每一个唯一的中间key值,Reduce worker程序将这个key值和它相关的中间value值的集合传递给用户自定义的Reduce函数。 Reduce函数的输出被追加到所属分区的输出文件。 当所有的Map和Reduce任务都完成之后,master唤醒用户程序。 在这个时候,在用户程序里的对MapReduce调用才返回。 持有一些数据结构,它存储每一个Map和Reduce任务的状态(空闲、工作中或完成),以及Worker机器(非空闲任务的机器)的标识。 Master就像一个数据管道,中间文件存储区域的位置信息通过这个管道从Map传递到Reduce。 因此,对于每个已经完成的Map任务,master存储了Map任务产生的R个中间文件存储区域的大小和位置。 当Map任务完成时,Master接收到位置和大小的更新信息,这些信息被逐步递增的推送给那些正在工作的Reduce任务。 当用户提供的Map和Reduce操作是幂等的,我们的分布式实现在任何情况下的输出都和所有程序没有出现任何错误、顺序的执行产生的输出是一样的。 当一个Map任务完成时,worker发送一个包含R个临时文件名的完成消息给master。 如果master从一个已经完成的Map任务再次接收到到一个完成消息,master将忽略这个消息;否则,master将这R个文件的名字记录在数据结构里。 当Reduce任务完成时,Reduce worker进程以原子的方式把临时文件重命名为最终的输出文件。 如果同一个Reduce任务在多台机器上执行,针对同一个最终的输出文件将有多个重命名操作执行。 我们依赖底层文件系统提供的重命名操作的原子性来保证最终的文件系统状态仅仅包含一个Reduce任务产生的数据。 我们依赖【对Map和Reduce任务的输出是原子提交的】这个特性。每个工作中的任务把它的输出写到私有的临时文件中。 每个Reduce任务生成一个这样的文件,而每个Map任务则生成R个这样的文件(Reduce任务与文件一一对应) 1.周期性检查与失效 master周期性的ping每个worker。如果在一个约定的时间范围内没有收到worker返回的信息,master将把这个worker标记为失效。 所有由这个失效的worker完成的Map任务被重设为初始的空闲状态,之后这些任务就可以被安排给其他的worker。 同样的,worker失效时正在运行的Map或Reduce任务也将被重新置为空闲状态,等待重新调度。 2.故障处理 当worker故障时,由于已经完成的Map任务的输出存储在这台机器上,Map任务的输出已不可访问了,因此必须重新执行。 而已经完成的Reduce任务的输出存储在全局文件系统上,因此不需要再次执行。 3.重复执行(接盘) 当一个Map任务首先被worker A执行,之后由于worker A失效了又被调度到worker B执行,这个“重新执行”的动作会被通知给所有执行Reduce任务的worker。 任何还没有从worker A读取数据的Reduce任务将从worker B读取数据。 4.MapReduce可以处理大规模worker失效的情况。 MapReduce master只需要简单的再次执行那些不可访问的worker完成的工作,之后继续执行未完成的任务,直到最终完成这个MapReduce操作 一个简单的解决办法是让master周期性的将上面描述的数据结构(alex注:指3.2节)的写入磁盘,即检查点(checkpoint)。 如果这个master任务失效了,可以从最后一个检查点(checkpoint)开始启动另一个master进程。 然而,由于只有一个master进程,master失效后再恢复是比较麻烦的,因此我们现在的实现是如果master失效,就中止MapReduce运算。 客户可以检查到这个状态,并且可以根据需要重新执行MapReduce操作。 使用MapReduce模型的程序员可以很容易的理解他们程序的行为,因为我们绝大多数的Map和Reduce操作是确定性的(幂等), 而且存在这样的一个事实:我们的[失效处理机制]等价于一个[顺序的执行的操作]。 当Map或/和Reduce操作是不确定性的时候,我们提供虽然较弱但是依然合理的处理机制。 当使用非确定操作的时候,一个Reduce任务R1的输出等价于一个非确定性程序顺序执行产生时的输出。 但是,另一个Reduce任务R2的输出也许符合一个不同的非确定顺序程序执行产生的R2的输出。 细节 存储相关 任务粒度 备用任务 技巧 分区函数 顺序保证 Combiner函数 输入和输出的类型 性能 (Google家的测试) GFS把每个文件按64MB一个Block分隔,每个Block保存在多台机器上,环境中就存放了多份拷贝(一般是3个拷贝)。 MapReduce的master在调度Map任务时会考虑输入文件的位置信息,尽量将一个Map任务调度在包含相关输入数据拷贝的机器上执行 如果上述努力失败了,master将尝试在保存有输入数据拷贝的机器附近的机器上执行Map任务(例如,分配到一个和包含输入数据的机器在一个switch里的worker机器上执行) 当在一个足够大的cluster集群上运行大型MapReduce操作的时候,大部分的输入数据都能从本地机器读取,因此消耗非常少的网络带宽。 我们把Map拆分成了M个片段、把Reduce拆分成R个片段执行。 理想情况:M和R应当比集群中worker的机器数量要多得多。 在每台worker机器都执行大量的不同任务能够提高集群的动态的负载均衡能力,并且能够加快故障恢复的速度: 失效机器上执行的大量Map任务都可以分布到所有其他的worker机器上去执行。 实际情况:在我们的具体实现中对M和R的取值都有一定的客观限制,因为master必须执行O(M+R)次调度,并且在内存中保存O(M*R)个状态 (对影响内存使用的因素还是比较小的:O(M*R)块状态,大概每对Map任务/Reduce任务1个字节就可以了)。 更进一步:R值通常是由用户指定的,因为每个Reduce任务最终都会生成一个独立的输出文件。 实际使用时我们也倾向于选择合适的M值,以使得每一个独立任务都是处理大约16M到64M的输入数据(这样,上面描写的输入数据本地存储优化策略才最有效)。 另外,我们把R值设置为我们想使用的worker机器数量的小的倍数。我们通常会用这样的比例来执行MapReduce:M=200000,R=5000,使用2000台worker机器。 分布式的Grep 计算URL访问频率 倒转网络链接图 倒排索引 分布式排序 “落伍者” 减少“落伍者” 影响一个MapReduce的总执行时间最通常的因素是“落伍者” 备用(backup)任务进程,用于处理剩下的1% 在中间key上使用分区函数来对数据进行分区 MapReduce的使用者通常会指定Reduce任务和Reduce任务输出文件的数量(R)。 我们在中间key上使用分区函数来对数据进行分区,之后再输入到后续任务执行进程。 一个缺省的分区函数是使用hash方法(比如,hash(key) mod R)进行分区。hash方法能产生非常平衡的分区。 然而,有的时候,其它的一些分区函数对key值进行的分区将非常有用。 比如,输出的key值是URLs,我们希望每个主机的所有条目保持在同一个输出文件中。 为了支持类似的情况,MapReduce库的用户需要提供专门的分区函数。 例如,使用“hash(Hostname(urlkey)) mod R”作为分区函数就可以把所有来自同一个主机的URLs保存在同一个输出文件中。 按KEY值排序 我们确保在给定的分区中,中间key/value pair数据的处理顺序是按照key值增量顺序处理的。 这样的顺序保证对每个分成生成一个有序的输出文件,这对于需要对输出文件按key值随机存取的应用非常有意义,对在排序输出的数据集也很有帮助。 排序 业界 在某些情况下,Map函数产生的中间key值的重复数据会占很大的比重,并且,用户自定义的Reduce函数满足结合律和交换律。 在2.1节的词数统计程序是个很好的例子。由于词频率倾向于一个zipf分布(齐夫分布),每个Map任务将产生成千上万个这样的记录<the,1>。 所有的这些记录将通过网络被发送到一个单独的Reduce任务,然后由这个Reduce任务把所有这些记录累加起来产生一个数字。 我们允许用户指定一个可选的combiner函数,combiner函数首先在本地将这些记录进行一次合并,然后将合并的结果再通过网络发送出去。 中间key值的重复数据多,允许用户指定combiner函数在本地先合并一次 模型 例子 类型 描述 Map函数 Reduce函数 input:k1,v1, output:list(k2,v2)[中间] input:k2,list(v2)[中间] output:合并后产生list(v2) MapReduce库把所有具有相同中间key值I的中间value值集合在一起后传递给reduce函数。 Reduce函数合并这些value值,形成一个较小的value值的集合。 一般的,每次Reduce函数调用只产生0或1个输出value值。 通常我们通过一个迭代器把中间value值提供给Reduce函数,这样我们就可以处理无法全部放入内存中的大量的value值的集合。 Combiner函数与Reduce函数 部分的合并中间结果可以显著的提高一些MapReduce操作的速度。 附录A包含一个使用combiner函数的例子 输入格式 读取方式 副作用 跳过损坏的记录 本地执行 操作过程中增加辅助的输出文件 忽略一些有问题的记录也是可以接受的 有时候,用户程序中的bug导致Map或者Reduce函数在处理某些记录的时候crash掉,MapReduce操作无法顺利完成。 惯常的做法是修复bug后再次执行MapReduce操作,但是,有时候找出这些bug并修复它们不是一件容易的事情; 这些bug也许是在第三方库里边,而我们手头没有这些库的源代码。 而且在很多时候,忽略一些有问题的记录也是可以接受的,比如在一个巨大的数据集上进行统计分析的时候。 我们提供了一种执行模式,在这种模式下,为了保证保证整个处理能继续进行,MapReduce会检测哪些记录导致确定性的crash,并且跳过这些记录不处理。 每个worker进程都设置了信号处理函数捕获内存段异常(segmentation violation)和总线错误(bus error)。 在执行Map或者Reduce操作之前,MapReduce库通过全局变量保存记录序号。 如果用户程序触发了一个系统信号,消息处理函数将用“最后一口气”通过UDP包向master发送处理的最后一条记录的序号。 当master看到在处理某条特定记录不止失败一次时,master就标志着条记录需要被跳过,并且在下次重新执行相关的Map或者Reduce任务的时候跳过这条记录。 状态信息 计数器 调试 调试Map和Reduce函数的bug是非常困难的,因为实际执行操作时不但是分布在系统中执行的,而且通常是在好几千台计算机上执行, 具体的执行位置是由master进行动态调度的,这又大大增加了调试的难度。 为了简化调试、profile和小规模测试,我们开发了一套MapReduce库的本地实现版本,通过使用本地版本的MapReduce库,MapReduce操作在本地计算机上顺序的执行。 用户可以控制MapReduce操作的执行,可以把操作限制到特定的Map任务上。 用户通过设定特别的标志来在本地执行他们的程序,之后就可以很容易的使用本地调试和测试工具(比如gdb)。 master使用嵌入式的HTTP服务器,便于监控 状态信息页面显示了包括计算执行的进度, 比如已经完成了多少任务、有多少任务正在处理、输入的字节数、中间数据的字节数、输出的字节数、处理百分比等等。 页面还包含了指向每个任务的stderr和stdout文件的链接。用户根据这些数据预测计算需要执行大约多长时间、是否需要增加额外的计算资源。 这些页面也可以用来分析什么时候计算执行的比预期的要慢。 另外,处于最顶层的状态页面显示了哪些worker失效了,以及他们失效的时候正在运行的Map和Reduce任务。 这些信息对于调试用户代码中的bug很有帮助。 MapReduce库使用计数器统计不同事件发生次数 比如,用户可能想统计已经处理了多少个单词、已经索引的多少篇German文档等等。 Counter* uppercase; uppercase = GetCounter(“uppercase”); map(String name, String contents): for each word w in contents: if (IsCapitalized(w)): uppercase->Increment(); EmitIntermediate(w, “1″); worker传递给master,master统计 这些计数器的值周期性的从各个单独的worker机器上传递给master(附加在ping的应答包中传递)。 master把执行成功的Map和Reduce任务的计数器值进行累计,当MapReduce操作完成之后,返回给用户代码 计数器当前的值也会显示在master的状态页面上,这样用户就可以看到当前计算的进度。 当累加计数器的值的时候,master要检查重复运行的Map或者Reduce任务,避免重复累加(之前提到的备用任务和失效后重新执行任务这两种情况会导致相同的任务被多次执行)。 有些计数器的值是由MapReduce库自动维持的,比如已经处理的输入的key/value pair的数量、输出的key/value pair的数量等等。 集群配置 GREP 高效的backup任务 失效的机器 对比例子 一类是对数据格式进行转换,从一种表现形式转换为另外一种表现形式 另一类是从海量数据中抽取少部分的用户感兴趣的数据 参考:http://blog.csdn.net/linvo/article/details/6596468 中间过程 过程 a 【上】输入 峰值会达到13GB/s,大约200秒之后迅速滑落到了0。 值得注意的是,排序程序输入数据读取速度小于分布式grep程序。 这是因为排序程序的Map任务花了大约一半的处理时间和I/O带宽把中间输出结果写到本地硬盘。 相应的分布式grep程序的中间结果输出几乎可以忽略不计。 【中】中间数据从Map任务发送到Reduce任务 这个过程从第一个Map任务完成之后就开始缓慢启动了。 图示的第一个高峰是启动了第一批大概1700个Reduce任务 (整个MapReduce分布到大概1700台机器上,每台机器1次最多执行1个Reduce任务)。 排序程序运行大约300秒后,第一批启动的Reduce任务有些完成了,开始执行剩下的Reduce任务 所有的处理在大约600秒后结束。 【下】输出 在第一个排序阶段结束和数据开始写入磁盘之间有小延时,因为worker正在忙于排序中间数据 磁盘写入速度在2-4GB/s持续一段时间。输出数据写入磁盘大约持续850秒。 计入初始启动部分的时间,整个运算消耗了891秒。 这个速度和TeraSort benchmark[18]的最高纪录1057秒相差不多。 本地读入提高性能 b 显示了关闭了备用任务后排序程序执行情况 (b)执行的过程和(a)很相似, 除了输出数据写磁盘的动作在时间上拖了一个很长的尾巴,而且在这段时间里,几乎没有什么写入动作。 在960秒后,只有5个Reduce任务没有完成。这些拖后腿的任务又执行了300秒才完成。 整个计算消耗了1283秒,多了44%的执行时间。 c (c)的排序程序执行过程中,我们在程序开始后几分钟有意的kill了1746个worker中的200个。 集群底层的调度立刻在这些机器上重新开始新的worker处理进程 (因为只是worker机器上的处理进程被kill了,机器本身还在工作)。 (c)显示出了一个“负”的输入数据读取速度,这是因为一些已经完成的Map任务丢失了 (由于相应的执行Map任务的worker进程被kill了),需要重新执行这些任务。 相关Map任务很快就被重新执行了。整个运算在933秒内完成,包括了初始启动时间 (只比正常执行多消耗了5%的时间)。 还有一些值得注意的现象: 输入数据的读取速度比排序速度和输出数据写入磁盘速度要高不少, 这是因为我们的输入数据本地化优化策略起了作用 — 绝大部分数据都是从本地硬盘读取的,从而节省了网络带宽。 排序速度比输出数据写入到磁盘的速度快,这是因为输出数据写了两份 (我们使用了2路的GFS文件系统,写入复制节点的原因是为了保证数据可靠性和可用性)。 我们把输出数据写入到两个复制节点的原因是因为这是底层文件系统的保证数据可靠性和可用性的实现机制。 如果底层文件系统使用类似容错编码[14](erasure coding)的方式而不是复制的方式保证数据的可靠性和可用性, 那么在输出数据写入磁盘的时候,就可以降低网络带宽的使用。 成功之处 并行处理 容错处理 数据本地化优化 负载均衡 Hadoop Lucene ElasticSearch 主体 Hadoop 入口 Map Reduce 定义一个 Job, 将 Mapper, Reducer 等必要的值设置进去。 [将 (K1, V1) 的输入转化成 list(K2, V2) 的输出] [将 (K2, list(V2)) 的输入转化成 list(K3, V3) 的输出] http://blog.csdn.net/admin1973/article/details/62037603?locationNum=6&fps=1 Nutch JAVA8 lambda Phoenix 分析 Map-Reduce