[BugFix] fix duplicate key table delete issue in shared-data mode (#63296)

Signed-off-by: Kevin Cai <kevin.cai@celerdata.com>
This commit is contained in:
Kevin Cai 2025-09-22 09:29:03 +08:00 committed by GitHub
parent da9c0e84ed
commit bfb83e7e9c
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 238 additions and 6 deletions

View File

@ -17,6 +17,7 @@ package com.starrocks.lake.delete;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.starrocks.catalog.Column;
import com.starrocks.catalog.Database;
import com.starrocks.catalog.MaterializedIndex;
import com.starrocks.catalog.Partition;
@ -47,6 +48,7 @@ import com.starrocks.sql.ast.expression.IsNullPredicate;
import com.starrocks.sql.ast.expression.LiteralExpr;
import com.starrocks.sql.ast.expression.Predicate;
import com.starrocks.sql.ast.expression.SlotRef;
import com.starrocks.sql.common.DmlException;
import com.starrocks.system.ComputeNode;
import com.starrocks.system.SystemInfoService;
import com.starrocks.transaction.TabletCommitInfo;
@ -102,7 +104,7 @@ public class LakeDeleteJob extends DeleteJob {
// create delete predicate
List<Predicate> conditions = getDeleteConditions();
DeletePredicatePB deletePredicate = createDeletePredicate(conditions);
DeletePredicatePB deletePredicate = createDeletePredicate(table, conditions);
// send delete data request to BE
try {
@ -141,7 +143,15 @@ public class LakeDeleteJob extends DeleteJob {
commit(db, getTimeoutMs());
}
private DeletePredicatePB createDeletePredicate(List<Predicate> conditions) {
/**
 * Translates a column name of {@code table} into the id string of that column.
 *
 * <p>The returned id is not necessarily equal to {@code columnName}; it is whatever
 * {@code Column#getColumnId()} reports for the column found under that name.
 *
 * @param table      table on which the column is looked up via {@code getColumn}
 * @param columnName name of the column to resolve
 * @return the result of {@code getColumnId().getId()} for the matching column
 * @throws DmlException if {@code table.getColumn(columnName)} yields {@code null}
 */
private static String getTableColumnId(Table table, String columnName) throws DmlException {
    final Column resolved = table.getColumn(columnName);
    if (resolved != null) {
        return resolved.getColumnId().getId();
    }
    // No column is registered under this name: report it to the caller.
    throw new DmlException(String.format("Cannot find column by name: %s", columnName));
}
private DeletePredicatePB createDeletePredicate(Table table, List<Predicate> conditions) {
DeletePredicatePB deletePredicate = new DeletePredicatePB();
deletePredicate.version = -1; // Required but unused
deletePredicate.binaryPredicates = Lists.newArrayList();
@ -151,20 +161,23 @@ public class LakeDeleteJob extends DeleteJob {
if (condition instanceof BinaryPredicate) {
BinaryPredicate binaryPredicate = (BinaryPredicate) condition;
BinaryPredicatePB binaryPredicatePB = new BinaryPredicatePB();
binaryPredicatePB.columnName = ((SlotRef) binaryPredicate.getChild(0)).getColumnName();
String columnName = ((SlotRef) binaryPredicate.getChild(0)).getColumnName();
binaryPredicatePB.columnName = getTableColumnId(table, columnName);
binaryPredicatePB.op = binaryPredicate.getOp().toString();
binaryPredicatePB.value = ((LiteralExpr) binaryPredicate.getChild(1)).getStringValue();
deletePredicate.binaryPredicates.add(binaryPredicatePB);
} else if (condition instanceof IsNullPredicate) {
IsNullPredicate isNullPredicate = (IsNullPredicate) condition;
IsNullPredicatePB isNullPredicatePB = new IsNullPredicatePB();
isNullPredicatePB.columnName = ((SlotRef) isNullPredicate.getChild(0)).getColumnName();
String columnName = ((SlotRef) isNullPredicate.getChild(0)).getColumnName();
isNullPredicatePB.columnName = getTableColumnId(table, columnName);
isNullPredicatePB.isNotNull = isNullPredicate.isNotNull();
deletePredicate.isNullPredicates.add(isNullPredicatePB);
} else if (condition instanceof InPredicate) {
InPredicate inPredicate = (InPredicate) condition;
InPredicatePB inPredicatePB = new InPredicatePB();
inPredicatePB.columnName = ((SlotRef) inPredicate.getChild(0)).getColumnName();
String columnName = ((SlotRef) inPredicate.getChild(0)).getColumnName();
inPredicatePB.columnName = getTableColumnId(table, columnName);
inPredicatePB.isNotIn = inPredicate.isNotIn();
inPredicatePB.values = Lists.newArrayList();
for (int i = 1; i <= inPredicate.getInElementNum(); i++) {
@ -212,4 +225,4 @@ public class LakeDeleteJob extends DeleteJob {
protected List<TabletFailInfo> getTabletFailInfos() {
// This implementation never reports per-tablet failure details: it always returns an empty list.
return Collections.emptyList();
}
}
}

View File

@ -0,0 +1,152 @@
// Copyright 2021-present StarRocks, Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package com.starrocks.lake.delete;
import com.starrocks.catalog.Column;
import com.starrocks.catalog.Table;
import com.starrocks.common.DdlException;
import com.starrocks.common.FeConstants;
import com.starrocks.proto.BinaryPredicatePB;
import com.starrocks.proto.DeleteDataRequest;
import com.starrocks.proto.DeleteDataResponse;
import com.starrocks.proto.DeletePredicatePB;
import com.starrocks.proto.InPredicatePB;
import com.starrocks.proto.IsNullPredicatePB;
import com.starrocks.qe.ConnectContext;
import com.starrocks.rpc.BrpcProxy;
import com.starrocks.rpc.LakeService;
import com.starrocks.rpc.LakeServiceWithMetrics;
import com.starrocks.rpc.RpcException;
import com.starrocks.server.GlobalStateMgr;
import com.starrocks.server.RunMode;
import com.starrocks.sql.ast.DeleteStmt;
import com.starrocks.sql.ast.expression.TableName;
import com.starrocks.system.ComputeNode;
import com.starrocks.system.SystemInfoService;
import com.starrocks.utframe.StarRocksAssert;
import com.starrocks.utframe.UtFrameUtils;
import mockit.Mock;
import mockit.MockUp;
import mockit.Mocked;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicReference;
/**
 * Test for DELETE handling on a cluster created in {@code RunMode.SHARED_DATA}.
 *
 * <p>It captures the {@code DeleteDataRequest} handed to the lake service and checks which column
 * identifier is written into each kind of delete predicate after a column has been renamed.
 */
public class LakeDeleteJobTest {
// One-time setup: create a minimal shared-data cluster and mark every backend / compute node alive.
@BeforeAll
public static void beforeClass() {
FeConstants.runningUnitTest = true;
UtFrameUtils.createMinStarRocksCluster(true, RunMode.SHARED_DATA);
// Stop the tablet-stat manager and the star-manager meta syncer
// (presumably background daemons that could interfere with the test - TODO confirm).
GlobalStateMgr.getCurrentState().getTabletStatMgr().setStop();
GlobalStateMgr.getCurrentState().getStarMgrMetaSyncer().setStop();
SystemInfoService systemInfoService = GlobalStateMgr.getCurrentState().getNodeMgr().getClusterInfo();
for (ComputeNode node : systemInfoService.getBackends()) {
node.setAlive(true);
}
for (ComputeNode node : systemInfoService.getComputeNodes()) {
node.setAlive(true);
}
}
// Creates a duplicate-key table, renames its only column, issues a DELETE that uses the new name,
// and asserts that every predicate in the captured request carries the column id ("colName")
// rather than the current column name ("colNameNew").
@Test
public void testDeleteConditionsColumnIdAfterRename(@Mocked LakeService lakeService) {
ConnectContext ctx = UtFrameUtils.createDefaultCtx();
ctx.setThreadLocalInfo();
StarRocksAssert starRocksAssert = new StarRocksAssert(ctx);
TableName tblName = new TableName("test", "dup_table");
String createTableSql = "CREATE TABLE " + tblName + " (colName String) DUPLICATE KEY(colName) " +
"DISTRIBUTED BY HASH(colName) BUCKETS 1 " + "PROPERTIES('replication_num' = '1');";
// create test.dup_table
Assertions.assertDoesNotThrow(() -> starRocksAssert.withDatabase(tblName.getDb()).withTable(createTableSql));
// rename column colName to colNameNew
Assertions.assertDoesNotThrow(
() -> starRocksAssert.alterTable("ALTER TABLE " + tblName + " RENAME COLUMN colName TO colNameNew;"));
// A single DELETE statement that covers every supported predicate kind (binary, IS [NOT] NULL, IN);
// such a statement may not be reasonable in practice.
// The delete conditions are expected to use the column id instead of the logical column name.
String deleteSql =
"DELETE FROM " + tblName + " WHERE colNameNew = 'a' AND colNameNew IS NOT NULL AND colNameNew IN ('a');";
DeleteStmt deleteStmt;
try {
deleteStmt = (DeleteStmt) UtFrameUtils.parseStmtWithNewParser(deleteSql, ctx);
} catch (Exception e) {
Assertions.fail("Don't expect exception: " + e.getMessage());
// Not reached in practice (fail() throws); the return lets the compiler treat deleteStmt
// as definitely assigned for the code below.
return;
}
Table table = starRocksAssert.getTable(tblName.getDb(), tblName.getTbl());
Assertions.assertNotNull(table);
// Precondition: the column is found under its new name, while its column id is still the original name.
Column col = table.getColumn("colNameNew");
Assertions.assertNotNull(col);
Assertions.assertEquals("colNameNew", col.getName());
Assertions.assertEquals("colName", col.getColumnId().getId());
// Simulate one failed tablet; the purpose here is to verify the request only.
// Returning a failed response fails the job so that the following commit() is skipped.
DeleteDataResponse response = new DeleteDataResponse();
response.failedTablets = List.of(1002L);
// Holds the request passed to the mocked deleteData() so it can be inspected afterwards.
AtomicReference<DeleteDataRequest> capturedRequest = new AtomicReference<>();
LakeServiceWithMetrics wrappedLakeService = new LakeServiceWithMetrics(lakeService);
// Avoid directly mocking the LakeService, which is implemented by BrpcProxy reflection proxy.
new MockUp<LakeServiceWithMetrics>() {
@Mock
Future<DeleteDataResponse> deleteData(DeleteDataRequest request) {
capturedRequest.set(request);
return CompletableFuture.completedFuture(response);
}
};
// Make BrpcProxy hand out the wrapped service for any host/port.
new MockUp<BrpcProxy>() {
@Mock
public LakeService getLakeService(String host, int port) throws RpcException {
return wrappedLakeService;
}
};
// The failed tablet in the mocked response must surface as a DdlException from process().
DdlException exception = Assertions.assertThrows(DdlException.class,
() -> GlobalStateMgr.getCurrentState().getDeleteMgr().process(deleteStmt));
Assertions.assertEquals("Failed to execute delete. failed tablet num: 1", exception.getMessage());
Assertions.assertNotNull(capturedRequest.get());
DeletePredicatePB deletePredicate = capturedRequest.get().getDeletePredicate();
// colNameNew = 'a'
Assertions.assertEquals(1, deletePredicate.getBinaryPredicates().size());
BinaryPredicatePB binaryPredicatePB = deletePredicate.getBinaryPredicates().get(0);
Assertions.assertEquals("colName", binaryPredicatePB.getColumnName());
Assertions.assertEquals("=", binaryPredicatePB.getOp());
Assertions.assertEquals("a", binaryPredicatePB.getValue());
// colNameNew IS NOT NULL
Assertions.assertEquals(1, deletePredicate.getIsNullPredicates().size());
IsNullPredicatePB isNullPredicatePB = deletePredicate.getIsNullPredicates().get(0);
Assertions.assertEquals("colName", isNullPredicatePB.getColumnName());
Assertions.assertTrue(isNullPredicatePB.isIsNotNull());
// colNameNew IN ('a')
Assertions.assertEquals(1, deletePredicate.getInPredicates().size());
InPredicatePB inPredicatePB = deletePredicate.getInPredicates().get(0);
Assertions.assertEquals("colName", inPredicatePB.getColumnName());
Assertions.assertFalse(inPredicatePB.isIsNotIn());
Assertions.assertEquals(List.of("a"), inPredicatePB.getValues());
}
}

View File

@ -0,0 +1,47 @@
-- name: test_delete_dupkey_rename
CREATE TABLE `test_delete_dup_rename` (
`name1` varchar(255)
) ENGINE=OLAP
DUPLICATE KEY(`name1`)
DISTRIBUTED BY HASH(`name1`) BUCKETS 2
PROPERTIES (
"replication_num" = "1"
);
-- result:
-- !result
INSERT INTO test_delete_dup_rename VALUES ("mon"), ("tue");
-- result:
-- !result
SELECT name1 FROM test_delete_dup_rename;
-- result:
mon
tue
-- !result
DELETE FROM test_delete_dup_rename where name1 = "mon";
-- result:
-- !result
SELECT name1 from test_delete_dup_rename;
-- result:
tue
-- !result
ALTER TABLE test_delete_dup_rename RENAME COLUMN name1 TO name2;
-- result:
-- !result
INSERT INTO test_delete_dup_rename VALUES ("wed"), ("thu");
-- result:
-- !result
DELETE FROM test_delete_dup_rename WHERE name2 = "tue";
-- result:
-- !result
SELECT * FROM test_delete_dup_rename;
-- result:
thu
wed
-- !result
DELETE FROM test_delete_dup_rename WHERE name2 = "wed";
-- result:
-- !result
SELECT * FROM test_delete_dup_rename;
-- result:
thu
-- !result

View File

@ -0,0 +1,20 @@
-- name: test_delete_dupkey_rename
CREATE TABLE `test_delete_dup_rename` (
`name1` varchar(255)
) ENGINE=OLAP
DUPLICATE KEY(`name1`)
DISTRIBUTED BY HASH(`name1`) BUCKETS 2
PROPERTIES (
"replication_num" = "1"
);
INSERT INTO test_delete_dup_rename VALUES ("mon"), ("tue");
SELECT name1 FROM test_delete_dup_rename;
DELETE FROM test_delete_dup_rename where name1 = "mon";
SELECT name1 from test_delete_dup_rename;
ALTER TABLE test_delete_dup_rename RENAME COLUMN name1 TO name2;
INSERT INTO test_delete_dup_rename VALUES ("wed"), ("thu");
DELETE FROM test_delete_dup_rename WHERE name2 = "tue";
SELECT * FROM test_delete_dup_rename;
DELETE FROM test_delete_dup_rename WHERE name2 = "wed";
SELECT * FROM test_delete_dup_rename;