RDD(Resilient Distributed Datasets,弹性分布式数据集)[1]是一种分布式内存的抽象概念,它满足容错性要求,允许开发人员在大型集群上执行基于内存的计算

RDD抽象

RDD是只读的、分区纪录的集合,只能基于在稳定物理存储中的数据集和其他已有的RDD上执行确定性操作来创建。RDD不需要物化,RDD含有如何从其他RDD衍生(即计算)出本RDD的相关信息(即Lineage),据此可以从物理存储的数据计算出相应的RDD分区,也可以用来恢复丢失的分区实现容错支持。

RDD实现

Spark中的RDD是一些Scala的类,为了保存RDD的转换Lineage信息,每个RDD都要包含以下信息: (1)一组RDD分区(partition,即数据集的原子组成部分);(2)对父RDD的一组依赖,这些依赖描述了RDD的Lineage;(3)一个函数,即在父RDD上执行何种计算;(4)元数据,描述分区模式和数据存放的位置。下表中给出了Spark中RDD的内部接口,实现这些接口即可实现一个RDD类:

方法 含义
partitions() 返回一组Partition对象
preferredLocations(p) 根据数据存放的位置,返回分区p在哪些节点访问更快
dependencies() 返回一组依赖
iterator(p, parentIters) 按照父分区的迭代器,逐个计算分区p的元素
partitioner() 返回RDD是否hash/range分区的元数据信息

以UnionRDD为例,UnionRDD由union操作得到,将两个RDD合并在一起,源码如下:

private[spark] class UnionPartition[T: ClassTag](
    idx: Int,
    @transient private val rdd: RDD[T],
    val parentRddIndex: Int,
    @transient private val parentRddPartitionIndex: Int)
  extends Partition {

  var parentPartition: Partition = rdd.partitions(parentRddPartitionIndex)

  def preferredLocations(): Seq[String] = rdd.preferredLocations(parentPartition)
  override val index: Int = idx
  @throws(classOf[IOException])
  private def writeObject(oos: ObjectOutputStream): Unit = Utils.tryOrIOException {
    // Update the reference to parent split at the time of task serialization
    parentPartition = rdd.partitions(parentRddPartitionIndex)
    oos.defaultWriteObject()
  }
}

@DeveloperApi
class UnionRDD[T: ClassTag](
    sc: SparkContext,
    var rdds: Seq[RDD[T]])
  extends RDD[T](sc, Nil) {  // Nil since we implement getDependencies

  override def getPartitions: Array[Partition] = {
    val array = new Array[Partition](rdds.map(_.partitions.length).sum)
    var pos = 0
    for ((rdd, rddIndex) <- rdds.zipWithIndex; split <- rdd.partitions) {
      array(pos) = new UnionPartition(pos, rdd, rddIndex, split.index)
      pos += 1
    }
    array
  }

  override def getDependencies: Seq[Dependency[_]] = {
    val deps = new ArrayBuffer[Dependency[_]]
    var pos = 0
    for (rdd <- rdds) {
      deps += new RangeDependency(rdd, 0, pos, rdd.partitions.length)
      pos += rdd.partitions.length
    }
    deps
  }

  override def compute(s: Partition, context: TaskContext): Iterator[T] = {
    val part = s.asInstanceOf[UnionPartition[T]]
    parent[T](part.parentRddIndex).iterator(part.parentPartition, context)
  }

  override def getPreferredLocations(s: Partition): Seq[String] =
    s.asInstanceOf[UnionPartition[T]].preferredLocations()

  override def clearDependencies() {
    super.clearDependencies()
    rdds = null
  }
}

分析源码:

  • getPartitions():返回一组UnionPartition对象,由要合并的两个RDD的分区构造得来
  • getPreferredLocations:返回分区的最优访问位置
  • getDependencies:得到该RDD对父RDD的依赖关系,依赖关系的构造下文将详细介绍
  • compute:指定该RDD要进行的操作,由于Union只是简单合并,并不需要额外操作,只需调用父RDD的操作即可
  • partitioner:该RDD中没有重写改变量,继承自父类为None,说明该RDD不包含分区元数据

RDD操作

Spark中RDD的操作分为两类:转换(Transformations)和动作(Actions)。转换类操作从一个已有的RDD创建一个新的RDD而actions类操作则触发RDD的计算并将结果返回给用户程序。

Spark中转换类操作并不会立即触发计算,它只是将操作记录下来,只有当动作类操作执行时才会触发计算的执行。注意,对于一个转换类操作得到的RDD,一次动作类操作会计算一次该RDD,因此对于该RDD的多次动作类操作会导致该RDD的重复计算,为了避免这种情况,可以对该RDD执行cache操作,将结果缓存来避免多次重复计算。

