Spark的分区属于老生常谈的问题,也不是什么新知识,在分布式存储和分布式计算下,分区也是一个非常高频使用的参数,但是当分布式存储和分布式计算多个引擎对到一起的时候,这种概念就非常让人模糊。

HDFS的数据是按照块切分的,在正常的Spark去处理HDFS下,如果不做任何默认的partition修改,那么Spark自行计算出的并发数量等于HDFS上的文件的块的数量。

分布式存储下这都好说,但是我们知道Spark,尤其是SparkSQL现在已经非常易用的可以操作关系型数据库了,例如官方给出的读写例子:

// Note: JDBC loading and saving can be achieved via either the load/save or jdbc methods
// Loading data from a JDBC source
val jdbcDF = spark.read
.format("jdbc")
.option("url", "jdbc:postgresql:dbserver")
.option("dbtable", "schema.tablename")
.option("user", "username")
.option("password", "password")
.load()

和:

// Saving data to a JDBC source
jdbcDF.write
.format("jdbc")
.option("url", "jdbc:postgresql:dbserver")
.option("dbtable", "schema.tablename")
.option("user", "username")
.option("password", "password")
.save()

Spark背后对关系型数据库的数据处理,走的依旧是JdbcRDD,这属于基石,那么很多人在这里会有一个疑惑,那就是访问关系型数据库本质上Spark走的还是JDBC,对于JDBC连接来说并没有分布式获取和分区的概念,当我用Spark去读取或者写入数据到关系型数据库的时候,Spark是如何处理并发的?

今天就仔细看看这个问题,因为SparkSQL和Spark本身的原始RDD实际上链路都一样,只是SparkSQL提供了更友好的SQL接口,所以这里不做特别区分,在Spark中,当我们需要对并发数进行调整的时候,可以通过修改RDD的分区数量进行重分区,例如使用coalesce或者通过参数spark.default.parallelism进行修改。

问题其实是如果我们是在使用Spark去访问关系型数据库,这一套对Spark的并发修改的动作,到底能不能生效,能不能提高对关系型数据库的读写性能。

结果是并不能,因为对于Spark来说,这些参数修改的是运行过程中处理数据的时候的并发数量,但是关系型数据库是通过JDBC的方式去访问数据库,在数据源头不具备分布式的概念。

因此这些参数只能解决运行态的并发,并不能解决读取的并发,默认情况下非Mysql的数据库,Spark在进行读取的时候,是单JDBC访问线程,按照每一次100条进行读取,具体代码可以在org/apache/spark/rdd/JdbcRDD.scala中看到:

val url = conn.getMetaData.getURL
if (url.startsWith("jdbc:mysql:")) {
// setFetchSize(Integer.MIN_VALUE) is a mysql driver specific way to force
// streaming results, rather than pulling entire resultset into memory.
// See the below URL
// dev.mysql.com/doc/connector-j/5.1/en/connector-j-reference-implementation-notes.html

stmt.setFetchSize(Integer.MIN_VALUE)
} else {
stmt.setFetchSize(100)
}

而写读时候则是按照每一次1000条进行批量写入,也就是说正常情况下,在使用Spark去操作关系型数据库的时候,我们针对Spark自身的分区去做调整,修改并发,解决的都是从关系型数据库拿到一批数据后,对这一批数据的并发处理,从读取源头来说,本质上还是单线程一批一批的去获取。

那么对于Spark去操作关系型数据库,如何提高读取并发呢,这个不单单是Spark的问题了,得从关系型数据库入手去操作,首先关系型数据库要满足如下条件:

  • 具备分区字段,需要是数值类的(partitionColumn must be a numeric column from the table in question.),经测试,除整型外,float、double、decimal都是可以的。
  • lowerBound:下界,必须为整数。
  • upperBound:上界,必须为整数。

只有当关系型数据库中的表满足上面的条件后,在Spark去读取的时候,它才知道如何去并行读取关系型数据库的数据,其实逻辑也非常简单,Spark会按照lowerBound和upperBound,再结合每批处理的数据,计算出一个合适的分区数量,最后就给每一个分区挂一个相互不重叠的数据区间,各自去关系型数据库去获取数据,对应的逻辑可以在/org/apache/spark/sql/execution/datasources/jdbc/JDBCRelation.scala#L85-L97 中看到:

while (i < numPartitions) {
val lBound = if (i != 0) s"$column >= $currentValue" else null
currentValue += stride
val uBound = if (i != numPartitions - 1) s"$column < $currentValue" else null
val whereClause =
if (uBound == null) {
lBound
} else if (lBound == null) {
s"$uBound or $column is null"
} else {
s"$lBound AND $uBound"
}
ans += JDBCPartition(whereClause, i)
i = i + 1
}

从使用上来说,通过这样的参数配置:

spark.read("jdbc")
.option("url", url)
.option("dbtable", "pets")
.option("user", user)
.option("password", password)
.option("numPartitions", 10)
.option("partitionColumn", "owner_id")
.option("lowerBound", 1)
.option("upperBound", 10000)

就能解决读取关系型数据库的并发的问题,所以对于Spark处理关系型数据库,只需要理解到源头获取数据的多段获取,和获取到数据后,在后续处理的并发处理,就能很容易的理清楚这个问题,对于HDFS来说因为HDFS天生就是按照block的方式去存储的,且nn存储了对应block的offset等信息,所以spark可以直接计算出应该如何去并行读取和处理数据。

在这个特性在关系型数据库下并不存在,因此需要我们在关系型数据库的表中去额外做一些设计,设计一个分区字段,确保Spark能够计算出合理的一个并行读取区间,很多时候处理关系型数据库慢,并不是引擎慢,而是连接关系型数据库的时候,源头慢,这时候去修改Spark的一系列并发参数,实际上是解决不了问题的。


扫码手机观看或分享: