ATLAS-4788 : Kafka password is in clear text in application.properties
Signed-off-by: Pinal Shah <pinal.shah@freestoneinfotech.com>
This commit is contained in:
parent
3574a60fd2
commit
14adbad94a
|
|
@ -19,6 +19,7 @@
|
|||
package org.apache.atlas.utils;
|
||||
|
||||
import org.apache.atlas.ApplicationProperties;
|
||||
import org.apache.atlas.security.SecurityUtil;
|
||||
import org.apache.commons.configuration.Configuration;
|
||||
import org.apache.commons.lang.StringUtils;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
|
|
@ -43,6 +44,7 @@ import java.util.Map;
|
|||
import java.util.Properties;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.stream.Collectors;
|
||||
import static org.apache.atlas.security.SecurityProperties.HADOOP_SECURITY_CREDENTIAL_PROVIDER_PATH;
|
||||
|
||||
public class KafkaUtils implements AutoCloseable {
|
||||
|
||||
|
|
@ -62,6 +64,9 @@ public class KafkaUtils implements AutoCloseable {
|
|||
public static final String ATLAS_KAFKA_PROPERTY_PREFIX = "atlas.kafka";
|
||||
public static final String KAFKA_SASL_JAAS_CONFIG_PROPERTY = "sasl.jaas.config";
|
||||
|
||||
public static final String JAAS_PASSWORD_SUFFIX = "password";
|
||||
private static final String JAAS_MASK_PASSWORD = "********";
|
||||
|
||||
final protected Properties kafkaConfiguration;
|
||||
final protected AdminClient adminClient;
|
||||
final protected boolean importInternalTopics;
|
||||
|
|
@ -254,6 +259,7 @@ public class KafkaUtils implements AutoCloseable {
|
|||
|
||||
String optionPrefix = keyPrefix + JAAS_CONFIG_LOGIN_OPTIONS_PREFIX + ".";
|
||||
String principalOptionKey = optionPrefix + JAAS_PRINCIPAL_PROP;
|
||||
String passwordOptionKey = optionPrefix + JAAS_PASSWORD_SUFFIX;
|
||||
int optionPrefixLen = optionPrefix.length();
|
||||
StringBuffer optionStringBuffer = new StringBuffer();
|
||||
|
||||
|
|
@ -271,7 +277,16 @@ public class KafkaUtils implements AutoCloseable {
|
|||
} catch (IOException e) {
|
||||
LOG.warn("Failed to build serverPrincipal. Using provided value:[{}]", optionVal);
|
||||
}
|
||||
|
||||
if (key.equalsIgnoreCase(passwordOptionKey)) {
|
||||
String jaasKafkaClientConfigurationProperty = "atlas.jaas.KafkaClient.option.password";
|
||||
if (JAAS_MASK_PASSWORD.equals(configuration.getString(jaasKafkaClientConfigurationProperty))) {
|
||||
try {
|
||||
optionVal = SecurityUtil.getPassword(configuration, jaasKafkaClientConfigurationProperty, HADOOP_SECURITY_CREDENTIAL_PROVIDER_PATH);
|
||||
} catch (Exception e) {
|
||||
LOG.error("Error in getting secure password ", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
optionVal = surroundWithQuotes(optionVal);
|
||||
|
||||
optionStringBuffer.append(String.format(" %s=%s", key.substring(optionPrefixLen), optionVal));
|
||||
|
|
|
|||
|
|
@ -20,13 +20,20 @@ package org.apache.atlas.utils;
|
|||
|
||||
import org.apache.commons.configuration.Configuration;
|
||||
import org.apache.commons.configuration.PropertiesConfiguration;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.security.alias.CredentialProvider;
|
||||
import org.apache.hadoop.security.alias.CredentialProviderFactory;
|
||||
import org.apache.hadoop.security.alias.JavaKeyStoreProvider;
|
||||
import org.apache.kafka.common.config.SaslConfigs;
|
||||
import org.apache.kafka.common.config.types.Password;
|
||||
import org.apache.kafka.common.security.JaasContext;
|
||||
import org.mockito.MockedStatic;
|
||||
import org.mockito.Mockito;
|
||||
import org.testng.annotations.Test;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.nio.file.Files;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.Properties;
|
||||
|
|
@ -37,6 +44,11 @@ import static org.testng.Assert.fail;
|
|||
|
||||
public class KafkaUtilsTest {
|
||||
|
||||
protected Path jksPath;
|
||||
protected String providerUrl;
|
||||
|
||||
protected static final String JAAS_MASKED_PASSWORD = "keypass";
|
||||
|
||||
@Test
|
||||
public void testSetKafkaJAASPropertiesForAllProperValues() {
|
||||
Properties properties = new Properties();
|
||||
|
|
@ -262,6 +274,94 @@ public class KafkaUtilsTest {
|
|||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSetKafkaJAASPropertiesForClearTextPassword() throws Exception {
|
||||
Properties properties = new Properties();
|
||||
Configuration configuration = new PropertiesConfiguration();
|
||||
setupCredentials();
|
||||
final String loginModuleName = "org.apache.kafka.common.security.scram.ScramLoginModule";
|
||||
final String loginModuleControlFlag = "required";
|
||||
final String optionUseKeyTab = "false";
|
||||
final String optionStoreKey = "false";
|
||||
final String optionServiceName = "kafka";
|
||||
final String optionTokenAuth = "true";
|
||||
final String optionUsername = "30CQ4q1hQMy0dB6X0eXfxQ";
|
||||
final String optionPassword = "admin123";
|
||||
|
||||
configuration.setProperty("atlas.kafka.bootstrap.servers", "localhost:9100");
|
||||
configuration.setProperty("atlas.jaas.KafkaClient.loginModuleName",loginModuleName);
|
||||
configuration.setProperty("atlas.jaas.KafkaClient.loginModuleControlFlag", loginModuleControlFlag);
|
||||
configuration.setProperty("atlas.jaas.KafkaClient.option.useKeyTab", optionUseKeyTab);
|
||||
configuration.setProperty("atlas.jaas.KafkaClient.option.storeKey", optionStoreKey);
|
||||
configuration.setProperty("atlas.jaas.KafkaClient.option.serviceName", optionServiceName);
|
||||
configuration.setProperty("atlas.jaas.KafkaClient.option.tokenauth", optionTokenAuth);
|
||||
configuration.setProperty("atlas.jaas.KafkaClient.option.username", optionUsername);
|
||||
configuration.setProperty("atlas.jaas.KafkaClient.option.password", optionPassword);
|
||||
configuration.setProperty("hadoop.security.credential.provider.path", providerUrl);
|
||||
|
||||
try (MockedStatic mockedKafkaUtilsClass = Mockito.mockStatic(KafkaUtils.class)) {
|
||||
mockedKafkaUtilsClass.when(() -> KafkaUtils.surroundWithQuotes(Mockito.anyString())).thenCallRealMethod();
|
||||
mockedKafkaUtilsClass.when(() -> KafkaUtils.setKafkaJAASProperties(configuration, properties)).thenCallRealMethod();
|
||||
|
||||
KafkaUtils.setKafkaJAASProperties(configuration, properties);
|
||||
|
||||
String newPropertyValue = properties.getProperty(KafkaUtils.KAFKA_SASL_JAAS_CONFIG_PROPERTY);
|
||||
assertTrue(newPropertyValue.contains(loginModuleName), "loginModuleName not present in new property");
|
||||
assertTrue(newPropertyValue.contains(loginModuleControlFlag), "loginModuleControlFlag not present in new property");
|
||||
assertTrue(newPropertyValue.contains("useKeyTab=\"" + optionUseKeyTab + "\""), "useKeyTab not present in new property or value doesn't match");
|
||||
assertTrue(newPropertyValue.contains("storeKey=\"" + optionStoreKey + "\""), "storeKey not present in new property or value doesn't match");
|
||||
assertTrue(newPropertyValue.contains("serviceName=\"" + optionServiceName + "\""), "serviceName not present in new property or value doesn't match");
|
||||
assertTrue(newPropertyValue.contains("tokenauth=\"" + optionTokenAuth + "\""), "tokenauth not pres////.ent in new property or value doesn't match");
|
||||
assertTrue(newPropertyValue.contains("username=\"" + optionUsername + "\""), "username not present in new property or value doesn't match");
|
||||
assertTrue(newPropertyValue.contains("password=\"" + optionPassword + "\""), "password not present in new property or value doesn't match");
|
||||
assertJaaSConfigLoadable(newPropertyValue);
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSetKafkaJAASPropertiesForPasswordEncryption() throws Exception {
|
||||
Properties properties = new Properties();
|
||||
Configuration configuration = new PropertiesConfiguration();
|
||||
setupCredentials();
|
||||
final String loginModuleName = "org.apache.kafka.common.security.scram.ScramLoginModule";
|
||||
final String loginModuleControlFlag = "required";
|
||||
final String optionUseKeyTab = "false";
|
||||
final String optionStoreKey = "false";
|
||||
final String optionServiceName = "kafka";
|
||||
final String optionTokenAuth = "true";
|
||||
final String optionUsername = "30CQ4q1hQMy0dB6X0eXfxQ";
|
||||
final String optionPassword = "********";
|
||||
|
||||
configuration.setProperty("atlas.kafka.bootstrap.servers", "localhost:9100");
|
||||
configuration.setProperty("atlas.jaas.KafkaClient.loginModuleName",loginModuleName);
|
||||
configuration.setProperty("atlas.jaas.KafkaClient.loginModuleControlFlag", loginModuleControlFlag);
|
||||
configuration.setProperty("atlas.jaas.KafkaClient.option.useKeyTab", optionUseKeyTab);
|
||||
configuration.setProperty("atlas.jaas.KafkaClient.option.storeKey", optionStoreKey);
|
||||
configuration.setProperty("atlas.jaas.KafkaClient.option.serviceName", optionServiceName);
|
||||
configuration.setProperty("atlas.jaas.KafkaClient.option.tokenauth", optionTokenAuth);
|
||||
configuration.setProperty("atlas.jaas.KafkaClient.option.username", optionUsername);
|
||||
configuration.setProperty("atlas.jaas.KafkaClient.option.password", optionPassword);
|
||||
configuration.setProperty("hadoop.security.credential.provider.path", providerUrl);
|
||||
|
||||
try (MockedStatic mockedKafkaUtilsClass = Mockito.mockStatic(KafkaUtils.class)) {
|
||||
mockedKafkaUtilsClass.when(() -> KafkaUtils.surroundWithQuotes(Mockito.anyString())).thenCallRealMethod();
|
||||
mockedKafkaUtilsClass.when(() -> KafkaUtils.setKafkaJAASProperties(configuration, properties)).thenCallRealMethod();
|
||||
|
||||
KafkaUtils.setKafkaJAASProperties(configuration, properties);
|
||||
|
||||
String newPropertyValue = properties.getProperty(KafkaUtils.KAFKA_SASL_JAAS_CONFIG_PROPERTY);
|
||||
assertTrue(newPropertyValue.contains(loginModuleName), "loginModuleName not present in new property");
|
||||
assertTrue(newPropertyValue.contains(loginModuleControlFlag), "loginModuleControlFlag not present in new property");
|
||||
assertTrue(newPropertyValue.contains("useKeyTab=\"" + optionUseKeyTab + "\""), "useKeyTab not present in new property or value doesn't match");
|
||||
assertTrue(newPropertyValue.contains("storeKey=\"" + optionStoreKey + "\""), "storeKey not present in new property or value doesn't match");
|
||||
assertTrue(newPropertyValue.contains("serviceName=\"" + optionServiceName + "\""), "serviceName not present in new property or value doesn't match");
|
||||
assertTrue(newPropertyValue.contains("tokenauth=\"" + optionTokenAuth + "\""), "tokenauth not pres////.ent in new property or value doesn't match");
|
||||
assertTrue(newPropertyValue.contains("username=\"" + optionUsername + "\""), "username not present in new property or value doesn't match");
|
||||
assertTrue(newPropertyValue.contains("password=\"" + JAAS_MASKED_PASSWORD + "\""), "password not present in new property or value doesn't match");
|
||||
assertJaaSConfigLoadable(newPropertyValue);
|
||||
}
|
||||
}
|
||||
|
||||
private void assertJaaSConfigLoadable(String jaasConfig) {
|
||||
// Ensure that JaaS config can be loaded
|
||||
Map<String, Password> jaasConfigs = new HashMap<>();
|
||||
|
|
@ -273,6 +373,27 @@ public class KafkaUtilsTest {
|
|||
}
|
||||
}
|
||||
|
||||
protected void setupCredentials() throws Exception {
|
||||
jksPath = new Path(Files.createTempDirectory("tempproviders").toString(), "kafka.jceks");
|
||||
providerUrl = JavaKeyStoreProvider.SCHEME_NAME + "://file/" + jksPath.toUri();
|
||||
org.apache.hadoop.conf.Configuration conf = new org.apache.hadoop.conf.Configuration(false);
|
||||
|
||||
conf.set(CredentialProviderFactory.CREDENTIAL_PROVIDER_PATH, providerUrl);
|
||||
|
||||
CredentialProvider provider = CredentialProviderFactory.getProviders(conf).get(0);
|
||||
|
||||
// create new aliases
|
||||
try {
|
||||
provider.createCredentialEntry("atlas.jaas.KafkaClient.option.password", JAAS_MASKED_PASSWORD.toCharArray());
|
||||
|
||||
|
||||
// write out so that it can be found in checks
|
||||
provider.flush();
|
||||
} catch (Exception e) {
|
||||
e.printStackTrace();
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
|
||||
|
|
|
|||
Loading…
Reference in New Issue