Spark处理关系型数据库中的分区概念
Spark的分区属于老生常谈的问题,也不是什么新知识,在分布式存储和分布式计算下,分区也是一个非常高频使用的参数,但是当分布式存储和分布式计算多个引擎对到一起的时候,这种概念就非常让人模糊。
HDFS的数据是按照块切分的,在正常的Spark去处理HDFS下,如果不做任何默认的partition修改,那么Spark自行计算出的并发数量等于HDFS上的文件的块的数量。
分布式存储下这都好说,但是我们知道Spark,尤其是SparkSQL现在已经非常易用的可以操作关系型数据库了,例如官方给出的读写例子:
// Note: JDBC loading and saving can be achieved via either the load/save or jdbc methods |
和:
// Saving data to a JDBC source |
Spark背后对关系型数据库的数据处理,走的依旧是JdbcRDD,这属于基石,那么很多人在这里会有一个疑惑,那就是访问关系型数据库本质上Spark走的还是JDBC,对于JDBC连接来说并没有分布式获取和分区的概念,当我用Spark去读取或者写入数据到关系型数据库的时候,Spark是如何处理并发的?
今天就仔细看看这个问题,因为SparkSQL和Spark本身的原始RDD实际上链路都一样,只是SparkSQL提供了更友好的SQL接口,所以这里不做特别区分,在Spark中,当我们需要对并发数进行调整的时候,可以通过修改RDD的分区数量进行重分区,例如使用coalesce或者通过参数spark.default.parallelism进行修改。
问题其实是如果我们是在使用Spark去访问关系型数据库,这一套对Spark的并发修改的动作,到底能不能生效,能不能提高对关系型数据库的读写性能。
结果是并不能,因为对于Spark来说,这些参数修改的是运行过程中处理数据的时候的并发数量,但是关系型数据库是通过JDBC的方式去访问数据库,在数据源头不具备分布式的概念。
因此这些参数只能解决运行态的并发,并不能解决读取的并发,默认情况下非Mysql的数据库,Spark在进行读取的时候,是单JDBC访问线程,按照每一次100条进行读取,具体代码可以在org/apache/spark/rdd/JdbcRDD.scala中看到:
val url = conn.getMetaData.getURL |
而写读时候则是按照每一次1000条进行批量写入,也就是说正常情况下,在使用Spark去操作关系型数据库的时候,我们针对Spark自身的分区去做调整,修改并发,解决的都是从关系型数据库拿到一批数据后,对这一批数据的并发处理,从读取源头来说,本质上还是单线程一批一批的去获取。
那么对于Spark去操作关系型数据库,如何提高读取并发呢,这个不单单是Spark的问题了,得从关系型数据库入手去操作,首先关系型数据库要满足如下条件:
- 具备分区字段,需要是数值类的(partitionColumn must be a numeric column from the table in question.),经测试,除整型外,float、double、decimal都是可以的。
- lowerBound:下界,必须为整数。
- upperBound:上界,必须为整数。
只有当关系型数据库中的表满足上面的条件后,在Spark去读取的时候,它才知道如何去并行读取关系型数据库的数据,其实逻辑也非常简单,Spark会按照lowerBound和upperBound,再结合每批处理的数据,计算出一个合适的分区数量,最后就给每一个分区挂一个相互不重叠的数据区间,各自去关系型数据库去获取数据,对应的逻辑可以在/org/apache/spark/sql/execution/datasources/jdbc/JDBCRelation.scala#L85-L97 中看到:
while (i < numPartitions) { |
从使用上来说,通过这样的参数配置:
spark.read("jdbc") |
就能解决读取关系型数据库的并发的问题,所以对于Spark处理关系型数据库,只需要理解到源头获取数据的多段获取,和获取到数据后,在后续处理的并发处理,就能很容易的理清楚这个问题,对于HDFS来说因为HDFS天生就是按照block的方式去存储的,且nn存储了对应block的offset等信息,所以spark可以直接计算出应该如何去并行读取和处理数据。
在这个特性在关系型数据库下并不存在,因此需要我们在关系型数据库的表中去额外做一些设计,设计一个分区字段,确保Spark能够计算出合理的一个并行读取区间,很多时候处理关系型数据库慢,并不是引擎慢,而是连接关系型数据库的时候,源头慢,这时候去修改Spark的一系列并发参数,实际上是解决不了问题的。
扫码手机观看或分享: