ATLAS-4408: Dynamic handling of failure in updating index

This commit is contained in:
Radhika Kundam 2021-10-10 21:50:29 -07:00 committed by Sarath Subramanian
parent 11627e33d3
commit 261331bbd1
11 changed files with 575 additions and 28 deletions

View File

@ -228,6 +228,14 @@ public final class Constants {
public static final String TASK_START_TIME = encodePropertyKey(TASK_PREFIX + "startTime");
public static final String TASK_END_TIME = encodePropertyKey(TASK_PREFIX + "endTime");
/**
* Index Recovery vertex property keys.
*
* All keys below are built from INDEX_RECOVERY_PREFIX; the name, start-time and
* previous-time keys are additionally passed through encodePropertyKey.
* They are written and read by IndexRecoveryService.RecoveryInfoManagement.
*/
// Common prefix shared by every index-recovery property key.
public static final String INDEX_RECOVERY_PREFIX = INTERNAL_PROPERTY_KEY_PREFIX + "idxRecovery_";
// Identifies the vertex that stores the recovery info; the vertex is looked up by this property.
public static final String PROPERTY_KEY_INDEX_RECOVERY_NAME = encodePropertyKey(INDEX_RECOVERY_PREFIX + "name");
// Time from which the next index recovery should start.
public static final String PROPERTY_KEY_INDEX_RECOVERY_START_TIME = encodePropertyKey(INDEX_RECOVERY_PREFIX + "startTime");
// Start time that was stored before the most recent update of the start time.
public static final String PROPERTY_KEY_INDEX_RECOVERY_PREV_TIME = encodePropertyKey(INDEX_RECOVERY_PREFIX + "prevTime");
/*
* All supported file-format extensions for Bulk Imports through file upload
*/

View File

@ -54,4 +54,10 @@ public interface AtlasGraphIndexClient {
* @param suggestionProperties the list of suggestion properties.
*/
void applySuggestionFields(String collectionName, List<String> suggestionProperties);
/**
* Reports the current status of the index client.
*
* @return true if the index client is active (healthy), false otherwise
*/
boolean isHealthy();
}

View File

@ -180,4 +180,23 @@ public interface AtlasGraphManagement {
* @throws Exception
*/
void reindex(String indexName, List<AtlasElement> elements) throws Exception;
/**
* Starts recovering indices from the specified recovery time and returns a handle
* to the running recovery.
*
* @param startTime time from which recovery should start; the JanusGraph implementation
*                  interprets it as milliseconds since the epoch
* @return transactionRecoveryObject an opaque handle that callers pass back to
*         {@link #stopIndexRecovery(Object)} and {@link #printIndexRecoveryStats(Object)}
*/
Object startIndexRecovery(long startTime);
/**
* Stop index recovery.
*
* @param txRecoveryObject the handle previously returned by {@link #startIndexRecovery(long)}
*/
void stopIndexRecovery(Object txRecoveryObject);
/**
* Print index recovery stats.
*
* @param txRecoveryObject the handle previously returned by {@link #startIndexRecovery(long)}
*/
void printIndexRecoveryStats(Object txRecoveryObject);
}

View File

@ -28,7 +28,6 @@ import org.apache.atlas.repository.graphdb.janus.serializer.BigDecimalSerializer
import org.apache.atlas.repository.graphdb.janus.serializer.BigIntegerSerializer;
import org.apache.atlas.repository.graphdb.janus.serializer.TypeCategorySerializer;
import org.apache.atlas.typesystem.types.DataTypes.TypeCategory;
import org.apache.atlas.utils.AtlasPerfTracer;
import org.apache.commons.configuration.Configuration;
import org.apache.tinkerpop.gremlin.structure.io.graphson.GraphSONMapper;
import org.janusgraph.core.JanusGraph;
@ -48,28 +47,36 @@ import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.math.BigDecimal;
import java.math.BigInteger;
import java.time.Duration;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
import static org.apache.atlas.ApplicationProperties.DEFAULT_INDEX_RECOVERY;
import static org.apache.atlas.ApplicationProperties.INDEX_RECOVERY_CONF;
/**
* Default implementation for Graph Provider that doles out JanusGraph.
*/
public class AtlasJanusGraphDatabase implements GraphDatabase<AtlasJanusVertex, AtlasJanusEdge> {
private static final Logger LOG = LoggerFactory.getLogger(AtlasJanusGraphDatabase.class);
private static final Logger PERF_LOG = AtlasPerfTracer.getPerfLogger("AtlasJanusGraphDatabase");
private static final String OLDER_STORAGE_EXCEPTION = "Storage version is incompatible with current client";
/**
* Constant for the configuration property that indicates the prefix.
*/
public static final String GRAPH_PREFIX = "atlas.graph";
public static final String INDEX_BACKEND_CONF = "index.search.backend";
public static final String SOLR_ZOOKEEPER_URL = "atlas.graph.index.search.solr.zookeeper-url";
public static final String SOLR_ZOOKEEPER_URLS = "atlas.graph.index.search.solr.zookeeper-urls";
public static final String INDEX_BACKEND_LUCENE = "lucene";
public static final String INDEX_BACKEND_ES = "elasticsearch";
public static final String GRAPH_PREFIX = "atlas.graph";
public static final String INDEX_BACKEND_CONF = "index.search.backend";
public static final String SOLR_ZOOKEEPER_URL = "atlas.graph.index.search.solr.zookeeper-url";
public static final String SOLR_ZOOKEEPER_URLS = "atlas.graph.index.search.solr.zookeeper-urls";
public static final String INDEX_BACKEND_LUCENE = "lucene";
public static final String INDEX_BACKEND_ES = "elasticsearch";
// Graph global option that configureTxLogBasedIndexRecovery() sets to the index-recovery enabled flag.
// NOTE(review): presumably switches JanusGraph's transaction write-ahead log on/off - confirm against JanusGraph docs.
public static final String GRAPH_TX_LOG_CONF = "tx.log-tx";
// Graph global option that is set to the same enabled flag as GRAPH_TX_LOG_CONF.
public static final String GRAPH_TX_LOG_VERBOSE_CONF = "tx.recovery.verbose";
// Application property from which the transaction log TTL (in hours) is read.
public static final String SOLR_INDEX_TX_LOG_TTL_CONF = "write.ahead.log.ttl.in.hours";
// Graph global option that receives the transaction log TTL as a java.time.Duration.
public static final String GRAPH_TX_LOG_TTL_CONF = "log.tx.ttl";
// Fallback TTL used when SOLR_INDEX_TX_LOG_TTL_CONF is not configured.
public static final long DEFAULT_GRAPH_TX_LOG_TTL = 72; //Hrs
private static volatile AtlasJanusGraph atlasGraphInstance = null;
private static volatile JanusGraph graphInstance;
@ -166,8 +173,11 @@ public class AtlasJanusGraphDatabase implements GraphDatabase<AtlasJanusVertex,
throw new RuntimeException(e);
}
graphInstance = initJanusGraph(config);
configureTxLogBasedIndexRecovery();
graphInstance = initJanusGraph(config);
atlasGraphInstance = new AtlasJanusGraph();
validateIndexBackend(config);
}
@ -192,6 +202,52 @@ public class AtlasJanusGraphDatabase implements GraphDatabase<AtlasJanusVertex,
}
}
/**
 * Pushes the transaction-log settings used for index recovery into the graph's global configuration.
 *
 * Reads the recovery switch ({@code INDEX_RECOVERY_CONF}) and the transaction log TTL in hours
 * ({@code SOLR_INDEX_TX_LOG_TTL_CONF}) from the application properties and hands them to
 * {@code updateGlobalConfiguration}. Any failure is logged and swallowed so that it does not
 * propagate to the caller.
 */
