Signed-off-by: Kevin Cai <kevin.cai@celerdata.com> Co-authored-by: Kevin Cai <caixiaohua@starrocks.com> Co-authored-by: Kevin Cai <kevin.cai@celerdata.com>
This commit is contained in:
parent
50e3b621c4
commit
e82a6491a5
|
|
@ -23,6 +23,7 @@ import com.starrocks.analysis.IsNullPredicate;
|
|||
import com.starrocks.analysis.LiteralExpr;
|
||||
import com.starrocks.analysis.Predicate;
|
||||
import com.starrocks.analysis.SlotRef;
|
||||
import com.starrocks.catalog.Column;
|
||||
import com.starrocks.catalog.Database;
|
||||
import com.starrocks.catalog.MaterializedIndex;
|
||||
import com.starrocks.catalog.Partition;
|
||||
|
|
@ -47,6 +48,7 @@ import com.starrocks.rpc.BrpcProxy;
|
|||
import com.starrocks.rpc.LakeService;
|
||||
import com.starrocks.server.GlobalStateMgr;
|
||||
import com.starrocks.sql.ast.DeleteStmt;
|
||||
import com.starrocks.sql.common.DmlException;
|
||||
import com.starrocks.system.ComputeNode;
|
||||
import com.starrocks.system.SystemInfoService;
|
||||
import com.starrocks.transaction.TabletCommitInfo;
|
||||
|
|
@ -102,7 +104,7 @@ public class LakeDeleteJob extends DeleteJob {
|
|||
|
||||
// create delete predicate
|
||||
List<Predicate> conditions = getDeleteConditions();
|
||||
DeletePredicatePB deletePredicate = createDeletePredicate(conditions);
|
||||
DeletePredicatePB deletePredicate = createDeletePredicate(table, conditions);
|
||||
|
||||
// send delete data request to BE
|
||||
try {
|
||||
|
|
@ -141,7 +143,15 @@ public class LakeDeleteJob extends DeleteJob {
|
|||
commit(db, getTimeoutMs());
|
||||
}
|
||||
|
||||
private DeletePredicatePB createDeletePredicate(List<Predicate> conditions) {
|
||||
private static String getTableColumnId(Table table, String columnName) throws DmlException {
|
||||
Column column = table.getColumn(columnName);
|
||||
if (column == null) {
|
||||
throw new DmlException(String.format("Cannot find column by name: %s", columnName));
|
||||
}
|
||||
return column.getColumnId().getId();
|
||||
}
|
||||
|
||||
private DeletePredicatePB createDeletePredicate(Table table, List<Predicate> conditions) {
|
||||
DeletePredicatePB deletePredicate = new DeletePredicatePB();
|
||||
deletePredicate.version = -1; // Required but unused
|
||||
deletePredicate.binaryPredicates = Lists.newArrayList();
|
||||
|
|
@ -151,20 +161,23 @@ public class LakeDeleteJob extends DeleteJob {
|
|||
if (condition instanceof BinaryPredicate) {
|
||||
BinaryPredicate binaryPredicate = (BinaryPredicate) condition;
|
||||
BinaryPredicatePB binaryPredicatePB = new BinaryPredicatePB();
|
||||
binaryPredicatePB.columnName = ((SlotRef) binaryPredicate.getChild(0)).getColumnName();
|
||||
String columnName = ((SlotRef) binaryPredicate.getChild(0)).getColumnName();
|
||||
binaryPredicatePB.columnName = getTableColumnId(table, columnName);
|
||||
binaryPredicatePB.op = binaryPredicate.getOp().toString();
|
||||
binaryPredicatePB.value = ((LiteralExpr) binaryPredicate.getChild(1)).getStringValue();
|
||||
deletePredicate.binaryPredicates.add(binaryPredicatePB);
|
||||
} else if (condition instanceof IsNullPredicate) {
|
||||
IsNullPredicate isNullPredicate = (IsNullPredicate) condition;
|
||||
IsNullPredicatePB isNullPredicatePB = new IsNullPredicatePB();
|
||||
isNullPredicatePB.columnName = ((SlotRef) isNullPredicate.getChild(0)).getColumnName();
|
||||
String columnName = ((SlotRef) isNullPredicate.getChild(0)).getColumnName();
|
||||
isNullPredicatePB.columnName = getTableColumnId(table, columnName);
|
||||
isNullPredicatePB.isNotNull = isNullPredicate.isNotNull();
|
||||
deletePredicate.isNullPredicates.add(isNullPredicatePB);
|
||||
} else if (condition instanceof InPredicate) {
|
||||
InPredicate inPredicate = (InPredicate) condition;
|
||||
InPredicatePB inPredicatePB = new InPredicatePB();
|
||||
inPredicatePB.columnName = ((SlotRef) inPredicate.getChild(0)).getColumnName();
|
||||
String columnName = ((SlotRef) inPredicate.getChild(0)).getColumnName();
|
||||
inPredicatePB.columnName = getTableColumnId(table, columnName);
|
||||
inPredicatePB.isNotIn = inPredicate.isNotIn();
|
||||
inPredicatePB.values = Lists.newArrayList();
|
||||
for (int i = 1; i <= inPredicate.getInElementNum(); i++) {
|
||||
|
|
@ -212,4 +225,4 @@ public class LakeDeleteJob extends DeleteJob {
|
|||
protected List<TabletFailInfo> getTabletFailInfos() {
|
||||
return Collections.emptyList();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -0,0 +1,152 @@
|
|||
// Copyright 2021-present StarRocks, Inc. All rights reserved.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// https://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package com.starrocks.lake.delete;
|
||||
|
||||
import com.starrocks.analysis.TableName;
|
||||
import com.starrocks.catalog.Column;
|
||||
import com.starrocks.catalog.Table;
|
||||
import com.starrocks.common.DdlException;
|
||||
import com.starrocks.common.FeConstants;
|
||||
import com.starrocks.proto.BinaryPredicatePB;
|
||||
import com.starrocks.proto.DeleteDataRequest;
|
||||
import com.starrocks.proto.DeleteDataResponse;
|
||||
import com.starrocks.proto.DeletePredicatePB;
|
||||
import com.starrocks.proto.InPredicatePB;
|
||||
import com.starrocks.proto.IsNullPredicatePB;
|
||||
import com.starrocks.qe.ConnectContext;
|
||||
import com.starrocks.rpc.BrpcProxy;
|
||||
import com.starrocks.rpc.LakeService;
|
||||
import com.starrocks.rpc.LakeServiceWithMetrics;
|
||||
import com.starrocks.rpc.RpcException;
|
||||
import com.starrocks.server.GlobalStateMgr;
|
||||
import com.starrocks.server.RunMode;
|
||||
import com.starrocks.sql.ast.DeleteStmt;
|
||||
import com.starrocks.system.ComputeNode;
|
||||
import com.starrocks.system.SystemInfoService;
|
||||
import com.starrocks.utframe.StarRocksAssert;
|
||||
import com.starrocks.utframe.UtFrameUtils;
|
||||
import mockit.Mock;
|
||||
import mockit.MockUp;
|
||||
import mockit.Mocked;
|
||||
import org.junit.jupiter.api.Assertions;
|
||||
import org.junit.jupiter.api.BeforeAll;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.concurrent.Future;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
||||
public class LakeDeleteJobTest {
|
||||
|
||||
@BeforeAll
|
||||
public static void beforeClass() {
|
||||
FeConstants.runningUnitTest = true;
|
||||
UtFrameUtils.createMinStarRocksCluster(true, RunMode.SHARED_DATA);
|
||||
GlobalStateMgr.getCurrentState().getTabletStatMgr().setStop();
|
||||
GlobalStateMgr.getCurrentState().getStarMgrMetaSyncer().setStop();
|
||||
SystemInfoService systemInfoService = GlobalStateMgr.getCurrentState().getNodeMgr().getClusterInfo();
|
||||
for (ComputeNode node : systemInfoService.getBackends()) {
|
||||
node.setAlive(true);
|
||||
}
|
||||
for (ComputeNode node : systemInfoService.getComputeNodes()) {
|
||||
node.setAlive(true);
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testDeleteConditionsColumnIdAfterRename(@Mocked LakeService lakeService) {
|
||||
ConnectContext ctx = UtFrameUtils.createDefaultCtx();
|
||||
ctx.setThreadLocalInfo();
|
||||
StarRocksAssert starRocksAssert = new StarRocksAssert(ctx);
|
||||
TableName tblName = new TableName("test", "dup_table");
|
||||
String createTableSql = "CREATE TABLE " + tblName + " (colName String) DUPLICATE KEY(colName) " +
|
||||
"DISTRIBUTED BY HASH(colName) BUCKETS 1 " + "PROPERTIES('replication_num' = '1');";
|
||||
|
||||
// create test.dup_table
|
||||
Assertions.assertDoesNotThrow(() -> starRocksAssert.withDatabase(tblName.getDb()).withTable(createTableSql));
|
||||
// rename column colName to colNameNew
|
||||
Assertions.assertDoesNotThrow(
|
||||
() -> starRocksAssert.alterTable("ALTER TABLE " + tblName + " RENAME COLUMN colName TO colNameNew;"));
|
||||
|
||||
// A single DELETE statement to cover all the possible predicates, may not be reasonable in reality.
|
||||
// Expect the delete condition to use columnId instead of logical name
|
||||
String deleteSql =
|
||||
"DELETE FROM " + tblName + " WHERE colNameNew = 'a' AND colNameNew IS NOT NULL AND colNameNew IN ('a');";
|
||||
DeleteStmt deleteStmt;
|
||||
try {
|
||||
deleteStmt = (DeleteStmt) UtFrameUtils.parseStmtWithNewParser(deleteSql, ctx);
|
||||
} catch (Exception e) {
|
||||
Assertions.fail("Don't expect exception: " + e.getMessage());
|
||||
return;
|
||||
}
|
||||
|
||||
Table table = starRocksAssert.getTable(tblName.getDb(), tblName.getTbl());
|
||||
Assertions.assertNotNull(table);
|
||||
Column col = table.getColumn("colNameNew");
|
||||
Assertions.assertNotNull(col);
|
||||
Assertions.assertEquals("colNameNew", col.getName());
|
||||
Assertions.assertEquals("colName", col.getColumnId().getId());
|
||||
|
||||
// Simulate one tablet failed, our purpose is to verify the request only.
|
||||
// return a failed response to fail the job so that the following commit() will be skipped.
|
||||
DeleteDataResponse response = new DeleteDataResponse();
|
||||
response.failedTablets = List.of(1002L);
|
||||
|
||||
AtomicReference<DeleteDataRequest> capturedRequest = new AtomicReference<>();
|
||||
LakeServiceWithMetrics wrappedLakeService = new LakeServiceWithMetrics(lakeService);
|
||||
// Avoid directly mocking the LakeService, which is implemented by BrpcProxy reflection proxy.
|
||||
new MockUp<LakeServiceWithMetrics>() {
|
||||
@Mock
|
||||
Future<DeleteDataResponse> deleteData(DeleteDataRequest request) {
|
||||
capturedRequest.set(request);
|
||||
return CompletableFuture.completedFuture(response);
|
||||
}
|
||||
};
|
||||
|
||||
new MockUp<BrpcProxy>() {
|
||||
@Mock
|
||||
public LakeService getLakeService(String host, int port) throws RpcException {
|
||||
return wrappedLakeService;
|
||||
}
|
||||
};
|
||||
DdlException exception = Assertions.assertThrows(DdlException.class,
|
||||
() -> GlobalStateMgr.getCurrentState().getDeleteMgr().process(deleteStmt));
|
||||
Assertions.assertEquals("Failed to execute delete. failed tablet num: 1", exception.getMessage());
|
||||
|
||||
Assertions.assertNotNull(capturedRequest.get());
|
||||
DeletePredicatePB deletePredicate = capturedRequest.get().getDeletePredicate();
|
||||
|
||||
// colNameNew = 'a'
|
||||
Assertions.assertEquals(1, deletePredicate.getBinaryPredicates().size());
|
||||
BinaryPredicatePB binaryPredicatePB = deletePredicate.getBinaryPredicates().get(0);
|
||||
Assertions.assertEquals("colName", binaryPredicatePB.getColumnName());
|
||||
Assertions.assertEquals("=", binaryPredicatePB.getOp());
|
||||
Assertions.assertEquals("a", binaryPredicatePB.getValue());
|
||||
|
||||
// colNameNew IS NOT NULL
|
||||
Assertions.assertEquals(1, deletePredicate.getIsNullPredicates().size());
|
||||
IsNullPredicatePB isNullPredicatePB = deletePredicate.getIsNullPredicates().get(0);
|
||||
Assertions.assertEquals("colName", isNullPredicatePB.getColumnName());
|
||||
Assertions.assertTrue(isNullPredicatePB.isIsNotNull());
|
||||
|
||||
// colNameNew IN ('a')
|
||||
Assertions.assertEquals(1, deletePredicate.getInPredicates().size());
|
||||
InPredicatePB inPredicatePB = deletePredicate.getInPredicates().get(0);
|
||||
Assertions.assertEquals("colName", inPredicatePB.getColumnName());
|
||||
Assertions.assertFalse(inPredicatePB.isIsNotIn());
|
||||
Assertions.assertEquals(List.of("a"), inPredicatePB.getValues());
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,47 @@
|
|||
-- name: test_delete_dupkey_rename
|
||||
CREATE TABLE `test_delete_dup_rename` (
|
||||
`name1` varchar(255)
|
||||
) ENGINE=OLAP
|
||||
DUPLICATE KEY(`name1`)
|
||||
DISTRIBUTED BY HASH(`name1`) BUCKETS 2
|
||||
PROPERTIES (
|
||||
"replication_num" = "1"
|
||||
);
|
||||
-- result:
|
||||
-- !result
|
||||
INSERT INTO test_delete_dup_rename VALUES ("mon"), ("tue");
|
||||
-- result:
|
||||
-- !result
|
||||
SELECT name1 FROM test_delete_dup_rename;
|
||||
-- result:
|
||||
mon
|
||||
tue
|
||||
-- !result
|
||||
DELETE FROM test_delete_dup_rename where name1 = "mon";
|
||||
-- result:
|
||||
-- !result
|
||||
SELECT name1 from test_delete_dup_rename;
|
||||
-- result:
|
||||
tue
|
||||
-- !result
|
||||
ALTER TABLE test_delete_dup_rename RENAME COLUMN name1 TO name2;
|
||||
-- result:
|
||||
-- !result
|
||||
INSERT INTO test_delete_dup_rename VALUES ("wed"), ("thu");
|
||||
-- result:
|
||||
-- !result
|
||||
DELETE FROM test_delete_dup_rename WHERE name2 = "tue";
|
||||
-- result:
|
||||
-- !result
|
||||
SELECT * FROM test_delete_dup_rename;
|
||||
-- result:
|
||||
thu
|
||||
wed
|
||||
-- !result
|
||||
DELETE FROM test_delete_dup_rename WHERE name2 = "wed";
|
||||
-- result:
|
||||
-- !result
|
||||
SELECT * FROM test_delete_dup_rename;
|
||||
-- result:
|
||||
thu
|
||||
-- !result
|
||||
|
|
@ -0,0 +1,20 @@
|
|||
-- name: test_delete_dupkey_rename
|
||||
CREATE TABLE `test_delete_dup_rename` (
|
||||
`name1` varchar(255)
|
||||
) ENGINE=OLAP
|
||||
DUPLICATE KEY(`name1`)
|
||||
DISTRIBUTED BY HASH(`name1`) BUCKETS 2
|
||||
PROPERTIES (
|
||||
"replication_num" = "1"
|
||||
);
|
||||
|
||||
INSERT INTO test_delete_dup_rename VALUES ("mon"), ("tue");
|
||||
SELECT name1 FROM test_delete_dup_rename;
|
||||
DELETE FROM test_delete_dup_rename where name1 = "mon";
|
||||
SELECT name1 from test_delete_dup_rename;
|
||||
ALTER TABLE test_delete_dup_rename RENAME COLUMN name1 TO name2;
|
||||
INSERT INTO test_delete_dup_rename VALUES ("wed"), ("thu");
|
||||
DELETE FROM test_delete_dup_rename WHERE name2 = "tue";
|
||||
SELECT * FROM test_delete_dup_rename;
|
||||
DELETE FROM test_delete_dup_rename WHERE name2 = "wed";
|
||||
SELECT * FROM test_delete_dup_rename;
|
||||
Loading…
Reference in New Issue