[Enhancement] Check if locations are valid when creating storage volume (#34619)

Signed-off-by: Zijie Lu <wslzj40@gmail.com>
This commit is contained in:
Zijie Lu 2023-11-09 14:45:57 +08:00 committed by GitHub
parent f3aa40c2d5
commit 8dfc8b5415
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 61 additions and 1 deletions

View File

@ -44,6 +44,8 @@ import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.lang.reflect.Field;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@ -52,6 +54,7 @@ import java.util.Optional;
import java.util.Set;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.regex.Pattern;
public abstract class StorageVolumeMgr implements Writable, GsonPostProcessable {
private static final String ENABLED = "enabled";
@ -62,6 +65,12 @@ public abstract class StorageVolumeMgr implements Writable, GsonPostProcessable
public static final String BUILTIN_STORAGE_VOLUME = "builtin_storage_volume";
private static final String S3 = "s3";
private static final String AZBLOB = "azblob";
private static final String HDFS = "hdfs";
@SerializedName("defaultSVId")
protected String defaultStorageVolumeId = "";
@ -107,6 +116,7 @@ public abstract class StorageVolumeMgr implements Writable, GsonPostProcessable
throws DdlException, AlreadyExistsException {
try (LockCloseable lock = new LockCloseable(rwLock.writeLock())) {
validateParams(svType, params);
validateLocations(svType, locations);
if (exists(name)) {
throw new AlreadyExistsException(String.format("Storage volume '%s' already exists", name));
}
@ -273,7 +283,7 @@ public abstract class StorageVolumeMgr implements Writable, GsonPostProcessable
}
protected void validateParams(String svType, Map<String, String> params) throws DdlException {
if (svType.equalsIgnoreCase("hdfs")) {
if (svType.equalsIgnoreCase(HDFS)) {
return;
}
for (String key : params.keySet()) {
@ -283,6 +293,33 @@ public abstract class StorageVolumeMgr implements Writable, GsonPostProcessable
}
}
private void validateLocations(String svType, List<String> locations) throws DdlException {
for (String location : locations) {
try {
URI uri = new URI(location);
String scheme = uri.getScheme().toLowerCase();
switch (svType.toLowerCase()) {
case S3:
case AZBLOB:
if (!scheme.equalsIgnoreCase(svType)) {
throw new DdlException("Invalid location " + location);
}
break;
case HDFS:
String pattern = "[a-z][a-z0-9]*";
if (!Pattern.matches(pattern, scheme)) {
throw new DdlException("Invalid location " + location);
}
break;
default:
throw new DdlException("Unknown storage volume type: " + svType);
}
} catch (URISyntaxException e) {
throw new DdlException(String.format("Invalid location %s, error: %s", location, e.getMessage()));
}
}
}
public void save(DataOutputStream dos) throws IOException, SRMetaBlockException {
SRMetaBlockWriter writer = new SRMetaBlockWriter(dos, SRMetaBlockID.STORAGE_VOLUME_MGR, 1);
writer.writeJson(this);

View File

@ -784,4 +784,27 @@ public class SharedDataStorageVolumeMgrTest {
svm.updateStorageVolume("test", storageParams, Optional.of(false), "");
Assert.assertEquals(false, svm.getStorageVolumeByName(svName).getEnabled());
}
@Test
public void testCreateStorageVolumeWithInvalidParams() throws DdlException {
String svName = "test";
StorageVolumeMgr svm = new SharedDataStorageVolumeMgr();
List<String> locations = new ArrayList<>(Arrays.asList("{hdfs://abc}"));
Map<String, String> storageParams = new HashMap<>();
Assert.assertThrows(DdlException.class,
() -> svm.createStorageVolume(svName, "hdfs", locations, storageParams, Optional.empty(), ""));
locations.clear();
locations.add("ablob://abc");
Assert.assertThrows(DdlException.class,
() -> svm.createStorageVolume(svName, "s3", locations, storageParams, Optional.empty(), ""));
locations.clear();
locations.add("s3://abc");
Assert.assertThrows(DdlException.class,
() -> svm.createStorageVolume(svName, "azblob", locations, storageParams, Optional.empty(), ""));
Assert.assertThrows(DdlException.class,
() -> svm.createStorageVolume(svName, "abc", locations, storageParams, Optional.empty(), ""));
}
}