ATLAS-4842 : Export/Import: fetchType as incremental does full export instead of connected

Signed-off-by: Pinal Shah <pinal.shah@freestoneinfotech.com>
This commit is contained in:
priyanshi-shah26 2024-04-05 14:06:55 +05:30 committed by Pinal Shah
parent ab89e78ed5
commit 51691b139b
5 changed files with 131 additions and 9 deletions

View File

@ -55,8 +55,12 @@ public class EntitiesExtractor {
if (context.isHiveDBIncrementalSkipLineage()) {
extractors.get(INCREMENTAL_EXTRACT).fullFetch(entity, context);
break;
} else if (context.isHiveTableIncrementalSkipLineage()) {
extractors.get(INCREMENTAL_EXTRACT).connectedFetch(entity, context);
} else if (context.isHiveTableIncremental()) {
if (context.skipLineage) {
extractors.get(INCREMENTAL_EXTRACT).connectedFetch(entity, context);
} else {
extractor.connectedFetch(entity, context);
}
break;
}

View File

@ -408,7 +408,7 @@ public class ExportService {
skipLineage = result.getRequest().getSkipLineageOptionValue();
this.changeMarker = result.getRequest().getChangeTokenFromOptions();
this.isHiveDBIncremental = checkHiveDBIncrementalSkipLineage(result.getRequest());
this.isHiveTableIncremental = checkHiveTableIncrementalSkipLineage(result.getRequest());
this.isHiveTableIncremental = checkHiveTableIncremental(result.getRequest());
this.isSkipConnectedFetch = false;
}
@ -422,14 +422,13 @@ public class ExportService {
request.getSkipLineageOptionValue();
}
private boolean checkHiveTableIncrementalSkipLineage(AtlasExportRequest request) {
if(CollectionUtils.isEmpty(request.getItemsToExport())) {
private boolean checkHiveTableIncremental(AtlasExportRequest request) {
if (CollectionUtils.isEmpty(request.getItemsToExport())) {
return false;
}
return request.getItemsToExport().get(0).getTypeName().equalsIgnoreCase(ATLAS_TYPE_HIVE_TABLE) &&
request.getFetchTypeOptionValue().equalsIgnoreCase(AtlasExportRequest.FETCH_TYPE_INCREMENTAL) &&
request.getSkipLineageOptionValue();
request.getFetchTypeOptionValue().equalsIgnoreCase(AtlasExportRequest.FETCH_TYPE_INCREMENTAL);
}
public List<AtlasEntity> getEntitiesWithModifiedTimestamp(AtlasEntityWithExtInfo entityWithExtInfo) {
@ -501,6 +500,10 @@ public class ExportService {
return isHiveTableIncremental;
}
public boolean isHiveTableIncremental() {
return isHiveTableIncremental;
}
public void addToEntityCreationOrder(String guid) {
entityCreationOrder.add(guid);
}

View File

@ -62,6 +62,7 @@ import static org.apache.atlas.repository.impexp.ZipFileResourceTestUtils.runImp
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.assertFalse;
@Guice(modules = TestModules.TestOnlyModule.class)
public class ExportIncrementalTest extends AtlasTestBase {
@ -83,22 +84,27 @@ public class ExportIncrementalTest extends AtlasTestBase {
private final String EXPORT_REQUEST_INCREMENTAL = "export-incremental";
private final String EXPORT_REQUEST_CONNECTED = "export-connected";
private AtlasClassificationType classificationTypeT1;
private AtlasClassificationType classificationTypeT2;
private AtlasClassificationType classificationTypeT3;
private long nextTimestamp;
private static final String EXPORT_INCREMENTAL = "incremental";
private static final String QUALIFIED_NAME_DB = "db_test_1@02052019";
private static final String QUALIFIED_NAME_TABLE_LINEAGE = "db_test_1.test_tbl_ctas_2@02052019";
private static final String QUALIFIED_NAME_TABLE_2 = "db_test_1.test_tbl_2@02052019";
private static final String GUID_DB = "f0b72ab4-7452-4e42-ac74-2aee7728cce4";
private static final String GUID_TABLE_2 = "8d0b834c-61ce-42d8-8f66-6fa51c36bccb";
private static final String GUID_TABLE_CTAS_2 = "eaec545b-3ac7-4e1b-a497-bd4a2b6434a2";
private static final String GUID_HIVE_PROCESS = "bd3138b2-f29e-4226-b859-de25eaa1c18b";
private static final String GUID_TABLE_1 = "4d5adf00-2c9b-4877-ad23-c41fd7319150";
@BeforeClass
public void setup() throws IOException, AtlasBaseException {
basicSetup(typeDefStore, typeRegistry);
RequestContext.get().setImportInProgress(true);
classificationTypeT1 = createNewClassification();
classificationTypeT2 = createNewClassificationT2();
classificationTypeT3 = createNewClassificationT3();
createEntities(entityStore, ENTITIES_SUB_DIR, new String[] { "db", "table-columns"});
final String[] entityGuids = {DB_GUID, TABLE_GUID};
@ -163,6 +169,15 @@ public class ExportIncrementalTest extends AtlasTestBase {
return typeRegistry.getClassificationTypeByName("T1");
}
private AtlasClassificationType createNewClassificationT2() {
createTypes(typeDefStore, ENTITIES_SUB_DIR,"typesdef-new-classification-T2");
return typeRegistry.getClassificationTypeByName("T2");
}
private AtlasClassificationType createNewClassificationT3() {
createTypes(typeDefStore, ENTITIES_SUB_DIR,"typedef-new-classification-T3");
return typeRegistry.getClassificationTypeByName("T3");
}
@Test(dependsOnMethods = "atT1_NewClassificationAttachedToTable_ReturnsChangedTable")
public void atT2_NewClassificationAttachedToColumn_ReturnsChangedColumn() throws AtlasBaseException, IOException {
final int expectedEntityCount = 1;
@ -245,6 +260,54 @@ public class ExportIncrementalTest extends AtlasTestBase {
verifyExpectedEntities(getFileNames(getZipSourceCopy(source)), GUID_TABLE_CTAS_2);
}
@Test(dependsOnMethods = "importHiveDb")
public void exportTableIncrementalForParentEntity() throws AtlasBaseException, IOException {
InputStream source = runExportWithParameters(exportService, getExportRequestForHiveTable(QUALIFIED_NAME_TABLE_2, EXPORT_INCREMENTAL, 0, false));
ZipSource sourceCopy = getZipSourceCopy(source);
verifyExpectedEntities(getFileNames(sourceCopy), GUID_DB, GUID_HIVE_PROCESS, GUID_TABLE_2, GUID_TABLE_CTAS_2);
source = runExportWithParameters(exportService, getExportRequestForHiveTable(QUALIFIED_NAME_TABLE_2, EXPORT_INCREMENTAL, 0, false));
verifyUnExpectedEntities(getFileNames(getZipSourceCopy(source)), GUID_TABLE_1);
nextTimestamp = updateTimesampForNextIncrementalExport(sourceCopy);
try {
source = runExportWithParameters(exportService, getExportRequestForHiveTable(QUALIFIED_NAME_TABLE_2, EXPORT_INCREMENTAL, nextTimestamp, false));
} catch (SkipException e) {
throw e;
}
entityStore.addClassifications(GUID_TABLE_1, ImmutableList.of(classificationTypeT2.createDefaultValue()));
entityStore.addClassifications(GUID_TABLE_2, ImmutableList.of(classificationTypeT2.createDefaultValue()));
source = runExportWithParameters(exportService, getExportRequestForHiveTable(QUALIFIED_NAME_TABLE_2, EXPORT_INCREMENTAL, nextTimestamp, false));
verifyExpectedEntities(getFileNames(getZipSourceCopy(source)), GUID_TABLE_2);
source = runExportWithParameters(exportService, getExportRequestForHiveTable(QUALIFIED_NAME_TABLE_2, EXPORT_INCREMENTAL, nextTimestamp, false));
verifyUnExpectedEntities(getFileNames(getZipSourceCopy(source)), GUID_TABLE_1);
}
@Test(dependsOnMethods = "importHiveDb")
public void exportTableIncremental() throws AtlasBaseException, IOException {
InputStream source = runExportWithParameters(exportService, getExportRequestForHiveTable(QUALIFIED_NAME_TABLE_LINEAGE, EXPORT_INCREMENTAL, 0, true));
ZipSource sourceCopy = getZipSourceCopy(source);
verifyExpectedEntities(getFileNames(sourceCopy), GUID_DB, GUID_TABLE_CTAS_2);
nextTimestamp = updateTimesampForNextIncrementalExport(sourceCopy);
entityStore.addClassifications(GUID_TABLE_1, ImmutableList.of(classificationTypeT3.createDefaultValue()));
entityStore.addClassifications(GUID_TABLE_CTAS_2, ImmutableList.of(classificationTypeT3.createDefaultValue()));
source = runExportWithParameters(exportService, getExportRequestForHiveTable(QUALIFIED_NAME_TABLE_LINEAGE, EXPORT_INCREMENTAL, nextTimestamp, false));
verifyExpectedEntities(getFileNames(getZipSourceCopy(source)), GUID_TABLE_CTAS_2);
source = runExportWithParameters(exportService, getExportRequestForHiveTable(QUALIFIED_NAME_TABLE_LINEAGE, EXPORT_INCREMENTAL, nextTimestamp, false));
verifyUnExpectedEntities(getFileNames(getZipSourceCopy(source)), GUID_TABLE_1);
}
private AtlasExportRequest getIncrementalRequest(long timestamp) {
try {
@ -292,6 +355,12 @@ public class ExportIncrementalTest extends AtlasTestBase {
}
}
private void verifyUnExpectedEntities(List<String> fileNames, String... guids){
for (String guid : guids) {
assertFalse(fileNames.contains(guid.toLowerCase()));
}
}
private List<String> getFileNames(ZipSource zipSource){
List<String> ret = new ArrayList<>();
assertTrue(zipSource.hasNext());

View File

@ -0,0 +1,23 @@
{
"classificationDefs": [
{
"category": "CLASSIFICATION",
"guid": "6hee5e9f-e703-447a-b23b-0b831dc8a933",
"createdBy": "admin",
"updatedBy": "admin",
"createTime": 1711991058600,
"updateTime": 1711991058600,
"version": 1,
"name": "T3",
"description": "T3",
"typeVersion": "1.0",
"attributeDefs": [],
"superTypes": [],
"entityTypes": [],
"subTypes": []
}
],
"entityDefs": [],
"enumDefs": [],
"structDefs": []
}

View File

@ -0,0 +1,23 @@
{
"classificationDefs": [
{
"category": "CLASSIFICATION",
"guid": "2e135e9f-e703-447a-b23b-0b831dc8a933",
"createdBy": "admin",
"updatedBy": "admin",
"createTime": 1711991058474,
"updateTime": 1711991058474,
"version": 1,
"name": "T2",
"description": "T2",
"typeVersion": "1.0",
"attributeDefs": [],
"superTypes": [],
"entityTypes": [],
"subTypes": []
}
],
"entityDefs": [],
"enumDefs": [],
"structDefs": []
}