[Enhancement] Delta lake support timestamp_ntz type (#47534)

Signed-off-by: trueeyu <lxhhust350@qq.com>
This commit is contained in:
trueeyu 2024-06-26 23:22:29 +08:00 committed by GitHub
parent af2037ea89
commit 892b03d0cd
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
8 changed files with 135 additions and 51 deletions

View File

@ -462,6 +462,7 @@ public class ColumnTypeConverter {
primitiveType = PrimitiveType.DATE;
break;
case TIMESTAMP:
case TIMESTAMP_NTZ:
primitiveType = PrimitiveType.DATETIME;
break;
case STRING:

View File

@ -63,12 +63,6 @@ public class DeltaUtils {
ErrorReport.reportValidateException(ErrorCode.ERR_BAD_TABLE_ERROR, ErrorType.UNSUPPORTED,
"Delta table feature [column mapping] is not supported");
}
// check timestampNtz type
if (protocol.getReaderFeatures().contains("timestampNtz")) {
LOG.error("Delta table feature timestampNtz is not supported");
ErrorReport.reportValidateException(ErrorCode.ERR_BAD_TABLE_ERROR, ErrorType.UNSUPPORTED,
"Delta table feature [timestampNtz] is not supported");
}
}
public static DeltaLakeTable convertDeltaToSRTable(String catalog, String dbName, String tblName, String path,

View File

@ -36,7 +36,6 @@ import io.delta.kernel.expressions.Literal;
import io.delta.kernel.expressions.Or;
import io.delta.kernel.expressions.Predicate;
import io.delta.kernel.types.StructField;
import io.delta.kernel.types.StructType;
import java.time.LocalDateTime;
import java.time.ZoneId;
@ -100,19 +99,9 @@ public class ScalarOperationToDeltaLakeExpr {
private static DeltaDataType getColumnType(String qualifiedName, DeltaLakeContext context) {
//TODO: nested type
StructType structType = context.getSchema();
StructField field = structType.get(qualifiedName);
if (field != null) {
DeltaDataType type = DeltaDataType.instanceFrom(field.getDataType().getClass());
// Note: A timestamp value in a partition value doesn't store the time zone due to historical reasons.
// It means its behavior looks similar to timestamp without time zone when it is used
// in a partition column.
// https://github.com/delta-io/delta/blob/master/PROTOCOL.md#partition-value-serialization
if (type == DeltaDataType.TIMESTAMP && context.isPartitionColumn(qualifiedName)) {
return DeltaDataType.TIMESTAMP_NTZ;
} else {
return type;
}
StructField field = context.getSchema().get(qualifiedName);
if (field != null && field.getDataType() != null) {
return DeltaDataType.instanceFrom(field.getDataType().getClass());
} else {
return DeltaDataType.OTHER;
}
@ -166,7 +155,7 @@ public class ScalarOperationToDeltaLakeExpr {
Column column = context.getColumn(columnName);
DeltaDataType resultType = getResultType(columnName, context);
Literal literal = getLiteral(operator.getChild(1), resultType);
Literal literal = getLiteral(operator.getChild(1), resultType, context.isPartitionColumn(columnName));
if (literal == null) {
return null;
@ -195,16 +184,23 @@ public class ScalarOperationToDeltaLakeExpr {
return null;
}
private static Literal getLiteral(ScalarOperator operator, DeltaDataType deltaDataType) {
private static Literal getLiteral(ScalarOperator operator, DeltaDataType deltaDataType,
boolean isPartitionCol) {
if (operator == null) {
return null;
}
return operator.accept(new ExtractLiteralValue(), deltaDataType);
return operator.accept(new ExtractLiteralValue(isPartitionCol), deltaDataType);
}
}
private static class ExtractLiteralValue extends ScalarOperatorVisitor<Literal, DeltaDataType> {
private final boolean isPartitionColumn;
public ExtractLiteralValue(boolean isPartitionColumn) {
this.isPartitionColumn = isPartitionColumn;
}
private boolean needCast(PrimitiveType srcType, DeltaDataType destType) {
switch (srcType) {
case BOOLEAN:
@ -315,17 +311,24 @@ public class ScalarOperationToDeltaLakeExpr {
case DATE:
return Literal.ofDate((int) operator.getDate().toLocalDate().toEpochDay());
case DATETIME:
LocalDateTime localDateTime = operator.getDatetime();
ZoneId zoneOffset = ZoneOffset.UTC;
if (type == DeltaDataType.TIMESTAMP) {
ZoneId zoneId = TimeUtils.getTimeZone().toZoneId();
LocalDateTime localDateTime = operator.getDatetime();
long value = localDateTime.atZone(zoneId).toEpochSecond() * 1000 * 1000
if (!isPartitionColumn) {
// Note: A timestamp value in a partition value doesn't store
// the time zone due to historical reasons.
// It means its behavior looks similar to timestamp without time zone when it is used
// in a partition column.
// https://github.com/delta-io/delta/blob/master/PROTOCOL.md#partition-value-serialization
zoneOffset = TimeUtils.getTimeZone().toZoneId();
}
long value = localDateTime.atZone(zoneOffset).toEpochSecond() * 1000 * 1000
+ localDateTime.getNano() / 1000;
return Literal.ofTimestamp(value);
} else {
LocalDateTime localDateTime = operator.getDatetime();
long value = localDateTime.atZone(ZoneOffset.UTC).toEpochSecond() * 1000 * 1000
long value = localDateTime.atZone(zoneOffset).toEpochSecond() * 1000 * 1000
+ localDateTime.getNano() / 1000;
return Literal.ofTimestamp(value);
return Literal.ofTimestampNtz(value);
}
case CHAR:
case VARCHAR:

View File

@ -71,28 +71,6 @@ public class DeltaUtilsTest {
Lists.newArrayList()), metadata);
}
@Test
public void testCheckTableFeatureSupported3(@Mocked Metadata metadata, @Mocked Protocol protocol) {
expectedEx.expect(ValidateException.class);
expectedEx.expectMessage("Delta table feature [timestampNtz] is not supported");
new Expectations() {
{
metadata.getConfiguration();
result = ImmutableMap.of(COLUMN_MAPPING_MODE_KEY, "none");
minTimes = 0;
}
{
protocol.getReaderFeatures();
result = Lists.newArrayList("timestampNtz");
minTimes = 0;
}
};
DeltaUtils.checkTableFeatureSupported(new Protocol(3, 7, Lists.newArrayList(),
Lists.newArrayList()), metadata);
}
@Test
public void testConvertDeltaToSRTableWithException1() {
expectedEx.expect(SemanticException.class);

View File

@ -8,6 +8,9 @@ create external catalog delta_test_${uuid0} PROPERTIES (
);
-- result:
-- !result
update information_schema.be_configs set value="512" where name="primary_key_limit_size";
---
-- !result
analyze table delta_test_${uuid0}.delta_oss_db.delta_lake_data_type;
-- result:
[REGEX].*analyze status OK
@ -33,6 +36,26 @@ col_struct 8 288 0
col_timestamp 8 64 6 2024-04-29 12:00:00 2024-04-24 12:00:00
col_tinyint 8 8 6 6 1
-- !result
analyze table delta_test_${uuid0}.delta_oss_db.t_timestamp_ntz;
-- result:
[REGEX].*analyze status OK
-- !result
select column_name, row_count, data_size, hll_cardinality(ndv), `max`, `min` from default_catalog._statistics_.external_column_statistics
where catalog_name="delta_test_${uuid0}" and db_name="delta_oss_db" and table_name="t_timestamp_ntz"
order by column_name;
-- result:
col_int 5 20 5 5 1
col_timestamp_ntz 5 40 4 2024-01-05 04:05:06 2024-01-02 01:02:03
-- !result
analyze table delta_test_${uuid0}.delta_oss_db.t_partition_timestamp_ntz;
-- result:
[REGEX].*analyze status OK
-- !result
select column_name, row_count, data_size, hll_cardinality(ndv), `max`, `min` from default_catalog._statistics_.external_column_statistics
where catalog_name="delta_test_${uuid0}" and db_name="delta_oss_db" and table_name="t_partition_timestamp_ntz"
order by column_name;
-- result:
-- !result
drop catalog delta_test_${uuid0}
-- result:
-- !result

View File

@ -164,6 +164,35 @@ select col_timestamp from delta_test_${uuid0}.delta_oss_db.delta_lake_data_type
2024-04-25 12:00:00
2024-04-26 12:00:00
-- !result
select * from delta_test_${uuid0}.delta_oss_db.t_timestamp_ntz order by col_int;
-- result:
1 2024-01-02 01:02:03
2 2024-01-03 04:05:06
3 None
4 2024-01-04 01:02:03
5 2024-01-05 04:05:06
-- !result
select * from delta_test_${uuid0}.delta_oss_db.t_timestamp_ntz where col_timestamp_ntz is null order by col_int;
-- result:
3 None
-- !result
select * from delta_test_${uuid0}.delta_oss_db.t_timestamp_ntz where col_timestamp_ntz is not null order by col_int;
-- result:
1 2024-01-02 01:02:03
2 2024-01-03 04:05:06
4 2024-01-04 01:02:03
5 2024-01-05 04:05:06
-- !result
select * from delta_test_${uuid0}.delta_oss_db.t_timestamp_ntz where col_timestamp_ntz = '2024-01-02 01:02:03' order by col_int;
-- result:
1 2024-01-02 01:02:03
-- !result
select * from delta_test_${uuid0}.delta_oss_db.t_timestamp_ntz where col_timestamp_ntz >= '2024-01-02 01:02:01' and col_timestamp_ntz < '2024-01-04 01:02:04' order by col_int;
-- result:
1 2024-01-02 01:02:03
2 2024-01-03 04:05:06
4 2024-01-04 01:02:03
-- !result
select * from delta_test_${uuid0}.delta_oss_db.delta_test_column_mapping;
-- result:
E: (1064, "Unknown table 'Delta table feature [column mapping] is not supported'")
@ -183,6 +212,35 @@ select * from delta_test_${uuid0}.delta_oss_db.delta_lake_par_col_timestamp wher
-- result:
2 2 2023-01-01 01:01:01
-- !result
select * from delta_test_${uuid0}.delta_oss_db.t_partition_timestamp_ntz order by col_int;
-- result:
1 2024-01-02 01:02:03
2 2024-01-03 04:05:06
3 None
4 2024-01-04 01:02:03
5 2024-01-05 04:05:06
-- !result
select * from delta_test_${uuid0}.delta_oss_db.t_partition_timestamp_ntz where col_timestamp_ntz is null order by col_int;
-- result:
3 None
-- !result
select * from delta_test_${uuid0}.delta_oss_db.t_partition_timestamp_ntz where col_timestamp_ntz is not null order by col_int;
-- result:
1 2024-01-02 01:02:03
2 2024-01-03 04:05:06
4 2024-01-04 01:02:03
5 2024-01-05 04:05:06
-- !result
select * from delta_test_${uuid0}.delta_oss_db.t_partition_timestamp_ntz where col_timestamp_ntz = '2024-01-02 01:02:03' order by col_int;
-- result:
1 2024-01-02 01:02:03
-- !result
select * from delta_test_${uuid0}.delta_oss_db.t_partition_timestamp_ntz where col_timestamp_ntz >= '2024-01-02 01:02:01' and col_timestamp_ntz < '2024-01-04 01:02:04' order by col_int;
-- result:
1 2024-01-02 01:02:03
2 2024-01-03 04:05:06
4 2024-01-04 01:02:03
-- !result
select * from delta_test_${uuid0}.delta_oss_db.delta_lake_par_col_double order by col_smallint;
-- result:
E: (1064, 'Table partition by float/double/decimal datatype is not supported')

View File

@ -9,11 +9,24 @@ create external catalog delta_test_${uuid0} PROPERTIES (
"aws.s3.endpoint"="${oss_endpoint}"
);
update information_schema.be_configs set value="512" where name="primary_key_limit_size";
-- test manual analyze
analyze table delta_test_${uuid0}.delta_oss_db.delta_lake_data_type;
select column_name, row_count, data_size, hll_cardinality(ndv), `max`, `min` from default_catalog._statistics_.external_column_statistics
where catalog_name="delta_test_${uuid0}" and db_name="delta_oss_db" and table_name="delta_lake_data_type"
order by column_name;
-- test manual analyze timestamp_ntz type
analyze table delta_test_${uuid0}.delta_oss_db.t_timestamp_ntz;
select column_name, row_count, data_size, hll_cardinality(ndv), `max`, `min` from default_catalog._statistics_.external_column_statistics
where catalog_name="delta_test_${uuid0}" and db_name="delta_oss_db" and table_name="t_timestamp_ntz"
order by column_name;
analyze table delta_test_${uuid0}.delta_oss_db.t_partition_timestamp_ntz;
select column_name, row_count, data_size, hll_cardinality(ndv), `max`, `min` from default_catalog._statistics_.external_column_statistics
where catalog_name="delta_test_${uuid0}" and db_name="delta_oss_db" and table_name="t_partition_timestamp_ntz"
order by column_name;
-- drop catalog
drop catalog delta_test_${uuid0}

View File

@ -45,6 +45,13 @@ select col_tinyint,col_array,col_map,col_struct from delta_test_${uuid0}.delta_o
select col_timestamp from delta_test_${uuid0}.delta_oss_db.delta_lake_data_type where col_timestamp = '2024-04-24 12:00:00';
select col_timestamp from delta_test_${uuid0}.delta_oss_db.delta_lake_data_type where col_timestamp >= '2024-04-24 12:00:00' and col_timestamp < '2024-04-27 12:00:00';
-- test timestamp_ntz
select * from delta_test_${uuid0}.delta_oss_db.t_timestamp_ntz order by col_int;
select * from delta_test_${uuid0}.delta_oss_db.t_timestamp_ntz where col_timestamp_ntz is null order by col_int;
select * from delta_test_${uuid0}.delta_oss_db.t_timestamp_ntz where col_timestamp_ntz is not null order by col_int;
select * from delta_test_${uuid0}.delta_oss_db.t_timestamp_ntz where col_timestamp_ntz = '2024-01-02 01:02:03' order by col_int;
select * from delta_test_${uuid0}.delta_oss_db.t_timestamp_ntz where col_timestamp_ntz >= '2024-01-02 01:02:01' and col_timestamp_ntz < '2024-01-04 01:02:04' order by col_int;
-- test column mapping
select * from delta_test_${uuid0}.delta_oss_db.delta_test_column_mapping;
@ -53,6 +60,13 @@ select * from delta_test_${uuid0}.delta_oss_db.delta_lake_par_col_timestamp orde
select * from delta_test_${uuid0}.delta_oss_db.delta_lake_par_col_timestamp where col_timestamp > '2022-01-01 01:01:01' order by col_smallint;
select * from delta_test_${uuid0}.delta_oss_db.delta_lake_par_col_timestamp where col_timestamp = '2023-01-01 01:01:01' order by col_smallint;
-- test timestamp_ntz as partition type
select * from delta_test_${uuid0}.delta_oss_db.t_partition_timestamp_ntz order by col_int;
select * from delta_test_${uuid0}.delta_oss_db.t_partition_timestamp_ntz where col_timestamp_ntz is null order by col_int;
select * from delta_test_${uuid0}.delta_oss_db.t_partition_timestamp_ntz where col_timestamp_ntz is not null order by col_int;
select * from delta_test_${uuid0}.delta_oss_db.t_partition_timestamp_ntz where col_timestamp_ntz = '2024-01-02 01:02:03' order by col_int;
select * from delta_test_${uuid0}.delta_oss_db.t_partition_timestamp_ntz where col_timestamp_ntz >= '2024-01-02 01:02:01' and col_timestamp_ntz < '2024-01-04 01:02:04' order by col_int;
-- test double as partition type
select * from delta_test_${uuid0}.delta_oss_db.delta_lake_par_col_double order by col_smallint;