Spark
Spark 延续了MapReduce 的设计思路:对数据的计算也分为Map 和Reduce 两类。但不同的是,一个 Spark 任务并不止包含一个 Map 和一个R educe,而是由一系列的 Map、Reduce 构成。这样,计算的中间结果可以高效地转给下一个计算步骤,提高算法性能。虽然 Spark 的改进看似很小,但实验结果显示,它的算法性能相比 MapReduce 提高了10~100 倍。
与 MR 异同
Spark
集流批处理、交互式查询、机器学习及图计算等于一体
基于内存迭代式计算,适合低延迟、迭代运算类型作业
可以通过缓存共享 rdd、DataFrame 提升效率,尤其是SparkSQL可以将数据以列式的形式存储于内存中
中间结果支持 checkpoint ,遇错可快速恢复
支持 DAG,map 之间以 pipeline 方式运行,无需磁盘 IO
多线程模型,每个 worker 节点运行一个或多个 executor 服务,每个 task 作为线程运行在 executor 中,task 间可通过内存共享资源
Spark 编程模型更灵活,支持多种语言如 java、scala、python、R,并支持丰富的 transformation 和 action 的算子
MapReduce
适合离线数据处理,不适合迭代计算、交互式处理、流式处理
中间结果需要落地,需要大量的磁盘 IO 和网络 IO 影响性能
虽然 MapReduce 中间结果可以存储于 HDFS,利用 HDFS 缓存功能,但相对 Spark 缓存功能较低效
多进程模型,任务调度(频繁申请、释放资源)和启动开销大,不适合低延迟类型作业
编程不够灵活,仅支持 map 和 reduce 两种操作。
架构
Spark 集群的独立部署环境中,不需要依赖其他的资源调度框架,自身就实现了资源调度的功能。
Master
一个主要负责资源的调度和分配的进程,并进行集群的监控等职责,类似于 Yarn 环境中的 RM。
Worker
一个运行在集群中的一台服务器上的进程,由 Master 分配资源对数据进行并行的处理和计算,类似于 Yarn 环境中 NM。
Driver
Spark 驱动器节点,用于执行 Spark 任务中的 main 方法,负责实际代码的执行工作。
- 将用户程序转化为 Job
- 在 Executor 之间调度 Task
- 跟踪 Executor 的执行情况
- 通过 UI 展示查询运行情况
Executor
集群中工作节点(Worker)中的一个 JVM 进程,负责在 Spark Job 中运行具体的 Task,Task 彼此之间相互独立。
Spark 应用启动时,Executor 节点被同时启动,并且始终伴随着整个 Spark 应用的生命周期而存在。
通过自身的块管理器(Block Manager)为 RDD 提供内存式存储,RDD 是直接缓存在 Executor 内的,因此任务可以在运行时充分利用缓存数据加速运算。
参数设置
--num-executors
流程
- Task 提交后,都会先启动 Driver;
- Driver 向集群管理器注册应用程序;
- 集群管理器根据此任务的配置文件分配 Executor 并启动;
- Driver 开始执行 main 函数,Spark 查询为懒执行,当执行到 Action 算子时开始反向推算,根据宽依赖进行 Stage 的划分,每一个 Stage 对应一个 TaskSet,TaskSet 中有多个 Task,查找可用资源 Executor 进行调度;
- 根据本地化原则,Task 会被分发到指定的 Executor 去执行,在任务执行的过程中,Executor 也会不断与 Driver 进行通信,报告任务运行情况。
RDD
RDD(Resilient Distributed Datasets),弹性分布式数据集。传统的 MapReduce 虽然具有自动容错、平衡负载和可拓展性的优点,但是其最大缺点是在迭代计算式的时候,要进行大量的磁盘 IO 操作,而 RDD 正是解决这一缺点的抽象方法。
RDD 内部的数据集合在逻辑上和物理上被划分成多个小子集合,这样的每一个子集合我们将其称为分区(Partitions),分区的个数会决定并行计算的粒度,即并行任务的个数。
RDD 只是数据集的抽象,分区内部并不会存储具体的数据。Partition 类内包含一个 index 成员,表示该分区在 RDD 内的索引,通过索引 + 分区号可以确定唯一数据块,再利用底层数据存储层提供的接口就能从存储介质(如:HDFS、Memory)中提取出分区对应的数据。
特性
- 弹性
- 存储的弹性:内存与磁盘的自动切换;
- 容错的弹性:数据丢失可以自动恢复;
- 计算的弹性:计算出错重试机制;
- 分片的弹性:可根据需要重新分片。
- 分布式:数据存储在大数据集群不同节点上
- 数据集:RDD 封装了计算逻辑,并不保存数据
- 数据抽象:RDD 是一个抽象类,需要子类具体实现
- 不可变:RDD 封装了计算逻辑,是不可以改变的,想要改变,只能产生新的 RDD,在新的 RDD 里面封装计算逻辑
- 可分区、并行计算
序列化
闭包检查
从计算的角度, 算子以外的代码都是在 Driver 端执行, 算子里面的代码都是在 Executor 端执行。那么在 scala 的函数式编程中,就会导致算子内经常会用到算子外的数据,这样就形成了闭包的效果,如果使用的算子外的数据无法序列化,就意味着无法传值给 Executor 端执行,就会发生错误,所以需要在执行任务计算前,检测闭包内的对象是否可以进行序列化。
Kryo
速度是 Serializable 的 10 倍。当 RDD 在 Shuffle 数据的时候,简单数据类型、数组和字符串类型已经有 Kryo 序列化。
注意:即使使用 Kryo 序列化,也要继承 Serializable 接口。
依赖关系
- 两个相邻 RDD 之间的关系
血缘关系
RDD 只支持粗粒度转换,即在大量记录上执行的单个操作。
将创建 RDD 的一系列 Lineage (血统)记录下来,以便恢复丢失的分区。Lineage 会记录 RDD 的元数据信息和转换行为,当该 RDD 的部分分区数据丢失时,它可以根据这些信息来重新运算和恢复丢失的数据分区。
窄依赖
表示每一个父(上游) RDD 的 Partition 最多被子(下游)RDD 的一个 Partition 使用。
- 独生子女
宽依赖
表示同一个父(上游)RDD 的 Partition 被多个子(下游)RDD 的 Partition 依赖,会引起 Shuffle。
- 多生
持久化
Cache
- 通过 cache 或 persist 方法将前面的计算结果缓存
默认情况下会把数据以缓存的形式存储在 JVM 的堆内存中。缓存有可能丢失,或者由于内存不足而被删除,但 RDD 的缓存容错机制保证了即使缓存丢失也能保证计算的正确执行。
由于 RDD 的各个 Partition 是相对独立的,因此只需要计算丢失的部分即可, 并不需要重算全部 Partition。
Spark 会自动对一些 Shuffle 操作的中间数据做持久化操作,比如:reduceByKey。这样做是为了当一个节点 Shuffle 失败时避免重新计算整个输入。但是,在实际使用的时候,如果想重用数据,仍然建议调用 persist 或 cache。
遵循懒加载原则,cache 操作并不会马上被执行,必须执行 Action 操作才能触发。
CheckPoint
- 将 RDD 中间结果写入磁盘
由于血缘依赖过长会造成容错成本过高,如果检查点之后有节点出现问题,可以从检查点开始重做血缘,减少了开销。
遵循懒加载原则,checkpoint 操作并不会马上被执行,必须执行 Action 操作才能触发。
区别
- Cache 只是将数据保存起来,不切断血缘依赖;Checkpoint 切断血缘依赖。
- Cache 缓存的数据通常存储在磁盘、内存等地方,可靠性低;Checkpoint 的数据通常存储在 HDFS 等容错、高可用的文件系统,可靠性高。
建议对 checkpoint 的 RDD 使用 Cache 缓存,这样 checkpoint 的 job 只需从 Cache 缓存中读取数据,否则需要再从头计算一次。
算子
mapPartition
对 rdd 中的整个分区的迭代器进行操作。
与 map 的比较
map 是对 rdd 中的每一个元素进行操作。
如果在这一过程中需要频繁创建额外的对象,则性能更高,例如将 rdd 中的数据通过 jdbc 写入数据库,map 需要为每个元素创建一个链接而 mapPartition 只需要为每个分区创建一个链接。
如果分区中数据量较大,容易导致内存不足。
distinct
将数据集中重复的数据去重。
替代实现
1 | map(x => (x, null)).reduceByKey((x, *) => x, numPartitions).map(*._1) |
reduceByKey
可以将 rdd 按照相同的 Key 对 Value 进行聚合。
与 groupByKey 的比较
- 从 shuffle 的角度:reduceByKey 和 groupByKey 都存在 shuffle 的操作,但是 reduceByKey 可以在 shuffle 前对分区内相同 key 的数据进行预聚合(combine)功能,这样会写入磁盘数据量;而 groupByKey 只是进行分组,不存在数据量减少的问题,reduceByKey 性能比较高。
- 从功能的角度:reduceByKey 其实包含分组和聚合的功能。GroupByKey 只能分组,不能聚合。
与其他聚合函数的比较
- reduceByKey: 相同 key 的第一个数据不进行任何计算,分区内和分区间计算规则相同。
- foldByKey: 相同 key 的第一个数据和初始值进行分区内计算,分区内和分区间计算规则相同。
- aggregateByKey:相同 key 的第一个数据和初始值进行分区内计算,分区内和分区间计算规则可以不相同。
- combineByKey:当计算过程中发现数据结构不满足要求时,可以让第一个数据转换结构,分区内和分区间计算规则不相同。
累加器
- 用来把 Executor 端变量信息聚合到 Driver 端。
在 Driver 程序中定义变量,在 Executor 端的每个 Task 都会得到这个变量的一份新的副本。
每个 task 更新这些副本的值后, 传回 Driver 端进行 merge 操作。
广播变量
- 用来高效分发较大的对象。
向所有工作节点发送一个较大的只读变量,以供一个或多个 Spark 操作使用。
如果在多个并行操作中使用同一个变量,Spark 会为每个任务分别发送。
任务调度
总体来说分两路:Stage 级与 Task 级。
- Job 以 Action 方法为界,遇到一个 Action 方法则触发一个 Job;
- Stage 是 Job 的子集,以 RDD 宽依赖(即 Shuffle)为界,遇到 Shuffle 做一次划分;
- Task 是 Stage 的子集,以并行度(分区数)来衡量,分区数是多少,则有多少个 task。
架构
DAGScheduler
负责 Stage 级的调度,主要是将 job 切分成若干 Stages,并将每个 Stage 打包成 TaskSet 交给 TaskScheduler 调度。
TaskScheduler
负责 Task 级的调度,将 DAGScheduler 给过来的 TaskSet 按照指定的调度策略分发到 Executor 上执行,调度过程中 SchedulerBackend 负责提供可用资源。
监控 Stage 的运行状态,只有 Executor 丢失或者 Task 由于 Fetch 失败才需要重新提交失败的 Stage 以调度运行失败的任务,其他类型的 Task 失败会在 TaskScheduler 的调度过程中重试。
SchedulerBackend
通过 ApplicationMaster 申请资源
接收 Executor 的注册信息,并维护 Executor 的状态
不断从 TaskScheduler 中拿到合适的 Task 分发到 Executor 执行。
HeartbeatReceiver
负责接收 Executor 的心跳信息,监控 Executor 的存活状况,并通知到 TaskScheduler。
TaskSetManager
负责监控管理同一个 Stage 中的 Task,TaskScheduler 以 TaskSetManager 为单元来调度任务。
Stage 级调度
从 DAG 切割开始,主要是由 DAGScheduler 来完成。当遇到一个 Action 操作后就会触发一个 Job 的计算,并交给 DAGScheduler 来提交。
DAGScheduler 会根据 DAG 进行切分,将一个 Job 划分为若干 Stages,具体划分策略是,由最终的 RDD 不断通过依赖回溯判断父依赖是否是宽依赖,即以 Shuffle 为界,划分 Stage,窄依赖的 RDD 会被划分到同一个 Stage 中。
划分的 Stages 分两类,一类叫做 ResultStage,为 DAG 最下游的 Stage,由 Action 方法决定;另一类叫做 ShuffleMapStage,为下游 Stage 准备数据。
一个 Stage 是否被提交,需要判断它的父 Stage 是否执行,只有其父 Stage 执行完毕才能提交当前 Stage。如果一个 Stage 没有父 Stage,那么从该 Stage 开始提交。
Stage 提交时会将 Task 信息(分区信息以及方法等)序列化并打包成 TaskSet 交给 TaskScheduler。
Task 级调度
DAGScheduler 将 Stage 打包到交给 TaskScheduler 后,TaskScheduler 会将 TaskSet 封装为 TaskSetManager 加入到调度队列中,TaskScheduler 以 TaskSetManager 为单元来调度任务,TaskScheduler 会从调度队列中按照指定的调度策略选择 TaskSetManager 去调度运行。
调度策略
FIFO
FAIR
有一个 rootPool 和多个子 Pool,各个子 Pool 中存储着所有待分配的 TaskSetMagager,需要先对子 Pool 进行排序,再对子 Pool 里面的 TaskSetMagager 进行排序,因为 Pool 和 TaskSetMagager 都继承了 Schedulable 特质,因此使用相同的排序算法。
基于 Fair-share 比较,每个要排序的对象包含三个属性: runningTasks
(正在运行的 Task 数)、minShare
、weight
参数设置
fairscheduler.xml
配置文件
比较过程
- runningTasks 比 minShare 小的先执行
- minShare 使用率低的先执行
- 权重使用率(runningTasks 与 weight 的比值)低的先执行
- 名字
整体上来说就是通过 minShare 和 weight 这两个参数控制比较过程,可以做到让 minShare 使用率和权重使用率少(实际运行 task 比例较少)的先运行。
排序完成后,所有的 TaskSetManager 被放入一个 ArrayBuffer 里,之后依次被取出并发送给 Executor 执行。
失败重试
Task 被提交到 Executor 启动执行后,Executor 会将执行状态上报给 SchedulerBackend,SchedulerBackend 则告诉 TaskScheduler,TaskScheduler 找到该 Task 对应的 TaskSetManager,并通知到该 TaskSetManager,这样 TaskSetManager 就知道 Task 的失败与成功状态。
对于失败的 Task,会记录它失败的次数,如果失败次数还没有超过最大重试次数,那么就把它放回待调度的 Task 池子中,否则整个 Application 失败。
黑名单
在记录 Task 失败次数过程中,会记录它上一次失败所在的 ExecutorId 和 Host,这样下次再调度这个 Task 时,会避免其被调度到上一次失败的节点上,起到一定的容错作用。黑名单记录还包含了其对应的“拉黑”时间,及这段时间内不再往这个节点上调度该 Task。
数据倾斜
数据倾斜就是我们在计算数据的时候,数据的分散度不够,导致大量的数据集中到了一台或者几台机器上计算,这些数据的计算速度远远低于平均计算速度,导致整个计算过程过慢。
- key 分布不均匀
- 业务数据本身的特性
- 建表时考虑不周
- 某些 SQL 语句本身就有数据倾斜
优化
增加jvm(Java Virtual Machine:Java虚拟机)内存,这适用于变量值非常少的情况,这种情况下,往往只能通过硬件的手段来进行调优,增加jvm内存可以显著的提高运行效率;
增加reduce的个数,这适用于变量值非常多的情况,这种情况下最容易造成的结果就是大量相同key被partition到一个分区,从而一个reduce执行了大量的工作;
重新设计key,有一种方案是在map阶段时给key加上一个随机数,有了随机数的key就不会被大量的分配到同一节点(小几率),待到reduce后再把随机数去掉即可;
使用combiner合并。combinner是在map阶段,reduce之前的一个中间阶段,在这个阶段可以选择性的把大量的相同key数据先进行一个合并,可以看做是local reduce,然后再交给reduce来处理,减轻了map端向reduce端发送的数据量(减轻了网络带宽),也减轻了map端和reduce端中间的shuffle阶段的数据拉取数量(本地化磁盘IO速率);(hive.map.aggr=true)
设置合理的map reduce的task数,能有效提升性能。(比如,10w+级别的计算,用160个reduce,那是相当的浪费,1个足够);
数据量较大的情况下,**慎用count(distinct)**,count(distinct)容易产生倾斜问题;
hive.groupby.skewindata=true,有数据倾斜的时候进行负载均衡,当选项设定为 true,生成的查询计划会有两个 MR Job。第一个 MR Job 中,Map 的输出结果集合会随机分布到 Reduce 中,每个 Reduce 做部分聚合操作,并输出结果,这样处理的结果是相同的 Group By Key 有可能被分发到不同的 Reduce 中,从而达到负载均衡的目的;第二个 MR Job 再根据预处理的数据结果按照 Group By Key 分布到 Reduce 中(这个过程可以保证相同的 Group By Key 被分布到同一个 Reduce 中),最后完成最终的聚合操作。
留存计算
用户在某段时间内开始使用应用,经过一段时间后,仍然继续使用该应用的用户,被认作是留存用户,这部分用户占当时新增用户的比例即是留存率,会按照每隔1单位时间(例日、周、月)来进行统计。
留存指的就是“有多少用户留下来了”。留存用户和留存率体现了应用的质量和保留用户的能力。
N日留存率=某日新增且过N日还登录的用户数 / 某日新增的用户数*100%
1 | select |
透视(Pviot)
1 | df.groupBy("year") |
1 | SELECT |
逆透视
1 | unpivot_df = df_pivot.selectExpr("`年月`", |
一行拆分多行
1 | df.withColumn("name", explode(split($"name", "[ ]"))) |
分组聚合
reduceByKey,合并具有相同键的值
groupByKey,对具有相同键的值进行分组
combineByKey,使用不同的返回类型合并具有相同键的值
1 | combineByKey( |
- aggregation,与reduce()相似,但输入值和返回值的类型可以不一致
1 | aggregate(zeroValue) |
排序
- sortByKey
- sortBy
1 | def sortBy[K]( |
- 自定义类
- 继承Ordered类,重写compare方法
- 序列化
- 重写toString(可选,主要为了可以展示数据)
- implicit
1 | implicit val sortIntegersByString = new Ordering[Int] { |
窗口函数
ROW ,以行的方式进行偏移,然后确定边界范围。
RANGE ,以逻辑的方式进行偏移,然后再确定边界范围。
1 | <窗口函数> over (partition by <用于分组的列名> |
- 排名函数(ranking function) 包括rank,dense_rank, row_number,percent_rank, ntile等,后面我们结合例子来看。
- 分析函数 (analytic functions) 包括cume_dist,lag等。
- 聚合函数(aggregate functions),就是我们常用的max, min, sum, avg等。
UDF
UDF:User-Defined-Function,用户自定义函数,数据是一进一出,功能类似于大多数数学函数或者字符串处理函数;
UDAF:User-Defined Aggregation Function,用户自定义聚合函数,数据是多进一出,功能类似于 count/max/min;
UDTF:User-Defined Table-Generating Functions,用户自定义表生成函数,数据是一进多出,功能类似于lateral view explore();
匿名注册
1 | spark.udf.register("prefixName", (name:String)=> { |
实名注册
定义时要在后面加“ _”(注意前面有个空格) 。
1 | def isAdult(age: Int) = { |
临时UDF
在org.apache.spark.sql.execution.command.CreateFunctionCommand类的run方法中,会判断创建的Function是否是临时方法,若是,则会创建一个临时Function。
临时函数直接注册到functionRegistry(实现类是SimpleFunctionRegistry),即内存中。
1 | def createTempFunction( |
持久化UDF
在org.apache.spark.sql.execution.command.CreateFunctionCommand中,会调用SessionCatalog的createFunction,最终执行了HiveExternalCatalog的createFunction。
创建永久函数会在Hive元数据库中创建相应的函数。
在解析SQL中的UDF时,会调用SessionCatalog的lookupFunction0方法,在此方法中,首先会检查内存中是否存在,如果不存在则会加载此UDF,加载时会把RESOURCE_URI发到ClassLoader的路径中,如果把UDF注册到内存的functionRegistry中。
1 | def lookupFunction( |