Spark 在 MapReduce 的基础上进行了改进,它主要使用内存进行中间计算数据存储,加快了计算执行时间,在某些情况下性能可以提升百倍 。
除了速度更快,Spark 和 MapReduce 相比,还有更简单易用的编程模型 。
Spark 的主要编程模型是 RDD,即弹性数据集 。在 RDD 上定义了许多常见的大数据计算函数,利用这些函数可以用极少的代码完成较为复杂的大数据计算 。
例如我们在介绍 Hive 架构设计时谈到的 WordCount 示例 。 使用 Scala 语言在 Spark 上编写 ,代码只需三行 。
- val textFile = sc.textFile("hdfs://...")
- val counts = textFile.flatMap(line => line.split(" "))
- .map(word => (word, 1))
- .reduceByKey(_ + _)
- counts.saveAsTextFile("hdfs://...")
这个 demo 的代码含义就不展开详细介绍 。首选,从 HDFS 读取数据,构建一个 RDD textFile,然后在这个 RDD 上执行三个操作:一是将输入数据的每一行文本用空格拆分单词;二是将单词进行转换,比如:word ——> (word,1),生成 < Key , Value > 的结构;三是针对相同的 Key 进行统计,统计方式是对 Value 求和 。最后,将 RDD counts 写入 HDFS ,完成结果输出 。
Spark 编程模型RDD 是 Spark 的核心概念,是弹性数据集(Resilient Distributed Datasets)的缩写。RDD 既是 Spark 面向开发者的编程模型,又是 Spark 自身架构的核心元素。
我们先来认识一下作为 Spark 编程模型的RDD 。我们知道,大数据计算就是在大规模的数据集上进行一系列的数据计算处理。MapReduce 针对输入数据,将计算过程分为两个阶段,一个 Map 阶段,一个 Reduce 阶段,可以理解成是面向过程的大数据计算。我们在用 MapReduce 编程的时候,思考的是,如何将计算逻辑用 Map 和 Reduce 两个阶段实现,map 和 reduce 函数的输入和输出是什么,这也是我们在学习 MapReduce 编程的时候一再强调的。大数据培训
而 Spark 则直接针对数据进行编程,将大规模数据集合抽象成一个 RDD 对象,然后在这个 RDD 上进行各种计算处理,得到一个新的 RDD,继续计算处理,直到得到最后的结果数据。所以 Spark 可以理解成是面向对象的大数据计算。我们在进行 Spark 编程的时候,思考的是一个 RDD 对象需要经过什么样的操作,转换成另一个 RDD 对象,思考的重心和落脚点都在 RDD 上。
所以在上面 WordCount 的代码示例里,第 2 行代码实际上进行了 3 次 RDD 转换,每次转换都得到一个新的 RDD,因为新的 RDD 可以继续调用 RDD 的转换函数,所以连续写成一行代码。事实上,可以分成 3 行
Spark 架构核心
- val rdd1 = textFile.flatMap(line => line.split(" "))
- val rdd2 = rdd1.map(word => (word, 1))
- val rdd3 = rdd2.reduceByKey(_ + _)
RDD 上定义的函数分两种,一种是转换(transformation) 函数,这种函数的返回值还是 RDD;另一种是 执行(action) 函数,这种函数不再返回 RDD。
RDD 定义了很多转换操作函数,比如有计算 map(func)、过滤 filter(func)、合并数据集 union(otherDataset)、根据 Key 聚合 reduceByKey(func, [numPartitions])、连接数据集 join(otherDataset, [numPartitions])、分组 groupByKey([numPartitions]) 等十几个函数。
作为 Spark 架构核心元素的 RDD。跟 MapReduce 一样,Spark 也是对大数据进行分片计算,Spark 分布式计算的数据分片、任务调度都是以 RDD 为单位展开的,每个 RDD 分片都会分配到一个执行进程去处理。
RDD 上的转换操作又分成两种,一种转换操作产生的 RDD 不会出现新的分片,比如 map、filter 等,也就是说一个 RDD 数据分片,经过 map 或者 filter 转换操作后,结果还在当前分片。就像你用 map 函数对每个数据加 1,得到的还是这样一组数据,只是值不同。实际上,Spark 并不是按照代码写的操作顺序去生成 RDD,比如 rdd2 = rdd1.map(func) 这样的代码并不会在物理上生成一个新的 RDD。物理上,Spark 只有在产生新的 RDD 分片时候,才会真的生成一个 RDD,Spark 的这种特性也被称作 惰性计算。
另一种转换操作产生的 RDD 则会产生新的分片,比如 reduceByKey,来自不同分片的相同 Key 必须聚合在一起进行操作,这样就会产生新的 RDD 分片。
所以,大家只需要记住,Spark 应用程序代码中的 RDD 和 Spark 执行过程中生成的物理 RDD 不是一一对应的,RDD 在 Spark 里面是一个非常灵活的概念,同时又非常重要,需要认真理解。
Spark 的计算阶段和 MapReduce 一样,Spark 也遵循移动计算比移动数据更划算 这一大数据计算基本原则。但是和 MapReduce 僵化的 Map 与 Reduce 分阶段计算相比,Spark 的计算框架更加富有弹性和灵活性,进而有更好的运行性能 。
Spark 会根据程序中的转换函数生成计算任务执行计划,这个执行计划就是一个 DAG 。Spark 可以在一个作业中完成非常复杂的大数据计算 。
所谓 DAG 也就是 有向无环图,就是说不同阶段的依赖关系是有向的,计算过程只能沿着依赖关系方向执行,被依赖的阶段执行完成之前,依赖的阶段不能开始执行,同时,这个依赖关系不能有环形依赖,否则就成为死循环了。下面这张图描述了一个典型的 Spark 运行 DAG 的不同阶段。
在上面的图中, A、C、E 是从 HDFS 上加载的 RDD,A 经过 groupBy 分组统计转换函数计算后得到的 RDD B,C 经过 map 转换函数计算后得到 RDD D,D 和 E 经过 union 合并转换函数计算后得到 RDD F ,B 和 F 经过 join 连接函数计算后得到最终的合并结果 RDD G 。
所以可以看到 Spark 作业调度执行的核心是 DAG,有了 DAG,整个应用就被切分成哪些阶段,每个阶段的依赖关系也就清楚了。之后再根据每个阶段要处理的数据量生成相应的任务集合(TaskSet),每个任务都分配一个任务进程去处理,Spark 就实现了大数据的分布式计算。
具体来看的话,负责 Spark 应用 DAG 生成和管理的组件是 DAGScheduler,DAGScheduler 根据程序代码生成 DAG,然后将程序分发到分布式计算集群,按计算阶段的先后关系调度执行。
大家注意到了么,上面的例子有 4 个转换函数,但是只有 3 个阶段 。那么 Spark 划分计算阶段的依据具体是什么呢?显然并不是 RDD 上的每个转换函数都会生成一个计算阶段 。
通过观察一下上面的 DAG 图,关于计算阶段的划分从图上就能看出规律,当 RDD 之间的转换连接线呈现多对多交叉连接的时候,就会产生新的阶段。一个 RDD 代表一个数据集,图中每个 RDD 里面都包含多个小块,每个小块代表 RDD 的一个分片。
一个数据集中的多个数据分片需要进行分区传输,写入到另一个数据集的不同分片中,这种数据分区交叉传输的操作,我们在 MapReduce 的运行过程中也看到过。
这就是 shuffle 过程,Spark 也需要通过 shuffle 将数据进行重新组合,相同 Key 的数据放在一起,进行聚合、关联等操作,因而每次 shuffle 都产生新的计算阶段。这也是为什么计算阶段会有依赖关系,它需要的数据来源于前面一个或多个计算阶段产生的数据,必须等待前面的阶段执行完毕才能进行 shuffle,并得到数据。
所以大家需要记住,计算阶段划分的依据是 shuffle,不是转换函数的类型 。
思考大家可能会想,为什么同样经过 shuffle ,Spark 可以更高效 ?
从本质上看,Spark 可以算作是一种 MapReduce 计算模型的不同实现。Hadoop MapReduce 简单粗暴地根据 shuffle 将大数据计算分成 Map 和 Reduce 两个阶段,然后就算完事了。而 Spark 更细腻一点,将前一个的 Reduce 和后一个的 Map 连接起来,当作一个阶段持续计算,形成一个更加优雅、高效的计算模型,虽然其本质依然是 Map 和 Reduce。但是这种多个计算阶段依赖执行的方案可以有效减少对 HDFS 的访问,减少作业的调度执行次数,因此执行速度也更快。
并且和 Hadoop MapReduce 主要使用磁盘存储 shuffle 过程中的数据不同,Spark 优先使用内存进行数据存储,包括 RDD 数据。除非是内存不够用了,否则是尽可能使用内存, 这也是 Spark 性能比 Hadoop 高的另一个原因。
Spark 支持 Standalone、Yarn、Mesos、Kubernetes 等多种部署方案,几种部署方案原理也都一样,只是不同组件角色命名不同,但是核心功能和运行流程都差不多。
首先,Spark 应用程序启动在自己的 JVM 进程里,即 Driver 进程,启动后调用 SparkContext 初始化执行配置和输入数据。SparkContext 启动 DAGScheduler 构造执行的 DAG 图,切分成最小的执行单位也就是计算任务。
然后 Driver 向 Cluster Manager 请求计算资源,用于 DAG 的分布式计算。Cluster Manager 收到请求以后,将 Driver 的主机地址等信息通知给集群的所有计算节点 Worker。
Worker 收到信息以后,根据 Driver 的主机地址,跟 Driver 通信并注册,然后根据自己的空闲资源向 Driver 通报自己可以领用的任务数。Driver 根据 DAG 图开始向注册的 Worker 分配任务。
Worker 收到任务后,启动 Executor 进程开始执行任务。Executor 先检查自己是否有 Driver 的执行代码,如果没有,从 Driver 下载执行代码,通过 Java 反射加载后开始执行。
Spark性能调优与故障处理关于 Spark 的性能调优,就有很多可以值得探讨的地方。 我们一般能快速想到的是常规的性能调优,包括最优的资源配置,RDD优化,并行度调节等等,除此之外,还有算子调优,Shuffle 调优,JVM 调优 。而关于故障处理,我们一般讨论的是解决 Spark 数据倾斜 的问题,我们一般会通过聚合原数据,过滤导致倾斜的 key,提升shuffle 操作过程中的 reduce 并行度等方式 。因为本篇文章主要介绍架构设计和原理思想,基于篇幅限制,详细步骤就不展示详细描述。正好最近收集了一本 Spark性能调优与故障处理 的 pdf ,里面对于详解的步骤均做了详细的说明 。
Spark 生态最后,我们来看看 Spark 的生态!
跟我们之前介绍的 Hadoop 一样,Spark 也有他自己的生态体系 。以 Spark 为基础,有支持 SQL 语句的 Spark SQL,有支持流计算的 Spark Streaming,有支持机器学习的 MLlib,还有支持图计算的 GraphX。利用这些产品,Spark 技术栈支撑起大数据分析、大数据机器学习等各种大数据应用场景。
为了方便大家了解,下面对这些组件进行一一介绍:
Spark SQL:用来操作结构化数据的核心组件,通过Spark SQL可以直接查询Hive、 Hbase等多种外部数据源中的数据。Spark SQL的重要特点是能够统一处理关系表和RDD在处理结构化数据时,开发人员无须编写 MapReduce程序,直接使用SQL命令就能完成更加复杂的数据查询操作。
Spark Streaming:Spark提供的流式计算框架,支持高吞吐量、可容错处理的实时流式数据处理,其核心原理是将流数据分解成一系列短小的批处理作业,每个短小的批处理作业都可以使用 Spark Core进行快速处理。Spark Streaming支持多种数据源,如 Kafka以及TCP套接字等。
MLlib:Spark提供的关于机器学习功能的算法程序库,包括分类、回归、聚类、协同过滤算法等,还提供了模型评估、数据导入等额外的功能,开发人员只需了解一定的机器学习算法知识就能进行机器学习方面的开发,降低了学习成本。
GraphX: Spark提供的分布式图处理框架,拥有图计算和图挖掘算法的API接口以及丰富的功能和运算符,极大地方便了对分布式图的处理需求,能在海量数据上运行复杂的图算法。
Spark生态系统各个组件关系密切,并且可以相互调用,这样设计具有以下显著优势。
(1)Spark生态系统包含的所有程序库和高级组件都可以从 Spark核心引擎的改进中获益。
(2)不需要运行多套独立的软件系统,能够大大减少运行整个系统的资源代价。
(3)能够无缝整合各个系统,构建不同处理模型的应用。
总结Spark 有三个主要特性:RDD 的编程模型更简单,DAG 切分的多阶段计算过程更快速,使用内存存储中间计算结果更高效。这三个特性使得 Spark 相对 Hadoop MapReduce 可以有更快的执行速度,以及更简单的编程实现。
另外,从 Spark 的生态我们可以看出,Spark 框架对大数据的支持从内存计算、实时处理到交互式查询,进而发展到图计算和机器学习模块。Spark 生态系统广泛的技术面,一方面挑战占据大数据市场份额最大的 Hadoop,另一方面又随时准备迎接后起之秀 Flink 、Kafka 等计算框架的挑战,从而使Spark 在大数据领域更好地发展 !