ATLAS-4861: Export/Import: add flag to skip updating replicated attributes

Signed-off-by: Pinal Shah <pinal.shah@freestoneinfotech.com>
This commit is contained in:
Pinal Shah 2024-05-10 17:29:19 +07:00
parent 76f918cfe4
commit 88f829f6ce
3 changed files with 45 additions and 4 deletions

View File

@ -53,6 +53,7 @@ public class AtlasExportRequest implements Serializable {
public static final String OPTION_ATTR_MATCH_TYPE = "matchType";
public static final String OPTION_SKIP_LINEAGE = "skipLineage";
public static final String OPTION_KEY_REPLICATED_TO = "replicatedTo";
public static final String OPTION_KEY_SKIP_UPDATE_REPLICATION_ATTR = "skipUpdateReplicationAttr";
public static final String FETCH_TYPE_FULL = "full";
public static final String FETCH_TYPE_CONNECTED = "connected";
public static final String FETCH_TYPE_INCREMENTAL = "incremental";
@ -141,6 +142,23 @@ public class AtlasExportRequest implements Serializable {
return MapUtils.isNotEmpty(options) && options.containsKey(OPTION_KEY_REPLICATED_TO);
}
@JsonIgnore
public boolean skipUpdateReplicationAttr() {
if (MapUtils.isNotEmpty(getOptions()) && getOptions().containsKey(OPTION_KEY_SKIP_UPDATE_REPLICATION_ATTR)) {
Object o = getOptions().get(AtlasExportRequest.OPTION_KEY_SKIP_UPDATE_REPLICATION_ATTR);
if (o instanceof String) {
return Boolean.parseBoolean((String) o);
}
if (o instanceof Boolean) {
return (Boolean) o;
}
}
return false;
}
@JsonIgnore
public String getOptionKeyReplicatedTo() {
String replicateToServerName = isReplicationOptionSet() ? (String) options.get(OPTION_KEY_REPLICATED_TO) : StringUtils.EMPTY;

View File

@ -43,6 +43,7 @@ public class AtlasImportRequest implements Serializable {
public static final String TRANSFORMS_KEY = "transforms";
public static final String TRANSFORMERS_KEY = "transformers";
public static final String OPTION_KEY_REPLICATED_FROM = "replicatedFrom";
public static final String OPTION_KEY_SKIP_UPDATE_REPLICATION_ATTR = "skipUpdateReplicationAttr";
public static final String OPTION_KEY_MIGRATION_FILE_NAME = "migrationFileName";
public static final String OPTION_KEY_MIGRATION = "migration";
public static final String OPTION_KEY_NUM_WORKERS = "numWorkers";
@ -122,6 +123,23 @@ public class AtlasImportRequest implements Serializable {
return MapUtils.isNotEmpty(options) && options.containsKey(OPTION_KEY_REPLICATED_FROM);
}
@JsonIgnore
public boolean skipUpdateReplicationAttr() {
if (MapUtils.isNotEmpty(getOptions()) && getOptions().containsKey(OPTION_KEY_SKIP_UPDATE_REPLICATION_ATTR)) {
Object o = getOptions().get(AtlasExportRequest.OPTION_KEY_SKIP_UPDATE_REPLICATION_ATTR);
if (o instanceof String) {
return Boolean.parseBoolean((String) o);
}
if (o instanceof Boolean) {
return (Boolean) o;
}
}
return false;
}
@JsonIgnore
public String getOptionKeyReplicatedFrom() {
return isReplicationOptionSet() ? options.get(OPTION_KEY_REPLICATED_FROM) : StringUtils.EMPTY;

View File

@ -97,6 +97,7 @@ public class AuditsWriter {
String attrNameReplicated,
long lastModifiedTimestamp) throws AtlasBaseException {
if (!isReplicationSet || CollectionUtils.isEmpty(exportedGuids)) {
LOG.warn("Skipping Updating Replication Attributes");
return;
}
@ -200,13 +201,15 @@ public class AuditsWriter {
private AtlasExportRequest request;
private String targetServerName;
private boolean replicationOptionState;
private boolean skipUpdateReplicationAttr;
private String targetServerFullName;
public void add(String userName, AtlasExportResult result,
long startTime, long endTime,
List<String> entityGuids) throws AtlasBaseException {
request = result.getRequest();
replicationOptionState = request.isReplicationOptionSet();
replicationOptionState = request.isReplicationOptionSet();
skipUpdateReplicationAttr = request.skipUpdateReplicationAttr();
saveCurrentServer();
@ -220,7 +223,7 @@ public class AuditsWriter {
return;
}
updateReplicationAttribute(replicationOptionState, targetServerName, targetServerFullName,
updateReplicationAttribute((replicationOptionState && !skipUpdateReplicationAttr), targetServerName, targetServerFullName,
entityGuids, Constants.ATTR_NAME_REPLICATED_TO, result.getChangeMarker());
}
}
@ -228,6 +231,7 @@ public class AuditsWriter {
private class ImportAudits {
private AtlasImportRequest request;
private boolean replicationOptionState;
private boolean skipUpdateReplicationAttr;
private String sourceServerName;
private String sourceServerFullName;
@ -235,7 +239,8 @@ public class AuditsWriter {
long startTime, long endTime,
List<String> entityGuids) throws AtlasBaseException {
request = result.getRequest();
replicationOptionState = request.isReplicationOptionSet();
replicationOptionState = request.isReplicationOptionSet();
skipUpdateReplicationAttr = request.skipUpdateReplicationAttr();
saveCurrentServer();
@ -250,7 +255,7 @@ public class AuditsWriter {
return;
}
updateReplicationAttribute(replicationOptionState, sourceServerName, sourceServerFullName, entityGuids,
updateReplicationAttribute((replicationOptionState && !skipUpdateReplicationAttr), sourceServerName, sourceServerFullName, entityGuids,
Constants.ATTR_NAME_REPLICATED_FROM, result.getExportResult().getChangeMarker());
}