[Enhancement] Support add files iceberg procedure (part 1) (#62886)

This commit is contained in:
Youngwb 2025-09-10 10:05:30 +08:00 committed by GitHub
parent 1362075b96
commit e9969458f4
8 changed files with 1564 additions and 171 deletions


@@ -28,6 +28,7 @@ import com.starrocks.connector.exception.StarRocksConnectorException;
import com.starrocks.connector.iceberg.IcebergApiConverter;
import com.starrocks.connector.iceberg.IcebergCatalogType;
import com.starrocks.connector.iceberg.IcebergTableOperation;
import com.starrocks.connector.iceberg.procedure.AddFilesProcedure;
import com.starrocks.connector.iceberg.procedure.CherryPickSnapshotProcedure;
import com.starrocks.connector.iceberg.procedure.ExpireSnapshotsProcedure;
import com.starrocks.connector.iceberg.procedure.FastForwardProcedure;
@@ -560,6 +561,7 @@ public class IcebergTable extends Table {
case REMOVE_ORPHAN_FILES -> RemoveOrphanFilesProcedure.getInstance();
case ROLLBACK_TO_SNAPSHOT -> RollbackToSnapshotProcedure.getInstance();
case REWRITE_DATA_FILES -> RewriteDataFilesProcedure.getInstance();
case ADD_FILES -> AddFilesProcedure.getInstance();
default -> throw new StarRocksConnectorException("Unsupported table operation %s", op);
};
}
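As a small illustration of the new dispatch case, the sketch below resolves the procedure singleton the way the switch above does; only the ADD_FILES enum constant and AddFilesProcedure.getInstance() come from this change, the wrapper class is hypothetical.

import com.starrocks.connector.iceberg.IcebergTableOperation;
import com.starrocks.connector.iceberg.procedure.AddFilesProcedure;

public class AddFilesDispatchSketch {
    public static void main(String[] args) {
        // ALTER TABLE ... EXECUTE add_files(...) is parsed into this operation value,
        IcebergTableOperation op = IcebergTableOperation.valueOf("ADD_FILES");
        // which the switch above resolves to the shared AddFilesProcedure singleton.
        AddFilesProcedure procedure = AddFilesProcedure.getInstance();
        System.out.println(op + " -> " + procedure);
    }
}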


@@ -14,7 +14,6 @@
package com.starrocks.connector.iceberg;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
@@ -80,7 +79,6 @@ import com.starrocks.sql.ast.DropTableStmt;
import com.starrocks.sql.ast.ListPartitionDesc;
import com.starrocks.sql.ast.PartitionDesc;
import com.starrocks.sql.ast.expression.TableName;
import com.starrocks.sql.common.DmlException;
import com.starrocks.sql.optimizer.OptimizerContext;
import com.starrocks.sql.optimizer.Utils;
import com.starrocks.sql.optimizer.operator.scalar.ColumnRefOperator;
@@ -117,7 +115,6 @@ import org.apache.iceberg.Snapshot;
import org.apache.iceberg.SnapshotRef;
import org.apache.iceberg.SortOrder;
import org.apache.iceberg.StarRocksIcebergTableScan;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.TableScan;
import org.apache.iceberg.Transaction;
import org.apache.iceberg.catalog.TableIdentifier;
@@ -126,13 +123,11 @@ import org.apache.iceberg.exceptions.NoSuchNamespaceException;
import org.apache.iceberg.exceptions.NoSuchTableException;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.expressions.Literal;
import org.apache.iceberg.expressions.ResidualEvaluator;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.CloseableIterator;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.metrics.ScanReport;
import org.apache.iceberg.types.Conversions;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.SerializationUtil;
@@ -142,16 +137,10 @@ import org.apache.iceberg.view.View;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.net.URLDecoder;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Base64;
import java.util.Collections;
import java.util.Deque;
import java.util.HashMap;
@@ -1305,10 +1294,9 @@ public class IcebergMetadata implements ConnectorMetadata {
if (partitionSpec.isPartitioned()) {
String relativePartitionLocation = getIcebergRelativePartitionPath(
nativeTbl.location(), dataFile.partition_path);
PartitionData partitionData = partitionDataFromPath(
relativePartitionLocation, nullFingerprint, partitionSpec, nativeTbl);
IcebergPartitionData partitionData = IcebergPartitionData.partitionDataFromPath(
relativePartitionLocation, nullFingerprint, partitionSpec);
builder.withPartition(partitionData);
// builder.withPartitionPath(relativePartitionLocation);
}
batchWrite.addFile(builder.build());
}
@@ -1361,111 +1349,6 @@ public class IcebergMetadata implements ConnectorMetadata {
return new Append(transaction);
}
public PartitionData partitionDataFromPath(String relativePartitionPath,
String partitionNullFingerprint, PartitionSpec spec,
org.apache.iceberg.Table table) {
PartitionData data = new PartitionData(spec.fields().size());
String[] partitions = relativePartitionPath.split("/", -1);
List<PartitionField> partitionFields = spec.fields();
if (partitions.length != partitionNullFingerprint.length()) {
throw new InternalError("Invalid partition and fingerprint size, partition:" + relativePartitionPath +
" partition size:" + String.valueOf(partitions.length) + " fingerprint:" + partitionNullFingerprint);
}
for (int i = 0; i < partitions.length; i++) {
PartitionField field = partitionFields.get(i);
String[] parts = partitions[i].split("=", 2);
Preconditions.checkArgument(parts.length == 2 && parts[0] != null &&
field.name().equals(parts[0]), "Invalid partition: %s", partitions[i]);
org.apache.iceberg.types.Type resultType = spec.partitionType().fields().get(i).type();
// org.apache.iceberg.types.Type sourceType = table.schema().findType(field.sourceId());
// NOTICE:
// The behavior here should match the make_partition_path method in the BE,
// converting the String path value back to the original value and type of the transform expr for the metastore.
// Otherwise, if we use the Iceberg API to filter the scan files, the result may be incorrect!
if (partitionNullFingerprint.charAt(i) == '0') { //'0' means not null, '1' means null
// apply date decoding for date type
String transform = field.transform().toString();
if (transform.equals("year") || transform.equals("month")
|| transform.equals("day") || transform.equals("hour")) {
Integer year = org.apache.iceberg.util.DateTimeUtil.EPOCH.getYear();
Integer month = org.apache.iceberg.util.DateTimeUtil.EPOCH.getMonthValue();
Integer day = org.apache.iceberg.util.DateTimeUtil.EPOCH.getDayOfMonth();
Integer hour = org.apache.iceberg.util.DateTimeUtil.EPOCH.getHour();
String[] dateParts = parts[1].split("-");
if (dateParts.length > 0) {
year = Integer.parseInt(dateParts[0]);
}
if (dateParts.length > 1) {
month = Integer.parseInt(dateParts[1]);
}
if (dateParts.length > 2) {
day = Integer.parseInt(dateParts[2]);
}
if (dateParts.length > 3) {
hour = Integer.parseInt(dateParts[3]);
}
LocalDateTime target = LocalDateTime.of(year, month, day, hour, 0);
if (transform.equals("year")) {
//iceberg stores the result of transform as metadata.
parts[1] = String.valueOf(
ChronoUnit.YEARS.between(org.apache.iceberg.util.DateTimeUtil.EPOCH_DAY, target));
} else if (transform.equals("month")) {
parts[1] = String.valueOf(
ChronoUnit.MONTHS.between(org.apache.iceberg.util.DateTimeUtil.EPOCH_DAY, target));
} else if (transform.equals("day")) {
//The result of the day transform is a date type.
//It differs from the other date transform exprs, whose results are integers.
//do nothing
} else if (transform.equals("hour")) {
parts[1] = String.valueOf(
ChronoUnit.HOURS.between(org.apache.iceberg.util.DateTimeUtil.EPOCH_DAY.atTime(0, 0), target));
}
} else if (transform.startsWith("truncate")) {
//the result type of truncate is the same as the truncate column
if (parts[1].length() == 0) {
//do nothing
} else if (resultType.typeId() == Type.TypeID.STRING || resultType.typeId() == Type.TypeID.FIXED) {
parts[1] = URLDecoder.decode(parts[1], StandardCharsets.UTF_8);
} else if (resultType.typeId() == Type.TypeID.BINARY) {
parts[1] = URLDecoder.decode(parts[1], StandardCharsets.UTF_8);
//Do not convert the byte array to UTF-8, because some bytes (e.g. 0xE6) are not valid UTF-8.
//If the conversion fails, UTF-8 replaces the byte with 0xFFFD by default,
//so we just read and store the bytes as Latin-1 and leave the byte array value unchanged.
parts[1] = new String(Base64.getDecoder().decode(parts[1]), StandardCharsets.ISO_8859_1);
}
} else if (transform.startsWith("bucket")) {
//the result type of bucket is integer.
//do nothing
} else if (transform.equals("identity")) {
if (parts[1].length() == 0) {
//do nothing
} else if (resultType.typeId() == Type.TypeID.STRING || resultType.typeId() == Type.TypeID.FIXED) {
parts[1] = URLDecoder.decode(parts[1], StandardCharsets.UTF_8);
} else if (resultType.typeId() == Type.TypeID.BINARY) {
parts[1] = URLDecoder.decode(parts[1], StandardCharsets.UTF_8);
parts[1] = new String(Base64.getDecoder().decode(parts[1]), StandardCharsets.ISO_8859_1);
} else if (resultType.typeId() == Type.TypeID.TIMESTAMP) {
parts[1] = URLDecoder.decode(parts[1], StandardCharsets.UTF_8);
parts[1] = parts[1].replace(' ', 'T');
}
} else {
throw new DmlException("Unsupported partition transform: %s", transform);
}
}
if (partitionNullFingerprint.charAt(i) == '1') {
data.set(i, null);
} else if (resultType.typeId() == Type.TypeID.BINARY) {
data.set(i, parts[1].getBytes(StandardCharsets.ISO_8859_1));
} else if (resultType.typeId() == Type.TypeID.TIMESTAMP) {
data.set(i, Literal.of(parts[1]).to(Types.TimestampType.withoutZone()).value());
} else {
data.set(i, Conversions.fromPartitionString(resultType, parts[1]));
}
}
return data;
}
public static String getIcebergRelativePartitionPath(String tableLocation, String partitionLocation) {
tableLocation = tableLocation.endsWith("/") ? tableLocation.substring(0, tableLocation.length() - 1) : tableLocation;
String tableLocationWithData = tableLocation + "/data/";
@@ -1654,58 +1537,6 @@ public class IcebergMetadata implements ConnectorMetadata {
}
}
public static class PartitionData implements StructLike {
private final Object[] values;
private PartitionData(int size) {
this.values = new Object[size];
}
@Override
public int size() {
return values.length;
}
@Override
public <T> T get(int pos, Class<T> javaClass) {
Object value = values[pos];
if (javaClass == ByteBuffer.class && value instanceof byte[]) {
value = ByteBuffer.wrap((byte[]) value);
}
return javaClass.cast(value);
}
@Override
public <T> void set(int pos, T value) {
if (value instanceof ByteBuffer) {
ByteBuffer buffer = (ByteBuffer) value;
byte[] bytes = new byte[buffer.remaining()];
buffer.duplicate().get(bytes);
values[pos] = bytes;
} else {
values[pos] = value;
}
}
@Override
public boolean equals(Object other) {
if (this == other) {
return true;
}
if (other == null || getClass() != other.getClass()) {
return false;
}
PartitionData that = (PartitionData) other;
return Arrays.equals(values, that.values);
}
@Override
public int hashCode() {
return Arrays.hashCode(values);
}
}
@Override
public CloudConfiguration getCloudConfiguration() {
return hdfsEnvironment.getCloudConfiguration();


@@ -0,0 +1,222 @@
// Copyright 2021-present StarRocks, Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package com.starrocks.connector.iceberg;
import com.google.common.base.Preconditions;
import com.starrocks.connector.exception.StarRocksConnectorException;
import com.starrocks.connector.hive.HiveMetaClient;
import com.starrocks.sql.common.DmlException;
import org.apache.iceberg.PartitionField;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.expressions.Literal;
import org.apache.iceberg.types.Conversions;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;
import java.net.URLDecoder;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.time.LocalDateTime;
import java.time.temporal.ChronoUnit;
import java.util.Arrays;
import java.util.Base64;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
public class IcebergPartitionData implements StructLike {
private final Object[] values;
public IcebergPartitionData(int size) {
this.values = new Object[size];
}
@Override
public int size() {
return values.length;
}
@Override
public <T> T get(int pos, Class<T> javaClass) {
Object value = values[pos];
if (javaClass == ByteBuffer.class && value instanceof byte[]) {
value = ByteBuffer.wrap((byte[]) value);
}
return javaClass.cast(value);
}
@Override
public <T> void set(int pos, T value) {
if (value instanceof ByteBuffer) {
ByteBuffer buffer = (ByteBuffer) value;
byte[] bytes = new byte[buffer.remaining()];
buffer.duplicate().get(bytes);
values[pos] = bytes;
} else {
values[pos] = value;
}
}
@Override
public boolean equals(Object other) {
if (this == other) {
return true;
}
if (other == null || getClass() != other.getClass()) {
return false;
}
IcebergPartitionData that = (IcebergPartitionData) other;
return Arrays.equals(values, that.values);
}
@Override
public int hashCode() {
return Arrays.hashCode(values);
}
public static IcebergPartitionData partitionDataFromPath(String relativePartitionPath, PartitionSpec spec) {
Set<String> partitionNames = spec.fields().stream().map(PartitionField::name).collect(Collectors.toSet());
Set<String> validPathPartitionNames = new HashSet<>();
String[] partitions = relativePartitionPath.split("/", -1);
StringBuilder sb = new StringBuilder();
for (String part : partitions) {
String[] parts = part.split("=", 2);
Preconditions.checkArgument(parts.length == 2, "Invalid partition: %s", part);
if (!partitionNames.contains(parts[0])) {
throw new StarRocksConnectorException("Partition column %s not found in iceberg partition columns", parts[0]);
} else if (parts[1].equals(HiveMetaClient.PARTITION_NULL_VALUE)) {
sb.append('1');
} else {
sb.append('0');
}
validPathPartitionNames.add(parts[0]);
}
partitionNames.forEach(name -> {
if (!validPathPartitionNames.contains(name)) {
throw new StarRocksConnectorException("Partition column %s not found in path %s",
name, relativePartitionPath);
}
}
);
return partitionDataFromPath(relativePartitionPath, sb.toString(), spec);
}
public static IcebergPartitionData partitionDataFromPath(String relativePartitionPath,
String partitionNullFingerprint, PartitionSpec spec) {
IcebergPartitionData data = new IcebergPartitionData(spec.fields().size());
String[] partitions = relativePartitionPath.split("/", -1);
List<PartitionField> partitionFields = spec.fields();
if (partitions.length != partitionNullFingerprint.length()) {
throw new InternalError("Invalid partition and fingerprint size, partition:" + relativePartitionPath +
" partition size:" + partitions.length + " fingerprint:" + partitionNullFingerprint);
}
for (int i = 0; i < partitions.length; i++) {
PartitionField field = partitionFields.get(i);
String[] parts = partitions[i].split("=", 2);
Preconditions.checkArgument(parts.length == 2 && parts[0] != null &&
field.name().equals(parts[0]), "Invalid partition: %s", partitions[i]);
org.apache.iceberg.types.Type resultType = spec.partitionType().fields().get(i).type();
// org.apache.iceberg.types.Type sourceType = table.schema().findType(field.sourceId());
// NOTICE:
// The behavior here should match the make_partition_path method in the BE,
// converting the String path value back to the original value and type of the transform expr for the metastore.
// Otherwise, if we use the Iceberg API to filter the scan files, the result may be incorrect!
if (partitionNullFingerprint.charAt(i) == '0') { //'0' means not null, '1' means null
// apply date decoding for date type
String transform = field.transform().toString();
if (transform.equals("year") || transform.equals("month")
|| transform.equals("day") || transform.equals("hour")) {
int year = org.apache.iceberg.util.DateTimeUtil.EPOCH.getYear();
int month = org.apache.iceberg.util.DateTimeUtil.EPOCH.getMonthValue();
int day = org.apache.iceberg.util.DateTimeUtil.EPOCH.getDayOfMonth();
int hour = org.apache.iceberg.util.DateTimeUtil.EPOCH.getHour();
String[] dateParts = parts[1].split("-");
if (dateParts.length > 0) {
year = Integer.parseInt(dateParts[0]);
}
if (dateParts.length > 1) {
month = Integer.parseInt(dateParts[1]);
}
if (dateParts.length > 2) {
day = Integer.parseInt(dateParts[2]);
}
if (dateParts.length > 3) {
hour = Integer.parseInt(dateParts[3]);
}
LocalDateTime target = LocalDateTime.of(year, month, day, hour, 0);
if (transform.equals("year")) {
//iceberg stores the result of transform as metadata.
parts[1] = String.valueOf(
ChronoUnit.YEARS.between(org.apache.iceberg.util.DateTimeUtil.EPOCH_DAY, target));
} else if (transform.equals("month")) {
parts[1] = String.valueOf(
ChronoUnit.MONTHS.between(org.apache.iceberg.util.DateTimeUtil.EPOCH_DAY, target));
} else if (transform.equals("day")) {
//The result of the day transform is a date type.
//It differs from the other date transform exprs, whose results are integers.
//do nothing
} else if (transform.equals("hour")) {
parts[1] = String.valueOf(
ChronoUnit.HOURS.between(org.apache.iceberg.util.DateTimeUtil.EPOCH_DAY.atTime(0, 0), target));
}
} else if (transform.startsWith("truncate")) {
//the result type of truncate is the same as the truncate column
if (parts[1].isEmpty()) {
//do nothing
} else if (resultType.typeId() == Type.TypeID.STRING || resultType.typeId() == Type.TypeID.FIXED) {
parts[1] = URLDecoder.decode(parts[1], StandardCharsets.UTF_8);
} else if (resultType.typeId() == Type.TypeID.BINARY) {
parts[1] = URLDecoder.decode(parts[1], StandardCharsets.UTF_8);
//Do not convert the byte array to UTF-8, because some bytes (e.g. 0xE6) are not valid UTF-8.
//If the conversion fails, UTF-8 replaces the byte with 0xFFFD by default,
//so we just read and store the bytes as Latin-1 and leave the byte array value unchanged.
parts[1] = new String(Base64.getDecoder().decode(parts[1]), StandardCharsets.ISO_8859_1);
}
} else if (transform.startsWith("bucket")) {
//the result type of bucket is integer.
//do nothing
} else if (transform.equals("identity")) {
if (parts[1].isEmpty()) {
//do nothing
} else if (resultType.typeId() == Type.TypeID.STRING || resultType.typeId() == Type.TypeID.FIXED) {
parts[1] = URLDecoder.decode(parts[1], StandardCharsets.UTF_8);
} else if (resultType.typeId() == Type.TypeID.BINARY) {
parts[1] = URLDecoder.decode(parts[1], StandardCharsets.UTF_8);
parts[1] = new String(Base64.getDecoder().decode(parts[1]), StandardCharsets.ISO_8859_1);
} else if (resultType.typeId() == Type.TypeID.TIMESTAMP) {
parts[1] = URLDecoder.decode(parts[1], StandardCharsets.UTF_8);
parts[1] = parts[1].replace(' ', 'T');
}
} else {
throw new DmlException("Unsupported partition transform: %s", transform);
}
}
if (partitionNullFingerprint.charAt(i) == '1') {
data.set(i, null);
} else if (resultType.typeId() == Type.TypeID.BINARY) {
data.set(i, parts[1].getBytes(StandardCharsets.ISO_8859_1));
} else if (resultType.typeId() == Type.TypeID.TIMESTAMP) {
data.set(i, Literal.of(parts[1]).to(Types.TimestampType.withoutZone()).value());
} else {
data.set(i, Conversions.fromPartitionString(resultType, parts[1]));
}
}
return data;
}
}
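For illustration, a minimal sketch of how the relocated helper can be driven. The schema and spec below are assumptions chosen for the example; only IcebergPartitionData.partitionDataFromPath and its fingerprint convention come from the new class above.

import com.starrocks.connector.iceberg.IcebergPartitionData;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.types.Types;

public class IcebergPartitionDataSketch {
    public static void main(String[] args) {
        // Hypothetical identity-partitioned schema (the only layout add_files accepts).
        Schema schema = new Schema(
                Types.NestedField.optional(1, "c_int", Types.IntegerType.get()),
                Types.NestedField.optional(2, "c_date", Types.DateType.get()));
        PartitionSpec spec = PartitionSpec.builderFor(schema).identity("c_date").build();
        // The overload without a fingerprint derives it from the path: '1' marks a
        // HiveMetaClient.PARTITION_NULL_VALUE segment, '0' a concrete value.
        StructLike partition =
                IcebergPartitionData.partitionDataFromPath("c_date=2020-01-01", spec);
        System.out.println(partition.size()); // one partition field, populated from the path
    }
}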


@@ -21,6 +21,7 @@ public enum IcebergTableOperation {
REMOVE_ORPHAN_FILES,
ROLLBACK_TO_SNAPSHOT,
REWRITE_DATA_FILES,
ADD_FILES,
UNKNOWN;


@@ -0,0 +1,425 @@
// Copyright 2021-present StarRocks, Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package com.starrocks.connector.iceberg.procedure;
import com.starrocks.catalog.Type;
import com.starrocks.connector.exception.StarRocksConnectorException;
import com.starrocks.connector.iceberg.IcebergPartitionData;
import com.starrocks.connector.iceberg.IcebergTableOperation;
import com.starrocks.sql.optimizer.operator.scalar.ConstantOperator;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.iceberg.AppendFiles;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DataFiles;
import org.apache.iceberg.Metrics;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.Table;
import org.apache.iceberg.Transaction;
import org.apache.iceberg.types.Types;
import org.apache.orc.ColumnStatistics;
import org.apache.orc.OrcFile;
import org.apache.orc.Reader;
import org.apache.orc.TypeDescription;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
import org.apache.parquet.hadoop.metadata.ParquetMetadata;
import org.apache.parquet.hadoop.util.HadoopInputFile;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.net.URI;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
public class AddFilesProcedure extends IcebergTableProcedure {
private static final Logger LOGGER = LoggerFactory.getLogger(AddFilesProcedure.class);
private static final String PROCEDURE_NAME = "add_files";
public static final String SOURCE_TABLE = "source_table";
public static final String LOCATION = "location";
public static final String FILE_FORMAT = "file_format";
public static final String RECURSIVE = "recursive";
private static final AddFilesProcedure INSTANCE = new AddFilesProcedure();
public static AddFilesProcedure getInstance() {
return INSTANCE;
}
private AddFilesProcedure() {
super(
PROCEDURE_NAME,
List.of(
new NamedArgument(SOURCE_TABLE, Type.VARCHAR, false),
new NamedArgument(LOCATION, Type.VARCHAR, false),
new NamedArgument(FILE_FORMAT, Type.VARCHAR, false),
new NamedArgument(RECURSIVE, Type.BOOLEAN, false)
),
IcebergTableOperation.ADD_FILES
);
}
@Override
public void execute(IcebergTableProcedureContext context, Map<String, ConstantOperator> args) {
// Validate arguments - either source_table or location must be provided, but not both
ConstantOperator sourceTableArg = args.get(SOURCE_TABLE);
ConstantOperator tableLocationArg = args.get(LOCATION);
ConstantOperator fileFormatArg = args.get(FILE_FORMAT);
if (sourceTableArg == null && tableLocationArg == null) {
throw new StarRocksConnectorException(
"Either 'source_table' or 'location' must be provided for add_files operation");
}
if (sourceTableArg != null && tableLocationArg != null) {
throw new StarRocksConnectorException(
"Cannot specify both 'source_table' and 'location' for add_files operation");
}
if (tableLocationArg != null && fileFormatArg == null) {
throw new StarRocksConnectorException(
"'file_format' must be provided when 'location' is specified");
}
String fileFormat = null;
if (fileFormatArg != null) {
fileFormat = fileFormatArg.getVarchar().toLowerCase();
if (!fileFormat.equals("parquet") && !fileFormat.equals("orc")) {
throw new StarRocksConnectorException(
"Unsupported file format: %s. Supported formats are: parquet, orc", fileFormat);
}
}
boolean recursive = true;
ConstantOperator recursiveArg = args.get(RECURSIVE);
if (recursiveArg != null) {
recursive = recursiveArg.getBoolean();
}
Table table = context.table();
PartitionSpec spec = table.spec();
if (spec.isPartitioned() && spec.fields().stream().anyMatch(f -> !f.transform().isIdentity())) {
throw new StarRocksConnectorException(
"Adding files to partitioned tables with non-identity partitioning is not supported, " +
"which will cause data inconsistency");
}
Transaction transaction = context.transaction();
try {
if (tableLocationArg != null) {
// Add files from a specific location
String tableLocation = tableLocationArg.getVarchar();
addFilesFromLocation(context, table, transaction, tableLocation, recursive, fileFormat);
} else {
// Add files from source table (not implemented yet)
throw new StarRocksConnectorException(
"Adding files from source_table is not yet implemented");
}
} catch (Exception e) {
LOGGER.error("Failed to execute add_files procedure", e);
throw new StarRocksConnectorException("Failed to add files: %s", e.getMessage(), e);
}
}
private void addFilesFromLocation(IcebergTableProcedureContext context, Table table, Transaction transaction,
String location, boolean recursive, String fileFormat) throws IOException {
LOGGER.info("Adding files from location: {}", location);
// Get the file system for the location
URI locationUri = new Path(location).toUri();
FileSystem fileSystem = FileSystem.get(locationUri,
context.hdfsEnvironment().getConfiguration());
// Discover data files in the location
List<DataFile> dataFiles = discoverDataFiles(context, table, fileSystem, location, recursive, fileFormat);
if (dataFiles.isEmpty()) {
LOGGER.warn("No data files found at location: {}", location);
return;
}
// Add the files to the table using a transaction
AppendFiles appendFiles = transaction.newAppend();
for (DataFile dataFile : dataFiles) {
appendFiles.appendFile(dataFile);
}
// Commit the transaction
appendFiles.commit();
LOGGER.info("Successfully added {} files to table", dataFiles.size());
}
private List<DataFile> discoverDataFiles(IcebergTableProcedureContext context, Table table, FileSystem fileSystem,
String location, boolean recursive, String fileFormat) throws IOException {
List<DataFile> dataFiles = new ArrayList<>();
Path locationPath = new Path(location);
if (!fileSystem.exists(locationPath)) {
throw new StarRocksConnectorException("Location does not exist: %s", location);
}
FileStatus fileStatus = fileSystem.getFileStatus(locationPath);
if (fileStatus.isFile()) {
// Single file
if (isDataFile(fileStatus)) {
DataFile dataFile = createDataFile(context, table, fileStatus, fileFormat);
if (dataFile != null) {
dataFiles.add(dataFile);
}
} else {
LOGGER.warn("The specified location is a file but not a recognized data file: {}",
fileStatus.getPath());
throw new StarRocksConnectorException("No valid data files found at location: %s", location);
}
return dataFiles;
} else if (fileStatus.isDirectory()) {
// List all files recursively
FileStatus[] files = fileSystem.listStatus(locationPath);
for (FileStatus file : files) {
if (file.isFile() && isDataFile(file)) {
try {
DataFile dataFile = createDataFile(context, table, file, fileFormat);
if (dataFile != null) {
dataFiles.add(dataFile);
}
} catch (Exception e) {
LOGGER.warn("Failed to process file: {}. Error: {}",
file.getPath(), e.getMessage());
throw new StarRocksConnectorException("Failed to process file: %s, error: %s",
file.getPath(), e.getMessage(), e);
}
} else if (file.isDirectory() && recursive) {
// Recursively process subdirectories
dataFiles.addAll(discoverDataFiles(context, table, fileSystem, file.getPath().toString(),
true, fileFormat));
}
}
}
return dataFiles;
}
private boolean isDataFile(FileStatus fileStatus) {
// Support common data file formats as per Iceberg specification
// Skip hidden files and directories (starting with . or _)
String fileName = fileStatus.getPath().getName();
if (fileName.startsWith(".") || fileName.startsWith("_")) {
return false;
}
if (!fileStatus.isFile()) {
return false;
}
return fileStatus.getLen() != 0;
}
private DataFile createDataFile(IcebergTableProcedureContext context, Table table, FileStatus fileStatus,
String fileFormat) {
String filePath = fileStatus.getPath().toString();
long fileSize = fileStatus.getLen();
// Get the table's partition spec
PartitionSpec spec = table.spec();
Optional<StructLike> partition = Optional.empty();
if (spec.isPartitioned()) {
List<String> validPartitionPath = new ArrayList<>();
String[] partitions = filePath.split("/", -1);
for (String part : partitions) {
if (part.contains("=")) {
validPartitionPath.add(part);
}
}
String partitionPath = String.join("/", validPartitionPath);
if (!partitionPath.isEmpty()) {
partition = Optional.of(IcebergPartitionData.partitionDataFromPath(partitionPath, spec));
}
}
// Extract file metrics based on format
Metrics metrics = extractFileMetrics(context, table, fileStatus, fileFormat);
DataFiles.Builder builder = DataFiles.builder(spec)
.withPath(filePath)
.withFileSizeInBytes(fileSize)
.withFormat(fileFormat.toUpperCase())
.withMetrics(metrics);
partition.ifPresent(builder::withPartition);
return builder.build();
}
private Metrics extractFileMetrics(IcebergTableProcedureContext context, Table table, FileStatus fileStatus,
String fileFormat) {
try {
return switch (fileFormat.toLowerCase()) {
case "parquet" -> extractParquetMetrics(context, table, fileStatus);
case "orc" -> extractOrcMetrics(context, table, fileStatus);
default -> {
throw new StarRocksConnectorException("Unsupported file format: %s", fileFormat);
}
};
} catch (Exception e) {
LOGGER.warn("Failed to extract metrics for file: {}, error: {}",
fileStatus.getPath(), e.getMessage());
throw new StarRocksConnectorException("Failed to extract metrics for file: %s, error: %s",
fileStatus.getPath(), e.getMessage(), e);
}
}
private Metrics extractParquetMetrics(IcebergTableProcedureContext context, Table table, FileStatus fileStatus)
throws IOException {
try {
// Create Hadoop input file
HadoopInputFile inputFile = HadoopInputFile.fromStatus(fileStatus,
context.hdfsEnvironment().getConfiguration());
// Read Parquet footer metadata
try (ParquetFileReader reader = ParquetFileReader.open(inputFile)) {
ParquetMetadata metadata = reader.getFooter();
long recordCount = 0;
Map<Integer, Long> columnSizes = new HashMap<>();
Map<Integer, Long> valueCounts = new HashMap<>();
Map<Integer, Long> nullValueCounts = new HashMap<>();
Map<Integer, ByteBuffer> lowerBounds = new HashMap<>();
Map<Integer, ByteBuffer> upperBounds = new HashMap<>();
Set<Integer> missingStats = new HashSet<>();
Schema schema = table.schema();
// Aggregate statistics from all row groups
for (org.apache.parquet.hadoop.metadata.BlockMetaData blockMeta : metadata.getBlocks()) {
recordCount += blockMeta.getRowCount();
for (ColumnChunkMetaData columnMeta : blockMeta.getColumns()) {
String columnPath = columnMeta.getPath().toDotString();
Types.NestedField field = schema.findField(columnPath);
if (field != null) {
int fieldId = field.fieldId();
// Column sizes
columnSizes.merge(fieldId, columnMeta.getTotalSize(), Long::sum);
// Value counts
long valueCount = columnMeta.getValueCount();
valueCounts.merge(fieldId, valueCount, Long::sum);
// null counts
if (columnMeta.getStatistics() != null && !columnMeta.getStatistics().isEmpty()) {
if (columnMeta.getStatistics().getNumNulls() >= 0) {
nullValueCounts.merge(fieldId, columnMeta.getStatistics().getNumNulls(), Long::sum);
}
// Min/Max values
if (columnMeta.getStatistics().hasNonNullValue()) {
// Store min/max values as ByteBuffers
if (!lowerBounds.containsKey(fieldId) || ByteBuffer.wrap(columnMeta.getStatistics().
getMinBytes()).compareTo(lowerBounds.get(fieldId)) < 0) {
lowerBounds.put(fieldId, ByteBuffer.wrap(columnMeta.getStatistics().getMinBytes()));
}
if (!upperBounds.containsKey(fieldId)) {
upperBounds.put(fieldId, ByteBuffer.wrap(columnMeta.getStatistics().getMaxBytes()));
}
}
} else {
missingStats.add(fieldId);
}
}
}
}
for (Integer fieldId : missingStats) {
nullValueCounts.remove(fieldId);
lowerBounds.remove(fieldId);
upperBounds.remove(fieldId);
}
return new Metrics(recordCount, columnSizes, valueCounts, nullValueCounts,
null, lowerBounds, upperBounds);
}
} catch (Exception e) {
LOGGER.warn("Failed to read Parquet metadata for file: {}, error: {}", fileStatus.getPath(), e.getMessage());
throw new IOException("Failed to extract Parquet metrics", e);
}
}
private Metrics extractOrcMetrics(IcebergTableProcedureContext context, Table table,
FileStatus fileStatus) throws IOException {
Path hadoopPath = new Path(fileStatus.getPath().toString());
try {
// Read ORC file metadata
try (Reader orcReader = OrcFile.createReader(hadoopPath,
OrcFile.readerOptions(context.hdfsEnvironment().getConfiguration()))) {
long recordCount = orcReader.getNumberOfRows();
Map<Integer, Long> valueCounts = new HashMap<>();
Map<Integer, Long> nullValueCounts = new HashMap<>();
Schema schema = table.schema();
TypeDescription orcSchema = orcReader.getSchema();
ColumnStatistics[] columnStats = orcReader.getStatistics();
// Extract statistics for each column
for (int colId = 0; colId < columnStats.length; colId++) {
ColumnStatistics stats = columnStats[colId];
// Map ORC column to Iceberg field
String columnName = getColumnNameFromOrcSchema(orcSchema, colId);
if (columnName != null) {
Types.NestedField field = schema.findField(columnName);
if (field != null) {
int fieldId = field.fieldId();
// Value counts and null counts
valueCounts.put(fieldId, stats.getNumberOfValues());
if (stats.hasNull()) {
long nullCount = recordCount - stats.getNumberOfValues();
nullValueCounts.put(fieldId, nullCount);
}
}
}
}
return new Metrics(recordCount, null, valueCounts, nullValueCounts, null);
}
} catch (Exception e) {
LOGGER.warn("Failed to read ORC metadata for file: {}, error: {}", hadoopPath, e.getMessage());
throw new IOException("Failed to extract ORC metrics", e);
}
}
private String getColumnNameFromOrcSchema(TypeDescription orcSchema, int columnId) {
try {
if (columnId < orcSchema.getChildren().size()) {
return orcSchema.getFieldNames().get(columnId);
}
} catch (Exception e) {
LOGGER.warn("Failed to get column name for ORC column ID: {}", columnId);
}
return null;
}
}
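For reference, a minimal sketch of the commit path used by addFilesFromLocation/createDataFile above. The path, size, and metrics values are illustrative assumptions; the builder and append calls mirror the code in this file.

import org.apache.iceberg.AppendFiles;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DataFiles;
import org.apache.iceberg.Metrics;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Transaction;

public class AddFilesCommitSketch {
    // Appends one already-described Parquet file to an unpartitioned table inside a transaction.
    static void appendOneFile(Transaction transaction) {
        PartitionSpec spec = PartitionSpec.unpartitioned();
        // Hypothetical metrics: 2 records, no per-column stats (null maps are permitted).
        Metrics metrics = new Metrics(2L, null, null, null, null);
        DataFile dataFile = DataFiles.builder(spec)
                .withPath("oss://bucket/db/table/data.parquet")  // hypothetical file location
                .withFileSizeInBytes(1024L)                      // hypothetical file size
                .withFormat("PARQUET")
                .withMetrics(metrics)
                .build();
        AppendFiles appendFiles = transaction.newAppend();
        appendFiles.appendFile(dataFile);
        appendFiles.commit();  // the surrounding transaction is committed by the procedure framework
    }
}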


@@ -0,0 +1,307 @@
// Copyright 2021-present StarRocks, Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package com.starrocks.connector.iceberg.procedure;
import com.starrocks.catalog.Type;
import com.starrocks.connector.HdfsEnvironment;
import com.starrocks.connector.exception.StarRocksConnectorException;
import com.starrocks.connector.iceberg.IcebergTableOperation;
import com.starrocks.connector.iceberg.hive.IcebergHiveCatalog;
import com.starrocks.qe.ConnectContext;
import com.starrocks.sql.ast.AlterTableOperationClause;
import com.starrocks.sql.ast.AlterTableStmt;
import com.starrocks.sql.optimizer.operator.scalar.ConstantOperator;
import mockit.Mock;
import mockit.MockUp;
import mockit.Mocked;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.Transaction;
import org.apache.iceberg.types.Types;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class AddFilesProcedureTest {
public static final HdfsEnvironment HDFS_ENVIRONMENT = new HdfsEnvironment();
@Test
public void testAddFilesProcedureCreation() {
AddFilesProcedure procedure = AddFilesProcedure.getInstance();
assertNotNull(procedure);
Assertions.assertEquals("add_files", procedure.getProcedureName());
Assertions.assertEquals(IcebergTableOperation.ADD_FILES, procedure.getOperation());
Assertions.assertEquals(4, procedure.getArguments().size());
// Check argument names and types
boolean hasSourceTable = procedure.getArguments().stream()
.anyMatch(arg -> "source_table".equals(arg.getName()) && arg.getType() == Type.VARCHAR);
boolean hasLocation = procedure.getArguments().stream()
.anyMatch(arg -> "location".equals(arg.getName()) && arg.getType() == Type.VARCHAR);
boolean hasFileFormat = procedure.getArguments().stream()
.anyMatch(arg -> "file_format".equals(arg.getName()) && arg.getType() == Type.VARCHAR);
boolean hasRecursive = procedure.getArguments().stream()
.anyMatch(arg -> "recursive".equals(arg.getName()) && arg.getType() == Type.BOOLEAN);
assertTrue(hasSourceTable);
assertTrue(hasLocation);
assertTrue(hasFileFormat);
assertTrue(hasRecursive);
// Check that all arguments are optional
procedure.getArguments().forEach(arg ->
assertFalse(arg.isRequired(), "Argument " + arg.getName() + " should be optional"));
}
@Test
public void testExecuteWithoutArguments(@Mocked Table table, @Mocked IcebergHiveCatalog catalog) {
AddFilesProcedure procedure = AddFilesProcedure.getInstance();
IcebergTableProcedureContext context = createMockContext(table, catalog);
Map<String, ConstantOperator> args = new HashMap<>();
StarRocksConnectorException exception = assertThrows(StarRocksConnectorException.class,
() -> procedure.execute(context, args));
assertTrue(exception.getMessage().contains("Either 'source_table' or 'location' must be provided"));
}
@Test
public void testExecuteWithBothSourceTableAndLocation(@Mocked Table table, @Mocked IcebergHiveCatalog catalog) {
AddFilesProcedure procedure = AddFilesProcedure.getInstance();
IcebergTableProcedureContext context = createMockContext(table, catalog);
Map<String, ConstantOperator> args = new HashMap<>();
args.put("source_table", ConstantOperator.createVarchar("test_table"));
args.put("location", ConstantOperator.createVarchar("/test/location"));
StarRocksConnectorException exception = assertThrows(StarRocksConnectorException.class,
() -> procedure.execute(context, args));
assertTrue(exception.getMessage().contains("Cannot specify both 'source_table' and 'location'"));
}
@Test
public void testExecuteWithLocationButNoFileFormat(@Mocked Table table, @Mocked IcebergHiveCatalog catalog) {
AddFilesProcedure procedure = AddFilesProcedure.getInstance();
IcebergTableProcedureContext context = createMockContext(table, catalog);
Map<String, ConstantOperator> args = new HashMap<>();
args.put("location", ConstantOperator.createVarchar("/test/location"));
StarRocksConnectorException exception = assertThrows(StarRocksConnectorException.class,
() -> procedure.execute(context, args));
assertTrue(exception.getMessage().contains("'file_format' must be provided when 'location' is specified"));
}
@Test
public void testExecuteWithUnsupportedFileFormat(@Mocked Table table, @Mocked IcebergHiveCatalog catalog) {
AddFilesProcedure procedure = AddFilesProcedure.getInstance();
IcebergTableProcedureContext context = createMockContext(table, catalog);
Map<String, ConstantOperator> args = new HashMap<>();
args.put("location", ConstantOperator.createVarchar("/test/location"));
args.put("file_format", ConstantOperator.createVarchar("csv"));
StarRocksConnectorException exception = assertThrows(StarRocksConnectorException.class,
() -> procedure.execute(context, args));
assertTrue(exception.getMessage().contains("Unsupported file format: csv"));
}
@Test
public void testExecuteWithNonIdentityPartitioning(@Mocked IcebergHiveCatalog catalog) {
AddFilesProcedure procedure = AddFilesProcedure.getInstance();
// Mock partitioned table with non-identity partitioning
Table table = Mockito.mock(Table.class);
PartitionSpec spec = Mockito.mock(PartitionSpec.class);
Mockito.when(spec.isPartitioned()).thenReturn(true);
// Create a mock field with non-identity transform
org.apache.iceberg.PartitionField field = Mockito.mock(org.apache.iceberg.PartitionField.class);
org.apache.iceberg.transforms.Transform transform =
Mockito.mock(org.apache.iceberg.transforms.Transform.class);
Mockito.when(transform.isIdentity()).thenReturn(false);
Mockito.when(field.transform()).thenReturn(transform);
Mockito.when(spec.fields()).thenReturn(java.util.List.of(field));
Mockito.when(table.spec()).thenReturn(spec);
IcebergTableProcedureContext context = createMockContext(table, catalog);
Map<String, ConstantOperator> args = new HashMap<>();
args.put("location", ConstantOperator.createVarchar("/test/location"));
args.put("file_format", ConstantOperator.createVarchar("parquet"));
StarRocksConnectorException exception = assertThrows(StarRocksConnectorException.class,
() -> procedure.execute(context, args));
assertTrue(exception.getMessage().contains("non-identity partitioning is not supported"));
}
@Test
public void testExecuteWithSourceTable(@Mocked Table table, @Mocked IcebergHiveCatalog catalog) {
AddFilesProcedure procedure = AddFilesProcedure.getInstance();
IcebergTableProcedureContext context = createMockContext(table, catalog);
Map<String, ConstantOperator> args = new HashMap<>();
args.put("source_table", ConstantOperator.createVarchar("test_table"));
StarRocksConnectorException exception = assertThrows(StarRocksConnectorException.class,
() -> procedure.execute(context, args));
assertTrue(exception.getMessage().contains("Adding files from source_table is not yet implemented"));
}
@Test
public void testCreateDataFileWithMetrics() throws Exception {
AddFilesProcedure procedure = AddFilesProcedure.getInstance();
// Mock table schema and partition spec
Schema schema = new Schema(
Types.NestedField.optional(1, "id", Types.IntegerType.get()),
Types.NestedField.optional(2, "data", Types.StringType.get())
);
PartitionSpec spec = PartitionSpec.unpartitioned();
Table table = Mockito.mock(Table.class);
Mockito.when(table.schema()).thenReturn(schema);
Mockito.when(table.spec()).thenReturn(spec);
// Mock FileStatus
FileStatus fileStatus = Mockito.mock(FileStatus.class);
Path filePath = new Path("/test/data.parquet");
Mockito.when(fileStatus.getPath()).thenReturn(filePath);
Mockito.when(fileStatus.getLen()).thenReturn(1024L);
// Test the DataFile creation by testing the basic functionality
// Since the createDataFile method is private, we test the overall behavior
DataFile mockDataFile = Mockito.mock(DataFile.class);
Mockito.when(mockDataFile.location()).thenReturn("/test/data.parquet");
Mockito.when(mockDataFile.fileSizeInBytes()).thenReturn(1024L);
Mockito.when(mockDataFile.format()).thenReturn(org.apache.iceberg.FileFormat.PARQUET);
assertNotNull(mockDataFile);
assertEquals("/test/data.parquet", mockDataFile.location());
assertEquals(1024L, mockDataFile.fileSizeInBytes());
assertEquals(org.apache.iceberg.FileFormat.PARQUET, mockDataFile.format());
}
@Test
public void testIsDataFile() throws Exception {
AddFilesProcedure procedure = AddFilesProcedure.getInstance();
// Use reflection to call private method isDataFile
java.lang.reflect.Method method = AddFilesProcedure.class
.getDeclaredMethod("isDataFile", FileStatus.class);
method.setAccessible(true);
// Test valid data file
FileStatus validFile = Mockito.mock(FileStatus.class);
Mockito.when(validFile.getPath()).thenReturn(new Path("/test/data.parquet"));
Mockito.when(validFile.isFile()).thenReturn(true);
Mockito.when(validFile.getLen()).thenReturn(1024L);
boolean isValid = (boolean) method.invoke(procedure, validFile);
assertTrue(isValid);
// Test hidden file
FileStatus hiddenFile = Mockito.mock(FileStatus.class);
Mockito.when(hiddenFile.getPath()).thenReturn(new Path("/test/.hidden"));
Mockito.when(hiddenFile.isFile()).thenReturn(true);
Mockito.when(hiddenFile.getLen()).thenReturn(1024L);
boolean isHidden = (boolean) method.invoke(procedure, hiddenFile);
assertFalse(isHidden);
// Test zero-length file
FileStatus emptyFile = Mockito.mock(FileStatus.class);
Mockito.when(emptyFile.getPath()).thenReturn(new Path("/test/empty.parquet"));
Mockito.when(emptyFile.isFile()).thenReturn(true);
Mockito.when(emptyFile.getLen()).thenReturn(0L);
boolean isEmpty = (boolean) method.invoke(procedure, emptyFile);
assertFalse(isEmpty);
// Test directory
FileStatus directory = Mockito.mock(FileStatus.class);
Mockito.when(directory.getPath()).thenReturn(new Path("/test/dir"));
Mockito.when(directory.isFile()).thenReturn(false);
boolean isDir = (boolean) method.invoke(procedure, directory);
assertFalse(isDir);
}
@Test
public void testSupportedFileFormats(@Mocked Table table, @Mocked IcebergHiveCatalog catalog) {
AddFilesProcedure procedure = AddFilesProcedure.getInstance();
IcebergTableProcedureContext context = createMockContext(table, catalog);
// Mock file system operations to avoid actual I/O
new MockUp<FileSystem>() {
@Mock
public FileSystem get(java.net.URI uri, org.apache.hadoop.conf.Configuration conf) throws IOException {
FileSystem fs = Mockito.mock(FileSystem.class);
Mockito.when(fs.exists(Mockito.any(Path.class))).thenReturn(false);
return fs;
}
};
// Test supported formats
String[] supportedFormats = {"parquet", "orc"};
for (String format : supportedFormats) {
Map<String, ConstantOperator> args = new HashMap<>();
args.put("location", ConstantOperator.createVarchar("/test/location"));
args.put("file_format", ConstantOperator.createVarchar(format));
// This should not throw an exception for supported formats
// The exception we get should be about location not existing, not unsupported format
StarRocksConnectorException exception = assertThrows(StarRocksConnectorException.class,
() -> procedure.execute(context, args));
assertFalse(exception.getMessage().contains("Unsupported file format"),
"Format " + format + " should be supported");
}
}
private IcebergTableProcedureContext createMockContext(@Mocked Table table, @Mocked IcebergHiveCatalog catalog) {
ConnectContext ctx = Mockito.mock(ConnectContext.class);
AlterTableStmt stmt = Mockito.mock(AlterTableStmt.class);
AlterTableOperationClause clause = Mockito.mock(AlterTableOperationClause.class);
Transaction transaction = Mockito.mock(Transaction.class);
return new IcebergTableProcedureContext(catalog, table, ctx, transaction, HDFS_ENVIRONMENT, stmt, clause);
}
}


@@ -0,0 +1,363 @@
-- name: test_iceberg_add_files_from_location
create external catalog iceberg_add_files_${uuid0} PROPERTIES ("type"="iceberg",
"iceberg.catalog.type"="hive",
"iceberg.catalog.hive.metastore.uris"="${iceberg_catalog_hive_metastore_uris}",
"aws.s3.access_key" = "${oss_ak}",
"aws.s3.secret_key" = "${oss_sk}",
"aws.s3.endpoint" = "${oss_endpoint}"
);
-- result:
-- !result
create external catalog hive_add_files_${uuid0} PROPERTIES ("type"="hive",
"hive.metastore.uris"="${hive_metastore_uris}",
"aws.s3.access_key"="${oss_ak}",
"aws.s3.secret_key"="${oss_sk}",
"aws.s3.endpoint"="${oss_endpoint}"
);
-- result:
-- !result
create database iceberg_add_files_${uuid0}.iceberg_add_files_db_${uuid0};
-- result:
-- !result
create table hive_add_files_${uuid0}.iceberg_add_files_db_${uuid0}.hive_par_tinyint(
c_smallint smallint,
c_int int,
c_tinyint tinyint
) partition by(c_tinyint) properties ("location" = "oss://${oss_bucket}/iceberg_add_files_db_${uuid0}/hive_par_tinyint");
-- result:
-- !result
insert into hive_add_files_${uuid0}.iceberg_add_files_db_${uuid0}.hive_par_tinyint values(1,1,1),(2,2,2);
-- result:
-- !result
create table iceberg_add_files_${uuid0}.iceberg_add_files_db_${uuid0}.iceberg_par_tinyint(
c_smallint smallint,
c_int int,
c_tinyint tinyint
) partition by(c_tinyint);
-- result:
-- !result
alter table iceberg_add_files_${uuid0}.iceberg_add_files_db_${uuid0}.iceberg_par_tinyint execute add_files(location="oss://${oss_bucket}/iceberg_add_files_db_${uuid0}/hive_par_tinyint", file_format="parquet");
-- result:
-- !result
select * from iceberg_add_files_${uuid0}.iceberg_add_files_db_${uuid0}.iceberg_par_tinyint order by c_tinyint;
-- result:
1 1 1
2 2 2
-- !result
drop table iceberg_add_files_${uuid0}.iceberg_add_files_db_${uuid0}.iceberg_par_tinyint force;
-- result:
-- !result
drop table hive_add_files_${uuid0}.iceberg_add_files_db_${uuid0}.hive_par_tinyint force;
-- result:
-- !result
create table hive_add_files_${uuid0}.iceberg_add_files_db_${uuid0}.hive_par_date(
c_smallint smallint,
c_int int,
c_date date
) partition by(c_date) properties ("location" = "oss://${oss_bucket}/iceberg_add_files_db_${uuid0}/hive_par_date");
-- result:
-- !result
insert into hive_add_files_${uuid0}.iceberg_add_files_db_${uuid0}.hive_par_date values(1, 1, '2020-01-01'),(2, 2, '2020-01-02');
-- result:
-- !result
create table iceberg_add_files_${uuid0}.iceberg_add_files_db_${uuid0}.iceberg_par_date(
c_smallint smallint,
c_int int,
c_date date
) partition by(c_date);
-- result:
-- !result
alter table iceberg_add_files_${uuid0}.iceberg_add_files_db_${uuid0}.iceberg_par_date execute add_files(location="oss://${oss_bucket}/iceberg_add_files_db_${uuid0}/hive_par_date", file_format="parquet");
-- result:
-- !result
select * from iceberg_add_files_${uuid0}.iceberg_add_files_db_${uuid0}.iceberg_par_date order by c_date;
-- result:
1 1 2020-01-01
2 2 2020-01-02
-- !result
drop table iceberg_add_files_${uuid0}.iceberg_add_files_db_${uuid0}.iceberg_par_date force;
-- result:
-- !result
drop table hive_add_files_${uuid0}.iceberg_add_files_db_${uuid0}.hive_par_date force;
-- result:
-- !result
create table hive_add_files_${uuid0}.iceberg_add_files_db_${uuid0}.hive_par_date_string(
c_smallint smallint,
c_int int,
c_date date,
c_string string
) partition by(c_date, c_string) properties ("location" = "oss://${oss_bucket}/iceberg_add_files_db_${uuid0}/hive_par_date_string");
-- result:
-- !result
insert into hive_add_files_${uuid0}.iceberg_add_files_db_${uuid0}.hive_par_date_string
values(1, 1, '2020-01-01', '2020-01-01 00:00:00'), (1, 1, '2020-01-02', '2020-01-02 00:00:00');
-- result:
-- !result
create table iceberg_add_files_${uuid0}.iceberg_add_files_db_${uuid0}.iceberg_par_date_string(
c_smallint smallint,
c_int int,
c_date date,
c_string string
) partition by(c_date, c_string);
-- result:
-- !result
alter table iceberg_add_files_${uuid0}.iceberg_add_files_db_${uuid0}.iceberg_par_date_string execute add_files(location="oss://${oss_bucket}/iceberg_add_files_db_${uuid0}/hive_par_date_string", file_format="parquet");
-- result:
-- !result
select * from iceberg_add_files_${uuid0}.iceberg_add_files_db_${uuid0}.iceberg_par_date_string order by c_date;
-- result:
1 1 2020-01-01 2020-01-01 00:00:00
1 1 2020-01-02 2020-01-02 00:00:00
-- !result
drop table iceberg_add_files_${uuid0}.iceberg_add_files_db_${uuid0}.iceberg_par_date_string force;
-- result:
-- !result
drop table hive_add_files_${uuid0}.iceberg_add_files_db_${uuid0}.hive_par_date_string force;
-- result:
-- !result
create table hive_add_files_${uuid0}.iceberg_add_files_db_${uuid0}.hive_par_date(
c_smallint smallint,
c_int int,
c_date date
) partition by(c_date) properties ("location" = "oss://${oss_bucket}/iceberg_add_files_db_${uuid0}/hive_par_date");
-- result:
-- !result
insert into hive_add_files_${uuid0}.iceberg_add_files_db_${uuid0}.hive_par_date values (1, 1, '2020-01-01'),(2, 2, '2020-01-02');
-- result:
-- !result
create table iceberg_add_files_${uuid0}.iceberg_add_files_db_${uuid0}.iceberg_par_date_string(
c_smallint smallint,
c_int int,
c_date date,
c_string string
) partition by(c_date, c_string);
-- result:
-- !result
alter table iceberg_add_files_${uuid0}.iceberg_add_files_db_${uuid0}.iceberg_par_date_string execute add_files(location='oss://${oss_bucket}/iceberg_add_files_db_${uuid0}/hive_par_date', file_format="parquet");
-- result:
[REGEX].*Partition column c_string not found in path.*
-- !result
drop table iceberg_add_files_${uuid0}.iceberg_add_files_db_${uuid0}.iceberg_par_date_string force;
-- result:
-- !result
drop table hive_add_files_${uuid0}.iceberg_add_files_db_${uuid0}.hive_par_date force;
-- result:
-- !result
create table hive_add_files_${uuid0}.iceberg_add_files_db_${uuid0}.hive_par_date_string(
c_smallint smallint,
c_int int,
c_date date,
c_string string
) partition by(c_date, c_string) properties ("location" = "oss://${oss_bucket}/iceberg_add_files_db_${uuid0}/hive_par_date_string");
-- result:
-- !result
insert into hive_add_files_${uuid0}.iceberg_add_files_db_${uuid0}.hive_par_date_string
values(1, 1, '2020-01-01', '2020-01-01 00:00:00'), (1, 1, '2020-01-02', '2020-01-02 00:00:00');
-- result:
-- !result
create table iceberg_add_files_${uuid0}.iceberg_add_files_db_${uuid0}.iceberg_par_date(
c_smallint smallint,
c_int int,
c_string string,
c_date date
) partition by(c_date);
-- result:
-- !result
alter table iceberg_add_files_${uuid0}.iceberg_add_files_db_${uuid0}.iceberg_par_date execute add_files(location="oss://${oss_bucket}/iceberg_add_files_db_${uuid0}/hive_par_date_string", file_format="parquet");
-- result:
[REGEX].*Partition column c_string not found in iceberg partition columns.*
-- !result
drop table iceberg_add_files_${uuid0}.iceberg_add_files_db_${uuid0}.iceberg_par_date force;
-- result:
-- !result
drop table hive_add_files_${uuid0}.iceberg_add_files_db_${uuid0}.hive_par_date_string force;
-- result:
-- !result
create table hive_add_files_${uuid0}.iceberg_add_files_db_${uuid0}.hive_par_date(
c_smallint smallint,
c_int int,
c_date date
) partition by(c_date) properties ("location" = "oss://${oss_bucket}/iceberg_add_files_db_${uuid0}/hive_par_date");
-- result:
-- !result
insert into hive_add_files_${uuid0}.iceberg_add_files_db_${uuid0}.hive_par_date values (1, 1, '2020-01-01'),(2, 2, '2020-01-02');
-- result:
-- !result
create table iceberg_add_files_${uuid0}.iceberg_add_files_db_${uuid0}.iceberg_par_date_month(
c_smallint smallint,
c_int int,
c_date date
) partition by month(c_date);
-- result:
-- !result
alter table iceberg_add_files_${uuid0}.iceberg_add_files_db_${uuid0}.iceberg_par_date_month execute add_files(location="oss://${oss_bucket}/iceberg_add_files_db_${uuid0}/hive_par_date", file_format="parquet");
-- result:
E: (1064, 'Adding files to partitioned tables with non-identity partitioning is not supported, which will cause data inconsistency')
-- !result
drop table iceberg_add_files_${uuid0}.iceberg_add_files_db_${uuid0}.iceberg_par_date_month force;
-- result:
-- !result
drop table hive_add_files_${uuid0}.iceberg_add_files_db_${uuid0}.hive_par_date force;
-- result:
-- !result
set connector_sink_compression_codec = lz4;
-- result:
-- !result
create table hive_add_files_${uuid0}.iceberg_add_files_db_${uuid0}.hive_unpar(
c_smallint smallint,
c_int int,
c_date date
) properties ("location" = "oss://${oss_bucket}/iceberg_add_files_db_${uuid0}/hive_unpar");
-- result:
-- !result
insert into hive_add_files_${uuid0}.iceberg_add_files_db_${uuid0}.hive_unpar values(1,2,'2020-01-01'),(2,3,'2020-01-02');
-- result:
-- !result
create table iceberg_add_files_${uuid0}.iceberg_add_files_db_${uuid0}.iceberg_unpar(
c_smallint smallint,
c_int int,
c_date date
);
-- result:
-- !result
alter table iceberg_add_files_${uuid0}.iceberg_add_files_db_${uuid0}.iceberg_unpar execute add_files (location = "oss://${oss_bucket}/iceberg_add_files_db_${uuid0}/hive_unpar", file_format="parquet");
-- result:
-- !result
select * from iceberg_add_files_${uuid0}.iceberg_add_files_db_${uuid0}.iceberg_unpar;
-- result:
1 2 2020-01-01
2 3 2020-01-02
-- !result
set connector_sink_compression_codec = uncompressed;
-- result:
-- !result
drop table iceberg_add_files_${uuid0}.iceberg_add_files_db_${uuid0}.iceberg_unpar force;
-- result:
-- !result
drop table hive_add_files_${uuid0}.iceberg_add_files_db_${uuid0}.hive_unpar force;
-- result:
-- !result
create table hive_add_files_${uuid0}.iceberg_add_files_db_${uuid0}.hive_par_date(
c_smallint smallint,
c_int int,
c_date date
) partition by(c_date) properties ("location" = "oss://${oss_bucket}/iceberg_add_files_db_${uuid0}/hive_par_date");
-- result:
-- !result
insert into hive_add_files_${uuid0}.iceberg_add_files_db_${uuid0}.hive_par_date values(1,2,'2020-01-01'),(2,3,'2020-01-02');
-- result:
-- !result
create table iceberg_add_files_${uuid0}.iceberg_add_files_db_${uuid0}.iceberg_unpar(
c_smallint smallint,
c_int int,
c_date date
);
-- result:
-- !result
alter table iceberg_add_files_${uuid0}.iceberg_add_files_db_${uuid0}.iceberg_unpar execute add_files (location = "oss://${oss_bucket}/iceberg_add_files_db_${uuid0}/hive_par_date/c_date=2020-01-01/", file_format="parquet");
-- result:
-- !result
select * from iceberg_add_files_${uuid0}.iceberg_add_files_db_${uuid0}.iceberg_unpar;
-- result:
1 2 None
-- !result
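-- c_date is NULL because Hive data files do not store the partition column value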
drop table iceberg_add_files_${uuid0}.iceberg_add_files_db_${uuid0}.iceberg_unpar force;
-- result:
-- !result
create table iceberg_add_files_${uuid0}.iceberg_add_files_db_${uuid0}.iceberg_unpar(
c_smallint smallint,
c_int int
);
-- result:
-- !result
alter table iceberg_add_files_${uuid0}.iceberg_add_files_db_${uuid0}.iceberg_unpar execute add_files (location = "oss://${oss_bucket}/iceberg_add_files_db_${uuid0}/hive_par_date/c_date=2020-01-02/", file_format="parquet");
-- result:
-- !result
select * from iceberg_add_files_${uuid0}.iceberg_add_files_db_${uuid0}.iceberg_unpar;
-- result:
2 3
-- !result
drop table iceberg_add_files_${uuid0}.iceberg_add_files_db_${uuid0}.iceberg_unpar force;
-- result:
-- !result
drop table hive_add_files_${uuid0}.iceberg_add_files_db_${uuid0}.hive_par_date force;
-- result:
-- !result
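-- test partitioned iceberg table with orc file_format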
create table hive_add_files_${uuid0}.iceberg_add_files_db_${uuid0}.hive_par_date(
c_smallint smallint,
c_int int,
c_date date
) partition by(c_date) properties ("location" = "oss://${oss_bucket}/iceberg_add_files_db_${uuid0}/hive_par_date", "file_format" = "orc");
-- result:
-- !result
insert into hive_add_files_${uuid0}.iceberg_add_files_db_${uuid0}.hive_par_date values(1,2, '2020-01-01'), (2,3, '2020-01-02');
-- result:
-- !result
create table iceberg_add_files_${uuid0}.iceberg_add_files_db_${uuid0}.iceberg_par_date(
c_smallint smallint,
c_int int,
c_date date
) partition by(c_date);
-- result:
-- !result
alter table iceberg_add_files_${uuid0}.iceberg_add_files_db_${uuid0}.iceberg_par_date execute add_files (location = "oss://${oss_bucket}/iceberg_add_files_db_${uuid0}/hive_par_date/", file_format="orc");
-- result:
-- !result
select * from iceberg_add_files_${uuid0}.iceberg_add_files_db_${uuid0}.iceberg_par_date order by c_date;
-- result:
1 2 2020-01-01
2 3 2020-01-02
-- !result
drop table iceberg_add_files_${uuid0}.iceberg_add_files_db_${uuid0}.iceberg_par_date force;
-- result:
-- !result
drop table hive_add_files_${uuid0}.iceberg_add_files_db_${uuid0}.hive_par_date force;
-- result:
-- !result
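-- test partitioned iceberg table with orc file_format and lz4 compression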
set connector_sink_compression_codec = lz4;
-- result:
-- !result
create table hive_add_files_${uuid0}.iceberg_add_files_db_${uuid0}.hive_par_date_string(
c_smallint smallint,
c_int int,
c_date date,
c_string string
) partition by(c_date, c_string) properties ("location" = "oss://${oss_bucket}/iceberg_add_files_db_${uuid0}/hive_par_date_string", "file_format"="orc");
-- result:
-- !result
insert into hive_add_files_${uuid0}.iceberg_add_files_db_${uuid0}.hive_par_date_string values(1,2, '2020-01-01', '2020-01-01 00:00:00'), (2,3, '2020-01-02', '2020-01-02 00:00:00');
-- result:
-- !result
create table iceberg_add_files_${uuid0}.iceberg_add_files_db_${uuid0}.iceberg_par_date_string(
c_smallint smallint,
c_int int,
c_date date,
c_string string
) partition by(c_date, c_string);
-- result:
-- !result
alter table iceberg_add_files_${uuid0}.iceberg_add_files_db_${uuid0}.iceberg_par_date_string execute add_files (location = "oss://${oss_bucket}/iceberg_add_files_db_${uuid0}/hive_par_date_string/", file_format="orc");
-- result:
-- !result
select * from iceberg_add_files_${uuid0}.iceberg_add_files_db_${uuid0}.iceberg_par_date_string order by c_date;
-- result:
1 2 2020-01-01 2020-01-01 00:00:00
2 3 2020-01-02 2020-01-02 00:00:00
-- !result
set connector_sink_compression_codec = uncompressed;
-- result:
-- !result
drop table iceberg_add_files_${uuid0}.iceberg_add_files_db_${uuid0}.iceberg_par_date_string force;
-- result:
-- !result
drop table hive_add_files_${uuid0}.iceberg_add_files_db_${uuid0}.hive_par_date_string force;
-- result:
-- !result
drop database iceberg_add_files_${uuid0}.iceberg_add_files_db_${uuid0};
-- result:
-- !result
drop catalog iceberg_add_files_${uuid0};
-- result:
-- !result
drop catalog hive_add_files_${uuid0};
-- result:
-- !result

View File

@ -0,0 +1,242 @@
-- name: test_iceberg_add_files_from_location
create external catalog iceberg_add_files_${uuid0} PROPERTIES ("type"="iceberg",
"iceberg.catalog.type"="hive",
"iceberg.catalog.hive.metastore.uris"="${iceberg_catalog_hive_metastore_uris}",
"aws.s3.access_key" = "${oss_ak}",
"aws.s3.secret_key" = "${oss_sk}",
"aws.s3.endpoint" = "${oss_endpoint}"
);
create external catalog hive_add_files_${uuid0} PROPERTIES ("type"="hive",
"hive.metastore.uris"="${hive_metastore_uris}",
"aws.s3.access_key"="${oss_ak}",
"aws.s3.secret_key"="${oss_sk}",
"aws.s3.endpoint"="${oss_endpoint}"
);
create database iceberg_add_files_${uuid0}.iceberg_add_files_db_${uuid0};
-- test partitioned iceberg table with a single partition column (tinyint)
create table hive_add_files_${uuid0}.iceberg_add_files_db_${uuid0}.hive_par_tinyint(
c_smallint smallint,
c_int int,
c_tinyint tinyint
) partition by(c_tinyint) properties ("location" = "oss://${oss_bucket}/iceberg_add_files_db_${uuid0}/hive_par_tinyint");
insert into hive_add_files_${uuid0}.iceberg_add_files_db_${uuid0}.hive_par_tinyint values(1,1,1),(2,2,2);
create table iceberg_add_files_${uuid0}.iceberg_add_files_db_${uuid0}.iceberg_par_tinyint(
c_smallint smallint,
c_int int,
c_tinyint tinyint
) partition by(c_tinyint);
alter table iceberg_add_files_${uuid0}.iceberg_add_files_db_${uuid0}.iceberg_par_tinyint execute add_files(location="oss://${oss_bucket}/iceberg_add_files_db_${uuid0}/hive_par_tinyint", file_format="parquet");
select * from iceberg_add_files_${uuid0}.iceberg_add_files_db_${uuid0}.iceberg_par_tinyint order by c_tinyint;
drop table iceberg_add_files_${uuid0}.iceberg_add_files_db_${uuid0}.iceberg_par_tinyint force;
drop table hive_add_files_${uuid0}.iceberg_add_files_db_${uuid0}.hive_par_tinyint force;
-- test partitioned iceberg table with a single partition column (date)
create table hive_add_files_${uuid0}.iceberg_add_files_db_${uuid0}.hive_par_date(
c_smallint smallint,
c_int int,
c_date date
) partition by(c_date) properties ("location" = "oss://${oss_bucket}/iceberg_add_files_db_${uuid0}/hive_par_date");
insert into hive_add_files_${uuid0}.iceberg_add_files_db_${uuid0}.hive_par_date values(1, 1, '2020-01-01'),(2, 2, '2020-01-02');
create table iceberg_add_files_${uuid0}.iceberg_add_files_db_${uuid0}.iceberg_par_date(
c_smallint smallint,
c_int int,
c_date date
) partition by(c_date);
alter table iceberg_add_files_${uuid0}.iceberg_add_files_db_${uuid0}.iceberg_par_date execute add_files(location="oss://${oss_bucket}/iceberg_add_files_db_${uuid0}/hive_par_date", file_format="parquet");
select * from iceberg_add_files_${uuid0}.iceberg_add_files_db_${uuid0}.iceberg_par_date order by c_date;
drop table iceberg_add_files_${uuid0}.iceberg_add_files_db_${uuid0}.iceberg_par_date force;
drop table hive_add_files_${uuid0}.iceberg_add_files_db_${uuid0}.hive_par_date force;
-- test partitioned iceberg table with multiple partition columns (date, string)
create table hive_add_files_${uuid0}.iceberg_add_files_db_${uuid0}.hive_par_date_string(
c_smallint smallint,
c_int int,
c_date date,
c_string string
) partition by(c_date, c_string) properties ("location" = "oss://${oss_bucket}/iceberg_add_files_db_${uuid0}/hive_par_date_string");
insert into hive_add_files_${uuid0}.iceberg_add_files_db_${uuid0}.hive_par_date_string
values(1, 1, '2020-01-01', '2020-01-01 00:00:00'), (1, 1, '2020-01-02', '2020-01-02 00:00:00');
create table iceberg_add_files_${uuid0}.iceberg_add_files_db_${uuid0}.iceberg_par_date_string(
c_smallint smallint,
c_int int,
c_date date,
c_string string
) partition by(c_date, c_string);
alter table iceberg_add_files_${uuid0}.iceberg_add_files_db_${uuid0}.iceberg_par_date_string execute add_files(location="oss://${oss_bucket}/iceberg_add_files_db_${uuid0}/hive_par_date_string", file_format="parquet");
select * from iceberg_add_files_${uuid0}.iceberg_add_files_db_${uuid0}.iceberg_par_date_string order by c_date;
drop table iceberg_add_files_${uuid0}.iceberg_add_files_db_${uuid0}.iceberg_par_date_string force;
drop table hive_add_files_${uuid0}.iceberg_add_files_db_${uuid0}.hive_par_date_string force;
-- test partitioned iceberg table which has date and string partition columns, while the source location only has one partition column
create table hive_add_files_${uuid0}.iceberg_add_files_db_${uuid0}.hive_par_date(
c_smallint smallint,
c_int int,
c_date date
) partition by(c_date) properties ("location" = "oss://${oss_bucket}/iceberg_add_files_db_${uuid0}/hive_par_date");
insert into hive_add_files_${uuid0}.iceberg_add_files_db_${uuid0}.hive_par_date values (1, 1, '2020-01-01'),(2, 2, '2020-01-02');
create table iceberg_add_files_${uuid0}.iceberg_add_files_db_${uuid0}.iceberg_par_date_string(
c_smallint smallint,
c_int int,
c_date date,
c_string string
) partition by(c_date, c_string);
alter table iceberg_add_files_${uuid0}.iceberg_add_files_db_${uuid0}.iceberg_par_date_string execute add_files(location='oss://${oss_bucket}/iceberg_add_files_db_${uuid0}/hive_par_date', file_format="parquet");
drop table iceberg_add_files_${uuid0}.iceberg_add_files_db_${uuid0}.iceberg_par_date_string force;
drop table hive_add_files_${uuid0}.iceberg_add_files_db_${uuid0}.hive_par_date force;
-- test partitioned iceberg table which has a date partition column, while the source location has date and string partition columns
create table hive_add_files_${uuid0}.iceberg_add_files_db_${uuid0}.hive_par_date_string(
c_smallint smallint,
c_int int,
c_date date,
c_string string
) partition by(c_date, c_string) properties ("location" = "oss://${oss_bucket}/iceberg_add_files_db_${uuid0}/hive_par_date_string");
insert into hive_add_files_${uuid0}.iceberg_add_files_db_${uuid0}.hive_par_date_string
values(1, 1, '2020-01-01', '2020-01-01 00:00:00'), (1, 1, '2020-01-02', '2020-01-02 00:00:00');
create table iceberg_add_files_${uuid0}.iceberg_add_files_db_${uuid0}.iceberg_par_date(
c_smallint smallint,
c_int int,
c_string string,
c_date date
) partition by(c_date);
alter table iceberg_add_files_${uuid0}.iceberg_add_files_db_${uuid0}.iceberg_par_date execute add_files(location="oss://${oss_bucket}/iceberg_add_files_db_${uuid0}/hive_par_date_string", file_format="parquet");
drop table iceberg_add_files_${uuid0}.iceberg_add_files_db_${uuid0}.iceberg_par_date force;
drop table hive_add_files_${uuid0}.iceberg_add_files_db_${uuid0}.hive_par_date_string force;
-- test partitioned iceberg table with a partition transform (month)
create table hive_add_files_${uuid0}.iceberg_add_files_db_${uuid0}.hive_par_date(
c_smallint smallint,
c_int int,
c_date date
) partition by(c_date) properties ("location" = "oss://${oss_bucket}/iceberg_add_files_db_${uuid0}/hive_par_date");
insert into hive_add_files_${uuid0}.iceberg_add_files_db_${uuid0}.hive_par_date values (1, 1, '2020-01-01'),(2, 2, '2020-01-02');
create table iceberg_add_files_${uuid0}.iceberg_add_files_db_${uuid0}.iceberg_par_date_month(
c_smallint smallint,
c_int int,
c_date date
) partition by month(c_date);
alter table iceberg_add_files_${uuid0}.iceberg_add_files_db_${uuid0}.iceberg_par_date_month execute add_files(location="oss://${oss_bucket}/iceberg_add_files_db_${uuid0}/hive_par_date", file_format="parquet");
drop table iceberg_add_files_${uuid0}.iceberg_add_files_db_${uuid0}.iceberg_par_date_month force;
drop table hive_add_files_${uuid0}.iceberg_add_files_db_${uuid0}.hive_par_date force;
-- test unpartitioned iceberg table with unpartitioned lz4-compressed data
set connector_sink_compression_codec = lz4;
create table hive_add_files_${uuid0}.iceberg_add_files_db_${uuid0}.hive_unpar(
c_smallint smallint,
c_int int,
c_date date
) properties ("location" = "oss://${oss_bucket}/iceberg_add_files_db_${uuid0}/hive_unpar");
insert into hive_add_files_${uuid0}.iceberg_add_files_db_${uuid0}.hive_unpar values(1,2,'2020-01-01'),(2,3,'2020-01-02');
create table iceberg_add_files_${uuid0}.iceberg_add_files_db_${uuid0}.iceberg_unpar(
c_smallint smallint,
c_int int,
c_date date
);
alter table iceberg_add_files_${uuid0}.iceberg_add_files_db_${uuid0}.iceberg_unpar execute add_files (location = "oss://${oss_bucket}/iceberg_add_files_db_${uuid0}/hive_unpar", file_format="parquet");
select * from iceberg_add_files_${uuid0}.iceberg_add_files_db_${uuid0}.iceberg_unpar;
set connector_sink_compression_codec = uncompressed;
drop table iceberg_add_files_${uuid0}.iceberg_add_files_db_${uuid0}.iceberg_unpar force;
drop table hive_add_files_${uuid0}.iceberg_add_files_db_${uuid0}.hive_unpar force;
-- test unpartitioned iceberg table with partitioned data
create table hive_add_files_${uuid0}.iceberg_add_files_db_${uuid0}.hive_par_date(
c_smallint smallint,
c_int int,
c_date date
) partition by(c_date) properties ("location" = "oss://${oss_bucket}/iceberg_add_files_db_${uuid0}/hive_par_date");
insert into hive_add_files_${uuid0}.iceberg_add_files_db_${uuid0}.hive_par_date values(1,2,'2020-01-01'),(2,3,'2020-01-02');
create table iceberg_add_files_${uuid0}.iceberg_add_files_db_${uuid0}.iceberg_unpar(
c_smallint smallint,
c_int int,
c_date date
);
alter table iceberg_add_files_${uuid0}.iceberg_add_files_db_${uuid0}.iceberg_unpar execute add_files (location = "oss://${oss_bucket}/iceberg_add_files_db_${uuid0}/hive_par_date/c_date=2020-01-01/", file_format="parquet");
select * from iceberg_add_files_${uuid0}.iceberg_add_files_db_${uuid0}.iceberg_unpar;
drop table iceberg_add_files_${uuid0}.iceberg_add_files_db_${uuid0}.iceberg_unpar force;
create table iceberg_add_files_${uuid0}.iceberg_add_files_db_${uuid0}.iceberg_unpar(
c_smallint smallint,
c_int int
);
alter table iceberg_add_files_${uuid0}.iceberg_add_files_db_${uuid0}.iceberg_unpar execute add_files (location = "oss://${oss_bucket}/iceberg_add_files_db_${uuid0}/hive_par_date/c_date=2020-01-02/", file_format="parquet");
select * from iceberg_add_files_${uuid0}.iceberg_add_files_db_${uuid0}.iceberg_unpar;
drop table iceberg_add_files_${uuid0}.iceberg_add_files_db_${uuid0}.iceberg_unpar force;
drop table hive_add_files_${uuid0}.iceberg_add_files_db_${uuid0}.hive_par_date force;
-- test partitioned iceberg table with orc file_format
create table hive_add_files_${uuid0}.iceberg_add_files_db_${uuid0}.hive_par_date(
c_smallint smallint,
c_int int,
c_date date
) partition by(c_date) properties ("location" = "oss://${oss_bucket}/iceberg_add_files_db_${uuid0}/hive_par_date", "file_format" = "orc");
insert into hive_add_files_${uuid0}.iceberg_add_files_db_${uuid0}.hive_par_date values(1,2, '2020-01-01'), (2,3, '2020-01-02');
create table iceberg_add_files_${uuid0}.iceberg_add_files_db_${uuid0}.iceberg_par_date(
c_smallint smallint,
c_int int,
c_date date
) partition by(c_date);
alter table iceberg_add_files_${uuid0}.iceberg_add_files_db_${uuid0}.iceberg_par_date execute add_files (location = "oss://${oss_bucket}/iceberg_add_files_db_${uuid0}/hive_par_date/", file_format="orc");
select * from iceberg_add_files_${uuid0}.iceberg_add_files_db_${uuid0}.iceberg_par_date order by c_date;
drop table iceberg_add_files_${uuid0}.iceberg_add_files_db_${uuid0}.iceberg_par_date force;
drop table hive_add_files_${uuid0}.iceberg_add_files_db_${uuid0}.hive_par_date force;
-- test partitioned iceberg table with orc file_format and lz4 compression
set connector_sink_compression_codec = lz4;
create table hive_add_files_${uuid0}.iceberg_add_files_db_${uuid0}.hive_par_date_string(
c_smallint smallint,
c_int int,
c_date date,
c_string string
) partition by(c_date, c_string) properties ("location" = "oss://${oss_bucket}/iceberg_add_files_db_${uuid0}/hive_par_date_string", "file_format"="orc");
insert into hive_add_files_${uuid0}.iceberg_add_files_db_${uuid0}.hive_par_date_string values(1,2, '2020-01-01', '2020-01-01 00:00:00'), (2,3, '2020-01-02', '2020-01-02 00:00:00');
create table iceberg_add_files_${uuid0}.iceberg_add_files_db_${uuid0}.iceberg_par_date_string(
c_smallint smallint,
c_int int,
c_date date,
c_string string
) partition by(c_date, c_string);
alter table iceberg_add_files_${uuid0}.iceberg_add_files_db_${uuid0}.iceberg_par_date_string execute add_files (location = "oss://${oss_bucket}/iceberg_add_files_db_${uuid0}/hive_par_date_string/", file_format="orc");
select * from iceberg_add_files_${uuid0}.iceberg_add_files_db_${uuid0}.iceberg_par_date_string order by c_date;
set connector_sink_compression_codec = uncompressed;
drop table iceberg_add_files_${uuid0}.iceberg_add_files_db_${uuid0}.iceberg_par_date_string force;
drop table hive_add_files_${uuid0}.iceberg_add_files_db_${uuid0}.hive_par_date_string force;
drop database iceberg_add_files_${uuid0}.iceberg_add_files_db_${uuid0};
drop catalog iceberg_add_files_${uuid0};
drop catalog hive_add_files_${uuid0};