ATLAS-4903 : When migration restarts it results into deletion of edges and vertices

Signed-off-by: Pinal Shah <pinal.shah@freestoneinfotech.com>
This commit is contained in:
chaitali 2024-09-18 23:18:20 +05:30 committed by Pinal Shah
parent 9999bed010
commit c6ceaa3454
5 changed files with 24 additions and 8 deletions

View File

@ -1458,7 +1458,7 @@ public class EntityGraphMapper {
}
if (isReference && !isSoftReference) {
boolean isAppendOnPartialUpdate = getAppendOptionForRelationship(ctx.getReferringVertex(), attribute.getName());
boolean isAppendOnPartialUpdate = RequestContext.get().isMigrationInProgress() || getAppendOptionForRelationship(ctx.getReferringVertex(), attribute.getName());
if (isAppendOnPartialUpdate) {
allArrayElements = unionCurrentAndNewElements(attribute, (List) currentElements, (List) newElementsCreated);

View File

@ -106,10 +106,13 @@ public class MigrationImport extends ImportStrategy {
int batchSize = importResult.getRequest().getOptionKeyBatchSize();
int numWorkers = getNumWorkers(importResult.getRequest().getOptionKeyNumWorkers());
boolean isMigrationImport = false;
if (importResult.getRequest().getOptions().get("migration")!=null) {
isMigrationImport = Boolean.valueOf(importResult.getRequest().getOptions().get("migration"));
}
EntityConsumerBuilder consumerBuilder =
new EntityConsumerBuilder(typeRegistry, this.graph, entityStore, entityGraphRetriever, graphBulk,
entityStoreBulk, entityGraphRetrieverBulk, batchSize);
entityStoreBulk, entityGraphRetrieverBulk, batchSize, isMigrationImport);
LOG.info("MigrationImport: EntityCreationManager: Created!");
return new EntityCreationManager(consumerBuilder, batchSize, numWorkers, importResult, dataMigrationStatusService);

View File

@ -21,7 +21,6 @@ import org.apache.atlas.GraphTransactionInterceptor;
import org.apache.atlas.RequestContext;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.model.instance.AtlasEntityHeader;
import org.apache.atlas.model.instance.EntityMutationResponse;
import org.apache.atlas.pc.WorkItemConsumer;
import org.apache.atlas.repository.graphdb.AtlasGraph;
@ -33,8 +32,8 @@ import org.apache.atlas.repository.store.graph.v2.EntityGraphRetriever;
import org.apache.atlas.repository.store.graph.v2.EntityStream;
import org.apache.atlas.type.AtlasTypeRegistry;
import org.apache.atlas.utils.AtlasPerfMetrics;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.collections.MapUtils;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -57,6 +56,7 @@ public class EntityConsumer extends WorkItemConsumer<AtlasEntity.AtlasEntityWith
private final AtlasEntityStore entityStoreBulk;
private final AtlasTypeRegistry typeRegistry;
private final EntityGraphRetriever entityRetrieverBulk;
private final boolean isMigrationImport;
private List<AtlasEntity.AtlasEntityWithExtInfo> entityBuffer = new ArrayList<>();
private List<String> localResults = new ArrayList<>();
@ -64,7 +64,7 @@ public class EntityConsumer extends WorkItemConsumer<AtlasEntity.AtlasEntityWith
public EntityConsumer(AtlasTypeRegistry typeRegistry,
AtlasGraph atlasGraph, AtlasEntityStore entityStore,
AtlasGraph atlasGraphBulk, AtlasEntityStore entityStoreBulk, EntityGraphRetriever entityRetrieverBulk,
BlockingQueue queue, int batchSize) {
BlockingQueue queue, int batchSize , boolean isMigrationImport) {
super(queue);
this.typeRegistry = typeRegistry;
@ -76,6 +76,7 @@ public class EntityConsumer extends WorkItemConsumer<AtlasEntity.AtlasEntityWith
this.entityRetrieverBulk = entityRetrieverBulk;
this.batchSize = batchSize;
this.isMigrationImport = isMigrationImport;
}
@Override
@ -98,6 +99,7 @@ public class EntityConsumer extends WorkItemConsumer<AtlasEntity.AtlasEntityWith
private void processEntity(AtlasEntity.AtlasEntityWithExtInfo entityWithExtInfo, long currentCount) {
RequestContext.get().setImportInProgress(true);
RequestContext.get().setCreateShellEntityForNonExistingReference(true);
RequestContext.get().setMigrationInProgress(this.isMigrationImport);
try {
LOG.debug("Processing: {}", currentCount);

View File

@ -37,10 +37,11 @@ public class EntityConsumerBuilder implements WorkItemBuilder<EntityConsumer, At
private final AtlasTypeRegistry typeRegistry;
private EntityGraphRetriever entityRetrieverBulk;
private int batchSize;
private final boolean isMigrationImport;
public EntityConsumerBuilder(AtlasTypeRegistry typeRegistry, AtlasGraph atlasGraph, AtlasEntityStoreV2 entityStore, EntityGraphRetriever entityRetriever,
AtlasGraph atlasGraphBulk, AtlasEntityStoreV2 entityStoreBulk, EntityGraphRetriever entityRetrieverBulk,
int batchSize) {
int batchSize, boolean isMigrationImport) {
this.typeRegistry = typeRegistry;
this.atlasGraph = atlasGraph;
@ -52,12 +53,13 @@ public class EntityConsumerBuilder implements WorkItemBuilder<EntityConsumer, At
this.entityRetrieverBulk = entityRetrieverBulk;
this.batchSize = batchSize;
this.isMigrationImport = isMigrationImport;
}
@Override
public EntityConsumer build(BlockingQueue<AtlasEntity.AtlasEntityWithExtInfo> queue) {
return new EntityConsumer(typeRegistry, atlasGraph, entityStore,
atlasGraphBulk, entityStoreBulk, entityRetrieverBulk,
queue, this.batchSize);
queue, this.batchSize, this.isMigrationImport);
}
}

View File

@ -74,6 +74,7 @@ public class RequestContext {
private int maxAttempts = 1;
private int attemptCount = 1;
private boolean isImportInProgress = false;
private boolean isMigrationInProgress = false;
private boolean isInNotificationProcessing = false;
private boolean isInTypePatching = false;
private boolean createShellEntityForNonExistingReference = false;
@ -202,6 +203,14 @@ public class RequestContext {
return isImportInProgress;
}
public boolean isMigrationInProgress() {
return isMigrationInProgress;
}
public void setMigrationInProgress(boolean migrationInProgress) {
isMigrationInProgress = migrationInProgress;
}
public void setImportInProgress(boolean importInProgress) {
isImportInProgress = importInProgress;
}