ATLAS-4554: updated monitoring to support elasticsearch index backend

Signed-off-by: Madhan Neethiraj <madhan@apache.org>
This commit is contained in:
xingyu_zha 2022-02-14 17:56:13 +08:00 committed by Madhan Neethiraj
parent e49eb7ba24
commit 0847d6c371
5 changed files with 1355 additions and 10 deletions

View File

@ -37,6 +37,7 @@ import org.janusgraph.core.JanusGraphFactory;
import org.janusgraph.core.schema.JanusGraphManagement;
import org.janusgraph.diskstorage.StandardIndexProvider;
import org.janusgraph.diskstorage.StandardStoreManager;
import org.janusgraph.diskstorage.es.ElasticSearch7Index;
import org.janusgraph.diskstorage.solr.Solr6Index;
import org.janusgraph.graphdb.database.serialize.attribute.SerializableSerializer;
import org.janusgraph.graphdb.tinkerpop.JanusGraphIoRegistry;
@ -122,6 +123,8 @@ public class AtlasJanusGraphDatabase implements GraphDatabase<AtlasJanusVertex,
addHBase2Support();
addSolr6Index();
addElasticSearch7Index();
}
private static void addHBase2Support() {
@ -164,6 +167,27 @@ public class AtlasJanusGraphDatabase implements GraphDatabase<AtlasJanusVertex,
}
}
private static void addElasticSearch7Index() {
try {
Field field = StandardIndexProvider.class.getDeclaredField("ALL_MANAGER_CLASSES");
field.setAccessible(true);
Field modifiersField = Field.class.getDeclaredField("modifiers");
modifiersField.setAccessible(true);
modifiersField.setInt(field, field.getModifiers() & ~Modifier.FINAL);
Map<String, String> customMap = new HashMap<>(StandardIndexProvider.getAllProviderClasses());
customMap.put("elasticsearch", ElasticSearch7Index.class.getName());
ImmutableMap<String, String> immap = ImmutableMap.copyOf(customMap);
field.set(null, immap);
LOG.debug("Injected es7 index - {}", ElasticSearch7Index.class.getName());
} catch (Exception e) {
throw new RuntimeException(e);
}
}
public static JanusGraph getGraphInstance() {
if (graphInstance == null) {
synchronized (AtlasJanusGraphDatabase.class) {

View File

@ -18,6 +18,7 @@
package org.apache.atlas.repository.graphdb.janus;
import com.google.common.annotations.VisibleForTesting;
import org.apache.atlas.ApplicationProperties;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.discovery.AtlasAggregationEntry;
import org.apache.atlas.repository.Constants;
@ -42,6 +43,8 @@ import org.apache.solr.client.solrj.response.QueryResponse;
import org.apache.solr.client.solrj.response.TermsResponse;
import org.apache.solr.common.params.CommonParams;
import org.apache.solr.common.util.NamedList;
import org.janusgraph.diskstorage.es.ElasticSearch7Index;
import org.janusgraph.diskstorage.es.ElasticSearchClient;
import org.janusgraph.diskstorage.solr.Solr6Index;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -70,7 +73,7 @@ public class AtlasJanusGraphIndexClient implements AtlasGraphIndexClient {
private static final String TERMS_FIELD = "terms.fl";
private static final int SOLR_HEALTHY_STATUS = 0;
private static final long SOLR_STATUS_LOG_FREQUENCY_MS = 60000;//Prints SOLR DOWN status for every 1 min
private static long prevSolrHealthCheckTime;
private static long prevIdxHealthCheckTime;
private final Configuration configuration;
@ -83,21 +86,26 @@ public class AtlasJanusGraphIndexClient implements AtlasGraphIndexClient {
public boolean isHealthy() {
boolean isHealthy = false;
long currentTime = System.currentTimeMillis();
String idxBackEnd = configuration.getString(ApplicationProperties.INDEX_BACKEND_CONF);
try {
if (isSolrHealthy()) {
isHealthy = true;
if (ApplicationProperties.INDEX_BACKEND_SOLR.equals(idxBackEnd)) {
isHealthy = isSolrHealthy();
} else if (ApplicationProperties.INDEX_BACKEND_ELASTICSEARCH.equals(idxBackEnd)) {
isHealthy = isElasticsearchHealthy();
}
LOG.info("indexBackEnd={}; isHealthy={}", idxBackEnd, isHealthy);
} catch (Exception exception) {
if (LOG.isDebugEnabled()) {
LOG.error("Error: isHealthy", exception);
}
}
if (!isHealthy && (prevSolrHealthCheckTime == 0 || currentTime - prevSolrHealthCheckTime > SOLR_STATUS_LOG_FREQUENCY_MS)) {
LOG.info("Solr Health: Unhealthy!");
if (!isHealthy && (prevIdxHealthCheckTime == 0 || currentTime - prevIdxHealthCheckTime > SOLR_STATUS_LOG_FREQUENCY_MS)) {
LOG.info("Backend Health: Unhealthy!");
prevSolrHealthCheckTime = currentTime;
prevIdxHealthCheckTime = currentTime;
}
return isHealthy;
@ -379,6 +387,13 @@ public class AtlasJanusGraphIndexClient implements AtlasGraphIndexClient {
return client != null && client.ping(Constants.VERTEX_INDEX).getStatus() == SOLR_HEALTHY_STATUS;
}
private boolean isElasticsearchHealthy() throws IOException {
ElasticSearchClient client = ElasticSearch7Index.getElasticSearchClient();
String janusVertexIndex = ApplicationProperties.DEFAULT_INDEX_NAME + "_" + Constants.VERTEX_INDEX;
return client != null && client.indexExists(janusVertexIndex);
}
private void graphManagementCommit(AtlasGraphManagement management) {
try {
management.commit();

View File

@ -50,14 +50,17 @@ public final class ApplicationProperties extends PropertiesConfiguration {
public static final String INDEX_BACKEND_CONF = "atlas.graph.index.search.backend";
public static final String INDEX_MAP_NAME_CONF = "atlas.graph.index.search.map-name";
public static final String SOLR_WAIT_SEARCHER_CONF = "atlas.graph.index.search.solr.wait-searcher";
public static final String ELASTICSEARCH_INDEX_NAME_CONF = "atlas.graph.index.search.elasticsearch.index-name";
public static final String INDEX_RECOVERY_CONF = "atlas.index.recovery.enable";
public static final String ENABLE_FULLTEXT_SEARCH_CONF = "atlas.search.fulltext.enable";
public static final String ENABLE_FREETEXT_SEARCH_CONF = "atlas.search.freetext.enable";
public static final String ATLAS_RUN_MODE = "atlas.run.mode";
public static final String GRAPHBD_BACKEND_JANUS = "janus";
public static final String DEFAULT_INDEX_NAME = "janusgraph";
public static final String STORAGE_BACKEND_HBASE = "hbase";
public static final String STORAGE_BACKEND_HBASE2 = "hbase2";
public static final String INDEX_BACKEND_SOLR = "solr";
public static final String INDEX_BACKEND_ELASTICSEARCH = "elasticsearch";
public static final String LDAP_TYPE = "atlas.authentication.method.ldap.type";
public static final String LDAP = "LDAP";
public static final String AD = "AD";
@ -355,6 +358,10 @@ public final class ApplicationProperties extends PropertiesConfiguration {
addPropertyDirect(INDEX_MAP_NAME_CONF, DEFAULT_INDEX_MAP_NAME);
LOG.info("Setting index.search.map-name property '" + DEFAULT_INDEX_MAP_NAME + "'");
}
} else if (indexBackend.equalsIgnoreCase(INDEX_BACKEND_ELASTICSEARCH)){
addPropertyDirect(ELASTICSEARCH_INDEX_NAME_CONF, DEFAULT_INDEX_NAME);
LOG.info("Setting elasticsearch.index-name property '" + DEFAULT_INDEX_NAME + "'");
}
// setting value for 'atlas.graph.index.search.max-result-set-size' (default = 500000)

View File

@ -166,13 +166,13 @@ public class IndexRecoveryService implements Service, ActiveStateChangeHandler {
while (shouldRun.get()) {
try {
boolean solrHealthy = isSolrHealthy();
boolean isIdxHealthy = isIndexBackendHealthy();
if (this.txRecoveryObject == null && solrHealthy) {
if (this.txRecoveryObject == null && isIdxHealthy) {
startMonitoring();
}
if (this.txRecoveryObject != null && !solrHealthy) {
if (this.txRecoveryObject != null && !isIdxHealthy) {
stopMonitoring();
}
} catch (Exception e) {
@ -197,7 +197,7 @@ public class IndexRecoveryService implements Service, ActiveStateChangeHandler {
}
}
private boolean isSolrHealthy() throws AtlasException, InterruptedException {
private boolean isIndexBackendHealthy() throws AtlasException, InterruptedException {
Thread.sleep(indexStatusCheckRetryMillis);
return this.graph.getGraphIndexClient().isHealthy();