ATLAS-4181: Provide option to add mandatory attribute to existing entity definition

Signed-off-by: Madhan Neethiraj <madhan@apache.org>
This commit is contained in:
Radhika Kundam 2021-03-05 09:33:48 -08:00 committed by Madhan Neethiraj
parent 33f7fcbb68
commit d653cea3c2
6 changed files with 279 additions and 4 deletions

View File

@ -202,6 +202,11 @@ public final class Constants {
public static final String ATTR_NAME_REPLICATED_FROM = "replicatedFrom";
public static final Integer INCOMPLETE_ENTITY_VALUE = Integer.valueOf(1);
/*
* typedef patch constants
*/
public static final String TYPEDEF_PATCH_ADD_MANDATORY_ATTRIBUTE = "ADD_MANDATORY_ATTRIBUTE";
/*
* All supported file-format extensions for Bulk Imports through file upload
*/

View File

@ -200,7 +200,7 @@ public class AtlasSolrQueryBuilderTest {
processSearchParameters(fileName, underTest);
Assert.assertEquals(underTest.build(), "+t10 AND -__state_index:DELETED AND +__typeName__index:(hive_table ) AND ( ( +created__index:[ * TO100} ) )");
Assert.assertEquals(underTest.build(), "+t10 AND -__state_index:DELETED AND +__typeName__index:(hive_table ) AND ( ( +created__index:[ * TO 100} ) )");
}
@Test

View File

@ -0,0 +1,145 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.repository.patches;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.typedef.AtlasStructDef.AtlasAttributeDef;
import org.apache.atlas.pc.WorkItemManager;
import org.apache.atlas.repository.graphdb.AtlasGraph;
import org.apache.atlas.repository.graphdb.AtlasVertex;
import org.apache.atlas.type.AtlasEntityType;
import org.apache.atlas.type.AtlasStructType.AtlasAttribute;
import org.apache.commons.collections.CollectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Collections;
import java.util.List;
import java.util.Iterator;
import java.util.Set;
import static org.apache.atlas.model.patches.AtlasPatch.PatchStatus.APPLIED;
import static org.apache.atlas.repository.Constants.ENTITY_TYPE_PROPERTY_KEY;
public class AddMandatoryAttributesPatch extends AtlasPatchHandler {
private static final Logger LOG = LoggerFactory.getLogger(AddMandatoryAttributesPatch.class);
private static final String PATCH_ID = "JAVA_PATCH_0000_008";
private static final String PATCH_DESCRIPTION = "Add mandatory attributes for all existing entities for given typeName";
private final PatchContext context;
private final String typeName;
private final List<AtlasAttributeDef> attributesToAdd;
public AddMandatoryAttributesPatch(PatchContext context, String typedefPatchId, String typeName, List<AtlasAttributeDef> attributesToAdd) {
super(context.getPatchRegistry(), PATCH_ID + "_" + typedefPatchId, PATCH_DESCRIPTION);
this.context = context;
this.typeName = typeName;
this.attributesToAdd = attributesToAdd;
}
@Override
public void apply() throws AtlasBaseException {
LOG.info("==> MandatoryAttributePatch.apply(): patchId={}", getPatchId());
ConcurrentPatchProcessor patchProcessor = new AddMandatoryAttributesPatchProcessor(context, typeName, attributesToAdd);
patchProcessor.apply();
setStatus(APPLIED);
LOG.info("<== MandatoryAttributePatch.apply(): patchId={}, status={}", getPatchId(), getStatus());
}
public static class AddMandatoryAttributesPatchProcessor extends ConcurrentPatchProcessor {
private final String typeName;
private final Set<String> typeAndAllSubTypes;
private final List<AtlasAttributeDef> attributesToAdd;
public AddMandatoryAttributesPatchProcessor(PatchContext context, String typeName, List<AtlasAttributeDef> attributesToAdd) {
super(context);
AtlasEntityType entityType = getTypeRegistry().getEntityTypeByName(typeName);
this.typeName = typeName;
this.attributesToAdd = attributesToAdd;
if (entityType != null) {
this.typeAndAllSubTypes = entityType.getTypeAndAllSubTypes();
} else {
LOG.warn("AddMandatoryAttributesPatchProcessor(): failed to find entity-type {}", typeName);
this.typeAndAllSubTypes = Collections.emptySet();
}
}
@Override
public void submitVerticesToUpdate(WorkItemManager manager) {
if (CollectionUtils.isNotEmpty(typeAndAllSubTypes)) {
LOG.info("Entity types to be updated with mandatory attributes: {}", typeAndAllSubTypes.size());
for (String typeName : typeAndAllSubTypes) {
LOG.info("finding entities of type {}", typeName);
AtlasGraph graph = getGraph();
Iterable<Object> vertexIds = graph.query().has(ENTITY_TYPE_PROPERTY_KEY, typeName).vertexIds();
int count = 0;
for (Iterator<Object> iterator = vertexIds.iterator(); iterator.hasNext(); ) {
Object vertexId = iterator.next();
manager.checkProduce(vertexId);
count++;
}
LOG.info("found {} entities of type {}", count, typeName);
}
}
}
@Override
protected void processVertexItem(Long vertexId, AtlasVertex vertex, String typeName, AtlasEntityType entityType) {
if (LOG.isDebugEnabled()) {
LOG.debug("==> AddMandatoryAttributesPatchProcessor.processVertexItem(typeName={}, vertexId={})", typeName, vertexId);
}
for (AtlasAttributeDef attributeDef : attributesToAdd) {
AtlasAttribute attribute = entityType.getAttribute(attributeDef.getName());
if (attribute != null) {
Object existingValue = vertex.getProperty(attribute.getVertexPropertyName(), Object.class);
if (existingValue == null) {
vertex.setProperty(attribute.getVertexPropertyName(), attributeDef.getDefaultValue());
}
}
}
if (LOG.isDebugEnabled()) {
LOG.debug("<== AddMandatoryAttributesPatchProcessor.processVertexItem(typeName={}, vertexId={})", typeName, vertexId);
}
}
@Override
protected void prepareForExecution() {
//do nothing
}
}
}

View File

@ -28,6 +28,7 @@ import org.apache.atlas.authorize.AtlasAuthorizerFactory;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.ha.HAConfiguration;
import org.apache.atlas.listener.ActiveStateChangeHandler;
import org.apache.atlas.model.TypeCategory;
import org.apache.atlas.model.patches.AtlasPatch.PatchStatus;
import org.apache.atlas.model.typedef.AtlasBaseTypeDef;
import org.apache.atlas.model.typedef.AtlasBusinessMetadataDef;
@ -41,8 +42,10 @@ import org.apache.atlas.model.typedef.AtlasRelationshipEndDef;
import org.apache.atlas.model.typedef.AtlasStructDef;
import org.apache.atlas.model.typedef.AtlasStructDef.AtlasAttributeDef;
import org.apache.atlas.model.typedef.AtlasTypesDef;
import org.apache.atlas.repository.Constants;
import org.apache.atlas.repository.graph.GraphBackedSearchIndexer;
import org.apache.atlas.repository.graphdb.AtlasGraph;
import org.apache.atlas.repository.patches.AddMandatoryAttributesPatch;
import org.apache.atlas.repository.patches.SuperTypesUpdatePatch;
import org.apache.atlas.repository.patches.AtlasPatchManager;
import org.apache.atlas.repository.patches.AtlasPatchRegistry;
@ -454,7 +457,8 @@ public class AtlasTypeDefStoreInitializer implements ActiveStateChangeHandler {
new UpdateTypeDefOptionsPatchHandler(typeDefStore, typeRegistry),
new SetServiceTypePatchHandler(typeDefStore, typeRegistry),
new UpdateAttributeMetadataHandler(typeDefStore, typeRegistry),
new AddSuperTypePatchHandler(typeDefStore, typeRegistry)
new AddSuperTypePatchHandler(typeDefStore, typeRegistry),
new AddMandatoryAttributePatchHandler(typeDefStore, typeRegistry)
};
Map<String, PatchHandler> patchHandlerRegistry = new HashMap<>();
@ -787,6 +791,113 @@ public class AtlasTypeDefStoreInitializer implements ActiveStateChangeHandler {
}
}
class AddMandatoryAttributePatchHandler extends PatchHandler {
public AddMandatoryAttributePatchHandler(AtlasTypeDefStore typeDefStore, AtlasTypeRegistry typeRegistry) {
super(typeDefStore, typeRegistry, new String[] { Constants.TYPEDEF_PATCH_ADD_MANDATORY_ATTRIBUTE });
}
@Override
public PatchStatus applyPatch(TypeDefPatch patch) throws AtlasBaseException {
String typeName = patch.getTypeName();
AtlasBaseTypeDef typeDef = typeRegistry.getTypeDefByName(typeName);
PatchStatus ret;
if (typeDef == null) {
throw new AtlasBaseException(AtlasErrorCode.PATCH_FOR_UNKNOWN_TYPE, patch.getAction(), typeName);
}
if (isPatchApplicable(patch, typeDef)) {
List<AtlasAttributeDef> attributesToAdd = getAttributesToAdd(patch, (AtlasStructDef) typeDef);
if (CollectionUtils.isEmpty(attributesToAdd)) {
LOG.info("patch skipped: typeName={}; mandatory attributes are not valid in patch {}",patch.getTypeName(), patch.getId());
ret = SKIPPED;
} else {
try {
RequestContext.get().setInTypePatching(true);
RequestContext.get().setCurrentTypePatchAction(Constants.TYPEDEF_PATCH_ADD_MANDATORY_ATTRIBUTE);
if (typeDef.getClass().equals(AtlasEntityDef.class)) {
AtlasEntityDef updatedDef = new AtlasEntityDef((AtlasEntityDef) typeDef);
updateTypeDefWithPatch(patch, updatedDef, attributesToAdd);
typeDefStore.updateEntityDefByName(typeName, updatedDef);
} else if (typeDef.getClass().equals(AtlasClassificationDef.class)) {
AtlasClassificationDef updatedDef = new AtlasClassificationDef((AtlasClassificationDef) typeDef);
updateTypeDefWithPatch(patch, updatedDef, attributesToAdd);
typeDefStore.updateClassificationDefByName(typeName, updatedDef);
} else if (typeDef.getClass().equals(AtlasStructDef.class)) {
AtlasStructDef updatedDef = new AtlasStructDef((AtlasStructDef) typeDef);
updateTypeDefWithPatch(patch, updatedDef, attributesToAdd);
typeDefStore.updateStructDefByName(typeName, updatedDef);
} else {
throw new AtlasBaseException(AtlasErrorCode.PATCH_NOT_APPLICABLE_FOR_TYPE, patch.getAction(), typeDef.getClass().getSimpleName());
}
LOG.info("adding a Java patch to update entities of {} with new mandatory attributes", typeName);
// Java patch handler to add mandatory attributes
patchManager.addPatchHandler(new AddMandatoryAttributesPatch(patchManager.getContext(), patch.getId(), typeName, attributesToAdd));
ret = APPLIED;
} finally {
RequestContext.get().setInTypePatching(false);
RequestContext.clear();
}
}
} else {
LOG.info("patch skipped: typeName={}; applyToVersion={}; updateToVersion={}",
patch.getTypeName(), patch.getApplyToVersion(), patch.getUpdateToVersion());
ret = SKIPPED;
}
return ret;
}
// Validate mandatory attribute with non-empty default value if PRIMITIVE, not unique and doesn't exists
private List<AtlasAttributeDef> getAttributesToAdd(TypeDefPatch patch, AtlasStructDef updatedDef) throws AtlasBaseException {
List<AtlasAttributeDef> ret = new ArrayList<>();
for (AtlasAttributeDef attributeDef : patch.getAttributeDefs()) {
TypeCategory attributeType = typeRegistry.getType(attributeDef.getTypeName()).getTypeCategory();
if (updatedDef.hasAttribute(attributeDef.getName())) {
LOG.warn("AddMandatoryAttributePatchHandler(id={}, typeName={}, attribute={}): already exists in type {}. Ignoring attribute", patch.getId(), patch.getTypeName(), attributeDef.getName(), updatedDef.getName());
} else if (attributeDef.getIsOptional()) {
LOG.warn("AddMandatoryAttributePatchHandler(id={}, typeName={}, attribute={}): is not mandatory attribute. Ignoring attribute", patch.getId(), patch.getTypeName(), attributeDef.getName());
} else if (StringUtils.isEmpty(attributeDef.getDefaultValue())) {
LOG.warn("AddMandatoryAttributePatchHandler(id={}, typeName={}, attribute={}): default value is missing. Ignoring attribute", patch.getId(), patch.getTypeName(), attributeDef.getName());
} else if (!TypeCategory.PRIMITIVE.equals(attributeType)) {
LOG.warn("AddMandatoryAttributePatchHandler(id={}, typeName={}, attribute={}): type {} is not primitive. Ignoring attribute", patch.getId(), patch.getTypeName(), attributeDef.getName(), attributeDef.getTypeName());
} else if (attributeDef.getIsUnique()) {
LOG.warn("AddMandatoryAttributePatchHandler(id={}, typeName={}, attribute={}): is not unique. Ignoring attribute", patch.getId(), patch.getTypeName(), attributeDef.getName());
} else {
ret.add(attributeDef);
}
}
return ret;
}
private void updateTypeDefWithPatch(TypeDefPatch patch, AtlasStructDef updatedDef, List<AtlasAttributeDef> attributesToAdd) {
for (AtlasAttributeDef attributeDef : attributesToAdd) {
updatedDef.addAttribute(attributeDef);
}
updatedDef.setTypeVersion(patch.getUpdateToVersion());
}
}
class UpdateAttributePatchHandler extends PatchHandler {
public UpdateAttributePatchHandler(AtlasTypeDefStore typeDefStore, AtlasTypeRegistry typeRegistry) {
super(typeDefStore, typeRegistry, new String[] { "UPDATE_ATTRIBUTE" });

View File

@ -442,8 +442,8 @@ public class AtlasStructDefStoreV2 extends AtlasAbstractDefStoreV2<AtlasStructDe
continue;
}
// new attribute - only allow if optional
if (!attributeDef.getIsOptional()) {
// new attribute - allow optional by default or allow mandatory only with typedef patch ADD_MANDATORY_ATTRIBUTE
if (!attributeDef.getIsOptional() && !isInAddMandatoryAttributePatch()) {
throw new AtlasBaseException(AtlasErrorCode.CANNOT_ADD_MANDATORY_ATTRIBUTE, structDef.getName(), attributeDef.getName());
}
}
@ -470,6 +470,11 @@ public class AtlasStructDefStoreV2 extends AtlasAbstractDefStoreV2<AtlasStructDe
AtlasGraphUtilsV2.setEncodedProperty(vertex, encodedStructDefPropertyKey, attrNames);
}
public static boolean isInAddMandatoryAttributePatch() {
return RequestContext.get().isInTypePatching() &&
StringUtils.equals(Constants.TYPEDEF_PATCH_ADD_MANDATORY_ATTRIBUTE, RequestContext.get().getCurrentTypePatchAction());
}
public static void updateVertexAddReferences(AtlasStructDef structDef, AtlasVertex vertex,
AtlasTypeDefGraphStoreV2 typeDefStore) throws AtlasBaseException {
for (AtlasAttributeDef attributeDef : structDef.getAttributeDefs()) {

View File

@ -75,6 +75,7 @@ public class RequestContext {
private boolean isInTypePatching = false;
private boolean createShellEntityForNonExistingReference = false;
private boolean skipFailedEntities = false;
private String currentTypePatchAction = "";
private RequestContext() {
}
@ -237,6 +238,14 @@ public class RequestContext {
this.skipFailedEntities = skipFailedEntities;
}
public String getCurrentTypePatchAction() {
return currentTypePatchAction;
}
public void setCurrentTypePatchAction(String currentTypePatchAction) {
this.currentTypePatchAction = currentTypePatchAction;
}
public void recordEntityUpdate(AtlasEntityHeader entity) {
if (entity != null && entity.getGuid() != null && ! entitiesToSkipUpdate.contains(entity.getGuid())) {
updatedEntities.put(entity.getGuid(), entity);