ATLAS-3879: Ozone: ozone_key entity is directly created under ozone_bucket
Signed-off-by: Madhan Neethiraj <madhan@apache.org>
This commit is contained in:
parent
cea851c551
commit
5f956110f7
|
|
@ -89,6 +89,26 @@
|
|||
],
|
||||
"classificationDefs": [],
|
||||
"entityDefs": [
|
||||
{
|
||||
"name": "ozone_parent",
|
||||
"description": "Atlas entity-type representing parent types (bucket, key) in Ozone",
|
||||
"superTypes": [
|
||||
],
|
||||
"serviceType": "ozone",
|
||||
"typeVersion": "1.0",
|
||||
"attributeDefs": [
|
||||
]
|
||||
},
|
||||
{
|
||||
"name": "ozone_child",
|
||||
"description": "Atlas entity-type representing child types (key) in Ozone",
|
||||
"superTypes": [
|
||||
],
|
||||
"serviceType": "ozone",
|
||||
"typeVersion": "1.0",
|
||||
"attributeDefs": [
|
||||
]
|
||||
},
|
||||
{
|
||||
"name": "ozone_volume",
|
||||
"description": "Atlas Type representing an volume in an Ozone Object Store",
|
||||
|
|
@ -128,7 +148,8 @@
|
|||
"name": "ozone_bucket",
|
||||
"description": "Atlas Type representing a bucket in an Ozone Object Store Volume",
|
||||
"superTypes": [
|
||||
"DataSet"
|
||||
"DataSet",
|
||||
"ozone_parent"
|
||||
],
|
||||
"serviceType": "ozone",
|
||||
"typeVersion": "1.0",
|
||||
|
|
@ -171,7 +192,9 @@
|
|||
"name": "ozone_key",
|
||||
"description": "Atlas Type representing a key in an Ozone Object Store Bucket",
|
||||
"superTypes": [
|
||||
"DataSet"
|
||||
"DataSet",
|
||||
"ozone_parent",
|
||||
"ozone_child"
|
||||
],
|
||||
"serviceType": "ozone",
|
||||
"typeVersion": "1.0",
|
||||
|
|
@ -240,21 +263,21 @@
|
|||
"propagateTags": "NONE"
|
||||
},
|
||||
{
|
||||
"name": "ozone_bucket_keys",
|
||||
"serviceType": "ozone",
|
||||
"typeVersion": "1.0",
|
||||
"name": "ozone_parent_children",
|
||||
"serviceType": "ozone",
|
||||
"typeVersion": "1.0",
|
||||
"relationshipCategory": "COMPOSITION",
|
||||
"endDef1": {
|
||||
"type": "ozone_bucket",
|
||||
"name": "keys",
|
||||
"isContainer": true,
|
||||
"cardinality": "SET"
|
||||
"type": "ozone_parent",
|
||||
"name": "children",
|
||||
"isContainer": true,
|
||||
"cardinality": "SET"
|
||||
},
|
||||
"endDef2": {
|
||||
"type": "ozone_key",
|
||||
"name": "bucket",
|
||||
"isContainer": false,
|
||||
"cardinality": "SINGLE"
|
||||
"type": "ozone_child",
|
||||
"name": "parent",
|
||||
"isContainer": false,
|
||||
"cardinality": "SINGLE"
|
||||
},
|
||||
"propagateTags": "NONE"
|
||||
}
|
||||
|
|
|
|||
|
|
@ -73,14 +73,14 @@ public class AtlasPathExtractorUtil {
|
|||
public static final String RELATIONSHIP_ADLS_GEN2_PARENT_CHILDREN = "adls_gen2_parent_children";
|
||||
|
||||
// Ozone
|
||||
public static final String OZONE_VOLUME = "ozone_volume";
|
||||
public static final String OZONE_BUCKET = "ozone_bucket";
|
||||
public static final String OZONE_KEY = "ozone_key";
|
||||
public static final String OZONE_SCHEME = "ofs" + SCHEME_SEPARATOR;
|
||||
public static final String OZONE_3_SCHEME = "o3fs" + SCHEME_SEPARATOR;
|
||||
public static final String ATTRIBUTE_VOLUME = "volume";
|
||||
public static final String RELATIONSHIP_OZONE_VOLUME_BUCKET = "ozone_volume_buckets";
|
||||
public static final String RELATIONSHIP_OZONE_BUCKET_KEY = "ozone_bucket_keys";
|
||||
public static final String OZONE_VOLUME = "ozone_volume";
|
||||
public static final String OZONE_BUCKET = "ozone_bucket";
|
||||
public static final String OZONE_KEY = "ozone_key";
|
||||
public static final String OZONE_SCHEME = "ofs" + SCHEME_SEPARATOR;
|
||||
public static final String OZONE_3_SCHEME = "o3fs" + SCHEME_SEPARATOR;
|
||||
public static final String ATTRIBUTE_VOLUME = "volume";
|
||||
public static final String RELATIONSHIP_OZONE_VOLUME_BUCKET = "ozone_volume_buckets";
|
||||
public static final String RELATIONSHIP_OZONE_PARENT_CHILDREN = "ozone_parent_children";
|
||||
|
||||
public static AtlasEntityWithExtInfo getPathEntity(Path path, PathExtractorContext context) {
|
||||
AtlasEntityWithExtInfo entityWithExtInfo = new AtlasEntityWithExtInfo();
|
||||
|
|
@ -400,13 +400,39 @@ public class AtlasPathExtractorUtil {
|
|||
|
||||
extInfo.addReferredEntity(bucketEntity);
|
||||
|
||||
ret = new AtlasEntity(OZONE_KEY);
|
||||
AtlasRelatedObjectId parentObjId = AtlasTypeUtil.getAtlasRelatedObjectId(bucketEntity, RELATIONSHIP_OZONE_PARENT_CHILDREN);
|
||||
String parentPath = Path.SEPARATOR;
|
||||
String dirPath = path.toUri().getPath();
|
||||
|
||||
ret.setAttribute(ATTRIBUTE_QUALIFIED_NAME, pathQualifiedName);
|
||||
ret.setAttribute(ATTRIBUTE_NAME, path.toUri().getPath());
|
||||
ret.setRelationshipAttribute( ATTRIBUTE_BUCKET, AtlasTypeUtil.getAtlasRelatedObjectId(bucketEntity, RELATIONSHIP_OZONE_BUCKET_KEY));
|
||||
if (StringUtils.isEmpty(dirPath)) {
|
||||
dirPath = Path.SEPARATOR;
|
||||
}
|
||||
|
||||
context.putEntity(pathQualifiedName, ret);
|
||||
String keyQNamePrefix = ozoneScheme + SCHEME_SEPARATOR + path.toUri().getAuthority();
|
||||
|
||||
for (String subDirName : dirPath.split(Path.SEPARATOR)) {
|
||||
if (StringUtils.isEmpty(subDirName)) {
|
||||
continue;
|
||||
}
|
||||
|
||||
String subDirPath = parentPath + subDirName;
|
||||
String subDirQualifiedName = keyQNamePrefix + subDirPath + QNAME_SEP_METADATA_NAMESPACE + metadataNamespace;
|
||||
|
||||
ret = new AtlasEntity(OZONE_KEY);
|
||||
|
||||
ret.setRelationshipAttribute(ATTRIBUTE_PARENT, parentObjId);
|
||||
ret.setAttribute(ATTRIBUTE_QUALIFIED_NAME, subDirQualifiedName);
|
||||
ret.setAttribute(ATTRIBUTE_NAME, subDirName);
|
||||
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("adding entity: typeName={}, qualifiedName={}", ret.getTypeName(), ret.getAttribute(ATTRIBUTE_QUALIFIED_NAME));
|
||||
}
|
||||
|
||||
context.putEntity(subDirQualifiedName, ret);
|
||||
|
||||
parentObjId = AtlasTypeUtil.getAtlasRelatedObjectId(ret, RELATIONSHIP_OZONE_PARENT_CHILDREN);
|
||||
parentPath = subDirPath + Path.SEPARATOR;;
|
||||
}
|
||||
}
|
||||
|
||||
if (LOG.isDebugEnabled()) {
|
||||
|
|
|
|||
|
|
@ -29,9 +29,7 @@ import org.apache.hadoop.fs.Path;
|
|||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
import static org.testng.Assert.assertEquals;
|
||||
import static org.testng.Assert.assertNotNull;
|
||||
import static org.testng.Assert.assertNull;
|
||||
import static org.testng.Assert.*;
|
||||
|
||||
public class AtlasPathExtractorUtilTest {
|
||||
private static final Logger LOG = LoggerFactory.getLogger(AtlasPathExtractorUtilTest.class);
|
||||
|
|
@ -83,35 +81,57 @@ public class AtlasPathExtractorUtilTest {
|
|||
@DataProvider(name = "ozonePathProvider")
|
||||
private Object[][] ozonePathProvider(){
|
||||
return new Object[][]{
|
||||
{ OZONE_SCHEME, "bucket1.volume1.ozone1/files/file.txt", "/files/file.txt" },
|
||||
{ OZONE_SCHEME, "bucket1.volume1.ozone1/file21.txt", "/file21.txt" },
|
||||
{ OZONE_SCHEME, "bucket1.volume1.ozone1/quarter_one/sales", "/quarter_one/sales" },
|
||||
{ OZONE_SCHEME, "bucket1.volume1.ozone1/quarter_one/sales/", "/quarter_one/sales" },
|
||||
{ OZONE_3_SCHEME, "bucket1.volume1.ozone1/files/file.txt", "/files/file.txt" },
|
||||
{ OZONE_3_SCHEME, "bucket1.volume1.ozone1/file21.txt", "/file21.txt"},
|
||||
{ OZONE_3_SCHEME, "bucket1.volume1.ozone1/quarter_one/sales", "/quarter_one/sales" },
|
||||
{ OZONE_3_SCHEME, "bucket1.volume1.ozone1/quarter_one/sales/", "/quarter_one/sales" },
|
||||
{ new OzoneKeyValidator(OZONE_SCHEME, "bucket1.volume1.ozone1.com/files/file.txt",
|
||||
"files", "bucket1.volume1.ozone1.com/files",
|
||||
"file.txt", "bucket1.volume1.ozone1.com/files/file.txt")},
|
||||
|
||||
{ new OzoneKeyValidator(OZONE_SCHEME, "bucket1.volume1.ozone1:1234/file21.txt",
|
||||
"file21.txt", "bucket1.volume1.ozone1:1234/file21.txt") },
|
||||
|
||||
{ new OzoneKeyValidator(OZONE_SCHEME, "bucket1.volume1.ozone1/quarter_one/sales",
|
||||
"quarter_one", "bucket1.volume1.ozone1/quarter_one",
|
||||
"sales", "bucket1.volume1.ozone1/quarter_one/sales") },
|
||||
|
||||
{ new OzoneKeyValidator(OZONE_SCHEME, "bucket1.volume1.ozone1/quarter_one/sales/",
|
||||
"quarter_one", "bucket1.volume1.ozone1/quarter_one",
|
||||
"sales", "bucket1.volume1.ozone1/quarter_one/sales") },
|
||||
|
||||
{ new OzoneKeyValidator(OZONE_3_SCHEME, "bucket1.volume1.ozone1/files/file.txt",
|
||||
"files", "bucket1.volume1.ozone1/files",
|
||||
"file.txt", "bucket1.volume1.ozone1/files/file.txt") },
|
||||
|
||||
{ new OzoneKeyValidator(OZONE_3_SCHEME, "bucket1.volume1.ozone1/file21.txt",
|
||||
"file21.txt", "bucket1.volume1.ozone1/file21.txt") },
|
||||
|
||||
{ new OzoneKeyValidator(OZONE_3_SCHEME, "bucket1.volume1.ozone1/quarter_one/sales",
|
||||
"quarter_one", "bucket1.volume1.ozone1/quarter_one",
|
||||
"sales", "bucket1.volume1.ozone1/quarter_one/sales") },
|
||||
|
||||
{ new OzoneKeyValidator(OZONE_3_SCHEME, "bucket1.volume1.ozone1/quarter_one/sales/",
|
||||
"quarter_one", "bucket1.volume1.ozone1/quarter_one",
|
||||
"sales", "bucket1.volume1.ozone1/quarter_one/sales") },
|
||||
};
|
||||
}
|
||||
|
||||
@Test(dataProvider = "ozonePathProvider")
|
||||
public void testGetPathEntityOzone3Path(String scheme, String location, String keyName) {
|
||||
String ozonePath = scheme + location;
|
||||
PathExtractorContext extractorContext = new PathExtractorContext(METADATA_NAMESPACE);
|
||||
public void testGetPathEntityOzone3Path(OzoneKeyValidator validator) {
|
||||
String scheme = validator.scheme;
|
||||
String ozonePath = scheme + validator.location;
|
||||
|
||||
PathExtractorContext extractorContext = new PathExtractorContext(METADATA_NAMESPACE);
|
||||
Path path = new Path(ozonePath);
|
||||
|
||||
AtlasEntityWithExtInfo entityWithExtInfo = AtlasPathExtractorUtil.getPathEntity(path, extractorContext);
|
||||
AtlasEntity entity = entityWithExtInfo.getEntity();
|
||||
|
||||
assertNotNull(entity);
|
||||
assertEquals(entity.getTypeName(), OZONE_KEY);
|
||||
verifyOzoneKeyEntity(ozonePath, keyName, entity);
|
||||
verifyOzoneKeyEntity(entity, validator);
|
||||
|
||||
assertEquals(entityWithExtInfo.getReferredEntities().size(), 2);
|
||||
verifyOzoneEntities(scheme, ozonePath, keyName, entityWithExtInfo.getReferredEntities());
|
||||
verifyOzoneEntities(entityWithExtInfo.getReferredEntities(), validator);
|
||||
|
||||
assertEquals(extractorContext.getKnownEntities().size(), 3);
|
||||
verifyOzoneEntities(scheme, ozonePath, keyName, extractorContext.getKnownEntities());
|
||||
assertEquals(extractorContext.getKnownEntities().size(), validator.knownEntitiesCount);
|
||||
verifyOzoneEntities(extractorContext.getKnownEntities(), validator);
|
||||
}
|
||||
|
||||
@Test
|
||||
|
|
@ -244,31 +264,29 @@ public class AtlasPathExtractorUtilTest {
|
|||
verifyS3KnownEntities(S3A_SCHEME, S3A_PATH, extractorContext.getKnownEntities());
|
||||
}
|
||||
|
||||
private void verifyOzoneEntities(String scheme, String path, String keyName, Map<String, AtlasEntity> knownEntities) {
|
||||
private void verifyOzoneEntities(Map<String, AtlasEntity> knownEntities, OzoneKeyValidator validator) {
|
||||
for (AtlasEntity knownEntity : knownEntities.values()) {
|
||||
switch (knownEntity.getTypeName()){
|
||||
case OZONE_KEY:
|
||||
verifyOzoneKeyEntity(path, keyName, knownEntity);
|
||||
verifyOzoneKeyEntity(knownEntity, validator);
|
||||
break;
|
||||
|
||||
case OZONE_VOLUME:
|
||||
assertEquals(knownEntity.getAttribute(ATTRIBUTE_QUALIFIED_NAME), scheme + "volume1" + QNAME_METADATA_NAMESPACE);
|
||||
assertEquals(knownEntity.getAttribute(ATTRIBUTE_QUALIFIED_NAME), validator.scheme + "volume1" + QNAME_METADATA_NAMESPACE);
|
||||
assertEquals(knownEntity.getAttribute(ATTRIBUTE_NAME), "volume1");
|
||||
break;
|
||||
|
||||
case OZONE_BUCKET:
|
||||
assertEquals(knownEntity.getAttribute(ATTRIBUTE_QUALIFIED_NAME), scheme + "volume1.bucket1" + QNAME_METADATA_NAMESPACE);
|
||||
assertEquals(knownEntity.getAttribute(ATTRIBUTE_QUALIFIED_NAME), validator.scheme + "volume1.bucket1" + QNAME_METADATA_NAMESPACE);
|
||||
assertEquals(knownEntity.getAttribute(ATTRIBUTE_NAME), "bucket1");
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void verifyOzoneKeyEntity(String path, String name, AtlasEntity entity) {
|
||||
//remove trailing "/" if present from path
|
||||
path = (path.charAt(path.length()-1) == '/') ? path.substring(0, path.length()-1) : path;
|
||||
assertEquals(entity.getAttribute(ATTRIBUTE_QUALIFIED_NAME), path + QNAME_METADATA_NAMESPACE);
|
||||
assertEquals(entity.getAttribute(ATTRIBUTE_NAME), name);
|
||||
private void verifyOzoneKeyEntity(AtlasEntity entity, OzoneKeyValidator validator) {
|
||||
assertEquals(entity.getTypeName(), OZONE_KEY);
|
||||
assertTrue(validator.validateNameQName(entity));
|
||||
}
|
||||
|
||||
private void verifyHDFSEntity(AtlasEntity entity, boolean toLowerCase) {
|
||||
|
|
@ -392,4 +410,42 @@ public class AtlasPathExtractorUtilTest {
|
|||
assertEquals(entity.getAttribute(ATTRIBUTE_QUALIFIED_NAME), scheme + "aws_my_bucket1" + QNAME_METADATA_NAMESPACE);
|
||||
assertEquals(entity.getAttribute(ATTRIBUTE_NAME), "aws_my_bucket1");
|
||||
}
|
||||
|
||||
private class OzoneKeyValidator {
|
||||
private final String scheme;
|
||||
private final String location;
|
||||
private final int knownEntitiesCount;
|
||||
private final Map<String, String> nameQNamePairs;
|
||||
|
||||
public OzoneKeyValidator(String scheme, String location, String... pairs) {
|
||||
this.scheme = scheme;
|
||||
this.location = location;
|
||||
this.nameQNamePairs = getPairMap(scheme, pairs);
|
||||
this.knownEntitiesCount = nameQNamePairs.size() + 2;
|
||||
}
|
||||
|
||||
public boolean validateNameQName(AtlasEntity entity){
|
||||
String name = (String) entity.getAttribute(ATTRIBUTE_NAME);
|
||||
|
||||
if (this.nameQNamePairs.containsKey(name)){
|
||||
String qName = (String) entity.getAttribute(ATTRIBUTE_QUALIFIED_NAME);
|
||||
|
||||
if (qName.equals(this.nameQNamePairs.get(name))) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
private Map<String, String> getPairMap(String scheme, String... pairs){
|
||||
Map< String, String > ret = new HashMap<>();
|
||||
|
||||
for (int i = 0; i < pairs.length; i += 2) {
|
||||
ret.put(pairs[i], scheme + pairs[i+1] + QNAME_METADATA_NAMESPACE);
|
||||
}
|
||||
|
||||
return ret;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in New Issue