Spark SQL 是Spark 的结构化数据处理模块,提供一种称为DataFrame的数据抽象模型,也是一个分布式SQL 查询引擎。 其中DataFrame是一种分布式数据集合,其内部数据按照列的形式组织,类似关系型数据库中表的形式,也支持RDD的一些操作。

例子

val sqlContext = new org.apache.spark.sql.SQLContext(sc)
// this is used to implicitly convert an RDD to a DataFrame.
import sqlContext.implicits._
// you can use custom classes that implement the Product interface.
case class Person(name: String, age: Int)
// Create an RDD of Person objects and register it as a table.
val people = sc.textFile("examples/src/main/resources/people.txt").map(_.split(",")).map(p => Person(p(0), p(1).trim.toInt)).toDF()
	
people.registerTempTable("people")

// SQL statements can be run by using the sql methods provided by sqlContext.
val teenagers = sqlContext.sql("SELECT name, age FROM people WHERE age >= 13 AND age <= 19")

// The results of SQL queries are DataFrames and support all the normal RDD operations. 
teenagers.map(t => "Name: " + t(0)).collect().foreach(println)

上述代码给出了一个Spark SQL 的应用示例,程序前两句生成SQLContext并导入Spark SQL的上下文环境,程序3、4句加载数据源并将其注册为table,程序第5句对该表进行SQL查询结果为DataFrame对象,第6句对该DataFrame对象进行操作。

SQLContext

SQLContext是Spark SQL的上下文对象,提供编程入口和运行时支持。它包含以下成员:

  • Catalog:一个存储<tableName,logicalPlan>的map结构,查找关系的目录,注册表,注销表,查询表和逻辑计划关系的类。
  • sqlParser:parse传入的sql语句,构建语法树,返回一个Unresolved Logical Plan
  • analyzer: 分析输入的Unresolved Logical Plan ,结合Catalog中的数据生成Resolved Logical Plan
  • optimizer:优化Logical Plan
  • planner:分析输入的Logical Plan,生成physical plan
  • QueryExecution:内部类,包含了SQL 执行的主要流程。

这些对象定义了Spark SQL 的整个工作流,如下图所示:通过SQL/HiveQl parser或是DataFrame API构造的逻辑执行计划经过analyzer的分析之后再经优化得到优化执行计划,接着再转为物理执行计划,并最终转换为RDD DAG在Spark引擎上执行。

5.1

DataFrame

Spark SQL通过DataFrame接口支持多种数据源的操作,对DataFrame的操作可以使用传统的RDD方式,也可以将其注册为表然后使用SQL进行查询,目前对标准SQL select支持较好,支持HiveQL 类Insert(insert into table t select …)不支持标准Insert。

有两类方式来创建DataFrame:使用已有的RDD和使用外部数据源。对于使用已有的RDD,又有两种方式来建立schema:

  1. 使用反射,如上述例子中,可以使用Scala的case class定义schema,然后调用RDD的toDF方法来建立DataFrame,由于case class最多只能支持22个列,因此这适用于简单的情况
  2. 编程定义,对于复杂的情况,用户可以编程实现来自定义Schema,需要首先将原来的RDD转换为ROW类型RDD,然后自己定义StructType来表示Schema,最后通过调用createDataFrame方法来建立DataFrame

对于外部数据源,Spark SQL 支持json、jdbc、orc和parquet等格式,对于这些格式,Spark SQL会自己推导出数据的Schema,比如对于parquet格式,可以使用如下代码:

val parquetFile = sqlContext.read.parquet("people.parquet")

//Parquet files can also be registered as tables and then used in SQL statements.
parquetFile.registerTempTable("parquetFile")
val teenagers = sqlContext.sql("SELECT name FROM parquetFile WHERE age >= 13 AND age <= 19")
teenagers.map(t => "Name: " + t(0)).collect().foreach(println)

首先调用SQLContext中的read函数,会得到一个DataFrameReader对象,使用该对象可以读取不同格式数据源生成DataFrame,然后就可以将该DataFrame注册为表来进行使用。实现上DataFrameReader对不同数据格式实现了不同的Relation类,这些Relation继承自BaseRelation实现了schema和scan方法,使Spark SQL可以由数据源生成DataFrame。

当DataFrame调用registerTempTable时,会调用SQLContext的registerDataFrameAsTable函数,这会将表名和该DataFrame的logicalPlan保存在catalog中,catalog可以类似理解为数据库中的metaDataStore,不同的是这是内存中的变量,并没有持久化(之所以叫做tempTable),每次SQLContext重启都需要重新注册。

