ATLAS-4738: Dynamic Index Recovery issues and improvements
Signed-off-by: radhikakundam <radhikakundam@apache.org>
This commit is contained in:
parent
14adbad94a
commit
882954e296
|
|
@ -83,6 +83,7 @@ import java.io.FileInputStream;
|
|||
import java.io.FileNotFoundException;
|
||||
import java.io.InputStream;
|
||||
import java.io.InputStreamReader;
|
||||
import java.time.Instant;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
|
@ -137,6 +138,8 @@ public class AtlasClientV2 extends AtlasBaseClient {
|
|||
//Notification APIs
|
||||
private static final String NOTIFICATION_URI = BASE_URI + "v2/notification";
|
||||
|
||||
//IndexRecovery APIs
|
||||
private static final String INDEX_RECOVERY_URI = BASE_URI + "v2/indexrecovery";
|
||||
|
||||
public AtlasClientV2(String[] baseUrl, String[] basicAuthUserNamePassword) {
|
||||
super(baseUrl, basicAuthUserNamePassword);
|
||||
|
|
@ -1031,6 +1034,18 @@ public class AtlasClientV2 extends AtlasBaseClient {
|
|||
callAPI(formatPathParameters(API_V2.POST_NOTIFICATIONS_TO_TOPIC, topic), (Class<?>) null, messages);
|
||||
}
|
||||
|
||||
public Map<String, String> getIndexRecoveryData() throws AtlasServiceException {
|
||||
return callAPI(API_V2.GET_INDEX_RECOVERY_DATA, Map.class, null);
|
||||
}
|
||||
|
||||
public void startIndexRecovery(Instant startTime) throws AtlasServiceException {
|
||||
MultivaluedMap<String, String> queryParams = new MultivaluedMapImpl();
|
||||
|
||||
queryParams.add("startTime", startTime != null ? String.valueOf(startTime) : null);
|
||||
|
||||
callAPI(API_V2.START_INDEX_RECOVERY, (Class<?>) null, queryParams);
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
public API formatPathWithParameter(API api, String... params) {
|
||||
return formatPathParameters(api, params);
|
||||
|
|
@ -1212,6 +1227,9 @@ public class AtlasClientV2 extends AtlasBaseClient {
|
|||
|
||||
public static final API_V2 POST_NOTIFICATIONS_TO_TOPIC = new API_V2(NOTIFICATION_URI + "/topic/%s", HttpMethod.POST, Response.Status.NO_CONTENT);
|
||||
|
||||
public static final API_V2 GET_INDEX_RECOVERY_DATA = new API_V2(INDEX_RECOVERY_URI , HttpMethod.GET, Response.Status.OK);
|
||||
public static final API_V2 START_INDEX_RECOVERY = new API_V2(INDEX_RECOVERY_URI + "/start", HttpMethod.POST, Response.Status.NO_CONTENT);
|
||||
|
||||
// labels APIs
|
||||
public static final API_V2 ADD_LABELS = new API_V2(ENTITY_API + "guid/%s/labels", HttpMethod.PUT, Response.Status.NO_CONTENT);
|
||||
public static final API_V2 ADD_LABELS_BY_UNIQUE_ATTRIBUTE = new API_V2(ENTITY_API + "uniqueAttribute/type/%s/labels", HttpMethod.PUT, Response.Status.NO_CONTENT);
|
||||
|
|
|
|||
|
|
@ -233,10 +233,11 @@ public final class Constants {
|
|||
/**
|
||||
* Index Recovery vertex property keys.
|
||||
*/
|
||||
public static final String INDEX_RECOVERY_PREFIX = INTERNAL_PROPERTY_KEY_PREFIX + "idxRecovery_";
|
||||
public static final String PROPERTY_KEY_INDEX_RECOVERY_NAME = encodePropertyKey(INDEX_RECOVERY_PREFIX + "name");
|
||||
public static final String PROPERTY_KEY_INDEX_RECOVERY_START_TIME = encodePropertyKey(INDEX_RECOVERY_PREFIX + "startTime");
|
||||
public static final String PROPERTY_KEY_INDEX_RECOVERY_PREV_TIME = encodePropertyKey(INDEX_RECOVERY_PREFIX + "prevTime");
|
||||
public static final String INDEX_RECOVERY_PREFIX = INTERNAL_PROPERTY_KEY_PREFIX + "idxRecovery_";
|
||||
public static final String PROPERTY_KEY_INDEX_RECOVERY_NAME = encodePropertyKey(INDEX_RECOVERY_PREFIX + "name");
|
||||
public static final String PROPERTY_KEY_INDEX_RECOVERY_START_TIME = encodePropertyKey(INDEX_RECOVERY_PREFIX + "startTime");
|
||||
public static final String PROPERTY_KEY_INDEX_RECOVERY_PREV_TIME = encodePropertyKey(INDEX_RECOVERY_PREFIX + "prevTime");
|
||||
public static final String PROPERTY_KEY_INDEX_RECOVERY_CUSTOM_TIME = encodePropertyKey(INDEX_RECOVERY_PREFIX + "customTime");
|
||||
|
||||
public static final String SQOOP_SOURCE = "sqoop";
|
||||
public static final String FALCON_SOURCE = "falcon";
|
||||
|
|
|
|||
|
|
@ -21,6 +21,7 @@ package org.apache.atlas.repository.graphdb.janus;
|
|||
import com.google.common.annotations.VisibleForTesting;
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import org.apache.atlas.ApplicationProperties;
|
||||
import org.apache.atlas.AtlasConfiguration;
|
||||
import org.apache.atlas.AtlasException;
|
||||
import org.apache.atlas.repository.graphdb.AtlasGraph;
|
||||
import org.apache.atlas.repository.graphdb.GraphDatabase;
|
||||
|
|
@ -77,9 +78,7 @@ public class AtlasJanusGraphDatabase implements GraphDatabase<AtlasJanusVertex,
|
|||
public static final String INDEX_BACKEND_ES = "elasticsearch";
|
||||
public static final String GRAPH_TX_LOG_CONF = "tx.log-tx";
|
||||
public static final String GRAPH_TX_LOG_VERBOSE_CONF = "tx.recovery.verbose";
|
||||
public static final String SOLR_INDEX_TX_LOG_TTL_CONF = "write.ahead.log.ttl.in.hours";
|
||||
public static final String GRAPH_TX_LOG_TTL_CONF = "log.tx.ttl";
|
||||
public static final long DEFAULT_GRAPH_TX_LOG_TTL = 72; //Hrs
|
||||
|
||||
private static volatile AtlasJanusGraph atlasGraphInstance = null;
|
||||
private static volatile JanusGraph graphInstance;
|
||||
|
|
@ -233,7 +232,7 @@ public class AtlasJanusGraphDatabase implements GraphDatabase<AtlasJanusVertex,
|
|||
public static void configureTxLogBasedIndexRecovery() {
|
||||
try {
|
||||
boolean recoveryEnabled = ApplicationProperties.get().getBoolean(INDEX_RECOVERY_CONF, DEFAULT_INDEX_RECOVERY);
|
||||
long ttl = ApplicationProperties.get().getLong(SOLR_INDEX_TX_LOG_TTL_CONF, DEFAULT_GRAPH_TX_LOG_TTL);
|
||||
long ttl = AtlasConfiguration.SOLR_INDEX_TX_LOG_TTL_CONF.getLong();
|
||||
Duration txLogTtlSecs = Duration.ofSeconds(Duration.ofHours(ttl).getSeconds());
|
||||
|
||||
Map<String, Object> properties = new HashMap<String, Object>() {{
|
||||
|
|
|
|||
|
|
@ -434,7 +434,7 @@ public class AtlasJanusGraphManagement implements AtlasGraphManagement {
|
|||
TransactionRecovery txRecovery = (TransactionRecovery) txRecoveryObject;
|
||||
StandardJanusGraph janusGraph = (StandardJanusGraph) this.graph.getGraph();
|
||||
|
||||
LOG.info("stopIndexRecovery: Index Client is unhealthy. Index recovery: Paused!");
|
||||
LOG.info("stopIndexRecovery: Index recovery: Paused!");
|
||||
|
||||
janusGraph.getBackend().getSystemTxLog().close();
|
||||
|
||||
|
|
|
|||
|
|
@ -92,7 +92,8 @@ public enum AtlasConfiguration {
|
|||
TASKS_USE_ENABLED("atlas.tasks.enabled", true),
|
||||
SESSION_TIMEOUT_SECS("atlas.session.timeout.secs", -1),
|
||||
UPDATE_COMPOSITE_INDEX_STATUS("atlas.update.composite.index.status", true),
|
||||
METRICS_TIME_TO_LIVE_HOURS( "atlas.metrics.ttl.hours", 336); // 14 days default
|
||||
METRICS_TIME_TO_LIVE_HOURS( "atlas.metrics.ttl.hours", 336), // 14 days default
|
||||
SOLR_INDEX_TX_LOG_TTL_CONF("write.ahead.log.ttl.in.hours", 240); //10 days default
|
||||
|
||||
private static final Configuration APPLICATION_PROPERTIES;
|
||||
|
||||
|
|
|
|||
|
|
@ -19,6 +19,7 @@ package org.apache.atlas.repository.graph;
|
|||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import org.apache.atlas.ApplicationProperties;
|
||||
import org.apache.atlas.AtlasConfiguration;
|
||||
import org.apache.atlas.AtlasException;
|
||||
import org.apache.atlas.ha.HAConfiguration;
|
||||
import org.apache.atlas.listener.ActiveStateChangeHandler;
|
||||
|
|
@ -27,6 +28,7 @@ import org.apache.atlas.repository.graphdb.AtlasGraphQuery;
|
|||
import org.apache.atlas.repository.graphdb.AtlasVertex;
|
||||
import org.apache.atlas.service.Service;
|
||||
import org.apache.commons.configuration.Configuration;
|
||||
import org.apache.commons.lang.math.NumberUtils;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.core.annotation.Order;
|
||||
|
|
@ -35,12 +37,16 @@ import org.springframework.stereotype.Component;
|
|||
import javax.inject.Inject;
|
||||
import java.text.SimpleDateFormat;
|
||||
import java.time.Instant;
|
||||
import java.time.temporal.ChronoUnit;
|
||||
import java.util.Iterator;
|
||||
import java.util.TimeZone;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
import static org.apache.atlas.ApplicationProperties.DEFAULT_INDEX_RECOVERY;
|
||||
import static org.apache.atlas.repository.Constants.PROPERTY_KEY_INDEX_RECOVERY_NAME;
|
||||
import static org.apache.atlas.repository.Constants.PROPERTY_KEY_INDEX_RECOVERY_CUSTOM_TIME;
|
||||
import static org.apache.atlas.repository.Constants.PROPERTY_KEY_INDEX_RECOVERY_PREV_TIME;
|
||||
import static org.apache.atlas.repository.Constants.PROPERTY_KEY_INDEX_RECOVERY_START_TIME;
|
||||
import static org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2.setEncodedProperty;
|
||||
|
|
@ -52,14 +58,14 @@ public class IndexRecoveryService implements Service, ActiveStateChangeHandler {
|
|||
private static final String DATE_FORMAT = "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'";
|
||||
private static final String INDEX_HEALTH_MONITOR_THREAD_NAME = "index-health-monitor";
|
||||
private static final String SOLR_STATUS_CHECK_RETRY_INTERVAL = "atlas.graph.index.status.check.frequency";
|
||||
private static final String SOLR_INDEX_RECOVERY_CONFIGURED_START_TIME = "atlas.graph.index.recovery.start.time";
|
||||
private static final String SOLR_INDEX_RECOVERY_CONFIGURED_START_TIME = "atlas.index.recovery.start.time";
|
||||
private static final long SOLR_STATUS_RETRY_DEFAULT_MS = 30000; // 30 secs default
|
||||
|
||||
private final Thread indexHealthMonitor;
|
||||
private final RecoveryInfoManagement recoveryInfoManagement;
|
||||
public final RecoveryInfoManagement recoveryInfoManagement;
|
||||
private Configuration configuration;
|
||||
private boolean isIndexRecoveryEnabled;
|
||||
private RecoveryThread recoveryThread;
|
||||
public RecoveryThread recoveryThread;
|
||||
|
||||
@Inject
|
||||
public IndexRecoveryService(Configuration config, AtlasGraph graph) {
|
||||
|
|
@ -151,7 +157,7 @@ public class IndexRecoveryService implements Service, ActiveStateChangeHandler {
|
|||
}
|
||||
}
|
||||
|
||||
private static class RecoveryThread implements Runnable {
|
||||
public static class RecoveryThread implements Runnable {
|
||||
private final AtlasGraph graph;
|
||||
private final RecoveryInfoManagement recoveryInfoManagement;
|
||||
private long indexStatusCheckRetryMillis;
|
||||
|
|
@ -176,7 +182,7 @@ public class IndexRecoveryService implements Service, ActiveStateChangeHandler {
|
|||
|
||||
while (shouldRun.get()) {
|
||||
try {
|
||||
boolean isIdxHealthy = isIndexBackendHealthy();
|
||||
boolean isIdxHealthy = waitAndCheckIfIndexBackendHealthy();
|
||||
|
||||
if (this.txRecoveryObject == null && isIdxHealthy) {
|
||||
startMonitoring();
|
||||
|
|
@ -207,42 +213,68 @@ public class IndexRecoveryService implements Service, ActiveStateChangeHandler {
|
|||
}
|
||||
}
|
||||
|
||||
private boolean isIndexBackendHealthy() throws AtlasException, InterruptedException {
|
||||
private boolean waitAndCheckIfIndexBackendHealthy() throws AtlasException, InterruptedException {
|
||||
Thread.sleep(indexStatusCheckRetryMillis);
|
||||
|
||||
return isIndexBackendHealthy();
|
||||
}
|
||||
|
||||
public boolean isIndexBackendHealthy() throws AtlasException {
|
||||
return this.graph.getGraphIndexClient().isHealthy();
|
||||
}
|
||||
|
||||
public void startMonitoringByUserRequest(Long startTime) {
|
||||
startMonitoring(startTime);
|
||||
}
|
||||
|
||||
private void startMonitoring() {
|
||||
Long startTime = null;
|
||||
startMonitoring(recoveryInfoManagement.getStartTime());
|
||||
}
|
||||
|
||||
private void startMonitoring(Long startTime) {
|
||||
if (startTime == null || startTime == 0L) {
|
||||
LOG.error("Index Recovery requested without start time");
|
||||
return;
|
||||
}
|
||||
|
||||
try {
|
||||
startTime = recoveryInfoManagement.getStartTime();
|
||||
txRecoveryObject = this.graph.getManagementSystem().startIndexRecovery(startTime);
|
||||
|
||||
printIndexRecoveryStats();
|
||||
} catch (Exception e) {
|
||||
LOG.error("Index Recovery: Start: Error!", e);
|
||||
} finally {
|
||||
|
||||
LOG.info("Index Recovery: Started! Recovery time: {}", Instant.ofEpochMilli(startTime));
|
||||
} catch (Exception e) {
|
||||
LOG.error("Index Recovery with recovery time: {} failed", Instant.ofEpochMilli(startTime), e);
|
||||
}
|
||||
}
|
||||
|
||||
private void stopMonitoring() {
|
||||
Instant newStartTime = Instant.now().minusMillis(indexStatusCheckRetryMillis);
|
||||
public void stopMonitoringByUserRequest() {
|
||||
stopIndexRecovery();
|
||||
LOG.info("Index Recovery: Stopped!");
|
||||
}
|
||||
|
||||
private void stopMonitoring() {
|
||||
stopIndexRecoveryAndUpdateStartTime();
|
||||
}
|
||||
|
||||
private void stopIndexRecoveryAndUpdateStartTime() {
|
||||
Instant newStartTime = Instant.now().minusMillis(2 * indexStatusCheckRetryMillis);
|
||||
|
||||
stopIndexRecovery();
|
||||
recoveryInfoManagement.updateStartTime(newStartTime.toEpochMilli());
|
||||
|
||||
LOG.info("Index Recovery: Stopped! Recovery time: {}", newStartTime);
|
||||
}
|
||||
|
||||
private void stopIndexRecovery() {
|
||||
try {
|
||||
this.graph.getManagementSystem().stopIndexRecovery(txRecoveryObject);
|
||||
|
||||
recoveryInfoManagement.updateStartTime(newStartTime.toEpochMilli());
|
||||
|
||||
printIndexRecoveryStats();
|
||||
} catch (Exception e) {
|
||||
LOG.info("Index Recovery: Stopped! Error!", e);
|
||||
} finally {
|
||||
this.txRecoveryObject = null;
|
||||
|
||||
LOG.info("Index Recovery: Stopped! Recovery time: {}", newStartTime);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -252,7 +284,7 @@ public class IndexRecoveryService implements Service, ActiveStateChangeHandler {
|
|||
}
|
||||
|
||||
@VisibleForTesting
|
||||
static class RecoveryInfoManagement {
|
||||
public static class RecoveryInfoManagement {
|
||||
private static final String INDEX_RECOVERY_TYPE_NAME = "__solrIndexRecoveryInfo";
|
||||
|
||||
private final AtlasGraph graph;
|
||||
|
|
@ -261,20 +293,46 @@ public class IndexRecoveryService implements Service, ActiveStateChangeHandler {
|
|||
this.graph = graph;
|
||||
}
|
||||
|
||||
public void updateStartTime(Long startTime) {
|
||||
try {
|
||||
Long prevStartTime = null;
|
||||
AtlasVertex vertex = findVertex();
|
||||
public void updateStartTime(long time) {
|
||||
updateIndexRecoveryTime(PROPERTY_KEY_INDEX_RECOVERY_START_TIME, time);
|
||||
}
|
||||
|
||||
public void updateCustomStartTime(long time) {
|
||||
updateIndexRecoveryTime(PROPERTY_KEY_INDEX_RECOVERY_CUSTOM_TIME, time);
|
||||
}
|
||||
|
||||
public void updateIndexRecoveryTime(String timePropertyKey, long time) {
|
||||
Map<String, String> indexRecoveryData = new HashMap<>();
|
||||
indexRecoveryData.put(timePropertyKey, String.valueOf(time));
|
||||
updateIndexRecoveryData(indexRecoveryData);
|
||||
}
|
||||
|
||||
public void updateIndexRecoveryData(Map<String, String> indexRecoveryData) {
|
||||
try {
|
||||
Long startTime = NumberUtils.createLong(indexRecoveryData.get(PROPERTY_KEY_INDEX_RECOVERY_START_TIME));
|
||||
Long prevStartTime = NumberUtils.createLong(indexRecoveryData.get(PROPERTY_KEY_INDEX_RECOVERY_PREV_TIME));
|
||||
Long customStartTime = NumberUtils.createLong(indexRecoveryData.get(PROPERTY_KEY_INDEX_RECOVERY_CUSTOM_TIME));
|
||||
boolean isStartTimeUpdated = startTime != null ? true : false;
|
||||
|
||||
AtlasVertex vertex = findVertex();
|
||||
if (vertex == null) {
|
||||
vertex = graph.addVertex();
|
||||
setEncodedProperty(vertex, PROPERTY_KEY_INDEX_RECOVERY_NAME, INDEX_RECOVERY_TYPE_NAME);
|
||||
} else {
|
||||
prevStartTime = getStartTime(vertex);
|
||||
prevStartTime = isStartTimeUpdated ? getStartTime(vertex) : prevStartTime;
|
||||
}
|
||||
|
||||
setEncodedProperty(vertex, PROPERTY_KEY_INDEX_RECOVERY_NAME, INDEX_RECOVERY_TYPE_NAME);
|
||||
setEncodedProperty(vertex, PROPERTY_KEY_INDEX_RECOVERY_START_TIME, startTime);
|
||||
setEncodedProperty(vertex, PROPERTY_KEY_INDEX_RECOVERY_PREV_TIME, prevStartTime);
|
||||
if (startTime != null) {
|
||||
setEncodedProperty(vertex, PROPERTY_KEY_INDEX_RECOVERY_START_TIME, startTime);
|
||||
}
|
||||
|
||||
if (prevStartTime != null) {
|
||||
setEncodedProperty(vertex, PROPERTY_KEY_INDEX_RECOVERY_PREV_TIME, prevStartTime);
|
||||
}
|
||||
|
||||
if (customStartTime != null) {
|
||||
setEncodedProperty(vertex, PROPERTY_KEY_INDEX_RECOVERY_CUSTOM_TIME, customStartTime);
|
||||
}
|
||||
|
||||
} catch (Exception ex) {
|
||||
LOG.error("Error: Updating: {}!", ex);
|
||||
|
|
@ -290,10 +348,12 @@ public class IndexRecoveryService implements Service, ActiveStateChangeHandler {
|
|||
}
|
||||
|
||||
private Long getStartTime(AtlasVertex vertex) {
|
||||
if (vertex == null) {
|
||||
LOG.warn("Vertex passed is NULL: Returned is 0");
|
||||
Long defaultStartTime = getStartTimeByTxLogTTL();
|
||||
|
||||
return 0L;
|
||||
if (vertex == null) {
|
||||
LOG.warn("Vertex passed is NULL: Returned is startTime by TTL {}", Instant.ofEpochMilli(defaultStartTime));
|
||||
|
||||
return defaultStartTime;
|
||||
}
|
||||
|
||||
Long startTime = 0L;
|
||||
|
|
@ -304,10 +364,15 @@ public class IndexRecoveryService implements Service, ActiveStateChangeHandler {
|
|||
LOG.error("Error retrieving startTime", e);
|
||||
}
|
||||
|
||||
return startTime;
|
||||
return startTime == null || startTime == 0L ? defaultStartTime : startTime;
|
||||
}
|
||||
|
||||
private AtlasVertex findVertex() {
|
||||
private Long getStartTimeByTxLogTTL() {
|
||||
long ttl = AtlasConfiguration.SOLR_INDEX_TX_LOG_TTL_CONF.getLong();
|
||||
return Instant.now().minus(ttl, ChronoUnit.HOURS).toEpochMilli();
|
||||
}
|
||||
|
||||
public AtlasVertex findVertex() {
|
||||
AtlasGraphQuery query = graph.query().has(PROPERTY_KEY_INDEX_RECOVERY_NAME, INDEX_RECOVERY_TYPE_NAME);
|
||||
Iterator<AtlasVertex> results = query.vertices().iterator();
|
||||
|
||||
|
|
|
|||
|
|
@ -0,0 +1,143 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.atlas.web.rest;
|
||||
|
||||
import org.apache.atlas.AtlasErrorCode;
|
||||
import org.apache.atlas.AtlasException;
|
||||
import org.apache.atlas.annotation.Timed;
|
||||
import org.apache.atlas.authorize.AtlasAdminAccessRequest;
|
||||
import org.apache.atlas.authorize.AtlasAuthorizationUtils;
|
||||
import org.apache.atlas.authorize.AtlasPrivilege;
|
||||
import org.apache.atlas.exception.AtlasBaseException;
|
||||
import org.apache.atlas.repository.graph.IndexRecoveryService;
|
||||
import org.apache.atlas.repository.graphdb.AtlasGraph;
|
||||
import org.apache.atlas.repository.graphdb.AtlasVertex;
|
||||
import org.apache.atlas.utils.AtlasPerfTracer;
|
||||
import org.apache.atlas.web.util.DateTimeHelper;
|
||||
import org.apache.atlas.web.util.Servlets;
|
||||
import org.apache.commons.lang.StringUtils;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.format.annotation.DateTimeFormat;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
import javax.inject.Inject;
|
||||
import javax.inject.Singleton;
|
||||
|
||||
import javax.ws.rs.*;
|
||||
import javax.ws.rs.core.MediaType;
|
||||
|
||||
import java.time.Instant;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
import static org.apache.atlas.repository.Constants.*;
|
||||
|
||||
@Path("v2/indexrecovery")
|
||||
@Singleton
|
||||
@Service
|
||||
@Consumes({Servlets.JSON_MEDIA_TYPE, MediaType.APPLICATION_JSON})
|
||||
@Produces({Servlets.JSON_MEDIA_TYPE, MediaType.APPLICATION_JSON})
|
||||
public class IndexRecoveryREST {
|
||||
private static final Logger LOG = LoggerFactory.getLogger(IndexRecoveryREST.class);
|
||||
private static final Logger PERF_LOG = AtlasPerfTracer.getPerfLogger("rest.IndexRecoveryREST");
|
||||
|
||||
private final IndexRecoveryService indexRecoveryService;
|
||||
private final AtlasGraph graph;
|
||||
|
||||
@Inject
|
||||
IndexRecoveryREST(IndexRecoveryService indexRecoveryService, AtlasGraph graph) {
|
||||
this.indexRecoveryService = indexRecoveryService;
|
||||
this.graph = graph;
|
||||
}
|
||||
/**
|
||||
* @return Future index recovery start time and previous recovery start time if applicable
|
||||
* @HTTP 200 If Index recovery data exists for the given entity
|
||||
* @HTTP 400 Bad query parameters
|
||||
*/
|
||||
@GET
|
||||
@Timed
|
||||
public Map<String, String> getIndexRecoveryData() {
|
||||
|
||||
AtlasPerfTracer perf = null;
|
||||
Long startTime = null;
|
||||
Long prevTime = null;
|
||||
Long customStartTime = null;
|
||||
Map<String, String> indexRecoveryData = new HashMap<>();
|
||||
|
||||
try {
|
||||
if (AtlasPerfTracer.isPerfTraceEnabled(PERF_LOG)) {
|
||||
perf = AtlasPerfTracer.getPerfTracer(PERF_LOG, "IndexRecoveryREST.getIndexRecoveryData()");
|
||||
}
|
||||
|
||||
AtlasVertex indexRecoveryVertex = indexRecoveryService.recoveryInfoManagement.findVertex();
|
||||
if (indexRecoveryVertex != null) {
|
||||
startTime = indexRecoveryVertex.getProperty(PROPERTY_KEY_INDEX_RECOVERY_START_TIME, Long.class);
|
||||
prevTime = indexRecoveryVertex.getProperty(PROPERTY_KEY_INDEX_RECOVERY_PREV_TIME, Long.class);
|
||||
customStartTime = indexRecoveryVertex.getProperty(PROPERTY_KEY_INDEX_RECOVERY_CUSTOM_TIME, Long.class);
|
||||
}
|
||||
|
||||
indexRecoveryData.put(getPropertyKeyByRemovingPrefix(PROPERTY_KEY_INDEX_RECOVERY_START_TIME), startTime != null ? Instant.ofEpochMilli(startTime).toString() : "Not applicable");
|
||||
indexRecoveryData.put(getPropertyKeyByRemovingPrefix(PROPERTY_KEY_INDEX_RECOVERY_PREV_TIME), prevTime != null ? Instant.ofEpochMilli(prevTime).toString() : "Not applicable");
|
||||
indexRecoveryData.put(getPropertyKeyByRemovingPrefix(PROPERTY_KEY_INDEX_RECOVERY_CUSTOM_TIME), customStartTime != null ? Instant.ofEpochMilli(customStartTime).toString() : "Not applicable");
|
||||
|
||||
} finally {
|
||||
AtlasPerfTracer.log(perf);
|
||||
}
|
||||
|
||||
return indexRecoveryData;
|
||||
}
|
||||
|
||||
@POST
|
||||
@Path("/start")
|
||||
public void startCustomIndexRecovery(@QueryParam("startTime") @DateTimeFormat(pattern = DateTimeHelper.ISO8601_FORMAT)
|
||||
final String startTime) throws AtlasBaseException, AtlasException {
|
||||
AtlasPerfTracer perf = null;
|
||||
|
||||
try {
|
||||
if (AtlasPerfTracer.isPerfTraceEnabled(PERF_LOG)) {
|
||||
perf = AtlasPerfTracer.getPerfTracer(PERF_LOG, "IndexRecoveryREST.getIndexRecoveryData()");
|
||||
}
|
||||
|
||||
AtlasAuthorizationUtils.verifyAccess(new AtlasAdminAccessRequest(AtlasPrivilege.ADMIN_IMPORT), "to start dynamic index recovery by custom time");
|
||||
|
||||
if (startTime == null) {
|
||||
throw new AtlasBaseException(AtlasErrorCode.BAD_REQUEST, "Index Recovery requested without start time");
|
||||
}
|
||||
|
||||
if (!indexRecoveryService.recoveryThread.isIndexBackendHealthy()) {
|
||||
throw new AtlasBaseException(AtlasErrorCode.INTERNAL_ERROR, "Index recovery can not be started - Solr Health: Unhealthy");
|
||||
}
|
||||
|
||||
long startTimeMilli = Instant.parse(startTime).toEpochMilli();
|
||||
|
||||
indexRecoveryService.recoveryThread.stopMonitoringByUserRequest();
|
||||
|
||||
indexRecoveryService.recoveryThread.startMonitoringByUserRequest(startTimeMilli);
|
||||
|
||||
indexRecoveryService.recoveryInfoManagement.updateCustomStartTime(startTimeMilli);
|
||||
|
||||
} finally {
|
||||
AtlasPerfTracer.log(perf);
|
||||
}
|
||||
}
|
||||
|
||||
public static String getPropertyKeyByRemovingPrefix(String propertyKey) {
|
||||
return StringUtils.removeStart(propertyKey, INDEX_RECOVERY_PREFIX);
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,54 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
* <p>
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* <p>
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.atlas.web.integration;
|
||||
|
||||
import org.apache.atlas.AtlasErrorCode;
|
||||
import org.apache.atlas.AtlasServiceException;
|
||||
import org.apache.commons.lang.StringUtils;
|
||||
import org.testng.Assert;
|
||||
import org.testng.annotations.Test;
|
||||
|
||||
import java.time.Instant;
|
||||
import java.util.Map;
|
||||
|
||||
import static org.apache.atlas.repository.Constants.*;
|
||||
|
||||
|
||||
public class IndexRecoveryRestIT extends BaseResourceIT {
|
||||
|
||||
@Test
|
||||
public void startIndexRecovery() throws Exception {
|
||||
Map<String, String> indexRecoveryDataBefore = atlasClientV2.getIndexRecoveryData();
|
||||
|
||||
try {
|
||||
atlasClientV2.startIndexRecovery(null);
|
||||
} catch (AtlasServiceException e) {
|
||||
Assert.assertEquals(e.getStatus().getStatusCode(), AtlasErrorCode.BAD_REQUEST.getHttpCode().getStatusCode());
|
||||
}
|
||||
|
||||
long now = System.currentTimeMillis();
|
||||
atlasClientV2.startIndexRecovery(Instant.ofEpochMilli(now));
|
||||
|
||||
Map<String, String> indexRecoveryDataAfter = atlasClientV2.getIndexRecoveryData();
|
||||
|
||||
String customTimeKey = StringUtils.removeStart(PROPERTY_KEY_INDEX_RECOVERY_CUSTOM_TIME, INDEX_RECOVERY_PREFIX);
|
||||
Assert.assertNotEquals(indexRecoveryDataBefore.get(customTimeKey), indexRecoveryDataAfter.get(customTimeKey));
|
||||
Assert.assertEquals(Instant.ofEpochMilli(now).toString(), indexRecoveryDataAfter.get(customTimeKey));
|
||||
|
||||
}
|
||||
}
|
||||
Loading…
Reference in New Issue