ATLAS-4129: Glossary Bulk Import Bugs consolidated fix (ATLAS-4128, ATLAS-4131, ATLAS-4160, ATLAS-4130, ATLAS-4129)

Signed-off-by: Sarath Subramanian <sarath@apache.org>
This commit is contained in:
mayanknj 2021-03-27 00:57:40 +05:30 committed by Sarath Subramanian
parent c03ee72ca5
commit 842890d27b
8 changed files with 126 additions and 85 deletions

View File

@ -1014,10 +1014,10 @@ public class AtlasClientV2 extends AtlasBaseClient {
return readStreamContents(inputStream);
}
public List<AtlasGlossaryTerm> importGlossary(String fileName) throws AtlasServiceException {
public BulkImportResponse importGlossary(String fileName) throws AtlasServiceException {
MultiPart multipartEntity = getMultiPartData(fileName);
return callAPI(API_V2.IMPORT_GLOSSARY, List.class, multipartEntity);
return callAPI(API_V2.IMPORT_GLOSSARY, BulkImportResponse.class, multipartEntity);
}

View File

@ -17,13 +17,18 @@
*/
package org.apache.atlas.bulkimport;
import org.apache.atlas.model.annotation.AtlasJSON;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
public class BulkImportResponse {
import static org.apache.atlas.bulkimport.BulkImportResponse.ImportStatus.SUCCESS;
private List<ImportInfo> failedImportInfoList = new ArrayList<ImportInfo>();
private List<ImportInfo> successImportInfoList = new ArrayList<ImportInfo>();
@AtlasJSON
public class BulkImportResponse implements Serializable {
private List<ImportInfo> failedImportInfoList = new ArrayList<>();
private List<ImportInfo> successImportInfoList = new ArrayList<>();
public BulkImportResponse() {}
@ -31,36 +36,32 @@ public class BulkImportResponse {
return failedImportInfoList;
}
public void setFailedImportInfoList(List<ImportInfo> failedImportInfoList){
public void setFailedImportInfoList(List<ImportInfo> failedImportInfoList) {
this.failedImportInfoList = failedImportInfoList;
}
public void setFailedImportInfoList(ImportInfo importInfo){
List<ImportInfo> failedImportInfoList = this.failedImportInfoList;
if (failedImportInfoList == null) {
failedImportInfoList = new ArrayList<>();
public void addToFailedImportInfoList(ImportInfo importInfo) {
if (this.failedImportInfoList == null) {
this.failedImportInfoList = new ArrayList<>();
}
failedImportInfoList.add(importInfo);
setFailedImportInfoList(failedImportInfoList);
this.failedImportInfoList.add(importInfo);
}
/**
 * Returns the list of {@link ImportInfo} entries recorded as successful imports.
 *
 * <p>The backing field is returned directly, not a copy, so changes made by the
 * caller are visible to this response object. The value can be {@code null} if a
 * {@code null} list was passed to {@code setSuccessImportInfoList(List)}.
 *
 * @return the success import info list held by this response
 */
public List<ImportInfo> getSuccessImportInfoList() {
return successImportInfoList;
}
public void setSuccessImportInfoList(List<ImportInfo> successImportInfoList){
public void setSuccessImportInfoList(List<ImportInfo> successImportInfoList) {
this.successImportInfoList = successImportInfoList;
}
public void setSuccessImportInfoList(ImportInfo importInfo){
List<ImportInfo> successImportInfoList = this.successImportInfoList;
public void addToSuccessImportInfoList(ImportInfo importInfo) {
if (successImportInfoList == null) {
successImportInfoList = new ArrayList<>();
}
successImportInfoList.add(importInfo);
setSuccessImportInfoList(successImportInfoList);
}
public enum ImportStatus {
@ -75,14 +76,16 @@ public class BulkImportResponse {
'}';
}
static public class ImportInfo {
public static class ImportInfo {
private String parentObjectName;
private String childObjectName;
private ImportStatus importStatus;
private String remarks;
private Integer rowNumber;
public ImportInfo(){ }
public ImportInfo(String parentObjectName, String childObjectName, ImportStatus importStatus, String remarks, Integer rowNumber) {
this.parentObjectName = parentObjectName;
this.childObjectName = childObjectName;
@ -92,19 +95,19 @@ public class BulkImportResponse {
}
public ImportInfo(String parentObjectName, String childObjectName, ImportStatus importStatus) {
this(parentObjectName, childObjectName, importStatus, "",-1);
this(parentObjectName, childObjectName, importStatus, "", -1);
}
public ImportInfo( ImportStatus importStatus, String remarks) {
this("","", importStatus, remarks, -1);
public ImportInfo(ImportStatus importStatus, String remarks) {
this("", "", importStatus, remarks, -1);
}
public ImportInfo( ImportStatus importStatus, String remarks, Integer rowNumber) {
this("","", importStatus, remarks, rowNumber);
public ImportInfo(ImportStatus importStatus, String remarks, Integer rowNumber) {
this("", "", importStatus, remarks, rowNumber);
}
public ImportInfo(String parentObjectName, String childObjectName) {
this(parentObjectName,childObjectName, ImportStatus.SUCCESS, "", -1);
this(parentObjectName, childObjectName, SUCCESS, "", -1);
}
public ImportInfo(String parentObjectName, String childObjectName, ImportStatus importStatus, String remarks) {
@ -154,5 +157,4 @@ public class BulkImportResponse {
'}';
}
}
}

View File

@ -20,6 +20,8 @@ package org.apache.atlas.glossary;
import org.apache.atlas.AtlasErrorCode;
import org.apache.atlas.SortOrder;
import org.apache.atlas.annotation.GraphTransaction;
import org.apache.atlas.bulkimport.BulkImportResponse;
import org.apache.atlas.bulkimport.BulkImportResponse.ImportInfo;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.glossary.AtlasGlossary;
import org.apache.atlas.model.glossary.AtlasGlossaryCategory;
@ -54,7 +56,10 @@ import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;
import static org.apache.atlas.glossary.GlossaryUtils.*;
import static org.apache.atlas.bulkimport.BulkImportResponse.ImportStatus.FAILED;
import static org.apache.atlas.glossary.GlossaryUtils.getAtlasGlossaryCategorySkeleton;
import static org.apache.atlas.glossary.GlossaryUtils.getAtlasGlossaryTermSkeleton;
import static org.apache.atlas.glossary.GlossaryUtils.getGlossarySkeleton;
@Service
public class GlossaryService {
@ -1106,19 +1111,29 @@ public class GlossaryService {
}
}
public List<AtlasGlossaryTerm> importGlossaryData(InputStream inputStream, String fileName) throws AtlasBaseException {
List<AtlasGlossaryTerm> ret;
public BulkImportResponse importGlossaryData(InputStream inputStream, String fileName) throws AtlasBaseException {
BulkImportResponse ret = new BulkImportResponse();
try {
if (StringUtils.isBlank(fileName)) {
throw new AtlasBaseException(AtlasErrorCode.INVALID_FILE_TYPE, fileName);
}
List<String[]> fileData = FileUtils.readFileData(fileName, inputStream);
List<String> failedTermMsgs = new ArrayList<>();
List<String[]> fileData = FileUtils.readFileData(fileName, inputStream);
List<AtlasGlossaryTerm> glossaryTerms = glossaryTermUtils.getGlossaryTermDataList(fileData, ret);
ret = glossaryTermUtils.getGlossaryTermDataList(fileData, failedTermMsgs);
ret = createGlossaryTerms(ret);
for (AtlasGlossaryTerm glossaryTerm : glossaryTerms) {
String glossaryTermName = glossaryTerm.getName();
String glossaryName = getGlossaryName(glossaryTerm);
try {
createTerm(glossaryTerm);
ret.addToSuccessImportInfoList(new ImportInfo(glossaryName, glossaryTermName));
} catch (AtlasBaseException e) {
LOG.error("Error while importing glossary term {}", glossaryTermName);
ret.addToFailedImportInfoList(new ImportInfo(glossaryName, glossaryTermName, FAILED, e.getMessage()));
}
}
} catch (IOException e) {
throw new AtlasBaseException(AtlasErrorCode.FAILED_TO_UPLOAD, fileName);
}
@ -1126,6 +1141,19 @@ public class GlossaryService {
return ret;
}
/**
 * Derives the glossary name from a term's qualified name.
 *
 * <p>The qualified name is split on {@code '@'}. Only when that yields exactly two
 * parts is the second part returned as the glossary name. A blank qualified name,
 * or one that splits into any other number of parts, yields an empty string.
 * Because {@link String#split(String)} drops trailing empty strings, a qualified
 * name ending in {@code '@'} also yields an empty string.
 *
 * @param glossaryTerm term whose qualified name is inspected
 * @return the text after the single {@code '@'} separator, or {@code ""} when it
 *         cannot be determined
 */
private String getGlossaryName(AtlasGlossaryTerm glossaryTerm) {
    String qualifiedName = glossaryTerm.getQualifiedName();

    // Nothing to parse when the term carries no qualified name.
    if (StringUtils.isBlank(qualifiedName)) {
        return "";
    }

    String[] nameParts = qualifiedName.split("@");

    // Anything other than exactly two parts is treated as "glossary unknown".
    if (nameParts.length != 2) {
        return "";
    }

    return nameParts[1];
}
private List<AtlasGlossaryTerm> createGlossaryTerms(List<AtlasGlossaryTerm> glossaryTerms) throws AtlasBaseException {
List<AtlasGlossaryTerm> ret = new ArrayList<>();

View File

@ -18,6 +18,8 @@
package org.apache.atlas.glossary;
import org.apache.atlas.AtlasErrorCode;
import org.apache.atlas.bulkimport.BulkImportResponse;
import org.apache.atlas.bulkimport.BulkImportResponse.ImportInfo;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.glossary.AtlasGlossary;
import org.apache.atlas.model.glossary.AtlasGlossaryTerm;
@ -53,6 +55,8 @@ import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;
import static org.apache.atlas.bulkimport.BulkImportResponse.ImportStatus.FAILED;
public class GlossaryTermUtils extends GlossaryUtils {
private static final Logger LOG = LoggerFactory.getLogger(GlossaryTermUtils.class);
private static final boolean DEBUG_ENABLED = LOG.isDebugEnabled();
@ -529,12 +533,14 @@ public class GlossaryTermUtils extends GlossaryUtils {
}
}
protected List<AtlasGlossaryTerm> getGlossaryTermDataList(List<String[]> fileData, List<String> failedTermMsgs) throws AtlasBaseException {
protected List<AtlasGlossaryTerm> getGlossaryTermDataList(List<String[]> fileData, BulkImportResponse bulkImportResponse) throws AtlasBaseException {
List<AtlasGlossaryTerm> glossaryTerms = new ArrayList<>();
Map<String, String> glossaryNameCache = new HashMap<>();
int rowCount = 1;
for (String[] record : fileData) {
AtlasGlossaryTerm glossaryTerm = new AtlasGlossaryTerm();
List<String> failedTermMsgs = new ArrayList<>();
AtlasGlossaryTerm glossaryTerm = new AtlasGlossaryTerm();
if ((record.length < 1) || StringUtils.isBlank(record[0])) {
LOG.error("The GlossaryName is blank for the record : ", Arrays.toString(record));
@ -570,15 +576,21 @@ public class GlossaryTermUtils extends GlossaryUtils {
if (glossaryGuid != null) {
glossaryNameCache.put(glossaryName, glossaryGuid);
glossaryTerm = populateGlossaryTermObject(failedTermMsgs, record, glossaryGuid);
glossaryTerms.add(glossaryTerm);
}
if (failedTermMsgs.size() == 0) {
glossaryTerms.add(glossaryTerm);
} else {
String failedTermMsg = StringUtils.join(failedTermMsgs, "\n");
String glossaryTermName = glossaryTerm.getName();
bulkImportResponse.addToFailedImportInfoList(new ImportInfo(glossaryName, glossaryTermName, FAILED, failedTermMsg, rowCount));
}
rowCount++;
}
if (failedTermMsgs.size() == 0) {
return glossaryTerms;
} else {
throw new AtlasBaseException("The uploaded file has not been processed due to the following errors : " + "\n" + failedTermMsgs.toString());
}
return glossaryTerms;
}
public static String getGlossaryTermHeaders() {
@ -667,7 +679,7 @@ public class GlossaryTermUtils extends GlossaryUtils {
relatedTermHeader.setTermGuid(glossaryTermGuid);
ret.add(relatedTermHeader);
} else {
failedTermMsgs.add("\n" + "The provided Reference Glossary and TermName does not exist in the system " +
failedTermMsgs.add(System.lineSeparator() + "The provided Reference Glossary and TermName does not exist in the system " +
dataArray[1] + FileUtils.COLON_CHARACTER + dataArray[0] + " for record with TermName : " + termName + " and GlossaryName : " + glossaryName);
}
}

View File

@ -60,6 +60,7 @@ import org.apache.atlas.type.AtlasType;
import org.apache.atlas.type.AtlasTypeRegistry;
import org.apache.atlas.type.AtlasTypeUtil;
import org.apache.atlas.bulkimport.BulkImportResponse;
import org.apache.atlas.bulkimport.BulkImportResponse.ImportInfo;
import org.apache.atlas.util.FileUtils;
import org.apache.atlas.utils.AtlasEntityUtil;
import org.apache.atlas.utils.AtlasPerfMetrics.MetricRecorder;
@ -87,6 +88,7 @@ import java.util.Set;
import static java.lang.Boolean.FALSE;
import static org.apache.atlas.AtlasConfiguration.STORE_DIFFERENTIAL_AUDITS;
import static org.apache.atlas.bulkimport.BulkImportResponse.ImportStatus.FAILED;
import static org.apache.atlas.model.instance.EntityMutations.EntityOperation.DELETE;
import static org.apache.atlas.model.instance.EntityMutations.EntityOperation.PURGE;
import static org.apache.atlas.model.instance.EntityMutations.EntityOperation.UPDATE;
@ -1534,18 +1536,17 @@ public class AtlasEntityStoreV2 implements AtlasEntityStore {
Map<String, AtlasEntity> attributesToAssociate = getBusinessMetadataDefList(fileData, ret);
for (AtlasEntity entity : attributesToAssociate.values()) {
Map<String, Map<String, Object>> businessAttributes = entity.getBusinessAttributes();
String guid = entity.getGuid();
try {
addOrUpdateBusinessAttributes(entity.getGuid(), entity.getBusinessAttributes(), true);
addOrUpdateBusinessAttributes(guid, businessAttributes, true);
BulkImportResponse.ImportInfo successImportInfo = new BulkImportResponse.ImportInfo(entity.getGuid(), entity.getBusinessAttributes().toString());
ret.setSuccessImportInfoList(successImportInfo);
ret.addToSuccessImportInfoList(new ImportInfo(guid, businessAttributes.toString()));
}catch (Exception e) {
LOG.error("Error occurred while updating BusinessMetadata Attributes for Entity " + entity.getGuid());
LOG.error("Error occurred while updating BusinessMetadata Attributes for Entity " + guid);
BulkImportResponse.ImportInfo failedImportInfo = new BulkImportResponse.ImportInfo(entity.getGuid(), entity.getBusinessAttributes().toString(), BulkImportResponse.ImportStatus.FAILED, e.getMessage());
ret.getFailedImportInfoList().add(failedImportInfo);
ret.addToFailedImportInfoList(new ImportInfo(guid, businessAttributes.toString(), FAILED, e.getMessage()));
}
}
} catch (IOException e) {
@ -1676,9 +1677,7 @@ public class AtlasEntityStoreV2 implements AtlasEntityStore {
for (String failedMsg : failedMsgList) {
LOG.error(failedMsg);
BulkImportResponse.ImportInfo importInfo = new BulkImportResponse.ImportInfo(BulkImportResponse.ImportStatus.FAILED, failedMsg);
bulkImportResponse.getFailedImportInfoList().add(importInfo);
bulkImportResponse.addToFailedImportInfoList(new ImportInfo(FAILED, failedMsg));
}
return ret;
@ -1731,9 +1730,10 @@ public class AtlasEntityStoreV2 implements AtlasEntityStore {
if(missingFieldsCheck){
LOG.error("Missing fields: " + Arrays.toString(record) + " at line #" + lineIndex);
String failedTermMsgs = "Missing fields: " + Arrays.toString(record) + " at line #" + lineIndex;
BulkImportResponse.ImportInfo importInfo = new BulkImportResponse.ImportInfo(BulkImportResponse.ImportStatus.FAILED, failedTermMsgs, lineIndex);
bulkImportResponse.getFailedImportInfoList().add(importInfo);
bulkImportResponse.addToFailedImportInfoList(new ImportInfo(FAILED, failedTermMsgs, lineIndex));
}
return missingFieldsCheck;
}

View File

@ -20,6 +20,7 @@ package org.apache.atlas.glossary;
import org.apache.atlas.AtlasErrorCode;
import org.apache.atlas.SortOrder;
import org.apache.atlas.TestModules;
import org.apache.atlas.bulkimport.BulkImportResponse;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.glossary.AtlasGlossary;
import org.apache.atlas.model.glossary.AtlasGlossaryCategory;
@ -65,7 +66,11 @@ import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import static org.testng.Assert.*;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
@Guice(modules = TestModules.TestOnlyModule.class)
public class GlossaryServiceTest {
@ -933,17 +938,17 @@ public class GlossaryServiceTest {
@Test( dependsOnGroups = "Glossary.CREATE" )
public void testImportGlossaryData(){
try {
InputStream inputStream = getFile(CSV_FILES,"template_1.csv");
List<AtlasGlossaryTerm> atlasGlossaryTermList = glossaryService.importGlossaryData(inputStream,"template_1.csv");
InputStream inputStream = getFile(CSV_FILES,"template_1.csv");
BulkImportResponse bulkImportResponse = glossaryService.importGlossaryData(inputStream,"template_1.csv");
assertNotNull(atlasGlossaryTermList);
assertEquals(atlasGlossaryTermList.size(), 1);
assertNotNull(bulkImportResponse);
assertEquals(bulkImportResponse.getSuccessImportInfoList().size(), 1);
InputStream inputStream1 = getFile(EXCEL_FILES,"template_1.xlsx");
List<AtlasGlossaryTerm> atlasGlossaryTermList1 = glossaryService.importGlossaryData(inputStream1,"template_1.xlsx");
InputStream inputStream1 = getFile(EXCEL_FILES,"template_1.xlsx");
BulkImportResponse bulkImportResponse1 = glossaryService.importGlossaryData(inputStream1,"template_1.xlsx");
assertNotNull(atlasGlossaryTermList1);
assertEquals(atlasGlossaryTermList1.size(), 1);
assertNotNull(bulkImportResponse1);
assertEquals(bulkImportResponse1.getSuccessImportInfoList().size(), 1);
} catch (AtlasBaseException e){
fail("The GlossaryTerm should have been created "+e);
}
@ -980,12 +985,11 @@ public class GlossaryServiceTest {
InputStream inputStream = getFile(CSV_FILES, "incorrectFile.csv");
try {
glossaryService.importGlossaryData(inputStream, "incorrectFile.csv");
fail("Error occurred : Failed to recognize the incorrect file.");
BulkImportResponse bulkImportResponse = glossaryService.importGlossaryData(inputStream, "incorrectFile.csv");
assertEquals(bulkImportResponse.getFailedImportInfoList().size(),1);
} catch (AtlasBaseException e) {
assertEquals(e.getMessage(),"The uploaded file has not been processed due to the following errors : \n" +
"[\n" +
"The provided Reference Glossary and TermName does not exist in the system GentsFootwear: for record with TermName : BankBranch1 and GlossaryName : testBankingGlossary]");
fail("The incorrect file exception should have handled "+e);
}
}

View File

@ -21,6 +21,7 @@ import com.sun.jersey.core.header.FormDataContentDisposition;
import com.sun.jersey.multipart.FormDataParam;
import org.apache.atlas.AtlasErrorCode;
import org.apache.atlas.SortOrder;
import org.apache.atlas.bulkimport.BulkImportResponse;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.glossary.GlossaryService;
import org.apache.atlas.glossary.GlossaryTermUtils;
@ -983,8 +984,8 @@ public class GlossaryREST {
@POST
@Path("/import")
@Consumes(MediaType.MULTIPART_FORM_DATA)
public List<AtlasGlossaryTerm> importGlossaryData(@FormDataParam("file") InputStream inputStream,
@FormDataParam("file") FormDataContentDisposition fileDetail) throws AtlasBaseException {
public BulkImportResponse importGlossaryData(@FormDataParam("file") InputStream inputStream,
@FormDataParam("file") FormDataContentDisposition fileDetail) throws AtlasBaseException {
return glossaryService.importGlossaryData(inputStream, fileDetail.getFileName());
}
}

View File

@ -23,6 +23,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import com.sun.jersey.api.client.ClientResponse;
import org.apache.atlas.AtlasClientV2;
import org.apache.atlas.AtlasServiceException;
import org.apache.atlas.bulkimport.BulkImportResponse;
import org.apache.atlas.model.glossary.AtlasGlossary;
import org.apache.atlas.model.glossary.AtlasGlossaryCategory;
import org.apache.atlas.model.glossary.AtlasGlossaryTerm;
@ -115,7 +116,7 @@ public class GlossaryClientV2IT extends BaseResourceIT {
public void getAllGlossary() throws Exception {
List<AtlasGlossary> list = atlasClientV2.getAllGlossaries("ASC", 5, 0);
assertNotNull(list);
assertEquals(list.size(), 2);
assertEquals(list.size(), 3);
}
@Test(dependsOnMethods = "testCreateGlossary")
@ -240,7 +241,7 @@ public class GlossaryClientV2IT extends BaseResourceIT {
}
List<AtlasGlossaryTerm> termList = atlasClientV2.createGlossaryTerms(list);
assertNotNull(termList);
assertEquals(termList.size(), 2);
assertEquals(termList.size(), 3);
}
@Test(dependsOnMethods = "testCreateGlossary")
@ -434,22 +435,15 @@ public class GlossaryClientV2IT extends BaseResourceIT {
@Test()
public void testImportGlossaryData() {
try {
String filePath = TestResourceFileUtils.getTestFilePath("template.csv");
List<AtlasGlossaryTerm> terms = atlasClientV2.importGlossary(filePath);
String filePath = TestResourceFileUtils.getTestFilePath("template.csv");
BulkImportResponse terms = atlasClientV2.importGlossary(filePath);
assertNotNull(terms);
List<AtlasGlossaryTerm> termList = mapper.convertValue(terms, new TypeReference<List<AtlasGlossaryTerm>>() { });
assertEquals(terms.getSuccessImportInfoList().size(), 1);
assertEquals(terms.size(), 1);
AtlasGlossaryTerm createdTerm = termList.get(0);
String glossaryGuid = createdTerm.getAnchor().getGlossaryGuid();
atlasClientV2.deleteGlossaryByGuid(glossaryGuid);
} catch (AtlasServiceException ex) {
fail("Import GlossaryData should've succeeded");
fail("Import GlossaryData should've succeeded : "+ex);
}
}