[Enhancement] Revise Iceberg rewrite data (backport #61851) (#61913)

Signed-off-by: SevenJ <wenjun7j@gmail.com>
Co-authored-by: SevenJ <166966490+Wenjun7J@users.noreply.github.com>
mergify[bot] 2025-08-20 10:18:55 +08:00 committed by GitHub
parent 4217260158
commit b68721abdc
10 changed files with 521 additions and 302 deletions


@ -16,6 +16,9 @@ package com.starrocks.connector.iceberg;
import com.google.common.annotations.VisibleForTesting;
import com.starrocks.analysis.ColumnPosition;
import com.starrocks.analysis.Expr;
import com.starrocks.analysis.SlotRef;
import com.starrocks.analysis.TableName;
import com.starrocks.catalog.Column;
import com.starrocks.catalog.Type;
import com.starrocks.common.DdlException;
@ -63,10 +66,13 @@ import org.apache.iceberg.Transaction;
import org.apache.iceberg.UpdateProperties;
import org.apache.iceberg.UpdateSchema;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.velocity.VelocityContext;
import org.apache.velocity.app.VelocityEngine;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.io.StringWriter;
import java.net.URI;
import java.time.Duration;
import java.time.LocalDateTime;
@ -97,6 +103,7 @@ public class IcebergAlterTableExecutor extends ConnectorAlterTableExecutor {
private IcebergCatalog icebergCatalog;
private Transaction transaction;
private HdfsEnvironment hdfsEnvironment;
private ConnectContext context;
private static final int DELETE_BATCH_SIZE = 1000;
@ -113,6 +120,18 @@ public class IcebergAlterTableExecutor extends ConnectorAlterTableExecutor {
this.hdfsEnvironment = hdfsEnvironment;
}
public IcebergAlterTableExecutor(AlterTableStmt stmt,
Table table,
IcebergCatalog icebergCatalog,
ConnectContext context,
HdfsEnvironment hdfsEnvironment) {
super(stmt);
this.table = table;
this.icebergCatalog = icebergCatalog;
this.context = context;
this.hdfsEnvironment = hdfsEnvironment;
}
@Override
public void applyClauses() throws DdlException {
transaction = table.newTransaction();
@ -471,6 +490,7 @@ public class IcebergAlterTableExecutor extends ConnectorAlterTableExecutor {
rollbackToSnapshot(args);
break;
case REWRITE_DATA_FILES:
rewriteDataFiles(clause, this.context);
break;
default:
throw new StarRocksConnectorException("Unsupported table operation %s", op);
@ -529,6 +549,54 @@ public class IcebergAlterTableExecutor extends ConnectorAlterTableExecutor {
});
}
public void rewriteDataFiles(AlterTableOperationClause clause, ConnectContext context) {
boolean rewriteAll = clause.isRewriteAll();
long minFileSizeBytes = clause.getMinFileSizeBytes();
long batchSize = clause.getBatchSize();
Expr partitionFilter = clause.getWhere();
String catalogName = stmt.getCatalogName();
String dbName = stmt.getDbName();
String tableName = stmt.getTableName();
VelocityContext velCtx = new VelocityContext();
velCtx.put("catalogName", catalogName);
velCtx.put("dbName", dbName);
velCtx.put("tableName", tableName);
String partitionFilterSql = null;
if (partitionFilter != null) {
List<SlotRef> slots = new ArrayList<>();
partitionFilter.collect(SlotRef.class, slots);
for (SlotRef slot : slots) {
slot.setTblName(new TableName(dbName, tableName));
}
partitionFilterSql = partitionFilter.toSql();
}
velCtx.put("partitionFilterSql", partitionFilterSql);
VelocityEngine defaultVelocityEngine = new VelocityEngine();
defaultVelocityEngine.setProperty(VelocityEngine.RUNTIME_LOG_REFERENCE_LOG_INVALID, false);
StringWriter writer = new StringWriter();
defaultVelocityEngine.evaluate(velCtx, writer, "InsertSelectTemplate",
"INSERT INTO $catalogName.$dbName.$tableName" +
" SELECT * FROM $catalogName.$dbName.$tableName" +
" #if ($partitionFilterSql)" +
" WHERE $partitionFilterSql" +
" #end"
);
String executeStmt = writer.toString();
IcebergRewriteDataJob job = new IcebergRewriteDataJob(executeStmt, rewriteAll,
minFileSizeBytes, batchSize, context, stmt);
try {
job.prepare();
job.execute();
} catch (Exception e) {
LOGGER.error("failed to rewrite data files for iceberg table {}.{}",
stmt.getDbName(), stmt.getTableName(), e);
throw new StarRocksConnectorException("execute rewrite data files for iceberg table %s.%s failed: %s",
stmt.getDbName(), stmt.getTableName(), e.getMessage(), e);
}
}
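A minimal standalone sketch of how the Velocity template above renders; the catalog, database, table, and filter values below are placeholders, not taken from this change:

import org.apache.velocity.VelocityContext;
import org.apache.velocity.app.VelocityEngine;
import java.io.StringWriter;

public class RewriteSqlTemplateSketch {
    public static void main(String[] args) {
        VelocityEngine engine = new VelocityEngine();
        engine.setProperty(VelocityEngine.RUNTIME_LOG_REFERENCE_LOG_INVALID, false);
        VelocityContext ctx = new VelocityContext();
        ctx.put("catalogName", "iceberg_catalog");          // placeholder names
        ctx.put("dbName", "demo_db");
        ctx.put("tableName", "demo_tbl");
        ctx.put("partitionFilterSql", "dt = '2024-01-01'"); // null when the clause has no WHERE filter
        StringWriter writer = new StringWriter();
        engine.evaluate(ctx, writer, "InsertSelectTemplate",
                "INSERT INTO $catalogName.$dbName.$tableName" +
                " SELECT * FROM $catalogName.$dbName.$tableName" +
                " #if ($partitionFilterSql)" +
                " WHERE $partitionFilterSql" +
                " #end");
        // Prints roughly:
        // INSERT INTO iceberg_catalog.demo_db.demo_tbl SELECT * FROM iceberg_catalog.demo_db.demo_tbl  WHERE dt = '2024-01-01'
        System.out.println(writer.toString());
    }
}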
private void expireSnapshots(List<ConstantOperator> args) {
if (args.size() > 1) {
throw new StarRocksConnectorException("invalid args. only support `older_than` in the expire snapshot operation");


@ -355,7 +355,7 @@ public class IcebergMetadata implements ConnectorMetadata {
"Failed to load iceberg table: " + stmt.getTbl().toString());
}
IcebergAlterTableExecutor executor = new IcebergAlterTableExecutor(stmt, table, icebergCatalog, hdfsEnvironment);
IcebergAlterTableExecutor executor = new IcebergAlterTableExecutor(stmt, table, icebergCatalog, context, hdfsEnvironment);
executor.execute();
synchronized (this) {


@ -0,0 +1,111 @@
// Copyright 2021-present StarRocks, Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package com.starrocks.connector.iceberg;
import com.starrocks.common.util.UUIDUtil;
import com.starrocks.connector.RemoteFileInfo;
import com.starrocks.connector.iceberg.IcebergRewriteData;
import com.starrocks.planner.IcebergScanNode;
import com.starrocks.qe.ConnectContext;
import com.starrocks.qe.StmtExecutor;
import com.starrocks.sql.StatementPlanner;
import com.starrocks.sql.analyzer.SemanticException;
import com.starrocks.sql.ast.AlterTableStmt;
import com.starrocks.sql.ast.IcebergRewriteStmt;
import com.starrocks.sql.ast.InsertStmt;
import com.starrocks.sql.ast.StatementBase;
import com.starrocks.sql.plan.ExecPlan;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.List;
import java.util.stream.Collectors;
public class IcebergRewriteDataJob {
protected static final Logger LOG = LogManager.getLogger(IcebergRewriteDataJob.class);
private final String insertSql;
private final boolean rewriteAll;
private final long minFileSizeBytes;
private final long batchSize;
private final ConnectContext context;
private final AlterTableStmt originAlterStmt;
private IcebergRewriteStmt rewriteStmt;
private ExecPlan execPlan;
private List<IcebergScanNode> scanNodes;
private IcebergRewriteData rewriteData;
public IcebergRewriteDataJob(String insertSql,
boolean rewriteAll,
long minFileSizeBytes,
long batchSize,
ConnectContext context,
AlterTableStmt stmt) {
this.insertSql = insertSql;
this.rewriteAll = rewriteAll;
this.minFileSizeBytes = minFileSizeBytes;
this.batchSize = batchSize;
this.context = context;
this.originAlterStmt = stmt;
}
public void prepare() throws Exception {
StatementBase parsedStmt = com.starrocks.sql.parser.SqlParser
.parse(insertSql, context.getSessionVariable())
.get(0);
rewriteStmt = new IcebergRewriteStmt((InsertStmt) parsedStmt, rewriteAll);
this.execPlan = StatementPlanner.plan(parsedStmt, context);
this.scanNodes = execPlan.getFragments().stream()
.flatMap(fragment -> fragment.collectScanNodes().values().stream())
.filter(scan -> scan instanceof IcebergScanNode && "IcebergScanNode".equals(scan.getPlanNodeName()))
.map(scan -> (IcebergScanNode) scan)
.collect(Collectors.toList());
IcebergScanNode targetNode = scanNodes.stream().findFirst().orElse(null);
if (targetNode == null) {
throw new SemanticException("No valid IcebergScanNode found for rewrite.");
}
this.rewriteData = new IcebergRewriteData();
this.rewriteData.setSource(targetNode.getSourceRange());
this.rewriteData.setBatchSize(batchSize);
this.rewriteData.buildNewScanNodeRange(minFileSizeBytes, rewriteAll);
}
public void execute() throws Exception {
if (rewriteStmt == null || execPlan == null || rewriteData == null) {
throw new IllegalStateException("Must call prepare() before execute()");
}
StmtExecutor executor = StmtExecutor.newInternalExecutor(context, rewriteStmt);
try {
while (rewriteData.hasMoreTaskGroup()) {
List<RemoteFileInfo> res = rewriteData.nextTaskGroup();
for (IcebergScanNode scanNode : scanNodes) {
scanNode.rebuildScanRange(res);
}
context.setQueryId(UUIDUtil.genUUID());
context.setExecutionId(UUIDUtil.toTUniqueId(context.getQueryId()));
executor.handleDMLStmt(execPlan, rewriteStmt);
}
} catch (Exception e) {
LOG.warn("Failed to rewrite iceberg table: {}, catalog: {}, db: {}, table: {}",
e.getMessage(), originAlterStmt.getCatalogName(),
originAlterStmt.getDbName(), originAlterStmt.getTableName());
context.getState().setError(e.getMessage());
return;
}
}
}
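A condensed usage sketch of the job, mirroring how rewriteDataFiles drives it earlier in this change; the insertSql, context, and alterStmt variables are assumed to already exist:

// Assumes an already-rendered INSERT INTO ... SELECT statement, a ConnectContext,
// and the originating AlterTableStmt, as in IcebergAlterTableExecutor.rewriteDataFiles.
IcebergRewriteDataJob job = new IcebergRewriteDataJob(
        insertSql,                  // "INSERT INTO cat.db.tbl SELECT * FROM cat.db.tbl [WHERE ...]"
        false,                      // rewriteAll: false means only files passing the size filter are rewritten
        256L * 1024 * 1024,         // minFileSizeBytes: size threshold used to pick candidate files (default 256MB)
        10L * 1024 * 1024 * 1024,   // batchSize: cap on the total bytes rewritten per DML execution (default 10GB)
        context,
        alterStmt);
job.prepare();   // parses and plans the INSERT, collects the IcebergScanNodes, builds the task groups
job.execute();   // replays the plan once per task group, resetting the query id for each batch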


@ -34,9 +34,9 @@ public enum IcebergTableOperation {
}
public enum RewriteFileOption {
REWRITE_ALL,
MIN_FILE_SIZE_BYTES,
BATCH_SIZE,
REWRITE_ALL, // rewrite all files under the specified partitions, ignoring other params such as min_file_size_bytes; default false
MIN_FILE_SIZE_BYTES, // filter data files by size; default 256MB
BATCH_SIZE, // the maximum total size of data files to rewrite in one batch; default 10GB
UNKNOWN;
public static RewriteFileOption fromString(String catStr) {

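The defaults mentioned in the RewriteFileOption comments above, spelled out in bytes (illustrative constants only, not part of this change):

public class RewriteFileOptionDefaultsSketch {
    // 256MB and 10GB from the comments above, as the byte values the options expect.
    static final long DEFAULT_MIN_FILE_SIZE_BYTES = 256L * 1024 * 1024;       // 268435456
    static final long DEFAULT_BATCH_SIZE_BYTES = 10L * 1024 * 1024 * 1024;    // 10737418240

    public static void main(String[] args) {
        System.out.println(DEFAULT_MIN_FILE_SIZE_BYTES + " " + DEFAULT_BATCH_SIZE_BYTES);
    }
}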

@ -48,7 +48,6 @@ import com.starrocks.analysis.HintNode;
import com.starrocks.analysis.Parameter;
import com.starrocks.analysis.RedirectStatus;
import com.starrocks.analysis.SetVarHint;
import com.starrocks.analysis.SlotRef;
import com.starrocks.analysis.StringLiteral;
import com.starrocks.analysis.TableName;
import com.starrocks.analysis.UserVariableHint;
@ -93,9 +92,7 @@ import com.starrocks.common.util.TimeUtils;
import com.starrocks.common.util.UUIDUtil;
import com.starrocks.common.util.concurrent.lock.LockType;
import com.starrocks.common.util.concurrent.lock.Locker;
import com.starrocks.connector.RemoteFileInfo;
import com.starrocks.connector.iceberg.IcebergMetadata;
import com.starrocks.connector.iceberg.IcebergRewriteData;
import com.starrocks.failpoint.FailPointExecutor;
import com.starrocks.http.HttpConnectContext;
import com.starrocks.http.HttpResultSender;
@ -158,9 +155,6 @@ import com.starrocks.sql.ast.AddBackendBlackListStmt;
import com.starrocks.sql.ast.AddComputeNodeBlackListStmt;
import com.starrocks.sql.ast.AddSqlBlackListStmt;
import com.starrocks.sql.ast.AdminSetConfigStmt;
import com.starrocks.sql.ast.AlterClause;
import com.starrocks.sql.ast.AlterTableOperationClause;
import com.starrocks.sql.ast.AlterTableStmt;
import com.starrocks.sql.ast.AnalyzeProfileStmt;
import com.starrocks.sql.ast.AnalyzeStmt;
import com.starrocks.sql.ast.AnalyzeTypeDesc;
@ -182,6 +176,7 @@ import com.starrocks.sql.ast.ExecuteAsStmt;
import com.starrocks.sql.ast.ExecuteScriptStmt;
import com.starrocks.sql.ast.ExecuteStmt;
import com.starrocks.sql.ast.ExportStmt;
import com.starrocks.sql.ast.IcebergRewriteStmt;
import com.starrocks.sql.ast.InsertStmt;
import com.starrocks.sql.ast.KillAnalyzeStmt;
import com.starrocks.sql.ast.KillStmt;
@ -2121,86 +2116,8 @@ public class StmtExecutor {
return explainString;
}
public void handleIcebergRewrite(
boolean rewriteAll, long minFileSizeBytes, long batchSize, Expr partitionFilter) {
AlterTableStmt stmt = (AlterTableStmt) parsedStmt;
String catalogName = stmt.getCatalogName();
String dbName = stmt.getDbName();
String tableName = stmt.getTableName();
StringBuilder sqlBuilder = new StringBuilder();
sqlBuilder.append("INSERT INTO ")
.append(catalogName).append(".")
.append(dbName).append(".")
.append(tableName)
.append(" SELECT * FROM ")
.append(catalogName).append(".")
.append(dbName).append(".")
.append(tableName);
if (partitionFilter != null) {
List<SlotRef> slots = new ArrayList<>();
partitionFilter.collect(SlotRef.class, slots);
for (SlotRef slot : slots) {
slot.setTblName(new TableName(dbName, tableName));
}
sqlBuilder.append(" WHERE ").append(partitionFilter.toSql());
}
String insertSelect = sqlBuilder.toString();
StatementBase statementBase =
com.starrocks.sql.parser.SqlParser.parse(insertSelect, context.getSessionVariable()).get(0);
((InsertStmt) statementBase).setRewrite(true);
((InsertStmt) statementBase).setRewriteAll(rewriteAll);
ExecPlan execPlan = StatementPlanner.plan(statementBase, context);
List<IcebergScanNode> scanNodes = execPlan.getFragments().stream()
.flatMap(fragment -> fragment.collectScanNodes().values().stream())
.filter(scan -> scan instanceof IcebergScanNode && "IcebergScanNode".equals(scan.getPlanNodeName()))
.map(scan -> (IcebergScanNode) scan)
.collect(Collectors.toList());
IcebergScanNode targetNode = scanNodes.stream()
.filter(s -> "IcebergScanNode".equals(s.getPlanNodeName()))
.findFirst()
.orElse(null);
IcebergRewriteData rewriteData = new IcebergRewriteData();
rewriteData.setSource(targetNode.getSourceRange());
rewriteData.setBatchSize(batchSize);
rewriteData.buildNewScanNodeRange(minFileSizeBytes, rewriteAll);
try {
while (rewriteData.hasMoreTaskGroup()) {
List<RemoteFileInfo> res = rewriteData.nextTaskGroup();
for (IcebergScanNode scanNode : scanNodes) {
scanNode.rebuildScanRange(res);
}
handleDMLStmt(execPlan, (DmlStmt) statementBase);
}
} catch (Exception e) {
LOG.warn("Failed to rewrite iceberg table: {}, catalog: {}, db: {}, table: {}",
e.getMessage(), catalogName, dbName, tableName);
context.getState().setError(e.getMessage());
return;
}
}
public boolean tryHandleIcebergRewriteData() {
if (parsedStmt instanceof AlterTableStmt) {
AlterTableStmt stmt = (AlterTableStmt) parsedStmt;
List<AlterClause> clauses = stmt.getAlterClauseList();
if (clauses.size() == 1 && clauses.get(0) instanceof AlterTableOperationClause) {
AlterTableOperationClause c = (AlterTableOperationClause) clauses.get(0);
if (c.getTableOperationName().equalsIgnoreCase("REWRITE_DATA_FILES")) {
handleIcebergRewrite(c.isRewriteAll(), c.getMinFileSizeBytes(), c.getBatchSize(), c.getWhere());
return true;
}
}
}
return false;
}
private void handleDdlStmt() throws DdlException {
try {
if (tryHandleIcebergRewriteData()) {
return;
}
ShowResultSet resultSet = DDLStmtExecutor.execute(parsedStmt, context);
if (resultSet == null) {
context.getState().setOk();
@ -2463,9 +2380,9 @@ public class StmtExecutor {
}
}
public void fillRewriteFiles(DmlStmt stmt, ExecPlan execPlan,
public IcebergMetadata.IcebergSinkExtra fillRewriteFiles(DmlStmt stmt, ExecPlan execPlan,
List<TSinkCommitInfo> commitInfos, IcebergMetadata.IcebergSinkExtra extra) {
if (stmt instanceof InsertStmt && ((InsertStmt) stmt).isRewrite()) {
if (stmt instanceof IcebergRewriteStmt) {
for (TSinkCommitInfo commitInfo : commitInfos) {
commitInfo.setIs_rewrite(true);
}
@ -2477,13 +2394,14 @@ public class StmtExecutor {
if (scan instanceof IcebergScanNode && scan.getPlanNodeName().equals("IcebergScanNode")) {
extra.addAppliedDeleteFiles(((IcebergScanNode) scan).getPosAppliedDeleteFiles());
extra.addScannedDataFiles(((IcebergScanNode) scan).getScannedDataFiles());
if (((InsertStmt) stmt).rewriteAll()) {
if (((IcebergRewriteStmt) stmt).rewriteAll()) {
extra.addAppliedDeleteFiles(((IcebergScanNode) scan).getEqualAppliedDeleteFiles());
}
}
}
}
}
return extra;
}
/**
@ -2781,7 +2699,7 @@ public class StmtExecutor {
}
IcebergTableSink sink = (IcebergTableSink) execPlan.getFragments().get(0).getSink();
IcebergMetadata.IcebergSinkExtra extra = null;
fillRewriteFiles(stmt, execPlan, commitInfos, extra);
extra = fillRewriteFiles(stmt, execPlan, commitInfos, extra);
context.getGlobalStateMgr().getMetadataMgr().finishSink(
catalogName, dbName, tableName, commitInfos, sink.getTargetBranch(), (Object) extra);


@ -1525,18 +1525,21 @@ public class AlterTableClauseAnalyzer implements AstVisitor<Void, ConnectContext
if (IcebergTableOperation.fromString(clause.getTableOperationName())
== IcebergTableOperation.REWRITE_DATA_FILES) {
if (!(expr instanceof BinaryPredicate)) {
throw new SemanticException("Invalid arg: " + expr);
throw new SemanticException("Invalid parameter format: expected 'param = value', but got: " + expr);
}
BinaryPredicate binExpr = (BinaryPredicate) expr;
if (binExpr.getOp() != BinaryType.EQ) {
throw new SemanticException("Invalid arg: " + expr);
throw new SemanticException("Invalid expression:" +
"only equality comparisons ('param = value') are supported, but got: " + expr);
} else if (!(binExpr.getChild(0) instanceof LiteralExpr) ||
!(binExpr.getChild(1) instanceof LiteralExpr)) {
throw new SemanticException("Invalid arg: " + expr);
throw new SemanticException("Invalid expression:" +
"both sides of the predicate must be literals, but got: " + expr);
}
LiteralExpr constExpr = (LiteralExpr) binExpr.getChild(0);
if (!(constExpr instanceof StringLiteral)) {
throw new SemanticException("Invalid arg: " + constExpr);
throw new SemanticException("Invalid expression:" +
"the left side of the predicate must be a string literal, but got: " + constExpr);
}
IcebergTableOperation.RewriteFileOption option =
IcebergTableOperation.RewriteFileOption.fromString(((StringLiteral) constExpr).getValue());


@ -0,0 +1,33 @@
// Copyright 2021-present StarRocks, Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package com.starrocks.sql.ast;
import com.starrocks.sql.ast.InsertStmt;
public class IcebergRewriteStmt extends InsertStmt {
private final boolean rewriteAll;
public IcebergRewriteStmt(InsertStmt base, boolean rewriteAll) {
super(base.getTableName(), base.getTargetPartitionNames(), base.getLabel(), base.getTargetColumnNames(),
base.getQueryStatement(), base.isOverwrite(), base.getProperties(), base.getPos());
super.setOrigStmt(base.getOrigStmt());
this.rewriteAll = rewriteAll;
}
public boolean rewriteAll() {
return rewriteAll;
}
}


@ -111,10 +111,6 @@ public class InsertStmt extends DmlStmt {
// if isFromOverwrite is true, it means that the insert statement is generated by the overwrite statement
private boolean isFromOverwrite = false;
private boolean isRewrite = false;
private boolean rewriteAll = false;
public InsertStmt(TableName tblName, PartitionNames targetPartitionNames, String label, List<String> cols,
QueryStatement queryStatement, boolean isOverwrite, Map<String, String> insertProperties,
NodePosition pos) {
@ -184,22 +180,6 @@ public class InsertStmt extends DmlStmt {
public void setOverwrite(boolean overwrite) {
isOverwrite = overwrite;
}
public boolean isRewrite() {
return isRewrite;
}
public void setRewrite(boolean rewrite) {
isRewrite = rewrite;
}
public boolean rewriteAll() {
return rewriteAll;
}
public void setRewriteAll(boolean rewriteAll) {
this.rewriteAll = rewriteAll;
}
public void setOverwriteJobId(long overwriteJobId) {
this.overwriteJobId = overwriteJobId;


@ -1826,6 +1826,7 @@ public class IcebergMetadataTest extends TableTestBase {
tableName,
List.of(clause)),
icebergHiveCatalog.getTable(connectContext, tableName.getDb(), tableName.getTbl()), icebergHiveCatalog,
connectContext,
HDFS_ENVIRONMENT);
executor.execute();
@ -1838,6 +1839,7 @@ public class IcebergMetadataTest extends TableTestBase {
tableName,
List.of(clause)),
icebergHiveCatalog.getTable(connectContext, tableName.getDb(), tableName.getTbl()), icebergHiveCatalog,
connectContext,
HDFS_ENVIRONMENT);
IcebergAlterTableExecutor finalExecutor = executor;
Assertions.assertThrows(DdlException.class, finalExecutor::execute);
@ -1851,6 +1853,7 @@ public class IcebergMetadataTest extends TableTestBase {
tableName,
List.of(clause)),
icebergHiveCatalog.getTable(connectContext, tableName.getDb(), tableName.getTbl()), icebergHiveCatalog,
connectContext,
HDFS_ENVIRONMENT);
finalExecutor = executor;
Assertions.assertThrows(DdlException.class, finalExecutor::execute);
@ -1891,6 +1894,7 @@ public class IcebergMetadataTest extends TableTestBase {
tableName,
List.of(clause)),
icebergHiveCatalog.getTable(connectContext, tableName.getDb(), tableName.getTbl()), icebergHiveCatalog,
connectContext,
HDFS_ENVIRONMENT);
executor.execute();
}
@ -1945,11 +1949,12 @@ public class IcebergMetadataTest extends TableTestBase {
AlterTableOperationClause clause = new AlterTableOperationClause(
NodePosition.ZERO, ROLLBACK_TO_SNAPSHOT.toString(),
List.of(new IntLiteral(1, NodePosition.ZERO)), null);
clause.setArgs(List.of(ConstantOperator.createBigint(1))); // suggested: use bigint instead
clause.setArgs(List.of(ConstantOperator.createBigint(1)));
IcebergAlterTableExecutor executor = new IcebergAlterTableExecutor(
new AlterTableStmt(tableName, List.of(clause)),
icebergHiveCatalog.getTable(connectContext, tableName.getDb(), tableName.getTbl()),
icebergHiveCatalog,
connectContext,
HDFS_ENVIRONMENT);
executor.execute();
}


@ -6,6 +6,7 @@ import com.starrocks.analysis.BoolLiteral;
import com.starrocks.analysis.DescriptorTable;
import com.starrocks.analysis.Expr;
import com.starrocks.analysis.IntLiteral;
import com.starrocks.analysis.SlotRef;
import com.starrocks.analysis.StringLiteral;
import com.starrocks.analysis.TableName;
import com.starrocks.analysis.TupleDescriptor;
@ -19,9 +20,11 @@ import com.starrocks.connector.CatalogConnector;
import com.starrocks.connector.ConnectorMetadata;
import com.starrocks.connector.ConnectorMgr;
import com.starrocks.connector.ConnectorTblMetaInfoMgr;
import com.starrocks.connector.HdfsEnvironment;
import com.starrocks.connector.RemoteFileInfo;
import com.starrocks.connector.RemoteFileInfoSource;
import com.starrocks.connector.exception.StarRocksConnectorException;
import com.starrocks.connector.iceberg.IcebergAlterTableExecutor;
import com.starrocks.connector.iceberg.IcebergConnectorScanRangeSource;
import com.starrocks.connector.iceberg.IcebergMORParams;
import com.starrocks.connector.iceberg.IcebergMetadata;
@ -32,8 +35,10 @@ import com.starrocks.connector.iceberg.IcebergMetadata.IcebergSinkExtra;
import com.starrocks.connector.iceberg.IcebergMetadata.RewriteData;
import com.starrocks.connector.iceberg.IcebergRemoteFileInfo;
import com.starrocks.connector.iceberg.IcebergRewriteData;
import com.starrocks.connector.iceberg.IcebergRewriteDataJob;
import com.starrocks.connector.iceberg.IcebergTableMORParams;
import com.starrocks.connector.iceberg.IcebergTableOperation;
import com.starrocks.connector.iceberg.hive.IcebergHiveCatalog;
import com.starrocks.credential.CloudConfiguration;
import com.starrocks.credential.CloudConfigurationFactory;
import com.starrocks.qe.ConnectContext;
@ -49,6 +54,7 @@ import com.starrocks.sql.analyzer.SemanticException;
import com.starrocks.sql.ast.AlterTableOperationClause;
import com.starrocks.sql.ast.AlterTableStmt;
import com.starrocks.sql.ast.DmlStmt;
import com.starrocks.sql.ast.IcebergRewriteStmt;
import com.starrocks.sql.ast.InsertStmt;
import com.starrocks.sql.ast.QueryStatement;
import com.starrocks.sql.ast.StatementBase;
@ -101,8 +107,11 @@ import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
public class IcebergScanNodeTest {
import javax.swing.text.html.Option;
import static com.starrocks.common.util.Util.executeCommand;
public class IcebergScanNodeTest {
public static final HdfsEnvironment HDFS_ENVIRONMENT = new HdfsEnvironment();
class TestableIcebergConnectorScanRangeSource extends IcebergConnectorScanRangeSource {
public TestableIcebergConnectorScanRangeSource(IcebergConnectorScanRangeSource original) {
super(
@ -139,13 +148,6 @@ public class IcebergScanNodeTest {
String catalog = "XXX";
CloudConfiguration cc = CloudConfigurationFactory.buildCloudConfigurationForStorage(new HashMap<>());
// new MockUp<IcebergConnectorScanRangeSource>() {
// @Mock
// long addPartition(FileScanTask fileScanTask) {
// return 123L;
// }
// };
List<DeleteFile> delFiles = new ArrayList<>();
delFiles.add(mockPosDelFile);
delFiles.add(mockEqDelFile);
@ -336,175 +338,6 @@ public class IcebergScanNodeTest {
IcebergTableOperation.RewriteFileOption.fromString(null));
}
static class TestableStmtExecutor extends StmtExecutor {
boolean dmlCalled = false;
public TestableStmtExecutor(@Mocked QueryStatement queryStmt) {
super(new ConnectContext(), new InsertStmt(new TableName("test_db", "test_tbl"), queryStmt));
}
@Override
public void handleDMLStmt(ExecPlan plan, DmlStmt stmt) {
dmlCalled = true;
}
}
@Test
public void testTryHandleIcebergRewriteData_FullFlow(
@Mocked AlterTableStmt stmt,
@Mocked AlterTableOperationClause clause,
@Mocked SqlParser parser,
@Mocked StatementPlanner planner,
@Mocked IcebergTable table,
@Mocked QueryStatement queryStmt,
@Mocked RemoteFileInfo remoteFileInfo) throws Exception {
new Expectations() {{
stmt.getAlterClauseList(); result = Collections.singletonList(clause); minTimes = 0;
clause.getTableOperationName(); result = "REWRITE_DATA_FILES"; minTimes = 0;
clause.isRewriteAll(); result = true; minTimes = 0;
clause.getMinFileSizeBytes(); result = 1024L; minTimes = 0;
clause.getBatchSize(); result = 2048L; minTimes = 0;
clause.getWhere(); result = null; minTimes = 0;
}};
new MockUp<IcebergScanNode>() {
@Mock
public String getPlanNodeName() {
return "IcebergScanNode";
}
@Mock
public void rebuildScanRange(List<RemoteFileInfo> res) {
// no-op
}
@Mock
public IcebergConnectorScanRangeSource getSourceRange() {
TupleDescriptor desc = new TupleDescriptor(new TupleId(0));
return Deencapsulation.newInstance(IcebergConnectorScanRangeSource.class,
table, // mock IcebergTable
new RemoteFileInfoSource() {
private boolean called = false;
@Override
public List<RemoteFileInfo> getAllOutputs() {
return Collections.singletonList(remoteFileInfo);
}
@Override
public RemoteFileInfo getOutput() {
return remoteFileInfo;
}
@Override
public boolean hasMoreOutput() {
if (!called) {
called = true;
return true;
}
return false;
}
},
IcebergMORParams.DATA_FILE_WITHOUT_EQ_DELETE, // IcebergMORParams
desc, // TupleDescriptor
Optional.empty(), // Optional<List<BucketProperty>>
true
);
}
};
new MockUp<IcebergRewriteData>() {
@Mock
public void buildNewScanNodeRange(long fileSizeThreshold, boolean allFiles) {
return;
}
};
new MockUp<StatementPlanner>() {
@Mock
public ExecPlan plan(StatementBase stmt, ConnectContext ctx) {
TupleDescriptor desc = new TupleDescriptor(new TupleId(0));
desc.setTable(table);
new Expectations(table) {{
table.getCatalogName(); result = null; minTimes = 0;
table.getCatalogDBName(); result = "mock_db"; minTimes = 0;
table.getCatalogTableName(); result = "mock_table"; minTimes = 0;
table.getUUID(); result = "mock_table:uuid"; minTimes = 0;
}};
IcebergScanNode scanNode = new IcebergScanNode(new PlanNodeId(0), desc, null, null, null);
scanNode.setPlanNodeName("IcebergScanNode");
// TupleDescriptor tupleDesc = new TupleDescriptor(new TupleId(0));
// EmptySetNode node = new EmptySetNode(new PlanNodeId(0),
// new ArrayList<>(Collections.singletonList(new TupleId(0))));
Map<PlanNodeId, ScanNode> scanNodeMap = new HashMap<>();
scanNodeMap.put(scanNode.getId(), scanNode);
PlanFragment fragment = new PlanFragment(
new PlanFragmentId(0),
scanNode,
DataPartition.UNPARTITIONED
);
new MockUp<PlanFragment>() {
@Mock
public Map<PlanNodeId, ScanNode> collectScanNodes() {
return scanNodeMap;
}
};
return new ExecPlan(ctx, Collections.singletonList(fragment));
}
};
InsertStmt fakeInsert = new InsertStmt(new TableName("db", "table"), queryStmt) {
@Override
public void setRewrite(boolean rewrite) {
// mock behavior
}
@Override
public void setRewriteAll(boolean rewriteAll) {
// mock behavior
}
};
new Expectations() {{
SqlParser.parse(anyString, (SessionVariable) any); result = Collections.singletonList(fakeInsert); minTimes = 0;
}};
ExecPlan fakePlan = new ExecPlan(new ConnectContext(), Collections.emptyList()) {
@Override
public ArrayList<PlanFragment> getFragments() {
return new ArrayList<>();
}
};
new Expectations() {{
StatementPlanner.plan((StatementBase) any, (ConnectContext) any);
result = fakePlan;
minTimes = 0;
}};
new MockUp<IcebergRewriteData>() {
int count = 0;
@Mock public void setSource(Object src) {}
@Mock public void setBatchSize(long size) {}
@Mock public void buildNewScanNodeRange(long min, boolean all) {}
@Mock public boolean hasMoreTaskGroup() { return count++ == 0; }
@Mock public List<RemoteFileInfo> nextTaskGroup() { return Collections.emptyList(); }
};
new MockUp<IcebergScanNode>() {
@Mock public String getPlanNodeName() { return "IcebergScanNode"; }
@Mock public void rebuildScanRange(List<RemoteFileInfo> res) {}
};
TestableStmtExecutor executor = new TestableStmtExecutor(queryStmt);
Deencapsulation.setField(executor, "parsedStmt", stmt);
boolean result = executor.tryHandleIcebergRewriteData();
Assertions.assertTrue(result, "should return true");
Assertions.assertTrue(executor.dmlCalled, "handleDMLStmt should be called");
}
@Test
public void testVisitAlterTableOperationClause_rewriteDataFiles2(@Mocked IcebergTable table) {
AlterTableClauseAnalyzer analyzer = new AlterTableClauseAnalyzer(table);
@ -634,7 +467,7 @@ public class IcebergScanNodeTest {
SemanticException ex = Assertions.assertThrows(SemanticException.class, () ->
analyzer.visitAlterTableOperationClause(clause, new ConnectContext()));
Assertions.assertTrue(ex.getMessage().contains("Invalid arg"));
Assertions.assertTrue(ex.getMessage().contains("Invalid"));
}
@Test
@ -648,7 +481,7 @@ public class IcebergScanNodeTest {
SemanticException ex = Assertions.assertThrows(SemanticException.class, () ->
analyzer.visitAlterTableOperationClause(clause, new ConnectContext()));
Assertions.assertTrue(ex.getMessage().contains("Invalid arg"));
Assertions.assertTrue(ex.getMessage().contains("Invalid"));
}
@Test
@ -663,7 +496,7 @@ public class IcebergScanNodeTest {
SemanticException ex = Assertions.assertThrows(SemanticException.class, () ->
analyzer.visitAlterTableOperationClause(clause, new ConnectContext()));
Assertions.assertTrue(ex.getMessage().contains("Invalid arg"));
Assertions.assertTrue(ex.getMessage().contains("Invalid"));
}
@Test
@ -677,7 +510,7 @@ public class IcebergScanNodeTest {
SemanticException ex = Assertions.assertThrows(SemanticException.class, () ->
analyzer.visitAlterTableOperationClause(clause, new ConnectContext()));
Assertions.assertTrue(ex.getMessage().contains("Invalid arg"));
Assertions.assertTrue(ex.getMessage().contains("Invalid"));
}
@Test
@ -934,9 +767,8 @@ public class IcebergScanNodeTest {
@Test
public void testFillRewriteFiles_shouldFillExtraCorrectly() throws Exception {
// 1. Mock InsertStmt
InsertStmt insertStmt = Mockito.mock(InsertStmt.class);
Mockito.when(insertStmt.isRewrite()).thenReturn(true);
Mockito.when(insertStmt.rewriteAll()).thenReturn(true);
IcebergRewriteStmt rewriteStmt = Mockito.mock(IcebergRewriteStmt.class);
Mockito.when(rewriteStmt.rewriteAll()).thenReturn(true);
// 2. Mock IcebergScanNode
IcebergScanNode scanNode = Mockito.mock(IcebergScanNode.class);
@ -975,7 +807,7 @@ public class IcebergScanNodeTest {
IcebergMetadata.IcebergSinkExtra extra = new IcebergMetadata.IcebergSinkExtra();
// 7. Call target method
executor.fillRewriteFiles(insertStmt, execPlan, commitInfos, extra);
executor.fillRewriteFiles(rewriteStmt, execPlan, commitInfos, extra);
// 8. Assert
Assertions.assertTrue(info1.isIs_rewrite());
@ -1119,4 +951,273 @@ public class IcebergScanNodeTest {
metadataMgr.finishSink("unknownCatalog", "db", "tbl", new ArrayList<>(), "branch", new Object());
}
@Test
void execute_shouldRunNormally_whenPreparedStateAndTasksExist() throws Exception {
ConnectContext context = Mockito.mock(ConnectContext.class, Mockito.RETURNS_DEEP_STUBS);
AlterTableStmt alter = Mockito.mock(AlterTableStmt.class);
IcebergRewriteStmt rewriteStmt = Mockito.mock(IcebergRewriteStmt.class);
ExecPlan execPlan = Mockito.mock(ExecPlan.class);
IcebergScanNode scanNode = Mockito.mock(IcebergScanNode.class);
IcebergRewriteData rewriteData = Mockito.mock(IcebergRewriteData.class);
PlanFragment fragment = Mockito.mock(PlanFragment.class);
ArrayList<PlanFragment> fragments = new ArrayList<>();
Map<PlanNodeId, ScanNode> scanMap = new HashMap<>();
scanMap.put(new PlanNodeId(1), scanNode);
fragments.add(fragment);
Mockito.when(execPlan.getFragments()).thenReturn(fragments);
Mockito.when(fragment.collectScanNodes()).thenReturn(scanMap);
Mockito.when(rewriteData.hasMoreTaskGroup())
.thenReturn(true)
.thenReturn(false);
List<RemoteFileInfo> oneGroup = Collections.emptyList();
Mockito.when(rewriteData.nextTaskGroup()).thenReturn(oneGroup);
Mockito.when(scanNode.getPlanNodeName()).thenReturn("IcebergScanNode");
RemoteFileInfo remoteFileInfo = Mockito.mock(RemoteFileInfo.class);
// --- Mock DataFile ---
DataFile dataFile = Mockito.mock(DataFile.class);
Mockito.when(dataFile.fileSizeInBytes()).thenReturn(500L);
// --- Mock FileScanTask ---
FileScanTask fileScanTask = Mockito.mock(FileScanTask.class);
Mockito.when(fileScanTask.file()).thenReturn(dataFile);
Mockito.when(fileScanTask.deletes()).thenReturn(Collections.emptyList());
// --- Mock IcebergRemoteFileInfo ---
IcebergRemoteFileInfo icebergRemoteFileInfo = Mockito.mock(IcebergRemoteFileInfo.class);
Mockito.when(icebergRemoteFileInfo.getFileScanTask()).thenReturn(fileScanTask);
Mockito.when(remoteFileInfo.cast()).thenReturn(icebergRemoteFileInfo);
RemoteFileInfoSource remoteFileInfoSource = new RemoteFileInfoSource() {
int count = 0;
@Override
public RemoteFileInfo getOutput() {
count++;
return remoteFileInfo;
}
@Override
public boolean hasMoreOutput() {
return count == 0;
}
};
IcebergTable icebergTable = Mockito.mock(IcebergTable.class);
IcebergMORParams morParams = Mockito.mock(IcebergMORParams.class);
TupleDescriptor tupleDesc = Mockito.mock(TupleDescriptor.class);
IcebergConnectorScanRangeSource fakeSourceRange = new IcebergConnectorScanRangeSource(
icebergTable,
remoteFileInfoSource,
morParams,
tupleDesc,
Optional.empty()
);
Mockito.when(scanNode.getSourceRange()).thenReturn(fakeSourceRange);
StmtExecutor executor = Mockito.mock(StmtExecutor.class);
new MockUp<StmtExecutor>() {
@Mock
public StmtExecutor newInternalExecutor(ConnectContext c, StatementBase s) {
return executor;
}
};
new MockUp<com.starrocks.sql.parser.SqlParser>() {
@Mock
public List<com.starrocks.sql.ast.StatementBase> parse(String sql, SessionVariable sessionVariable) {
return Collections.singletonList(Mockito.mock(com.starrocks.sql.ast.InsertStmt.class));
}
};
new MockUp<StatementPlanner>() {
@Mock
public ExecPlan plan(StatementBase stmt, ConnectContext session) {
return execPlan;
}
};
new MockUp<IcebergScanNode>() {
@Mock
public void rebuildScanRange(List<RemoteFileInfo> splits) {
return;
}
};
new MockUp<StmtExecutor>() {
@Mock
public void handleDMLStmt(ExecPlan execPlan, DmlStmt stmt) {
return;
}
};
new MockUp<IcebergRewriteData>() {
@Mock
public void buildNewScanNodeRange(long fileSizeThreshold, boolean allFiles) {
return;
}
};
new MockUp<IcebergRewriteStmt>() {
@Mock
public void $init(InsertStmt base, boolean rewriteAll) {
//do nothing
}
};
IcebergRewriteDataJob job = new IcebergRewriteDataJob(
"insert into t select * from t", false, 0L, 10L, context, alter);
job.prepare();
Deencapsulation.setField(job, "execPlan", execPlan);
Deencapsulation.setField(job, "scanNodes", Arrays.asList(scanNode));
Deencapsulation.setField(job, "rewriteStmt", rewriteStmt);
Deencapsulation.setField(job, "rewriteData", rewriteData);
job.execute();
Mockito.verify(rewriteData, Mockito.times(2)).hasMoreTaskGroup();
Mockito.verify(rewriteData, Mockito.times(1)).nextTaskGroup();
Mockito.verify(scanNode, Mockito.times(1)).rebuildScanRange(oneGroup);
Mockito.verify(executor, Mockito.times(1)).handleDMLStmt(execPlan, rewriteStmt);
}
@Test
void execute_shouldSetErrorAndReturn_whenExecutorThrows() throws Exception {
ConnectContext context = Mockito.mock(ConnectContext.class, Mockito.RETURNS_DEEP_STUBS);
AlterTableStmt alter = Mockito.mock(AlterTableStmt.class);
IcebergRewriteStmt rewriteStmt = Mockito.mock(IcebergRewriteStmt.class);
ExecPlan execPlan = Mockito.mock(ExecPlan.class);
IcebergScanNode scanNode = Mockito.mock(IcebergScanNode.class);
IcebergRewriteData rewriteData = Mockito.mock(IcebergRewriteData.class);
Mockito.when(rewriteData.hasMoreTaskGroup())
.thenReturn(true)
.thenReturn(false);
Mockito.when(rewriteData.nextTaskGroup()).thenReturn(Collections.emptyList());
StmtExecutor executor = Mockito.mock(StmtExecutor.class);
Mockito.doThrow(new RuntimeException("boom")).when(executor).handleDMLStmt(execPlan, rewriteStmt);
new MockUp<StmtExecutor>() {
@Mock
public StmtExecutor newInternalExecutor(ConnectContext c, StatementBase s) {
return executor;
}
};
new MockUp<IcebergScanNode>() {
@Mock
public void rebuildScanRange(List<RemoteFileInfo> splits) {
return;
}
};
IcebergRewriteDataJob job = new IcebergRewriteDataJob(
"insert into t select 1", false, 0L, 10L, context, alter);
Deencapsulation.setField(job, "rewriteStmt", rewriteStmt);
Deencapsulation.setField(job, "execPlan", execPlan);
Deencapsulation.setField(job, "scanNodes", Arrays.asList(scanNode));
Deencapsulation.setField(job, "rewriteData", rewriteData);
job.execute();
Mockito.verify(context.getState(), Mockito.times(1)).setError("boom");
Mockito.verify(scanNode, Mockito.times(1)).rebuildScanRange(Mockito.anyList());
}
@Test
void rewriteDataFiles_shouldBuildSQL_andRunJob(@Mocked org.apache.iceberg.Table table,
@Mocked IcebergHiveCatalog icebergHiveCatalog) throws Exception {
// --- arrange
String catalog = "c1";
String db = "db";
String tbl = "table";
AlterTableStmt stmt = Mockito.mock(AlterTableStmt.class);
Mockito.when(stmt.getCatalogName()).thenReturn(catalog);
Mockito.when(stmt.getDbName()).thenReturn(db);
Mockito.when(stmt.getTableName()).thenReturn(tbl);
AlterTableOperationClause clause = Mockito.mock(AlterTableOperationClause.class);
Mockito.when(clause.isRewriteAll()).thenReturn(true);
Mockito.when(clause.getMinFileSizeBytes()).thenReturn(128L);
Mockito.when(clause.getBatchSize()).thenReturn(10L);
Expr where = Mockito.mock(Expr.class);
SlotRef slot = Mockito.mock(SlotRef.class);
Mockito.doAnswer(inv -> {
@SuppressWarnings("unchecked")
List<SlotRef> list = (List<SlotRef>) inv.getArgument(1);
list.add(slot);
return null;
}).when(where).collect(Mockito.eq(SlotRef.class), Mockito.anyList());
Mockito.when(where.toSql()).thenReturn("k1 = 1");
Mockito.when(clause.getWhere()).thenReturn(where);
ConnectContext ctx = Mockito.mock(ConnectContext.class);
new MockUp<IcebergRewriteDataJob>() {
@Mock public void prepare() {}
@Mock public void execute() {}
};
IcebergAlterTableExecutor target = new IcebergAlterTableExecutor(new AlterTableStmt(
new TableName("db", "table"),
List.of(clause)),
table, icebergHiveCatalog,
ctx,
HDFS_ENVIRONMENT);
// --- act
target.rewriteDataFiles(clause, ctx);
}
@Test
void rewriteDataFiles_shouldWrapAndThrow_whenJobExecuteFails(@Mocked org.apache.iceberg.Table table,
@Mocked IcebergHiveCatalog icebergHiveCatalog) throws Exception {
// --- arrange
String catalog = "c1";
String db = "db";
String tbl = "table";
AlterTableStmt stmt = Mockito.mock(AlterTableStmt.class);
Mockito.when(stmt.getCatalogName()).thenReturn(catalog);
Mockito.when(stmt.getDbName()).thenReturn(db);
Mockito.when(stmt.getTableName()).thenReturn(tbl);
AlterTableOperationClause clause = Mockito.mock(AlterTableOperationClause.class);
Mockito.when(clause.isRewriteAll()).thenReturn(true);
Mockito.when(clause.getMinFileSizeBytes()).thenReturn(128L);
Mockito.when(clause.getBatchSize()).thenReturn(10L);
Expr where = Mockito.mock(Expr.class);
SlotRef slot = Mockito.mock(SlotRef.class);
Mockito.doAnswer(inv -> {
@SuppressWarnings("unchecked")
List<SlotRef> list = (List<SlotRef>) inv.getArgument(1);
list.add(slot);
return null;
}).when(where).collect(Mockito.eq(SlotRef.class), Mockito.anyList());
Mockito.when(where.toSql()).thenReturn("k1 = 1");
Mockito.when(clause.getWhere()).thenReturn(where);
ConnectContext ctx = Mockito.mock(ConnectContext.class);
new MockUp<IcebergRewriteDataJob>() {
@Mock public void prepare() {}
@Mock public void execute() { throw new RuntimeException("boom"); }
};
IcebergAlterTableExecutor target = new IcebergAlterTableExecutor(new AlterTableStmt(
new TableName("db", "table"),
List.of(clause)),
table, icebergHiveCatalog,
ctx,
HDFS_ENVIRONMENT);
// --- act + assert
StarRocksConnectorException ex =
Assertions.assertThrows(StarRocksConnectorException.class, () -> target.rewriteDataFiles(clause, ctx));
Assertions.assertTrue(ex.getMessage().contains("db.table"));
Assertions.assertTrue(ex.getMessage().contains("boom"));
}
}