[Enhancement] broker supports multiple name services (#23510)

Fixes #23509

Signed-off-by: blanklin030 <luis01@foxmail.com>
This commit is contained in:
BlankLin 2023-07-04 14:04:49 +08:00 committed by GitHub
parent 346e8cc783
commit 3a57f9a44c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 66 additions and 39 deletions

View File

@ -390,48 +390,52 @@ public class FileSystemManager {
}
}
if (!Strings.isNullOrEmpty(dfsNameServices)) {
// ha hdfs arguments
final String dfsHaNameNodesKey = DFS_HA_NAMENODES_PREFIX + dfsNameServices;
if (!properties.containsKey(dfsHaNameNodesKey)) {
throw new BrokerException(TBrokerOperationStatusCode.INVALID_ARGUMENT,
"load request missed necessary arguments for ha mode");
}
String dfsHaNameNodes = properties.get(dfsHaNameNodesKey);
conf.set(DFS_NAMESERVICES_KEY, dfsNameServices);
conf.set(dfsHaNameNodesKey, dfsHaNameNodes);
String[] nameNodes = dfsHaNameNodes.split(",");
if (nameNodes == null) {
throw new BrokerException(TBrokerOperationStatusCode.INVALID_ARGUMENT,
"invalid " + dfsHaNameNodesKey + " configuration");
} else {
for (String nameNode : nameNodes) {
nameNode = nameNode.trim();
String nameNodeRpcAddress =
DFS_HA_NAMENODE_RPC_ADDRESS_PREFIX + dfsNameServices + "." + nameNode;
if (!properties.containsKey(nameNodeRpcAddress)) {
throw new BrokerException(TBrokerOperationStatusCode.INVALID_ARGUMENT,
"missed " + nameNodeRpcAddress + " configuration");
} else {
conf.set(nameNodeRpcAddress, properties.get(nameNodeRpcAddress));
logger.info("dfs.name.services:" + dfsNameServices);
String[] dfsNameServiceArray = dfsNameServices.split("\\s*,\\s*");
for (String dfsNameService : dfsNameServiceArray) {
// ha hdfs arguments
dfsNameService = dfsNameService.trim();
final String dfsHaNameNodesKey = DFS_HA_NAMENODES_PREFIX + dfsNameService;
if (!properties.containsKey(dfsHaNameNodesKey)) {
throw new BrokerException(TBrokerOperationStatusCode.INVALID_ARGUMENT,
"load request missed necessary arguments for ha mode");
}
String dfsHaNameNodes = properties.get(dfsHaNameNodesKey);
conf.set(dfsHaNameNodesKey, dfsHaNameNodes);
String[] nameNodes = dfsHaNameNodes.split(",");
if (nameNodes == null) {
throw new BrokerException(TBrokerOperationStatusCode.INVALID_ARGUMENT,
"invalid " + dfsHaNameNodesKey + " configuration");
} else {
for (String nameNode : nameNodes) {
nameNode = nameNode.trim();
String nameNodeRpcAddress =
DFS_HA_NAMENODE_RPC_ADDRESS_PREFIX + dfsNameService + "." + nameNode;
if (!properties.containsKey(nameNodeRpcAddress)) {
throw new BrokerException(TBrokerOperationStatusCode.INVALID_ARGUMENT,
"missed " + nameNodeRpcAddress + " configuration");
} else {
conf.set(nameNodeRpcAddress, properties.get(nameNodeRpcAddress));
}
}
}
}
final String dfsClientFailoverProxyProviderKey =
DFS_CLIENT_FAILOVER_PROXY_PROVIDER_PREFIX + dfsNameServices;
if (properties.containsKey(dfsClientFailoverProxyProviderKey)) {
conf.set(dfsClientFailoverProxyProviderKey,
properties.get(dfsClientFailoverProxyProviderKey));
} else {
conf.set(dfsClientFailoverProxyProviderKey,
DEFAULT_DFS_CLIENT_FAILOVER_PROXY_PROVIDER);
}
if (properties.containsKey(FS_DEFAULTFS_KEY)) {
conf.set(FS_DEFAULTFS_KEY, properties.get(FS_DEFAULTFS_KEY));
}
if (properties.containsKey(DFS_HA_NAMENODE_KERBEROS_PRINCIPAL_PATTERN)) {
conf.set(DFS_HA_NAMENODE_KERBEROS_PRINCIPAL_PATTERN,
properties.get(DFS_HA_NAMENODE_KERBEROS_PRINCIPAL_PATTERN));
final String dfsClientFailoverProxyProviderKey =
DFS_CLIENT_FAILOVER_PROXY_PROVIDER_PREFIX + dfsNameService;
if (properties.containsKey(dfsClientFailoverProxyProviderKey)) {
conf.set(dfsClientFailoverProxyProviderKey,
properties.get(dfsClientFailoverProxyProviderKey));
} else {
conf.set(dfsClientFailoverProxyProviderKey,
DEFAULT_DFS_CLIENT_FAILOVER_PROXY_PROVIDER);
}
if (properties.containsKey(FS_DEFAULTFS_KEY)) {
conf.set(FS_DEFAULTFS_KEY, properties.get(FS_DEFAULTFS_KEY));
}
if (properties.containsKey(DFS_HA_NAMENODE_KERBEROS_PRINCIPAL_PATTERN)) {
conf.set(DFS_HA_NAMENODE_KERBEROS_PRINCIPAL_PATTERN,
properties.get(DFS_HA_NAMENODE_KERBEROS_PRINCIPAL_PATTERN));
}
}
}

View File

@ -211,4 +211,27 @@ public class TestFileSystemManager extends TestCase {
assertNotNull(fs);
fs.getDFSFileSystem().close();
}
@Test
public void testGetFileSystemForMultipleNameServicesHA() throws IOException {
    // Build a client config declaring TWO HA nameservices, each with its own
    // namenode pair and failover proxy provider, and verify the manager can
    // still hand back a usable BrokerFileSystem.
    Map<String, String> config = new HashMap<>();
    config.put("username", "user");
    config.put("password", "passwd");
    config.put("fs.defaultFS", "hdfs://starrocks");
    config.put("dfs.nameservices", "DClusterNmg1,DClusterNmg2");

    // First nameservice: namenodes nn11/nn12.
    config.put("dfs.ha.namenodes.DClusterNmg1", "nn11,nn12");
    config.put("dfs.namenode.rpc-address.DClusterNmg1.nn11", "127.0.0.1:8888");
    config.put("dfs.namenode.rpc-address.DClusterNmg1.nn12", "127.0.0.1:7777");
    config.put("dfs.client.failover.proxy.provider.DClusterNmg1",
            "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider");

    // Second nameservice: namenodes nn21/nn22.
    config.put("dfs.ha.namenodes.DClusterNmg2", "nn21,nn22");
    config.put("dfs.namenode.rpc-address.DClusterNmg2.nn21", "127.0.0.1:8020");
    config.put("dfs.namenode.rpc-address.DClusterNmg2.nn22", "127.0.0.1:8030");
    config.put("dfs.client.failover.proxy.provider.DClusterNmg2",
            "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider");

    BrokerFileSystem fs = fileSystemManager.getFileSystem(
            testHdfsHost + "/user/user/dd_cdm/hive/dd_cdm/", config);
    assertNotNull(fs);
    fs.getDFSFileSystem().close();
}
}