public static void configureTxLogBasedIndexRecovery() {
    try {
        boolean  recoveryEnabled = ApplicationProperties.get().getBoolean(INDEX_RECOVERY_CONF, DEFAULT_INDEX_RECOVERY);
        long     ttlHours        = ApplicationProperties.get().getLong(SOLR_INDEX_TX_LOG_TTL_CONF, DEFAULT_GRAPH_TX_LOG_TTL);
        Duration txLogTtl        = Duration.ofHours(ttlHours);

        // plain map instead of double-brace initialization, which would create an anonymous HashMap subclass
        Map<String, Object> properties = new HashMap<>();

        properties.put(GRAPH_TX_LOG_CONF, recoveryEnabled);
        properties.put(GRAPH_TX_LOG_VERBOSE_CONF, recoveryEnabled);
        properties.put(GRAPH_TX_LOG_TTL_CONF, txLogTtl);

        updateGlobalConfiguration(properties);

        LOG.info("Tx Log-based Index Recovery: {}!", recoveryEnabled ? "Enabled" : "Disabled");
    } catch (Exception e) {
        LOG.error("Error: Failed!", e);
    }
}
/**
 * Applies the given options to the graph's global configuration through a management transaction.
 *
 * A temporary graph instance is opened for the update and always closed afterwards. The options
 * are committed only when every one of them was set successfully; on failure the error is logged
 * and the management transaction is rolled back instead of committing a partial change set.
 *
 * @param map option keys and the values to set for them
 */
