[Feature] Broker support COS posix bucket (#42638)

Signed-off-by: ricky <rickif@qq.com>
This commit is contained in:
ricky 2024-05-23 10:04:06 +08:00 committed by GitHub
parent cc7abe0070
commit 5b1cebbf48
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 76 additions and 2 deletions

View File

@ -339,6 +339,42 @@ under the License.
<version>${hadoop.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.qcloud.cos/hadoop-cos -->
<dependency>
<groupId>com.qcloud.cos</groupId>
<artifactId>hadoop-cos</artifactId>
<version>3.3.0-8.3.7</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.qcloud/cos_api-bundle -->
<dependency>
<groupId>com.qcloud</groupId>
<artifactId>cos_api-bundle</artifactId>
<version>5.6.137.2</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.qcloud/cosn-ranger-interface -->
<dependency>
<groupId>com.qcloud</groupId>
<artifactId>cosn-ranger-interface</artifactId>
<version>1.0.5</version>
<scope>provided</scope>
</dependency>
<!-- https://mvnrepository.com/artifact/com.qcloud/chdfs_hadoop_plugin_network -->
<dependency>
<groupId>com.qcloud</groupId>
<artifactId>chdfs_hadoop_plugin_network</artifactId>
<version>3.5</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.qcloud/hadoop-ranger-client-for-hadoop -->
<dependency>
<groupId>com.qcloud</groupId>
<artifactId>hadoop-ranger-client-for-hadoop</artifactId>
<version>3.3.0-4.1</version>
</dependency>
</dependencies>
<build>

View File

@ -149,6 +149,10 @@ public class FileSystemManager {
private static final String FS_COS_ENDPOINT = "fs.cosn.bucket.endpoint_suffix";
private static final String FS_COS_IMPL_DISABLE_CACHE = "fs.cosn.impl.disable.cache";
private static final String FS_COS_IMPL = "fs.cosn.impl";
private static final String FS_ABSTRACTFILESYSTEM_COSN_IMPL = "fs.AbstractFileSystem.cosn.impl";
private static final String FS_COSN_TRSF_FS_ABSTRACTFILESYSTEM_OFS_IMPL = "fs.cosn.trsf.fs.AbstractFileSystem.ofs.impl";
private static final String FS_COSN_TRSF_FS_OFS_IMPL = "fs.cosn.trsf.fs.ofs.impl";
private static final String FS_COSN_CREDENTIALS_PROVIDER = "fs.cosn.credentials.provider";
// arguments for obs
private static final String FS_OBS_ACCESS_KEY = "fs.obs.access.key";
@ -819,8 +823,42 @@ public class FileSystemManager {
conf.set(FS_COS_ENDPOINT, endpoint);
conf.set(FS_COS_IMPL, "org.apache.hadoop.fs.CosFileSystem");
conf.set(FS_COS_IMPL_DISABLE_CACHE, disableCache);
FileSystem cosFileSystem = FileSystem.get(pathUri.getUri(), conf);
fileSystem.setFileSystem(cosFileSystem);
// Too many configuration items, so we directly pass through the properties.
for (Map.Entry<String, String> entry : properties.entrySet()) {
conf.set(entry.getKey(), entry.getValue());
}
String authentication = properties.getOrDefault(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION,
AUTHENTICATION_SIMPLE);
UserGroupInformation ugi = null;
if (authentication.equalsIgnoreCase(AUTHENTICATION_KERBEROS)) {
conf.set(FS_ABSTRACTFILESYSTEM_COSN_IMPL, "org.apache.hadoop.fs.CosN");
conf.set(FS_COSN_TRSF_FS_ABSTRACTFILESYSTEM_OFS_IMPL, "com.qcloud.chdfs.fs.CHDFSDelegateFSAdapter");
conf.set(FS_COSN_TRSF_FS_OFS_IMPL, "com.qcloud.chdfs.fs.CHDFSHadoopFileSystemAdapter");
conf.set(FS_COSN_CREDENTIALS_PROVIDER, "org.apache.hadoop.fs.auth.RangerCredentialsProvider");
String principal = properties.getOrDefault(KERBEROS_PRINCIPAL,"");
String keytab = properties.getOrDefault(KERBEROS_KEYTAB,"");
UserGroupInformation.setConfiguration(conf);
if (!principal.isEmpty() && !keytab.isEmpty()) {
ugi = UserGroupInformation.loginUserFromKeytabAndReturnUGI(principal, keytab);
}
}
FileSystem cosFileSystem;
if (ugi != null) {
cosFileSystem = ugi.doAs(new PrivilegedExceptionAction<FileSystem>() {
@Override
public FileSystem run() throws Exception {
return FileSystem.get(pathUri.getUri(), conf);
}
});
} else {
cosFileSystem = FileSystem.get(pathUri.getUri(), conf);
}
fileSystem.setFileSystem(cosFileSystem, authentication.equalsIgnoreCase(AUTHENTICATION_KERBEROS));
}
return fileSystem;
} catch (Exception e) {