做网站备案照片的要求,西安广告公司排名top10,做明星网站打广告,包装设计app目录
对比总结
MapReduce流程
编辑
MapTask流程
ReduceTask流程
MapReduce原理
阶段划分
Map shuffle
Partition
Collector
Sort
Spill
Merge
Reduce shuffle
Copy
Merge Sort 对比总结
Map端读取文件#xff1a;都是需要通过split概念来进行逻辑切片…目录
对比总结
MapReduce流程
编辑
MapTask流程
ReduceTask流程
MapReduce原理
阶段划分
Map shuffle
Partition
Collector
Sort
Spill
Merge
Reduce shuffle
Copy
Merge Sort 对比总结
Map端读取文件都是需要通过split概念来进行逻辑切片概念相同底层具体实现和参数略有差异业务逻辑实现方式MapReduce引擎是通过用户自定义实现Mapper和Reducer类来实现业务逻辑的而Spark提供了丰富的算子以及上层DataFrame、DataSet的抽象计算引擎MapReduce是基于标准的map reduce计算理念的实现map任务和reduce任务数据交换通过读写文件的方式进行的运行效率低Spark是基于内存的(RDD)同时Spark的计算引擎是基于DAG方式实现的类似Tez除了shuffle会通过文件进行数据交换外每个阶段的计算是基于内存的运行效率高基于内存有利于进行性能优化同时提供缓存和广播机制有利于计算结果复用适合算法迭代计算此外不但可以使用堆内内存还可以使用堆外内存部署方式都可以基于YARNMapReduce基于YARN实现了MRAppMasterSpark基于YARN实现了ApplicationMasterShuffle方式MapReduce采用了基于排序的数据聚集策略按Key排序而该策略是不可定制的不可以使用其他数据聚集算法如Hash聚集而Spark默认也需要排序但一定条件下也可以不进行排序ByPass和Tungsten方式。运行方式MapReduce的Job是基于JVM进程的而Spark的Driver和Executor是JVM进程Task运行是基于线程的。
MapReduce流程 MapTask流程
Read阶段Map Task通过用户编写的RecordReader从输入InputSplit中解析出一个个key/value。Map阶段该阶段主要是讲解析的key/value交给用户编写的map()函数处理并产生一些列新的key/value。Collect阶段在用户编写的map()函数中当数据处理完成后一般会调用OutputCollector.collect()输出结果。在该函数内部它会将生成的key/value分片通过调用Partitioner并写入一个环形内存缓冲区中。Spill阶段即“溢写”当环形缓冲区满后MapReduce会将数据写到本地磁盘上生成一个临时文件。需要主要的是将数据写入本地磁盘之前先要对数据进行一次本地排序并在必要时对数据进行合并、压缩等操作。Combine阶段当所有数据处理完成后Map Task对所有临时文件进行一次合并以确保最终只是生成一个数据文件。
ReduceTask流程
Shuffle阶段也称之为Copy阶段。Reduce Task从各个Map Task远程拷贝一片数据并针对某一片数据如果其大小超过一定阈值则写到磁盘上否则直接放到内存中。Merge阶段在远程拷贝数据的同时Reduce Task启动了两个后台线程对内存和磁盘上的文件进行合并以防止内存使用过多或磁盘上文件过多。Sort阶段按照MapReduce语义用户编写的reduce()函数输入数据是按Key进行聚集的一组数据。为了将Key相同的数据聚在一起Hadoop采用了基于排序的策略。由于各个Map Task已经实现对自己的处理结果进行了局部排序。因此Reduce Task只需对所有数据进行一次归并排序即可。Reduce阶段在该阶段中Reduce Task将每组数据依次交给用户编写的reduce()函数处理。Write阶段reduce()函数将计算结果写到HDFS上。
MapReduce原理
阶段划分
我们知道MapReduce计算模型主要由三个阶段构成Map、shuffle、Reduce。
Map是映射负责数据的过滤分法将原始数据转化为键值对Reduce是合并将具有相同key值的value进行处理后再输出新的键值对作为最终结果。Shuffle为了让Reduce可以并行处理Map的结果必须对Map的输出进行一定的排序与分割然后再交给对应的Reduce而这个将Map输出进行进一步整理并交给Reduce的过程就是Shuffle。
整个MR的大致过程如下 Map和Reduce操作需要我们自己定义相应Map类和Reduce类以完成我们所需要的化简、合并操作而shuffle则是系统自动帮我们实现的了解shuffle的具体流程能帮助我们编写出更加高效的Mapreduce程序。
Shuffle过程包含在Map和Reduce两端即Map shuffle和Reduce shuffle。
Map shuffle
在Map端的shuffle过程是对Map的结果进行分区、排序、分割然后将属于同一划分分区的输出合并在一起并写在磁盘上最终得到一个分区有序的文件分区有序的含义是map输出的键值对按分区进行排列具有相同partition值的键值对存储在一起每个分区里面的键值对又按key值进行升序排列默认其流程大致如下 Partition
对于map输出的每一个键值对系统都会给定一个partitionpartition值默认是通过计算key的hash值后对Reduce task的数量取模获得。如果一个键值对的partition值为1意味着这个键值对会交给第一个Reducer处理。
我们知道每一个Reduce的输出都是有序的但是将所有Reduce的输出合并到一起却并非是全局有序的如果要做到全局有序我们该怎么做呢最简单的方式只设置一个Reduce task但是这样完全发挥不出集群的优势而且能应对的数据量也很受限。最佳的方式是自己定义一个Partitioner用输入数据的最大值除以系统Reduce task数量的商作为分割边界也就是说分割数据的边界为此商的1倍、2倍至numPartitions-1倍这样就能保证执行partition后的数据是整体有序的。
另一种需要我们自己定义一个Partitioner的情况是各个Reduce task处理的键值对数量极不平衡。对于某些数据集由于很多不同的key的hash值都一样导致这些键值对都被分给同一个Reducer处理而其他的Reducer处理的键值对很少从而拖延整个任务的进度。当然编写自己的Partitioner必须要保证具有相同key值的键值对分发到同一个Reducer。
Collector
Map的输出结果是由collector处理的每个Map任务不断地将键值对输出到在内存中构造的一个环形数据结构中。使用环形数据结构是为了更有效地使用内存空间在内存中放置尽可能多的数据。
这个数据结构其实就是个字节数组叫Kvbuffer名如其义但是这里面不光放置了数据还放置了一些索引数据给放置索引数据的区域起了一个Kvmeta的别名在Kvbuffer的一块区域上穿了一个IntBuffer字节序采用的是平台自身的字节序的马甲。数据区域和索引数据区域在Kvbuffer中是相邻不重叠的两个区域用一个分界点来划分两者分界点不是亘古不变的而是每次Spill之后都会更新一次。初始的分界点是0数据的存储方向是向上增长索引数据的存储方向是向下增长如图所示 Kvbuffer的存放指针bufindex是一直闷着头地向上增长比如bufindex初始值为0一个Int型的key写完之后bufindex增长为4一个Int型的value写完之后bufindex增长为8。
索引是对在kvbuffer中的键值对的索引是个四元组包括value的起始位置、key的起始位置、partition值、value的长度占用四个Int长度Kvmeta的存放指针Kvindex每次都是向下跳四个“格子”然后再向上一个格子一个格子地填充四元组的数据。比如Kvindex初始位置是-4当第一个键值对写完之后(Kvindex0)的位置存放value的起始位置、(Kvindex1)的位置存放key的起始位置、(Kvindex2)的位置存放partition的值、(Kvindex3)的位置存放value的长度然后Kvindex跳到-8位置等第二个键值对和索引写完之后Kvindex跳到-12位置。
Kvbuffer的大小可以通过io.sort.mb设置默认大小为100M。但不管怎么设置Kvbuffer的容量都是有限的键值对和索引不断地增加加着加着Kvbuffer总有不够用的那天那怎么办把数据从内存刷到磁盘上再接着往内存写数据把Kvbuffer中的数据刷到磁盘上的过程就叫Spill多么明了的叫法内存中的数据满了就自动地spill到具有更大空间的磁盘。
关于Spill触发的条件也就是Kvbuffer用到什么程度开始Spill还是要讲究一下的。如果把Kvbuffer用得死死得一点缝都不剩的时候再开始Spill那Map任务就需要等Spill完成腾出空间之后才能继续写数据如果Kvbuffer只是满到一定程度比如80%的时候就开始Spill那在Spill的同时Map任务还能继续写数据如果Spill够快Map可能都不需要为空闲空间而发愁。两利相衡取其大一般选择后者。Spill的门限可以通过io.sort.spill.percent默认是0.8。
Spill这个重要的过程是由Spill线程承担Spill线程从Map任务接到“命令”之后就开始正式干活干的活叫SortAndSpill原来不仅仅是Spill在Spill之前还有个颇具争议性的Sort。
Sort
当Spill触发后SortAndSpill先把Kvbuffer中的数据按照partition值和key两个关键字升序排序移动的只是索引数据排序结果是Kvmeta中数据按照partition为单位聚集在一起同一partition内的按照key有序。
Spill
Spill线程为这次Spill过程创建一个磁盘文件从所有的本地目录中轮训查找能存储这么大空间的目录找到之后在其中创建一个类似于“spill12.out”的文件。Spill线程根据排过序的Kvmeta挨个partition的把数据吐到这个文件中一个partition对应的数据吐完之后顺序地吐下个partition直到把所有的partition遍历完。一个partition在文件中对应的数据也叫段(segment)。在这个过程中如果用户配置了combiner类那么在写之前会先调用combineAndSpill()对结果进行进一步合并后再写出。Combiner会优化MapReduce的中间结果所以它在整个模型中会多次使用。那哪些场景才能使用Combiner呢Combiner的输出是Reducer的输入Combiner绝不能改变最终的计算结果。所以从我的想法来看Combiner只应该用于那种Reduce的输入key/value与输出key/value类型完全一致且不影响最终结果的场景。比如累加最大值等。Combiner的使用一定得慎重如果用好它对job执行效率有帮助反之会影响reduce的最终结果。
所有的partition对应的数据都放在这个文件里虽然是顺序存放的但是怎么直接知道某个partition在这个文件中存放的起始位置呢强大的索引又出场了。有一个三元组记录某个partition对应的数据在这个文件中的索引起始位置、原始数据长度、压缩之后的数据长度一个partition对应一个三元组。然后把这些索引信息存放在内存中如果内存中放不下了后续的索引信息就需要写到磁盘文件中了从所有的本地目录中轮训查找能存储这么大空间的目录找到之后在其中创建一个类似于“spill12.out.index”的文件文件中不光存储了索引数据还存储了crc32的校验数据。spill12.out.index不一定在磁盘上创建如果内存默认1M空间中能放得下就放在内存中即使在磁盘上创建了和spill12.out文件也不一定在同一个目录下。每一次Spill过程就会最少生成一个out文件有时还会生成index文件Spill的次数也烙印在文件名中。索引文件和数据文件的对应关系如下图所示 在Spill线程如火如荼的进行SortAndSpill工作的同时Map任务不会因此而停歇而是一无既往地进行着数据输出。Map还是把数据写到kvbuffer中那问题就来了只顾着闷头按照bufindex指针向上增长kvmeta只顾着按照Kvindex向下增长是保持指针起始位置不变继续跑呢还是另谋它路如果保持指针起始位置不变很快bufindex和Kvindex就碰头了碰头之后再重新开始或者移动内存都比较麻烦不可取。Map取kvbuffer中剩余空间的中间位置用这个位置设置为新的分界点bufindex指针移动到这个分界点Kvindex移动到这个分界点的-16位置然后两者就可以和谐地按照自己既定的轨迹放置数据了当Spill完成空间腾出之后不需要做任何改动继续前进。分界点的转换如下图所示 Map任务总要把输出的数据写到磁盘上即使输出数据量很小在内存中全部能装得下在最后也会把数据刷到磁盘上。
Merge
Map任务如果输出数据量很大可能会进行好几次Spillout文件和Index文件会产生很多分布在不同的磁盘上。最后把这些文件进行合并的merge过程闪亮登场。
Merge过程怎么知道产生的Spill文件都在哪了呢从所有的本地目录上扫描得到产生的Spill文件然后把路径存储在一个数组里。Merge过程又怎么知道Spill的索引信息呢没错也是从所有的本地目录上扫描得到Index文件然后把索引信息存储在一个列表里。到这里又遇到了一个值得纳闷的地方。在之前Spill过程中的时候为什么不直接把这些信息存储在内存中呢何必又多了这步扫描的操作特别是Spill的索引数据之前当内存超限之后就把数据写到磁盘现在又要从磁盘把这些数据读出来还是需要装到更多的内存中。之所以多此一举是因为这时kvbuffer这个内存大户已经不再使用可以回收有内存空间来装这些数据了。对于内存空间较大的土豪来说用内存来省却这两个io步骤还是值得考虑的。 然后为merge过程创建一个叫file.out的文件和一个叫file.out.Index的文件用来存储最终的输出和索引一个partition一个partition的进行合并输出。对于某个partition来说从索引列表中查询这个partition对应的所有索引信息每个对应一个段插入到段列表中。也就是这个partition对应一个段列表记录所有的Spill文件中对应的这个partition那段数据的文件名、起始位置、长度等等。
然后对这个partition对应的所有的segment进行合并目标是合并成一个segment。当这个partition对应很多个segment时会分批地进行合并先从segment列表中把第一批取出来以key为关键字放置成最小堆然后从最小堆中每次取出最小的输出到一个临时文件中这样就把这一批段合并成一个临时的段把它加回到segment列表中再从segment列表中把第二批取出来合并输出到一个临时segment把其加入到列表中这样往复执行直到剩下的段是一批输出到最终的文件中。最终的索引数据仍然输出到Index文件中。
Reduce shuffle
在Reduce端shuffle主要分为复制Map输出、排序合并两个阶段。
Copy
Reduce任务通过HTTP向各个Map任务拖取它所需要的数据。Map任务成功完成后会通知父TaskTracker状态已经更新TaskTracker进而通知JobTracker这些通知在心跳机制中进行。所以对于指定作业来说JobTracker能记录Map输出和TaskTracker的映射关系。Reduce会定期向JobTracker获取Map的输出位置一旦拿到输出位置Reduce任务就会从此输出对应的TaskTracker上复制输出到本地而不会等到所有的Map任务结束。
Merge Sort
Copy过来的数据会先放入内存缓冲区中如果内存缓冲区中能放得下这次数据的话就直接把数据写到内存中即内存到内存merge。Reduce要向每个Map去拖取数据在内存中每个Map对应一块数据当内存缓存区中存储的Map数据占用空间达到一定程度的时候开始启动内存中merge把内存中的数据merge输出到磁盘上一个文件中即内存到磁盘merge。在将buffer中多个map输出合并写入磁盘之前如果设置了Combiner则会化简压缩合并的map输出。Reduce的内存缓冲区可通过mapred.job.shuffle.input.buffer.percent配置默认是JVM的heap size的70%。内存到磁盘merge的启动门限可以通过mapred.job.shuffle.merge.percent配置默认是66%。
当属于该reducer的map输出全部拷贝完成则会在reducer上生成多个文件如果拖取的所有map数据总量都没有内存缓冲区则数据就只存在于内存中这时开始执行合并操作即磁盘到磁盘mergeMap的输出数据已经是有序的Merge进行一次合并排序所谓Reduce端的sort过程就是这个合并的过程。一般Reduce是一边copy一边sort即copy和sort两个阶段是重叠而不是完全分开的。最终Reduce shuffle过程会输出一个整体有序的数据块。