ATLAS-4230: Add support for Google Cloud Storage Path Entity creation
Signed-off-by: Sarath Subramanian <sarath@apache.org>
This commit is contained in:
parent
2aa448c110
commit
c03ee72ca5
|
|
@ -82,6 +82,13 @@ public class AtlasPathExtractorUtil {
|
|||
public static final String RELATIONSHIP_OZONE_VOLUME_BUCKET = "ozone_volume_buckets";
|
||||
public static final String RELATIONSHIP_OZONE_PARENT_CHILDREN = "ozone_parent_children";
|
||||
|
||||
//Google Cloud Storage
|
||||
public static final String GCS_SCHEME = "gs" + SCHEME_SEPARATOR;
|
||||
public static final String GCS_BUCKET = "gcp_storage_bucket";
|
||||
public static final String GCS_VIRTUAL_DIR = "gcp_storage_virtual_directory";
|
||||
public static final String ATTRIBUTE_GCS_PARENT = "parent";
|
||||
public static final String RELATIONSHIP_GCS_PARENT_CHILDREN = "gcp_storage_parent_children";
|
||||
|
||||
public static AtlasEntityWithExtInfo getPathEntity(Path path, PathExtractorContext context) {
|
||||
AtlasEntityWithExtInfo entityWithExtInfo = new AtlasEntityWithExtInfo();
|
||||
AtlasEntity ret;
|
||||
|
|
@ -98,9 +105,12 @@ public class AtlasPathExtractorUtil {
|
|||
ret = addAbfsPathEntity(path, entityWithExtInfo, context);
|
||||
} else if (isOzonePath(strPath)) {
|
||||
ret = addOzonePathEntity(path, entityWithExtInfo, context);
|
||||
} else if (isGCSPath(strPath)) {
|
||||
ret = addGCSPathEntity(path, entityWithExtInfo, context);
|
||||
} else {
|
||||
ret = addHDFSPathEntity(path, context);
|
||||
}
|
||||
|
||||
entityWithExtInfo.setEntity(ret);
|
||||
|
||||
return entityWithExtInfo;
|
||||
|
|
@ -123,6 +133,10 @@ public class AtlasPathExtractorUtil {
|
|||
return strPath != null && (strPath.startsWith(OZONE_SCHEME) || strPath.startsWith(OZONE_3_SCHEME));
|
||||
}
|
||||
|
||||
private static boolean isGCSPath(String strPath) {
|
||||
return strPath != null && strPath.startsWith(GCS_SCHEME);
|
||||
}
|
||||
|
||||
private static AtlasEntity addS3PathEntityV1(Path path, AtlasEntityExtInfo extInfo, PathExtractorContext context) {
|
||||
String strPath = path.toString();
|
||||
|
||||
|
|
@ -217,7 +231,7 @@ public class AtlasPathExtractorUtil {
|
|||
ret = new AtlasEntity(AWS_S3_V2_PSEUDO_DIR);
|
||||
|
||||
ret.setRelationshipAttribute(ATTRIBUTE_CONTAINER, parentObjId);
|
||||
ret.setAttribute(ATTRIBUTE_OBJECT_PREFIX, subDirPath);
|
||||
ret.setAttribute(ATTRIBUTE_OBJECT_PREFIX, parentPath);
|
||||
ret.setAttribute(ATTRIBUTE_QUALIFIED_NAME, subDirQualifiedName);
|
||||
ret.setAttribute(ATTRIBUTE_NAME, subDirName);
|
||||
|
||||
|
|
@ -442,6 +456,83 @@ public class AtlasPathExtractorUtil {
|
|||
return ret;
|
||||
}
|
||||
|
||||
private static AtlasEntity addGCSPathEntity(Path path, AtlasEntityExtInfo extInfo, PathExtractorContext context) {
|
||||
String strPath = path.toString();
|
||||
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("==> addGCSPathEntity(strPath={})", strPath);
|
||||
}
|
||||
|
||||
String metadataNamespace = context.getMetadataNamespace();
|
||||
String pathQualifiedName = strPath + QNAME_SEP_METADATA_NAMESPACE + metadataNamespace;
|
||||
AtlasEntity ret = context.getEntity(pathQualifiedName);
|
||||
|
||||
if (ret == null) {
|
||||
String bucketName = path.toUri().getAuthority();
|
||||
String schemeAndBucketName = (path.toUri().getScheme() + SCHEME_SEPARATOR + bucketName).toLowerCase();
|
||||
String bucketQualifiedName = schemeAndBucketName + QNAME_SEP_METADATA_NAMESPACE + metadataNamespace;
|
||||
AtlasEntity bucketEntity = context.getEntity(bucketQualifiedName);
|
||||
|
||||
if (bucketEntity == null) {
|
||||
bucketEntity = new AtlasEntity(GCS_BUCKET);
|
||||
|
||||
bucketEntity.setAttribute(ATTRIBUTE_QUALIFIED_NAME, bucketQualifiedName);
|
||||
bucketEntity.setAttribute(ATTRIBUTE_NAME, bucketName);
|
||||
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("adding entity: typeName={}, qualifiedName={}", bucketEntity.getTypeName(), bucketEntity.getAttribute(ATTRIBUTE_QUALIFIED_NAME));
|
||||
}
|
||||
|
||||
context.putEntity(bucketQualifiedName, bucketEntity);
|
||||
}
|
||||
|
||||
extInfo.addReferredEntity(bucketEntity);
|
||||
|
||||
AtlasRelatedObjectId parentObjId = AtlasTypeUtil.getAtlasRelatedObjectId(bucketEntity, RELATIONSHIP_GCS_PARENT_CHILDREN);
|
||||
String parentPath = Path.SEPARATOR;
|
||||
String dirPath = path.toUri().getPath();
|
||||
|
||||
if (StringUtils.isEmpty(dirPath)) {
|
||||
dirPath = Path.SEPARATOR;
|
||||
}
|
||||
|
||||
for (String subDirName : dirPath.split(Path.SEPARATOR)) {
|
||||
if (StringUtils.isEmpty(subDirName)) {
|
||||
continue;
|
||||
}
|
||||
|
||||
String subDirPath = parentPath + subDirName + Path.SEPARATOR;
|
||||
String subDirQualifiedName = schemeAndBucketName + subDirPath + QNAME_SEP_METADATA_NAMESPACE + metadataNamespace;
|
||||
|
||||
ret = new AtlasEntity(GCS_VIRTUAL_DIR);
|
||||
|
||||
ret.setRelationshipAttribute(ATTRIBUTE_GCS_PARENT, parentObjId);
|
||||
ret.setAttribute(ATTRIBUTE_OBJECT_PREFIX, parentPath);
|
||||
ret.setAttribute(ATTRIBUTE_QUALIFIED_NAME, subDirQualifiedName);
|
||||
ret.setAttribute(ATTRIBUTE_NAME, subDirName);
|
||||
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("adding entity: typeName={}, qualifiedName={}", ret.getTypeName(), ret.getAttribute(ATTRIBUTE_QUALIFIED_NAME));
|
||||
}
|
||||
|
||||
context.putEntity(subDirQualifiedName, ret);
|
||||
|
||||
parentObjId = AtlasTypeUtil.getAtlasRelatedObjectId(ret, RELATIONSHIP_GCS_PARENT_CHILDREN);
|
||||
parentPath = subDirPath;
|
||||
}
|
||||
|
||||
if (ret == null) {
|
||||
ret = bucketEntity;
|
||||
}
|
||||
}
|
||||
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("<== addGCSPathEntity(strPath={})", strPath);
|
||||
}
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
private static AtlasEntity addHDFSPathEntity(Path path, PathExtractorContext context) {
|
||||
String strPath = path.toString();
|
||||
|
||||
|
|
|
|||
|
|
@ -78,6 +78,12 @@ public class AtlasPathExtractorUtilTest {
|
|||
private static final String S3_PATH = S3_SCHEME + "aws_my_bucket1/1234567890/renders/Irradiance_A.csv";
|
||||
private static final String S3A_PATH = S3A_SCHEME + "aws_my_bucket1/1234567890/renders/Irradiance_A.csv";
|
||||
|
||||
// Google Cloud Storage
|
||||
private static final String GCS_VIRTUAL_DIR = "gcp_storage_virtual_directory";
|
||||
private static final String GCS_BUCKET = "gcp_storage_bucket";
|
||||
private static final String GCS_SCHEME = "gs" + SCHEME_SEPARATOR;
|
||||
private static final String GCS_PATH = GCS_SCHEME + "gcs_test_bucket1/1234567890/data";
|
||||
|
||||
@DataProvider(name = "ozonePathProvider")
|
||||
private Object[][] ozonePathProvider(){
|
||||
return new Object[][]{
|
||||
|
|
@ -264,6 +270,22 @@ public class AtlasPathExtractorUtilTest {
|
|||
verifyS3KnownEntities(S3A_SCHEME, S3A_PATH, extractorContext.getKnownEntities());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGetPathEntityGCSPath() {
|
||||
PathExtractorContext extractorContext = new PathExtractorContext(METADATA_NAMESPACE);
|
||||
|
||||
Path path = new Path(GCS_PATH);
|
||||
AtlasEntityWithExtInfo entityWithExtInfo = AtlasPathExtractorUtil.getPathEntity(path, extractorContext);
|
||||
AtlasEntity entity = entityWithExtInfo.getEntity();
|
||||
|
||||
assertNotNull(entity);
|
||||
assertEquals(entity.getTypeName(), GCS_VIRTUAL_DIR);
|
||||
assertEquals(entityWithExtInfo.getReferredEntities().size(), 1);
|
||||
|
||||
verifyGCSVirtualDir(GCS_SCHEME, GCS_PATH, entity);
|
||||
verifyGCSKnownEntities(GCS_SCHEME, GCS_PATH, extractorContext.getKnownEntities());
|
||||
}
|
||||
|
||||
private void verifyOzoneEntities(Map<String, AtlasEntity> knownEntities, OzoneKeyValidator validator) {
|
||||
for (AtlasEntity knownEntity : knownEntities.values()) {
|
||||
switch (knownEntity.getTypeName()){
|
||||
|
|
@ -350,20 +372,35 @@ public class AtlasPathExtractorUtilTest {
|
|||
|
||||
if (pathQName.equalsIgnoreCase(entityQName)){
|
||||
assertEquals(entity.getAttribute(ATTRIBUTE_NAME), "Irradiance_A.csv");
|
||||
assertEquals(entity.getAttribute(ATTRIBUTE_OBJECT_PREFIX), "/1234567890/renders/Irradiance_A.csv/");
|
||||
assertEquals(entity.getAttribute(ATTRIBUTE_OBJECT_PREFIX), "/1234567890/renders/");
|
||||
} else {
|
||||
pathQName = s3Scheme + "aws_my_bucket1/1234567890/" + QNAME_METADATA_NAMESPACE;
|
||||
if (pathQName.equalsIgnoreCase(entityQName)){
|
||||
assertEquals(entity.getAttribute(ATTRIBUTE_NAME), "1234567890");
|
||||
assertEquals(entity.getAttribute(ATTRIBUTE_OBJECT_PREFIX), "/1234567890/");
|
||||
assertEquals(entity.getAttribute(ATTRIBUTE_OBJECT_PREFIX), "/");
|
||||
} else {
|
||||
assertEquals(entity.getAttribute(ATTRIBUTE_QUALIFIED_NAME), s3Scheme + "aws_my_bucket1/1234567890/renders/" + QNAME_METADATA_NAMESPACE);
|
||||
assertEquals(entity.getAttribute(ATTRIBUTE_NAME), "renders");
|
||||
assertEquals(entity.getAttribute(ATTRIBUTE_OBJECT_PREFIX), "/1234567890/renders/");
|
||||
assertEquals(entity.getAttribute(ATTRIBUTE_OBJECT_PREFIX), "/1234567890/");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void verifyGCSVirtualDir(String s3Scheme, String path, AtlasEntity entity) {
|
||||
String pathQName = path + "/" + QNAME_METADATA_NAMESPACE;
|
||||
String entityQName = (String) entity.getAttribute(ATTRIBUTE_QUALIFIED_NAME);
|
||||
|
||||
if (pathQName.equalsIgnoreCase(entityQName)){
|
||||
assertEquals(entity.getAttribute(ATTRIBUTE_NAME), "data");
|
||||
assertEquals(entity.getAttribute(ATTRIBUTE_OBJECT_PREFIX), "/1234567890/");
|
||||
} else {
|
||||
pathQName = s3Scheme + "gcs_test_bucket1/1234567890/" + QNAME_METADATA_NAMESPACE;
|
||||
assertEquals(entityQName, pathQName);
|
||||
assertEquals(entity.getAttribute(ATTRIBUTE_NAME), "1234567890");
|
||||
assertEquals(entity.getAttribute(ATTRIBUTE_OBJECT_PREFIX), "/");
|
||||
}
|
||||
}
|
||||
|
||||
private void verifyS3V2KnownEntities(String scheme, String path, Map<String, AtlasEntity> knownEntities) {
|
||||
assertEquals(knownEntities.size(), 4);
|
||||
int dirCount = 0;
|
||||
|
|
@ -411,6 +448,29 @@ public class AtlasPathExtractorUtilTest {
|
|||
assertEquals(entity.getAttribute(ATTRIBUTE_NAME), "aws_my_bucket1");
|
||||
}
|
||||
|
||||
private void verifyGCSKnownEntities(String scheme, String path, Map<String, AtlasEntity> knownEntities) {
|
||||
assertEquals(knownEntities.size(), 3);
|
||||
int dirCount = 0;
|
||||
for (AtlasEntity knownEntity : knownEntities.values()) {
|
||||
switch (knownEntity.getTypeName()){
|
||||
case GCS_VIRTUAL_DIR:
|
||||
verifyGCSVirtualDir(scheme, path, knownEntity);
|
||||
dirCount++;
|
||||
break;
|
||||
|
||||
case GCS_BUCKET:
|
||||
verifyGCSBucketEntity(scheme, knownEntity);
|
||||
break;
|
||||
}
|
||||
}
|
||||
assertEquals(dirCount, 2);
|
||||
}
|
||||
|
||||
private void verifyGCSBucketEntity(String scheme, AtlasEntity entity) {
|
||||
assertEquals(entity.getAttribute(ATTRIBUTE_QUALIFIED_NAME), scheme + "gcs_test_bucket1" + QNAME_METADATA_NAMESPACE);
|
||||
assertEquals(entity.getAttribute(ATTRIBUTE_NAME), "gcs_test_bucket1");
|
||||
}
|
||||
|
||||
private class OzoneKeyValidator {
|
||||
private final String scheme;
|
||||
private final String location;
|
||||
|
|
|
|||
Loading…
Reference in New Issue