Spark的物理执行图是由调度器根据目标RDD的Lineage图创建的一个由stage构成的DAG (Directed acyclic graph,有向无环图)。

DAG生成

DAG是由stage构成的,每个stage内部尽可能多的包含一组具有窄依赖关系的转换,可以最大限度的流水线并行化(pipeline),不同stage在shuffle依赖处划分,可以保证结果的正确性,此外由于已缓存分区可以缩短父RDD的计算过程也可以作为stage的划分点。Spark以此作为DAG生成的依据。具体而言,Spark中DAG生成的算法如下:从后往前推算,遇到ShuffleDependency就断开,遇到NarrowDependency就将其加入该stage。每个stage里面task的数目由该stage最后一个RDD中的partition个数决定。Stage中的task分为两类:ResultTask和ShuffleMapTask,对于要产生结果的task称为ResultTask,否则都称为ShuffleMapTask。

分析一个复杂数据处理的例子,代码如下:

val data1 = Array[(Int, Char)](
    (1, 'a'), (2, 'b'),
    (3, 'c'), (4, 'd'),
    (5, 'e'), (3, 'f'),
    (2, 'g'), (1, 'h'))
val rangePairs1 = sc.parallelize(data1, 3)

val hashPairs1 = rangePairs1.partitionBy(new HashPartitioner(3))

val data2 = Array[(Int, String)]((1, "A"), (2, "B"),(3, "C"), (4, "D"))

val pairs2 = sc.parallelize(data2, 2)
val rangePairs2 = pairs2.map(x => (x._1, x._2.charAt(0)))

val data3 = Array[(Int, Char)]((1, 'X'), (2, 'Y'))
val rangePairs3 = sc.parallelize(data3, 2)

val rangePairs = rangePairs2.union(rangePairs3)
val result = hashPairs1.join(rangePairs)

该例子的DAG如下图所示:

3.1

该例子共生成了三个stage,其中两个shuffleStage一个resultStage,stage 0 为resultStage,向前推演过程中遇到partitionBy操作和join操作为ShuffleDependency因此断开得到stage 1 和stage 2,在stage内部均为NarrowDependency,因此共得到三个stage。

DAG执行

通过上文的分析,我们得到了一个数据处理应用在Spark中的执行图DAG,本节将介绍如何执行该DAG得到最终的结果。

Spark中的计算流程符合pipeline的思想,就是数据用的时候再算,而且数据流到要计算的位置,为了确定要计算的数据,需要从后往前推,需要哪个partition就计算哪个partition,这样就得到了一个computing chain,结果就是需要首先计算出stage最左边RDD的某些partitions。对于没有parent stage的stage,该stage最左边的RDD是可以立即计算的,而且每计算一条记录就流完整个computing chain 的计算,直到不能流动为止,然后再计算下一条记录,这样实现了每条记录的流动,不要求整个partition内的所有records计算后才整体流动。对于有parent stage的stage,先等所有parent stage的final RDD 中数据计算完成并shuffle后,问题就和没有parent stage 的stage一样了。

当要自己实现RDD时,为了满足Spark计算逻辑需要注意如下几点:每个 RDD 包含的 getDependency() 负责确立 RDD 的数据依赖;具体依赖 parent RDD 中的哪些 partitions 由 dependency.getParents();compute() 方法负责接收 parent RDDs 或者 data block 流入的 records,进行计算,然后输出 record。经常可以在 RDD 中看到这样的代码firstParent[T].iterator(split, context).map(f)。firstParent 表示该 RDD 依赖的第一个 parent RDD,iterator() 表示 parentRDD 中的 records 是一个一个流入该 RDD 的,map(f) 表示每流入一个 recod 就对其进行 f(record) 操作,输出 record。为了统一接口,这段 compute() 仍然返回一个 iterator,来迭代 map(f) 输出的 records。

DAG执行触发

前面介绍了DAG的生成和DAG执行时的操作,但如何触发DAG的执行呢?答案是RDD的动作类操作,当对RDD调用该类操作时,会调用sc.runJob(this, func: Iterator[T] => U),这会向Spark提交一个Job,其实driver中有多少个action()就会对应多少个Job。对于一个Job的提交与执行,大致经历了以下步骤:

  1. sc.runJob() 会调用DAGScheduler.runJob()
  2. DAGScheduler.runJob()会调用submitJob()
  3. submitJob()会首先得到一个JobId,然后会发送一个JobSubmitted的消息给DAGSchedulerEventProcessLoop
  4. 当收到JobSubmitted消息时,会调用handleJobSubmitted,该处会调用newResultStage来生成DAG的stage,在此会调用getParentStages方法,遇到 NarrowDependency 就将依赖的 RDD 加入到 stage,遇到 ShuffleDependency 切开 stage,并递归到 ShuffleDepedency 依赖的 stage
  5. handleJobSubmitted最后会调用submitStage提交stage,submitStage中会调用taskScheduler.submitTasks(taskSet) 进而执行task

小结

DAG是Spark中任务的物理执行图,由stage组成,stage内任务流水线并行化执行保证效率,stage间先后依次执行保证结果正确。本章通过对DAG的生成逻辑、执行时的操作以及用户的driver程序如何触发DAG的执行等进行分析,了解了Spark在该部分的实现逻辑。

参考

Spark code

Spark internal