在Hadoop中,由于FileSystem自带一个Cache,也就是每一次通过FileSystem.get获得到的fs会自动被cache下来:

public static FileSystem get(URI uri, Configuration conf) throws IOException {
String scheme = uri.getScheme();
String authority = uri.getAuthority();

if (scheme == null && authority == null) { // use default FS
return get(conf);
}

if (scheme != null && authority == null) { // no authority
URI defaultUri = getDefaultUri(conf);
if (scheme.equals(defaultUri.getScheme()) // if scheme matches default
&& defaultUri.getAuthority() != null) { // & default has authority
return get(defaultUri, conf); // return default
}
}
String disableCacheName = String.format("fs.%s.impl.disable.cache", scheme);
if (conf.getBoolean(disableCacheName, false)) {
LOGGER.debug("Bypassing cache to create filesystem {}", uri);
return createFileSystem(uri, conf);
}

return CACHE.get(uri, conf);
}

所以对于fs.close的调用需要非常谨慎,除非是严格的disable cache,否则close都可能会引起其他正在操作的作业出问题。在hive中使用fs接口操作hdfs文件的时候,当每一个hive sql结束的时候。

hive侧会主动调用:

FileSystem.closeAllForUGI(ugi);

从而关闭fs:

CACHE.closeAll(ugi);

从代码里面可以看到,只有ugi类的fs才会被close:

synchronized void closeAll(UserGroupInformation ugi) throws IOException {
List<FileSystem> targetFSList = new ArrayList<>(map.entrySet().size());
//Make a pass over the list and collect the FileSystems to close
//we cannot close inline since close() removes the entry from the Map
for (Map.Entry<Key, FileSystem> entry : map.entrySet()) {
final Key key = entry.getKey();
final FileSystem fs = entry.getValue();
if (ugi.equals(key.ugi) && fs != null) {
targetFSList.add(fs);
}
}
List<IOException> exceptions = new ArrayList<>();
//now make a pass over the target list and close each
for (FileSystem fs : targetFSList) {
try {
fs.close();
}
catch(IOException ioe) {
exceptions.add(ioe);
}
}
if (!exceptions.isEmpty()) {
throw MultipleIOException.createIOException(exceptions);
}
}

这里需要注意一些点,比如如果需要做一些任务,如果利用了jvm的exist hook去做一些操作的就需要小心,因为hive中hms持有的fs会在jvm内常驻,且jvm不会退出。


扫码手机观看或分享: