[Enhancement] Support broker load from Azure ABFS (#16760)

In this PR, we support broker load from Azure ABFS.
We remove the path-prefix check in getFileSystem: if the path prefix is not hdfs/s3a/oss/cos...,
we try to create a generic file system instead of rejecting the path. We can do this because the Hadoop/S3 SDK
is compatible with nearly all file/object storage systems.
If, in the future, there is a request to support a new file system,
we only need to add the settings to hdfs-site.xml and core-site.xml of BE/FE, and put the jar lib in the specific place.
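
As a concrete illustration of that last point, the sketch below shows roughly what enabling ABFS involves, with the settings applied programmatically for readability; in a real deployment they belong in core-site.xml of BE/FE. The account, container, and key values are placeholders, and the key names assume the stock hadoop-azure connector.

import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;

public class AbfsConfigSketch {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Map the abfs:// scheme to the hadoop-azure implementation; in a real
        // deployment this setting lives in core-site.xml of BE/FE.
        conf.set("fs.abfs.impl", "org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem");
        // Shared-key auth for the storage account ("myaccount" is a placeholder).
        conf.set("fs.azure.account.key.myaccount.dfs.core.windows.net", "<account-key>");
        // With hadoop-azure (and its dependencies) on the classpath, the generic
        // factory call resolves the scheme with no broker code changes.
        FileSystem fs = FileSystem.get(
                URI.create("abfs://mycontainer@myaccount.dfs.core.windows.net/"), conf);
        System.out.println(fs.getUri());
    }
}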

Signed-off-by: xyz <a997647204@gmail.com>
xyz committed on 2023-01-29 16:46:17 +08:00 (via GitHub)
parent d8a72ac9cc
commit ce2909036a
3 changed files with 154 additions and 3 deletions


@@ -101,6 +101,7 @@ class ConfigurationWrap extends Configuration {
String key = enums.nextElement();
String value = props.getProperty(key);
switch (tObjectStoreType) {
case UNIVERSAL_FS:
case S3:
switch (key) {
case HdfsFsManager.FS_S3A_ACCESS_KEY:
@@ -367,7 +368,10 @@ public class HdfsFsManager {
} else if (scheme.equals(OBS_SCHEME)) {
brokerFileSystem = getOBSFileSystem(path, loadProperties, tProperties);
} else {
throw new UserException("invalid path. scheme is not supported");
// If all of the above matches fail, we read the settings from hdfs-site.xml and core-site.xml of the FE
// and try to create a universal file system. We can do this because the Hadoop/S3
// SDK is compatible with nearly all file/object storage systems
brokerFileSystem = getUniversalFileSystem(path, loadProperties, tProperties);
}
return brokerFileSystem;
}
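
The fallback works because Hadoop's generic factory does its own scheme resolution: FileSystem.get() looks up fs.<scheme>.impl in the Configuration, falling back to a ServiceLoader scan of the classpath, so any connector jar that is present can serve the one universal branch. A minimal sketch with a hypothetical scheme:

import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;

public class UniversalResolveSketch {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration(); // picks up core-site.xml / hdfs-site.xml
        // "myfs" is a hypothetical scheme: if the conf maps fs.myfs.impl to a
        // FileSystem class that is on the classpath, this call succeeds with
        // no code change; otherwise it throws.
        FileSystem fs = FileSystem.get(URI.create("myfs://bucket/key"), conf);
        System.out.println(fs.getClass().getName());
    }
}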
@@ -750,6 +754,88 @@ public class HdfsFsManager {
}
}
/**
* visible for test
* <p>
* file system handles are cached; for the universal file system the identity is
* the path host (scheme + authority)
*
* @param path file or directory path, e.g. abfs://container@account.dfs.core.windows.net/dir
* @param loadProperties load properties supplied by the user
* @param tProperties properties to fill for the caller (may be null)
* @return the cached HdfsFs handle, or null if it was removed concurrently by the checker thread
* @throws UserException if a property is invalid or the file system cannot be created
*/
public HdfsFs getUniversalFileSystem(String path, Map<String, String> loadProperties, THdfsProperties tProperties)
throws UserException {
String disableCacheHDFS = loadProperties.getOrDefault(FS_HDFS_IMPL_DISABLE_CACHE, "true");
String disableCacheHDFSLowerCase = disableCacheHDFS.toLowerCase();
if (!(disableCacheHDFSLowerCase.equals("true") || disableCacheHDFSLowerCase.equals("false"))) {
LOG.warn("invalid disable cache: " + disableCacheHDFS);
throw new UserException("invalid disable cache: " + disableCacheHDFS);
}
String disableCacheS3 = loadProperties.getOrDefault(FS_S3A_IMPL_DISABLE_CACHE, "true");
String disableCacheS3LowerCase = disableCacheS3.toLowerCase();
if (!(disableCacheS3LowerCase.equals("true") || disableCacheS3LowerCase.equals("false"))) {
LOG.warn("invalid disable cache: " + disableCacheS3);
throw new UserException("invalid disable cache: " + disableCacheS3);
}
// skip xxx:// first
int bucketEndIndex = path.indexOf("://", 0);
// find the end of the bucket; for example, for xxx://abc/def we take xxx://abc as the host
if (bucketEndIndex != -1) {
bucketEndIndex = path.indexOf("/", bucketEndIndex + 3);
}
String host = path;
if (bucketEndIndex != -1) {
host = path.substring(0, bucketEndIndex);
}
HdfsFsIdentity fileSystemIdentity = new HdfsFsIdentity(host, "");
cachedFileSystem.putIfAbsent(fileSystemIdentity, new HdfsFs(fileSystemIdentity));
HdfsFs fileSystem = cachedFileSystem.get(fileSystemIdentity);
if (fileSystem == null) {
// it means it is removed concurrently by checker thread
return null;
}
fileSystem.getLock().lock();
try {
if (!cachedFileSystem.containsKey(fileSystemIdentity)) {
// this means the file system is closed by file system checker thread
// it is a corner case
return null;
}
if (fileSystem.getDFSFileSystem() == null) {
LOG.info("could not find file system for path " + path + " create a new one");
// create a new filesystem
Configuration conf = new ConfigurationWrap();
conf.set(FS_HDFS_IMPL_DISABLE_CACHE, disableCacheHDFS);
conf.set(FS_S3A_IMPL_DISABLE_CACHE, disableCacheS3);
FileSystem genericFileSystem = FileSystem.get(new Path(path).toUri(), conf);
fileSystem.setFileSystem(genericFileSystem);
fileSystem.setConfiguration(conf);
if (tProperties != null) {
convertObjectStoreConfToProperties(path, conf, tProperties, TObjectStoreType.UNIVERSAL_FS);
}
} else {
if (tProperties != null) {
convertObjectStoreConfToProperties(path, fileSystem.getConfiguration(), tProperties,
TObjectStoreType.UNIVERSAL_FS);
}
}
return fileSystem;
} catch (Exception e) {
LOG.error("errors while connect to " + path, e);
throw new UserException(e);
} finally {
fileSystem.getLock().unlock();
}
}
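
The lock/re-check sequence above is easy to misread, so here is the same caching pattern distilled into a standalone sketch; all names are hypothetical and not part of this PR.

import java.io.IOException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.locks.ReentrantLock;

// Distilled sketch of the caching pattern used by getUniversalFileSystem.
class HandleCache {
    static class Handle {
        final ReentrantLock lock = new ReentrantLock();
        volatile Object fs; // the lazily created underlying file system
    }

    private final ConcurrentMap<String, Handle> cache = new ConcurrentHashMap<>();

    Handle get(String id) throws IOException {
        cache.putIfAbsent(id, new Handle()); // at most one shared handle per identity
        Handle h = cache.get(id);
        if (h == null) {
            return null;                     // already evicted by the checker thread
        }
        h.lock.lock();
        try {
            if (!cache.containsKey(id)) {
                return null;                 // evicted between get() and lock(); the corner case
            }
            if (h.fs == null) {
                h.fs = openFileSystem(id);   // lazy one-time initialization under the lock
            }
            return h;
        } finally {
            h.lock.unlock();
        }
    }

    private Object openFileSystem(String id) throws IOException {
        return new Object();                 // stand-in for FileSystem.get(URI, Configuration)
    }
}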
/**
* visible for test
* <p>


@@ -221,8 +221,10 @@ public class FileSystemManager {
} else if (scheme.equals(OBS_SCHEME)) {
brokerFileSystem = getOBSFileSystem(path, properties);
} else {
throw new BrokerException(TBrokerOperationStatusCode.INVALID_INPUT_FILE_PATH,
"invalid path. scheme is not supported");
// If all of the above matches fail, we read the settings from hdfs-site.xml and core-site.xml of the FE
// and try to create a universal file system. We can do this because the Hadoop/S3
// SDK is compatible with nearly all file/object storage systems
brokerFileSystem = getUniversalFileSystem(path, properties);
}
return brokerFileSystem;
}
@@ -629,6 +631,67 @@ public class FileSystemManager {
}
}
/**
* visible for test
* <p>
* file system handles are cached; for the universal file system the identity is
* the path host (scheme + authority)
*
* @param path file or directory path
* @param properties properties supplied by the client
* @return the cached BrokerFileSystem handle, or null if it was removed concurrently by the checker thread
* @throws BrokerException if the file system cannot be created
*/
public BrokerFileSystem getUniversalFileSystem(String path, Map<String, String> properties) {
String disableCacheHDFS = properties.getOrDefault(FS_HDFS_IMPL_DISABLE_CACHE, "true");
String disableCacheS3 = properties.getOrDefault(FS_S3A_IMPL_DISABLE_CACHE, "true");
// skip xxx:// first
int bucketEndIndex = path.indexOf("://", 0);
// find the end of the bucket; for example, for xxx://abc/def we take xxx://abc as the host
if (bucketEndIndex != -1) {
bucketEndIndex = path.indexOf("/", bucketEndIndex + 3);
}
String host = path;
if (bucketEndIndex != -1) {
host = path.substring(0, bucketEndIndex);
}
FileSystemIdentity fileSystemIdentity = new FileSystemIdentity(host, "");
cachedFileSystem.putIfAbsent(fileSystemIdentity, new BrokerFileSystem(fileSystemIdentity));
BrokerFileSystem fileSystem = cachedFileSystem.get(fileSystemIdentity);
if (fileSystem == null) {
// it means it is removed concurrently by checker thread
return null;
}
fileSystem.getLock().lock();
try {
if (!cachedFileSystem.containsKey(fileSystemIdentity)) {
// this means the file system is closed by file system checker thread
// it is a corner case
return null;
}
if (fileSystem.getDFSFileSystem() == null) {
logger.info("could not find file system for path " + path + " create a new one");
// create a new filesystem
Configuration conf = new Configuration();
conf.set(FS_HDFS_IMPL_DISABLE_CACHE, disableCacheHDFS);
conf.set(FS_S3A_IMPL_DISABLE_CACHE, disableCacheS3);
FileSystem genericFileSystem = FileSystem.get(new Path(path).toUri(), conf);
fileSystem.setFileSystem(genericFileSystem);
}
return fileSystem;
} catch (Exception e) {
logger.error("errors while connect to " + path, e);
throw new BrokerException(TBrokerOperationStatusCode.NOT_AUTHORIZED, e);
} finally {
fileSystem.getLock().unlock();
}
}
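
For orientation, a hypothetical call site for the broker-side method; the abfs path is a placeholder, and fs.hdfs.impl.disable.cache is assumed to be the Hadoop key that FS_HDFS_IMPL_DISABLE_CACHE names.

import java.util.HashMap;
import java.util.Map;

// Hypothetical call site; assumes an existing FileSystemManager instance.
public class UniversalFsUsageSketch {
    static BrokerFileSystem open(FileSystemManager manager) {
        Map<String, String> props = new HashMap<>();
        // "fs.hdfs.impl.disable.cache" is assumed to be the key that
        // FS_HDFS_IMPL_DISABLE_CACHE names.
        props.put("fs.hdfs.impl.disable.cache", "true");
        // Any scheme whose connector jar is on the broker classpath works;
        // this abfs path is a placeholder.
        return manager.getUniversalFileSystem(
                "abfs://container@account.dfs.core.windows.net/load/part-0.csv", props);
    }
}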
/**
* visible for test
* <p>


@@ -168,11 +168,13 @@ struct TBrokerRangeDesc {
}
enum TObjectStoreType {
HDFS,
S3,
KS3,
OSS,
COS,
OBS,
UNIVERSAL_FS
}
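
UNIVERSAL_FS is the new marker exchanged between FE and BE; in the ConfigurationWrap hunk above it deliberately falls through to the S3 branch so S3-style credential keys keep working for universal file systems. A simplified Java sketch of that fall-through (names abbreviated, not the actual code):

enum ObjectStoreType { HDFS, S3, KS3, OSS, COS, OBS, UNIVERSAL_FS }

class PropertyForwarder {
    // UNIVERSAL_FS falls through to S3, so S3-style credential keys
    // (access key, secret key, endpoint, ...) are forwarded for it too.
    static boolean forwardsS3StyleKeys(ObjectStoreType type) {
        switch (type) {
            case UNIVERSAL_FS: // fall through
            case S3:
                return true;
            default:
                return false;
        }
    }
}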
struct THdfsProperty {