ATLAS-4256, ATLAS-4258: AtlasTasks - Elegant handling of Failover Scenarios

Signed-off-by: Sarath Subramanian <sarath@apache.org>
This commit is contained in:
Ashutosh Mestry 2021-04-26 22:02:02 -07:00 committed by Sarath Subramanian
parent 683e2a91db
commit eddda699de
13 changed files with 137 additions and 29 deletions

View File

@ -94,7 +94,7 @@ public final class Constants {
public static final String PROPAGATED_CLASSIFICATION_NAMES_KEY = encodePropertyKey(INTERNAL_PROPERTY_KEY_PREFIX + "propagatedClassificationNames");
public static final String CUSTOM_ATTRIBUTES_PROPERTY_KEY = encodePropertyKey(INTERNAL_PROPERTY_KEY_PREFIX + "customAttributes");
public static final String LABELS_PROPERTY_KEY = encodePropertyKey(INTERNAL_PROPERTY_KEY_PREFIX + "labels");
public static final String PENDING_TASKS_PROPERTY_KEY = encodePropertyKey(INTERNAL_PROPERTY_KEY_PREFIX + "pendingTasks");
public static final String EDGE_PENDING_TASKS_PROPERTY_KEY = encodePropertyKey(RELATIONSHIP_PROPERTY_KEY_PREFIX + "__pendingTasks");
/**
* Patch vertices property keys.

View File

@ -65,6 +65,7 @@ import static org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2.isRef
import static org.apache.atlas.type.AtlasStructType.UNIQUE_ATTRIBUTE_SHADE_PROPERTY_PREFIX;
import static org.apache.atlas.type.AtlasTypeUtil.isArrayType;
import static org.apache.atlas.type.AtlasTypeUtil.isMapType;
import static org.apache.atlas.type.Constants.PENDING_TASKS_PROPERTY_KEY;
/**

View File

@ -1203,7 +1203,7 @@ public abstract class DeleteHandlerV1 {
Map<String, Object> taskParams = ClassificationTask.toParameters(relationshipEdgeId, relationship);
AtlasTask task = taskManagement.createTask(taskType, currentUser, taskParams);
AtlasGraphUtilsV2.addEncodedProperty(relationshipEdge, PENDING_TASKS_PROPERTY_KEY, task.getGuid());
AtlasGraphUtilsV2.addItemToListProperty(relationshipEdge, EDGE_PENDING_TASKS_PROPERTY_KEY, task.getGuid());
RequestContext.get().queueTask(task);
}

View File

@ -47,6 +47,7 @@ import org.apache.atlas.type.AtlasType;
import org.apache.atlas.util.FileUtils;
import org.apache.atlas.utils.AtlasPerfMetrics;
import org.apache.atlas.utils.AtlasPerfMetrics.MetricRecorder;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.collections.MapUtils;
import org.apache.commons.configuration.Configuration;
import org.apache.commons.lang.StringUtils;
@ -841,4 +842,29 @@ public class AtlasGraphUtilsV2 {
return ret;
}
public static void addItemToListProperty(AtlasEdge edge, String property, String value) {
List list = getListFromProperty(edge, property);
list.add(value);
edge.setListProperty(property, list);
}
public static void removeItemFromListProperty(AtlasEdge edge, String property, String value) {
List list = getListFromProperty(edge, property);
list.remove(value);
if (CollectionUtils.isEmpty(list)) {
edge.removeProperty(property);
} else {
edge.setListProperty(property, list);
}
}
private static List getListFromProperty(AtlasEdge edge, String property) {
List list = edge.getListProperty(property);
return CollectionUtils.isEmpty(list) ? new ArrayList() : list;
}
}

View File

@ -25,7 +25,8 @@ import org.apache.atlas.GraphTransactionInterceptor;
import org.apache.atlas.RequestContext;
import org.apache.atlas.annotation.GraphTransaction;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.TimeBoundary;
import org.apache.atlas.exception.EntityNotFoundException;
import org.apache.atlas.model.TimeBoundary;
import org.apache.atlas.model.TypeCategory;
import org.apache.atlas.model.instance.AtlasClassification;
import org.apache.atlas.model.instance.AtlasEntity;
@ -124,6 +125,7 @@ import static org.apache.atlas.repository.store.graph.v2.tasks.ClassificationPro
import static org.apache.atlas.repository.store.graph.v2.tasks.ClassificationPropagateTaskFactory.CLASSIFICATION_PROPAGATION_DELETE;
import static org.apache.atlas.type.AtlasStructType.AtlasAttribute.AtlasRelationshipEdgeDirection.IN;
import static org.apache.atlas.type.AtlasStructType.AtlasAttribute.AtlasRelationshipEdgeDirection.OUT;
import static org.apache.atlas.type.Constants.PENDING_TASKS_PROPERTY_KEY;
@Component
public class EntityGraphMapper {
@ -2039,6 +2041,8 @@ public class EntityGraphMapper {
return null;
}
GraphTransactionInterceptor.lockObjectAndReleasePostCommit(entityGuid);
AtlasVertex entityVertex = graphHelper.getVertexForGUID(entityGuid);
if (entityVertex == null) {
@ -2554,7 +2558,7 @@ public class EntityGraphMapper {
}
@GraphTransaction
public List<String> deleteClassificationPropagation(String classificationVertexId) throws AtlasBaseException {
public List<String> deleteClassificationPropagation(String entityGuid, String classificationVertexId) throws AtlasBaseException {
try {
if (StringUtils.isEmpty(classificationVertexId)) {
LOG.warn("deleteClassificationPropagation(classificationVertexId={}): classification vertex id is empty", classificationVertexId);
@ -2562,6 +2566,8 @@ public class EntityGraphMapper {
return null;
}
GraphTransactionInterceptor.lockObjectAndReleasePostCommit(entityGuid);
AtlasVertex classificationVertex = graph.getVertex(classificationVertexId);
if (classificationVertex == null) {
@ -2822,4 +2828,36 @@ public class EntityGraphMapper {
private void createAndQueueTask(String taskType, AtlasVertex entityVertex, String classificationVertexId) {
deleteDelegate.getHandler().createAndQueueTask(taskType, entityVertex, classificationVertexId, null);
}
public void removePendingTaskFromEntity(String entityGuid, String taskGuid) throws EntityNotFoundException {
if (StringUtils.isEmpty(entityGuid) || StringUtils.isEmpty(taskGuid)) {
return;
}
AtlasVertex entityVertex = graphHelper.getVertexForGUID(entityGuid);
if (entityVertex == null) {
LOG.warn("Error fetching vertex: {}", entityVertex);
return;
}
entityVertex.removePropertyValue(PENDING_TASKS_PROPERTY_KEY, taskGuid);
}
public void removePendingTaskFromEdge(String edgeId, String taskGuid) throws AtlasBaseException {
if (StringUtils.isEmpty(edgeId) || StringUtils.isEmpty(taskGuid)) {
return;
}
AtlasEdge edge = graph.getEdge(edgeId);
if (edge == null) {
LOG.warn("Error fetching edge: {}", edgeId);
return;
}
AtlasGraphUtilsV2.removeItemFromListProperty(edge, EDGE_PENDING_TASKS_PROPERTY_KEY, taskGuid);
}
}

View File

@ -1189,7 +1189,7 @@ public class EntityGraphRetriever {
continue;
}
if (ignoreInactive && GraphHelper.getStatus((AtlasEdge) element) != AtlasEntity.Status.ACTIVE) {
if (isInactiveEdge(element, ignoreInactive)) {
continue;
}
@ -1710,4 +1710,8 @@ public class EntityGraphRetriever {
return new HashSet<>(ret);
}
private boolean isInactiveEdge(Object element, boolean ignoreInactive) {
return ignoreInactive && element instanceof AtlasEdge && getStatus((AtlasEdge) element) != AtlasEntity.Status.ACTIVE;
}
}

View File

@ -54,13 +54,11 @@ public class ClassificationPropagateTaskFactory implements TaskFactory {
private final AtlasRelationshipStore relationshipStore;
@Inject
public ClassificationPropagateTaskFactory(TaskManagement taskManagement, AtlasGraph graph, EntityGraphMapper entityGraphMapper, DeleteHandlerDelegate deleteDelegate, AtlasRelationshipStore relationshipStore) {
public ClassificationPropagateTaskFactory(AtlasGraph graph, EntityGraphMapper entityGraphMapper, DeleteHandlerDelegate deleteDelegate, AtlasRelationshipStore relationshipStore) {
this.graph = graph;
this.entityGraphMapper = entityGraphMapper;
this.deleteDelegate = deleteDelegate;
this.relationshipStore = relationshipStore;
taskManagement.addFactory(this);
}
public org.apache.atlas.tasks.AbstractTask create(AtlasTask task) {

View File

@ -67,9 +67,10 @@ public class ClassificationPropagationTasks {
@Override
protected void run(Map<String, Object> parameters) throws AtlasBaseException {
String entityGuid = (String) parameters.get(PARAM_ENTITY_GUID);
String classificationVertexId = (String) parameters.get(PARAM_CLASSIFICATION_VERTEX_ID);
entityGraphMapper.deleteClassificationPropagation(classificationVertexId);
entityGraphMapper.deleteClassificationPropagation(entityGuid, classificationVertexId);
}
}

View File

@ -19,8 +19,10 @@ package org.apache.atlas.repository.store.graph.v2.tasks;
import org.apache.atlas.RequestContext;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.exception.EntityNotFoundException;
import org.apache.atlas.model.instance.AtlasRelationship;
import org.apache.atlas.model.tasks.AtlasTask;
import org.apache.atlas.repository.graphdb.AtlasEdge;
import org.apache.atlas.repository.graphdb.AtlasElement;
import org.apache.atlas.repository.graphdb.AtlasGraph;
import org.apache.atlas.repository.store.graph.AtlasRelationshipStore;
@ -56,7 +58,11 @@ public abstract class ClassificationTask extends AbstractTask {
protected final DeleteHandlerDelegate deleteDelegate;
protected final AtlasRelationshipStore relationshipStore;
public ClassificationTask(AtlasTask task, AtlasGraph graph, EntityGraphMapper entityGraphMapper, DeleteHandlerDelegate deleteDelegate, AtlasRelationshipStore relationshipStore) {
public ClassificationTask(AtlasTask task,
AtlasGraph graph,
EntityGraphMapper entityGraphMapper,
DeleteHandlerDelegate deleteDelegate,
AtlasRelationshipStore relationshipStore) {
super(task);
this.graph = graph;
@ -120,17 +126,15 @@ public abstract class ClassificationTask extends AbstractTask {
protected void setStatus(AtlasTask.Status status) {
super.setStatus(status);
// remove pending task guid from entity vertex or relationship edge
AtlasElement element;
if (getTaskType() == CLASSIFICATION_PROPAGATION_RELATIONSHIP_UPDATE) {
element = graph.getEdge((String) getTaskDef().getParameters().get(PARAM_RELATIONSHIP_EDGE_ID));
} else {
element = AtlasGraphUtilsV2.findByGuid((String) getTaskDef().getParameters().get(PARAM_ENTITY_GUID));
try {
if (getTaskType() == CLASSIFICATION_PROPAGATION_RELATIONSHIP_UPDATE) {
entityGraphMapper.removePendingTaskFromEdge((String) getTaskDef().getParameters().get(PARAM_RELATIONSHIP_EDGE_ID), getTaskGuid());
} else {
entityGraphMapper.removePendingTaskFromEntity((String) getTaskDef().getParameters().get(PARAM_ENTITY_GUID), getTaskGuid());
}
} catch (EntityNotFoundException | AtlasBaseException e) {
LOG.error("Error updating associated element for: {}", getTaskGuid(), e);
}
element.removePropertyValue(PENDING_TASKS_PROPERTY_KEY, getTaskGuid());
}
protected abstract void run(Map<String, Object> parameters) throws AtlasBaseException;

View File

@ -0,0 +1,39 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.tasks;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import javax.inject.Inject;
import java.util.Set;
@Component
public class TaskFactoryRegistry {
private static final Logger LOG = LoggerFactory.getLogger(TaskFactoryRegistry.class);
@Inject
public TaskFactoryRegistry(TaskManagement taskManagement, Set<TaskFactory> factories) {
for (TaskFactory factory : factories) {
taskManagement.addFactory(factory);
}
LOG.info("TaskFactoryRegistry: TaskManagement updated with factories: {}", factories.size());
}
}

View File

@ -73,7 +73,7 @@ public class TaskManagement implements Service, ActiveStateChangeHandler {
if (configuration == null || !HAConfiguration.isHAEnabled(configuration)) {
startInternal();
} else {
LOG.info("TaskManagement.start(): deferring patches until instance activation");
LOG.info("TaskManagement.start(): deferring until instance activation");
}
}
@ -183,9 +183,11 @@ public class TaskManagement implements Service, ActiveStateChangeHandler {
}
LOG.info("TaskManagement: Started!");
queuePendingTasks();
}
public void queuePendingTasks() {
private void queuePendingTasks() {
if (AtlasConfiguration.TASKS_USE_ENABLED.getBoolean() == false) {
return;
}

View File

@ -134,7 +134,7 @@ public class ClassificationPropagationWithTasksTest extends AtlasTestBase {
ret = entityGraphMapper.updateClassificationsPropagation(HDFS_PATH_EMPLOYEES, StringUtils.EMPTY, StringUtils.EMPTY);
assertNull(ret);
ret = entityGraphMapper.deleteClassificationPropagation(StringUtils.EMPTY);
ret = entityGraphMapper.deleteClassificationPropagation(StringUtils.EMPTY, StringUtils.EMPTY);
assertNull(ret);
AtlasEntity hdfs_employees = getEntity(HDFS_PATH_EMPLOYEES);
@ -215,7 +215,7 @@ public class ClassificationPropagationWithTasksTest extends AtlasTestBase {
assertNotNull(entityVertex);
assertNotNull(classificationVertex);
List<String> impactedEntities = entityGraphMapper.deleteClassificationPropagation(classificationVertex.getId().toString());
List<String> impactedEntities = entityGraphMapper.deleteClassificationPropagation(hdfs_employees.getGuid(), classificationVertex.getId().toString());
assertNotNull(impactedEntities);
}

View File

@ -211,11 +211,6 @@ public class AdminResource {
}
}
@PostConstruct
public void init() {
taskManagement.queuePendingTasks();
}
/**
* Fetches the thread stack dump for this application.
*