Hadoop-MapReduce
定义
一个分布式运算程序的编程框架,是用户开发“基于 Hadoop 的数据分析应用”的核心框架。
核心功能是将用户编写的业务逻辑代码和自带默认组件整合成一个完整的分布式运算程序,并发运行在一个 Hadoop 集群上。
特点
优点
- 易于编程:简单的实现一些接口,就可以完成一个分布式程序,这个分布式程序可以分布到大量廉价的PC机器上运行。也就是说你写一个分布式程序,跟写一个简单的串行程序是一模一样的。
- 良好的扩展性:当你的计算资源不能得到满足的时候,你可以通过简单的增加机器来扩展它的计算能力。
- 高容错性:设计的初衷就是使程序能够部署在廉价的 PC 机器上,这就要求它具有很高的容错性。比如其中一台机器挂了,它可以把上面的计算任务转移到另外一个节点上运行,这个过程不需要人工参与。
- 适合 PB 级以上海量数据的离线处理可:以实现上千台服务器集群并发工作,提供数据处理能力。
缺点
- 不擅长实时计算:无法在毫秒或者秒级内返回结果。
- 不擅长流式计算:输入数据集是静态的,不能动态变化,这是由自身的设计特点决定的。
- 多任务耗时:由于每执行一次任务后,结果都会存储磁盘中,多次操作会增大 IO 开销,这是由自身的设计特点决定的。
- 不擅长 DAG (有向图) 计算:没有相应的流程优化。
架构
MrAppMaster
- 负责整个程序的过程调度及状态协调。
MapTask
- 负责 Map 阶段的整个数据处理流程。
工作机制
- Read 阶段:通过用户编写的 RecordReader,从 InputSplit 中解析键值对。
- Map 阶段:将解析出的 key/value 交给用户编写 map() 函数处理,并产生一系列新的 key/value。
- Collect 阶段:在用户编写 map()函数中,当数据处理完成后,一般会调用
OutputCollector.collect()
输出结果。在该函数内部,它会将生成的 key/value 分区(调用 Partitioner),并写入一个环形内存缓冲区中。 - Spill 阶段:即“溢写”,当环形缓冲区满后,MapReduce 会将数据写到本地磁盘上, 生成一个临时文件。需要注意的是,将数据写入本地磁盘之前,先要对数据进行一次本地排序,并在必要时对数据进行合并、压缩等操作。
- 利用快速排序算法对缓存区内的数据进行排序,排序方式是,先按照分区编号进行排序,然后按照 key 进行排序。
- 按照分区编号由小到大依次写入任务工作目录下的临时文件
output/spillN.out
(N 表示当前溢写次数)中。如果用户设置了 Combiner,则写入文件之前,对每个分区中的数据进行一次聚集操作。 - 将分区数据的元信息写到内存索引数据结构
SpillRecord
中,包括在临时文件中的偏移量、压缩前数据大小和压缩后数据大小。如果当前内存索引大小超过 1MB,则将内存索引写到文件output/spillN.out.index
中。
- Combine 阶段:当所有数据处理完成后,将所有临时文件合并成一个大文件,保存到文件
output/file.out
中,同时生成相应的索引文件output/file.out.index
。 文件合并以分区为单位。对于某个分区,它将采用多轮递归合并的方式。每轮合并io.sort.factor
(默认 10)个文件,并将产生的文件重新加入待合并列表中,对文件排序后,重复以上过程,直到最终得到一个大文件。 让每个 MapTask 最终只生成一个数据文件,可避免同时打开大量文件和同时读取大量小文件产生的随机读取带来的开销。
并行度
- 由切片数量决定
ReduceTask
- 负责 Reduce 阶段的整个数据处理流程。
工作机制
- Copy 阶段:从各个 MapTask 上远程拷贝一片数据,并针对某一片数据,如果其大小超过一定阈值,则写到磁盘上,否则直接放到内存中。
- Merge 阶段:在远程拷贝数据的同时,ReduceTask 启动了两个后台线程对内存和磁盘上的文件进行合并,以防止内存使用过多或磁盘上文件过多。
- Sort 阶段:按 key 进行聚合。由于各个 MapTask 已经实现对自己的处理结果进行了局部排序,因此只需对所有数据进行一次归并排序即可。
- Reduce 阶段:reduce() 函数将计算结果写到 HDFS 上。
并行度
- 默认为 1,可手动设置
- 要考虑业务逻辑需求,有些情况下,需要计算全局汇总结果,就只能有 1 个ReduceTask
- 如果分区数不是 1,但是 并行度为 1,则不会执行分区过程,因为在 MapTask 中,执行分区的前提是先判断 ReduceNum 个数是否大于 1。不大于 1 则不执行。
JobClient
- 作业与集群交互的主要接口,提供了提交作业、跟踪其进度、访问组件任务的报告/日志的功能,获取集群状态信息等工具。
JobTracker
- 对应 NameNode,后台服务进程,负责监听并接收各个 TaskTracker 发送的心跳信息,包括资源使用情况和任务运行情况等信息;同时有容错和为任务调度提供决策依据的功能,其中作业控制模块,负责作业的分解和状态监控包括 TaskTracker 状态监控、作业状态监控和任务状态监控。
TaskTracker
- 对应 DataNode,JobTracker 和 Task 之间的桥梁:一方面,从 JobTracker 接收并执行各种命令:运行任务、提交任务、杀死任务等;另一方面,将本地节点上各个任务的状态通过心跳周期性汇报给 JobTracker。
流程
运算程序往往需要分成至少 2 个阶段:
第一个阶段的 MapTask 并发实例,完全并行运行,互不相干。
第二个阶段的 ReduceTask 并发实例互不相干,但是他们的数据依赖于上一个阶段的所有 MapTask 并发实例的输出。
编程模型只能包含一个 Map 阶段和一个 Reduce 阶段,如果用户的业务逻辑非常复杂,那就只能多个 MapReduce 程序,串行运行。
- JobClient 向 JobTracker 申请可用 JobID。
- JobClient 将运行 Job 所需要的资源拷贝到 HDFS 中,包括 MapReduce 程序打包的 JAR 文件、配置文件和客户端计算所得的输入切片信息,这些文件都存放在 JobTracker 专门为该作业创建的文件夹中,文件夹名为该作业的 JobID。JAR文件默认会有10个副本(mapred.submit.replication)。
- JobClient 向 JobTracker 提交 Job,JobTracker 接收到作业后,将其放在一个作业队列里,等待作业调度器对其进行调度。
- 当调度到该作业时,会根据输入切片信息为每个切片创建一个 MapTask,并将其分配给 TaskTracker 执行。
- TaskTracker 会根据主机核的数量和内存的大小有分配固定数量的 map 槽和 reduce 槽:map 的任务分配遵循数据本地化(Data-Local)原则,将 map 任务分配给含有该 map 处理的数据块的 TaskTracker 上,同时将资源复制到该 TaskTracker 上来运行,这叫“运算移动,数据不移动”;分配 reduce 任务时则不考虑数据本地化。
- JobTracker 初始化 Job。
- JobTracker 从 HDFS 中获取输入切片。
- 与此同时,TaskTracker 不断地向 JobTracker 汇报心跳信息,心跳中还携带着很多的信息,比如当前 map 任务完成的进度等。当 JobTracker 收到作业的最后一个任务完成信息时,便把该作业设置成“成功”。当 JobClient 查询状态时,如果它得知任务已完成,便显示一条消息给用户。
- TaskTracker 得到 JobTracker 分配的任务后,从 HDFS 获取 Job 资源,若数据是本地的,不需拷贝数据。
- TaskTracker 启动
- JVM 子进程运运行
Input Split (输入切片)
数据切片只是在逻辑上对输入进行分片,并不会在磁盘上将其切分成片进行存储。默认情况下,切片大小等于数据块大小,只记录切片切片的元数据信息,例如起始位置、长度、所在节点列表等,并非数据本身。
切片时不考虑整体,而是逐个针对每个文件单独切片。
每个分片分配一个 MapTask 处理。
参数设置
mapreduce.input.fileinputformat.split.minsize
= 1mapreduce.input.fileinputformat.split.maxsize
=Long.MAXValue
- 切片大小 = Math.max(minSize,Math.min(maxSize,blockSize));
Map 阶段
一般 map 操作都是本地化操作也就是在数据存储节点上进行;
Combiner 阶段
Combiner 组件的父类是 Reducer,区别在于运行位置,Combiner 在每个MapTask 所在的节点运行。
目的在于对每个 MapTask 的输出进行局部合并,以减少网络传输量。
使用原则是不会影响到最终的业务输入,Combiner 的输出键值对应与 Reducer 的输出键值对对应。例如:只是求总数,最大值,最小值时可以使用 combiner,但做平均值计算时,最终的计算结果就会出错。
Shuffle 阶段
shuffle 的本意是洗牌,把一组有一定规律的数据尽量打散转换成一组无规律的数据,而 MapReduce 中的 shuffle 更像是洗牌的逆过程,把一组无规律的数据尽量转换成一组具有一定规律的数据。
流程
- MapTask 收集输出的键值对,放到内存缓冲区中。
- 从内存缓冲区不断溢出本地磁盘文件,可能会溢出多个文件。
- 多个溢出文件会被合并成大的溢出文件。
- 在溢出过程及合并的过程中,都要调用
Partitioner
的getPartition()
方法进行分区和针对 key 进行排序 。 - ReduceTask 根据自己的分区号,去各个 MapTask 机器上取相应的结果分区数据。
- ReduceTask 会通过归并排序将这些数据再合并成大文件。
参数设置
- 缓冲区大小会影响到 MapReduce 程序的执行效率,原则上说,缓冲区越大, 磁盘 io 的次数越少,执行速度就越快。
io.sort.mb
= 1
Partitioner(分区)
- 默认分区时根据 key 的 hashCode 对 ReduceTasks 的个数取模得到,用户无法控制存储分区位置。
Sort (排序)
MapTask 和 ReduceTask 均会对数据按照 key 进行排序,默认排序是按照字典顺序排序,使用快速排序。该操作属于 Hadoop 的默认行为,任何数据均会被排序,而不管逻辑上是否需要。
对于 MapTask,它会将处理的结果暂时放到环形缓冲区中,当环形缓冲区使用率达到一定阈值后,再对缓冲区中的数据进行一次快速排序,并将这些有序数据溢写到磁盘上。当数据处理完毕后,它会对磁盘上所有文件进行归并排序。
对于 ReduceTask,它从每个 MapTask 上远程拷贝相应的数据文件,如果文件大小超过一定阈值(默认 80),则进行一次排序,并溢写磁盘,否侧存储在内存中。如果磁盘上文件数目达到一定阈值,则进行一次归并排序以生成一个更大文件。当所有数据拷贝完毕后,ReduceTask 统一对内存和磁盘上的所有数据进行一次归并排序。
类型
- 部分排序:根据输入记录的键对数据集排序,保证输出的每个文件内部有序。
- 全排序:最终输出结果只有一个文件,且文件内部有序。实现方式是只设置一个 ReduceTask。但该方法在处理大型文件时效率极低,因为一台机器处理所有文件,完全丧失了 MapReduce 所提供的并行架构。
- 辅助排序:GroupingComparator 分组,在 Reduce 端对 key 进行分组。当接收的 key 为 bean 对象时,比较其中的若干字段进行排序。
- 二次排序:在自定义排序过程中,如果
compareTo
中的判断条件为两个即为二次排序。
Spill(溢写)
当排序完成,便开始把数据刷到磁盘,刷磁盘的过程以分区为单位,一个分区写完,写下一个分区,分区内数据有序,最终实际上会多次溢写,然后生成多个文件。或者如果缓冲区的内存达到了阀值的80%时候,这个守护线程也会把内容写到磁盘上,另外的20%内存可以继续写入要写进磁盘的数据。
Merge(合并)
spill 会生成多个小文件,对于 Reduce 端拉取数据是相当低效的,那么这时候就有了 merge 的过程,合并的过程也是同分片的合并成一个片段(segment),最终所有的 segment 组装成一个最终文件。
Reduce阶段
拉取拷贝(fetch copy)
Reduce 向各个 Map 任务拉取对应数据,这个过程都是以 HTTP 协议完成,每个 Map 节点都会启动一个常驻的 HTTP server 服务,通过其拉取数据。
这个过程完全通过网络传输,是一个非常重量级的操作。
性能优化
性能瓶颈
- 计算机性能:CPU、内存、磁盘、网络
- 数据倾斜
- Map、Reduce 个数设置不合理
- 前置 Task 运行过长,导致后续 Task 长时间等待
- 小文件过多
- 大量的不可分块的超大文件
- Spill 次数过多
- Merge 次数过多
数据输入
框架默认的 TextInputFormat
切片机制对文件进行切片,不管文件多小,都会是一个单独的切片,都会交给一个 MapTask。如果有大量小文件,就会产生大量的 MapTask,导致处理效率极其低下。
- Hadoop Archive:高效地将小文件放入 HDFS 块中的文件存档工具,它能够将多个小文件打包成一个 HAR 文件,这样就减少了 NameNode 的内存使用。
- Sequence File:由一系列的二进制键值对组成,如果 key 为文件名,value 为文件内容,则可以将大批小文件合并成一个大文件。
- CombineFilelnputFormat:用于将多个文件合并成一个单独的切片,会考虑数据的存储位置。
- 开启JVM重用:对于大量小文件 Job,可以减少 45% 运行时间。开启重用后,一个 Task 运行完毕后,JVM 会继续运行其他 Task。设置
mapreduce.job.jvm.numtasks
在 10-20 之间。
Map 阶段
- 减少谥写次数:通过调整
io.sort.mb
及sort.spill.percent
,增大触发溢写的内存上限,减少溢写次数,从而减少磁盘 IO。 - 减少合并次数:通过调整``io.sort.factor`,增大触发合并的文件数目,减少合并的次数,从而缩短处理时间。
- 在 Map 之后,不影响业务逻辑前提下,先进行 Combines 处理,减少I/O。
Reduce 阶段
- 合理设置 Map 和 Reduce 数:两个都不能设置太少,也不能设置太多。太少,会导致 Task 等待,延长处理时间;太多,会导致 Task 间竞争资源,造成处理超时等错误。
- 设置 Map、Reduce 共存:调整
slowstart.completedmaps
,使 Map 运行到一定程度后,Reduce 也开始运行,减少 Reduce 的等待时间。 - 规避使用 Reduce:因为 Reduce 在用于连接数据集的时候将会产生大量的网络消耗。
- 合理设置 Reduce 端的 Buffer:默认情况下,数据达到一个阈值,Buffer 中的数据就会写入磁盘,然后 Reduce 再从磁盘中获得所有的数据。也就是说,Buffer 和 Reduce 是没有直接关联的。设置
mapreduce.reduce.input.buffer.percent
(默认为 0.0),使得 Buffer 中的一部分数据可以直接输送到 Reduce,减少 IO 开销。当值大于 0 的候,会保留指定比例的内存读 Buffer 中的数据直接发送给 Reduce 使用。
I/O 传输
数据压缩
能够有效减少 HDFS 的读写字节数,提高网络带宽和磁盘空间的效率。在运行 MR时,I/O操作、网络数据传输、Shuffle 和 Merge 要花大量的时间,尤其是数据规模很大和工作负载密集的情况下。
可以在任意 MapReduce 阶段启用。
但同时增加了 CPU 运算负担,运用不当也可能降低性能。
基本原则:运算密集型的 Job:少用压缩;I/O 密集型的 Job:多用压缩。
使用
SequenceFile
二进制文件。
序列化
Writable
:Java的序列化是一个重量级序列化框架(Serializable),会附带很多额外的信息(各种校验信息,Header,继承体系等),不便于在网络中高效传输。所以,Hadoop自己开发了一套序列化机制。
流程
实现 Writable 接口
必须有空参构造函数,因为在反序列化时,需要反射调用空参构造函数
重写序列号方法与反序列化方法,序列化的顺序和反序列化的顺序必须完全一致
1
2
3
4
5
6
7
8
9
10
11
12
13Override
public void write(DataOutput out) throws IOException {
out.writeLong(upFlow);
out.writeLong(downFlow);
out.writeLong(sumFlow);
}
Override
public void readFields(DataInput in) throws IOException {
upFlow = in.readLong();
downFlow = in.readLong();
sumFlow = in.readLong();
}
数据倾斜
数据频率倾斜:某一区域的数据量要远远大于其他区域。
数据大小倾斜:部分数据的大小要远远大于平均值。
- 抽样和范围分区:可以通过对原始数据进行抽样得到的结果集来预设分区边界值。
- 自定义分区:基于输出键进行自定义分区。
- Combine 使用:Combine可以大量地减小数据倾斜。在可能的情况下,Combine的目的就是聚合并精简数据。
- 采用Map Join:尽量避免Reduce Join。
容错机制
JobTracker
无容错机制,挂掉之后需要人工介入,重启。
用户可通过配置一些参数,以便 JobTracker 重启后,让作业恢复运行,这样 JobTracker 重启前,会在 histrory log 中记录各个作业的运行状态,以便在重启后,重新提交这些作业,并只对未完成的 Task 进行重新调度。
TaskTracker
当超过参数超过的时间间隔时未向 JobTracker 汇报心跳,则认为它死亡,并将其从调度池中删除;
还可设置一些 TaskTracker host,表示这些节点不允许接入集群,也不会被分配 Task。
Job
当一个作业在某个 Tasktracker 上失败的 Task 个数超过设定值,则该 Tasktracker 被加到该 Job 的黑名单中
Task
每个 MapTask,每个 ReduceTask 都有最大尝试次数
参数设置
mapreduce.map.maxattempts
= 4mapreduce.reduce.maxattemp
= 4mapreduce.task.timeout
= 600000 ms
Record
可设置跳过坏记录的条数。
磁盘
用户可配置多个磁盘目录,将 MapTask 中间结果分到不同的磁盘上,增强容错性。
FileSystem.Cache
某种程度上相当于搞了个全局变量。
如果不使用 cache,每个 FileSystem 的实例都会建立一个到 NamenNde 的连接,而在大数据计算的场景下,MapReduce/Spark 要对 HDFS 进行大量并发的读,不做缓存会造成超量的连接打到 NameNode 上,造成 DDOS 的效果。
但有了 cache 就意味着同一个文件系统的实例被同时使用,如果使用用 try,或者用完之后主动 close,就会导致这个文件系统实例的其它使用方意外出错。因此不要主动 close, FileSystem 有一个 shutdown 的 hook 会负责最后关掉这些 FileSystem,以防止泄漏。
又因为缓存的粒度是文件系统级别的,一般的应用文件系统的个数不应该太多,所以不会是个大问题。