Spark读取的同时进行数据的插入
在SparkSQL里面,执行如下SQL:
create table IF NOT EXISTS spark01 ( name string ) stored as orc; |
会出错:
Spark master: yarn, Application Id: application_1671242281437_0025 |
本质上的问题是Spark对数仓常用的数据类型做了自己的实现方式,在他自己的实现方式下,目标路径会先被清空,随后才执行写入,而Hive是先写入到临时目录,任务完成后再将结果数据替换目标路径。
overwrite 的目录是在算子执行的开始阶段 delete 的,如果是 dynaic partition overwrite,则是留到 commit 阶段。
对于 dynamic partition overwrite,是可以源目录和目标目录是一样的,但是非dynamic则不行,具体到Spark的代码位置是:
src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala的153行:
case InsertIntoDir(_, storage, provider, query, overwrite) |
可以看到只要是overwrite,就会进入DDLUtils.verifyNotReadPath,而这里的检查要求output path不能在read path里面:
def cannotOverwritePathBeingReadFromError(): Throwable = { |
可以强制使用Hive解析器,也就是设置spark.sql.hive.convertMetastoreOrc=false来进行解析,但是这样的设置会带来2个问题,一个是只对先创建表的语句生效,但是如果使用AS语法创建的表的话是不生效的。
其次第二个问题比较严重,就是设置了Hive解析器后,无法对ORC的表进行查询,执行任何select都会有如下问题:
ORC split generation failed with exception: null |
这个问题也无法通过修改strategy处理的,hive.exec.orc.split.strategy是只对hive生效的,spark的话是spark.hadoop.hive.exec.orc.split.strategy,上面的问题是由于sparK走了hive解析器后,存在兼容性的问题。
扫码手机观看或分享: