[BugFix] fix iceberg read null partition bug (backport #62934) (#63040)

Signed-off-by: SevenJ <wenjun7j@gmail.com>
Co-authored-by: SevenJ <166966490+Wenjun7J@users.noreply.github.com>
mergify[bot] 2025-09-24 09:55:35 +08:00 committed by GitHub
parent b491a3f7c0
commit ee1a9a5df9
6 changed files with 103 additions and 10 deletions


@@ -82,7 +82,6 @@ import java.util.concurrent.Callable;
 import java.util.concurrent.FutureTask;
 import java.util.stream.Collectors;
-import static com.starrocks.connector.iceberg.IcebergApiConverter.PARTITION_NULL_VALUE;
 import static org.apache.hadoop.hive.common.FileUtils.escapePathName;
 import static org.apache.hadoop.hive.common.FileUtils.unescapePathName;
@@ -815,11 +814,14 @@ public class PartitionUtil {
             }
             Class<?> clazz = spec.javaClasses()[i];
-            String value = partitionField.transform().toHumanString(getPartitionValue(partitionData, i, clazz));
+            String value = null;
+            if (getPartitionValue(partitionData, i, clazz) != null) {
+                value = partitionField.transform().toHumanString(getPartitionValue(partitionData, i, clazz));
+            }
             // currently starrocks date literal only support local datetime
             org.apache.iceberg.types.Type icebergType = spec.schema().findType(partitionField.sourceId());
-            if (!value.equals(PARTITION_NULL_VALUE) && partitionField.transform().isIdentity() &&
+            if (value != null && partitionField.transform().isIdentity() &&
                     icebergType.equals(Types.TimestampType.withZone())) {
                 value = ChronoUnit.MICROS.addTo(Instant.ofEpochSecond(0).atZone(TimeUtils.getTimeZone().toZoneId()),
                         getPartitionValue(partitionData, i, clazz)).toLocalDateTime().toString();
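
The pattern this hunk applies, in isolation: convert to a human-readable string only when the raw partition value is present, and let null flow through instead of routing it via a sentinel string. A minimal self-contained sketch of that guard (the method and names below are illustrative stand-ins, not the actual PartitionUtil code):

import java.util.function.Function;

public class NullSafePartitionValue {
    // Mirrors the fix: previously the conversion ran unconditionally, so a null
    // raw value had to be represented by a sentinel string. Now a null raw
    // value stays null, and later stages can emit a genuine NULL literal.
    static <T> String toHumanStringOrNull(T rawValue, Function<T, String> toHumanString) {
        return rawValue == null ? null : toHumanString.apply(rawValue);
    }

    public static void main(String[] args) {
        Function<Long, String> toHuman = v -> Long.toString(v);
        System.out.println(toHumanStringOrNull(1700000000L, toHuman)); // "1700000000"
        System.out.println(toHumanStringOrNull(null, toHuman));        // null
    }
}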


@@ -471,7 +471,7 @@ public class IcebergConnectorScanRangeSource extends ConnectorScanRangeSource {
             Class<?> javaClass = type.typeId().javaClass();
             String partitionValue;
+            boolean partitionValueIsNull = (PartitionUtil.getPartitionValue(partition, index, javaClass) == null);
             // currently starrocks date literal only support local datetime
             if (type.equals(Types.TimestampType.withZone())) {
                 Long value = PartitionUtil.getPartitionValue(partition, index, javaClass);
@@ -485,9 +485,11 @@ public class IcebergConnectorScanRangeSource extends ConnectorScanRangeSource {
                 partitionValue = field.transform().toHumanString(type, PartitionUtil.getPartitionValue(
                         partition, index, javaClass));
             }
-            partitionValues.add(partitionValue);
+            if (!partitionValueIsNull) {
+                partitionValues.add(partitionValue);
+            } else {
+                partitionValues.add(null);
+            }
             cols.add(table.getColumn(field.name()));
         });
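
Together, the two hunks in this file change the contract of the collected list: it still carries one entry per partition column, but a null entry now means a NULL partition value rather than the value being stringified. A compact sketch of that contract (plain stand-ins, not the connector's types):

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class PartitionValuesSketch {
    // One slot per partition column. A null slot means the partition value is
    // NULL; the else-branch adds null explicitly instead of skipping the
    // column, so values stay aligned with the column list.
    static List<String> collect(List<Object> rawValues) {
        List<String> partitionValues = new ArrayList<>();
        for (Object raw : rawValues) {
            boolean partitionValueIsNull = (raw == null);
            if (!partitionValueIsNull) {
                partitionValues.add(raw.toString());
            } else {
                partitionValues.add(null);
            }
        }
        return partitionValues;
    }

    public static void main(String[] args) {
        System.out.println(collect(Arrays.asList(1, null, "x"))); // [1, null, x]
    }
}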


@@ -143,7 +143,22 @@ public class IcebergPartitionTraits extends DefaultTraits {
     @Override
     public PartitionKey createPartitionKeyWithType(List<String> values, List<Type> types) throws AnalysisException {
-        PartitionKey partitionKey = super.createPartitionKeyWithType(values, types);
+        Preconditions.checkState(values.size() == types.size(),
+                "columns size is %s, but values size is %s", types.size(), values.size());
+        PartitionKey partitionKey = createEmptyKey();
+        for (int i = 0; i < values.size(); i++) {
+            String rawValue = values.get(i);
+            Type type = types.get(i);
+            LiteralExpr exprValue;
+            if (rawValue == null) {
+                exprValue = NullLiteral.create(type);
+            } else {
+                exprValue = LiteralExpr.create(rawValue, type);
+            }
+            partitionKey.pushColumn(exprValue, type.getPrimitiveType());
+        }
         for (int i = 0; i < types.size(); i++) {
             LiteralExpr exprValue = partitionKey.getKeys().get(i);
             if (exprValue.getType().isDecimalV3()) {
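
The essential change in createPartitionKeyWithType: nulls no longer round-trip through a sentinel string; each null raw value becomes a NULL literal directly when the key is built. A toy version of that loop (Literal, NullLit, and StringLit below are simplified stand-ins for StarRocks' LiteralExpr hierarchy, not the real classes):

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class PartitionKeyBuildSketch {
    interface Literal { String toSql(); }

    static class NullLit implements Literal {
        public String toSql() { return "NULL"; }
    }

    static class StringLit implements Literal {
        final String value;
        StringLit(String value) { this.value = value; }
        public String toSql() { return "\"" + value + "\""; }
    }

    // Mirrors the fixed loop: a null raw value becomes a NULL literal,
    // anything else becomes a literal parsed from the string.
    static List<Literal> buildKey(List<String> rawValues) {
        List<Literal> key = new ArrayList<>();
        for (String raw : rawValues) {
            key.add(raw == null ? new NullLit() : new StringLit(raw));
        }
        return key;
    }

    public static void main(String[] args) {
        for (Literal l : buildKey(Arrays.asList("1", null))) {
            System.out.println(l.toSql()); // "1" then NULL
        }
    }
}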


@@ -36,7 +36,6 @@ import com.starrocks.common.StarRocksException;
 import com.starrocks.connector.exception.StarRocksConnectorException;
 import com.starrocks.connector.hive.HiveMetaClient;
 import com.starrocks.connector.hive.HivePartitionName;
-import com.starrocks.connector.iceberg.IcebergApiConverter;
 import com.starrocks.server.MetadataMgr;
 import mockit.Expectations;
 import mockit.Mock;
@@ -107,7 +106,7 @@ public class PartitionUtilTest {
     @Test
     public void testCreateIcebergPartitionKey() throws AnalysisException {
         PartitionKey partitionKey = createPartitionKey(
-                Lists.newArrayList("1", "a", "3.0", IcebergApiConverter.PARTITION_NULL_VALUE), partColumns,
+                Lists.newArrayList("1", "a", "3.0", null), partColumns,
                 Table.TableType.ICEBERG);
         Assertions.assertEquals("(\"1\", \"a\", \"3.0\", \"NULL\")", partitionKey.toSql());
     }


@@ -0,0 +1,48 @@
-- name: test_iceberg_null_partition
create external catalog iceberg_sql_test_${uuid0} PROPERTIES (
"type" = "iceberg",
"iceberg.catalog.type" = "hive",
"iceberg.catalog.hive.metastore.uris"="${hive_metastore_uris}",
"aws.s3.access_key" = "${oss_ak}",
"aws.s3.secret_key" = "${oss_sk}",
"aws.s3.endpoint" = "${oss_endpoint}",
"enable_iceberg_metadata_cache" = "false"
);
-- result:
-- !result
create database iceberg_sql_test_${uuid0}.iceberg_db_${uuid0} properties (
"location" = "oss://${oss_bucket}/iceberg_db_${uuid0}/test_iceberg_partition_transform_sink/${uuid0}"
);
-- result:
-- !result
create external table iceberg_sql_test_${uuid0}.iceberg_db_${uuid0}.ice_tbl_${uuid0} (k1 int, k2 string) partition by (k2);
-- result:
-- !result
insert into iceberg_sql_test_${uuid0}.iceberg_db_${uuid0}.ice_tbl_${uuid0} select 1, NULL;
-- result:
-- !result
insert into iceberg_sql_test_${uuid0}.iceberg_db_${uuid0}.ice_tbl_${uuid0} select 2, "null";
-- result:
-- !result
select * from iceberg_sql_test_${uuid0}.iceberg_db_${uuid0}.ice_tbl_${uuid0} order by k1;
-- result:
1 None
2 null
-- !result
select * from iceberg_sql_test_${uuid0}.iceberg_db_${uuid0}.ice_tbl_${uuid0} where k2 is null;
-- result:
1 None
-- !result
select * from iceberg_sql_test_${uuid0}.iceberg_db_${uuid0}.ice_tbl_${uuid0} where k2 = "null";
-- result:
2 null
-- !result
drop table iceberg_sql_test_${uuid0}.iceberg_db_${uuid0}.ice_tbl_${uuid0} force;
-- result:
-- !result
drop database iceberg_sql_test_${uuid0}.iceberg_db_${uuid0};
-- result:
-- !result
drop catalog iceberg_sql_test_${uuid0};
-- result:
-- !result
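
What these results pin down is that a NULL partition value and the literal string "null" stay distinguishable end to end: IS NULL matches only row 1, and k2 = "null" matches only row 2. The same distinction in plain Java, for reference (a toy in-memory filter, not StarRocks code):

import java.util.Arrays;
import java.util.List;

public class NullVsNullString {
    static class Row {
        final int k1;
        final String k2;
        Row(int k1, String k2) { this.k1 = k1; this.k2 = k2; }
    }

    public static void main(String[] args) {
        List<Row> rows = Arrays.asList(new Row(1, null), new Row(2, "null"));

        // k2 IS NULL: matches only the genuinely null partition value.
        rows.stream().filter(r -> r.k2 == null)
                .forEach(r -> System.out.println(r.k1 + " matches IS NULL"));

        // k2 = "null": matches only the literal string.
        rows.stream().filter(r -> "null".equals(r.k2))
                .forEach(r -> System.out.println(r.k1 + " matches k2 = \"null\""));
    }
}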


@@ -0,0 +1,27 @@
-- name: test_iceberg_null_partition
create external catalog iceberg_sql_test_${uuid0} PROPERTIES (
"type" = "iceberg",
"iceberg.catalog.type" = "hive",
"iceberg.catalog.hive.metastore.uris"="${hive_metastore_uris}",
"aws.s3.access_key" = "${oss_ak}",
"aws.s3.secret_key" = "${oss_sk}",
"aws.s3.endpoint" = "${oss_endpoint}",
"enable_iceberg_metadata_cache" = "false"
);
create database iceberg_sql_test_${uuid0}.iceberg_db_${uuid0} properties (
"location" = "oss://${oss_bucket}/iceberg_db_${uuid0}/test_iceberg_partition_transform_sink/${uuid0}"
);
create external table iceberg_sql_test_${uuid0}.iceberg_db_${uuid0}.ice_tbl_${uuid0} (k1 int, k2 string) partition by (k2);
insert into iceberg_sql_test_${uuid0}.iceberg_db_${uuid0}.ice_tbl_${uuid0} select 1, NULL;
insert into iceberg_sql_test_${uuid0}.iceberg_db_${uuid0}.ice_tbl_${uuid0} select 2, "null";
select * from iceberg_sql_test_${uuid0}.iceberg_db_${uuid0}.ice_tbl_${uuid0} order by k1;
select * from iceberg_sql_test_${uuid0}.iceberg_db_${uuid0}.ice_tbl_${uuid0} where k2 is null;
select * from iceberg_sql_test_${uuid0}.iceberg_db_${uuid0}.ice_tbl_${uuid0} where k2 = "null";
drop table iceberg_sql_test_${uuid0}.iceberg_db_${uuid0}.ice_tbl_${uuid0} force;
drop database iceberg_sql_test_${uuid0}.iceberg_db_${uuid0};
drop catalog iceberg_sql_test_${uuid0};