private static void updateGlobalConfiguration(Map<String, Object> map) {
    JanusGraph           graph            = null;
    JanusGraphManagement managementSystem = null;

    try {
        graph            = initJanusGraph(getConfiguration());
        managementSystem = graph.openManagement();

        for (Map.Entry<String, Object> entry : map.entrySet()) {
            managementSystem.set(entry.getKey(), entry.getValue());
        }

        managementSystem.commit();

        LOG.info("Global properties updated!: {}", map);
    } catch (Exception ex) {
        LOG.error("Error updating global configuration: {}", map, ex);

        if (managementSystem != null && managementSystem.isOpen()) {
            try {
                managementSystem.rollback();
            } catch (Exception rollbackEx) {
                LOG.warn("Error rolling back global configuration update", rollbackEx);
            }
        }
    } finally {
        // runs even when commit/rollback failed, so the temporary graph is never leaked
        if (graph != null) {
            graph.close();
        }
    }
}
public static JanusGraph getBulkLoadingGraphInstance() {
try {
Configuration cfg = getConfiguration();

View File

@ -40,16 +40,22 @@ import org.apache.solr.client.solrj.request.V2Request;
import org.apache.solr.client.solrj.response.FacetField;
import org.apache.solr.client.solrj.response.QueryResponse;
import org.apache.solr.client.solrj.response.TermsResponse;
import org.apache.solr.client.solrj.util.ClientUtils;
import org.apache.solr.common.params.CommonParams;
import org.apache.solr.common.util.ContentStream;
import org.apache.solr.common.util.NamedList;
import org.janusgraph.diskstorage.solr.Solr6Index;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.*;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;
import java.util.Set;
import static org.apache.atlas.repository.Constants.FREETEXT_REQUEST_HANDLER;
import static org.apache.atlas.repository.Constants.VERTEX_INDEX;
@ -57,11 +63,15 @@ import static org.apache.atlas.repository.Constants.VERTEX_INDEX;
public class AtlasJanusGraphIndexClient implements AtlasGraphIndexClient {
private static final Logger LOG = LoggerFactory.getLogger(AtlasJanusGraphIndexClient.class);
private static final FreqComparator FREQ_COMPARATOR = new FreqComparator();
private static final int DEFAULT_SUGGESTION_COUNT = 5;
private static final int MIN_FACET_COUNT_REQUIRED = 1;
private static final String TERMS_PREFIX = "terms.prefix";
private static final String TERMS_FIELD = "terms.fl";
private static final FreqComparator FREQ_COMPARATOR = new FreqComparator();
private static final int DEFAULT_SUGGESTION_COUNT = 5;
private static final int MIN_FACET_COUNT_REQUIRED = 1;
private static final String TERMS_PREFIX = "terms.prefix";
private static final String TERMS_FIELD = "terms.fl";
// Ping response status that isSolrHealthy() treats as healthy.
private static final int SOLR_HEALTHY_STATUS = 0;
private static final long SOLR_STATUS_LOG_FREQUENCY_MS = 60000;//Prints SOLR DOWN status for every 1 min
// System.currentTimeMillis() value at which "Solr Health: Unhealthy!" was last logged; 0 until it is logged once.
// Static, so the value is shared by all instances of this client.
private static long prevSolrHealthCheckTime;
private final Configuration configuration;
@ -70,6 +80,29 @@ public class AtlasJanusGraphIndexClient implements AtlasGraphIndexClient {
this.configuration = configuration;
}
/**
 * Checks whether Solr is currently healthy.
 *
 * Never throws: an exception raised by the health check is treated as "unhealthy". While Solr is
 * unhealthy, the "Solr Health: Unhealthy!" message is logged at most once per
 * {@code SOLR_STATUS_LOG_FREQUENCY_MS} so that frequent polling does not flood the log.
 *
 * @return true if the Solr ping reported the healthy status, false otherwise
 */
@Override
public boolean isHealthy() {
    boolean isHealthy   = false;
    long    currentTime = System.currentTimeMillis();

    try {
        isHealthy = isSolrHealthy();
    } catch (Exception exception) {
        // the failure detail is only of interest when debugging; the throttled message below reports the outage
        LOG.debug("Error: isHealthy", exception);
    }

    if (!isHealthy && (prevSolrHealthCheckTime == 0 || currentTime - prevSolrHealthCheckTime > SOLR_STATUS_LOG_FREQUENCY_MS)) {
        LOG.info("Solr Health: Unhealthy!");

        prevSolrHealthCheckTime = currentTime;
    }

    return isHealthy;
}
@Override
public void applySearchWeight(String collectionName, Map<String, Integer> indexFieldName2SearchWeightMap) {
SolrClient solrClient = null;
@ -340,6 +373,12 @@ public class AtlasJanusGraphIndexClient implements AtlasGraphIndexClient {
return Collections.EMPTY_LIST;
}
/**
 * Pings the vertex index collection through the Solr client held by Solr6Index.
 *
 * @return false when no Solr client is available; otherwise true only if the ping status
 *         equals {@code SOLR_HEALTHY_STATUS}
 * @throws SolrServerException if the ping fails on the Solr side
 * @throws IOException if the ping fails with an I/O error
 */
private boolean isSolrHealthy() throws SolrServerException, IOException {
    final SolrClient solrClient = Solr6Index.getSolrClient();

    if (solrClient == null) {
        return false;
    }

    final int pingStatus = solrClient.ping(Constants.VERTEX_INDEX).getStatus();

    return pingStatus == SOLR_HEALTHY_STATUS;
}
private void graphManagementCommit(AtlasGraphManagement management) {
try {
management.commit();

View File

@ -19,37 +19,46 @@ package org.apache.atlas.repository.graphdb.janus;
import com.google.common.base.Preconditions;
import org.apache.atlas.AtlasConfiguration;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.repository.graphdb.AtlasCardinality;
import org.apache.atlas.repository.graphdb.AtlasEdgeDirection;
import org.apache.atlas.repository.graphdb.AtlasEdgeLabel;
import org.apache.atlas.repository.graphdb.AtlasElement;
import org.apache.atlas.repository.graphdb.AtlasGraphIndex;
import org.apache.atlas.repository.graphdb.AtlasGraphManagement;
import org.apache.atlas.repository.graphdb.AtlasPropertyKey;
import org.apache.commons.lang.StringUtils;
import org.apache.tinkerpop.gremlin.structure.Direction;
import org.apache.tinkerpop.gremlin.structure.Edge;
import org.apache.tinkerpop.gremlin.structure.Element;
import org.apache.tinkerpop.gremlin.structure.Vertex;
import org.janusgraph.core.Cardinality;
import org.janusgraph.core.EdgeLabel;
import org.janusgraph.core.JanusGraph;
import org.janusgraph.core.JanusGraphElement;
import org.janusgraph.core.JanusGraphFactory;
import org.janusgraph.core.PropertyKey;
import org.janusgraph.core.schema.*;
import org.janusgraph.core.log.TransactionRecovery;
import org.janusgraph.core.schema.ConsistencyModifier;
import org.janusgraph.core.schema.JanusGraphIndex;
import org.janusgraph.core.schema.JanusGraphManagement;
import org.janusgraph.core.schema.JanusGraphManagement.IndexBuilder;
import org.janusgraph.core.schema.Mapping;
import org.janusgraph.core.schema.Parameter;
import org.janusgraph.core.schema.PropertyKeyMaker;
import org.janusgraph.diskstorage.BackendTransaction;
import org.janusgraph.diskstorage.indexing.IndexEntry;
import org.janusgraph.graphdb.database.IndexSerializer;
import org.janusgraph.graphdb.database.StandardJanusGraph;
import org.janusgraph.graphdb.database.management.ManagementSystem;
import org.janusgraph.graphdb.internal.Token;
import org.apache.atlas.repository.graphdb.AtlasCardinality;
import org.apache.atlas.repository.graphdb.AtlasEdgeLabel;
import org.apache.atlas.repository.graphdb.AtlasGraphIndex;
import org.apache.atlas.repository.graphdb.AtlasGraphManagement;
import org.apache.atlas.repository.graphdb.AtlasPropertyKey;
import org.apache.commons.lang.StringUtils;
import org.apache.tinkerpop.gremlin.structure.Edge;
import org.apache.tinkerpop.gremlin.structure.Vertex;
import org.janusgraph.graphdb.log.StandardTransactionLogProcessor;
import org.janusgraph.graphdb.transaction.StandardJanusGraphTx;
import org.janusgraph.graphdb.types.IndexType;
import org.janusgraph.graphdb.types.MixedIndexType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.time.Instant;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@ -347,6 +356,62 @@ public class AtlasJanusGraphManagement implements AtlasGraphManagement {
}
}
/**
 * Starts JanusGraph transaction recovery on the wrapped graph.
 *
 * @param recoveryStartTime point in time, in milliseconds since the epoch, from which to recover
 * @return the object returned by JanusGraphFactory.startTransactionRecovery
 */
@Override
public Object startIndexRecovery(long recoveryStartTime) {
    final Instant    recoverFrom = Instant.ofEpochMilli(recoveryStartTime);
    final JanusGraph targetGraph = this.graph.getGraph();

    return JanusGraphFactory.startTransactionRecovery(targetGraph, recoverFrom);
}
/**
 * Stops a running transaction recovery.
 *
 * Does nothing for a null argument. For a TransactionRecovery, the backend's system transaction
 * log is closed first and the recovery is shut down afterwards; any other object is reported as
 * invalid. Exceptions are logged as warnings and not rethrown.
 *
 * @param txRecoveryObject the recovery handle to stop; may be null
 */
@Override
public void stopIndexRecovery(Object txRecoveryObject) {
    if (txRecoveryObject == null) {
        return;
    }

    try {
        if (!(txRecoveryObject instanceof TransactionRecovery)) {
            LOG.error("stopIndexRecovery({}): Invalid transaction recovery object!", txRecoveryObject);

            return;
        }

        final TransactionRecovery recovery      = (TransactionRecovery) txRecoveryObject;
        final StandardJanusGraph  standardGraph = (StandardJanusGraph) this.graph.getGraph();

        LOG.info("stopIndexRecovery: Index Client is unhealthy. Index recovery: Paused!");

        standardGraph.getBackend().getSystemTxLog().close();
        recovery.shutdown();
    } catch (Exception e) {
        LOG.warn("stopIndexRecovery: Error while shutting down transaction recovery", e);
    }
}
/**
 * Logs the statistics of a running transaction recovery.
 *
 * Does nothing for a null argument. Statistics are only available from a
 * StandardTransactionLogProcessor, so that is the type checked before casting; any other object
 * is reported as an unexpected type. Exceptions are logged and not rethrown.
 *
 * @param txRecoveryObject the recovery handle whose statistics should be logged; may be null
 */
@Override
public void printIndexRecoveryStats(Object txRecoveryObject) {
    if (txRecoveryObject == null) {
        return;
    }

    try {
        // check the exact type that is cast to; a TransactionRecovery of another type would fail the cast
        if (txRecoveryObject instanceof StandardTransactionLogProcessor) {
            StandardTransactionLogProcessor txRecovery = (StandardTransactionLogProcessor) txRecoveryObject;
            long[]                          statistics = txRecovery.getStatistics();

            if (statistics.length >= 2) {
                LOG.info("Index Recovery: Stats: Success:{}: Failed: {}", statistics[0], statistics[1]);
            } else {
                LOG.info("Index Recovery: Stats: {}", statistics);
            }
        } else {
            LOG.error("Transaction stats: Invalid transaction recovery object!: Unexpected type: {}: Details: {}", txRecoveryObject.getClass().toString(), txRecoveryObject);
        }
    } catch (Exception e) {
        LOG.error("Error: Retrieving log transaction stats!", e);
    }
}
private void reindexElement(ManagementSystem managementSystem, IndexSerializer indexSerializer, MixedIndexType indexType, List<AtlasElement> elements) throws Exception {
Map<String, Map<String, List<IndexEntry>>> documentsPerStore = new HashMap<>();
StandardJanusGraphTx tx = managementSystem.getWrappedTx();

View File

@ -50,6 +50,7 @@ public final class ApplicationProperties extends PropertiesConfiguration {
public static final String INDEX_BACKEND_CONF = "atlas.graph.index.search.backend";
public static final String INDEX_MAP_NAME_CONF = "atlas.graph.index.search.map-name";
public static final String SOLR_WAIT_SEARCHER_CONF = "atlas.graph.index.search.solr.wait-searcher";
// Boolean property that switches index recovery on or off.
public static final String INDEX_RECOVERY_CONF = "atlas.index.recovery.enable";
public static final String ENABLE_FULLTEXT_SEARCH_CONF = "atlas.search.fulltext.enable";
public static final String ENABLE_FREETEXT_SEARCH_CONF = "atlas.search.freetext.enable";
public static final String ATLAS_RUN_MODE = "atlas.run.mode";
@ -66,6 +67,7 @@ public final class ApplicationProperties extends PropertiesConfiguration {
public static final String DEFAULT_GRAPHDB_BACKEND = GRAPHBD_BACKEND_JANUS;
public static final boolean DEFAULT_SOLR_WAIT_SEARCHER = false;
public static final boolean DEFAULT_INDEX_MAP_NAME = false;
// Index recovery is enabled unless explicitly configured otherwise.
public static final boolean DEFAULT_INDEX_RECOVERY = true;
public static final AtlasRunMode DEFAULT_ATLAS_RUN_MODE = AtlasRunMode.PROD;
public static final String INDEX_SEARCH_MAX_RESULT_SET_SIZE = "atlas.graph.index.search.max-result-set-size";

View File

@ -353,6 +353,9 @@ public class GraphBackedSearchIndexer implements SearchIndexer, ActiveStateChang
createCommonVertexIndex(management, TASK_CREATED_TIME, UniqueKind.NONE, Long.class, SINGLE, true, false);
createCommonVertexIndex(management, TASK_STATUS, UniqueKind.NONE, String.class, SINGLE, true, false);
// index recovery
createCommonVertexIndex(management, PROPERTY_KEY_INDEX_RECOVERY_NAME, UniqueKind.GLOBAL_UNIQUE, String.class, SINGLE, true, false);
// create vertex-centric index
createVertexCentricIndex(management, CLASSIFICATION_LABEL, AtlasEdgeDirection.BOTH, CLASSIFICATION_EDGE_NAME_PROPERTY_KEY, String.class, SINGLE);
createVertexCentricIndex(management, CLASSIFICATION_LABEL, AtlasEdgeDirection.BOTH, CLASSIFICATION_EDGE_IS_PROPAGATED_PROPERTY_KEY, Boolean.class, SINGLE);

View File

@ -0,0 +1,283 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.repository.graph;
import com.google.common.annotations.VisibleForTesting;
import org.apache.atlas.ApplicationProperties;
import org.apache.atlas.AtlasException;
import org.apache.atlas.ha.HAConfiguration;
import org.apache.atlas.listener.ActiveStateChangeHandler;
import org.apache.atlas.repository.graphdb.AtlasGraph;
import org.apache.atlas.repository.graphdb.AtlasGraphQuery;
import org.apache.atlas.repository.graphdb.AtlasVertex;
import org.apache.atlas.service.Service;
import org.apache.commons.configuration.Configuration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Component;
import javax.inject.Inject;
import java.text.SimpleDateFormat;
import java.time.Instant;
import java.util.Iterator;
import java.util.TimeZone;
import static org.apache.atlas.ApplicationProperties.DEFAULT_INDEX_RECOVERY;
import static org.apache.atlas.repository.Constants.PROPERTY_KEY_INDEX_RECOVERY_NAME;
import static org.apache.atlas.repository.Constants.PROPERTY_KEY_INDEX_RECOVERY_PREV_TIME;
import static org.apache.atlas.repository.Constants.PROPERTY_KEY_INDEX_RECOVERY_START_TIME;
import static org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2.setEncodedProperty;
/**
 * Monitors the health of the graph's index client and drives index recovery accordingly.
 *
 * A single monitor thread periodically asks the index client whether it is healthy. When the
 * client is healthy and no recovery is running, index recovery is started from the stored
 * recovery start time. When the client turns unhealthy while recovery is running, recovery is
 * stopped and a new start time is stored, so that the next recovery resumes from around the
 * moment the outage was detected.
 *
 * The monitor is started from {@link #start()} when HA is not enabled, otherwise from
 * {@link #instanceIsActive()}. It is started at most once and is terminated by {@link #stop()}.
 */
@Component
@Order(8)
public class IndexRecoveryService implements Service, ActiveStateChangeHandler {
    private static final Logger LOG                                       = LoggerFactory.getLogger(IndexRecoveryService.class);
    private static final String DATE_FORMAT                               = "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'";
    private static final String INDEX_HEALTH_MONITOR_THREAD_NAME          = "index-health-monitor";
    private static final String SOLR_STATUS_CHECK_RETRY_INTERVAL          = "atlas.graph.index.status.check.frequency";
    private static final String SOLR_INDEX_RECOVERY_CONFIGURED_START_TIME = "atlas.graph.index.recovery.start.time";
    private static final long   SOLR_STATUS_RETRY_DEFAULT_MS              = 30000; // 30 secs default

    private final Thread                 indexHealthMonitor;
    private final RecoveryThread         recoveryThread;
    private final RecoveryInfoManagement recoveryInfoManagement;
    private final Configuration          configuration;
    private final boolean                isIndexRecoveryEnabled;

    /**
     * Creates the service and its (not yet started) monitor thread.
     *
     * @param config application configuration; supplies the recovery switch, the health-check
     *               frequency and an optional recovery start time
     * @param graph  graph used for health checks, recovery and for persisting the recovery start time
     */
    @Inject
    public IndexRecoveryService(Configuration config, AtlasGraph graph) {
        this.configuration          = config;
        this.isIndexRecoveryEnabled = config.getBoolean(ApplicationProperties.INDEX_RECOVERY_CONF, DEFAULT_INDEX_RECOVERY);

        long recoveryStartTimeFromConfig = getRecoveryStartTimeFromConfig(config);
        long healthCheckFrequencyMillis  = config.getLong(SOLR_STATUS_CHECK_RETRY_INTERVAL, SOLR_STATUS_RETRY_DEFAULT_MS);

        this.recoveryInfoManagement = new RecoveryInfoManagement(graph);
        this.recoveryThread         = new RecoveryThread(recoveryInfoManagement, graph, recoveryStartTimeFromConfig, healthCheckFrequencyMillis);
        this.indexHealthMonitor     = new Thread(recoveryThread, INDEX_HEALTH_MONITOR_THREAD_NAME);
    }

    /**
     * Reads the optional, explicitly configured recovery start time.
     *
     * @param config application configuration
     * @return the configured time in milliseconds since the epoch, or 0 when the property is not
     *         set or cannot be parsed with {@code DATE_FORMAT} (interpreted as UTC)
     */
    private long getRecoveryStartTimeFromConfig(Configuration config) {
        String time = null;

        try {
            time = config.getString(SOLR_INDEX_RECOVERY_CONFIGURED_START_TIME);

            // the property is optional; an unset value is the normal case, not an error
            if (time == null || time.trim().isEmpty()) {
                return 0L;
            }

            SimpleDateFormat dateFormat = new SimpleDateFormat(DATE_FORMAT);

            dateFormat.setTimeZone(TimeZone.getTimeZone("UTC"));

            return dateFormat.parse(time).toInstant().toEpochMilli();
        } catch (Exception e) {
            // a value that is present but unusable is a misconfiguration worth surfacing
            LOG.warn("Error fetching: {}: value '{}' could not be parsed using format {}", SOLR_INDEX_RECOVERY_CONFIGURED_START_TIME, time, DATE_FORMAT, e);
        }

        return 0L;
    }

    /**
     * Starts monitoring when HA is not enabled; with HA enabled, monitoring starts from
     * {@link #instanceIsActive()} instead.
     */
    @Override
    public void start() throws AtlasException {
        if (configuration == null || !HAConfiguration.isHAEnabled(configuration)) {
            LOG.info("==> IndexRecoveryService.start()");

            startTxLogMonitoring();

            LOG.info("<== IndexRecoveryService.start()");
        }
    }

    /**
     * Asks the monitor to terminate and waits until it has finished.
     */
    @Override
    public void stop() throws AtlasException {
        // without this request the monitor loop would never end and join() would block forever
        recoveryThread.shutdown();

        try {
            indexHealthMonitor.join();
        } catch (InterruptedException e) {
            LOG.error("indexHealthMonitor: Interrupted", e);

            Thread.currentThread().interrupt();
        }
    }

    @Override
    public void instanceIsActive() throws AtlasException {
        LOG.info("==> IndexRecoveryService.instanceIsActive()");

        startTxLogMonitoring();

        LOG.info("<== IndexRecoveryService.instanceIsActive()");
    }

    @Override
    public void instanceIsPassive() throws AtlasException {
        LOG.info("IndexRecoveryService.instanceIsPassive(): no action needed.");
    }

    @Override
    public int getHandlerOrder() {
        return ActiveStateChangeHandler.HandlerOrder.INDEX_RECOVERY.getOrder();
    }

    /**
     * Starts the monitor thread if recovery is enabled and the thread has not been started before.
     */
    private void startTxLogMonitoring() {
        if (!isIndexRecoveryEnabled) {
            LOG.warn("IndexRecoveryService: Recovery should be enabled.");

            return;
        }

        // a Thread can be started only once; a repeated activation must not fail with IllegalThreadStateException
        if (indexHealthMonitor.getState() != Thread.State.NEW) {
            LOG.info("IndexRecoveryService: index health monitor was already started; state: {}", indexHealthMonitor.getState());

            return;
        }

        indexHealthMonitor.start();
    }

    /**
     * Monitor loop: waits, checks the index client's health and starts or stops recovery.
     */
    private static class RecoveryThread implements Runnable {
        private final AtlasGraph             graph;
        private final RecoveryInfoManagement recoveryInfoManagement;
        private final long                   indexStatusCheckRetryMillis;
        private final Object                 waitLock = new Object();
        private volatile boolean             shouldRun = true;
        private Object                       txRecoveryObject;

        private RecoveryThread(RecoveryInfoManagement recoveryInfoManagement, AtlasGraph graph, long startTimeFromConfig, long healthCheckFrequencyMillis) {
            this.graph                       = graph;
            this.recoveryInfoManagement      = recoveryInfoManagement;
            this.indexStatusCheckRetryMillis = healthCheckFrequencyMillis;

            // an explicitly configured start time overrides the stored one
            if (startTimeFromConfig > 0) {
                this.recoveryInfoManagement.updateStartTime(startTimeFromConfig);
            }
        }

        @Override
        public void run() {
            LOG.info("Index Health Monitor: Starting...");

            while (shouldRun) {
                try {
                    boolean solrHealthy = isSolrHealthy();

                    if (!shouldRun) { // shutdown was requested while waiting or checking
                        break;
                    }

                    if (this.txRecoveryObject == null && solrHealthy) {
                        startMonitoring();
                    }

                    if (this.txRecoveryObject != null && !solrHealthy) {
                        stopMonitoring();
                    }
                } catch (InterruptedException e) {
                    LOG.info("Index Health Monitor: Interrupted!");

                    Thread.currentThread().interrupt();

                    break;
                } catch (Exception e) {
                    LOG.error("Error: Index recovery monitoring!", e);
                }
            }

            LOG.info("Index Health Monitor: Stopped!");
        }

        /**
         * Requests termination of the monitor loop and wakes the monitor if it is waiting for the
         * next health check.
         */
        private void shutdown() {
            shouldRun = false;

            synchronized (waitLock) {
                waitLock.notifyAll();
            }
        }

        /**
         * Waits for the configured check interval (ended early by {@link #shutdown()}) and then
         * asks the index client for its health.
         */
        private boolean isSolrHealthy() throws AtlasException, InterruptedException {
            synchronized (waitLock) {
                // wait(0) would block indefinitely, so a non-positive interval means "do not wait"
                if (shouldRun && indexStatusCheckRetryMillis > 0) {
                    waitLock.wait(indexStatusCheckRetryMillis);
                }
            }

            return this.graph.getGraphIndexClient().isHealthy();
        }

        private void startMonitoring() {
            try {
                long startTime = recoveryInfoManagement.getStartTime();

                txRecoveryObject = this.graph.getManagementSystem().startIndexRecovery(startTime);

                // reported only after recovery really started
                LOG.info("Index Recovery: Started! Recovery time: {}", Instant.ofEpochMilli(startTime));

                printIndexRecoveryStats();
            } catch (Exception e) {
                LOG.error("Index Recovery: Start: Error!", e);
            }
        }

        private void stopMonitoring() {
            // step back by one check interval: the outage may have begun any time since the previous check
            Instant newStartTime = Instant.now().minusMillis(indexStatusCheckRetryMillis);

            try {
                this.graph.getManagementSystem().stopIndexRecovery(txRecoveryObject);

                recoveryInfoManagement.updateStartTime(newStartTime.toEpochMilli());

                printIndexRecoveryStats();
            } catch (Exception e) {
                LOG.error("Index Recovery: Stopped! Error!", e);
            } finally {
                this.txRecoveryObject = null;

                LOG.info("Index Recovery: Stopped! Recovery time: {}", newStartTime);
            }
        }

        private void printIndexRecoveryStats() {
            this.graph.getManagementSystem().printIndexRecoveryStats(txRecoveryObject);
        }
    }

    /**
     * Persists the index recovery start time on a dedicated vertex that is identified by its
     * {@code PROPERTY_KEY_INDEX_RECOVERY_NAME} property.
     */
    @VisibleForTesting
    static class RecoveryInfoManagement {
        private static final String INDEX_RECOVERY_TYPE_NAME = "__solrIndexRecoveryInfo";

        private final AtlasGraph graph;

        public RecoveryInfoManagement(AtlasGraph graph) {
            this.graph = graph;
        }

        /**
         * Stores the given start time, creating the recovery-info vertex if it does not exist yet.
         * The previously stored start time is kept under {@code PROPERTY_KEY_INDEX_RECOVERY_PREV_TIME}.
         * Errors are logged; changes are committed on success and rolled back on failure.
         *
         * @param startTime new recovery start time, in milliseconds since the epoch
         */
        public void updateStartTime(Long startTime) {
            boolean updated = false;

            try {
                Long        prevStartTime = null;
                AtlasVertex vertex        = findVertex();

                if (vertex == null) {
                    vertex = graph.addVertex();
                } else {
                    prevStartTime = getStartTime(vertex);
                }

                setEncodedProperty(vertex, PROPERTY_KEY_INDEX_RECOVERY_NAME, INDEX_RECOVERY_TYPE_NAME);
                setEncodedProperty(vertex, PROPERTY_KEY_INDEX_RECOVERY_START_TIME, startTime);
                setEncodedProperty(vertex, PROPERTY_KEY_INDEX_RECOVERY_PREV_TIME, prevStartTime);

                updated = true;
            } catch (Exception ex) {
                LOG.error("Error: Updating: {}!", startTime, ex);
            } finally {
                // do not persist a half-written recovery-info vertex
                if (updated) {
                    graph.commit();
                } else {
                    graph.rollback();
                }
            }
        }

        /**
         * @return the stored recovery start time in milliseconds since the epoch; 0 when nothing
         *         is stored or the value cannot be read (never null)
         */
        public Long getStartTime() {
            AtlasVertex vertex = findVertex();

            return getStartTime(vertex);
        }

        private Long getStartTime(AtlasVertex vertex) {
            if (vertex == null) {
                LOG.warn("Vertex passed is NULL: Returned is 0");

                return 0L;
            }

            Long startTime = null;

            try {
                startTime = vertex.getProperty(PROPERTY_KEY_INDEX_RECOVERY_START_TIME, Long.class);
            } catch (Exception e) {
                LOG.error("Error retrieving startTime", e);
            }

            // a missing property must not surface as null: callers unbox the result
            return startTime != null ? startTime : 0L;
        }

        private AtlasVertex findVertex() {
            AtlasGraphQuery       query   = graph.query().has(PROPERTY_KEY_INDEX_RECOVERY_NAME, INDEX_RECOVERY_TYPE_NAME);
            Iterator<AtlasVertex> results = query.vertices().iterator();

            return results.hasNext() ? results.next() : null;
        }
    }
}

View File

@ -0,0 +1,65 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.repository.graph;
import com.google.inject.Inject;
import org.apache.atlas.RequestContext;
import org.apache.atlas.TestModules;
import org.apache.atlas.TestUtilsV2;
import org.apache.atlas.repository.AtlasTestBase;
import org.apache.atlas.repository.graphdb.AtlasGraph;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.BeforeTest;
import org.testng.annotations.Guice;
import org.testng.annotations.Test;
/**
 * Verifies that IndexRecoveryService.RecoveryInfoManagement stores the recovery start time and
 * returns the most recently stored value.
 */
@Guice(modules = TestModules.TestOnlyModule.class)
public class RecoveryInfoManagementTest extends AtlasTestBase {
    @Inject
    private AtlasGraph atlasGraph;

    @BeforeTest
    public void setupTest() {
        RequestContext.clear();
        RequestContext.get().setUser(TestUtilsV2.TEST_USER, null);
    }

    @BeforeClass
    public void initialize() throws Exception {
        super.initialize();
    }

    @AfterClass
    public void cleanup() throws Exception {
        super.cleanup();
    }

    @Test
    public void verifyCreateUpdate() {
        IndexRecoveryService.RecoveryInfoManagement rm = new IndexRecoveryService.RecoveryInfoManagement(atlasGraph);

        // create: the first update stores the start time
        long createdTime = System.currentTimeMillis();

        rm.updateStartTime(createdTime);

        long storedTime = rm.getStartTime();

        // TestNG argument order is (actual, expected)
        Assert.assertEquals(storedTime, createdTime);

        // update: a later update replaces the stored start time
        long updatedTime = createdTime + 1000;

        rm.updateStartTime(updatedTime);

        storedTime = rm.getStartTime();

        Assert.assertEquals(storedTime, updatedTime);
    }
}

View File

@ -33,7 +33,8 @@ public interface ActiveStateChangeHandler {
ATLAS_PATCH_SERVICE(3),
DEFAULT_METADATA_SERVICE(4),
NOTIFICATION_HOOK_CONSUMER(5),
TASK_MANAGEMENT(6);
TASK_MANAGEMENT(6),
INDEX_RECOVERY(7);
private final int order;