I recently ran into a problem: a periodically scheduled SparkSQL job executes SQL like the following every day:

drop table if exists testdb.testtbl;
create table testdb.testtbl select * from testdb.store_sales;

From time to time it fails with:

Error in query: Can not create the managed table('`testdb`.`testtbl`'). The associated location('s3a://bucket/testdb/testtbl') already exists.

On the SparkSQL side, logs like the following can also be seen:

drop table if exists testdb.testtbl
23/09/23 16:16:02 WARN RetryingMetaStoreClient: MetaStoreClient lost connection. Attempting to reconnect (1 of 1) after 5s. dropTable
org.apache.thrift.transport.TTransportException
at org.apache.thrift.transport.TIOStreamTransport.read(TIOStreamTransport.java:132)
at org.apache.thrift.transport.TTransport.readAll(TTransport.java:86)
at org.apache.thrift.protocol.TBinaryProtocol.readAll(TBinaryProtocol.java:425)
at org.apache.thrift.protocol.TBinaryProtocol.readI32(TBinaryProtocol.java:321)
at org.apache.thrift.protocol.TBinaryProtocol.readMessageBegin(TBinaryProtocol.java:225)
at org.apache.thrift.TServiceClient.receiveBase(TServiceClient.java:77)
at org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$Client.recv_drop_table_with_environment_context(ThriftHiveMetastore.java:1378)
at org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$Client.drop_table_with_environment_context(ThriftHiveMetastore.java:1362)
at org.apache.hadoop.hive.metastore.HiveMetaStoreClient.drop_table_with_environment_context(HiveMetaStoreClient.java:2447)
at org.apache.hadoop.hive.ql.metadata.SessionHiveMetaStoreClient.drop_table_with_environment_context(SessionHiveMetaStoreClient.java:114)
at org.apache.hadoop.hive.metastore.HiveMetaStoreClient.dropTable(HiveMetaStoreClient.java:1130)
at org.apache.hadoop.hive.metastore.HiveMetaStoreClient.dropTable(HiveMetaStoreClient.java:1066)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.invoke(RetryingMetaStoreClient.java:173)
at com.sun.proxy.$Proxy31.dropTable(Unknown Source)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.hadoop.hive.metastore.HiveMetaStoreClient$SynchronizedHandler.invoke(HiveMetaStoreClient.java:2372)
at com.sun.proxy.$Proxy31.dropTable(Unknown Source)
at org.apache.hadoop.hive.ql.metadata.Hive.dropTable(Hive.java:1201)

I wrote a simple demo that reproduces this upstream issue. Start SparkSQL and execute the following SQL:

drop table if exists testdb.testtbl;
create table testdb.testtbl select * from testdb.store_sales;

Note that store_sales is a table from TPC-DS; I simply used TPC-DS as the example dataset. How to set up TPC-DS is not covered here; please refer to its documentation.

Because store_sales is a very large table, the drop takes a long time to run. While SparkSQL is still executing the drop table, we crash the HMS:

[screenshot: drop table testdb.testtbl in progress]

Here the HMS is crashed with kill -9:

[screenshot: kill -9 sent to the HMS process]

Spark then receives an error:

[screenshot: MetaStoreClient lost connection / TTransportException]

But the leftover files can still be seen in the storage behind testtbl:

[screenshot: residual files under the table location]
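The same check can be done from spark-shell with the Hadoop FileSystem API. This is a minimal sketch; the path is the table location from the error message above and should be adapted to your environment:

import org.apache.hadoop.fs.Path

// Table location taken from the error message above; adapt to your bucket.
val loc = new Path("s3a://bucket/testdb/testtbl")
val fs = loc.getFileSystem(spark.sparkContext.hadoopConfiguration)

// Lists the files left behind by the interrupted drop.
fs.listStatus(loc).foreach(s => println(s.getPath))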

Executing the create statement at this point fails:

[screenshot: Can not create the managed table error]

The root cause of this problem is that the HMS drop table is not atomic, as can be seen from the HMS code in org/apache/hadoop/hive/metastore/HiveMetaStore.java:

private boolean drop_table_core(final RawStore ms, final String catName, final String dbname,
                                final String name, final boolean deleteData,
                                final EnvironmentContext envContext, final String indexName)
    throws NoSuchObjectException, MetaException, IOException, InvalidObjectException,
           InvalidInputException {
  // ... other code omitted (including the opening of the try block) ...
    if (!ms.dropTable(catName, dbname, name)) {
      String tableName = getCatalogQualifiedTableName(catName, dbname, name);
      throw new MetaException(indexName == null ? "Unable to drop table " + tableName :
          "Unable to drop index table " + tableName + " for index " + indexName);
    } else {
      if (!transactionalListeners.isEmpty()) {
        transactionalListenerResponses =
            MetaStoreListenerNotifier.notifyEvent(transactionalListeners,
                                                  EventType.DROP_TABLE,
                                                  new DropTableEvent(tbl, true, deleteData, this),
                                                  envContext);
      }
      success = ms.commitTransaction();
    }
  } finally {
    if (!success) {
      ms.rollbackTransaction();
    } else if (deleteData && !isExternal) {
      // Data needs deletion. Check if trash may be skipped.
      // Delete the data in the partitions which have other locations
      deletePartitionData(partPaths, ifPurge, db);
      // Delete the data in the table
      deleteTableData(tblPath, ifPurge, db);
      // ok even if the data is not deleted
    }

    if (!listeners.isEmpty()) {
      MetaStoreListenerNotifier.notifyEvent(listeners,
                                            EventType.DROP_TABLE,
                                            new DropTableEvent(tbl, success, deleteData, this),
                                            envContext,
                                            transactionalListenerResponses, ms);
    }
  }
  return success;
}

As the code shows, ms.dropTable(catName, dbname, name) deletes the HMS metadata first, and the physical files are only deleted later, in the finally block. This carries a latent risk: if the HMS fails while dropping a large table, the metadata and the data end up out of sync.
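To make the failure window concrete, here is a minimal sketch of that ordering in plain Scala (an illustration only, not the actual HMS code):

// Models drop_table_core: the metastore record is deleted and committed first,
// and the physical files are deleted only afterwards, in the finally block.
def dropTableNonAtomic(dropMeta: () => Boolean, deleteFiles: () => Unit): Boolean = {
  val success = dropMeta() // step 1: metadata removed and transaction committed
  // <-- if the HMS dies here (kill -9, OOM, JVM crash), the catalog entry
  //     is already gone but every file under the table location survives
  if (success) deleteFiles() // step 2: physical data removed last
  success
}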

Let's again take this SQL:

drop table if exists testdb.testtbl;
create table testdb.testtbl select * from testdb.store_sales;

as the example. While the drop table is executing, kill the HMS with kill -9, and on the Spark side you will see:

23/09/23 16:16:02 WARN RetryingMetaStoreClient: MetaStoreClient lost connection. Attempting to reconnect (1 of 1) after 5s. dropTable
org.apache.thrift.transport.TTransportException

At this point the table no longer exists in the HMS:

desc testdb.testtbl;
Error in query: Table or view not found: testdb.testtbl; line 1 pos 5;
'DescribeRelation false, [col_name#15, data_type#16, comment#17]
+- 'UnresolvedTableOrView [testdb, testtbl], DESCRIBE TABLE, true

But the underlying physical data behind the table still exists, so executing:

create table testdb.testtbl select * from testdb.store_sales;

now produces:

Error in query: Can not create the managed table('`testdb`.`testtbl`'). The associated location('s3a://bucket/testdb/testtbl') already exists.

This is because SparkSQL performs an extra location validation when it writes a Hive managed table via create ... select; the check in SessionCatalog calls a shared error builder with the method name "create". The snippet below shows the analogous check on the rename path, together with that shared builder:

private def validateNewLocationOfRename(
    oldName: TableIdentifier,
    newName: TableIdentifier): Unit = {
  val oldTable = getTableMetadata(oldName)
  if (oldTable.tableType == CatalogTableType.MANAGED) {
    val databaseLocation =
      externalCatalog.getDatabase(oldName.database.getOrElse(currentDb)).locationUri
    val newTableLocation = new Path(new Path(databaseLocation), formatTableName(newName.table))
    val fs = newTableLocation.getFileSystem(hadoopConf)
    if (fs.exists(newTableLocation)) {
      throw QueryCompilationErrors.cannotOperateManagedTableWithExistingLocationError(
        "rename", oldName, newTableLocation)
    }
  }
}

def cannotOperateManagedTableWithExistingLocationError(
    methodName: String, tableIdentifier: TableIdentifier, tableLocation: Path): Throwable = {
  new AnalysisException(s"Can not $methodName the managed table('$tableIdentifier')" +
    s". The associated location('${tableLocation.toString}') already exists.")
}

When SparkSQL operates on a MANAGED table and the storage directory behind the target table already exists and is non-empty, SparkSQL throws this error.
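Once a table is in this state, the usual manual fix is to remove the orphaned directory before re-running the create. A minimal sketch for spark-shell follows; the path comes from the error message and must be adapted, and the guard checks that the table really is gone from the metastore first:

import org.apache.hadoop.fs.Path

val orphan = new Path("s3a://bucket/testdb/testtbl") // location from the error message
val fs = orphan.getFileSystem(spark.sparkContext.hadoopConfiguration)

// Delete only in the exact inconsistent state produced by the interrupted drop:
// the metastore no longer knows the table, but its files survived.
if (!spark.catalog.tableExists("testdb.testtbl") && fs.exists(orphan)) {
  fs.delete(orphan, true) // recursive delete of the leftover directory
}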

The following timeline, from a production environment, shows how insufficient HMS resources caused a JVM crash that led to exactly this inconsistency:

  • 2022-04-10 02:52:51 The HMS receives the drop command.
  • 2022-04-10 02:53:01 The HMS JVM becomes unstable: frequent GC pauses and interrupted responses.
  • 2022-04-10 02:54:22 The HMS JVM crashes and is automatically restarted, interrupting the in-flight drop, so only part of the files were deleted. Because Hive deletes the record in the RDS (the metastore's backing database) first and the files afterwards, the table record is already gone from the HMS while leftovers remain in the directory.
  • 2022-04-10 02:55:00 The HMS receives Spark's create command, which fails because the directory already exists.

For this class of problem, there are generally two approaches:

  • From the operations side: improve the stability of the HMS, for example by giving it more memory.
  • From the code side: for example, modify the Hive source so that it deletes the data first and the metadata afterwards. This has a side effect: while the deletion is in progress, other SQL queries can read incomplete data, and concurrent inserts of new data bring further problems, so it must be weighed carefully (see the sketch after this list).
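For the second option, the change amounts to swapping the two steps of the earlier sketch (again an illustration of the idea, not an actual Hive patch):

// "Data first, metadata second": the inverse ordering of drop_table_core.
// Tradeoff: between deleteFiles() and dropMeta(), concurrent queries still
// resolve the table in the metastore but read a half-deleted directory.
def dropTableDataFirst(dropMeta: () => Boolean, deleteFiles: () => Unit): Boolean = {
  deleteFiles() // step 1: physical data removed first
  dropMeta()    // step 2: metastore record removed last; a crash before this
                // leaves a table with an empty location, which a plain
                // drop table can clean up before the next create
}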