Spark中常用的操作如下表所示:

2.1

RDD 逻辑执行图

在了解了RDD的实现和支持的操作后,用户就可以对RDD进行编程来实现数据处理逻辑。Spark会根据用户对RDD的操作来生成逻辑执行图,至于如何生成逻辑执行图实际上要解决以下几个问题:如何产生新的RDD、应该产生哪些RDD、以及如何建立RDD间的依赖关系 。

对于如何产生新的RDD和产生哪些RDD,Spark中会对每个转换类操作产生一个或多个新的RDD,用户调用时就会产生。对于如何计算新的RDD中的数据,则在RDD中compute方法中描述,该方法负责接收上一个RDD或数据源的输入数据,进行计算逻辑,输出结果。下表中给出了一些典型操作会产生的新的RDD以及进行的计算逻辑:

Transformation Generated RDDs Compute()
map(func) MappedRDD iterator(split).map(f)
filter(func) FilteredRDD iterator(split).filter(f)
flatMap(func) FlatMappedRDD iterator(split).flatMap(f)
mapPartitions(func) MapPartitionsRDD f(iterator(split))
mapPartitionsWithIndex(func) MapPartitionsRDD f(split.index, iterator(split))

对于如何建立RDD间的依赖关系,需要解决RDD整体依赖和分区依赖两个层次的问题:RDD整体依赖是指RDD本省的依赖,比如x = a.join(b) 表明x同时依赖a和b,通常在构建新的RDD时指定;分区依赖指的是生成RDD中每个分区与父RDD中分区的依赖关系,是依赖父RDD中的一个分区还是多个分区,不同转换操作的依赖关系可以分为下图中的几类情况:

2.2

其中前面三种为完全依赖在Spark中称为NarrowDependency,部分依赖称为ShuffleDependency。其中NarrowDependency又包括两个特例子类:OneToOneDependency和RangeDependency,OneToOneDependency对应上图的第一种情况而RangeDependency只在UnionRDD中出现,所有NarrowDependency中具体子RDD中的 partitoin i 依赖父RDD中一个 partition 还是多个partitions,是由NarrowDependency中的getParents(partition i)决定的。 下面给出一些典型操作的计算过程以及数据依赖图:

  1. Union(otherRDD):union() 将两个 RDD 简单合并在一起,不改变 partition 里面的数据。RangeDependency 实际上也是 1:1,只是为了访问 union() 后的 RDD 中的 partition 方便,保留了原始 RDD 的 range 边界。

2.3

  1. groupByKey(numPartitions):将 Key相同的records聚合在一起,一个简单的 shuffle过程就可以完成。ShuffledRDD中的compute()只负责将属于每个partition的数据 fetch过来,之后使用mapPartitions()操作进行aggregate,生成 MapPartitionsRDD。

2.4

  1. reduceyByKey(func, numPartitions):相当于传统的 MapReduce,整个数据流也与 Hadoop 中的数据流基本一样。reduceyByKey()默认在 map 端开启combine(),因此在 shuffle之前先通过 mapPartitions操作进行combine,得到MapPartitionsRDD,然后 shuffle得到ShuffledRDD,然后再进行reduce(通过 aggregate + mapPartitions() 操作来实现)得到 MapPartitionsRDD。

2.5

  1. cartesian(otherRDD):Cartesian 对两个 RDD 做笛卡尔集,生成的 CartesianRDD 中 partition个数 = partitionNum(RDD a) * partitionNum(RDD b)。这里的依赖关系与前面的不太一样,CartesianRDD 中每个partition 依赖两个parent RDD,而且其中每个partition完全依赖RDD a中一个partition,同时又完全依赖RDD b中另一个 partition。这里没有红色箭头,因为所有依赖都是 NarrowDependency。

2.6

小结

RDD是Spark中的核心概念,描述了数据在内存中的抽象,并通过对转换和动作两类操作的支持可方便的使用户编写自己的数据处理应用。本章首先介绍了RDD的概念,然后描述了RDD在Spark中的实现,并列举了一些RDD支持的操作,最后还介绍了Spark如何将对RDD的操作 转化成逻辑执行图并给出了一些例子。通过本章的介绍,我们可以知道Spark逻辑执行计划如何生成,Spark将根据得到的逻辑执行计划生成物理执行计划即DAG,物理计划如何生成以及逻辑计划中的依赖关系如何发挥作用将在下章中介绍。

参考

Spark code

Spark internal