要实现持久化,有两类方式实现:

  1. 借助Hive,HiveContext继承自SQLContext但是增加了对HiveMetaStore和HQL的支持,可以使用saveAsTable函数,这会创建一个managed table,将数据和metaData同时持久化。这种方式虽然可行,但是需要借助Hive,若只是为了metaStore有点繁重。
  2. 自己实现,元数据的持久化关键就是catalog变量的持久化,目前catalog是一个存储<tableName,logicalPlan>的map内存结构,增加对它的持久化管理即可。

源码分析

Spark SQL代码主要包含4个模块,其中两个hive相关的提供hive支持,重要的两个是catalyst和core。

Catalyst是Spark SQL 项目中的一个独立库,目标是与Spark解耦,处理执行计划的生成和优化等。 Catalyst SQL parser是catalyst中的一部分,目前Spark SQL的parser是一个用Scala编写的parser,将输入的Sql解析成一系列的catalyst expressions。

Catalyst expressions 是执行计划表示的数据结构,对应SQL中的多种操作。

Catalyst Analyzer将结合catalog中的数据,对输入的logic plan进行一系列的Rule操作,将unresolved expressions变成resolved expressions。

Catalyst Optimizer定义了一系列的优化策略,可以对logic plan 进行性能优化。

Catalyst Plans 定义了一些数据结构来抽象表示physical plan,也定义了一些logic plan 到physical plan的转化等操作。

Core是Spark SQL中另外一个重要的部分,定义Spark SQL 的编程接口和执行支持。Catalyst生成的physical plan 需要执行,core中的execution包内代码实现了这部分功能,实现了physical plan到RDD的转化,进而将Sql 查询转化为Spark RDD操作。

小结

本章分析了Spark SQL的执行流程和代码组织实现。首先通过一个例子了解了Spark SQL 的编程模式;然后分析代码给出了SQL的执行流程,以及数据导入导出的相关逻辑;最后给出了源码组织结构,以便进一步阅读。

进一步的,Spark SQL作为分布式数据查询引擎的设计初衷是方便用户使用SQL查询已有的数据,并不是作为关系型数据使用,因此在使用上和功能上与关系型数据库有所区别。

使用上,对于已有的数据要使用Spark SQL进行查询,需要以下几步:

  1. 将数据文件建立成为DataFrame
  2. 将DataFrame注册为tempTable
  3. 对该tempTable使用Sql查询
  4. 查询结果同样是DataFrame,可以继续注册为临时表进行查询,也可以保存到文件或表方便下次使用

第1步建立DataFrame的逻辑在上述文档中已有说明,对parquet文件格式的支持也在此完成,主要代码在org.apache.spark.sql.execution.datasources.parquet包内,该部分代码主要实现了解析parquet格式并将建立schema的逻辑,使用该schema就可以建立DataFrame。

第2步对DataFrame调用registerTempTable操作注册临时表,实现上,该方法会调用catalog的registerTable方法: catalog.registerTable(Seq(tableName), df.logicalPlan) 将tableName 和 Schema保存到catalog的map中,这样内存中就有了该表名与表schema的对应关系,作为元数据为后续查询计划的生成使用。

第3步对临时表使用Sql查询,这就与关系型数据库类似,经历了parse、analysis、optimize、plan和execute的过程。该部分代码分布在catalyst和spark core中。

第4步得到查询结果,对此结果可以使用DataFrame支持的操作继续计算,也可以保存为文件或表。保存操做会使用DataFrameWriter实现,对不同文件格式的支持也在org.apache.spark.sql.execution.datasources包内,比如parquet的org.apache.spark.sql.execution.datasources.parquet这与第1步的read操作相同。保存为表与文件的区别就是表带有Spark SQL的schema信息,下次可以使用table()操作导入,不需要再次注册。

至此,我们完成了一个标准的Spark SQL 使用流程,需要注意的是在此都是查询类SQL,并没有更新类SQL,实际上对于注册的临时表Spark SQL 并不支持更新类SQL操作(insert into,update),也就是说一旦将数据源注册为临时表,则只能对该数据源进行查询操作。但是,可以在DataFrame注册为临时表之前对其进行一些操作(union,join)来得到新的DataFrame,进而对结果DataFrame注册临时表进行SQL查询。

对于元数据的操作,Spark SQL 通过catalog对象来实现,Spark SQL 默认使用SimpleCatalog类,该类支持简单的元数据管理(表注册,表关系查询,表删除等)但是这个并没有持久化支持。当使用Hive时,Spark SQL使用HiveMetastoreCatalog类,该类将Spark SQL 的catalog操作用Hive 的metaDataStore api实现,利用hive的metaDataStore实现持久化,但使用这种方式主要是为了与Hive兼容,并不只是为了metaData的持久化。这两种方式的类都实现了org.apache.spark.sql.catalyst.analysis.Catalog接口,我们也可以自己编写一个实现该接口的类来管理catalog(使用postgresql)满足我们的需求,将catalog初始化、表注册、表删除、表关系查找等操作使用postgresql来实现。

参考

Spark code

Spark internal