[Enhancement] Support add files from hive table for iceberg procedure(part 2) (#63656)
parent 800e861277
commit 943c6f8a57
@@ -20,6 +20,7 @@ import com.starrocks.sql.optimizer.operator.scalar.ScalarOperator;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

public class GetRemoteFilesParams {
    private List<PartitionKey> partitionKeys;
@@ -32,6 +33,7 @@ public class GetRemoteFilesParams {
    private boolean useCache = true;
    private boolean checkPartitionExistence = true;
    private boolean enableColumnStats = false;
    private Optional<Boolean> isRecursive = Optional.empty();

    protected GetRemoteFilesParams(Builder builder) {
        this.partitionKeys = builder.partitionKeys;
@@ -44,6 +46,7 @@ public class GetRemoteFilesParams {
        this.useCache = builder.useCache;
        this.checkPartitionExistence = builder.checkPartitionExistence;
        this.enableColumnStats = builder.enableColumnStats;
        this.isRecursive = builder.isRecursive;
    }

    public int getPartitionSize() {
@@ -57,7 +60,7 @@ public class GetRemoteFilesParams {
    }

    public GetRemoteFilesParams copy() {
        return GetRemoteFilesParams.newBuilder()
        Builder paramsBuilder = GetRemoteFilesParams.newBuilder()
                .setPartitionKeys(partitionKeys)
                .setPartitionNames(partitionNames)
                .setPartitionAttachments(partitionAttachments)
@@ -67,8 +70,11 @@ public class GetRemoteFilesParams {
                .setLimit(limit)
                .setUseCache(useCache)
                .setCheckPartitionExistence(checkPartitionExistence)
                .setEnableColumnStats(enableColumnStats)
                .build();
                .setEnableColumnStats(enableColumnStats);
        if (isRecursive.isPresent()) {
            paramsBuilder.setIsRecursive(isRecursive.get());
        }
        return paramsBuilder.build();
    }

    public GetRemoteFilesParams sub(int start, int end) {
@@ -135,6 +141,10 @@ public class GetRemoteFilesParams {
        return enableColumnStats;
    }

    public Optional<Boolean> getIsRecursive() {
        return isRecursive;
    }

    public static class Builder {
        private List<PartitionKey> partitionKeys;
        private List<String> partitionNames;
@@ -146,6 +156,7 @@ public class GetRemoteFilesParams {
        private boolean useCache = true;
        private boolean checkPartitionExistence = true;
        private boolean enableColumnStats = false;
        private Optional<Boolean> isRecursive = Optional.empty();

        public Builder setPartitionKeys(List<PartitionKey> partitionKeys) {
            this.partitionKeys = partitionKeys;
@@ -197,6 +208,11 @@ public class GetRemoteFilesParams {
            return this;
        }

        public Builder setIsRecursive(boolean isRecursive) {
            this.isRecursive = Optional.of(isRecursive);
            return this;
        }

        public GetRemoteFilesParams build() {
            return new GetRemoteFilesParams(this);
        }
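Note: the new isRecursive knob is deliberately an Optional so that "not set" stays distinguishable from "set to false"; copy() only forwards it when a value is present, so callers that never touch it keep the old behavior. A minimal caller-side sketch using only the builder methods introduced above (illustrative, not part of the patch):

    GetRemoteFilesParams params = GetRemoteFilesParams.newBuilder()
            .setUseCache(false)       // skip the file-listing cache
            .setIsRecursive(true)     // force a recursive listing for this request
            .build();
    // copy() preserves an explicitly set value; an unset value stays Optional.empty()
    GetRemoteFilesParams copied = params.copy();
    assert copied.getIsRecursive().isPresent();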
@@ -94,6 +94,11 @@ public class RemoteFileOperations {
    }

    public List<RemoteFileInfo> getRemoteFiles(Table table, List<Partition> partitions, GetRemoteFilesParams params) {
        boolean isRecursive = this.isRecursive;
        if (params.getIsRecursive().isPresent()) {
            // override the default recursive option
            isRecursive = params.getIsRecursive().get();
        }
        RemoteFileScanContext scanContext = new RemoteFileScanContext(table);
        Map<RemotePathKey, Partition> pathKeyToPartition = Maps.newHashMap();
        for (Partition partition : partitions) {
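The effect of the check above is a simple precedence rule: a per-request value carried on GetRemoteFilesParams overrides the operation-level default held by RemoteFileOperations. Restated as a tiny standalone helper (illustrative only; resolveRecursive is not a method in this patch):

    // Resolve the effective recursive flag for one listing request.
    static boolean resolveRecursive(boolean operationDefault, GetRemoteFilesParams params) {
        return params.getIsRecursive().orElse(operationDefault);
    }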
@@ -14,11 +14,42 @@
package com.starrocks.connector.iceberg.procedure;

import com.google.common.base.Joiner;
import com.google.common.base.Strings;
import com.google.common.collect.Lists;
import com.starrocks.catalog.Column;
import com.starrocks.catalog.PartitionKey;
import com.starrocks.catalog.Type;
import com.starrocks.common.Pair;
import com.starrocks.connector.GetRemoteFilesParams;
import com.starrocks.connector.PartitionUtil;
import com.starrocks.connector.RemoteFileDesc;
import com.starrocks.connector.RemoteFileInfo;
import com.starrocks.connector.exception.StarRocksConnectorException;
import com.starrocks.connector.iceberg.IcebergPartitionData;
import com.starrocks.connector.iceberg.IcebergTableOperation;
import com.starrocks.server.GlobalStateMgr;
import com.starrocks.sql.analyzer.AnalyzeState;
import com.starrocks.sql.analyzer.AstToSQLBuilder;
import com.starrocks.sql.analyzer.ExpressionAnalyzer;
import com.starrocks.sql.analyzer.Field;
import com.starrocks.sql.analyzer.RelationFields;
import com.starrocks.sql.analyzer.RelationId;
import com.starrocks.sql.analyzer.Scope;
import com.starrocks.sql.ast.ParseNode;
import com.starrocks.sql.ast.expression.Expr;
import com.starrocks.sql.ast.expression.TableName;
import com.starrocks.sql.optimizer.OptimizerFactory;
import com.starrocks.sql.optimizer.base.ColumnRefFactory;
import com.starrocks.sql.optimizer.operator.Operator;
import com.starrocks.sql.optimizer.operator.ScanOperatorPredicates;
import com.starrocks.sql.optimizer.operator.logical.LogicalHiveScanOperator;
import com.starrocks.sql.optimizer.operator.scalar.ColumnRefOperator;
import com.starrocks.sql.optimizer.operator.scalar.ConstantOperator;
import com.starrocks.sql.optimizer.operator.scalar.ScalarOperator;
import com.starrocks.sql.optimizer.rewrite.OptExternalPartitionPruner;
import com.starrocks.sql.optimizer.transformer.ExpressionMapping;
import com.starrocks.sql.optimizer.transformer.SqlToScalarOperatorTranslator;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -51,8 +82,10 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;


public class AddFilesProcedure extends IcebergTableProcedure {
@@ -114,6 +147,7 @@ public class AddFilesProcedure extends IcebergTableProcedure {
                        "Unsupported file format: %s. Supported formats are: parquet, orc", fileFormat);
            }
        }

        boolean recursive = true;
        ConstantOperator recursiveArg = args.get(RECURSIVE);
        if (recursiveArg != null) {
@@ -135,9 +169,9 @@ public class AddFilesProcedure extends IcebergTableProcedure {
                String tableLocation = tableLocationArg.getVarchar();
                addFilesFromLocation(context, table, transaction, tableLocation, recursive, fileFormat);
            } else {
                // Add files from source table (not implemented yet)
                throw new StarRocksConnectorException(
                        "Adding files from source_table is not yet implemented");
                // Add files from source table
                String sourceTable = sourceTableArg.getVarchar();
                addFilesFromSourceTable(context, table, transaction, sourceTable, recursive);
            }
        } catch (Exception e) {
            LOGGER.error("Failed to execute add_files procedure", e);
@@ -186,7 +220,7 @@ public class AddFilesProcedure extends IcebergTableProcedure {
        if (fileStatus.isFile()) {
            // Single file
            if (isDataFile(fileStatus)) {
                DataFile dataFile = createDataFile(context, table, fileStatus, fileFormat);
                DataFile dataFile = createDataFileFromLocation(context, table, fileStatus, fileFormat);
                if (dataFile != null) {
                    dataFiles.add(dataFile);
                }
@@ -202,7 +236,7 @@ public class AddFilesProcedure extends IcebergTableProcedure {
            for (FileStatus file : files) {
                if (file.isFile() && isDataFile(file)) {
                    try {
                        DataFile dataFile = createDataFile(context, table, file, fileFormat);
                        DataFile dataFile = createDataFileFromLocation(context, table, file, fileFormat);
                        if (dataFile != null) {
                            dataFiles.add(dataFile);
                        }
@@ -238,14 +272,13 @@ public class AddFilesProcedure extends IcebergTableProcedure {
        return fileStatus.getLen() != 0;
    }

    private DataFile createDataFile(IcebergTableProcedureContext context, Table table, FileStatus fileStatus,
                                    String fileFormat) {
    private DataFile createDataFileFromLocation(IcebergTableProcedureContext context, Table table, FileStatus fileStatus,
                                                String fileFormat) {
        String filePath = fileStatus.getPath().toString();
        long fileSize = fileStatus.getLen();

        // Get the table's partition spec
        PartitionSpec spec = table.spec();
        Optional<StructLike> partition = Optional.empty();
        String partitionPath = "";
        if (spec.isPartitioned()) {
            List<String> validPartitionPath = new ArrayList<>();
            String[] partitions = filePath.split("/", -1);
@@ -254,12 +287,23 @@ public class AddFilesProcedure extends IcebergTableProcedure {
                    validPartitionPath.add(part);
                }
            }
            String partitionPath = String.join("/", validPartitionPath);
            if (!partitionPath.isEmpty()) {
                partition = Optional.of(IcebergPartitionData.partitionDataFromPath(partitionPath, spec));
            }
            partitionPath = String.join("/", validPartitionPath);
        }

        return buildDataFile(context, table, partitionPath, fileStatus, fileFormat);
    }

    private DataFile buildDataFile(IcebergTableProcedureContext context, Table table, String partitionPath,
                                   FileStatus fileStatus, String fileFormat) {
        Optional<StructLike> partition = Optional.empty();
        String filePath = fileStatus.getPath().toString();
        long fileSize = fileStatus.getLen();

        // Get the table's partition spec
        PartitionSpec spec = table.spec();
        if (!Strings.isNullOrEmpty(partitionPath)) {
            partition = Optional.of(IcebergPartitionData.partitionDataFromPath(partitionPath, spec));
        }
        // Extract file metrics based on format
        Metrics metrics = extractFileMetrics(context, table, fileStatus, fileFormat);
@@ -270,6 +314,27 @@ public class AddFilesProcedure extends IcebergTableProcedure {
                .withMetrics(metrics);
        partition.ifPresent(builder::withPartition);
        return builder.build();

    }

    private List<DataFile> createDataFilesFromPartition(IcebergTableProcedureContext context, Table table, String partitionName,
                                                        RemoteFileInfo remoteFileInfo) {
        String partitionFullPath = remoteFileInfo.getFullPath();
        List<RemoteFileDesc> remoteFiles = remoteFileInfo.getFiles();
        if (remoteFiles == null || remoteFiles.isEmpty()) {
            LOGGER.warn("No files found in RemoteFileInfo for partition: {}", partitionName);
            return null;
        }

        return remoteFiles.stream().map(remoteFile -> {
            String filePath = remoteFile.getFullPath() != null ? remoteFile.getFullPath() :
                    (partitionFullPath.endsWith("/") ? partitionFullPath + remoteFile.getFileName() :
                            partitionFullPath + "/" + remoteFile.getFileName());
            FileStatus fileStatus = new FileStatus(remoteFile.getLength(), false,
                    1, 0, remoteFile.getModificationTime(), new Path(filePath));
            return buildDataFile(context, table, table.spec().isPartitioned() ? partitionName : "", fileStatus,
                    remoteFileInfo.getFormat().name());
        }).filter(Objects::nonNull).collect(Collectors.toList());
    }

    private Metrics extractFileMetrics(IcebergTableProcedureContext context, Table table, FileStatus fileStatus,
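For a partitioned spec, createDataFileFromLocation keeps only the Hive-style name=value segments of the data file path (the predicate that accepts a segment sits outside this hunk), so a path such as .../hive_par_date/c_date=2020-01-01/000000_0 yields the partition path c_date=2020-01-01, which IcebergPartitionData.partitionDataFromPath then maps onto the table's PartitionSpec inside buildDataFile. A self-contained sketch of that extraction step, assuming a plain name=value filter (extractPartitionPath is illustrative and not a method in this patch):

    import java.util.ArrayList;
    import java.util.List;

    class PartitionPathSketch {
        // Keep only Hive-style "name=value" segments of a data file path,
        // e.g. ".../c_date=2020-01-01/000000_0" -> "c_date=2020-01-01".
        static String extractPartitionPath(String filePath) {
            List<String> segments = new ArrayList<>();
            for (String part : filePath.split("/", -1)) {
                if (part.contains("=")) {
                    segments.add(part);
                }
            }
            return String.join("/", segments);
        }
    }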
@@ -422,4 +487,217 @@ public class AddFilesProcedure extends IcebergTableProcedure {
        }
        return null;
    }

    private void addFilesFromSourceTable(IcebergTableProcedureContext context, Table table, Transaction transaction,
                                         String sourceTable, boolean recursive) throws Exception {
        ParseNode where = context.clause().getWhere();
        LOGGER.info("Adding files from source Hive table: {} with partition filter: {}", sourceTable,
                where != null ? AstToSQLBuilder.toSQL(where) : "none");

        // Parse source table name to get database and table
        String[] parts = sourceTable.split("\\.");
        String catalogName;
        String dbName;
        String tableName;

        if (parts.length == 3) {
            // Catalog.database.table format
            catalogName = parts[0];
            dbName = parts[1];
            tableName = parts[2];
        } else {
            throw new StarRocksConnectorException(
                    "Invalid source table format: %s. Expected format: catalog.database.table", sourceTable);
        }

        com.starrocks.catalog.Table sourceHiveTable;
        try {
            sourceHiveTable = GlobalStateMgr.getCurrentState().getMetadataMgr().getTable(context.context(), catalogName,
                    dbName, tableName);
        } catch (Exception e) {
            throw new StarRocksConnectorException(
                    "Failed to access source table %s: %s", sourceTable, e.getMessage(), e);
        }

        if (sourceHiveTable == null) {
            throw new StarRocksConnectorException(
                    "Source table %s not found", sourceTable);
        }

        // Ensure the source table is a Hive table
        if (!sourceHiveTable.isHiveTable()) {
            throw new StarRocksConnectorException(
                    "Source table %s is not a Hive table. Only Hive tables are supported as source tables.", sourceTable);
        }

        // Use Hive table partition pruning to get filtered partition names
        List<PartitionKey> filteredPartitionKeys = getFilteredPartitionKeys(context, sourceHiveTable);
        GetRemoteFilesParams.Builder paramsBuilder;
        List<GetRemoteFilesParams> paramsList = new ArrayList<>();
        if (filteredPartitionKeys == null) {
            // un-partitioned table
            paramsBuilder = GetRemoteFilesParams.newBuilder()
                    .setUseCache(false)
                    .setIsRecursive(recursive);
            paramsList.add(paramsBuilder.build());
        } else if (filteredPartitionKeys.isEmpty()) {
            // all partitions are pruned
            LOGGER.warn("No partitions match the specified filter in source Hive table: {}", sourceTable);
            return;
        } else {
            paramsList.addAll(filteredPartitionKeys.stream().map(p -> GetRemoteFilesParams.newBuilder()
                    .setUseCache(false)
                    .setPartitionKeys(List.of(p))
                    .setIsRecursive(recursive)
                    .build()).toList());
        }

        List<Pair<String, List<RemoteFileInfo>>> partitionRemoteFiles = new ArrayList<>();
        List<String> partitionColumnNames = sourceHiveTable.getPartitionColumnNames();
        try {
            for (GetRemoteFilesParams params : paramsList) {
                List<RemoteFileInfo> remoteFiles = GlobalStateMgr.getCurrentState().getMetadataMgr().
                        getRemoteFiles(sourceHiveTable, params);
                partitionRemoteFiles.add(Pair.create(params.getPartitionKeys() == null ? "" :
                        params.getPartitionKeys().stream().map(partitionKey ->
                                PartitionUtil.toHivePartitionName(partitionColumnNames, partitionKey)).
                                collect(Collectors.joining()),
                        remoteFiles));
            }
        } catch (Exception e) {
            throw new StarRocksConnectorException(
                    "Failed to get files from source Hive table %s: %s", sourceTable, e.getMessage(), e);
        }

        if (partitionRemoteFiles.isEmpty()) {
            LOGGER.warn("No files found in source Hive table: {}", sourceTable);
            return;
        }

        // Convert remote files to Iceberg DataFiles
        List<DataFile> dataFiles = new ArrayList<>();
        for (Pair<String, List<RemoteFileInfo>> partitionRemoteFileInfo : partitionRemoteFiles) {
            String partitionName = partitionRemoteFileInfo.first;
            List<RemoteFileInfo> remoteFileInfos = partitionRemoteFileInfo.second;

            for (RemoteFileInfo remoteFileInfo : remoteFileInfos) {
                List<DataFile> partitionDataFiles = createDataFilesFromPartition(context, table, partitionName, remoteFileInfo);
                if (partitionDataFiles != null) {
                    dataFiles.addAll(partitionDataFiles);
                }
            }
        }

        if (dataFiles.isEmpty()) {
            LOGGER.warn("No valid data files found after filtering in source Hive table: {}", sourceTable);
            return;
        }

        // Add the files to the target Iceberg table
        AppendFiles appendFiles = transaction.newAppend();
        for (DataFile dataFile : dataFiles) {
            appendFiles.appendFile(dataFile);
        }

        // Commit the transaction
        appendFiles.commit();
        LOGGER.info("Successfully added {} files from source Hive table {} to Iceberg table",
                dataFiles.size(), sourceTable);
    }

    /**
     * Get filtered partition keys based on WHERE clause predicate using Hive partition pruning
     * Reuses key logic from OptExternalPartitionPruner
     *
     * @param context The procedure context containing WHERE clause
     * @param sourceHiveTable The source Hive table
     * @return List of filtered partition keys, or null if no filtering is needed
     */
    private List<PartitionKey> getFilteredPartitionKeys(IcebergTableProcedureContext context,
                                                        com.starrocks.catalog.Table sourceHiveTable) {
        try {
            Expr whereExpr = context.clause().getWhere();
            List<Column> partitionColumns = sourceHiveTable.getPartitionColumns();

            if (partitionColumns.isEmpty()) {
                LOGGER.info("Source table is not partitioned, WHERE clause will be ignored");
                return null;
            }

            // Create column reference mappings for reusing OptExternalPartitionPruner logic
            ColumnRefFactory columnRefFactory = new ColumnRefFactory();
            Map<Column, ColumnRefOperator> columnToColRefMap = new HashMap<>();
            Map<ColumnRefOperator, Column> colRefToColumnMap = new HashMap<>();

            // Create a simple mock operator that provides the necessary interface
            LogicalHiveScanOperator hiveScanOperator = makeSourceTableHiveScanOperator(sourceHiveTable, columnToColRefMap,
                    colRefToColumnMap, columnRefFactory, whereExpr, context);

            OptExternalPartitionPruner.prunePartitions(OptimizerFactory.initContext(context.context(), columnRefFactory),
                    hiveScanOperator);

            ScanOperatorPredicates scanOperatorPredicates = hiveScanOperator.getScanOperatorPredicates();
            if (!scanOperatorPredicates.getNonPartitionConjuncts().isEmpty() ||
                    !scanOperatorPredicates.getNoEvalPartitionConjuncts().isEmpty()) {
                LOGGER.warn("WHERE clause contains non-partition predicates or can not eval predicates, " +
                                "non-partition predicates: {}, no-eval partition predicates: {}. ",
                        Joiner.on(", ").join(scanOperatorPredicates.getNonPartitionConjuncts()),
                        Joiner.on(", ").join(scanOperatorPredicates.getNoEvalPartitionConjuncts()));
                throw new StarRocksConnectorException("WHERE clause contains non-partition predicates or can not eval " +
                        "predicates, only simple partition predicates are supported for partition pruning. " +
                        "Non-partition predicates: %s, no-eval partition predicates: %s",
                        Joiner.on(", ").join(scanOperatorPredicates.getNonPartitionConjuncts()),
                        Joiner.on(", ").join(scanOperatorPredicates.getNoEvalPartitionConjuncts()));
            }

            List<PartitionKey> partitionKeys = Lists.newArrayList();
            scanOperatorPredicates.getSelectedPartitionIds().stream()
                    .map(id -> scanOperatorPredicates.getIdToPartitionKey().get(id))
                    .filter(Objects::nonNull)
                    .forEach(partitionKeys::add);
            LOGGER.info("Partition pruning selected {} partitions, select partitions : {}", partitionKeys.size(),
                    Joiner.on(", ").join(partitionKeys));

            return partitionKeys;
        } catch (Exception e) {
            LOGGER.warn("Failed to perform partition pruning, Error: {}", e.getMessage());
            throw new StarRocksConnectorException("Failed to perform partition pruning: %s", e.getMessage(), e);
        }
    }

    private LogicalHiveScanOperator makeSourceTableHiveScanOperator(com.starrocks.catalog.Table sourceTable,
                                                                    Map<Column, ColumnRefOperator> columnToColRefMap,
                                                                    Map<ColumnRefOperator, Column> colRefToColumnMap,
                                                                    ColumnRefFactory columnRefFactory,
                                                                    Expr whereExpr, IcebergTableProcedureContext context) {
        List<ColumnRefOperator> columnRefOperators = new ArrayList<>();
        for (Column column : sourceTable.getBaseSchema()) {
            ColumnRefOperator columnRef = columnRefFactory.create(column.getName(),
                    column.getType(), column.isAllowNull());
            colRefToColumnMap.put(columnRef, column);
            columnToColRefMap.put(column, columnRef);
            columnRefOperators.add(columnRef);
        }

        ScalarOperator scalarOperator = null;
        if (whereExpr != null) {
            // Create scope with table columns for expression analysis
            TableName sourceTableName = new TableName(sourceTable.getCatalogName(),
                    sourceTable.getCatalogDBName(), sourceTable.getCatalogTableName());
            Scope scope = new Scope(RelationId.anonymous(), new RelationFields(columnRefOperators.stream()
                    .map(col -> new Field(col.getName(), col.getType(), sourceTableName, null))
                    .collect(Collectors.toList())));
            // Analyze the WHERE expression
            ExpressionAnalyzer.analyzeExpression(whereExpr, new AnalyzeState(), scope, context.context());

            // Create expression mapping for conversion
            ExpressionMapping expressionMapping = new ExpressionMapping(scope, columnRefOperators);

            // Convert Expr to ScalarOperator
            scalarOperator = SqlToScalarOperatorTranslator.translate(whereExpr, expressionMapping, columnRefFactory);
        }
        return new LogicalHiveScanOperator(sourceTable, colRefToColumnMap, columnToColRefMap,
                Operator.DEFAULT_LIMIT, scalarOperator);
    }
}
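The pruning result returned by getFilteredPartitionKeys drives the whole source-table flow: null means the source is unpartitioned and one listing request covers the whole table, an empty list means every partition was pruned by the WHERE clause and nothing is added, and otherwise one GetRemoteFilesParams is built per surviving partition. A condensed restatement of that branch as a standalone helper (toListingRequests is illustrative and not a method in this patch; the builder calls mirror addFilesFromSourceTable above):

    static List<GetRemoteFilesParams> toListingRequests(List<PartitionKey> pruned, boolean recursive) {
        if (pruned == null) {
            // unpartitioned source table: a single request for the whole location
            return List.of(GetRemoteFilesParams.newBuilder()
                    .setUseCache(false)
                    .setIsRecursive(recursive)
                    .build());
        }
        // one request per surviving partition; an empty input yields an empty list (nothing to add)
        return pruned.stream()
                .map(key -> GetRemoteFilesParams.newBuilder()
                        .setUseCache(false)
                        .setPartitionKeys(List.of(key))
                        .setIsRecursive(recursive)
                        .build())
                .toList();
    }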
@@ -47,6 +47,7 @@ import com.starrocks.common.util.DynamicPartitionUtil;
import com.starrocks.common.util.PropertyAnalyzer;
import com.starrocks.common.util.TimeUtils;
import com.starrocks.common.util.WriteQuorum;
import com.starrocks.connector.iceberg.IcebergTableOperation;
import com.starrocks.connector.iceberg.procedure.IcebergTableProcedure;
import com.starrocks.connector.iceberg.procedure.NamedArgument;
import com.starrocks.lake.LakeTable;
@@ -1625,7 +1626,7 @@ public class AlterTableClauseAnalyzer implements AstVisitorExtendInterface<Void,
        }
        clause.setAnalyzedArgs(constantArgs);

        if (clause.getWhere() != null) {
        if (clause.getWhere() != null && icebergTableProcedure.getOperation() == IcebergTableOperation.REWRITE_DATA_FILES) {
            ColumnRefFactory columnRefFactory = new ColumnRefFactory();
            List<ColumnRefOperator> columnRefOperators = table.getBaseSchema()
                    .stream()
@@ -222,6 +222,10 @@ public class AlterTableOperationStmtTest {

        new Expectations() {
            {
                icebergTable.getTableProcedure(anyString);
                result = RewriteDataFilesProcedure.getInstance();
                minTimes = 0;

                icebergTable.getPartitionColumnsIncludeTransformed();
                result = List.of(new Column("k1", Type.INT, true),
                        new Column("partition_date", Type.DATE, true));
@@ -176,16 +176,21 @@ public class AddFilesProcedureTest {
        IcebergTableProcedureContext context = createMockContext(table, catalog);

        Map<String, ConstantOperator> args = new HashMap<>();
        args.put("source_table", ConstantOperator.createVarchar("test_table"));
        args.put("source_table", ConstantOperator.createVarchar("test_catalog.test_db.test_table"));

        // This test will now validate the new source table functionality
        // The test should fail due to missing Hive table access, not "not implemented"
        StarRocksConnectorException exception = assertThrows(StarRocksConnectorException.class,
                () -> procedure.execute(context, args));

        assertTrue(exception.getMessage().contains("Adding files from source_table is not yet implemented"));
        // The exception should be about Hive table access, not "not implemented"
        assertTrue(exception.getMessage().contains("Failed to access source table") ||
                exception.getMessage().contains("not found") ||
                exception.getMessage().contains("not a Hive table"));
    }

    @Test
    public void testCreateDataFileWithMetrics() throws Exception {
    public void testCreateDataFileFromLocationWithMetrics() throws Exception {
        AddFilesProcedure procedure = AddFilesProcedure.getInstance();

        // Mock table schema and partition spec
@@ -0,0 +1,350 @@
-- name: test_iceberg_add_files_from_tbl
create external catalog iceberg_add_files_${uuid0} PROPERTIES ("type"="iceberg",
    "iceberg.catalog.type"="hive",
    "iceberg.catalog.hive.metastore.uris"="${iceberg_catalog_hive_metastore_uris}",
    "aws.s3.access_key" = "${oss_ak}",
    "aws.s3.secret_key" = "${oss_sk}",
    "aws.s3.endpoint" = "${oss_endpoint}"
);
-- result:
-- !result
create external catalog hive_add_files_${uuid0} PROPERTIES ("type"="hive",
    "hive.metastore.uris"="${hive_metastore_uris}",
    "aws.s3.access_key"="${oss_ak}",
    "aws.s3.secret_key"="${oss_sk}",
    "aws.s3.endpoint"="${oss_endpoint}"
);
-- result:
-- !result
create database iceberg_add_files_${uuid0}.iceberg_add_files_db_${uuid0};
-- result:
-- !result
create table hive_add_files_${uuid0}.iceberg_add_files_db_${uuid0}.hive_par_tinyint(
    c_smallint smallint,
    c_int int,
    c_tinyint tinyint
) partition by(c_tinyint) properties ("location" = "oss://${oss_bucket}/iceberg_add_files_db_${uuid0}/hive_par_tinyint");
-- result:
-- !result
insert into hive_add_files_${uuid0}.iceberg_add_files_db_${uuid0}.hive_par_tinyint values(1,1,1),(2,2,2);
-- result:
-- !result
create table iceberg_add_files_${uuid0}.iceberg_add_files_db_${uuid0}.iceberg_par_tinyint(
    c_smallint smallint,
    c_int int,
    c_tinyint tinyint
) partition by(c_tinyint);
-- result:
-- !result
alter table iceberg_add_files_${uuid0}.iceberg_add_files_db_${uuid0}.iceberg_par_tinyint execute add_files(source_table="hive_add_files_${uuid0}.iceberg_add_files_db_${uuid0}.hive_par_tinyint");
-- result:
-- !result
select * from iceberg_add_files_${uuid0}.iceberg_add_files_db_${uuid0}.iceberg_par_tinyint order by c_tinyint;
-- result:
1 1 1
2 2 2
-- !result
drop table iceberg_add_files_${uuid0}.iceberg_add_files_db_${uuid0}.iceberg_par_tinyint force;
-- result:
-- !result
drop table hive_add_files_${uuid0}.iceberg_add_files_db_${uuid0}.hive_par_tinyint force;
-- result:
-- !result
create table hive_add_files_${uuid0}.iceberg_add_files_db_${uuid0}.hive_par_date(
    c_smallint smallint,
    c_int int,
    c_date date
) partition by(c_date) properties ("location" = "oss://${oss_bucket}/iceberg_add_files_db_${uuid0}/hive_par_date");
-- result:
-- !result
insert into hive_add_files_${uuid0}.iceberg_add_files_db_${uuid0}.hive_par_date values(1, 1, '2020-01-01'),(2, 2, '2020-01-02'),(3,3,'2020-01-03'),(4,4,'2020-01-04'),(5,5,'2020-01-05');
-- result:
-- !result
create table iceberg_add_files_${uuid0}.iceberg_add_files_db_${uuid0}.iceberg_par_date(
    c_smallint smallint,
    c_int int,
    c_date date
) partition by(c_date);
-- result:
-- !result
alter table iceberg_add_files_${uuid0}.iceberg_add_files_db_${uuid0}.iceberg_par_date execute add_files(source_table="hive_add_files_${uuid0}.iceberg_add_files_db_${uuid0}.hive_par_date");
-- result:
-- !result
select * from iceberg_add_files_${uuid0}.iceberg_add_files_db_${uuid0}.iceberg_par_date order by c_date;
-- result:
1 1 2020-01-01
2 2 2020-01-02
3 3 2020-01-03
4 4 2020-01-04
5 5 2020-01-05
-- !result
drop table iceberg_add_files_${uuid0}.iceberg_add_files_db_${uuid0}.iceberg_par_date;
-- result:
-- !result
create table iceberg_add_files_${uuid0}.iceberg_add_files_db_${uuid0}.iceberg_par_date(
    c_smallint smallint,
    c_int int,
    c_date date
) partition by(c_date);
-- result:
-- !result
alter table iceberg_add_files_${uuid0}.iceberg_add_files_db_${uuid0}.iceberg_par_date execute add_files(source_table="hive_add_files_${uuid0}.iceberg_add_files_db_${uuid0}.hive_par_date") where c_date>='2020-01-03';
-- result:
-- !result
select * from iceberg_add_files_${uuid0}.iceberg_add_files_db_${uuid0}.iceberg_par_date order by c_date;
-- result:
3 3 2020-01-03
4 4 2020-01-04
5 5 2020-01-05
-- !result
drop table iceberg_add_files_${uuid0}.iceberg_add_files_db_${uuid0}.iceberg_par_date force;
-- result:
-- !result
drop table hive_add_files_${uuid0}.iceberg_add_files_db_${uuid0}.hive_par_date force;
-- result:
-- !result
create table hive_add_files_${uuid0}.iceberg_add_files_db_${uuid0}.hive_par_date_string(
    c_smallint smallint,
    c_int int,
    c_date date,
    c_string string
) partition by(c_date, c_string) properties ("location" = "oss://${oss_bucket}/iceberg_add_files_db_${uuid0}/hive_par_date_string");
-- result:
-- !result
insert into hive_add_files_${uuid0}.iceberg_add_files_db_${uuid0}.hive_par_date_string
values(1, 1, '2020-01-01', '2020-01-01 00:00:00'), (1, 1, '2020-01-02', '2020-01-02 00:00:00');
-- result:
-- !result
create table iceberg_add_files_${uuid0}.iceberg_add_files_db_${uuid0}.iceberg_par_date_string(
    c_smallint smallint,
    c_int int,
    c_date date,
    c_string string
) partition by(c_date, c_string);
-- result:
-- !result
alter table iceberg_add_files_${uuid0}.iceberg_add_files_db_${uuid0}.iceberg_par_date_string execute add_files(source_table="hive_add_files_${uuid0}.iceberg_add_files_db_${uuid0}.hive_par_date_string") where c_date='2020-01-01';
-- result:
-- !result
select * from iceberg_add_files_${uuid0}.iceberg_add_files_db_${uuid0}.iceberg_par_date_string order by c_date;
-- result:
1 1 2020-01-01 2020-01-01 00:00:00
-- !result
drop table iceberg_add_files_${uuid0}.iceberg_add_files_db_${uuid0}.iceberg_par_date_string force;
-- result:
-- !result
drop table hive_add_files_${uuid0}.iceberg_add_files_db_${uuid0}.hive_par_date_string force;
-- result:
-- !result
create table hive_add_files_${uuid0}.iceberg_add_files_db_${uuid0}.hive_par_date(
    c_smallint smallint,
    c_int int,
    c_date date
) partition by(c_date) properties ("location" = "oss://${oss_bucket}/iceberg_add_files_db_${uuid0}/hive_par_date");
-- result:
-- !result
insert into hive_add_files_${uuid0}.iceberg_add_files_db_${uuid0}.hive_par_date values (1, 1, '2020-01-01'),(2, 2, '2020-01-02');
-- result:
-- !result
create table iceberg_add_files_${uuid0}.iceberg_add_files_db_${uuid0}.iceberg_par_date_string(
    c_smallint smallint,
    c_int int,
    c_date date,
    c_string string
) partition by(c_date, c_string);
-- result:
-- !result
alter table iceberg_add_files_${uuid0}.iceberg_add_files_db_${uuid0}.iceberg_par_date_string execute add_files(source_table='hive_add_files_${uuid0}.iceberg_add_files_db_${uuid0}.hive_par_date');
-- result:
E: (1064, 'Failed to add files: Partition column c_string not found in path c_date=2020-01-01')
-- !result
drop table iceberg_add_files_${uuid0}.iceberg_add_files_db_${uuid0}.iceberg_par_date_string force;
-- result:
-- !result
drop table hive_add_files_${uuid0}.iceberg_add_files_db_${uuid0}.hive_par_date force;
-- result:
-- !result
create table hive_add_files_${uuid0}.iceberg_add_files_db_${uuid0}.hive_par_date_string(
    c_smallint smallint,
    c_int int,
    c_date date,
    c_string string
) partition by(c_date, c_string) properties ("location" = "oss://${oss_bucket}/iceberg_add_files_db_${uuid0}/hive_par_date_string");
-- result:
-- !result
insert into hive_add_files_${uuid0}.iceberg_add_files_db_${uuid0}.hive_par_date_string
values(1, 1, '2020-01-01', '2020-01-01 00:00:00'), (1, 1, '2020-01-02', '2020-01-02 00:00:00');
-- result:
-- !result
create table iceberg_add_files_${uuid0}.iceberg_add_files_db_${uuid0}.iceberg_par_date(
    c_smallint smallint,
    c_int int,
    c_string string,
    c_date date
) partition by(c_date);
-- result:
-- !result
alter table iceberg_add_files_${uuid0}.iceberg_add_files_db_${uuid0}.iceberg_par_date execute add_files(source_table="hive_add_files_${uuid0}.iceberg_add_files_db_${uuid0}.hive_par_date_string");
-- result:
E: (1064, 'Failed to add files: Partition column c_string not found in iceberg partition columns')
-- !result
drop table iceberg_add_files_${uuid0}.iceberg_add_files_db_${uuid0}.iceberg_par_date force;
-- result:
-- !result
drop table hive_add_files_${uuid0}.iceberg_add_files_db_${uuid0}.hive_par_date_string force;
-- result:
-- !result
create table hive_add_files_${uuid0}.iceberg_add_files_db_${uuid0}.hive_par_date(
    c_smallint smallint,
    c_int int,
    c_date date
) partition by(c_date) properties ("location" = "oss://${oss_bucket}/iceberg_add_files_db_${uuid0}/hive_par_date");
-- result:
-- !result
insert into hive_add_files_${uuid0}.iceberg_add_files_db_${uuid0}.hive_par_date values (1, 1, '2020-01-01'),(2, 2, '2020-01-02');
-- result:
-- !result
create table iceberg_add_files_${uuid0}.iceberg_add_files_db_${uuid0}.iceberg_par_date_month(
    c_smallint smallint,
    c_int int,
    c_date date
) partition by month(c_date);
-- result:
-- !result
alter table iceberg_add_files_${uuid0}.iceberg_add_files_db_${uuid0}.iceberg_par_date_month execute add_files(source_table="hive_add_files_${uuid0}.iceberg_add_files_db_${uuid0}.hive_par_date");
-- result:
E: (1064, 'Adding files to partitioned tables with non-identity partitioning is not supported, which will cause data inconsistency')
-- !result
drop table iceberg_add_files_${uuid0}.iceberg_add_files_db_${uuid0}.iceberg_par_date_month force;
-- result:
-- !result
drop table hive_add_files_${uuid0}.iceberg_add_files_db_${uuid0}.hive_par_date force;
-- result:
-- !result
create table hive_add_files_${uuid0}.iceberg_add_files_db_${uuid0}.hive_par_date(
    c_smallint smallint,
    c_int int,
    c_date date
) partition by(c_date) properties ("location" = "oss://${oss_bucket}/iceberg_add_files_db_${uuid0}/hive_par_date");
-- result:
-- !result
insert into hive_add_files_${uuid0}.iceberg_add_files_db_${uuid0}.hive_par_date values(1,2,'2020-01-01'),(2,3,'2020-01-02');
-- result:
-- !result
create table iceberg_add_files_${uuid0}.iceberg_add_files_db_${uuid0}.iceberg_unpar(
    c_smallint smallint,
    c_int int,
    c_date date
);
-- result:
-- !result
alter table iceberg_add_files_${uuid0}.iceberg_add_files_db_${uuid0}.iceberg_unpar execute add_files (source_table = "hive_add_files_${uuid0}.iceberg_add_files_db_${uuid0}.hive_par_date") where c_date='2020-01-01';
-- result:
-- !result
select * from iceberg_add_files_${uuid0}.iceberg_add_files_db_${uuid0}.iceberg_unpar;
-- result:
1 2 None
-- !result
drop table iceberg_add_files_${uuid0}.iceberg_add_files_db_${uuid0}.iceberg_unpar force;
-- result:
-- !result
create table iceberg_add_files_${uuid0}.iceberg_add_files_db_${uuid0}.iceberg_unpar(
    c_smallint smallint,
    c_int int
);
-- result:
-- !result
alter table iceberg_add_files_${uuid0}.iceberg_add_files_db_${uuid0}.iceberg_unpar execute add_files (source_table = "hive_add_files_${uuid0}.iceberg_add_files_db_${uuid0}.hive_par_date") where c_date='2020-01-02';
-- result:
-- !result
select * from iceberg_add_files_${uuid0}.iceberg_add_files_db_${uuid0}.iceberg_unpar;
-- result:
2 3
-- !result
drop table iceberg_add_files_${uuid0}.iceberg_add_files_db_${uuid0}.iceberg_unpar force;
-- result:
-- !result
drop table hive_add_files_${uuid0}.iceberg_add_files_db_${uuid0}.hive_par_date force;
-- result:
-- !result
create table hive_add_files_${uuid0}.iceberg_add_files_db_${uuid0}.hive_par_date(
    c_smallint smallint,
    c_int int,
    c_date date
) partition by(c_date) properties ("location" = "oss://${oss_bucket}/iceberg_add_files_db_${uuid0}/hive_par_date", "file_format" = "orc");
-- result:
-- !result
insert into hive_add_files_${uuid0}.iceberg_add_files_db_${uuid0}.hive_par_date values(1,2, '2020-01-01'), (2,3, '2020-01-02');
-- result:
-- !result
create table iceberg_add_files_${uuid0}.iceberg_add_files_db_${uuid0}.iceberg_par_date(
    c_smallint smallint,
    c_int int,
    c_date date
) partition by(c_date);
-- result:
-- !result
alter table iceberg_add_files_${uuid0}.iceberg_add_files_db_${uuid0}.iceberg_par_date execute add_files (source_table = "hive_add_files_${uuid0}.iceberg_add_files_db_${uuid0}.hive_par_date");
-- result:
-- !result
select * from iceberg_add_files_${uuid0}.iceberg_add_files_db_${uuid0}.iceberg_par_date order by c_date;
-- result:
1 2 2020-01-01
2 3 2020-01-02
-- !result
drop table iceberg_add_files_${uuid0}.iceberg_add_files_db_${uuid0}.iceberg_par_date force;
-- result:
-- !result
drop table hive_add_files_${uuid0}.iceberg_add_files_db_${uuid0}.hive_par_date force;

create table hive_add_files_${uuid0}.iceberg_add_files_db_${uuid0}.hive_par_date(
    c_smallint smallint,
    c_int int,
    c_date date
) partition by(c_date) properties ("location" = "oss://${oss_bucket}/iceberg_add_files_db_${uuid0}/hive_par_date");
-- result:
-- !result
insert into hive_add_files_${uuid0}.iceberg_add_files_db_${uuid0}.hive_par_date values(1, 1, '2020-01-01'),(2, 2, '2020-01-02'),(3,3,'2020-01-03'),(4,4,'2020-01-04'),(5,5,'2020-01-05');
-- result:
-- !result
create table iceberg_add_files_${uuid0}.iceberg_add_files_db_${uuid0}.iceberg_par_date(
    c_smallint smallint,
    c_int int,
    c_date date
) partition by(c_date);
-- result:
-- !result
alter table iceberg_add_files_${uuid0}.iceberg_add_files_db_${uuid0}.iceberg_par_date execute add_files(source_table="hive_add_files_${uuid0}.iceberg_add_files_db_${uuid0}.hive_par_date") where c_date>='2020-01-03';
-- result:
-- !result
select * from iceberg_add_files_${uuid0}.iceberg_add_files_db_${uuid0}.iceberg_par_date order by c_date;
-- result:
3 3 2020-01-03
4 4 2020-01-04
5 5 2020-01-05
-- !result
drop table iceberg_add_files_${uuid0}.iceberg_add_files_db_${uuid0}.iceberg_par_date;
-- result:
-- !result
create table iceberg_add_files_${uuid0}.iceberg_add_files_db_${uuid0}.iceberg_par_date(
    c_smallint smallint,
    c_int int,
    c_date date
) partition by(c_date);
-- result:
-- !result
alter table iceberg_add_files_${uuid0}.iceberg_add_files_db_${uuid0}.iceberg_par_date execute add_files(source_table="hive_add_files_${uuid0}.iceberg_add_files_db_${uuid0}.hive_par_date") where days_add(c_date, 2) >= '2020-01-06';
-- result:
-- !result
select * from iceberg_add_files_${uuid0}.iceberg_add_files_db_${uuid0}.iceberg_par_date order by c_date;
-- result:
4 4 2020-01-04
5 5 2020-01-05
-- !result
drop table iceberg_add_files_${uuid0}.iceberg_add_files_db_${uuid0}.iceberg_par_date force;
-- result:
-- !result
drop table hive_add_files_${uuid0}.iceberg_add_files_db_${uuid0}.hive_par_date force;
-- result:
-- !result
@@ -0,0 +1,233 @@
-- name: test_iceberg_add_files_from_tbl

create external catalog iceberg_add_files_${uuid0} PROPERTIES ("type"="iceberg",
    "iceberg.catalog.type"="hive",
    "iceberg.catalog.hive.metastore.uris"="${iceberg_catalog_hive_metastore_uris}",
    "aws.s3.access_key" = "${oss_ak}",
    "aws.s3.secret_key" = "${oss_sk}",
    "aws.s3.endpoint" = "${oss_endpoint}"
);

create external catalog hive_add_files_${uuid0} PROPERTIES ("type"="hive",
    "hive.metastore.uris"="${hive_metastore_uris}",
    "aws.s3.access_key"="${oss_ak}",
    "aws.s3.secret_key"="${oss_sk}",
    "aws.s3.endpoint"="${oss_endpoint}"
);

create database iceberg_add_files_${uuid0}.iceberg_add_files_db_${uuid0};

-- test partitioned iceberg with single partition(tinyint)
create table hive_add_files_${uuid0}.iceberg_add_files_db_${uuid0}.hive_par_tinyint(
    c_smallint smallint,
    c_int int,
    c_tinyint tinyint
) partition by(c_tinyint) properties ("location" = "oss://${oss_bucket}/iceberg_add_files_db_${uuid0}/hive_par_tinyint");

insert into hive_add_files_${uuid0}.iceberg_add_files_db_${uuid0}.hive_par_tinyint values(1,1,1),(2,2,2);

create table iceberg_add_files_${uuid0}.iceberg_add_files_db_${uuid0}.iceberg_par_tinyint(
    c_smallint smallint,
    c_int int,
    c_tinyint tinyint
) partition by(c_tinyint);

alter table iceberg_add_files_${uuid0}.iceberg_add_files_db_${uuid0}.iceberg_par_tinyint execute add_files(source_table="hive_add_files_${uuid0}.iceberg_add_files_db_${uuid0}.hive_par_tinyint");

select * from iceberg_add_files_${uuid0}.iceberg_add_files_db_${uuid0}.iceberg_par_tinyint order by c_tinyint;

drop table iceberg_add_files_${uuid0}.iceberg_add_files_db_${uuid0}.iceberg_par_tinyint force;
drop table hive_add_files_${uuid0}.iceberg_add_files_db_${uuid0}.hive_par_tinyint force;

-- test partitioned iceberg with single partition(date)
create table hive_add_files_${uuid0}.iceberg_add_files_db_${uuid0}.hive_par_date(
    c_smallint smallint,
    c_int int,
    c_date date
) partition by(c_date) properties ("location" = "oss://${oss_bucket}/iceberg_add_files_db_${uuid0}/hive_par_date");

insert into hive_add_files_${uuid0}.iceberg_add_files_db_${uuid0}.hive_par_date values(1, 1, '2020-01-01'),(2, 2, '2020-01-02'),(3,3,'2020-01-03'),(4,4,'2020-01-04'),(5,5,'2020-01-05');

create table iceberg_add_files_${uuid0}.iceberg_add_files_db_${uuid0}.iceberg_par_date(
    c_smallint smallint,
    c_int int,
    c_date date
) partition by(c_date);
alter table iceberg_add_files_${uuid0}.iceberg_add_files_db_${uuid0}.iceberg_par_date execute add_files(source_table="hive_add_files_${uuid0}.iceberg_add_files_db_${uuid0}.hive_par_date");

select * from iceberg_add_files_${uuid0}.iceberg_add_files_db_${uuid0}.iceberg_par_date order by c_date;

drop table iceberg_add_files_${uuid0}.iceberg_add_files_db_${uuid0}.iceberg_par_date;
create table iceberg_add_files_${uuid0}.iceberg_add_files_db_${uuid0}.iceberg_par_date(
    c_smallint smallint,
    c_int int,
    c_date date
) partition by(c_date);

alter table iceberg_add_files_${uuid0}.iceberg_add_files_db_${uuid0}.iceberg_par_date execute add_files(source_table="hive_add_files_${uuid0}.iceberg_add_files_db_${uuid0}.hive_par_date") where c_date>='2020-01-03';
select * from iceberg_add_files_${uuid0}.iceberg_add_files_db_${uuid0}.iceberg_par_date order by c_date;

drop table iceberg_add_files_${uuid0}.iceberg_add_files_db_${uuid0}.iceberg_par_date force;
drop table hive_add_files_${uuid0}.iceberg_add_files_db_${uuid0}.hive_par_date force;

-- test partitioned iceberg with multi partition(date, string)
create table hive_add_files_${uuid0}.iceberg_add_files_db_${uuid0}.hive_par_date_string(
    c_smallint smallint,
    c_int int,
    c_date date,
    c_string string
) partition by(c_date, c_string) properties ("location" = "oss://${oss_bucket}/iceberg_add_files_db_${uuid0}/hive_par_date_string");
insert into hive_add_files_${uuid0}.iceberg_add_files_db_${uuid0}.hive_par_date_string
values(1, 1, '2020-01-01', '2020-01-01 00:00:00'), (1, 1, '2020-01-02', '2020-01-02 00:00:00');

create table iceberg_add_files_${uuid0}.iceberg_add_files_db_${uuid0}.iceberg_par_date_string(
    c_smallint smallint,
    c_int int,
    c_date date,
    c_string string
) partition by(c_date, c_string);

alter table iceberg_add_files_${uuid0}.iceberg_add_files_db_${uuid0}.iceberg_par_date_string execute add_files(source_table="hive_add_files_${uuid0}.iceberg_add_files_db_${uuid0}.hive_par_date_string") where c_date='2020-01-01';
select * from iceberg_add_files_${uuid0}.iceberg_add_files_db_${uuid0}.iceberg_par_date_string order by c_date;

drop table iceberg_add_files_${uuid0}.iceberg_add_files_db_${uuid0}.iceberg_par_date_string force;
drop table hive_add_files_${uuid0}.iceberg_add_files_db_${uuid0}.hive_par_date_string force;

-- test partitioned iceberg table which contains date string partition column, location only have one partition
create table hive_add_files_${uuid0}.iceberg_add_files_db_${uuid0}.hive_par_date(
    c_smallint smallint,
    c_int int,
    c_date date
) partition by(c_date) properties ("location" = "oss://${oss_bucket}/iceberg_add_files_db_${uuid0}/hive_par_date");

insert into hive_add_files_${uuid0}.iceberg_add_files_db_${uuid0}.hive_par_date values (1, 1, '2020-01-01'),(2, 2, '2020-01-02');

create table iceberg_add_files_${uuid0}.iceberg_add_files_db_${uuid0}.iceberg_par_date_string(
    c_smallint smallint,
    c_int int,
    c_date date,
    c_string string
) partition by(c_date, c_string);
alter table iceberg_add_files_${uuid0}.iceberg_add_files_db_${uuid0}.iceberg_par_date_string execute add_files(source_table='hive_add_files_${uuid0}.iceberg_add_files_db_${uuid0}.hive_par_date');

drop table iceberg_add_files_${uuid0}.iceberg_add_files_db_${uuid0}.iceberg_par_date_string force;
drop table hive_add_files_${uuid0}.iceberg_add_files_db_${uuid0}.hive_par_date force;

-- test partitioned iceberg table which has date partition column, location has date and string partition column
create table hive_add_files_${uuid0}.iceberg_add_files_db_${uuid0}.hive_par_date_string(
    c_smallint smallint,
    c_int int,
    c_date date,
    c_string string
) partition by(c_date, c_string) properties ("location" = "oss://${oss_bucket}/iceberg_add_files_db_${uuid0}/hive_par_date_string");
insert into hive_add_files_${uuid0}.iceberg_add_files_db_${uuid0}.hive_par_date_string
values(1, 1, '2020-01-01', '2020-01-01 00:00:00'), (1, 1, '2020-01-02', '2020-01-02 00:00:00');

create table iceberg_add_files_${uuid0}.iceberg_add_files_db_${uuid0}.iceberg_par_date(
    c_smallint smallint,
    c_int int,
    c_string string,
    c_date date
) partition by(c_date);
alter table iceberg_add_files_${uuid0}.iceberg_add_files_db_${uuid0}.iceberg_par_date execute add_files(source_table="hive_add_files_${uuid0}.iceberg_add_files_db_${uuid0}.hive_par_date_string");

drop table iceberg_add_files_${uuid0}.iceberg_add_files_db_${uuid0}.iceberg_par_date force;
drop table hive_add_files_${uuid0}.iceberg_add_files_db_${uuid0}.hive_par_date_string force;

-- test partitioned iceberg table with transform
create table hive_add_files_${uuid0}.iceberg_add_files_db_${uuid0}.hive_par_date(
    c_smallint smallint,
    c_int int,
    c_date date
) partition by(c_date) properties ("location" = "oss://${oss_bucket}/iceberg_add_files_db_${uuid0}/hive_par_date");

insert into hive_add_files_${uuid0}.iceberg_add_files_db_${uuid0}.hive_par_date values (1, 1, '2020-01-01'),(2, 2, '2020-01-02');

create table iceberg_add_files_${uuid0}.iceberg_add_files_db_${uuid0}.iceberg_par_date_month(
    c_smallint smallint,
    c_int int,
    c_date date
) partition by month(c_date);
alter table iceberg_add_files_${uuid0}.iceberg_add_files_db_${uuid0}.iceberg_par_date_month execute add_files(source_table="hive_add_files_${uuid0}.iceberg_add_files_db_${uuid0}.hive_par_date");

drop table iceberg_add_files_${uuid0}.iceberg_add_files_db_${uuid0}.iceberg_par_date_month force;
drop table hive_add_files_${uuid0}.iceberg_add_files_db_${uuid0}.hive_par_date force;

-- test unpartitioned iceberg table with partitioned data
create table hive_add_files_${uuid0}.iceberg_add_files_db_${uuid0}.hive_par_date(
    c_smallint smallint,
    c_int int,
    c_date date
) partition by(c_date) properties ("location" = "oss://${oss_bucket}/iceberg_add_files_db_${uuid0}/hive_par_date");
insert into hive_add_files_${uuid0}.iceberg_add_files_db_${uuid0}.hive_par_date values(1,2,'2020-01-01'),(2,3,'2020-01-02');

create table iceberg_add_files_${uuid0}.iceberg_add_files_db_${uuid0}.iceberg_unpar(
    c_smallint smallint,
    c_int int,
    c_date date
);
alter table iceberg_add_files_${uuid0}.iceberg_add_files_db_${uuid0}.iceberg_unpar execute add_files (source_table = "hive_add_files_${uuid0}.iceberg_add_files_db_${uuid0}.hive_par_date") where c_date='2020-01-01';
select * from iceberg_add_files_${uuid0}.iceberg_add_files_db_${uuid0}.iceberg_unpar;

drop table iceberg_add_files_${uuid0}.iceberg_add_files_db_${uuid0}.iceberg_unpar force;
create table iceberg_add_files_${uuid0}.iceberg_add_files_db_${uuid0}.iceberg_unpar(
    c_smallint smallint,
    c_int int
);
alter table iceberg_add_files_${uuid0}.iceberg_add_files_db_${uuid0}.iceberg_unpar execute add_files (source_table = "hive_add_files_${uuid0}.iceberg_add_files_db_${uuid0}.hive_par_date") where c_date='2020-01-02';
select * from iceberg_add_files_${uuid0}.iceberg_add_files_db_${uuid0}.iceberg_unpar;

drop table iceberg_add_files_${uuid0}.iceberg_add_files_db_${uuid0}.iceberg_unpar force;
drop table hive_add_files_${uuid0}.iceberg_add_files_db_${uuid0}.hive_par_date force;

-- test partitioned iceberg table with orc file_format
create table hive_add_files_${uuid0}.iceberg_add_files_db_${uuid0}.hive_par_date(
    c_smallint smallint,
    c_int int,
    c_date date
) partition by(c_date) properties ("location" = "oss://${oss_bucket}/iceberg_add_files_db_${uuid0}/hive_par_date", "file_format" = "orc");
insert into hive_add_files_${uuid0}.iceberg_add_files_db_${uuid0}.hive_par_date values(1,2, '2020-01-01'), (2,3, '2020-01-02');

create table iceberg_add_files_${uuid0}.iceberg_add_files_db_${uuid0}.iceberg_par_date(
    c_smallint smallint,
    c_int int,
    c_date date
) partition by(c_date);
alter table iceberg_add_files_${uuid0}.iceberg_add_files_db_${uuid0}.iceberg_par_date execute add_files (source_table = "hive_add_files_${uuid0}.iceberg_add_files_db_${uuid0}.hive_par_date");
select * from iceberg_add_files_${uuid0}.iceberg_add_files_db_${uuid0}.iceberg_par_date order by c_date;

drop table iceberg_add_files_${uuid0}.iceberg_add_files_db_${uuid0}.iceberg_par_date force;
drop table hive_add_files_${uuid0}.iceberg_add_files_db_${uuid0}.hive_par_date force;

-- test partitioned iceberg with partition pruning
create table hive_add_files_${uuid0}.iceberg_add_files_db_${uuid0}.hive_par_date(
    c_smallint smallint,
    c_int int,
    c_date date
) partition by(c_date) properties ("location" = "oss://${oss_bucket}/iceberg_add_files_db_${uuid0}/hive_par_date");

insert into hive_add_files_${uuid0}.iceberg_add_files_db_${uuid0}.hive_par_date values(1, 1, '2020-01-01'),(2, 2, '2020-01-02'),(3,3,'2020-01-03'),(4,4,'2020-01-04'),(5,5,'2020-01-05');

create table iceberg_add_files_${uuid0}.iceberg_add_files_db_${uuid0}.iceberg_par_date(
    c_smallint smallint,
    c_int int,
    c_date date
) partition by(c_date);
alter table iceberg_add_files_${uuid0}.iceberg_add_files_db_${uuid0}.iceberg_par_date execute add_files(source_table="hive_add_files_${uuid0}.iceberg_add_files_db_${uuid0}.hive_par_date") where c_date>='2020-01-03';

select * from iceberg_add_files_${uuid0}.iceberg_add_files_db_${uuid0}.iceberg_par_date order by c_date;

drop table iceberg_add_files_${uuid0}.iceberg_add_files_db_${uuid0}.iceberg_par_date;
create table iceberg_add_files_${uuid0}.iceberg_add_files_db_${uuid0}.iceberg_par_date(
    c_smallint smallint,
    c_int int,
    c_date date
) partition by(c_date);

alter table iceberg_add_files_${uuid0}.iceberg_add_files_db_${uuid0}.iceberg_par_date execute add_files(source_table="hive_add_files_${uuid0}.iceberg_add_files_db_${uuid0}.hive_par_date") where days_add(c_date, 2) >= '2020-01-06';
select * from iceberg_add_files_${uuid0}.iceberg_add_files_db_${uuid0}.iceberg_par_date order by c_date;

drop table iceberg_add_files_${uuid0}.iceberg_add_files_db_${uuid0}.iceberg_par_date force;
drop table hive_add_files_${uuid0}.iceberg_add_files_db_${uuid0}.hive_par_date force;