[BugFix] Fix bug where reserved words in iceberg partitions break toThrift (backport #63243) (#63476)
Co-authored-by: kyle-goodale-klaviyo <kyle.goodale@klaviyo.com>
parent 418b332b49
commit 0f43bd91ae

@@ -566,28 +566,29 @@ public class IcebergApiConverter {
    public static String toPartitionField(PartitionSpec spec, PartitionField field, Boolean withTransfomPrefix) {
        String name = spec.schema().findColumnName(field.sourceId());
+       String escapedName = "`" + name + "`";
        String transform = field.transform().toString();
        String prefix = withTransfomPrefix ? FeConstants.ICEBERG_TRANSFORM_EXPRESSION_PREFIX : "";

        switch (transform) {
            case "identity":
-               return name;
+               return escapedName;
            case "year":
            case "month":
            case "day":
            case "hour":
            case "void":
-               return prefix + format("%s(%s)", transform, name);
+               return prefix + format("%s(%s)", transform, escapedName);
        }

        Matcher matcher = ICEBERG_BUCKET_PATTERN.matcher(transform);
        if (matcher.matches()) {
-           return prefix + format("bucket(%s, %s)", name, matcher.group(1));
+           return prefix + format("bucket(%s, %s)", escapedName, matcher.group(1));
        }

        matcher = ICEBERG_TRUNCATE_PATTERN.matcher(transform);
        if (matcher.matches()) {
-           return prefix + format("truncate(%s, %s)", name, matcher.group(1));
+           return prefix + format("truncate(%s, %s)", escapedName, matcher.group(1));
        }

        throw new StarRocksConnectorException("Unsupported partition transform: " + field);
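
The net effect of the change above: when a partition source column is named with a SQL reserved word (for example a column literally called "partition"), toPartitionField now wraps the column name in backticks before embedding it in the partition transform expression. A minimal standalone sketch of the escaping, not the StarRocks code path (the class and helper names below are illustrative only):

    // EscapeSketch.java -- illustrative sketch of the backtick escaping added above.
    public class EscapeSketch {
        // Hypothetical helper mirroring the escapedName variable in toPartitionField.
        static String escape(String columnName) {
            return "`" + columnName + "`";
        }

        public static void main(String[] args) {
            String name = "partition"; // a SQL reserved word used as a column name
            System.out.println(escape(name));                                        // `partition`
            System.out.println(String.format("bucket(%s, %s)", escape(name), 32));   // bucket(`partition`, 32)
            System.out.println(String.format("truncate(%s, %s)", escape(name), 32)); // truncate(`partition`, 32)
        }
    }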

@@ -20,6 +20,7 @@ import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.starrocks.analysis.BinaryType;
import com.starrocks.analysis.ColumnPosition;
import com.starrocks.analysis.DescriptorTable;
import com.starrocks.analysis.IntLiteral;
import com.starrocks.analysis.NullLiteral;
import com.starrocks.analysis.TableName;

@@ -95,8 +96,10 @@ import com.starrocks.statistic.ExternalAnalyzeJob;
import com.starrocks.statistic.StatsConstants;
import com.starrocks.thrift.TIcebergColumnStats;
import com.starrocks.thrift.TIcebergDataFile;
import com.starrocks.thrift.TIcebergTable;
import com.starrocks.thrift.TResultSinkType;
import com.starrocks.thrift.TSinkCommitInfo;
import com.starrocks.thrift.TTableDescriptor;
import com.starrocks.utframe.StarRocksAssert;
import com.starrocks.utframe.UtFrameUtils;
import mockit.Expectations;

@@ -110,6 +113,7 @@ import org.apache.iceberg.DataFiles;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.MetricsModes;
import org.apache.iceberg.NullOrder;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.SortOrder;

@@ -123,6 +127,7 @@ import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.exceptions.NoSuchNamespaceException;
import org.apache.iceberg.hive.HiveCatalog;
import org.apache.iceberg.hive.HiveTableOperations;
import org.apache.iceberg.types.Types;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;

@@ -153,6 +158,7 @@ import static com.starrocks.connector.iceberg.IcebergMetadata.FILE_FORMAT;
import static com.starrocks.connector.iceberg.IcebergMetadata.LOCATION_PROPERTY;
import static com.starrocks.connector.iceberg.IcebergTableOperation.REMOVE_ORPHAN_FILES;
import static com.starrocks.connector.iceberg.IcebergTableOperation.ROLLBACK_TO_SNAPSHOT;
import static org.apache.iceberg.types.Types.NestedField.required;
import static org.junit.jupiter.api.Assertions.assertThrows;

public class IcebergMetadataTest extends TableTestBase {

@@ -1490,6 +1496,57 @@ public class IcebergMetadataTest extends TableTestBase {
        Assertions.assertTrue(partitions.stream().anyMatch(x -> x.getModifiedTime() == -1));
    }

    @Test
    public void testPartitionWithReservedName() {
        // Create a schema with a column named "partition" (which is a reserved word)
        Schema schemaWithPartitionColumn = new Schema(
                required(1, "id", Types.IntegerType.get()),
                required(2, "partition", Types.StringType.get())
        );

        PartitionSpec specWithPartitionColumn = PartitionSpec.builderFor(schemaWithPartitionColumn)
                .identity("partition")
                .bucket("partition", 32)
                .truncate("partition", 32)
                .build();

        TestTables.TestTable testTable = create(schemaWithPartitionColumn, specWithPartitionColumn, "test_partition_table", 1);

        List<Column> columns = Lists.newArrayList(
                new Column("id", INT),
                new Column("partition", STRING)
        );

        IcebergTable icebergTable = new IcebergTable(1, "srTableName", CATALOG_NAME, "resource_name", "db_name",
                "table_name", "", columns, testTable, Maps.newHashMap());

        List<String> partitionColumnNames = icebergTable.getPartitionColumnNames();

        Assertions.assertNotNull(partitionColumnNames);
        Assertions.assertEquals(3, partitionColumnNames.size());
        Assertions.assertEquals("partition", partitionColumnNames.get(0));
        Assertions.assertEquals("partition", partitionColumnNames.get(1));
        Assertions.assertEquals("partition", partitionColumnNames.get(2));

        List<String> partitionColumnNamesWithTransform = icebergTable.getPartitionColumnNamesWithTransform();
        Assertions.assertNotNull(partitionColumnNamesWithTransform);
        Assertions.assertEquals(3, partitionColumnNamesWithTransform.size());
        Assertions.assertEquals("`partition`", partitionColumnNamesWithTransform.get(0));
        Assertions.assertEquals("bucket(`partition`, 32)", partitionColumnNamesWithTransform.get(1));
        Assertions.assertEquals("truncate(`partition`, 32)", partitionColumnNamesWithTransform.get(2));

        // convert the icebergTable into a thrift value
        List<DescriptorTable.ReferencedPartitionInfo> partitions = Lists.newArrayList();
        TTableDescriptor tableDescriptor = icebergTable.toThrift(partitions);
        Assertions.assertNotNull(tableDescriptor);
        TIcebergTable tIcebergTable = tableDescriptor.icebergTable;
        Assertions.assertEquals(3, tIcebergTable.getPartition_column_names().size());
        Assertions.assertEquals("partition", tIcebergTable.getPartition_column_names().get(0));
        Assertions.assertEquals("partition", tIcebergTable.getPartition_column_names().get(1));
        Assertions.assertEquals("partition", tIcebergTable.getPartition_column_names().get(2));
    }

    @Test
    public void testRefreshTableException(@Mocked CachingIcebergCatalog icebergCatalog) {
        new Expectations() {
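
The expected strings in the test above follow from how Iceberg stringifies partition transforms, which is what toPartitionField switches on and matches with ICEBERG_BUCKET_PATTERN / ICEBERG_TRUNCATE_PATTERN (matcher.group(1) being the bucket count or truncate width). A minimal standalone sketch that prints those transform strings for the same spec, assuming only the org.apache.iceberg iceberg-api artifact on the classpath (the class name is illustrative):

    // TransformStringsSketch.java -- illustrative only.
    import org.apache.iceberg.PartitionSpec;
    import org.apache.iceberg.Schema;
    import org.apache.iceberg.types.Types;

    import static org.apache.iceberg.types.Types.NestedField.required;

    public class TransformStringsSketch {
        public static void main(String[] args) {
            Schema schema = new Schema(
                    required(1, "id", Types.IntegerType.get()),
                    required(2, "partition", Types.StringType.get()));
            PartitionSpec spec = PartitionSpec.builderFor(schema)
                    .identity("partition")
                    .bucket("partition", 32)
                    .truncate("partition", 32)
                    .build();
            // Prints the raw column name next to field.transform().toString(); with
            // current Iceberg releases this yields "identity", "bucket[32]", "truncate[32]".
            spec.fields().forEach(f ->
                    System.out.println(schema.findColumnName(f.sourceId()) + " -> " + f.transform()));
        }
    }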

@@ -13,7 +13,7 @@ partition_transform_table CREATE TABLE `partition_transform_table` (
`p1` varchar(1073741824) DEFAULT NULL,
`p2` varchar(1073741824) DEFAULT NULL
)
-PARTITION BY (year(t1), month(t2), day(t3), hour(t4), truncate(p1, 5), bucket(p2, 3))
+PARTITION BY (year(`t1`), month(`t2`), day(`t3`), hour(`t4`), truncate(`p1`, 5), bucket(`p2`, 3))
PROPERTIES ("owner" = "root", "location" = "oss://starrocks-ci-test/iceberg_ci_db/partition_transform_table");
-- !result
drop catalog iceberg_sql_test_${uuid0};