[Feature] Support create hms/hdfs client in UGI (#34591)

Signed-off-by: yanz <dirtysalt1987@gmail.com>
This commit is contained in:
dirtysalt 2023-11-09 11:35:34 +08:00 committed by GitHub
parent 309ad884e7
commit f86e01b17b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 96 additions and 7 deletions

View File

@ -35,6 +35,7 @@
package org.apache.hadoop.hive.metastore;
import com.google.common.collect.Lists;
import com.starrocks.connector.hadoop.HadoopExt;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.common.ValidTxnList;
import org.apache.hadoop.hive.common.ValidWriteIdList;
@ -436,6 +437,14 @@ public class HiveMetaStoreClient implements IMetaStoreClient, AutoCloseable {
}
private void open() throws MetaException {
UserGroupInformation ugi = HadoopExt.getInstance().getHMSUGI(conf);
HadoopExt.getInstance().doAs(ugi, () -> {
openInternal();
return null;
});
}
private void openInternal() throws MetaException {
isConnected = false;
TTransportException tte = null;
boolean useSSL = MetastoreConf.getBoolVar(conf, ConfVars.USE_SSL);
@ -544,7 +553,10 @@ public class HiveMetaStoreClient implements IMetaStoreClient, AutoCloseable {
if (isConnected && !useSasl && MetastoreConf.getBoolVar(conf, ConfVars.EXECUTE_SET_UGI)) {
// Call set_ugi, only in unsecure mode.
try {
UserGroupInformation ugi = SecurityUtils.getUGI();
UserGroupInformation ugi = HadoopExt.getInstance().getHMSUGI(conf);
if (ugi == null) {
ugi = SecurityUtils.getUGI();
}
client.set_ugi(ugi.getUserName(), Arrays.asList(ugi.getGroupNames()));
} catch (LoginException e) {
LOG.warn("Failed to do login. set_ugi() is not successful, " +

View File

@ -0,0 +1,19 @@
// Copyright 2021-present StarRocks, Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package com.starrocks.connector.hadoop;
public interface GenericExceptionAction<R, E extends Exception> {
R run() throws E;
}

View File

@ -15,9 +15,12 @@
package com.starrocks.connector.hadoop;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.security.PrivilegedAction;
public class HadoopExt {
private static final Logger LOGGER =
LoggerFactory.getLogger(HadoopExt.class);
@ -39,4 +42,55 @@ public class HadoopExt {
public String getCloudConfString(Configuration conf) {
return conf.get(HDFS_CLOUD_CONFIGURATION_STRING, "");
}
public UserGroupInformation getHMSUGI(Configuration conf) {
return null;
}
public UserGroupInformation getHDFSUGI(Configuration conf) {
return null;
}
public <R, E extends Exception> R doAs(UserGroupInformation ugi, GenericExceptionAction<R, E> action) throws E {
if (ugi == null) {
return action.run();
}
return executeActionInDoAs(ugi, action);
}
private static <R, E extends Exception> R executeActionInDoAs(UserGroupInformation userGroupInformation,
GenericExceptionAction<R, E> action) throws E {
return userGroupInformation.doAs((PrivilegedAction<ResultOrException<R, E>>) () -> {
try {
return new ResultOrException<>(action.run(), null);
} catch (Throwable e) {
return new ResultOrException<>(null, e);
}
}).get();
}
private static class ResultOrException<T, E extends Exception> {
private final T result;
private final Throwable exception;
public ResultOrException(T result, Throwable exception) {
this.result = result;
this.exception = exception;
}
@SuppressWarnings("unchecked")
public T get()
throws E {
if (exception != null) {
if (exception instanceof Error) {
throw (Error) exception;
}
if (exception instanceof RuntimeException) {
throw (RuntimeException) exception;
}
throw (E) exception;
}
return result;
}
}
}

View File

@ -592,13 +592,17 @@ public abstract class FileSystem extends Configured
return get(defaultUri, conf); // return default
}
}
String disableCacheName = String.format("fs.%s.impl.disable.cache", scheme);
if (conf.getBoolean(disableCacheName, false)) {
LOGGER.debug("Bypassing cache to create filesystem {}", uri);
return createFileSystem(uri, conf);
}
return CACHE.get(uri, conf);
UserGroupInformation ugi = HadoopExt.getInstance().getHDFSUGI(conf);
FileSystem fs = HadoopExt.getInstance().doAs(ugi, () -> {
String disableCacheName = String.format("fs.%s.impl.disable.cache", scheme);
if (conf.getBoolean(disableCacheName, false)) {
LOGGER.debug("Bypassing cache to create filesystem {}", uri);
return createFileSystem(uri, conf);
}
return CACHE.get(uri, conf);
});
return fs;
}
/**