ATLAS-3919: Handling classification propagation as deferred-action

Co-authored-by: Jayendra Parab <jayendra.parab@gmail.com>

Signed-off-by: Sarath Subramanian <sarath@apache.org>
Authored by Ashutosh Mestry on 2021-04-14 16:49:24 -07:00; committed by Sarath Subramanian
parent e6f2a8afe3
commit 0751f16250
46 changed files with 2686 additions and 331 deletions

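At a high level, this commit moves classification (tag) propagation out of the write transaction: instead of walking every impacted vertex inline, the delete, relationship-update and entity-mapper paths now record an AtlasTask and let TaskManagement execute the propagation in the background. The sketch below is a condensed restatement of the flow added in this commit (see DeleteHandlerV1, GraphTransactionInterceptor and the task-related classes further down); the class name and method shape are illustrative, but every call used here appears in the diff.

import java.util.Map;

import org.apache.atlas.AtlasConfiguration;
import org.apache.atlas.RequestContext;
import org.apache.atlas.model.tasks.AtlasTask;
import org.apache.atlas.repository.graph.GraphHelper;
import org.apache.atlas.repository.graphdb.AtlasVertex;
import org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2;
import org.apache.atlas.repository.store.graph.v2.tasks.ClassificationTask;
import org.apache.atlas.tasks.TaskManagement;

import static org.apache.atlas.repository.store.graph.v2.tasks.ClassificationPropagateTaskFactory.CLASSIFICATION_PROPAGATION_ADD;
import static org.apache.atlas.type.Constants.PENDING_TASKS_PROPERTY_KEY;

public class DeferredPropagationSketch {
    private static final boolean DEFERRED_ACTION_ENABLED = AtlasConfiguration.TASKS_USE_ENABLED.getBoolean();

    private final TaskManagement taskManagement; // injected; may be null when task support is not wired in

    public DeferredPropagationSketch(TaskManagement taskManagement) {
        this.taskManagement = taskManagement;
    }

    public void addTagPropagation(AtlasVertex classificationVertex, AtlasVertex entityVertex, String relationshipGuid) {
        if (taskManagement != null && DEFERRED_ACTION_ENABLED) {
            // deferred path: persist a PENDING task and return without touching impacted vertices
            Map<String, Object> params = ClassificationTask.toParameters(GraphHelper.getGuid(entityVertex),
                                                                         classificationVertex.getIdForDisplay(),
                                                                         relationshipGuid);

            AtlasTask task = taskManagement.createTask(CLASSIFICATION_PROPAGATION_ADD, RequestContext.getCurrentUser(), params);

            // record the task GUID on the entity so clients can see that propagation is still pending
            AtlasGraphUtilsV2.addEncodedProperty(entityVertex, PENDING_TASKS_PROPERTY_KEY, task.getGuid());

            // queued tasks are handed to TaskManagement by GraphTransactionInterceptor only after a successful commit
            RequestContext.get().queueTask(task);
        } else {
            // legacy path: resolve impacted vertices and add propagated classification edges inline
        }
    }
}

Because tasks are queued on the RequestContext and submitted post-commit, a rolled-back transaction never enqueues propagation work.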
View File

@ -94,6 +94,7 @@ public final class Constants {
public static final String PROPAGATED_CLASSIFICATION_NAMES_KEY = encodePropertyKey(INTERNAL_PROPERTY_KEY_PREFIX + "propagatedClassificationNames");
public static final String CUSTOM_ATTRIBUTES_PROPERTY_KEY = encodePropertyKey(INTERNAL_PROPERTY_KEY_PREFIX + "customAttributes");
public static final String LABELS_PROPERTY_KEY = encodePropertyKey(INTERNAL_PROPERTY_KEY_PREFIX + "labels");
public static final String PENDING_TASKS_PROPERTY_KEY = encodePropertyKey(INTERNAL_PROPERTY_KEY_PREFIX + "pendingTasks");
/**
* Patch vertices property keys.
@ -207,6 +208,24 @@ public final class Constants {
*/
public static final String TYPEDEF_PATCH_ADD_MANDATORY_ATTRIBUTE = "ADD_MANDATORY_ATTRIBUTE";
/*
* Task related constants
*/
public static final String TASK_PREFIX = INTERNAL_PROPERTY_KEY_PREFIX + "task_";
public static final String TASK_TYPE_PROPERTY_KEY = encodePropertyKey(TASK_PREFIX + "v_type");
public static final String TASK_TYPE_NAME = INTERNAL_PROPERTY_KEY_PREFIX + "AtlasTaskDef";
public static final String TASK_GUID = encodePropertyKey(TASK_PREFIX + "guid");
public static final String TASK_TYPE = encodePropertyKey(TASK_PREFIX + "type");
public static final String TASK_CREATED_TIME = encodePropertyKey(TASK_PREFIX + "timestamp");
public static final String TASK_UPDATED_TIME = encodePropertyKey(TASK_PREFIX + "modificationTimestamp");
public static final String TASK_CREATED_BY = encodePropertyKey(TASK_PREFIX + "createdBy");
public static final String TASK_STATUS = encodePropertyKey(TASK_PREFIX + "status");
public static final String TASK_ATTEMPT_COUNT = encodePropertyKey(TASK_PREFIX + "attemptCount");
public static final String TASK_PARAMETERS = encodePropertyKey(TASK_PREFIX + "parameters");
public static final String TASK_ERROR_MESSAGE = encodePropertyKey(TASK_PREFIX + "errorMessage");
public static final String TASK_START_TIME = encodePropertyKey(TASK_PREFIX + "startTime");
public static final String TASK_END_TIME = encodePropertyKey(TASK_PREFIX + "endTime");
/*
* All supported file-format extensions for Bulk Imports through file upload
*/

View File

@ -57,6 +57,16 @@
</layout>
</appender>
<appender name="TASKS" class="org.apache.log4j.RollingFileAppender">
<param name="File" value="${atlas.log.dir}/tasks.log"/>
<param name="Append" value="true"/>
<param name="maxFileSize" value="259MB" />
<param name="maxBackupIndex" value="20" />
<layout class="org.apache.log4j.PatternLayout">
<param name="ConversionPattern" value="%t %d %x %m%n (%C{1}:%L)"/>
</layout>
</appender>
<appender name="METRICS" class="org.apache.log4j.RollingFileAppender">
<param name="File" value="${atlas.log.dir}/metric.log"/>
<param name="Append" value="true"/>
@ -144,6 +154,11 @@
<appender-ref ref="FAILED"/>
</logger>
<logger name="TASKS" additivity="false">
<level value="info"/>
<appender-ref ref="TASKS"/>
</logger>
<root>
<priority value="warn"/>
<appender-ref ref="FILE"/>

View File

@ -76,7 +76,8 @@ public enum AtlasConfiguration {
STORAGE_CONSISTENCY_LOCK_ENABLED("atlas.graph.storage.consistency-lock.enabled", true),
REBUILD_INDEX("atlas.rebuild.index", false),
STORE_DIFFERENTIAL_AUDITS("atlas.entity.audit.differential", false),
DSL_EXECUTOR_TRAVERSAL("atlas.dsl.executor.traversal", true);
DSL_EXECUTOR_TRAVERSAL("atlas.dsl.executor.traversal", true),
TASKS_USE_ENABLED("atlas.tasks.enabled", true);
private static final Configuration APPLICATION_PROPERTIES;

View File

@ -95,6 +95,7 @@ public class AtlasEntity extends AtlasStruct implements Serializable {
private Map<String, String> customAttributes;
private Map<String, Map<String, Object>> businessAttributes;
private Set<String> labels;
private Set<String> pendingTasks; // read-only field i.e. value provided is ignored during entity create/update
@JsonIgnore
private static AtomicLong s_nextId = new AtomicLong(System.nanoTime());
@ -220,6 +221,7 @@ public class AtlasEntity extends AtlasStruct implements Serializable {
setCustomAttributes(other.getCustomAttributes());
setBusinessAttributes(other.getBusinessAttributes());
setLabels(other.getLabels());
setPendingTasks(other.getPendingTasks());
}
}
@ -393,6 +395,14 @@ public class AtlasEntity extends AtlasStruct implements Serializable {
this.labels = labels;
}
public Set<String> getPendingTasks() {
return pendingTasks;
}
public void setPendingTasks(Set<String> pendingTasks) {
this.pendingTasks = pendingTasks;
}
public List<AtlasClassification> getClassifications() { return classifications; }
public void setClassifications(List<AtlasClassification> classifications) { this.classifications = classifications; }
@ -443,6 +453,7 @@ public class AtlasEntity extends AtlasStruct implements Serializable {
setCustomAttributes(null);
setBusinessAttributes(null);
setLabels(null);
setPendingTasks(null);
}
private static String nextInternalId() {
@ -486,6 +497,9 @@ public class AtlasEntity extends AtlasStruct implements Serializable {
sb.append(", labels=[");
dumpObjects(labels, sb);
sb.append("]");
sb.append(", pendingTasks=[");
dumpObjects(pendingTasks, sb);
sb.append("]");
sb.append('}');
return sb;

View File

@ -0,0 +1,192 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.model.tasks;
import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.databind.annotation.JsonSerialize;
import org.apache.commons.lang.StringUtils;
import java.util.Date;
import java.util.Map;
import java.util.UUID;
import static com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility.NONE;
import static com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility.PUBLIC_ONLY;
@JsonAutoDetect(getterVisibility = PUBLIC_ONLY, setterVisibility = PUBLIC_ONLY, fieldVisibility = NONE)
@JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL)
@JsonIgnoreProperties(ignoreUnknown = true)
public class AtlasTask {
@JsonIgnore
public static final int MAX_ATTEMPT_COUNT = 3;
public enum Status {
PENDING,
IN_PROGRESS,
COMPLETE,
FAILED;
}
private String type;
private String guid;
private String createdBy;
private Date createdTime;
private Date updatedTime;
private Date startTime;
private Date endTime;
private Map<String, Object> parameters;
private int attemptCount;
private String errorMessage;
private Status status;
public AtlasTask() {
}
public AtlasTask(String type, String createdBy, Map<String, Object> parameters) {
this.guid = UUID.randomUUID().toString();
this.type = type;
this.createdBy = createdBy;
this.createdTime = new Date();
this.updatedTime = this.createdTime;
this.parameters = parameters;
this.status = Status.PENDING;
this.attemptCount = 0;
}
public String getGuid() {
return guid;
}
public void setGuid(String guid) {
this.guid = guid;
}
public String getCreatedBy() {
return createdBy;
}
public void setCreatedBy(String createdBy) {
this.createdBy = createdBy;
}
public Date getCreatedTime() {
return createdTime;
}
public void setCreatedTime(Date createdTime) {
this.createdTime = createdTime;
}
public Date getUpdatedTime() {
return updatedTime;
}
public void setUpdatedTime(Date updatedTime) {
this.updatedTime = updatedTime;
}
public Map<String, Object> getParameters() {
return parameters;
}
public void setParameters(Map<String, Object> val) {
this.parameters = val;
}
public void setType(String val) {
this.type = val;
}
public String getType() {
return this.type;
}
public void setStatus(String val) {
if (StringUtils.isNotEmpty(val)) {
this.status = Status.valueOf(val);
}
}
public void setStatus(Status val) {
this.status = val;
}
public Status getStatus() {
return this.status;
}
public int getAttemptCount() {
return attemptCount;
}
public void setAttemptCount(int attemptCount) {
this.attemptCount = attemptCount;
}
public void incrementAttemptCount() {
this.attemptCount++;
}
public void setStatusPending() {
this.status = Status.PENDING;
}
public String getErrorMessage() {
return errorMessage;
}
public void setErrorMessage(String errorMessage) {
this.errorMessage = errorMessage;
}
public Date getStartTime() {
return startTime;
}
public void setStartTime(Date startTime) {
this.startTime = startTime;
}
public Date getEndTime() {
return endTime;
}
public void setEndTime(Date endTime) {
this.endTime = endTime;
}
@JsonIgnore
public void start() {
this.setStatus(Status.IN_PROGRESS);
this.setStartTime(new Date());
}
@JsonIgnore
public void end() {
this.status = Status.COMPLETE;
this.setEndTime(new Date());
}
@JsonIgnore
public void updateStatusFromAttemptCount() {
setStatus((attemptCount < MAX_ATTEMPT_COUNT) ? AtlasTask.Status.PENDING : AtlasTask.Status.FAILED);
}
}

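The AtlasTask model above carries its own lifecycle: tasks start out PENDING, move to IN_PROGRESS and COMPLETE via start() and end(), and fall back to PENDING or FAILED based on the attempt count. A minimal, hypothetical usage sketch; the task type string, user and parameter key are illustrative values only (real parameters come from ClassificationTask.toParameters elsewhere in this commit):

import java.util.Collections;

import org.apache.atlas.model.tasks.AtlasTask;

public class AtlasTaskLifecycleExample {
    public static void main(String[] args) {
        // the constructor generates a GUID, stamps created/updated time and sets status = PENDING
        AtlasTask task = new AtlasTask("CLASSIFICATION_PROPAGATION_ADD", "admin",
                Collections.<String, Object>singletonMap("entityGuid", "sample-entity-guid"));

        task.start();                            // IN_PROGRESS, startTime = now
        try {
            // ... perform the deferred work here ...
            task.end();                          // COMPLETE, endTime = now
        } catch (Exception e) {
            task.setErrorMessage(e.getMessage());
            task.incrementAttemptCount();
            task.updateStatusFromAttemptCount(); // PENDING for a retry, FAILED once attempts reach MAX_ATTEMPT_COUNT (3)
        }
    }
}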
View File

@ -1291,6 +1291,7 @@ public class AtlasEntityType extends AtlasStructType {
add(new AtlasAttributeDef(IS_INCOMPLETE_PROPERTY_KEY, ATLAS_TYPE_INT, false, true));
add(new AtlasAttributeDef(LABELS_PROPERTY_KEY, ATLAS_TYPE_STRING, false, true));
add(new AtlasAttributeDef(CUSTOM_ATTRIBUTES_PROPERTY_KEY, ATLAS_TYPE_STRING, false, true));
add(new AtlasAttributeDef(PENDING_TASKS_PROPERTY_KEY, ATLAS_TYPE_STRING, false, true));
}};
return new AtlasEntityDef(ENTITY_ROOT_NAME, "Root entity for system attributes", "1.0", attributeDefs);

View File

@ -59,7 +59,7 @@ public class AtlasTypeRegistry {
registryData = new RegistryData();
updateSynchronizer = new TypeRegistryUpdateSynchronizer(this);
missingRelationshipDefs = new HashSet<>();
commonIndexFieldNameCache = new HashMap<>();
commonIndexFieldNameCache = new LinkedHashMap<>();
resolveReferencesForRootTypes();
resolveIndexFieldNamesForRootTypes();

View File

@ -47,6 +47,7 @@ public final class Constants {
public static final String CLASSIFICATION_NAMES_KEY = encodePropertyKey(INTERNAL_PROPERTY_KEY_PREFIX + "classificationNames");
public static final String PROPAGATED_CLASSIFICATION_NAMES_KEY = encodePropertyKey(INTERNAL_PROPERTY_KEY_PREFIX + "propagatedClassificationNames");
public static final String IS_INCOMPLETE_PROPERTY_KEY = encodePropertyKey(INTERNAL_PROPERTY_KEY_PREFIX + "isIncomplete");
public static final String PENDING_TASKS_PROPERTY_KEY = encodePropertyKey(INTERNAL_PROPERTY_KEY_PREFIX + "pendingTasks");
//Classification-Only System Attributes
public static final String CLASSIFICATION_ENTITY_STATUS_PROPERTY_KEY = encodePropertyKey(INTERNAL_PROPERTY_KEY_PREFIX + "entityStatus");

View File

@ -144,3 +144,6 @@ atlas.authentication.method.kerberos=false
######### Gremlin Search Configuration #########
# Set to false to disable gremlin search.
atlas.search.gremlin.enable=true
######### Configure use of Tasks #########
atlas.tasks.enabled=false

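The whole deferred-action path is gated by this flag (and, in the Java changes, by whether a TaskManagement instance is wired in); the properties file above turns it off, while AtlasConfiguration.TASKS_USE_ENABLED defaults it to true. A minimal sketch of how the flag is read, using only the AtlasConfiguration entry added earlier in this commit; the class name is illustrative:

import org.apache.atlas.AtlasConfiguration;

public class TasksFlagCheck {
    public static void main(String[] args) {
        // TASKS_USE_ENABLED maps to atlas.tasks.enabled (default true); this properties file overrides it to false
        boolean deferredActionEnabled = AtlasConfiguration.TASKS_USE_ENABLED.getBoolean();

        System.out.println("deferred classification propagation enabled: " + deferredActionEnabled);
    }
}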
View File

@ -26,7 +26,9 @@ import org.apache.atlas.exception.NotFoundException;
import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.repository.graphdb.AtlasGraph;
import org.apache.atlas.repository.graphdb.AtlasVertex;
import org.apache.atlas.tasks.TaskManagement;
import org.apache.atlas.utils.AtlasPerfMetrics.MetricRecorder;
import org.apache.commons.collections.CollectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
@ -53,7 +55,8 @@ public class GraphTransactionInterceptor implements MethodInterceptor {
private static final ThreadLocal<Boolean> innerFailure = ThreadLocal.withInitial(() -> Boolean.FALSE);
private static final ThreadLocal<Map<String, AtlasVertex>> guidVertexCache = ThreadLocal.withInitial(() -> new HashMap<>());
private final AtlasGraph graph;
private final AtlasGraph graph;
private final TaskManagement taskManagement;
private static final ThreadLocal<Map<Object, String>> vertexGuidCache =
new ThreadLocal<Map<Object, String>>() {
@ -80,8 +83,9 @@ public class GraphTransactionInterceptor implements MethodInterceptor {
};
@Inject
public GraphTransactionInterceptor(AtlasGraph graph) {
this.graph = graph;
public GraphTransactionInterceptor(AtlasGraph graph, TaskManagement taskManagement) {
this.graph = graph;
this.taskManagement = taskManagement;
}
@Override
@ -89,7 +93,7 @@ public class GraphTransactionInterceptor implements MethodInterceptor {
Method method = invocation.getMethod();
String invokingClass = method.getDeclaringClass().getSimpleName();
String invokedMethodName = method.getName();
boolean logRollback = method.getAnnotation(GraphTransaction.class).logRollback();
boolean logRollback = method.getAnnotation(GraphTransaction.class) == null || method.getAnnotation(GraphTransaction.class).logRollback();
boolean isInnerTxn = isTxnOpen.get();
// Outermost txn marks any subsequent transaction as inner
@ -164,6 +168,10 @@ public class GraphTransactionInterceptor implements MethodInterceptor {
}
OBJECT_UPDATE_SYNCHRONIZER.releaseLockedObjects();
if (isSuccess) {
submitTasks();
}
}
}
@ -231,6 +239,14 @@ public class GraphTransactionInterceptor implements MethodInterceptor {
edgeStateCache.get().clear();
}
private void submitTasks() {
if (CollectionUtils.isEmpty(RequestContext.get().getQueuedTasks()) || taskManagement == null) {
return;
}
taskManagement.addAll(RequestContext.get().getQueuedTasks());
}
boolean logException(Throwable t) {
if (t instanceof AtlasBaseException) {
Response.Status httpCode = ((AtlasBaseException) t).getAtlasErrorCode().getHttpCode();

View File

@ -334,6 +334,7 @@ public class GraphBackedSearchIndexer implements SearchIndexer, ActiveStateChang
createCommonVertexIndex(management, PROPAGATED_CLASSIFICATION_NAMES_KEY, UniqueKind.NONE, String.class, SINGLE, true, false);
createCommonVertexIndex(management, TRAIT_NAMES_PROPERTY_KEY, UniqueKind.NONE, String.class, SET, true, true);
createCommonVertexIndex(management, PROPAGATED_TRAIT_NAMES_PROPERTY_KEY, UniqueKind.NONE, String.class, LIST, true, true);
createCommonVertexIndex(management, PENDING_TASKS_PROPERTY_KEY, UniqueKind.NONE, String.class, SET, true, false);
createCommonVertexIndex(management, IS_INCOMPLETE_PROPERTY_KEY, UniqueKind.NONE, Integer.class, SINGLE, true, true);
createCommonVertexIndex(management, CUSTOM_ATTRIBUTES_PROPERTY_KEY, UniqueKind.NONE, String.class, SINGLE, true, false);
createCommonVertexIndex(management, LABELS_PROPERTY_KEY, UniqueKind.NONE, String.class, SINGLE, true, false);
@ -344,6 +345,12 @@ public class GraphBackedSearchIndexer implements SearchIndexer, ActiveStateChang
createCommonVertexIndex(management, PATCH_ACTION_PROPERTY_KEY, UniqueKind.NONE, String.class, SINGLE, true, false);
createCommonVertexIndex(management, PATCH_STATE_PROPERTY_KEY, UniqueKind.NONE, String.class, SINGLE, true, false);
// tasks
createCommonVertexIndex(management, TASK_GUID, UniqueKind.GLOBAL_UNIQUE, String.class, SINGLE, true, false);
createCommonVertexIndex(management, TASK_TYPE_PROPERTY_KEY, UniqueKind.NONE, String.class, SINGLE, true, false);
createCommonVertexIndex(management, TASK_CREATED_TIME, UniqueKind.NONE, Long.class, SINGLE, true, false);
createCommonVertexIndex(management, TASK_STATUS, UniqueKind.NONE, String.class, SINGLE, true, false);
// create vertex-centric index
createVertexCentricIndex(management, CLASSIFICATION_LABEL, AtlasEdgeDirection.BOTH, CLASSIFICATION_EDGE_NAME_PROPERTY_KEY, String.class, SINGLE);
createVertexCentricIndex(management, CLASSIFICATION_LABEL, AtlasEdgeDirection.BOTH, CLASSIFICATION_EDGE_IS_PROPAGATED_PROPERTY_KEY, Boolean.class, SINGLE);

View File

@ -21,6 +21,7 @@ package org.apache.atlas.repository.store.graph.v1;
import org.apache.atlas.RequestContext;
import org.apache.atlas.repository.graphdb.AtlasGraph;
import org.apache.atlas.DeleteType;
import org.apache.atlas.tasks.TaskManagement;
import org.apache.atlas.type.AtlasTypeRegistry;
import org.apache.atlas.util.AtlasRepositoryConfiguration;
import org.slf4j.Logger;
@ -38,13 +39,15 @@ public class DeleteHandlerDelegate {
private final SoftDeleteHandlerV1 softDeleteHandler;
private final HardDeleteHandlerV1 hardDeleteHandler;
private final DeleteHandlerV1 defaultHandler;
private final AtlasGraph atlasGraph;
private final AtlasGraph graph;
private final TaskManagement taskManagement;
@Inject
public DeleteHandlerDelegate(AtlasGraph atlasGraph, AtlasTypeRegistry typeRegistry) {
this.atlasGraph = atlasGraph;
this.softDeleteHandler = new SoftDeleteHandlerV1(atlasGraph, typeRegistry);
this.hardDeleteHandler = new HardDeleteHandlerV1(atlasGraph, typeRegistry);
public DeleteHandlerDelegate(AtlasGraph graph, AtlasTypeRegistry typeRegistry, TaskManagement taskManagement) {
this.graph = graph;
this.taskManagement = taskManagement;
this.softDeleteHandler = new SoftDeleteHandlerV1(graph, typeRegistry, taskManagement);
this.hardDeleteHandler = new HardDeleteHandlerV1(graph, typeRegistry, taskManagement);
this.defaultHandler = getDefaultConfiguredHandler(typeRegistry);
}
@ -77,7 +80,8 @@ public class DeleteHandlerDelegate {
LOG.info("Default delete handler set to: {}", handlerFromProperties.getName());
ret = (DeleteHandlerV1) handlerFromProperties.getConstructor(AtlasGraph.class, AtlasTypeRegistry.class).newInstance(this.atlasGraph, typeRegistry);
ret = (DeleteHandlerV1) handlerFromProperties.getConstructor(AtlasGraph.class, AtlasTypeRegistry.class, TaskManagement.class)
.newInstance(this.graph, typeRegistry, taskManagement);
} catch (Exception ex) {
LOG.error("Error instantiating default delete handler. Defaulting to: {}", softDeleteHandler.getClass().getName(), ex);

View File

@ -17,6 +17,7 @@
*/
package org.apache.atlas.repository.store.graph.v1;
import org.apache.atlas.AtlasConfiguration;
import org.apache.atlas.AtlasErrorCode;
import org.apache.atlas.AtlasException;
import org.apache.atlas.RequestContext;
@ -26,6 +27,8 @@ import org.apache.atlas.model.instance.AtlasClassification;
import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.model.instance.AtlasEntityHeader;
import org.apache.atlas.model.instance.AtlasObjectId;
import org.apache.atlas.model.instance.AtlasRelationship;
import org.apache.atlas.model.tasks.AtlasTask;
import org.apache.atlas.model.typedef.AtlasRelationshipDef.PropagateTags;
import org.apache.atlas.model.typedef.AtlasStructDef.AtlasAttributeDef;
import org.apache.atlas.repository.graph.AtlasEdgeLabel;
@ -37,6 +40,8 @@ import org.apache.atlas.repository.graphdb.AtlasVertex;
import org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2;
import org.apache.atlas.repository.store.graph.v2.EntityGraphRetriever;
import org.apache.atlas.DeleteType;
import org.apache.atlas.repository.store.graph.v2.tasks.ClassificationTask;
import org.apache.atlas.tasks.TaskManagement;
import org.apache.atlas.type.*;
import org.apache.atlas.type.AtlasStructType.AtlasAttribute;
import org.apache.atlas.type.AtlasStructType.AtlasAttribute.AtlasRelationshipEdgeDirection;
@ -59,23 +64,31 @@ import static org.apache.atlas.repository.graph.GraphHelper.getTypeName;
import static org.apache.atlas.repository.graph.GraphHelper.*;
import static org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2.getState;
import static org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2.*;
import static org.apache.atlas.repository.store.graph.v2.tasks.ClassificationPropagateTaskFactory.CLASSIFICATION_PROPAGATION_ADD;
import static org.apache.atlas.repository.store.graph.v2.tasks.ClassificationPropagateTaskFactory.CLASSIFICATION_PROPAGATION_DELETE;
import static org.apache.atlas.type.AtlasStructType.AtlasAttribute.AtlasRelationshipEdgeDirection.OUT;
import static org.apache.atlas.type.Constants.PENDING_TASKS_PROPERTY_KEY;
public abstract class DeleteHandlerV1 {
public static final Logger LOG = LoggerFactory.getLogger(DeleteHandlerV1.class);
protected final GraphHelper graphHelper;
private final AtlasTypeRegistry typeRegistry;
private final EntityGraphRetriever entityRetriever;
private final boolean shouldUpdateInverseReferences;
private final boolean softDelete;
private static final boolean DEFERRED_ACTION_ENABLED = AtlasConfiguration.TASKS_USE_ENABLED.getBoolean();
public DeleteHandlerV1(AtlasGraph graph, AtlasTypeRegistry typeRegistry, boolean shouldUpdateInverseReference, boolean softDelete) {
protected final GraphHelper graphHelper;
private final AtlasTypeRegistry typeRegistry;
private final EntityGraphRetriever entityRetriever;
private final boolean shouldUpdateInverseReferences;
private final boolean softDelete;
private final TaskManagement taskManagement;
public DeleteHandlerV1(AtlasGraph graph, AtlasTypeRegistry typeRegistry, boolean shouldUpdateInverseReference, boolean softDelete, TaskManagement taskManagement) {
this.typeRegistry = typeRegistry;
this.graphHelper = new GraphHelper(graph);
this.entityRetriever = new EntityGraphRetriever(graph, typeRegistry);
this.shouldUpdateInverseReferences = shouldUpdateInverseReference;
this.softDelete = softDelete;
this.taskManagement = taskManagement;
}
/**
@ -383,16 +396,24 @@ public abstract class DeleteHandlerV1 {
}
private void addTagPropagation(AtlasVertex fromVertex, AtlasVertex toVertex, AtlasEdge edge) throws AtlasBaseException {
final List<AtlasVertex> classificationVertices = getPropagationEnabledClassificationVertices(fromVertex);
final List<AtlasVertex> propagatedEntityVertices = CollectionUtils.isNotEmpty(classificationVertices) ? entityRetriever.getIncludedImpactedVerticesV2(toVertex, getRelationshipGuid(edge)) : null;
if (CollectionUtils.isNotEmpty(propagatedEntityVertices)) {
if (LOG.isDebugEnabled()) {
LOG.debug("Propagate {} tags: from {} entity to {} entities", classificationVertices.size(), getTypeName(fromVertex), propagatedEntityVertices.size());
}
final List<AtlasVertex> classificationVertices = getPropagationEnabledClassificationVertices(fromVertex);
String relationshipGuid = getRelationshipGuid(edge);
if (taskManagement != null && DEFERRED_ACTION_ENABLED) {
for (AtlasVertex classificationVertex : classificationVertices) {
addTagPropagation(classificationVertex, propagatedEntityVertices);
createAndQueueTask(CLASSIFICATION_PROPAGATION_ADD, toVertex, classificationVertex.getIdForDisplay(), relationshipGuid);
}
} else {
final List<AtlasVertex> propagatedEntityVertices = CollectionUtils.isNotEmpty(classificationVertices) ? entityRetriever.getIncludedImpactedVerticesV2(toVertex, relationshipGuid) : null;
if (CollectionUtils.isNotEmpty(propagatedEntityVertices)) {
if (LOG.isDebugEnabled()) {
LOG.debug("Propagate {} tags: from {} entity to {} entities", classificationVertices.size(), getTypeName(fromVertex), propagatedEntityVertices.size());
}
for (AtlasVertex classificationVertex : classificationVertices) {
addTagPropagation(classificationVertex, propagatedEntityVertices);
}
}
}
}
@ -561,75 +582,6 @@ public abstract class DeleteHandlerV1 {
}
}
public void removeTagPropagation(AtlasEdge edge, PropagateTags propagateTags) throws AtlasBaseException {
if (edge == null) {
return;
}
AtlasVertex outVertex = edge.getOutVertex();
AtlasVertex inVertex = edge.getInVertex();
if (propagateTags == ONE_TO_TWO || propagateTags == PropagateTags.BOTH) {
removeTagPropagation(outVertex, inVertex, edge);
}
if (propagateTags == PropagateTags.TWO_TO_ONE || propagateTags == PropagateTags.BOTH) {
removeTagPropagation(inVertex, outVertex, edge);
}
}
private void removeTagPropagation(AtlasVertex fromVertex, AtlasVertex toVertex, AtlasEdge edge) throws AtlasBaseException {
final List<AtlasVertex> classificationVertices = getPropagationEnabledClassificationVertices(fromVertex);
final List<AtlasVertex> impactedEntityVertices = CollectionUtils.isNotEmpty(classificationVertices) ? entityRetriever.getIncludedImpactedVerticesV2(toVertex, getRelationshipGuid(edge)) : null;
if (CollectionUtils.isNotEmpty(impactedEntityVertices)) {
if (LOG.isDebugEnabled()) {
LOG.debug("Removing {} propagated tags: for {} from {} entities", classificationVertices.size(), getTypeName(fromVertex), impactedEntityVertices.size());
}
for (AtlasVertex classificationVertex : classificationVertices) {
String classificationName = getTypeName(classificationVertex);
AtlasVertex associatedEntityVertex = getAssociatedEntityVertex(classificationVertex);
List<AtlasVertex> referrals = entityRetriever.getIncludedImpactedVerticesV2(associatedEntityVertex, getRelationshipGuid(edge));
for (AtlasVertex impactedEntityVertex : impactedEntityVertices) {
if (referrals.contains(impactedEntityVertex)) {
if (LOG.isDebugEnabled()) {
if (org.apache.commons.lang3.StringUtils.equals(getGuid(impactedEntityVertex), getGuid(associatedEntityVertex))) {
LOG.debug(" --> Not removing propagated classification edge from [{}] --> [{}][{}] with edge label: [{}], since [{}] is associated with [{}]",
getTypeName(impactedEntityVertex), getTypeName(classificationVertex), getTypeName(associatedEntityVertex), CLASSIFICATION_LABEL, classificationName, getTypeName(associatedEntityVertex));
} else {
LOG.debug(" --> Not removing propagated classification edge from [{}] --> [{}][{}] with edge label: [{}], since [{}] is propagated through other path",
getTypeName(impactedEntityVertex), getTypeName(classificationVertex), getTypeName(associatedEntityVertex), CLASSIFICATION_LABEL, classificationName);
}
}
continue;
}
// remove propagated classification edge and classificationName from propagatedTraitNames vertex property
AtlasEdge propagatedEdge = getPropagatedClassificationEdge(impactedEntityVertex, classificationVertex);
if (propagatedEdge != null) {
if (LOG.isDebugEnabled()) {
LOG.debug(" --> Removing propagated classification edge from [{}] --> [{}][{}] with edge label: [{}]",
getTypeName(impactedEntityVertex), getTypeName(classificationVertex), getTypeName(associatedEntityVertex), CLASSIFICATION_LABEL);
}
graphHelper.removeEdge(propagatedEdge);
removeFromPropagatedClassificationNames(impactedEntityVertex, classificationName);
} else {
if (LOG.isDebugEnabled()) {
LOG.debug(" --> Not removing propagated classification edge from [{}] --> [{}][{}] using edge label: [{}], since edge doesn't exist",
getTypeName(impactedEntityVertex), getTypeName(classificationVertex), getTypeName(associatedEntityVertex), CLASSIFICATION_LABEL);
}
}
}
}
}
}
public void deletePropagatedClassification(AtlasVertex entityVertex, String classificationName, String associatedEntityGuid) throws AtlasBaseException {
AtlasEdge propagatedEdge = getPropagatedClassificationEdge(entityVertex, classificationName, associatedEntityGuid);
@ -1059,7 +1011,11 @@ public abstract class DeleteHandlerV1 {
boolean removePropagations = getRemovePropagations(classificationVertex);
if (isClassificationEdge && removePropagations) {
removeTagPropagation(classificationVertex);
if (taskManagement != null && DEFERRED_ACTION_ENABLED) {
createAndQueueTask(CLASSIFICATION_PROPAGATION_DELETE, instanceVertex, classificationVertex.getIdForDisplay(), null);
} else {
removeTagPropagation(classificationVertex);
}
}
deleteEdgeReference(edge, CLASSIFICATION, false, false, instanceVertex);
@ -1089,4 +1045,166 @@ public abstract class DeleteHandlerV1 {
return ret;
}
public void updateTagPropagations(AtlasEdge edge, AtlasRelationship relationship) throws AtlasBaseException {
PropagateTags oldTagPropagation = getPropagateTags(edge);
PropagateTags newTagPropagation = relationship.getPropagateTags();
if (newTagPropagation != oldTagPropagation) {
List<AtlasVertex> currentClassificationVertices = getPropagatableClassifications(edge);
Map<AtlasVertex, List<AtlasVertex>> currentClassificationsMap = entityRetriever.getClassificationPropagatedEntitiesMapping(currentClassificationVertices);
// Update propagation edge
AtlasGraphUtilsV2.setEncodedProperty(edge, RELATIONSHIPTYPE_TAG_PROPAGATION_KEY, newTagPropagation.name());
List<AtlasVertex> updatedClassificationVertices = getPropagatableClassifications(edge);
List<AtlasVertex> classificationVerticesUnion = (List<AtlasVertex>) CollectionUtils.union(currentClassificationVertices, updatedClassificationVertices);
Map<AtlasVertex, List<AtlasVertex>> updatedClassificationsMap = entityRetriever.getClassificationPropagatedEntitiesMapping(classificationVerticesUnion);
// compute add/remove propagations list
Map<AtlasVertex, List<AtlasVertex>> addPropagationsMap = new HashMap<>();
Map<AtlasVertex, List<AtlasVertex>> removePropagationsMap = new HashMap<>();
if (MapUtils.isEmpty(currentClassificationsMap) && MapUtils.isNotEmpty(updatedClassificationsMap)) {
addPropagationsMap.putAll(updatedClassificationsMap);
} else if (MapUtils.isNotEmpty(currentClassificationsMap) && MapUtils.isEmpty(updatedClassificationsMap)) {
removePropagationsMap.putAll(currentClassificationsMap);
} else {
for (AtlasVertex classificationVertex : updatedClassificationsMap.keySet()) {
List<AtlasVertex> currentPropagatingEntities = currentClassificationsMap.containsKey(classificationVertex) ? currentClassificationsMap.get(classificationVertex) : Collections.emptyList();
List<AtlasVertex> updatedPropagatingEntities = updatedClassificationsMap.containsKey(classificationVertex) ? updatedClassificationsMap.get(classificationVertex) : Collections.emptyList();
List<AtlasVertex> entitiesAdded = (List<AtlasVertex>) CollectionUtils.subtract(updatedPropagatingEntities, currentPropagatingEntities);
List<AtlasVertex> entitiesRemoved = (List<AtlasVertex>) CollectionUtils.subtract(currentPropagatingEntities, updatedPropagatingEntities);
if (CollectionUtils.isNotEmpty(entitiesAdded)) {
addPropagationsMap.put(classificationVertex, entitiesAdded);
}
if (CollectionUtils.isNotEmpty(entitiesRemoved)) {
removePropagationsMap.put(classificationVertex, entitiesRemoved);
}
}
}
for (AtlasVertex classificationVertex : addPropagationsMap.keySet()) {
List<AtlasVertex> entitiesToAddPropagation = addPropagationsMap.get(classificationVertex);
addTagPropagation(classificationVertex, entitiesToAddPropagation);
}
for (AtlasVertex classificationVertex : removePropagationsMap.keySet()) {
List<AtlasVertex> entitiesToRemovePropagation = removePropagationsMap.get(classificationVertex);
removeTagPropagation(classificationVertex, entitiesToRemovePropagation);
}
} else {
// update blocked propagated classifications only if there is no change in tag propagation (don't update both)
handleBlockedClassifications(edge, relationship.getBlockedPropagatedClassifications());
}
}
public void handleBlockedClassifications(AtlasEdge edge, Set<AtlasClassification> blockedClassifications) throws AtlasBaseException {
if (blockedClassifications != null) {
List<AtlasVertex> propagatableClassifications = getPropagatableClassifications(edge);
List<String> currBlockedClassificationIds = getBlockedClassificationIds(edge);
List<AtlasVertex> currBlockedClassifications = getVerticesForIds(propagatableClassifications, currBlockedClassificationIds);
List<AtlasVertex> classificationsToBlock = new ArrayList<>();
List<String> classificationIdsToBlock = new ArrayList<>();
for (AtlasClassification blockedClassification : blockedClassifications) {
AtlasVertex classificationVertex = validateBlockedPropagatedClassification(propagatableClassifications, blockedClassification);
if (classificationVertex != null) {
classificationsToBlock.add(classificationVertex);
classificationIdsToBlock.add(classificationVertex.getIdForDisplay());
}
}
setBlockedClassificationIds(edge, classificationIdsToBlock);
List<AtlasVertex> propagationChangedClassifications = (List<AtlasVertex>) CollectionUtils.disjunction(classificationsToBlock, currBlockedClassifications);
for (AtlasVertex classificationVertex : propagationChangedClassifications) {
List<AtlasVertex> propagationsToRemove = new ArrayList<>();
List<AtlasVertex> propagationsToAdd = new ArrayList<>();
entityRetriever.evaluateClassificationPropagation(classificationVertex, propagationsToAdd, propagationsToRemove);
if (CollectionUtils.isNotEmpty(propagationsToAdd)) {
addTagPropagation(classificationVertex, propagationsToAdd);
}
if (CollectionUtils.isNotEmpty(propagationsToRemove)) {
removeTagPropagation(classificationVertex, propagationsToRemove);
}
}
}
}
private List<AtlasVertex> getVerticesForIds(List<AtlasVertex> vertices, List<String> vertexIds) {
List<AtlasVertex> ret = new ArrayList<>();
if (CollectionUtils.isNotEmpty(vertexIds)) {
for (AtlasVertex vertex : vertices) {
String vertexId = vertex.getIdForDisplay();
if (vertexIds.contains(vertexId)) {
ret.add(vertex);
}
}
}
return ret;
}
// propagated classifications should contain blocked propagated classification
private AtlasVertex validateBlockedPropagatedClassification(List<AtlasVertex> classificationVertices, AtlasClassification classification) {
AtlasVertex ret = null;
for (AtlasVertex vertex : classificationVertices) {
String classificationName = getClassificationName(vertex);
String entityGuid = getClassificationEntityGuid(vertex);
if (classificationName.equals(classification.getTypeName()) && entityGuid.equals(classification.getEntityGuid())) {
ret = vertex;
break;
}
}
return ret;
}
private void setBlockedClassificationIds(AtlasEdge edge, List<String> classificationIds) {
if (edge != null) {
if (classificationIds.isEmpty()) {
edge.removeProperty(org.apache.atlas.repository.Constants.RELATIONSHIPTYPE_BLOCKED_PROPAGATED_CLASSIFICATIONS_KEY);
} else {
edge.setListProperty(org.apache.atlas.repository.Constants.RELATIONSHIPTYPE_BLOCKED_PROPAGATED_CLASSIFICATIONS_KEY, classificationIds);
}
}
}
public void createAndQueueTask(String taskType, AtlasVertex entityVertex, String classificationVertexId, String relationshipGuid) {
String currentUser = RequestContext.getCurrentUser();
String entityGuid = GraphHelper.getGuid(entityVertex);
Map<String, Object> taskParams = ClassificationTask.toParameters(entityGuid, classificationVertexId, relationshipGuid);
AtlasTask task = taskManagement.createTask(taskType, currentUser, taskParams);
AtlasGraphUtilsV2.addEncodedProperty(entityVertex, PENDING_TASKS_PROPERTY_KEY, task.getGuid());
RequestContext.get().queueTask(task);
}
public void createAndQueueTask(String taskType, AtlasEdge relationshipEdge, AtlasRelationship relationship) {
String currentUser = RequestContext.getCurrentUser();
String relationshipEdgeId = relationshipEdge.getIdForDisplay();
Map<String, Object> taskParams = ClassificationTask.toParameters(relationshipEdgeId, relationship);
AtlasTask task = taskManagement.createTask(taskType, currentUser, taskParams);
AtlasGraphUtilsV2.addEncodedProperty(relationshipEdge, PENDING_TASKS_PROPERTY_KEY, task.getGuid());
RequestContext.get().queueTask(task);
}
}

View File

@ -24,6 +24,7 @@ import org.apache.atlas.repository.graph.GraphHelper;
import org.apache.atlas.repository.graphdb.AtlasEdge;
import org.apache.atlas.repository.graphdb.AtlasGraph;
import org.apache.atlas.repository.graphdb.AtlasVertex;
import org.apache.atlas.tasks.TaskManagement;
import org.apache.atlas.type.AtlasTypeRegistry;
import org.springframework.stereotype.Component;
@ -34,8 +35,8 @@ import javax.inject.Inject;
public class HardDeleteHandlerV1 extends DeleteHandlerV1 {
@Inject
public HardDeleteHandlerV1(AtlasGraph graph, AtlasTypeRegistry typeRegistry) {
super(graph, typeRegistry, true, false);
public HardDeleteHandlerV1(AtlasGraph graph, AtlasTypeRegistry typeRegistry, TaskManagement taskManagement) {
super(graph, typeRegistry, true, false, taskManagement);
}
@Override

View File

@ -26,6 +26,7 @@ import org.apache.atlas.repository.graphdb.AtlasEdge;
import org.apache.atlas.repository.graphdb.AtlasGraph;
import org.apache.atlas.repository.graphdb.AtlasVertex;
import org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2;
import org.apache.atlas.tasks.TaskManagement;
import org.apache.atlas.type.AtlasTypeRegistry;
import javax.inject.Inject;
@ -38,8 +39,8 @@ import static org.apache.atlas.repository.Constants.STATE_PROPERTY_KEY;
public class SoftDeleteHandlerV1 extends DeleteHandlerV1 {
@Inject
public SoftDeleteHandlerV1(AtlasGraph graph, AtlasTypeRegistry typeRegistry) {
super(graph, typeRegistry, false, true);
public SoftDeleteHandlerV1(AtlasGraph graph, AtlasTypeRegistry typeRegistry, TaskManagement taskManagement) {
super(graph, typeRegistry, false, true, taskManagement);
}
@Override

View File

@ -192,6 +192,22 @@ public class AtlasGraphUtilsV2 {
return addProperty(vertex, propertyName, value, true);
}
public static AtlasEdge addEncodedProperty(AtlasEdge edge, String propertyName, String value) {
List<String> listPropertyValues = edge.getListProperty(propertyName);
if (listPropertyValues == null) {
listPropertyValues = new ArrayList<>();
}
listPropertyValues.add(value);
edge.removeProperty(propertyName);
edge.setListProperty(propertyName, listPropertyValues);
return edge;
}
public static AtlasVertex addProperty(AtlasVertex vertex, String propertyName, Object value, boolean isEncoded) {
if (LOG.isDebugEnabled()) {
LOG.debug("==> addProperty({}, {}, {})", toString(vertex), propertyName, value);

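The new AtlasEdge overload of addEncodedProperty above appends to a multi-valued list property (read the current list, append the value, rewrite the property) rather than replacing a single value. A hedged usage sketch; the class name, relationshipEdge and taskGuid are placeholders, mirroring how DeleteHandlerV1.createAndQueueTask (shown earlier in this diff) records pending-task GUIDs on a relationship edge:

import org.apache.atlas.repository.graphdb.AtlasEdge;
import org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2;

import static org.apache.atlas.type.Constants.PENDING_TASKS_PROPERTY_KEY;

public class PendingTaskEdgeMarker {
    // successive calls accumulate task GUIDs in the edge's pendingTasks list property
    public static void recordPendingTask(AtlasEdge relationshipEdge, String taskGuid) {
        AtlasGraphUtilsV2.addEncodedProperty(relationshipEdge, PENDING_TASKS_PROPERTY_KEY, taskGuid);
    }
}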
View File

@ -18,6 +18,7 @@
*/
package org.apache.atlas.repository.store.graph.v2;
import org.apache.atlas.AtlasConfiguration;
import org.apache.atlas.AtlasErrorCode;
import org.apache.atlas.RequestContext;
import org.apache.atlas.annotation.GraphTransaction;
@ -26,8 +27,6 @@ import org.apache.atlas.authorize.AtlasPrivilege;
import org.apache.atlas.authorize.AtlasRelationshipAccessRequest;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.TypeCategory;
import org.apache.atlas.model.instance.AtlasClassification;
import org.apache.atlas.model.instance.AtlasEntity.Status;
import org.apache.atlas.model.instance.AtlasEntityHeader;
import org.apache.atlas.model.instance.AtlasObjectId;
import org.apache.atlas.model.instance.AtlasRelationship;
@ -51,7 +50,6 @@ import org.apache.atlas.type.AtlasStructType.AtlasAttribute;
import org.apache.atlas.type.AtlasTypeRegistry;
import org.apache.atlas.type.AtlasTypeUtil;
import org.apache.atlas.utils.AtlasPerfMetrics;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.collections.MapUtils;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
@ -61,7 +59,6 @@ import org.springframework.stereotype.Component;
import javax.inject.Inject;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
@ -69,10 +66,8 @@ import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.UUID;
import java.util.function.Function;
import static org.apache.atlas.AtlasConfiguration.NOTIFICATION_RELATIONSHIPS_ENABLED;
import static org.apache.atlas.model.instance.AtlasEntity.Status.ACTIVE;
import static org.apache.atlas.model.instance.AtlasEntity.Status.DELETED;
import static org.apache.atlas.model.typedef.AtlasRelationshipDef.PropagateTags.BOTH;
import static org.apache.atlas.model.typedef.AtlasRelationshipDef.PropagateTags.NONE;
@ -84,13 +79,9 @@ import static org.apache.atlas.repository.Constants.PROVENANCE_TYPE_KEY;
import static org.apache.atlas.repository.Constants.RELATIONSHIPTYPE_TAG_PROPAGATION_KEY;
import static org.apache.atlas.repository.Constants.RELATIONSHIP_GUID_PROPERTY_KEY;
import static org.apache.atlas.repository.Constants.VERSION_PROPERTY_KEY;
import static org.apache.atlas.repository.graph.GraphHelper.getBlockedClassificationIds;
import static org.apache.atlas.repository.graph.GraphHelper.getClassificationEntityGuid;
import static org.apache.atlas.repository.graph.GraphHelper.getClassificationName;
import static org.apache.atlas.repository.graph.GraphHelper.getPropagatableClassifications;
import static org.apache.atlas.repository.graph.GraphHelper.getPropagateTags;
import static org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2.getState;
import static org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2.getTypeName;
import static org.apache.atlas.repository.store.graph.v2.tasks.ClassificationPropagateTaskFactory.CLASSIFICATION_PROPAGATION_RELATIONSHIP_UPDATE;
@Component
public class AtlasRelationshipStoreV2 implements AtlasRelationshipStore {
@ -98,7 +89,8 @@ public class AtlasRelationshipStoreV2 implements AtlasRelationshipStore {
private static final Long DEFAULT_RELATIONSHIP_VERSION = 0L;
private final AtlasGraph graph;
private boolean notificationsEnabled = NOTIFICATION_RELATIONSHIPS_ENABLED.getBoolean();
private boolean notificationsEnabled = NOTIFICATION_RELATIONSHIPS_ENABLED.getBoolean();
private boolean DEFERRED_ACTION_ENABLED = AtlasConfiguration.TASKS_USE_ENABLED.getBoolean();
private final AtlasTypeRegistry typeRegistry;
private final EntityGraphRetriever entityRetriever;
@ -401,7 +393,7 @@ public class AtlasRelationshipStoreV2 implements AtlasRelationshipStore {
AtlasGraphUtilsV2.setEncodedProperty(ret, RELATIONSHIPTYPE_TAG_PROPAGATION_KEY, tagPropagation.name());
// blocked propagated classifications
handleBlockedClassifications(ret, relationship.getBlockedPropagatedClassifications());
deleteDelegate.getHandler().handleBlockedClassifications(ret, relationship.getBlockedPropagatedClassifications());
// propagate tags
deleteDelegate.getHandler().addTagPropagation(ret, tagPropagation);
@ -459,140 +451,11 @@ public class AtlasRelationshipStoreV2 implements AtlasRelationshipStore {
return entityRetriever.mapEdgeToAtlasRelationship(relationshipEdge);
}
private void handleBlockedClassifications(AtlasEdge edge, Set<AtlasClassification> blockedClassifications) throws AtlasBaseException {
if (blockedClassifications != null) {
List<AtlasVertex> propagatableClassifications = getPropagatableClassifications(edge);
List<String> currBlockedClassificationIds = getBlockedClassificationIds(edge);
List<AtlasVertex> currBlockedClassifications = getVerticesForIds(propagatableClassifications, currBlockedClassificationIds);
List<AtlasVertex> classificationsToBlock = new ArrayList<>();
List<String> classificationIdsToBlock = new ArrayList<>();
for (AtlasClassification blockedClassification : blockedClassifications) {
AtlasVertex classificationVertex = validateBlockedPropagatedClassification(propagatableClassifications, blockedClassification);
if (classificationVertex != null) {
classificationsToBlock.add(classificationVertex);
classificationIdsToBlock.add(classificationVertex.getIdForDisplay());
}
}
setBlockedClassificationIds(edge, classificationIdsToBlock);
List<AtlasVertex> propagationChangedClassifications = (List<AtlasVertex>) CollectionUtils.disjunction(classificationsToBlock, currBlockedClassifications);
for (AtlasVertex classificationVertex : propagationChangedClassifications) {
List<AtlasVertex> propagationsToRemove = new ArrayList<>();
List<AtlasVertex> propagationsToAdd = new ArrayList<>();
entityRetriever.evaluateClassificationPropagation(classificationVertex, propagationsToAdd, propagationsToRemove);
if (CollectionUtils.isNotEmpty(propagationsToAdd)) {
deleteDelegate.getHandler().addTagPropagation(classificationVertex, propagationsToAdd);
}
if (CollectionUtils.isNotEmpty(propagationsToRemove)) {
deleteDelegate.getHandler().removeTagPropagation(classificationVertex, propagationsToRemove);
}
}
}
}
private List<AtlasVertex> getVerticesForIds(List<AtlasVertex> vertices, List<String> vertexIds) {
List<AtlasVertex> ret = new ArrayList<>();
if (CollectionUtils.isNotEmpty(vertexIds)) {
for (AtlasVertex vertex : vertices) {
String vertexId = vertex.getIdForDisplay();
if (vertexIds.contains(vertexId)) {
ret.add(vertex);
}
}
}
return ret;
}
// propagated classifications should contain blocked propagated classification
private AtlasVertex validateBlockedPropagatedClassification(List<AtlasVertex> classificationVertices, AtlasClassification classification) {
AtlasVertex ret = null;
for (AtlasVertex vertex : classificationVertices) {
String classificationName = getClassificationName(vertex);
String entityGuid = getClassificationEntityGuid(vertex);
if (classificationName.equals(classification.getTypeName()) && entityGuid.equals(classification.getEntityGuid())) {
ret = vertex;
break;
}
}
return ret;
}
private void setBlockedClassificationIds(AtlasEdge edge, List<String> classificationIds) {
if (edge != null) {
if (classificationIds.isEmpty()) {
edge.removeProperty(Constants.RELATIONSHIPTYPE_BLOCKED_PROPAGATED_CLASSIFICATIONS_KEY);
} else {
edge.setListProperty(Constants.RELATIONSHIPTYPE_BLOCKED_PROPAGATED_CLASSIFICATIONS_KEY, classificationIds);
}
}
}
private void updateTagPropagations(AtlasEdge edge, AtlasRelationship relationship) throws AtlasBaseException {
PropagateTags oldTagPropagation = getPropagateTags(edge);
PropagateTags newTagPropagation = relationship.getPropagateTags();
if (newTagPropagation != oldTagPropagation) {
List<AtlasVertex> currentClassificationVertices = getPropagatableClassifications(edge);
Map<AtlasVertex, List<AtlasVertex>> currentClassificationsMap = entityRetriever.getClassificationPropagatedEntitiesMapping(currentClassificationVertices);
// Update propagation edge
AtlasGraphUtilsV2.setEncodedProperty(edge, RELATIONSHIPTYPE_TAG_PROPAGATION_KEY, newTagPropagation.name());
List<AtlasVertex> updatedClassificationVertices = getPropagatableClassifications(edge);
List<AtlasVertex> classificationVerticesUnion = (List<AtlasVertex>) CollectionUtils.union(currentClassificationVertices, updatedClassificationVertices);
Map<AtlasVertex, List<AtlasVertex>> updatedClassificationsMap = entityRetriever.getClassificationPropagatedEntitiesMapping(classificationVerticesUnion);
// compute add/remove propagations list
Map<AtlasVertex, List<AtlasVertex>> addPropagationsMap = new HashMap<>();
Map<AtlasVertex, List<AtlasVertex>> removePropagationsMap = new HashMap<>();
if (MapUtils.isEmpty(currentClassificationsMap) && MapUtils.isNotEmpty(updatedClassificationsMap)) {
addPropagationsMap.putAll(updatedClassificationsMap);
} else if (MapUtils.isNotEmpty(currentClassificationsMap) && MapUtils.isEmpty(updatedClassificationsMap)) {
removePropagationsMap.putAll(currentClassificationsMap);
} else {
for (AtlasVertex classificationVertex : updatedClassificationsMap.keySet()) {
List<AtlasVertex> currentPropagatingEntities = currentClassificationsMap.containsKey(classificationVertex) ? currentClassificationsMap.get(classificationVertex) : Collections.emptyList();
List<AtlasVertex> updatedPropagatingEntities = updatedClassificationsMap.containsKey(classificationVertex) ? updatedClassificationsMap.get(classificationVertex) : Collections.emptyList();
List<AtlasVertex> entitiesAdded = (List<AtlasVertex>) CollectionUtils.subtract(updatedPropagatingEntities, currentPropagatingEntities);
List<AtlasVertex> entitiesRemoved = (List<AtlasVertex>) CollectionUtils.subtract(currentPropagatingEntities, updatedPropagatingEntities);
if (CollectionUtils.isNotEmpty(entitiesAdded)) {
addPropagationsMap.put(classificationVertex, entitiesAdded);
}
if (CollectionUtils.isNotEmpty(entitiesRemoved)) {
removePropagationsMap.put(classificationVertex, entitiesRemoved);
}
}
}
for (AtlasVertex classificationVertex : addPropagationsMap.keySet()) {
deleteDelegate.getHandler().addTagPropagation(classificationVertex, addPropagationsMap.get(classificationVertex));
}
for (AtlasVertex classificationVertex : removePropagationsMap.keySet()) {
deleteDelegate.getHandler().removeTagPropagation(classificationVertex, removePropagationsMap.get(classificationVertex));
}
private void updateTagPropagations(AtlasEdge relationshipEdge, AtlasRelationship relationship) throws AtlasBaseException {
if (DEFERRED_ACTION_ENABLED) {
createAndQueueTask(CLASSIFICATION_PROPAGATION_RELATIONSHIP_UPDATE, relationshipEdge, relationship);
} else {
// update blocked propagated classifications only if there is no change in tag propagation (don't update both)
handleBlockedClassifications(edge, relationship.getBlockedPropagatedClassifications());
deleteDelegate.getHandler().updateTagPropagations(relationshipEdge, relationship);
}
}
@ -934,4 +797,8 @@ public class AtlasRelationshipStoreV2 implements AtlasRelationshipStore {
entityChangeNotifier.notifyRelationshipMutation(ret, relationshipUpdate);
}
}
private void createAndQueueTask(String taskType, AtlasEdge relationshipEdge, AtlasRelationship relationship) {
deleteDelegate.getHandler().createAndQueueTask(taskType, relationshipEdge, relationship);
}
}

View File

@ -18,10 +18,12 @@
package org.apache.atlas.repository.store.graph.v2;
import com.google.common.annotations.VisibleForTesting;
import org.apache.atlas.AtlasConfiguration;
import org.apache.atlas.AtlasErrorCode;
import org.apache.atlas.GraphTransactionInterceptor;
import org.apache.atlas.RequestContext;
import org.apache.atlas.annotation.GraphTransaction;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.TimeBoundary;
import org.apache.atlas.model.TypeCategory;
@ -39,8 +41,8 @@ import org.apache.atlas.model.typedef.AtlasStructDef.AtlasAttributeDef.Cardinali
import org.apache.atlas.repository.Constants;
import org.apache.atlas.repository.RepositoryException;
import org.apache.atlas.repository.converters.AtlasInstanceConverter;
import org.apache.atlas.repository.graph.IFullTextMapper;
import org.apache.atlas.repository.graph.GraphHelper;
import org.apache.atlas.repository.graph.IFullTextMapper;
import org.apache.atlas.repository.graphdb.AtlasEdge;
import org.apache.atlas.repository.graphdb.AtlasEdgeDirection;
import org.apache.atlas.repository.graphdb.AtlasGraph;
@ -48,12 +50,13 @@ import org.apache.atlas.repository.graphdb.AtlasVertex;
import org.apache.atlas.repository.store.graph.AtlasRelationshipStore;
import org.apache.atlas.repository.store.graph.EntityGraphDiscoveryContext;
import org.apache.atlas.repository.store.graph.v1.DeleteHandlerDelegate;
import org.apache.atlas.type.AtlasArrayType;
import org.apache.atlas.tasks.TaskManagement;
import org.apache.atlas.type.AtlasArrayType;
import org.apache.atlas.type.AtlasBuiltInTypes;
import org.apache.atlas.type.AtlasBusinessMetadataType.AtlasBusinessAttribute;
import org.apache.atlas.type.AtlasClassificationType;
import org.apache.atlas.type.AtlasEntityType;
import org.apache.atlas.type.AtlasMapType;
import org.apache.atlas.type.AtlasBusinessMetadataType.AtlasBusinessAttribute;
import org.apache.atlas.type.AtlasStructType;
import org.apache.atlas.type.AtlasStructType.AtlasAttribute;
import org.apache.atlas.type.AtlasStructType.AtlasAttribute.AtlasRelationshipEdgeDirection;
@ -73,7 +76,17 @@ import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import javax.inject.Inject;
import java.util.*;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.UUID;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
@ -89,9 +102,9 @@ import static org.apache.atlas.model.instance.EntityMutations.EntityOperation.PA
import static org.apache.atlas.model.instance.EntityMutations.EntityOperation.UPDATE;
import static org.apache.atlas.model.typedef.AtlasStructDef.AtlasAttributeDef.Cardinality.SET;
import static org.apache.atlas.repository.Constants.*;
import static org.apache.atlas.repository.graph.GraphHelper.getCollectionElementsUsingRelationship;
import static org.apache.atlas.repository.graph.GraphHelper.getClassificationEdge;
import static org.apache.atlas.repository.graph.GraphHelper.getClassificationVertex;
import static org.apache.atlas.repository.graph.GraphHelper.getCollectionElementsUsingRelationship;
import static org.apache.atlas.repository.graph.GraphHelper.getDelimitedClassificationNames;
import static org.apache.atlas.repository.graph.GraphHelper.getLabels;
import static org.apache.atlas.repository.graph.GraphHelper.getMapElementsProperty;
@ -107,6 +120,8 @@ import static org.apache.atlas.repository.graph.GraphHelper.string;
import static org.apache.atlas.repository.graph.GraphHelper.updateModificationMetadata;
import static org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2.getIdFromVertex;
import static org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2.isReference;
import static org.apache.atlas.repository.store.graph.v2.tasks.ClassificationPropagateTaskFactory.CLASSIFICATION_PROPAGATION_ADD;
import static org.apache.atlas.repository.store.graph.v2.tasks.ClassificationPropagateTaskFactory.CLASSIFICATION_PROPAGATION_DELETE;
import static org.apache.atlas.type.AtlasStructType.AtlasAttribute.AtlasRelationshipEdgeDirection.IN;
import static org.apache.atlas.type.AtlasStructType.AtlasAttribute.AtlasRelationshipEdgeDirection.OUT;
@ -128,6 +143,7 @@ public class EntityGraphMapper {
private static final boolean ENTITY_CHANGE_NOTIFY_IGNORE_RELATIONSHIP_ATTRIBUTES = AtlasConfiguration.ENTITY_CHANGE_NOTIFY_IGNORE_RELATIONSHIP_ATTRIBUTES.getBoolean();
private static final boolean CLASSIFICATION_PROPAGATION_DEFAULT = AtlasConfiguration.CLASSIFICATION_PROPAGATION_DEFAULT.getBoolean();
private boolean DEFERRED_ACTION_ENABLED = AtlasConfiguration.TASKS_USE_ENABLED.getBoolean();
private final GraphHelper graphHelper;
private final AtlasGraph graph;
@ -137,12 +153,14 @@ public class EntityGraphMapper {
private final IAtlasEntityChangeNotifier entityChangeNotifier;
private final AtlasInstanceConverter instanceConverter;
private final EntityGraphRetriever entityRetriever;
private final IFullTextMapper fullTextMapperV2;
private final IFullTextMapper fullTextMapperV2;
private final TaskManagement taskManagement;
@Inject
public EntityGraphMapper(DeleteHandlerDelegate deleteDelegate, AtlasTypeRegistry typeRegistry, AtlasGraph graph,
AtlasRelationshipStore relationshipStore, IAtlasEntityChangeNotifier entityChangeNotifier,
AtlasInstanceConverter instanceConverter, IFullTextMapper fullTextMapperV2) {
AtlasInstanceConverter instanceConverter, IFullTextMapper fullTextMapperV2,
TaskManagement taskManagement) {
this.graphHelper = new GraphHelper(graph);
this.deleteDelegate = deleteDelegate;
this.typeRegistry = typeRegistry;
@ -152,6 +170,12 @@ public class EntityGraphMapper {
this.instanceConverter = instanceConverter;
this.entityRetriever = new EntityGraphRetriever(graph, typeRegistry);
this.fullTextMapperV2 = fullTextMapperV2;
this.taskManagement = taskManagement;
}
@VisibleForTesting
public void setTasksUseFlag(boolean value) {
DEFERRED_ACTION_ENABLED = value;
}
public AtlasVertex createVertex(AtlasEntity entity) throws AtlasBaseException {
@ -1943,6 +1967,12 @@ public class EntityGraphMapper {
LOG.debug("created vertex {} for trait {}", string(classificationVertex), classificationName);
}
if (propagateTags && taskManagement != null && DEFERRED_ACTION_ENABLED) {
propagateTags = false;
createAndQueueTask(CLASSIFICATION_PROPAGATION_ADD, entityVertex, classificationVertex.getIdForDisplay());
}
// add the attributes for the trait instance
mapClassification(EntityOperation.CREATE, context, classification, entityType, entityVertex, classificationVertex);
updateModificationMetadata(entityVertex);
@ -2000,6 +2030,58 @@ public class EntityGraphMapper {
}
}
@GraphTransaction
public List<String> propagateClassification(String entityGuid, String classificationVertexId, String relationshipGuid) throws AtlasBaseException {
try {
if (StringUtils.isEmpty(entityGuid) || StringUtils.isEmpty(classificationVertexId)) {
LOG.warn("propagateClassification(entityGuid={}, classificationVertexId={}): entityGuid and/or classification vertex id is empty", entityGuid, classificationVertexId);
return null;
}
AtlasVertex entityVertex = graphHelper.getVertexForGUID(entityGuid);
if (entityVertex == null) {
LOG.warn("propagateClassification(entityGuid={}, classificationVertexId={}): entity vertex not found", entityGuid, classificationVertexId);
return null;
}
AtlasVertex classificationVertex = graph.getVertex(classificationVertexId);
if (classificationVertex == null) {
LOG.warn("propagateClassification(entityGuid={}, classificationVertexId={}): classification vertex not found", entityGuid, classificationVertexId);
return null;
}
List<AtlasVertex> impactedVertices = entityRetriever.getImpactedVerticesV2(entityVertex, relationshipGuid, classificationVertexId);
if (CollectionUtils.isEmpty(impactedVertices)) {
LOG.debug("propagateClassification(entityGuid={}, classificationVertexId={}): found no entities to propagate the classification", entityGuid, classificationVertexId);
return null;
}
AtlasClassification classification = entityRetriever.toAtlasClassification(classificationVertex);
List<AtlasVertex> entitiesPropagatedTo = deleteDelegate.getHandler().addTagPropagation(classificationVertex, impactedVertices);
if (CollectionUtils.isEmpty(entitiesPropagatedTo)) {
return null;
}
List<AtlasEntity> propagatedEntities = updateClassificationText(classification, entitiesPropagatedTo);
entityChangeNotifier.onClassificationsAddedToEntities(propagatedEntities, Collections.singletonList(classification));
return propagatedEntities.stream().map(x -> x.getGuid()).collect(Collectors.toList());
} catch (Exception e) {
LOG.error("propagateClassification(entityGuid={}, classificationVertexId={}): error while propagating classification", entityGuid, classificationVertexId, e);
throw new AtlasBaseException(e);
}
}
public void deleteClassification(String entityGuid, String classificationName, String associatedEntityGuid) throws AtlasBaseException {
if (StringUtils.isEmpty(associatedEntityGuid) || associatedEntityGuid.equals(entityGuid)) {
deleteClassification(entityGuid, classificationName);
@ -2058,10 +2140,16 @@ public class EntityGraphMapper {
final List<AtlasVertex> entityVertices;
if (isPropagationEnabled(classificationVertex)) {
if (taskManagement != null && DEFERRED_ACTION_ENABLED) {
    createAndQueueTask(CLASSIFICATION_PROPAGATION_DELETE, entityVertex, classificationVertex.getIdForDisplay());

    entityVertices = new ArrayList<>();
} else {
    entityVertices = deleteDelegate.getHandler().removeTagPropagation(classificationVertex);

    if (LOG.isDebugEnabled()) {
        LOG.debug("Number of propagations to delete -> {}", entityVertices.size());
    }
}
} else {
entityVertices = new ArrayList<>();
@ -2267,6 +2355,27 @@ public class EntityGraphMapper {
Boolean currentTagPropagation = currentClassification.isPropagate();
Boolean updatedTagPropagation = classification.isPropagate();
/* -----------------------------
| Current Tag | Updated Tag |
| Propagation | Propagation |
|-------------|-------------|
| true | true | => no-op
|-------------|-------------|
| false | false | => no-op
|-------------|-------------|
| false | true | => Add Tag Propagation (send ADD classification notifications)
|-------------|-------------|
| true | false | => Remove Tag Propagation (send REMOVE classification notifications)
|-------------|-------------| */
if (taskManagement != null && DEFERRED_ACTION_ENABLED) {
String propagationType = updatedTagPropagation ? CLASSIFICATION_PROPAGATION_ADD : CLASSIFICATION_PROPAGATION_DELETE;
createAndQueueTask(propagationType, entityVertex, classificationVertex.getIdForDisplay());
updatedTagPropagation = null;
}
// compute propagatedEntityVertices once and use it for subsequent iterations and notifications
if (updatedTagPropagation != null && currentTagPropagation != updatedTagPropagation) {
if (updatedTagPropagation) {
@ -2340,6 +2449,63 @@ public class EntityGraphMapper {
AtlasPerfTracer.log(perf);
}
@GraphTransaction
public List<String> updateClassificationsPropagation(String entityGuid, String classificationVertexId, String relationshipGuid) throws AtlasBaseException {
try {
if (StringUtils.isEmpty(entityGuid) || StringUtils.isEmpty(classificationVertexId)) {
LOG.warn("updateClassificationsPropagation(entityGuid={}, classificationVertexId={}): entityGuid and/or classification vertex id is empty", entityGuid, classificationVertexId);
return null;
}
AtlasVertex entityVertex = graphHelper.getVertexForGUID(entityGuid);
if (entityVertex == null) {
LOG.warn("updateClassificationsPropagation(entityGuid={}, classificationVertexId={}): entity vertex not found", entityGuid, classificationVertexId);
return null;
}
AtlasVertex classificationVertex = graph.getVertex(classificationVertexId);
if (classificationVertex == null) {
LOG.warn("updateClassificationsPropagation(entityGuid={}, classificationVertexId={}): classification vertex not found", entityGuid, classificationVertexId);
return null;
}
List<AtlasVertex> entitiesToPropagateTo = entityRetriever.getImpactedVerticesV2(entityVertex, relationshipGuid, classificationVertex.getIdForDisplay());
if (CollectionUtils.isEmpty(entitiesToPropagateTo)) {
LOG.debug("updateClassificationsPropagation(entityGuid={}, classificationVertexId={}): no impacted vertices found!", entityGuid, classificationVertexId);
return null;
}
List<AtlasVertex> entitiesPropagatedTo = deleteDelegate.getHandler().addTagPropagation(classificationVertex, entitiesToPropagateTo);
if (CollectionUtils.isEmpty(entitiesPropagatedTo)) {
LOG.debug("updateClassificationsPropagation(entityGuid={}, classificationVertexId={}): no propagations added!", entityGuid, classificationVertexId);
return null;
}
AtlasClassification updatedClassification = entityRetriever.toAtlasClassification(classificationVertex);
List<String> ret = new ArrayList<>();
for (AtlasVertex vertex : entitiesToPropagateTo) {
AtlasEntity entity = instanceConverter.getAndCacheEntity(entityGuid, ENTITY_CHANGE_NOTIFY_IGNORE_RELATIONSHIP_ATTRIBUTES);
ret.add(entity.getGuid());
if (isActive(entity)) {
vertex.setProperty(CLASSIFICATION_TEXT_KEY, fullTextMapperV2.getClassificationTextForEntity(entity));
entityChangeNotifier.onClassificationUpdatedToEntity(entity, Collections.singletonList(updatedClassification));
}
}
return ret;
} catch (Exception ex) {
throw new AtlasBaseException(ex);
}
}
private AtlasEdge mapClassification(EntityOperation operation, final EntityMutationContext context, AtlasClassification classification,
AtlasEntityType entityType, AtlasVertex parentInstanceVertex, AtlasVertex traitInstanceVertex)
throws AtlasBaseException {
@ -2387,6 +2553,50 @@ public class EntityGraphMapper {
}
}
@GraphTransaction
public List<String> deleteClassificationPropagation(String classificationVertexId) throws AtlasBaseException {
try {
if (StringUtils.isEmpty(classificationVertexId)) {
LOG.warn("deleteClassificationPropagation(classificationVertexId={}): classification vertex id is empty", classificationVertexId);
return null;
}
AtlasVertex classificationVertex = graph.getVertex(classificationVertexId);
if (classificationVertex == null) {
LOG.warn("deleteClassificationPropagation(classificationVertexId={}): classification vertex not found", classificationVertexId);
return null;
}
List<AtlasVertex> entityVertices = deleteDelegate.getHandler().removeTagPropagation(classificationVertex);
if (CollectionUtils.isEmpty(entityVertices)) {
return null;
}
AtlasClassification classification = entityRetriever.toAtlasClassification(classificationVertex);
List<AtlasEntity> propagatedEntities = updateClassificationText(classification, entityVertices);
entityChangeNotifier.onClassificationsDeletedFromEntities(propagatedEntities, Collections.singletonList(classification));
return propagatedEntities.stream().map(x -> x.getGuid()).collect(Collectors.toList());
} catch (Exception e) {
throw new AtlasBaseException(e);
}
}
@GraphTransaction
public void updateTagPropagations(String relationshipEdgeId, AtlasRelationship relationship) throws AtlasBaseException {
AtlasEdge relationshipEdge = graph.getEdge(relationshipEdgeId);
deleteDelegate.getHandler().updateTagPropagations(relationshipEdge, relationship);
entityChangeNotifier.notifyPropagatedEntities();
}
private void validateClassificationExists(List<String> existingClassifications, List<String> suppliedClassifications) throws AtlasBaseException {
Set<String> existingNames = new HashSet<>(existingClassifications);
for (String classificationName : suppliedClassifications) {
@ -2608,4 +2818,8 @@ public class EntityGraphMapper {
attributes.put(bmAttribute.getName(), attrValue);
}
}
private void createAndQueueTask(String taskType, AtlasVertex entityVertex, String classificationVertexId) {
deleteDelegate.getHandler().createAndQueueTask(taskType, entityVertex, classificationVertexId, null);
}
}
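The deferred-action branch above only queues a task; the actual work happens when the task executor later calls back into this mapper. A minimal sketch of that callback, assuming an injected EntityGraphMapper and placeholder ids (the sketch itself is not part of this commit):

import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.repository.store.graph.v2.EntityGraphMapper;

import java.util.List;

// Illustrative sketch only: this is what a CLASSIFICATION_PROPAGATION_ADD task ends up invoking.
public class PropagationInvocationSketch {
    private final EntityGraphMapper entityGraphMapper;

    public PropagationInvocationSketch(EntityGraphMapper entityGraphMapper) {
        this.entityGraphMapper = entityGraphMapper;
    }

    public void propagate() throws AtlasBaseException {
        // returns the GUIDs of entities the classification was propagated to, or null if nothing was done
        List<String> propagatedGuids = entityGraphMapper.propagateClassification("entity-guid-placeholder",
                                                                                 "classification-vertex-id-placeholder",
                                                                                 null);

        if (propagatedGuids != null) {
            propagatedGuids.forEach(System.out::println);
        }
    }
}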

View File

@ -68,6 +68,7 @@ import org.springframework.stereotype.Component;
import javax.inject.Inject;
import java.math.BigDecimal;
import java.math.BigInteger;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
@ -79,6 +80,7 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Queue;
import java.util.Set;
import java.util.stream.Collectors;
@ -116,6 +118,7 @@ import static org.apache.atlas.type.AtlasStructType.AtlasAttribute.AtlasRelation
import static org.apache.atlas.type.AtlasStructType.AtlasAttribute.AtlasRelationshipEdgeDirection.BOTH;
import static org.apache.atlas.type.AtlasStructType.AtlasAttribute.AtlasRelationshipEdgeDirection.IN;
import static org.apache.atlas.type.AtlasStructType.AtlasAttribute.AtlasRelationshipEdgeDirection.OUT;
import static org.apache.atlas.type.Constants.PENDING_TASKS_PROPERTY_KEY;
@Component
public class EntityGraphRetriever {
@ -501,7 +504,7 @@ public class EntityGraphRetriever {
public List<AtlasVertex> getImpactedVerticesV2(AtlasVertex entityVertex, String relationshipGuidToExclude) {
List<AtlasVertex> ret = new ArrayList<>();
traverseImpactedVertices(entityVertex, relationshipGuidToExclude, null, ret);
return ret;
}
@ -509,7 +512,7 @@ public class EntityGraphRetriever {
public List<AtlasVertex> getIncludedImpactedVerticesV2(AtlasVertex entityVertex, String relationshipGuidToExclude) {
List<AtlasVertex> ret = new ArrayList<>(Arrays.asList(entityVertex));
traverseImpactedVertices(entityVertex, relationshipGuidToExclude, null, ret);
return ret;
}
@ -517,20 +520,37 @@ public class EntityGraphRetriever {
public List<AtlasVertex> getImpactedVerticesV2(AtlasVertex entityVertex, String relationshipGuidToExclude, String classificationId) {
List<AtlasVertex> ret = new ArrayList<>();
traverseImpactedVertices(entityVertex, relationshipGuidToExclude, classificationId, ret);
return ret;
}
private void traverseImpactedVertices(final AtlasVertex entityVertexStart, final String relationshipGuidToExclude,
                                      final String classificationId, final List<AtlasVertex> result) {
    Set<String>        visitedVertices = new HashSet<>();
    Queue<AtlasVertex> queue           = new ArrayDeque<AtlasVertex>() {{ add(entityVertexStart); }};

    while (!queue.isEmpty()) {
        AtlasVertex entityVertex   = queue.poll();
        String      entityVertexId = entityVertex.getIdForDisplay();

        if (visitedVertices.contains(entityVertexId)) {
            LOG.info("Already visited: {}", entityVertexId);
            continue;
        }

        visitedVertices.add(entityVertexId);

        AtlasEntityType entityType          = typeRegistry.getEntityTypeByName(getTypeName(entityVertex));
        String[]        tagPropagationEdges = entityType != null ? entityType.getTagPropagationEdgesArray() : null;

        if (tagPropagationEdges == null) {
            continue;
        }
Iterable<AtlasEdge> propagationEdges = entityVertex.getEdges(AtlasEdgeDirection.BOTH, tagPropagationEdges);
for (AtlasEdge propagationEdge : propagationEdges) {
if (getEdgeStatus(propagationEdge) != ACTIVE) {
continue;
@ -540,7 +560,7 @@ public class EntityGraphRetriever {
if (tagPropagation == null || tagPropagation == NONE) {
continue;
} else if (tagPropagation == TWO_TO_ONE) {
if (isOutVertex(entityVertex, propagationEdge)) {
continue;
}
@ -558,18 +578,18 @@ public class EntityGraphRetriever {
if (classificationId != null) {
List<String> blockedClassificationIds = getBlockedClassificationIds(propagationEdge);
if (CollectionUtils.isNotEmpty(blockedClassificationIds) && blockedClassificationIds.contains(classificationId)) {
continue;
}
}
AtlasVertex adjacentVertex             = getOtherVertex(propagationEdge, entityVertex);
String      adjacentVertexIdForDisplay = adjacentVertex.getIdForDisplay();

if (!visitedVertices.contains(adjacentVertexIdForDisplay)) {
    result.add(adjacentVertex);
    queue.add(adjacentVertex);
}
}
}
@ -768,25 +788,32 @@ public class EntityGraphRetriever {
LOG.debug("Mapping system attributes for type {}", entity.getTypeName());
}
try {
    if (entityVertex != null) {
        entity.setGuid(getGuid(entityVertex));
        entity.setTypeName(getTypeName(entityVertex));
        entity.setStatus(GraphHelper.getStatus(entityVertex));
        entity.setVersion(GraphHelper.getVersion(entityVertex));

        entity.setCreatedBy(GraphHelper.getCreatedByAsString(entityVertex));
        entity.setUpdatedBy(GraphHelper.getModifiedByAsString(entityVertex));

        entity.setCreateTime(new Date(GraphHelper.getCreatedTime(entityVertex)));
        entity.setUpdateTime(new Date(GraphHelper.getModifiedTime(entityVertex)));

        entity.setHomeId(GraphHelper.getHomeId(entityVertex));

        entity.setIsProxy(GraphHelper.isProxy(entityVertex));
        entity.setIsIncomplete(isEntityIncomplete(entityVertex));

        entity.setProvenanceType(GraphHelper.getProvenanceType(entityVertex));
        entity.setCustomAttributes(getCustomAttributes(entityVertex));
        entity.setLabels(getLabels(entityVertex));
        entity.setPendingTasks(getPendingTasks(entityVertex));
    }
} catch (Throwable t) {
LOG.warn("Got exception while mapping system attributes for type {} : ", entity.getTypeName(), t);
}
return entity;
}
@ -1660,4 +1687,14 @@ public class EntityGraphRetriever {
relationship.setAttribute(attribute.getName(), attrValue);
}
}
private Set<String> getPendingTasks(AtlasVertex entityVertex) {
Collection<String> ret = entityVertex.getPropertyValues(PENDING_TASKS_PROPERTY_KEY, String.class);
if (CollectionUtils.isEmpty(ret)) {
return null;
}
return new HashSet<>(ret);
}
}

View File

@ -123,12 +123,12 @@ public class MigrationImport extends ImportStrategy {
private AtlasEntityStoreV2 createEntityStore(AtlasGraph graph, AtlasTypeRegistry typeRegistry) {
FullTextMapperV2Nop fullTextMapperV2 = new FullTextMapperV2Nop();
IAtlasEntityChangeNotifier entityChangeNotifier = new EntityChangeNotifierNop();
DeleteHandlerDelegate deleteDelegate = new DeleteHandlerDelegate(graph, typeRegistry, null);
AtlasFormatConverters formatConverters = new AtlasFormatConverters(typeRegistry);
AtlasInstanceConverter instanceConverter = new AtlasInstanceConverter(graph, typeRegistry, formatConverters);
AtlasRelationshipStore relationshipStore = new AtlasRelationshipStoreV2(graph, typeRegistry, deleteDelegate, entityChangeNotifier);
EntityGraphMapper entityGraphMapper = new EntityGraphMapper(deleteDelegate, typeRegistry, graph, relationshipStore, entityChangeNotifier, instanceConverter, fullTextMapperV2, null);
return new AtlasEntityStoreV2(graph, deleteDelegate, typeRegistry, entityChangeNotifier, entityGraphMapper);
}

View File

@ -0,0 +1,93 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.repository.store.graph.v2.tasks;
import org.apache.atlas.model.tasks.AtlasTask;
import org.apache.atlas.repository.graphdb.AtlasGraph;
import org.apache.atlas.repository.store.graph.AtlasRelationshipStore;
import org.apache.atlas.repository.store.graph.v1.DeleteHandlerDelegate;
import org.apache.atlas.repository.store.graph.v2.EntityGraphMapper;
import org.apache.atlas.tasks.TaskFactory;
import org.apache.atlas.tasks.TaskManagement;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import javax.inject.Inject;
import java.util.ArrayList;
import java.util.List;
@Component
public class ClassificationPropagateTaskFactory implements TaskFactory {
private static final Logger LOG = LoggerFactory.getLogger(ClassificationPropagateTaskFactory.class);
public static final String CLASSIFICATION_PROPAGATION_ADD = "CLASSIFICATION_PROPAGATION_ADD";
public static final String CLASSIFICATION_PROPAGATION_UPDATE = "CLASSIFICATION_PROPAGATION_UPDATE";
public static final String CLASSIFICATION_PROPAGATION_DELETE = "CLASSIFICATION_PROPAGATION_DELETE";
public static final String CLASSIFICATION_PROPAGATION_RELATIONSHIP_UPDATE = "CLASSIFICATION_PROPAGATION_RELATIONSHIP_UPDATE";
private static final List<String> supportedTypes = new ArrayList<String>() {{
add(CLASSIFICATION_PROPAGATION_ADD);
add(CLASSIFICATION_PROPAGATION_UPDATE);
add(CLASSIFICATION_PROPAGATION_DELETE);
add(CLASSIFICATION_PROPAGATION_RELATIONSHIP_UPDATE);
}};
private final AtlasGraph graph;
private final EntityGraphMapper entityGraphMapper;
private final DeleteHandlerDelegate deleteDelegate;
private final AtlasRelationshipStore relationshipStore;
@Inject
public ClassificationPropagateTaskFactory(TaskManagement taskManagement, AtlasGraph graph, EntityGraphMapper entityGraphMapper, DeleteHandlerDelegate deleteDelegate, AtlasRelationshipStore relationshipStore) {
this.graph = graph;
this.entityGraphMapper = entityGraphMapper;
this.deleteDelegate = deleteDelegate;
this.relationshipStore = relationshipStore;
taskManagement.addFactory(this);
}
public org.apache.atlas.tasks.AbstractTask create(AtlasTask task) {
String taskType = task.getType();
String taskGuid = task.getGuid();
switch (taskType) {
case CLASSIFICATION_PROPAGATION_ADD:
return new ClassificationPropagationTasks.Add(task, graph, entityGraphMapper, deleteDelegate, relationshipStore);
case CLASSIFICATION_PROPAGATION_UPDATE:
return new ClassificationPropagationTasks.Update(task, graph, entityGraphMapper, deleteDelegate, relationshipStore);
case CLASSIFICATION_PROPAGATION_DELETE:
return new ClassificationPropagationTasks.Delete(task, graph, entityGraphMapper, deleteDelegate, relationshipStore);
case CLASSIFICATION_PROPAGATION_RELATIONSHIP_UPDATE:
return new ClassificationPropagationTasks.UpdateRelationship(task, graph, entityGraphMapper, deleteDelegate, relationshipStore);
default:
LOG.warn("Type: {} - {} not found!. The task will be ignored.", taskType, taskGuid);
return null;
}
}
@Override
public List<String> getSupportedTypes() {
return this.supportedTypes;
}
}

View File

@ -0,0 +1,89 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.repository.store.graph.v2.tasks;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.instance.AtlasRelationship;
import org.apache.atlas.model.tasks.AtlasTask;
import org.apache.atlas.repository.graphdb.AtlasEdge;
import org.apache.atlas.repository.graphdb.AtlasGraph;
import org.apache.atlas.repository.store.graph.AtlasRelationshipStore;
import org.apache.atlas.repository.store.graph.v1.DeleteHandlerDelegate;
import org.apache.atlas.repository.store.graph.v2.EntityGraphMapper;
import org.apache.atlas.type.AtlasType;
import java.util.Map;
public class ClassificationPropagationTasks {
public static class Add extends ClassificationTask {
public Add(AtlasTask task, AtlasGraph graph, EntityGraphMapper entityGraphMapper, DeleteHandlerDelegate deleteDelegate, AtlasRelationshipStore relationshipStore) {
super(task, graph, entityGraphMapper, deleteDelegate, relationshipStore);
}
@Override
protected void run(Map<String, Object> parameters) throws AtlasBaseException {
String entityGuid = (String) parameters.get(PARAM_ENTITY_GUID);
String classificationVertexId = (String) parameters.get(PARAM_CLASSIFICATION_VERTEX_ID);
String relationshipGuid = (String) parameters.get(PARAM_RELATIONSHIP_GUID);
entityGraphMapper.propagateClassification(entityGuid, classificationVertexId, relationshipGuid);
}
}
public static class Update extends ClassificationTask {
public Update(AtlasTask task, AtlasGraph graph, EntityGraphMapper entityGraphMapper, DeleteHandlerDelegate deleteDelegate, AtlasRelationshipStore relationshipStore) {
super(task, graph, entityGraphMapper, deleteDelegate, relationshipStore);
}
@Override
protected void run(Map<String, Object> parameters) throws AtlasBaseException {
String entityGuid = (String) parameters.get(PARAM_ENTITY_GUID);
String classificationVertexId = (String) parameters.get(PARAM_CLASSIFICATION_VERTEX_ID);
String relationshipGuid = (String) parameters.get(PARAM_RELATIONSHIP_GUID);
entityGraphMapper.updateClassificationsPropagation(entityGuid, classificationVertexId, relationshipGuid);
}
}
public static class Delete extends ClassificationTask {
public Delete(AtlasTask task, AtlasGraph graph, EntityGraphMapper entityGraphMapper, DeleteHandlerDelegate deleteDelegate, AtlasRelationshipStore relationshipStore) {
super(task, graph, entityGraphMapper, deleteDelegate, relationshipStore);
}
@Override
protected void run(Map<String, Object> parameters) throws AtlasBaseException {
String classificationVertexId = (String) parameters.get(PARAM_CLASSIFICATION_VERTEX_ID);
entityGraphMapper.deleteClassificationPropagation(classificationVertexId);
}
}
public static class UpdateRelationship extends ClassificationTask {
public UpdateRelationship(AtlasTask task, AtlasGraph graph, EntityGraphMapper entityGraphMapper, DeleteHandlerDelegate deleteDelegate, AtlasRelationshipStore relationshipStore) {
super(task, graph, entityGraphMapper, deleteDelegate, relationshipStore);
}
@Override
protected void run(Map<String, Object> parameters) throws AtlasBaseException {
String relationshipEdgeId = (String) parameters.get(PARAM_RELATIONSHIP_EDGE_ID);
AtlasRelationship relationship = AtlasType.fromJson((String) parameters.get(PARAM_RELATIONSHIP_OBJECT), AtlasRelationship.class);
entityGraphMapper.updateTagPropagations(relationshipEdgeId, relationship);
}
}
}

View File

@ -0,0 +1,137 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.repository.store.graph.v2.tasks;
import org.apache.atlas.RequestContext;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.instance.AtlasRelationship;
import org.apache.atlas.model.tasks.AtlasTask;
import org.apache.atlas.repository.graphdb.AtlasElement;
import org.apache.atlas.repository.graphdb.AtlasGraph;
import org.apache.atlas.repository.store.graph.AtlasRelationshipStore;
import org.apache.atlas.repository.store.graph.v1.DeleteHandlerDelegate;
import org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2;
import org.apache.atlas.repository.store.graph.v2.EntityGraphMapper;
import org.apache.atlas.tasks.AbstractTask;
import org.apache.atlas.type.AtlasType;
import org.apache.commons.collections4.MapUtils;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.HashMap;
import java.util.Map;
import static org.apache.atlas.model.tasks.AtlasTask.Status.COMPLETE;
import static org.apache.atlas.model.tasks.AtlasTask.Status.FAILED;
import static org.apache.atlas.repository.store.graph.v2.tasks.ClassificationPropagateTaskFactory.CLASSIFICATION_PROPAGATION_RELATIONSHIP_UPDATE;
import static org.apache.atlas.type.Constants.PENDING_TASKS_PROPERTY_KEY;
public abstract class ClassificationTask extends AbstractTask {
private static final Logger LOG = LoggerFactory.getLogger(ClassificationTask.class);
protected static final String PARAM_ENTITY_GUID = "entityGuid";
protected static final String PARAM_CLASSIFICATION_VERTEX_ID = "classificationVertexId";
protected static final String PARAM_RELATIONSHIP_GUID = "relationshipGuid";
protected static final String PARAM_RELATIONSHIP_OBJECT = "relationshipObject";
protected static final String PARAM_RELATIONSHIP_EDGE_ID = "relationshipEdgeId";
protected final AtlasGraph graph;
protected final EntityGraphMapper entityGraphMapper;
protected final DeleteHandlerDelegate deleteDelegate;
protected final AtlasRelationshipStore relationshipStore;
public ClassificationTask(AtlasTask task, AtlasGraph graph, EntityGraphMapper entityGraphMapper, DeleteHandlerDelegate deleteDelegate, AtlasRelationshipStore relationshipStore) {
super(task);
this.graph = graph;
this.entityGraphMapper = entityGraphMapper;
this.deleteDelegate = deleteDelegate;
this.relationshipStore = relationshipStore;
}
@Override
public AtlasTask.Status perform() throws Exception {
Map<String, Object> params = getTaskDef().getParameters();
if (MapUtils.isEmpty(params)) {
LOG.warn("Task: {}: Unable to process task: Parameters is not readable!", getTaskGuid());
return FAILED;
}
String userName = getTaskDef().getCreatedBy();
if (StringUtils.isEmpty(userName)) {
LOG.warn("Task: {}: Unable to process task as user name is empty!", getTaskGuid());
return FAILED;
}
RequestContext.get().setUser(userName, null);
try {
run(params);
setStatus(COMPLETE);
} catch (Exception e) {
LOG.error("Task: {}: Error performing task!", getTaskGuid(), e);
setStatus(FAILED);
throw e;
} finally {
graph.commit();
}
return getStatus();
}
public static Map<String, Object> toParameters(String entityGuid, String classificationVertexId, String relationshipGuid) {
return new HashMap<String, Object>() {{
put(PARAM_ENTITY_GUID, entityGuid);
put(PARAM_CLASSIFICATION_VERTEX_ID, classificationVertexId);
put(PARAM_RELATIONSHIP_GUID, relationshipGuid);
}};
}
public static Map<String, Object> toParameters(String relationshipEdgeId, AtlasRelationship relationship) {
return new HashMap<String, Object>() {{
put(PARAM_RELATIONSHIP_EDGE_ID, relationshipEdgeId);
put(PARAM_RELATIONSHIP_OBJECT, AtlasType.toJson(relationship));
}};
}
protected void setStatus(AtlasTask.Status status) {
super.setStatus(status);
// remove pending task guid from entity vertex or relationship edge
AtlasElement element;
if (getTaskType() == CLASSIFICATION_PROPAGATION_RELATIONSHIP_UPDATE) {
element = graph.getEdge((String) getTaskDef().getParameters().get(PARAM_RELATIONSHIP_EDGE_ID));
} else {
element = AtlasGraphUtilsV2.findByGuid((String) getTaskDef().getParameters().get(PARAM_ENTITY_GUID));
}
element.removePropertyValue(PENDING_TASKS_PROPERTY_KEY, getTaskGuid());
}
protected abstract void run(Map<String, Object> parameters) throws AtlasBaseException;
}

View File

@ -0,0 +1,68 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.tasks;
import org.apache.atlas.model.tasks.AtlasTask;
import static org.apache.atlas.model.tasks.AtlasTask.Status;
public abstract class AbstractTask {
private final AtlasTask task;
public AbstractTask(AtlasTask task) {
this.task = task;
}
public void run() throws Exception {
try {
perform();
} catch (Exception exception) {
task.setStatusPending();
task.setErrorMessage(exception.getMessage());
task.incrementAttemptCount();
throw exception;
} finally {
task.end();
}
}
protected void setStatus(Status status) {
task.setStatus(status);
}
public Status getStatus() {
return this.task.getStatus();
}
public String getTaskGuid() {
return task.getGuid();
}
public String getTaskType() {
return task.getType();
}
protected AtlasTask getTaskDef() {
return this.task;
}
public abstract Status perform() throws Exception;
}
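For reference, the smallest possible implementation of the contract above could look like the following; NoOpTask is hypothetical and not part of this commit, shown only to illustrate how perform() and setStatus() interact:

package org.apache.atlas.tasks;

import org.apache.atlas.model.tasks.AtlasTask;

// Hypothetical example: a task that completes immediately.
public class NoOpTask extends AbstractTask {
    public NoOpTask(AtlasTask task) {
        super(task);
    }

    @Override
    public AtlasTask.Status perform() throws Exception {
        // a real task would read getTaskDef().getParameters() and act on the graph here
        setStatus(AtlasTask.Status.COMPLETE);

        return getStatus();
    }
}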

View File

@ -0,0 +1,181 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.tasks;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.atlas.model.tasks.AtlasTask;
import org.apache.atlas.repository.graphdb.AtlasVertex;
import org.apache.atlas.type.AtlasType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class TaskExecutor {
private static final Logger LOG = LoggerFactory.getLogger(TaskExecutor.class);
private static final TaskLogger TASK_LOG = TaskLogger.getLogger();
private static final String TASK_NAME_FORMAT = "atlas-task-%d-";
private final TaskRegistry registry;
private final Map<String, TaskFactory> taskTypeFactoryMap;
private final TaskManagement.Statistics statistics;
private final ExecutorService executorService;
public TaskExecutor(TaskRegistry registry, Map<String, TaskFactory> taskTypeFactoryMap, TaskManagement.Statistics statistics) {
this.registry = registry;
this.taskTypeFactoryMap = taskTypeFactoryMap;
this.statistics = statistics;
this.executorService = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder()
.setDaemon(true)
.setNameFormat(TASK_NAME_FORMAT + Thread.currentThread().getName())
.build());
}
public void addAll(List<AtlasTask> tasks) {
for (AtlasTask task : tasks) {
if (task == null) {
continue;
}
TASK_LOG.log(task);
this.executorService.submit(new TaskConsumer(task, this.registry, this.taskTypeFactoryMap, this.statistics));
}
}
@VisibleForTesting
void waitUntilDone() throws InterruptedException {
Thread.sleep(5000);
}
static class TaskConsumer implements Runnable {
private static final int MAX_ATTEMPT_COUNT = 3;
private final Map<String, TaskFactory> taskTypeFactoryMap;
private final TaskRegistry registry;
private final TaskManagement.Statistics statistics;
private final AtlasTask task;
public TaskConsumer(AtlasTask task, TaskRegistry registry, Map<String, TaskFactory> taskTypeFactoryMap, TaskManagement.Statistics statistics) {
this.task = task;
this.registry = registry;
this.taskTypeFactoryMap = taskTypeFactoryMap;
this.statistics = statistics;
}
@Override
public void run() {
AtlasVertex taskVertex = null;
int attemptCount;
try {
taskVertex = registry.getVertex(task.getGuid());
if (task == null || taskVertex == null || task.getStatus() == AtlasTask.Status.COMPLETE) {
TASK_LOG.warn("Task not scheduled as it was not found or status was COMPLETE!", task);
return;
}
statistics.increment(1);
attemptCount = task.getAttemptCount();
if (attemptCount >= MAX_ATTEMPT_COUNT) {
TASK_LOG.warn("Max retry count for task exceeded! Skipping!", task);
return;
}
performTask(taskVertex, task);
} catch (InterruptedException exception) {
if (task != null) {
registry.updateStatus(taskVertex, task);
TASK_LOG.error("{}: {}: Interrupted!", task, exception);
} else {
LOG.error("Interrupted!", exception);
}
statistics.error();
} catch (Exception exception) {
if (task != null) {
task.updateStatusFromAttemptCount();
registry.updateStatus(taskVertex, task);
TASK_LOG.error("Error executing task. Please perform the operation again!", task, exception);
} else {
LOG.error("Error executing. Please perform the operation again!", exception);
}
statistics.error();
} finally {
if (task != null) {
this.registry.commit();
TASK_LOG.log(task);
}
}
}
private void performTask(AtlasVertex taskVertex, AtlasTask task) throws Exception {
TaskFactory factory = taskTypeFactoryMap.get(task.getType());
if (factory == null) {
LOG.error("taskTypeFactoryMap does not contain task of type: {}", task.getType());
return;
}
AbstractTask runnableTask = factory.create(task);
runnableTask.run();
registry.deleteComplete(taskVertex, task);
statistics.successPrint();
}
}
static class TaskLogger {
private static final Logger LOG = LoggerFactory.getLogger("TASKS");
public static TaskLogger getLogger() {
return new TaskLogger();
}
public void info(String message) {
LOG.info(message);
}
public void log(AtlasTask task) {
LOG.info(AtlasType.toJson(task));
}
public void warn(String message, AtlasTask task) {
LOG.warn(message, AtlasType.toJson(task));
}
public void error(String s, AtlasTask task, Exception exception) {
LOG.error(s, AtlasType.toJson(task), exception);
}
}
}

View File

@ -0,0 +1,32 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.tasks;
import org.apache.atlas.model.tasks.AtlasTask;
import java.util.List;
public interface TaskFactory {
/**
* Creates a concrete task using the task definition.
* @param atlasTask the persisted task definition to materialize
* @return a runnable task instance, or null if the type is not supported by this factory
*/
AbstractTask create(AtlasTask atlasTask);
List<String> getSupportedTypes();
}
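A minimal factory honoring this interface might look as follows; the NO_OP type and the anonymous task are hypothetical, used only to show the create()/getSupportedTypes() contract and registration via TaskManagement.addFactory():

package org.apache.atlas.tasks;

import org.apache.atlas.model.tasks.AtlasTask;

import java.util.Collections;
import java.util.List;

// Hypothetical example, not part of this commit.
public class NoOpTaskFactory implements TaskFactory {
    public static final String NO_OP_TYPE = "NO_OP";

    @Override
    public AbstractTask create(AtlasTask atlasTask) {
        if (!NO_OP_TYPE.equals(atlasTask.getType())) {
            return null;
        }

        return new AbstractTask(atlasTask) {
            @Override
            public AtlasTask.Status perform() {
                setStatus(AtlasTask.Status.COMPLETE);

                return getStatus();
            }
        };
    }

    @Override
    public List<String> getSupportedTypes() {
        return Collections.singletonList(NO_OP_TYPE);
    }
}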

View File

@ -0,0 +1,280 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.tasks;
import com.google.common.annotations.VisibleForTesting;
import org.apache.atlas.AtlasConfiguration;
import org.apache.atlas.AtlasException;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.ha.HAConfiguration;
import org.apache.atlas.listener.ActiveStateChangeHandler;
import org.apache.atlas.model.tasks.AtlasTask;
import org.apache.atlas.service.Service;
import org.apache.commons.configuration.Configuration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import javax.inject.Inject;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
@Component
@Order(7)
public class TaskManagement implements Service, ActiveStateChangeHandler {
private static final Logger LOG = LoggerFactory.getLogger(TaskManagement.class);
private final ThreadLocal<TaskExecutor> taskExecutorThreadLocal = new ThreadLocal<>();
private final Configuration configuration;
private final TaskRegistry registry;
private final Statistics statistics;
private final Map<String, TaskFactory> taskTypeFactoryMap;
@Inject
public TaskManagement(Configuration configuration, TaskRegistry taskRegistry) {
this.configuration = configuration;
this.registry = taskRegistry;
this.statistics = new Statistics();
this.taskTypeFactoryMap = new HashMap<>();
}
@VisibleForTesting
TaskManagement(Configuration configuration, TaskRegistry taskRegistry, TaskFactory taskFactory) {
this.configuration = configuration;
this.registry = taskRegistry;
this.statistics = new Statistics();
this.taskTypeFactoryMap = new HashMap<>();
createTaskTypeFactoryMap(taskTypeFactoryMap, taskFactory);
}
@Override
public void start() throws AtlasException {
if (configuration == null || !HAConfiguration.isHAEnabled(configuration)) {
startInternal();
} else {
LOG.info("TaskManagement.start(): deferring patches until instance activation");
}
}
@Override
public void stop() throws AtlasException {
LOG.info("TaskManagement: Stopped!");
}
@Override
public void instanceIsActive() throws AtlasException {
LOG.info("==> TaskManagement.instanceIsActive()");
startInternal();
LOG.info("<== TaskManagement.instanceIsActive()");
}
@Override
public void instanceIsPassive() throws AtlasException {
LOG.info("TaskManagement.instanceIsPassive(): no action needed");
}
@Override
public int getHandlerOrder() {
return HandlerOrder.TASK_MANAGEMENT.getOrder();
}
public void addFactory(TaskFactory taskFactory) {
createTaskTypeFactoryMap(this.taskTypeFactoryMap, taskFactory);
}
public AtlasTask createTask(String taskType, String createdBy, Map<String, Object> parameters) {
return this.registry.createVertex(taskType, createdBy, parameters);
}
public List<AtlasTask> getAll() {
return this.registry.getAll();
}
public void addAll(List<AtlasTask> tasks) {
if (CollectionUtils.isEmpty(tasks)) {
return;
}
dispatchTasks(tasks);
}
public AtlasTask getByGuid(String guid) throws AtlasBaseException {
try {
return this.registry.getById(guid);
} catch (Exception exception) {
LOG.error("Error: getByGuid: {}", guid);
throw new AtlasBaseException(exception);
}
}
public List<AtlasTask> getByGuids(List<String> guids) throws AtlasBaseException {
List<AtlasTask> ret = new ArrayList<>();
for (String guid : guids) {
AtlasTask task = getByGuid(guid);
if (task != null) {
ret.add(task);
}
}
return ret;
}
public void deleteByGuid(String guid) throws AtlasBaseException {
try {
this.registry.deleteByGuid(guid);
} catch (Exception exception) {
throw new AtlasBaseException(exception);
}
}
public void deleteByGuids(List<String> guids) throws AtlasBaseException {
if (CollectionUtils.isEmpty(guids)) {
return;
}
for (String guid : guids) {
this.registry.deleteByGuid(guid);
}
}
private void dispatchTasks(List<AtlasTask> tasks) {
if (CollectionUtils.isEmpty(tasks)) {
return;
}
if (this.taskExecutorThreadLocal.get() == null) {
this.taskExecutorThreadLocal.set(new TaskExecutor(registry, taskTypeFactoryMap, statistics));
}
this.taskExecutorThreadLocal.get().addAll(tasks);
this.statistics.print();
}
private void startInternal() {
if (AtlasConfiguration.TASKS_USE_ENABLED.getBoolean() == false) {
return;
}
LOG.info("TaskManagement: Started!");
}
public void queuePendingTasks() {
if (AtlasConfiguration.TASKS_USE_ENABLED.getBoolean() == false) {
return;
}
List<AtlasTask> pendingTasks = this.registry.getPendingTasks();
LOG.info("TaskManagement: Found: {}: Tasks in pending state.", pendingTasks.size());
addAll(pendingTasks);
}
@VisibleForTesting
static Map<String, TaskFactory> createTaskTypeFactoryMap(Map<String, TaskFactory> taskTypeFactoryMap, TaskFactory factory) {
List<String> supportedTypes = factory.getSupportedTypes();
if (CollectionUtils.isEmpty(supportedTypes)) {
LOG.warn("{}: Supported types returned empty!", factory.getClass());
return taskTypeFactoryMap;
}
for (String type : supportedTypes) {
taskTypeFactoryMap.put(type, factory);
}
return taskTypeFactoryMap;
}
static class Statistics {
private static final TaskExecutor.TaskLogger logger = TaskExecutor.TaskLogger.getLogger();
private static final long REPORT_FREQUENCY = 30000L;
private final AtomicInteger total = new AtomicInteger(0);
private final AtomicInteger countSinceLastCheck = new AtomicInteger(0);
private final AtomicInteger totalWithErrors = new AtomicInteger(0);
private final AtomicInteger totalSucceed = new AtomicInteger(0);
private long lastCheckTime = System.currentTimeMillis();
public void error() {
this.countSinceLastCheck.incrementAndGet();
this.totalWithErrors.incrementAndGet();
}
public void success() {
this.countSinceLastCheck.incrementAndGet();
this.totalSucceed.incrementAndGet();
}
public void increment() {
increment(1);
}
public void increment(int delta) {
this.total.addAndGet(delta);
this.countSinceLastCheck.addAndGet(delta);
}
public void print() {
long now = System.currentTimeMillis();
long diff = now - this.lastCheckTime;
if (diff < REPORT_FREQUENCY) {
return;
}
logger.info(String.format("TaskManagement: Processing stats: total=%d, sinceLastStatsReport=%d, completedWithErrors=%d, succeeded=%d",
this.total.get(), this.countSinceLastCheck.getAndSet(0),
this.totalWithErrors.get(), this.totalSucceed.get()));
this.lastCheckTime = now;
}
public void successPrint() {
success();
print();
}
@VisibleForTesting
int getTotal() {
return this.total.get();
}
@VisibleForTesting
int getTotalSuccess() {
return this.totalSucceed.get();
}
@VisibleForTesting
int getTotalError() {
return this.totalWithErrors.get();
}
}
}
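Putting the pieces together, a caller that wants a deferred classification propagation creates a task and hands it to this service. A sketch assuming an injected TaskManagement and placeholder ids (the sketch itself is not part of this commit):

import org.apache.atlas.model.tasks.AtlasTask;
import org.apache.atlas.repository.store.graph.v2.tasks.ClassificationPropagateTaskFactory;
import org.apache.atlas.repository.store.graph.v2.tasks.ClassificationTask;
import org.apache.atlas.tasks.TaskManagement;

import java.util.Collections;
import java.util.Map;

// Illustrative sketch only: create a deferred CLASSIFICATION_PROPAGATION_ADD task and dispatch it.
public class DeferredPropagationSketch {
    public static void queueAddPropagation(TaskManagement taskManagement) {
        Map<String, Object> parameters = ClassificationTask.toParameters("entity-guid-placeholder",
                                                                         "classification-vertex-id-placeholder",
                                                                         null);

        AtlasTask task = taskManagement.createTask(ClassificationPropagateTaskFactory.CLASSIFICATION_PROPAGATION_ADD,
                                                   "admin", parameters);

        taskManagement.addAll(Collections.singletonList(task));
    }
}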

View File

@ -0,0 +1,235 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.tasks;
import org.apache.atlas.annotation.GraphTransaction;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.tasks.AtlasTask;
import org.apache.atlas.repository.Constants;
import org.apache.atlas.repository.graphdb.AtlasGraph;
import org.apache.atlas.repository.graphdb.AtlasGraphQuery;
import org.apache.atlas.repository.graphdb.AtlasVertex;
import org.apache.atlas.type.AtlasType;
import org.apache.atlas.utils.AtlasJson;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import javax.inject.Inject;
import java.util.ArrayList;
import java.util.Date;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import static org.apache.atlas.repository.Constants.TASK_GUID;
import static org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2.setEncodedProperty;
@Component
public class TaskRegistry {
private static final Logger LOG = LoggerFactory.getLogger(TaskRegistry.class);
private AtlasGraph graph;
@Inject
public TaskRegistry(AtlasGraph graph) {
this.graph = graph;
}
@GraphTransaction
public AtlasTask save(AtlasTask task) {
AtlasVertex vertex = createVertex(task);
return toAtlasTask(vertex);
}
@GraphTransaction
public List<AtlasTask> getPendingTasks() {
List<AtlasTask> ret = new ArrayList<>();
try {
AtlasGraphQuery query = graph.query()
.has(Constants.TASK_TYPE_PROPERTY_KEY, Constants.TASK_TYPE_NAME)
.has(Constants.TASK_STATUS, AtlasTask.Status.PENDING)
.orderBy(Constants.TASK_CREATED_TIME, AtlasGraphQuery.SortOrder.ASC);
Iterator<AtlasVertex> results = query.vertices().iterator();
while (results.hasNext()) {
AtlasVertex vertex = results.next();
ret.add(toAtlasTask(vertex));
}
} catch (Exception exception) {
LOG.error("Error fetching pending tasks!", exception);
} finally {
graph.commit();
}
return ret;
}
@GraphTransaction
public void updateStatus(AtlasVertex taskVertex, AtlasTask task) {
if (taskVertex == null) {
return;
}
setEncodedProperty(taskVertex, Constants.TASK_ATTEMPT_COUNT, task.getAttemptCount());
setEncodedProperty(taskVertex, Constants.TASK_STATUS, task.getStatus().toString());
setEncodedProperty(taskVertex, Constants.TASK_UPDATED_TIME, System.currentTimeMillis());
setEncodedProperty(taskVertex, Constants.TASK_ERROR_MESSAGE, task.getErrorMessage());
}
@GraphTransaction
public void deleteByGuid(String guid) throws AtlasBaseException {
try {
AtlasGraphQuery query = graph.query()
.has(Constants.TASK_TYPE_PROPERTY_KEY, Constants.TASK_TYPE_NAME)
.has(TASK_GUID, guid);
Iterator<AtlasVertex> results = query.vertices().iterator();
if (results.hasNext()) {
graph.removeVertex(results.next());
}
} catch (Exception exception) {
LOG.error("Error: deletingByGuid: {}", guid);
throw new AtlasBaseException(exception);
}
}
@GraphTransaction
public void deleteComplete(AtlasVertex taskVertex, AtlasTask task) {
updateStatus(taskVertex, task);
deleteVertex(taskVertex);
}
@GraphTransaction
public AtlasTask getById(String guid) {
AtlasGraphQuery query = graph.query()
.has(Constants.TASK_TYPE_PROPERTY_KEY, Constants.TASK_TYPE_NAME)
.has(TASK_GUID, guid);
Iterator<AtlasVertex> results = query.vertices().iterator();
return results.hasNext() ? toAtlasTask(results.next()) : null;
}
@GraphTransaction
public AtlasVertex getVertex(String taskGuid) {
AtlasGraphQuery query = graph.query().has(Constants.TASK_GUID, taskGuid);
Iterator<AtlasVertex> results = query.vertices().iterator();
return results.hasNext() ? results.next() : null;
}
@GraphTransaction
public List<AtlasTask> getAll() {
List<AtlasTask> ret = new ArrayList<>();
AtlasGraphQuery query = graph.query()
.has(Constants.TASK_TYPE_PROPERTY_KEY, Constants.TASK_TYPE_NAME)
.orderBy(Constants.TASK_CREATED_TIME, AtlasGraphQuery.SortOrder.ASC);
Iterator<AtlasVertex> results = query.vertices().iterator();
while (results.hasNext()) {
ret.add(toAtlasTask(results.next()));
}
return ret;
}
public void commit() {
this.graph.commit();
}
public AtlasTask createVertex(String taskType, String createdBy, Map<String, Object> parameters) {
AtlasTask ret = new AtlasTask(taskType, createdBy, parameters);
createVertex(ret);
return ret;
}
private void deleteVertex(AtlasVertex taskVertex) {
if (taskVertex == null) {
return;
}
graph.removeVertex(taskVertex);
}
private AtlasTask toAtlasTask(AtlasVertex v) {
AtlasTask ret = new AtlasTask();
ret.setGuid(v.getProperty(Constants.TASK_GUID, String.class));
ret.setType(v.getProperty(Constants.TASK_TYPE, String.class));
ret.setStatus(v.getProperty(Constants.TASK_STATUS, String.class));
ret.setCreatedBy(v.getProperty(Constants.TASK_CREATED_BY, String.class));
ret.setCreatedTime(new Date(v.getProperty(Constants.TASK_CREATED_TIME, Long.class)));
ret.setUpdatedTime(new Date(v.getProperty(Constants.TASK_UPDATED_TIME, Long.class)));
Long startTime = v.getProperty(Constants.TASK_START_TIME, Long.class);
if (startTime != null) {
ret.setStartTime(new Date(startTime));
}
Long endTime = v.getProperty(Constants.TASK_END_TIME, Long.class);
if (endTime != null) {
ret.setEndTime(new Date(endTime));
}
String parametersJson = v.getProperty(Constants.TASK_PARAMETERS, String.class);
ret.setParameters(AtlasType.fromJson(parametersJson, Map.class));
ret.setAttemptCount(v.getProperty(Constants.TASK_ATTEMPT_COUNT, Integer.class));
ret.setErrorMessage(v.getProperty(Constants.TASK_ERROR_MESSAGE, String.class));
return ret;
}
private AtlasVertex createVertex(AtlasTask task) {
AtlasVertex ret = graph.addVertex();
setEncodedProperty(ret, Constants.TASK_GUID, task.getGuid());
setEncodedProperty(ret, Constants.TASK_TYPE_PROPERTY_KEY, Constants.TASK_TYPE_NAME);
setEncodedProperty(ret, Constants.TASK_STATUS, task.getStatus().toString());
setEncodedProperty(ret, Constants.TASK_TYPE, task.getType());
setEncodedProperty(ret, Constants.TASK_CREATED_BY, task.getCreatedBy());
setEncodedProperty(ret, Constants.TASK_CREATED_TIME, task.getCreatedTime());
setEncodedProperty(ret, Constants.TASK_UPDATED_TIME, task.getUpdatedTime());
if (task.getStartTime() != null) {
setEncodedProperty(ret, Constants.TASK_START_TIME, task.getStartTime().getTime());
}
if (task.getEndTime() != null) {
setEncodedProperty(ret, Constants.TASK_END_TIME, task.getEndTime().getTime());
}
setEncodedProperty(ret, Constants.TASK_PARAMETERS, AtlasJson.toJson(task.getParameters()));
setEncodedProperty(ret, Constants.TASK_ATTEMPT_COUNT, task.getAttemptCount());
setEncodedProperty(ret, Constants.TASK_ERROR_MESSAGE, task.getErrorMessage());
return ret;
}
}
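Because tasks are persisted as graph vertices, work that was queued but not finished survives a restart in PENDING state; re-queuing it goes through TaskManagement, as sketched below (the calling class is hypothetical):

import org.apache.atlas.tasks.TaskManagement;

// Illustrative sketch only: after startup, re-dispatch tasks still marked PENDING in the graph.
public class PendingTaskRecoverySketch {
    public static void recover(TaskManagement taskManagement) {
        // internally reads PENDING task vertices via TaskRegistry.getPendingTasks() and dispatches them
        taskManagement.queuePendingTasks();
    }
}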

View File

@ -16,7 +16,7 @@
* limitations under the License.
*/
package org.apache.atlas;
package org.apache.atlas.util;
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;

View File

@ -64,9 +64,11 @@ import org.apache.atlas.repository.store.graph.v2.AtlasTypeDefGraphStoreV2;
import org.apache.atlas.repository.store.graph.v2.BulkImporterImpl;
import org.apache.atlas.repository.store.graph.v2.EntityGraphMapper;
import org.apache.atlas.repository.store.graph.v2.IAtlasEntityChangeNotifier;
import org.apache.atlas.repository.store.graph.v2.tasks.ClassificationPropagateTaskFactory;
import org.apache.atlas.runner.LocalSolrRunner;
import org.apache.atlas.service.Service;
import org.apache.atlas.store.AtlasTypeDefStore;
import org.apache.atlas.tasks.TaskManagement;
import org.apache.atlas.type.AtlasTypeRegistry;
import org.apache.atlas.util.AtlasRepositoryConfiguration;
import org.apache.atlas.util.SearchTracker;
@ -183,7 +185,11 @@ public class TestModules {
// Glossary related bindings
bind(GlossaryService.class).asEagerSingleton();
// TaskManagement
bind(TaskManagement.class).asEagerSingleton();
bind(ClassificationPropagateTaskFactory.class).asEagerSingleton();
final GraphTransactionInterceptor graphTransactionInterceptor = new GraphTransactionInterceptor(new AtlasGraphProvider().get(), null);
requestInjection(graphTransactionInterceptor);
bindInterceptor(Matchers.any(), Matchers.annotatedWith(GraphTransaction.class), graphTransactionInterceptor);
}

View File

@ -152,7 +152,7 @@ public class AtlasDiscoveryServiceTest extends BasicTestSetup {
List<AtlasEntityHeader> entityHeaders = discoveryService.searchWithParameters(params).getEntities();
Assert.assertTrue(CollectionUtils.isNotEmpty(entityHeaders));
assertEquals(entityHeaders.size(), 11);
}

View File

@ -0,0 +1,227 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.repository.tagpropagation;
import org.apache.atlas.RequestContext;
import org.apache.atlas.TestModules;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.exception.EntityNotFoundException;
import org.apache.atlas.model.instance.AtlasClassification;
import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.model.typedef.AtlasClassificationDef;
import org.apache.atlas.model.typedef.AtlasTypesDef;
import org.apache.atlas.repository.AtlasTestBase;
import org.apache.atlas.repository.graph.GraphHelper;
import org.apache.atlas.repository.graphdb.AtlasVertex;
import org.apache.atlas.repository.impexp.ImportService;
import org.apache.atlas.repository.impexp.ZipFileResourceTestUtils;
import org.apache.atlas.repository.store.graph.AtlasEntityStore;
import org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2;
import org.apache.atlas.repository.store.graph.v2.EntityGraphMapper;
import org.apache.atlas.store.AtlasTypeDefStore;
import org.apache.atlas.tasks.TaskManagement;
import org.apache.atlas.type.AtlasTypeRegistry;
import org.apache.commons.lang.StringUtils;
import org.testng.SkipException;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Guice;
import org.testng.annotations.Test;
import javax.inject.Inject;
import java.io.IOException;
import java.io.InputStream;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import static org.apache.atlas.repository.impexp.ZipFileResourceTestUtils.runImportWithNoParameters;
import static org.apache.atlas.utils.TestLoadModelUtils.loadModelFromJson;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;
@Guice(modules = TestModules.TestOnlyModule.class)
public class ClassificationPropagationWithTasksTest extends AtlasTestBase {
private static final String IMPORT_FILE = "tag-propagation-data.zip";
private static final String HDFS_PATH_EMPLOYEES = "a3955120-ac17-426f-a4af-972ec8690e5f";
@Inject
private AtlasTypeDefStore typeDefStore;
@Inject
private AtlasTypeRegistry typeRegistry;
@Inject
private AtlasEntityStore entityStore;
@Inject
private ImportService importService;
@Inject
private EntityGraphMapper entityGraphMapper;
@Inject
private TaskManagement tasksManagement;
@BeforeClass
public void setup() throws Exception {
RequestContext.clear();
super.initialize();
this.tasksManagement.start();
entityGraphMapper.setTasksUseFlag(true);
loadModelFilesAndImportTestData();
}
private void loadModelFilesAndImportTestData() {
try {
loadModelFromJson("0000-Area0/0010-base_model.json", typeDefStore, typeRegistry);
loadModelFromJson("1000-Hadoop/1020-fs_model.json", typeDefStore, typeRegistry);
loadModelFromJson("1000-Hadoop/1030-hive_model.json", typeDefStore, typeRegistry);
loadSampleClassificationDefs();
runImportWithNoParameters(importService, getZipSource(IMPORT_FILE));
} catch (AtlasBaseException | IOException e) {
throw new SkipException("Model loading failed!", e);
}
}
private void loadSampleClassificationDefs() throws AtlasBaseException {
AtlasClassificationDef tagX = new AtlasClassificationDef("tagX");
AtlasClassificationDef tagY = new AtlasClassificationDef("tagY");
typeDefStore.createTypesDef(new AtlasTypesDef(Collections.emptyList(), Collections.emptyList(),
Arrays.asList(tagX, tagY),
Collections.emptyList(), Collections.emptyList()));
}
public static InputStream getZipSource(String fileName) throws IOException {
return ZipFileResourceTestUtils.getFileInputStream(fileName);
}
@Test
public void parameterValidation() throws AtlasBaseException {
try {
entityGraphMapper.propagateClassification(null, null, null);
entityGraphMapper.propagateClassification("unknown", "abcd", "xyz");
}
catch (AtlasBaseException e) {
assertNotNull(e.getCause());
assertTrue(e.getCause() instanceof EntityNotFoundException);
}
List<String> ret = entityGraphMapper.propagateClassification(HDFS_PATH_EMPLOYEES, StringUtils.EMPTY, StringUtils.EMPTY);
assertNull(ret);
ret = entityGraphMapper.updateClassificationsPropagation(HDFS_PATH_EMPLOYEES, StringUtils.EMPTY, StringUtils.EMPTY);
assertNull(ret);
ret = entityGraphMapper.deleteClassificationPropagation(StringUtils.EMPTY);
assertNull(ret);
AtlasEntity hdfs_employees = getEntity(HDFS_PATH_EMPLOYEES);
ret = entityGraphMapper.propagateClassification(hdfs_employees.getGuid(), StringUtils.EMPTY, StringUtils.EMPTY);
assertNull(ret);
}
@Test
public void add() throws AtlasBaseException {
final String TAG_NAME_X = "tagX";
final String TAG_NAME_Y = "tagY";
AtlasEntity hdfs_employees = getEntity(HDFS_PATH_EMPLOYEES);
AtlasClassification tagX = new AtlasClassification(TAG_NAME_X);
tagX.setEntityGuid(hdfs_employees.getGuid());
tagX.setPropagate(true);
AtlasClassification tagY = new AtlasClassification(TAG_NAME_Y);
tagY.setEntityGuid(hdfs_employees.getGuid());
tagY.setPropagate(false);
entityStore.addClassification(Collections.singletonList(HDFS_PATH_EMPLOYEES), tagX);
entityStore.addClassification(Collections.singletonList(HDFS_PATH_EMPLOYEES), tagY);
AtlasVertex entityVertex = AtlasGraphUtilsV2.findByGuid(hdfs_employees.getGuid());
AtlasVertex classificationVertex = GraphHelper.getClassificationVertex(entityVertex, TAG_NAME_X);
assertNotNull(entityVertex);
assertNotNull(classificationVertex);
AtlasEntity entityUpdated = getEntity(HDFS_PATH_EMPLOYEES);
assertNotNull(entityUpdated.getPendingTasks());
List<String> impactedEntities = entityGraphMapper.propagateClassification(hdfs_employees.getGuid(), classificationVertex.getId().toString(), StringUtils.EMPTY);
assertNotNull(impactedEntities);
}
@Test(dependsOnMethods = "add")
public void update() throws AtlasBaseException {
final String TAG_NAME_Y = "tagY";
AtlasEntity hdfs_employees = getEntity(HDFS_PATH_EMPLOYEES);
AtlasClassification tagY = new AtlasClassification(TAG_NAME_Y);
tagY.setEntityGuid(hdfs_employees.getGuid());
tagY.setPropagate(true);
entityStore.updateClassifications(hdfs_employees.getGuid(), Collections.singletonList(tagY));
AtlasVertex entityVertex = AtlasGraphUtilsV2.findByGuid(hdfs_employees.getGuid());
AtlasVertex classificationVertex = GraphHelper.getClassificationVertex(entityVertex, TAG_NAME_Y);
assertNotNull(RequestContext.get().getQueuedTasks());
assertTrue(RequestContext.get().getQueuedTasks().size() > 0, "No tasks were queued!");
assertNotNull(entityVertex);
assertNotNull(classificationVertex);
List<String> impactedEntities = entityGraphMapper.updateClassificationsPropagation(hdfs_employees.getGuid(), classificationVertex.getId().toString(), StringUtils.EMPTY);
assertNotNull(impactedEntities);
}
@Test(dependsOnMethods = "update")
public void delete() throws AtlasBaseException {
final String TAG_NAME = "tagX";
AtlasEntity hdfs_employees = getEntity(HDFS_PATH_EMPLOYEES);
entityGraphMapper.propagateClassification(hdfs_employees.getGuid(), StringUtils.EMPTY, StringUtils.EMPTY);
AtlasClassification tagX = new AtlasClassification(TAG_NAME);
tagX.setEntityGuid(hdfs_employees.getGuid());
tagX.setPropagate(false);
AtlasVertex entityVertex = AtlasGraphUtilsV2.findByGuid(hdfs_employees.getGuid());
AtlasVertex classificationVertex = GraphHelper.getClassificationVertex(entityVertex, TAG_NAME);
entityStore.deleteClassification(HDFS_PATH_EMPLOYEES, tagX.getTypeName());
assertNotNull(entityVertex);
assertNotNull(classificationVertex);
List<String> impactedEntities = entityGraphMapper.deleteClassificationPropagation(classificationVertex.getId().toString());
assertNotNull(impactedEntities);
}
private AtlasEntity getEntity(String entityGuid) throws AtlasBaseException {
AtlasEntity.AtlasEntityWithExtInfo entityWithExtInfo = entityStore.getById(entityGuid);
return entityWithExtInfo.getEntity();
}
}

View File

@ -0,0 +1,116 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.tasks;
import org.apache.atlas.model.tasks.AtlasTask;
import org.apache.atlas.repository.graphdb.AtlasGraph;
import javax.inject.Inject;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
public class BaseTaskFixture {
protected static final String SPYING_TASK_ADD = "add";
protected static final String SPYING_TASK_ERROR_THROWING = "errorThrowingTask";
@Inject
protected AtlasGraph graph;
@Inject
protected TaskRegistry taskRegistry;
static class SpyConcreteTask extends AbstractTask {
private boolean taskPerformed;
public SpyConcreteTask(AtlasTask atlasTask) {
super(atlasTask);
}
@Override
public AtlasTask.Status perform() {
this.taskPerformed = true;
return AtlasTask.Status.COMPLETE;
}
public boolean taskPerformed() {
return this.taskPerformed;
}
}
static class SpyErrorThrowingTask extends AbstractTask {
private boolean taskPerformed;
public SpyErrorThrowingTask(AtlasTask atlasTask) {
super(atlasTask);
}
@Override
public AtlasTask.Status perform() {
this.taskPerformed = true;
throw new NullPointerException("SpyErrorThrowingTask: NullPointerException Encountered!");
}
public boolean taskPerformed() {
return this.taskPerformed;
}
}
static class SpyingFactory implements TaskFactory {
private SpyConcreteTask addTask;
private SpyErrorThrowingTask errorTask;
@Override
public AbstractTask create(AtlasTask atlasTask) {
switch (atlasTask.getType()) {
case "add":
addTask = new SpyConcreteTask(atlasTask);
return addTask;
case "errorThrowingTask":
errorTask = new SpyErrorThrowingTask(atlasTask);
return errorTask;
default:
return null;
}
}
@Override
public List<String> getSupportedTypes() {
return new ArrayList<String>() {{
add(SPYING_TASK_ADD);
add(SPYING_TASK_ERROR_THROWING);
}};
}
public SpyConcreteTask getAddTask() {
return this.addTask;
}
public SpyErrorThrowingTask getErrorTask() {
return this.errorTask;
}
}
protected AtlasTask createTask(TaskManagement taskManagement, String type) {
return taskManagement.createTask(type, "testUser", Collections.singletonMap("params", "params"));
}
}

View File

@ -0,0 +1,123 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.tasks;
import org.apache.atlas.TestModules;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.tasks.AtlasTask;
import org.apache.atlas.repository.graphdb.AtlasGraph;
import org.apache.commons.lang3.StringUtils;
import org.testng.annotations.Guice;
import org.testng.annotations.Test;
import org.testng.Assert;
import javax.inject.Inject;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@Guice(modules = TestModules.TestOnlyModule.class)
public class TaskExecutorTest extends BaseTaskFixture {
@Inject
private AtlasGraph graph;
@Inject
private TaskRegistry taskRegistry;
@Inject
private TaskManagement taskManagement;
@Test
public void noTasksExecuted() {
TaskManagementTest.SpyingFactory spyingFactory = new TaskManagementTest.SpyingFactory();
Map<String, TaskFactory> taskFactoryMap = new HashMap<>();
TaskManagement.createTaskTypeFactoryMap(new HashMap<>(), spyingFactory);
TaskManagement.Statistics statistics = new TaskManagement.Statistics();
new TaskExecutor(taskRegistry, taskFactoryMap, statistics);
Assert.assertEquals(statistics.getTotal(), 0);
}
@Test
public void tasksNotPersistedIsNotExecuted() throws InterruptedException {
TaskManagementTest.SpyingFactory spyingFactory = new TaskManagementTest.SpyingFactory();
Map<String, TaskFactory> taskFactoryMap = new HashMap<>();
TaskManagement.createTaskTypeFactoryMap(taskFactoryMap, spyingFactory);
TaskManagement.Statistics statistics = new TaskManagement.Statistics();
TaskExecutor taskExecutor = new TaskExecutor(taskRegistry, taskFactoryMap, statistics);
taskExecutor.addAll(Collections.singletonList(new AtlasTask(SPYING_TASK_ADD, "test", Collections.emptyMap())));
taskExecutor.waitUntilDone();
Assert.assertEquals(statistics.getTotal(), 0);
}
@Test
public void persistedIsExecuted() throws AtlasBaseException, InterruptedException {
TaskManagementTest.SpyingFactory spyingFactory = new TaskManagementTest.SpyingFactory();
Map<String, TaskFactory> taskFactoryMap = new HashMap<>();
TaskManagement.createTaskTypeFactoryMap(taskFactoryMap, spyingFactory);
AtlasTask addTask = taskManagement.createTask("add", "test", Collections.emptyMap());
AtlasTask errorThrowingTask = taskManagement.createTask("errorThrowingTask", "test", Collections.emptyMap());
TaskManagement.Statistics statistics = new TaskManagement.Statistics();
List<AtlasTask> tasks = new ArrayList<AtlasTask>() {{
add(addTask);
add(errorThrowingTask);
}};
graph.commit();
TaskExecutor taskExecutor = new TaskExecutor(taskRegistry, taskFactoryMap, statistics);
taskExecutor.addAll(tasks);
taskExecutor.waitUntilDone();
Assert.assertEquals(statistics.getTotal(), 2);
Assert.assertEquals(statistics.getTotalSuccess(), 1);
Assert.assertEquals(statistics.getTotalError(), 1);
Assert.assertNotNull(spyingFactory.getAddTask());
Assert.assertNotNull(spyingFactory.getErrorTask());
Assert.assertTrue(spyingFactory.getAddTask().taskPerformed());
Assert.assertTrue(spyingFactory.getErrorTask().taskPerformed());
assertTaskUntilFail(errorThrowingTask, taskExecutor);
}
private void assertTaskUntilFail(AtlasTask errorThrowingTask, TaskExecutor taskExecutor) throws AtlasBaseException, InterruptedException {
AtlasTask errorTaskFromDB = taskManagement.getByGuid(errorThrowingTask.getGuid());
Assert.assertNotNull(errorTaskFromDB);
Assert.assertTrue(StringUtils.isNotEmpty(errorTaskFromDB.getErrorMessage()));
Assert.assertEquals(errorTaskFromDB.getAttemptCount(), 1);
Assert.assertEquals(errorTaskFromDB.getStatus(), AtlasTask.Status.PENDING);
for (int i = errorTaskFromDB.getAttemptCount(); i <= AtlasTask.MAX_ATTEMPT_COUNT; i++) {
taskExecutor.addAll(Collections.singletonList(errorThrowingTask));
}
taskExecutor.waitUntilDone();
graph.commit();
Assert.assertEquals(errorThrowingTask.getStatus(), AtlasTask.Status.FAILED);
}
}

View File

@ -0,0 +1,109 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.tasks;
import org.apache.atlas.AtlasException;
import org.apache.atlas.TestModules;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.tasks.AtlasTask;
import org.testng.Assert;
import org.testng.annotations.Guice;
import org.testng.annotations.Test;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;
@Guice(modules = TestModules.TestOnlyModule.class)
public class TaskManagementTest extends BaseTaskFixture {
private static class NullFactory implements TaskFactory {
@Override
public AbstractTask create(AtlasTask atlasTask) {
return null;
}
@Override
public List<String> getSupportedTypes() {
return null;
}
}
@Test
public void factoryReturningNullIsHandled() throws AtlasException {
TaskManagement taskManagement = new TaskManagement(null, taskRegistry, new NullFactory());
taskManagement.start();
}
@Test
public void taskSucceedsTaskVertexRemoved() throws AtlasException, InterruptedException, AtlasBaseException {
SpyingFactory spyingFactory = new SpyingFactory();
TaskManagement taskManagement = new TaskManagement(null, taskRegistry, spyingFactory);
taskManagement.start();
AtlasTask spyTask = createTask(taskManagement, SPYING_TASK_ADD);
AtlasTask spyTaskError = createTask(taskManagement, SPYING_TASK_ERROR_THROWING);
graph.commit();
taskManagement.addAll(Arrays.asList(spyTask, spyTaskError));
TimeUnit.SECONDS.sleep(5);
Assert.assertTrue(spyingFactory.getAddTask().taskPerformed());
Assert.assertTrue(spyingFactory.getErrorTask().taskPerformed());
AtlasTask task = taskManagement.getByGuid(spyTask.getGuid());
Assert.assertNull(task);
}
@Test
public void severalTaskAdds() throws AtlasException, InterruptedException {
int MAX_THREADS = 5;
TaskManagement taskManagement = new TaskManagement(null, taskRegistry);
taskManagement.start();
Thread[] threads = new Thread[MAX_THREADS];
for (int i = 0; i < MAX_THREADS; i++) {
threads[i] = new Thread(() -> {
try {
AtlasTask spyAdd = taskManagement.createTask(SPYING_TASK_ADD, "test", Collections.emptyMap());
AtlasTask spyErr = taskManagement.createTask(SPYING_TASK_ERROR_THROWING, "test", Collections.emptyMap());
taskManagement.addAll(Collections.singletonList(spyAdd));
taskManagement.addAll(Collections.singletonList(spyErr));
Thread.sleep(10000);
for (int j = 0; j <= AtlasTask.MAX_ATTEMPT_COUNT; j++) {
taskManagement.addAll(Collections.singletonList(spyErr));
}
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
for (int i = 0; i < MAX_THREADS; i++) {
threads[i].start();
}
for (int i = 0; i < MAX_THREADS; i++) {
threads[i].join();
}
}
}

View File

@ -0,0 +1,99 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.tasks;
import org.apache.atlas.AtlasException;
import org.apache.atlas.TestModules;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.tasks.AtlasTask;
import org.apache.atlas.repository.graphdb.AtlasGraph;
import org.apache.atlas.repository.graphdb.AtlasVertex;
import org.testng.Assert;
import org.testng.annotations.Guice;
import org.testng.annotations.Test;
import javax.inject.Inject;
import java.util.Collections;
import java.util.List;
@Guice(modules = TestModules.TestOnlyModule.class)
public class TaskRegistryTest {
@Inject
AtlasGraph graph;
@Inject
TaskRegistry registry;
@Test
public void basic() throws AtlasException, AtlasBaseException {
AtlasTask task = new AtlasTask("abcd", "test", Collections.singletonMap("p1", "p1"));
Assert.assertNull(registry.getById(task.getGuid()));
AtlasTask taskFromVertex = registry.save(task);
AtlasVertex taskVertex = registry.getVertex(task.getGuid());
Assert.assertEquals(taskFromVertex.getGuid(), task.getGuid());
Assert.assertEquals(taskFromVertex.getType(), task.getType());
Assert.assertEquals(taskFromVertex.getAttemptCount(), task.getAttemptCount());
Assert.assertEquals(taskFromVertex.getParameters(), task.getParameters());
Assert.assertEquals(taskFromVertex.getCreatedBy(), task.getCreatedBy());
taskFromVertex.incrementAttemptCount();
taskFromVertex.setStatusPending();
registry.updateStatus(taskVertex, taskFromVertex);
registry.commit();
taskFromVertex = registry.getById(task.getGuid());
Assert.assertNotNull(taskFromVertex);
Assert.assertEquals(taskFromVertex.getStatus(), AtlasTask.Status.PENDING);
Assert.assertEquals(taskFromVertex.getAttemptCount(), 1);
registry.deleteByGuid(taskFromVertex.getGuid());
try {
AtlasTask t = registry.getById(taskFromVertex.getGuid());
Assert.assertNull(t);
}
catch (IllegalStateException e) {
Assert.assertTrue(true, "Indicates vertex is deleted!");
}
}
@Test
public void pendingTasks() throws AtlasBaseException {
final int MAX_TASKS = 3;
final String TASK_TYPE_FORMAT = "abcd:%d";
for (int i = 0; i < MAX_TASKS; i++) {
AtlasTask task = new AtlasTask(String.format(TASK_TYPE_FORMAT, i), "test", Collections.singletonMap("p1", "p1"));
registry.save(task);
}
List<AtlasTask> pendingTasks = registry.getPendingTasks();
Assert.assertEquals(pendingTasks.size(), MAX_TASKS);
for (int i = 0; i < MAX_TASKS; i++) {
Assert.assertEquals(pendingTasks.get(i).getType(), String.format(TASK_TYPE_FORMAT, i));
registry.deleteByGuid(pendingTasks.get(i).getGuid());
}
graph.commit();
pendingTasks = registry.getPendingTasks();
Assert.assertEquals(pendingTasks.size(), 0);
}
}

View File

@ -23,6 +23,7 @@ import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo;
import org.apache.atlas.model.instance.AtlasEntityHeader;
import org.apache.atlas.model.instance.AtlasObjectId;
import org.apache.atlas.model.tasks.AtlasTask;
import org.apache.atlas.utils.AtlasPerfMetrics;
import org.apache.atlas.utils.AtlasPerfMetrics.MetricRecorder;
import org.apache.commons.lang.StringUtils;
@ -30,13 +31,13 @@ import org.apache.hadoop.security.UserGroupInformation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Collection;
import java.util.List;
import java.util.Set;
import java.util.Map;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import static org.apache.atlas.model.instance.AtlasObjectId.KEY_GUID;
@ -61,6 +62,8 @@ public class RequestContext {
private final Set<String> entitiesToSkipUpdate = new HashSet<>();
private final Set<String> onlyCAUpdateEntities = new HashSet<>();
private final Set<String> onlyBAUpdateEntities = new HashSet<>();
private final List<AtlasTask> queuedTasks = new ArrayList<>();
private String user;
private Set<String> userGroups;
@ -122,6 +125,7 @@ public class RequestContext {
this.entitiesToSkipUpdate.clear();
this.onlyCAUpdateEntities.clear();
this.onlyBAUpdateEntities.clear();
this.queuedTasks.clear();
if (metrics != null && !metrics.isEmpty()) {
METRICS.debug(metrics.toString());
@ -446,6 +450,14 @@ public class RequestContext {
}
}
public void queueTask(AtlasTask task) {
queuedTasks.add(task);
}
public List<AtlasTask> getQueuedTasks() {
return this.queuedTasks;
}
public class EntityGuidPair {
private final Object entity;
private final String guid;

View File

@ -32,8 +32,8 @@ public interface ActiveStateChangeHandler {
TYPEDEF_STORE_INITIALIZER(2),
ATLAS_PATCH_SERVICE(3),
DEFAULT_METADATA_SERVICE(4),
NOTIFICATION_HOOK_CONSUMER(5);
NOTIFICATION_HOOK_CONSUMER(5),
TASK_MANAGEMENT(6);
private final int order;

View File

@ -48,10 +48,6 @@
<groupId>info.ganglia.gmetric4j</groupId>
<artifactId>gmetric4j</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>jcl-over-slf4j</artifactId>
</exclusion>
<exclusion>
<groupId>com.github.ben-manes.caffeine</groupId>
<artifactId>caffeine</artifactId>

View File

@ -30,3 +30,12 @@ log4j.appender.console.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss.SSS} [%t]
# Set both appenders (file and console) on the root logger.
#log4j.rootLogger=INFO, file, console
log4j.rootLogger=ERROR, file
# Tasks log file
log4j.appender.TASKS=org.apache.log4j.RollingFileAppender
log4j.appender.TASKS.File=target/tasks.log
log4j.appender.TASKS.Append=true
log4j.appender.TASKS.layout=org.apache.log4j.PatternLayout
log4j.appender.TASKS.layout.ConversionPattern=[%t] %d %x %m%n
log4j.appender.TASKS.MaxFileSize=1024MB
log4j.category.TASKS=INFO,TASKS
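The appender above is wired to a logger category named TASKS, so task-execution code can send its progress messages to the dedicated tasks.log file simply by looking the logger up by that name. A minimal sketch, assuming the slf4j facade; the class and the message content are illustrative and not part of this commit:

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TaskLogSample {
    // Resolves the "TASKS" category configured above; messages logged here are
    // written to tasks.log by the RollingFileAppender named TASKS.
    private static final Logger TASK_LOG = LoggerFactory.getLogger("TASKS");

    public static void main(String[] args) {
        TASK_LOG.info("task executed: type={}, guid={}", "classification-propagation", "sample-guid");
    }
}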

View File

@ -433,29 +433,19 @@
will be overridden by parameters in the request
-->
<!--
35x_t __guid
f0l_t __typeName
i6d_l __timestamp
jr9_t __state
lc5_t __classificationsText
mx1_t __classificationNames
iyt_l __modificationTimestamp
zk5_t __customAttributes
1151_t __labels
ohx_t __propagatedClassificationNames
3thh_s Asset.__s_name
3z0l_s Asset.__s_owner
3wn9_t Asset.description
3v2d_s Asset.__s_displayName
3xfp_s Asset.__s_userDescription
3mdh_t Referenceable.qualifiedName
c45h_t hive_serde.name
cb9h_t hive_process.queryText
c83p_t hive_process.userName
Steps to generate these keys:
- Add your fields to GraphBackedSearchIndexer.
- Run AtlasDiscoveryServiceTest. The test will most likely fail, because the order in which fields are
loaded from the models will have changed and hence the generated field names will differ.
- From typeRegistry, use typeRegistry.commonIndexFieldNameCache.values().stream().collect(Collectors.joining(" "))
to collect all the field names (see the sketch below). Paste them here in the 'qf' tag.
- Clean and build.
- Re-run the test. It should pass now.
Thanks to @pshah for the steps.
-->
<lst name="defaults">
<str name="defType">edismax</str>
<str name="qf">35x_t f0l_t i6d_l jr9_t lc5_t mx1_t iyt_l zk5_t 1151_t ohx_t 3thh_s 3z0l_s 3wn9_t 3v2d_s 3xfp_s 3mdh_t c45h_t cb9h_t c83p_t</str>
<str name="qf">35x_t 5j9_t 7wl_t a9x_t but_t dfp_l f0l_t i6d_l iyt_l jr9_t kjp_s lc5_t m4l_s mx1_t ohx_t xz9_i 1151_t 12px_t 14at_t 16o5_t 1891_t 19tx_t 1bet_t 1czp_t 1fd1_t 1gxx_l 1iit_t 3v2d_t 47ph_s 464l_s 43r9_s 426d_s 45c5_t 4d8l_t 49ad_t 4av9_l 4bnp_t 4hz9_t 4ged_t 4jk5_t 4qo5_t 4w79_t 50xx_t 4ykl_t 4zd1_t 4xs5_t 579h_t 56h1_t 5f5x_l 5gqt_l 5dl1_t 5c05_t 5ibp_t 581x_t 58ud_t 5af9_t 5slh_t 5tdx_l 5pfp_t 5m9x_l 5nut_l 5j45_t 5r0l_t 8ikl_t 8ao5_t 8k5h_t 8g79_t 8jd1_t 8hs5_t 8tmt_t 8lqd_t 8w05_t 8r9h_t 8sud_t 8v7p_t 95hh_t 8xl1_t 9345_t 94p1_t cnwl_t cgsl_t clj9_t cn45_t cjyd_t cf7p_l cidh_t ckqt_t cg05_l cphh_l cu85_t cv0l_t cop1_l cq9x_t cr2d_i ctfp_t cy6d_t cxdx_t cyyt_t d0jp_i d4hx_t d62t_t da11_t dath_i d8g5_t dced_t dblx_t dh51_t djid_t derp_t dhxh_t dgcl_t dd6t_l dipx_t ddz9_l f56t_t fh1h_l fimd_i fbid_t fjet_i fk79_i fkzp_f frb9_d fqit_l</str>
<str name="hl.fl">*</str>
<bool name="hl.requireFieldMatch">true</bool>
<bool name="lowercaseOperators">true</bool>

View File

@ -55,9 +55,11 @@ import org.apache.atlas.repository.impexp.ExportService;
import org.apache.atlas.repository.impexp.ImportService;
import org.apache.atlas.repository.impexp.MigrationProgressService;
import org.apache.atlas.repository.impexp.ZipSink;
import org.apache.atlas.model.tasks.AtlasTask;
import org.apache.atlas.repository.patches.AtlasPatchManager;
import org.apache.atlas.repository.store.graph.AtlasEntityStore;
import org.apache.atlas.services.MetricsService;
import org.apache.atlas.tasks.TaskManagement;
import org.apache.atlas.type.AtlasType;
import org.apache.atlas.type.AtlasTypeRegistry;
import org.apache.atlas.util.SearchTracker;
@ -78,6 +80,7 @@ import org.springframework.security.core.GrantedAuthority;
import org.springframework.security.core.context.SecurityContextHolder;
import org.springframework.stereotype.Service;
import javax.annotation.PostConstruct;
import javax.inject.Inject;
import javax.inject.Singleton;
import javax.servlet.http.HttpServletRequest;
@ -135,7 +138,7 @@ public class AdminResource {
private static final String UI_DATE_FORMAT = "atlas.ui.date.format";
private static final String UI_DATE_DEFAULT_FORMAT = "MM/DD/YYYY hh:mm:ss A";
private static final String OPERATION_STATUS = "operationStatus";
private static final List TIMEZONE_LIST = Arrays.asList(TimeZone.getAvailableIDs());
private static final List TIMEZONE_LIST = Arrays.asList(TimeZone.getAvailableIDs());
@Context
private HttpServletRequest httpServletRequest;
@ -155,6 +158,7 @@ public class AdminResource {
private final MigrationProgressService migrationProgressService;
private final ReentrantLock importExportOperationLock;
private final ExportImportAuditService exportImportAuditService;
private final TaskManagement taskManagement;
private final AtlasServerService atlasServerService;
private final AtlasEntityStore entityStore;
private final AtlasPatchManager patchManager;
@ -178,7 +182,8 @@ public class AdminResource {
MigrationProgressService migrationProgressService,
AtlasServerService serverService,
ExportImportAuditService exportImportAuditService, AtlasEntityStore entityStore,
AtlasPatchManager patchManager, AtlasAuditService auditService, EntityAuditRepository auditRepository) {
AtlasPatchManager patchManager, AtlasAuditService auditService, EntityAuditRepository auditRepository,
TaskManagement taskManagement) {
this.serviceState = serviceState;
this.metricsService = metricsService;
this.exportService = exportService;
@ -193,6 +198,7 @@ public class AdminResource {
this.patchManager = patchManager;
this.auditService = auditService;
this.auditRepository = auditRepository;
this.taskManagement = taskManagement;
if (atlasProperties != null) {
this.defaultUIVersion = atlasProperties.getString(DEFAULT_UI_VERSION, UI_VERSION_V2);
@ -205,6 +211,11 @@ public class AdminResource {
}
}
@PostConstruct
public void init() {
taskManagement.queuePendingTasks();
}
/**
* Fetches the thread stack dump for this application.
*
@ -737,6 +748,22 @@ public class AdminResource {
return ret;
}
@GET
@Path("/tasks")
@Produces(Servlets.JSON_MEDIA_TYPE)
public List<AtlasTask> getTaskStatus(@QueryParam("guids") List<String> guids) throws AtlasBaseException {
return CollectionUtils.isNotEmpty(guids) ? taskManagement.getByGuids(guids) : taskManagement.getAll();
}
@DELETE
@Path("/tasks")
@Produces(Servlets.JSON_MEDIA_TYPE)
public void deleteTask(@QueryParam("guids") List<String> guids) throws AtlasBaseException {
if (CollectionUtils.isNotEmpty(guids)) {
taskManagement.deleteByGuids(guids);
}
}
private String getEditableEntityTypes(Configuration config) {
String ret = DEFAULT_EDITABLE_ENTITY_TYPES;
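The two new admin endpoints above expose task status and deletion over REST. A minimal client-side sketch using the JAX-RS client API; the base URL, port, and sample GUID are assumptions, and authentication is omitted:

import javax.ws.rs.client.Client;
import javax.ws.rs.client.ClientBuilder;
import javax.ws.rs.core.MediaType;

public class TaskStatusClient {
    public static void main(String[] args) {
        Client client = ClientBuilder.newClient();
        try {
            // GET /admin/tasks?guids=... returns the matching tasks as JSON;
            // omitting the 'guids' parameter returns all tasks known to TaskManagement.
            String tasksJson = client.target("http://localhost:21000/api/atlas/admin/tasks")
                                     .queryParam("guids", "sample-task-guid")
                                     .request(MediaType.APPLICATION_JSON)
                                     .get(String.class);
            System.out.println(tasksJson);

            // DELETE /admin/tasks?guids=... removes the listed tasks.
            client.target("http://localhost:21000/api/atlas/admin/tasks")
                  .queryParam("guids", "sample-task-guid")
                  .request()
                  .delete();
        } finally {
            client.close();
        }
    }
}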

View File

@ -20,12 +20,10 @@ package org.apache.atlas.web.service;
import org.apache.atlas.AtlasConfiguration;
import org.apache.atlas.AtlasErrorCode;
import org.apache.atlas.BeanUtil;
import org.apache.atlas.RequestContext;
import org.apache.atlas.util.BeanUtil;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.audit.AtlasAuditEntry;
import org.apache.atlas.repository.audit.AtlasAuditService;
import org.apache.commons.lang.StringUtils;
import org.eclipse.jetty.server.Connector;
import org.eclipse.jetty.server.HttpConfiguration;
import org.eclipse.jetty.server.HttpConnectionFactory;
@ -37,8 +35,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Date;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;

View File

@ -51,7 +51,7 @@ public class AdminResourceTest {
when(serviceState.getState()).thenReturn(ServiceState.ServiceStateValue.ACTIVE);
AdminResource adminResource = new AdminResource(serviceState, null, null, null, null, null, null, null, null, null, null, null, null);
AdminResource adminResource = new AdminResource(serviceState, null, null, null, null, null, null, null, null, null, null, null, null, null);
Response response = adminResource.getStatus();
assertEquals(response.getStatus(), HttpServletResponse.SC_OK);
JsonNode entity = AtlasJson.parseToV1JsonNode((String) response.getEntity());
@ -62,7 +62,7 @@ public class AdminResourceTest {
public void testResourceGetsValueFromServiceState() throws IOException {
when(serviceState.getState()).thenReturn(ServiceState.ServiceStateValue.PASSIVE);
AdminResource adminResource = new AdminResource(serviceState, null, null, null, null, null, null, null, null, null, null, null, null);
AdminResource adminResource = new AdminResource(serviceState, null, null, null, null, null, null, null, null, null, null, null, null, null);
Response response = adminResource.getStatus();
verify(serviceState).getState();

View File

@ -127,3 +127,6 @@ atlas.authentication.method.kerberos=false
######### Gremlin Search Configuration #########
# Set to false to disable gremlin search.
atlas.search.gremlin.enable=true
######### Configure use of Tasks #########
atlas.tasks.enabled=false
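For reference, a minimal sketch of how server-side code could consult this flag through Atlas's standard properties accessor; the class here is illustrative, and the commit may read the flag through a different configuration wrapper:

import org.apache.atlas.ApplicationProperties;
import org.apache.commons.configuration.Configuration;

public class TasksFlagCheck {
    public static void main(String[] args) throws Exception {
        // Loads atlas-application.properties and reads the flag introduced above,
        // defaulting to false when the property is not set.
        Configuration configuration = ApplicationProperties.get();
        boolean tasksEnabled = configuration.getBoolean("atlas.tasks.enabled", false);
        System.out.println("deferred-action tasks enabled: " + tasksEnabled);
    }
}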

View File

@ -1,2 +1,2 @@
TypeName,UniqueAttributeValue,BusinessAttributeName,BusinessAttributeValue,UniqueAttributeName[optional]
hive_table_v2,tableqZPo3C488c,bmWithAllTypes.attr8,"Awesome Attribute 1",qualifiedName
hive_table_v2,tableY2Q72pP2Do,bmWithAllTypes.attr8,"Awesome Attribute 1",qualifiedName
