[BugFix] Fix incorrect base version in schema change job with lake rollup (backport #62046) (#62089)

Signed-off-by: sevev <qiangzh95@gmail.com>
Co-authored-by: zhangqiang <qiangzh95@gmail.com>
mergify[bot] 2025-08-19 11:11:50 +00:00 committed by GitHub
parent 0df4fb0522
commit 5b41a92084
4 changed files with 132 additions and 4 deletions


@@ -839,8 +839,8 @@ public class LakeTableSchemaChangeJob extends LakeTableSchemaChangeJobBase {
             }
             if (!isFileBundling) {
-                Utils.publishVersion(allOtherPartitionTablets, originTxnInfo, 1, commitVersion, computeResource,
-                        isFileBundling);
+                Utils.publishVersion(allOtherPartitionTablets, originTxnInfo, commitVersion - 1, commitVersion,
+                        computeResource, isFileBundling);
             } else {
                 Utils.createSubRequestForAggregatePublish(allOtherPartitionTablets, Lists.newArrayList(originTxnInfo),
                         commitVersion - 1, commitVersion, null, computeResource, request);
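
The change above replaces a base version hardcoded to 1 with commitVersion - 1, so the publish for the non-rewritten partition tablets covers only the single version produced by the schema-change transaction instead of the whole history since version 1. A minimal sketch of why the base version matters (hypothetical types, not the StarRocks Utils API): publishing applies exactly the version range (base, commit] on top of tablets that already sit at `base`.

import java.util.HashMap;
import java.util.Map;

public class PublishSketch {
    // Illustrative only: each tablet must be at `base` before the publish,
    // and ends at `commit` afterwards.
    static void publish(Map<Long, Long> tabletVersion, long base, long commit) {
        tabletVersion.replaceAll((tablet, version) -> {
            if (version != base) {
                throw new IllegalStateException("tablet " + tablet + " is at version "
                        + version + ", cannot publish (" + base + ", " + commit + "]");
            }
            return commit;
        });
    }

    public static void main(String[] args) {
        Map<Long, Long> tablets = new HashMap<>();
        tablets.put(10L, 4L);         // partition already published up to version 4
        publish(tablets, 4L, 5L);     // fixed call: base = commitVersion - 1
        System.out.println(tablets);  // {10=5}
        // publish(tablets, 1L, 6L);  // old behavior: a base pinned to 1 would throw here
    }
}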


@@ -404,7 +404,7 @@ public class LakeTableSchemaChangeJobTest {
     }
 
     @Test
-    public void testAlterTabletSuccessEnablePartitionAgg() throws Exception {
+    public void testAlterTabletSuccessEnableFileBundling() throws Exception {
         new MockUp<LakeTableSchemaChangeJob>() {
             @Mock
             public void sendAgentTask(AgentBatchTask batchTask) {
@@ -448,6 +448,51 @@ public class LakeTableSchemaChangeJobTest {
         }
     }
 
+    @Test
+    public void testAlterTabletSuccessDisableFileBundling() throws Exception {
+        new MockUp<LakeTableSchemaChangeJob>() {
+            @Mock
+            public void sendAgentTask(AgentBatchTask batchTask) {
+                batchTask.getAllTasks().forEach(t -> t.setFinished(true));
+            }
+        };
+
+        LakeTable table1 = createTable(connectContext,
+                "CREATE TABLE t1(c0 INT) duplicate key(c0) distributed by hash(c0) buckets 3 " +
+                        "PROPERTIES('file_bundling'='false')");
+        Config.enable_fast_schema_evolution_in_share_data_mode = false;
+        alterTable(connectContext, "ALTER TABLE t1 ADD COLUMN c1 BIGINT AS c0 + 2");
+
+        LakeTableSchemaChangeJob schemaChangeJob1 = getAlterJob(table1);
+        schemaChangeJob1.runPendingJob();
+        Assertions.assertEquals(AlterJobV2.JobState.WAITING_TXN, schemaChangeJob1.getJobState());
+
+        schemaChangeJob1.runWaitingTxnJob();
+        Assertions.assertEquals(AlterJobV2.JobState.RUNNING, schemaChangeJob1.getJobState());
+
+        schemaChangeJob1.runRunningJob();
+        Assertions.assertEquals(AlterJobV2.JobState.FINISHED_REWRITING, schemaChangeJob1.getJobState());
+        Assertions.assertTrue(schemaChangeJob1.getFinishedTimeMs() > System.currentTimeMillis() - 10_000L);
+
+        Collection<Partition> partitions = table1.getPartitions();
+        Assertions.assertEquals(1, partitions.size());
+        Partition partition = partitions.stream().findFirst().orElse(null);
+        Assertions.assertNotNull(partition);
+        Assertions.assertEquals(3, partition.getDefaultPhysicalPartition().getNextVersion());
+        List<MaterializedIndex> shadowIndexes =
+                partition.getDefaultPhysicalPartition().getMaterializedIndices(MaterializedIndex.IndexExtState.SHADOW);
+        Assertions.assertEquals(1, shadowIndexes.size());
+
+        // Does not support cancel job in FINISHED_REWRITING state.
+        schemaChangeJob1.cancel("test");
+        Assertions.assertEquals(AlterJobV2.JobState.FINISHED_REWRITING, schemaChangeJob1.getJobState());
+
+        while (schemaChangeJob1.getJobState() != AlterJobV2.JobState.FINISHED) {
+            schemaChangeJob1.runFinishedRewritingJob();
+            Thread.sleep(100);
+        }
+    }
+
     @Test
     public void testPublishVersion() throws AlterCancelException, InterruptedException {
         new MockUp<LakeTableSchemaChangeJob>() {

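The new testAlterTabletSuccessDisableFileBundling case drives the job through its full lifecycle by hand: runPendingJob, runWaitingTxnJob, runRunningJob, then runFinishedRewritingJob in a loop, asserting the state after each step and that cancel() leaves the state unchanged once the job reaches FINISHED_REWRITING. A compact sketch of that progression (state names taken from the assertions above; the driver loop is illustrative, not the FE scheduler):

public class LifecycleSketch {
    enum JobState { PENDING, WAITING_TXN, RUNNING, FINISHED_REWRITING, FINISHED }

    public static void main(String[] args) {
        JobState state = JobState.PENDING;
        while (state != JobState.FINISHED) {
            // Each run*Job() call in the test advances the job by one state;
            // the final hop may take several attempts, hence the test's sleep loop.
            state = JobState.values()[state.ordinal() + 1];
            System.out.println(state);
        }
    }
}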

@@ -49,4 +49,57 @@ SELECT v1 FROM rollup1 [_SYNC_MV_] ORDER BY v1;
 -- result:
 100
 300
 -- !result
+-- name: test_lake_rollup2 @cloud
+CREATE TABLE tbl2 (
+    k1 date,
+    k2 int,
+    v1 int sum
+) PARTITION BY RANGE(k1) (
+    PARTITION p1 values [('2020-01-01'),('2020-02-01')),
+    PARTITION p2 values [('2020-02-01'),('2020-03-01')))
+DISTRIBUTED BY HASH(k1) BUCKETS 3
+PROPERTIES('replication_num' = '1', 'file_bundling' = 'false');
+-- result:
+-- !result
+INSERT INTO tbl2
+VALUES
+    ("2020-01-12",4,100),
+    ("2020-01-11",5,100),
+    ("2020-01-11",4,100);
+-- result:
+-- !result
+ALTER TABLE tbl2 ADD ROLLUP rollup2 (k1, v1) FROM tbl2;
+-- result:
+-- !result
+function: wait_alter_table_finish("ROLLUP")
+-- result:
+None
+-- !result
+INSERT INTO tbl2 VALUES("2020-01-11",6,100);
+-- result:
+-- !result
+SELECT v1 FROM rollup2 [_SYNC_MV_] ORDER BY v1;
+-- result:
+100
+300
+-- !result
+ALTER TABLE tbl2 MODIFY COLUMN k2 BIGINT;
+-- result:
+-- !result
+function: wait_alter_table_finish("COLUMN")
+-- result:
+None
+-- !result
+SELECT v1 FROM rollup2 [_SYNC_MV_] ORDER BY v1;
+-- result:
+100
+300
+-- !result
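
The expected output above follows directly from the SUM aggregation on rollup2 (k1, v1): the three initial rows give 2020-01-12 -> 100 and 2020-01-11 -> 200, and the insert issued after the rollup was built raises 2020-01-11 to 300, which must survive the subsequent MODIFY COLUMN schema change. A quick recomputation of that arithmetic (illustrative Java, not part of the test suite):

import java.util.Map;
import java.util.TreeMap;

public class RollupSumSketch {
    public static void main(String[] args) {
        String[][] rows = {
                {"2020-01-12", "100"},
                {"2020-01-11", "100"},
                {"2020-01-11", "100"},
                {"2020-01-11", "100"},  // the row inserted after ADD ROLLUP
        };
        // rollup2 keeps (k1, SUM(v1)): group by k1 and sum v1.
        Map<String, Integer> sum = new TreeMap<>();
        for (String[] r : rows) {
            sum.merge(r[0], Integer.parseInt(r[1]), Integer::sum);
        }
        System.out.println(sum);  // {2020-01-11=300, 2020-01-12=100} -> 100, 300 ordered by v1
    }
}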


@@ -25,4 +25,34 @@ SELECT v1 FROM rollup1 [_SYNC_MV_] ORDER BY v1;
 ALTER TABLE tbl1 MODIFY COLUMN k2 BIGINT;
 function: wait_alter_table_finish("COLUMN")
-SELECT v1 FROM rollup1 [_SYNC_MV_] ORDER BY v1;
\ No newline at end of file
+SELECT v1 FROM rollup1 [_SYNC_MV_] ORDER BY v1;
+
+drop table tbl1;
+
+-- name: test_lake_rollup_2 @cloud
+CREATE TABLE tbl2 (
+    k1 date,
+    k2 int,
+    v1 int sum
+) PARTITION BY RANGE(k1) (
+    PARTITION p1 values [('2020-01-01'),('2020-02-01')),
+    PARTITION p2 values [('2020-02-01'),('2020-03-01')))
+DISTRIBUTED BY HASH(k1) BUCKETS 3
+PROPERTIES('replication_num' = '1', 'file_bundling' = 'false');
+
+INSERT INTO tbl2
+VALUES
+    ("2020-01-12",4,100),
+    ("2020-01-11",5,100),
+    ("2020-01-11",4,100);
+
+ALTER TABLE tbl2 ADD ROLLUP rollup2 (k1, v1) FROM tbl2;
+function: wait_alter_table_finish("ROLLUP")
+
+INSERT INTO tbl2 VALUES("2020-01-11",6,100);
+SELECT v1 FROM rollup2 [_SYNC_MV_] ORDER BY v1;
+
+ALTER TABLE tbl2 MODIFY COLUMN k2 BIGINT;
+function: wait_alter_table_finish("COLUMN")
+SELECT v1 FROM rollup2 [_SYNC_MV_] ORDER BY v1;