ATLAS-3129 Fix SSL Truststore reloader leak from AtlasBaseClient
Change-Id: I5af8c26a41a7010de645ddaa6869c3ce15723f43
This commit is contained in:
parent
4d6169f51e
commit
0a81c2505d
|
|
@ -42,7 +42,6 @@ import org.apache.hadoop.hbase.HBaseConfiguration;
|
|||
import org.apache.hadoop.hbase.NamespaceDescriptor;
|
||||
import org.apache.hadoop.hbase.client.Admin;
|
||||
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
|
||||
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
|
||||
import org.apache.hadoop.hbase.client.Connection;
|
||||
import org.apache.hadoop.hbase.client.ConnectionFactory;
|
||||
import org.apache.hadoop.hbase.client.HBaseAdmin;
|
||||
|
|
@ -123,6 +122,7 @@ public class HBaseBridge {
|
|||
|
||||
public static void main(String[] args) {
|
||||
int exitCode = EXIT_CODE_FAILED;
|
||||
AtlasClientV2 atlasClientV2 =null;
|
||||
|
||||
try {
|
||||
Options options = new Options();
|
||||
|
|
@ -142,7 +142,6 @@ public class HBaseBridge {
|
|||
urls = new String[] { DEFAULT_ATLAS_URL };
|
||||
}
|
||||
|
||||
final AtlasClientV2 atlasClientV2;
|
||||
|
||||
if (!AuthenticationUtil.isKerberosAuthenticationEnabled()) {
|
||||
String[] basicAuthUsernamePassword = AuthenticationUtil.getBasicAuthenticationInput();
|
||||
|
|
@ -195,6 +194,10 @@ public class HBaseBridge {
|
|||
System.out.println("ImportHBaseEntities failed. Please check the log file for the detailed error message");
|
||||
|
||||
LOG.error("ImportHBaseEntities failed", e);
|
||||
}finally {
|
||||
if(atlasClientV2!=null) {
|
||||
atlasClientV2.close();
|
||||
}
|
||||
}
|
||||
|
||||
System.exit(exitCode);
|
||||
|
|
|
|||
|
|
@ -103,6 +103,7 @@ public class HiveMetaStoreBridge {
|
|||
|
||||
public static void main(String[] args) {
|
||||
int exitCode = EXIT_CODE_FAILED;
|
||||
AtlasClientV2 atlasClientV2 = null;
|
||||
|
||||
try {
|
||||
Options options = new Options();
|
||||
|
|
@ -123,7 +124,6 @@ public class HiveMetaStoreBridge {
|
|||
atlasEndpoint = new String[] { DEFAULT_ATLAS_URL };
|
||||
}
|
||||
|
||||
final AtlasClientV2 atlasClientV2;
|
||||
|
||||
if (!AuthenticationUtil.isKerberosAuthenticationEnabled()) {
|
||||
String[] basicAuthUsernamePassword = AuthenticationUtil.getBasicAuthenticationInput();
|
||||
|
|
@ -174,6 +174,10 @@ public class HiveMetaStoreBridge {
|
|||
printUsage();
|
||||
} catch(Exception e) {
|
||||
LOG.error("Import failed", e);
|
||||
} finally {
|
||||
if( atlasClientV2 !=null) {
|
||||
atlasClientV2.close();
|
||||
}
|
||||
}
|
||||
|
||||
System.exit(exitCode);
|
||||
|
|
|
|||
|
|
@ -88,6 +88,7 @@ public class KafkaBridge {
|
|||
|
||||
public static void main(String[] args) {
|
||||
int exitCode = EXIT_CODE_FAILED;
|
||||
AtlasClientV2 atlasClientV2 = null;
|
||||
|
||||
try {
|
||||
Options options = new Options();
|
||||
|
|
@ -105,7 +106,6 @@ public class KafkaBridge {
|
|||
urls = new String[] { DEFAULT_ATLAS_URL };
|
||||
}
|
||||
|
||||
final AtlasClientV2 atlasClientV2;
|
||||
|
||||
if (!AuthenticationUtil.isKerberosAuthenticationEnabled()) {
|
||||
String[] basicAuthUsernamePassword = AuthenticationUtil.getBasicAuthenticationInput();
|
||||
|
|
@ -148,6 +148,10 @@ public class KafkaBridge {
|
|||
System.out.println("ImportKafkaEntities failed. Please check the log file for the detailed error message");
|
||||
e.printStackTrace();
|
||||
LOG.error("ImportKafkaEntities failed", e);
|
||||
} finally {
|
||||
if (atlasClientV2 != null) {
|
||||
atlasClientV2.close();
|
||||
}
|
||||
}
|
||||
|
||||
System.exit(exitCode);
|
||||
|
|
|
|||
|
|
@ -114,6 +114,8 @@ public abstract class AtlasBaseClient {
|
|||
private boolean retryEnabled = false;
|
||||
private Cookie cookie = null;
|
||||
|
||||
private SecureClientUtils clientUtils;
|
||||
|
||||
protected AtlasBaseClient() {
|
||||
}
|
||||
|
||||
|
|
@ -282,14 +284,15 @@ public abstract class AtlasBaseClient {
|
|||
}
|
||||
|
||||
final URLConnectionClientHandler handler;
|
||||
clientUtils = new SecureClientUtils();
|
||||
|
||||
boolean isKerberosEnabled = AuthenticationUtil.isKerberosAuthenticationEnabled(ugi);
|
||||
|
||||
if (isKerberosEnabled) {
|
||||
handler = SecureClientUtils.getClientConnectionHandler(config, configuration, doAsUser, ugi);
|
||||
handler = clientUtils.getClientConnectionHandler(config, configuration, doAsUser, ugi);
|
||||
} else {
|
||||
if (configuration.getBoolean(TLS_ENABLED, false)) {
|
||||
handler = SecureClientUtils.getUrlConnectionClientHandler();
|
||||
handler = clientUtils.getUrlConnectionClientHandler();
|
||||
} else {
|
||||
handler = new URLConnectionClientHandler();
|
||||
}
|
||||
|
|
@ -300,6 +303,12 @@ public abstract class AtlasBaseClient {
|
|||
return client;
|
||||
}
|
||||
|
||||
public void close() {
|
||||
if (clientUtils != null) {
|
||||
clientUtils.destroyFactory();
|
||||
}
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
protected String determineActiveServiceURL(String[] baseUrls, Client client) {
|
||||
if (baseUrls.length == 0) {
|
||||
|
|
|
|||
|
|
@ -57,9 +57,10 @@ public class SecureClientUtils {
|
|||
|
||||
public final static int DEFAULT_SOCKET_TIMEOUT_IN_MSECS = 1 * 60 * 1000; // 1 minute
|
||||
private static final Logger LOG = LoggerFactory.getLogger(SecureClientUtils.class);
|
||||
private SSLFactory factory = null;
|
||||
|
||||
|
||||
public static URLConnectionClientHandler getClientConnectionHandler(DefaultClientConfig config,
|
||||
public URLConnectionClientHandler getClientConnectionHandler(DefaultClientConfig config,
|
||||
org.apache.commons.configuration.Configuration clientConfig, String doAsUser,
|
||||
final UserGroupInformation ugi) {
|
||||
config.getProperties().put(URLConnectionClientHandler.PROPERTY_HTTP_URL_CONNECTION_SET_METHOD_WORKAROUND, true);
|
||||
|
|
@ -125,7 +126,7 @@ public class SecureClientUtils {
|
|||
}
|
||||
};
|
||||
|
||||
private static ConnectionConfigurator newConnConfigurator(Configuration conf) {
|
||||
private ConnectionConfigurator newConnConfigurator(Configuration conf) {
|
||||
try {
|
||||
return newSslConnConfigurator(DEFAULT_SOCKET_TIMEOUT_IN_MSECS, conf);
|
||||
} catch (Exception e) {
|
||||
|
|
@ -134,14 +135,12 @@ public class SecureClientUtils {
|
|||
}
|
||||
}
|
||||
|
||||
private static ConnectionConfigurator newSslConnConfigurator(final int timeout, Configuration conf)
|
||||
private ConnectionConfigurator newSslConnConfigurator(final int timeout, Configuration conf)
|
||||
throws IOException, GeneralSecurityException {
|
||||
final SSLFactory factory;
|
||||
final SSLSocketFactory sf;
|
||||
final HostnameVerifier hv;
|
||||
|
||||
factory = new SSLFactory(SSLFactory.Mode.CLIENT, conf);
|
||||
factory.init();
|
||||
factory = getSSLFactory(conf);
|
||||
sf = factory.createSSLSocketFactory();
|
||||
hv = factory.getHostnameVerifier();
|
||||
|
||||
|
|
@ -159,6 +158,22 @@ public class SecureClientUtils {
|
|||
};
|
||||
}
|
||||
|
||||
public SSLFactory getSSLFactory(Configuration conf) throws IOException, GeneralSecurityException {
|
||||
if (factory == null) {
|
||||
factory = new SSLFactory(SSLFactory.Mode.CLIENT, conf);
|
||||
factory.init();
|
||||
}
|
||||
return factory;
|
||||
}
|
||||
|
||||
public void destroyFactory() {
|
||||
if (factory != null) {
|
||||
factory.destroy();
|
||||
factory = null;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
private static void setTimeouts(URLConnection connection, int socketTimeout) {
|
||||
connection.setConnectTimeout(socketTimeout);
|
||||
connection.setReadTimeout(socketTimeout);
|
||||
|
|
@ -210,7 +225,7 @@ public class SecureClientUtils {
|
|||
}
|
||||
}
|
||||
|
||||
public static URLConnectionClientHandler getUrlConnectionClientHandler() {
|
||||
public URLConnectionClientHandler getUrlConnectionClientHandler() {
|
||||
return new URLConnectionClientHandler(new HttpURLConnectionFactory() {
|
||||
@Override
|
||||
public HttpURLConnection getHttpURLConnection(URL url)
|
||||
|
|
@ -230,8 +245,7 @@ public class SecureClientUtils {
|
|||
UserGroupInformation.setConfiguration(conf);
|
||||
|
||||
HttpsURLConnection c = (HttpsURLConnection) connection;
|
||||
factory = new SSLFactory(SSLFactory.Mode.CLIENT, conf);
|
||||
factory.init();
|
||||
factory = getSSLFactory(conf);
|
||||
sf = factory.createSSLSocketFactory();
|
||||
hv = factory.getHostnameVerifier();
|
||||
c.setSSLSocketFactory(sf);
|
||||
|
|
|
|||
|
|
@ -77,22 +77,28 @@ public class QuickStart {
|
|||
@VisibleForTesting
|
||||
static void runQuickstart(String[] args, String[] basicAuthUsernamePassword) throws Exception {
|
||||
String[] urls = getServerUrl(args);
|
||||
QuickStart quickStart;
|
||||
QuickStart quickStart = null;
|
||||
|
||||
if (!AuthenticationUtil.isKerberosAuthenticationEnabled()) {
|
||||
quickStart = new QuickStart(urls, basicAuthUsernamePassword);
|
||||
} else {
|
||||
quickStart = new QuickStart(urls);
|
||||
try {
|
||||
if (!AuthenticationUtil.isKerberosAuthenticationEnabled()) {
|
||||
quickStart = new QuickStart(urls, basicAuthUsernamePassword);
|
||||
} else {
|
||||
quickStart = new QuickStart(urls);
|
||||
}
|
||||
|
||||
// Shows how to create types in Atlas for your meta model
|
||||
quickStart.createTypes();
|
||||
|
||||
// Shows how to create entities (instances) for the added types in Atlas
|
||||
quickStart.createEntities();
|
||||
|
||||
// Shows some search queries using DSL based on types
|
||||
quickStart.search();
|
||||
} finally {
|
||||
if(quickStart!=null) {
|
||||
quickStart.closeConnection();
|
||||
}
|
||||
}
|
||||
|
||||
// Shows how to create types in Atlas for your meta model
|
||||
quickStart.createTypes();
|
||||
|
||||
// Shows how to create entities (instances) for the added types in Atlas
|
||||
quickStart.createEntities();
|
||||
|
||||
// Shows some search queries using DSL based on types
|
||||
quickStart.search();
|
||||
}
|
||||
|
||||
static String[] getServerUrl(String[] args) throws AtlasException {
|
||||
|
|
@ -492,4 +498,10 @@ public class QuickStart {
|
|||
throw new AtlasBaseException(AtlasErrorCode.QUICK_START, e, "one or more dsl queries failed");
|
||||
}
|
||||
}
|
||||
|
||||
private void closeConnection() {
|
||||
if (metadataServiceClient != null) {
|
||||
metadataServiceClient.close();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -151,25 +151,30 @@ public class QuickStartV2 {
|
|||
static void runQuickstart(String[] args, String[] basicAuthUsernamePassword) throws Exception {
|
||||
String[] urls = getServerUrl(args);
|
||||
|
||||
QuickStartV2 quickStartV2;
|
||||
QuickStartV2 quickStartV2 = null;
|
||||
try {
|
||||
if (!AuthenticationUtil.isKerberosAuthenticationEnabled()) {
|
||||
quickStartV2 = new QuickStartV2(urls, basicAuthUsernamePassword);
|
||||
} else {
|
||||
quickStartV2 = new QuickStartV2(urls);
|
||||
}
|
||||
|
||||
if (!AuthenticationUtil.isKerberosAuthenticationEnabled()) {
|
||||
quickStartV2 = new QuickStartV2(urls, basicAuthUsernamePassword);
|
||||
} else {
|
||||
quickStartV2 = new QuickStartV2(urls);
|
||||
// Shows how to create v2 types in Atlas for your meta model
|
||||
quickStartV2.createTypes();
|
||||
|
||||
// Shows how to create v2 entities (instances) for the added types in Atlas
|
||||
quickStartV2.createEntities();
|
||||
|
||||
// Shows some search queries using DSL based on types
|
||||
quickStartV2.search();
|
||||
|
||||
// Shows some lineage information on entity
|
||||
quickStartV2.lineage();
|
||||
} finally {
|
||||
if (quickStartV2!= null) {
|
||||
quickStartV2.closeConnection();
|
||||
}
|
||||
}
|
||||
|
||||
// Shows how to create v2 types in Atlas for your meta model
|
||||
quickStartV2.createTypes();
|
||||
|
||||
// Shows how to create v2 entities (instances) for the added types in Atlas
|
||||
quickStartV2.createEntities();
|
||||
|
||||
// Shows some search queries using DSL based on types
|
||||
quickStartV2.search();
|
||||
|
||||
// Shows some lineage information on entity
|
||||
quickStartV2.lineage();
|
||||
|
||||
}
|
||||
|
||||
|
|
@ -647,4 +652,10 @@ public class QuickStartV2 {
|
|||
|
||||
return tableEntity.getGuid();
|
||||
}
|
||||
|
||||
private void closeConnection() {
|
||||
if (atlasClientV2 != null) {
|
||||
atlasClientV2.close();
|
||||
}
|
||||
}
|
||||
}
|
||||
Loading…
Reference in New Issue