We received a problem report: when Hive 2 writes to an internal object store, only the first INSERT succeeds; after that, no more data can be inserted, and the following error is raised:

Error: org.apache.hive.service.cli.HiveSQLException: Error while processing statement: FAILED: Execution Error, return code 1 from org.apache.hadoop.hive.ql.exec.MoveTask. org.apache.hadoop.fs.FileAlreadyExistsException: Failed to rename abc://fcbai/.hive-staging_hive_2022-09-07_16-51-23_310_3624164837504371799-1/-ext-10000/000000_0 to abc://fcbai/000000_0; destination file exists
at org.apache.hive.service.cli.operation.Operation.toSQLException(Operation.java:380)
at org.apache.hive.service.cli.operation.SQLOperation.runQuery(SQLOperation.java:257)
at org.apache.hive.service.cli.operation.SQLOperation.access$800(SQLOperation.java:91)
at org.apache.hive.service.cli.operation.SQLOperation$BackgroundWork$1.run(SQLOperation.java:348)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1938)
at org.apache.hive.service.cli.operation.SQLOperation$BackgroundWork.run(SQLOperation.java:362)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.hadoop.hive.ql.metadata.HiveException: org.apache.hadoop.fs.FileAlreadyExistsException: Failed to rename abc://fcbai/.hive-staging_hive_2022-09-07_16-51-23_310_3624164837504371799-1/-ext-10000/000000_0 to abc://fcbai/000000_0; destination file exists
at org.apache.hadoop.hive.ql.metadata.Hive.copyFiles(Hive.java:2973)
at org.apache.hadoop.hive.ql.metadata.Hive.copyFiles(Hive.java:3304)
at org.apache.hadoop.hive.ql.metadata.Hive.loadTable(Hive.java:2029)
at org.apache.hadoop.hive.ql.exec.MoveTask.execute(MoveTask.java:360)
at org.apache.hadoop.hive.ql.exec.Task.executeTask(Task.java:199)
at org.apache.hadoop.hive.ql.exec.TaskRunner.runSequential(TaskRunner.java:100)
at org.apache.hadoop.hive.ql.Driver.launchTask(Driver.java:2183)
at org.apache.hadoop.hive.ql.Driver.execute(Driver.java:1839)
at org.apache.hadoop.hive.ql.Driver.runInternal(Driver.java:1526)
at org.apache.hadoop.hive.ql.Driver.run(Driver.java:1237)
at org.apache.hadoop.hive.ql.Driver.run(Driver.java:1232)
at org.apache.hive.service.cli.operation.SQLOperation.runQuery(SQLOperation.java:255)
... 11 more
Caused by: org.apache.hadoop.fs.FileAlreadyExistsException: Failed to rename abc://fcbai/.hive-staging_hive_2022-09-07_16-51-23_310_3624164837504371799-1/-ext-10000/000000_0 to abc://fcbai/000000_0; destination file exists
at org.apache.hadoop.hive.ql.metadata.Hive.mvFile(Hive.java:3053)
at org.apache.hadoop.hive.ql.metadata.Hive.access$200(Hive.java:169)
at org.apache.hadoop.hive.ql.metadata.Hive$4.call(Hive.java:2946)
at org.apache.hadoop.hive.ql.metadata.Hive$4.call(Hive.java:2941)
... 4 more (state=08S01,code=1)
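
For context, the failure reproduces with nothing more than two consecutive single-row inserts (the table name and location below are hypothetical, matching the abc:// store in the trace):

CREATE TABLE t (id INT) LOCATION 'abc://fcbai/';
INSERT INTO t VALUES (1); -- succeeds, lands as abc://fcbai/000000_0
INSERT INTO t VALUES (2); -- fails in MoveTask with FileAlreadyExistsException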

This does not happen on the Hadoop 3.x line; only 2.x clusters are affected. The stack trace shows the problem sits in Hive's mvFile method. The version in question is Hive 2.3.9, and the method behind Hive.mvFile(Hive.java:3053) looks like this:

int counter = 1;
if (!isRenameAllowed || isBlobStoragePath) {
  while (destFs.exists(destFilePath)) {
    destFilePath = new Path(destDirPath, name + ("_copy_" + counter) + (!type.isEmpty() ? "." + type : ""));
    counter++;
  }
}

if (isRenameAllowed) {
  while (!destFs.rename(sourcePath, destFilePath)) {
    destFilePath = new Path(destDirPath, name + ("_copy_" + counter) + (!type.isEmpty() ? "." + type : ""));
    counter++;
  }
} else if (isSrcLocal) {
  destFs.copyFromLocalFile(sourcePath, destFilePath);
} else {
  FileUtils.copy(sourceFs, sourcePath, destFs, destFilePath,
      true,   // delete source
      false,  // overwrite destination
      conf);
}

return destFilePath;
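
Note what the rename branch assumes: FileSystem.rename on HDFS returns false when the destination already exists, which is what lets the while loop bump the _copy_ counter and retry. An object-store connector whose rename throws FileAlreadyExistsException instead (as the abc:// filesystem evidently does, per the stack trace) never re-enters the loop, and the exception bubbles up to MoveTask. A minimal sketch spelling that assumption out (RenameRetrySketch and renameWithRetry are illustrative names, not Hive code):

import java.io.IOException;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class RenameRetrySketch {
  // Illustrative re-statement of Hive 2.3.9's retry loop above; not Hive code.
  static Path renameWithRetry(FileSystem destFs, Path sourcePath,
                              Path destDirPath, String name) throws IOException {
    int counter = 1;
    Path destFilePath = new Path(destDirPath, name);
    // Only a "false" return reaches the loop body. A connector whose rename()
    // throws FileAlreadyExistsException on an existing destination never gets
    // a second iteration: the exception propagates, which is exactly the
    // MoveTask failure in the stack trace above.
    while (!destFs.rename(sourcePath, destFilePath)) {
      destFilePath = new Path(destDirPath, name + "_copy_" + counter);
      counter++;
    }
    return destFilePath;
  }
}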

Normally, the directory structure of the created files should look like this:

-rw-rw-rw-   1 root root          6 2022-09-07 14:38 abc://fcbai/000000_0
-rw-rw-rw-   1 root root          6 2022-09-07 14:39 abc://fcbai/000000_0_copy_1
-rw-rw-rw-   1 root root          6 2022-09-07 16:52 abc://fcbai/000000_0_copy_10
-rw-rw-rw-   1 root root          6 2022-09-07 16:52 abc://fcbai/000000_0_copy_11
-rw-rw-rw-   1 root root          6 2022-09-07 17:21 abc://fcbai/000000_0_copy_12
-rw-rw-rw-   1 root root          6 2022-09-07 14:40 abc://fcbai/000000_0_copy_2
-rw-rw-rw-   1 root root          6 2022-09-07 14:41 abc://fcbai/000000_0_copy_3
-rw-rw-rw-   1 root root          6 2022-09-07 15:07 abc://fcbai/000000_0_copy_4
-rw-rw-rw-   1 root root          6 2022-09-07 15:08 abc://fcbai/000000_0_copy_5
-rw-rw-rw-   1 root root          6 2022-09-07 15:08 abc://fcbai/000000_0_copy_6
-rw-rw-rw-   1 root root          6 2022-09-07 15:46 abc://fcbai/000000_0_copy_7
-rw-rw-rw-   1 root root          6 2022-09-07 16:30 abc://fcbai/000000_0_copy_8
-rw-rw-rw-   1 root root          6 2022-09-07 16:52 abc://fcbai/000000_0_copy_9

However, the failing Hive never created any _copy_ files, which means execution never entered the if (!isRenameAllowed || isBlobStoragePath) branch. That branch is guarded by the isBlobStoragePath variable, which is obtained by the following code:

boolean isBlobStoragePath = BlobStorageUtils.isBlobStoragePath(conf, destDirPath);

public static boolean isBlobStoragePath(final Configuration conf, final Path path) {
  return path != null && isBlobStorageScheme(conf, path.toUri().getScheme());
}

public static boolean isBlobStorageScheme(final Configuration conf, final String scheme) {
  Collection<String> supportedBlobStoreSchemes =
      conf.getStringCollection(HiveConf.ConfVars.HIVE_BLOBSTORE_SUPPORTED_SCHEMES.varname);

  return supportedBlobStoreSchemes.contains(scheme);
}
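
To see concretely why the check returns false here, a standalone sketch using the same Configuration and Path APIs (the abc:// path is lifted from the error above; the property value is the Hive 2.3.9 default):

import java.util.Collection;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;

public class SchemeCheckDemo {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // Default value of hive.blobstore.supported.schemes in Hive 2.3.9.
    conf.set("hive.blobstore.supported.schemes", "s3,s3a,s3n");

    Path dest = new Path("abc://fcbai/000000_0"); // path from the error above
    String scheme = dest.toUri().getScheme();     // "abc"

    Collection<String> schemes =
        conf.getStringCollection("hive.blobstore.supported.schemes");

    // Prints false: "abc" is not in the default whitelist, so Hive skips the
    // exists() loop that would have generated a _copy_N destination.
    System.out.println(schemes.contains(scheme));
  }
}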

In other words, the check ultimately keys off this configuration entry:

HIVE_BLOBSTORE_SUPPORTED_SCHEMES("hive.blobstore.supported.schemes", "s3,s3a,s3n",
    "Comma-separated list of supported blobstore schemes."),

To summarize: in Hive 2, if you want to insert data into an object store, you need to override Hive's hive.blobstore.supported.schemes parameter and append your own object-store scheme, for example:

set hive.blobstore.supported.schemes=s3,s3a,s3n,abc;
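
To make the change permanent rather than per-session, the same property can be set in hive-site.xml (the abc entry matches the scheme of the object store in question):

<property>
  <name>hive.blobstore.supported.schemes</name>
  <value>s3,s3a,s3n,abc</value>
</property>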

Hive 3.x does not have this problem, because its decision logic in 3.x is:

for (int counter = 1; destFs.exists(destFilePath); counter++) {
  if (isOverwrite) {
    destFs.delete(destFilePath, false);
    break;
  }
  destFilePath = new Path(destDirPath, name + (Utilities.COPY_KEYWORD + counter) +
      ((taskId == -1 && !type.isEmpty()) ? "." + type : ""));
}

In other words, whenever the destination file already exists, Hive 3.x either deletes it (on overwrite) or keeps generating _copy_N names (Utilities.COPY_KEYWORD is the "_copy_" literal), no matter what filesystem scheme is in play, so no scheme whitelist is needed.

