From c6ceaa3454867006089c0337a03d21f129714d12 Mon Sep 17 00:00:00 2001 From: chaitali Date: Wed, 18 Sep 2024 23:18:20 +0530 Subject: [PATCH] ATLAS-4903 : When migration restarts it results into deletion of edges and vertices Signed-off-by: Pinal Shah --- .../repository/store/graph/v2/EntityGraphMapper.java | 2 +- .../store/graph/v2/bulkimport/MigrationImport.java | 7 +++++-- .../store/graph/v2/bulkimport/pc/EntityConsumer.java | 8 +++++--- .../graph/v2/bulkimport/pc/EntityConsumerBuilder.java | 6 ++++-- .../src/main/java/org/apache/atlas/RequestContext.java | 9 +++++++++ 5 files changed, 24 insertions(+), 8 deletions(-) diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphMapper.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphMapper.java index 6b395dd17..3f7f73e70 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphMapper.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphMapper.java @@ -1458,7 +1458,7 @@ public class EntityGraphMapper { } if (isReference && !isSoftReference) { - boolean isAppendOnPartialUpdate = getAppendOptionForRelationship(ctx.getReferringVertex(), attribute.getName()); + boolean isAppendOnPartialUpdate = RequestContext.get().isMigrationInProgress() || getAppendOptionForRelationship(ctx.getReferringVertex(), attribute.getName()); if (isAppendOnPartialUpdate) { allArrayElements = unionCurrentAndNewElements(attribute, (List) currentElements, (List) newElementsCreated); diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/MigrationImport.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/MigrationImport.java index f8c9218c6..d34d9b658 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/MigrationImport.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/MigrationImport.java @@ -106,10 +106,13 @@ public class MigrationImport extends ImportStrategy { int batchSize = importResult.getRequest().getOptionKeyBatchSize(); int numWorkers = getNumWorkers(importResult.getRequest().getOptionKeyNumWorkers()); - + boolean isMigrationImport = false; + if (importResult.getRequest().getOptions().get("migration")!=null) { + isMigrationImport = Boolean.valueOf(importResult.getRequest().getOptions().get("migration")); + } EntityConsumerBuilder consumerBuilder = new EntityConsumerBuilder(typeRegistry, this.graph, entityStore, entityGraphRetriever, graphBulk, - entityStoreBulk, entityGraphRetrieverBulk, batchSize); + entityStoreBulk, entityGraphRetrieverBulk, batchSize, isMigrationImport); LOG.info("MigrationImport: EntityCreationManager: Created!"); return new EntityCreationManager(consumerBuilder, batchSize, numWorkers, importResult, dataMigrationStatusService); diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/pc/EntityConsumer.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/pc/EntityConsumer.java index b73988fd7..a22300f1b 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/pc/EntityConsumer.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/pc/EntityConsumer.java @@ -21,7 +21,6 @@ import org.apache.atlas.GraphTransactionInterceptor; import org.apache.atlas.RequestContext; import org.apache.atlas.exception.AtlasBaseException; import org.apache.atlas.model.instance.AtlasEntity; -import org.apache.atlas.model.instance.AtlasEntityHeader; import org.apache.atlas.model.instance.EntityMutationResponse; import org.apache.atlas.pc.WorkItemConsumer; import org.apache.atlas.repository.graphdb.AtlasGraph; @@ -33,8 +32,8 @@ import org.apache.atlas.repository.store.graph.v2.EntityGraphRetriever; import org.apache.atlas.repository.store.graph.v2.EntityStream; import org.apache.atlas.type.AtlasTypeRegistry; import org.apache.atlas.utils.AtlasPerfMetrics; -import org.apache.commons.collections.CollectionUtils; import org.apache.commons.collections.MapUtils; +import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -57,6 +56,7 @@ public class EntityConsumer extends WorkItemConsumer entityBuffer = new ArrayList<>(); private List localResults = new ArrayList<>(); @@ -64,7 +64,7 @@ public class EntityConsumer extends WorkItemConsumer queue) { return new EntityConsumer(typeRegistry, atlasGraph, entityStore, atlasGraphBulk, entityStoreBulk, entityRetrieverBulk, - queue, this.batchSize); + queue, this.batchSize, this.isMigrationImport); } } diff --git a/server-api/src/main/java/org/apache/atlas/RequestContext.java b/server-api/src/main/java/org/apache/atlas/RequestContext.java index e144d3650..e1670a924 100644 --- a/server-api/src/main/java/org/apache/atlas/RequestContext.java +++ b/server-api/src/main/java/org/apache/atlas/RequestContext.java @@ -74,6 +74,7 @@ public class RequestContext { private int maxAttempts = 1; private int attemptCount = 1; private boolean isImportInProgress = false; + private boolean isMigrationInProgress = false; private boolean isInNotificationProcessing = false; private boolean isInTypePatching = false; private boolean createShellEntityForNonExistingReference = false; @@ -202,6 +203,14 @@ public class RequestContext { return isImportInProgress; } + public boolean isMigrationInProgress() { + return isMigrationInProgress; + } + + public void setMigrationInProgress(boolean migrationInProgress) { + isMigrationInProgress = migrationInProgress; + } + public void setImportInProgress(boolean importInProgress) { isImportInProgress = importInProgress; }