[BugFix] Use mariadb connector (#41279)

Signed-off-by: predator4ann <yunlong.sun@hotmail.com>
This commit is contained in:
predator4ann 2024-02-21 15:53:01 +08:00 committed by GitHub
parent 9290cb3a7e
commit 0bba6aff8a
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 65 additions and 90 deletions

View File

@ -109,9 +109,9 @@
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.28</version>
<groupId>org.mariadb.jdbc</groupId>
<artifactId>mariadb-java-client</artifactId>
<version>3.3.2</version>
</dependency>
<dependency>
@ -298,13 +298,6 @@
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>mysql</artifactId>
<version>1.16.3</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>testcontainers</artifactId>

View File

@ -20,7 +20,6 @@ import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.io.Closer;
import com.mysql.cj.jdbc.JdbcStatement;
import io.airlift.json.JsonCodec;
import io.airlift.log.Logger;
import io.trino.plugin.base.aggregation.AggregateFunctionRewriter;
@ -120,8 +119,6 @@ import static com.google.common.base.Throwables.throwIfInstanceOf;
import static com.google.common.base.Verify.verify;
import static com.google.common.collect.ImmutableMap.toImmutableMap;
import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
import static com.mysql.cj.exceptions.MysqlErrorNumbers.ER_UNKNOWN_TABLE;
import static com.mysql.cj.exceptions.MysqlErrorNumbers.SQL_STATE_ER_TABLE_EXISTS_ERROR;
import static com.starrocks.data.load.stream.StreamLoadConstants.TABLE_MODEL_PRIMARY_KEYS;
import static io.airlift.json.JsonCodec.jsonCodec;
import static io.airlift.slice.Slices.utf8Slice;
@ -194,24 +191,13 @@ public class StarRocksClient
extends BaseJdbcClient
{
private static final Logger log = Logger.get(StarRocksClient.class);
private static final int MAX_SUPPORTED_DATE_TIME_PRECISION = 6;
// MySQL driver returns width of timestamp types instead of precision.
// 19 characters are used for zero-precision timestamps while others
// require 19 + precision + 1 characters with the additional character
// required for the decimal separator.
private static final int ZERO_PRECISION_TIMESTAMP_COLUMN_SIZE = 19;
// MySQL driver returns width of time types instead of precision, same as the above timestamp type.
private static final int ZERO_PRECISION_TIME_COLUMN_SIZE = 8;
// An empty character means that the table doesn't have a comment in MySQL
private static final String NO_COMMENT = "";
public static final String SQL_STATE_ER_TABLE_EXISTS_ERROR = "42S01";
public static final Integer ER_UNKNOWN_TABLE = 1109;
private static final JsonCodec<ColumnHistogram> HISTOGRAM_CODEC = jsonCodec(ColumnHistogram.class);
// We don't know null fraction, but having no null fraction will make CBO useless. Assume some arbitrary value.
private static final Estimate UNKNOWN_NULL_FRACTION_REPLACEMENT = Estimate.of(0.1);
private final Type jsonType;
private final boolean statisticsEnabled;
private final ConnectorExpressionRewriter<ParameterizedExpression> connectorExpressionRewriter;
@ -260,7 +246,6 @@ public class StarRocksClient
@Override
public Optional<JdbcExpression> implementAggregation(ConnectorSession session, AggregateFunction aggregate, Map<String, ColumnHandle> assignments)
{
// TODO support complex ConnectorExpressions
return aggregateFunctionRewriter.rewrite(session, aggregate, assignments);
}
@ -279,7 +264,6 @@ public class StarRocksClient
@Override
public Collection<String> listSchemas(Connection connection)
{
// for MySQL, we need to list catalogs instead of schemas
try (ResultSet resultSet = connection.getMetaData().getCatalogs()) {
ImmutableSet.Builder<String> schemaNames = ImmutableSet.builder();
while (resultSet.next()) {
@ -299,8 +283,7 @@ public class StarRocksClient
@Override
protected boolean filterSchema(String schemaName)
{
if (schemaName.equalsIgnoreCase("mysql")
|| schemaName.equalsIgnoreCase("sys")) {
if (schemaName.equalsIgnoreCase("sys")) {
return false;
}
return super.filterSchema(schemaName);
@ -311,8 +294,6 @@ public class StarRocksClient
throws SQLException
{
if (!resultSet.isAfterLast()) {
// Abort connection before closing. Without this, the MySQL driver
// attempts to drain the connection by reading all the results.
connection.abort(directExecutor());
}
}
@ -321,18 +302,13 @@ public class StarRocksClient
public PreparedStatement getPreparedStatement(Connection connection, String sql, Optional<Integer> columnCount)
throws SQLException
{
PreparedStatement statement = connection.prepareStatement(sql);
if (statement.isWrapperFor(JdbcStatement.class)) {
statement.unwrap(JdbcStatement.class).enableStreamingResults();
}
return statement;
return connection.prepareStatement(sql);
}
@Override
public ResultSet getTables(Connection connection, Optional<String> schemaName, Optional<String> tableName)
throws SQLException
{
// MySQL maps their "database" to SQL catalogs and does not have schemas
DatabaseMetaData metadata = connection.getMetaData();
return metadata.getTables(
schemaName.orElse(null),
@ -345,7 +321,6 @@ public class StarRocksClient
public Optional<String> getTableComment(ResultSet resultSet)
throws SQLException
{
// Empty remarks means that the table doesn't have a comment in MySQL
return Optional.ofNullable(emptyToNull(resultSet.getString("REMARKS")));
}
@ -355,7 +330,7 @@ public class StarRocksClient
String sql = format(
"ALTER TABLE %s COMMENT = %s",
quoted(handle.asPlainTable().getRemoteTableName()),
mysqlVarcharLiteral(comment.orElse(NO_COMMENT))); // An empty character removes the existing comment in MySQL
starRocksVarcharLiteral(comment.orElse(NO_COMMENT)));
execute(session, sql);
}
@ -363,7 +338,6 @@ public class StarRocksClient
protected String getTableSchemaName(ResultSet resultSet)
throws SQLException
{
// MySQL uses catalogs instead of schemas
return resultSet.getString("TABLE_CAT");
}
@ -372,10 +346,10 @@ public class StarRocksClient
{
checkArgument(tableMetadata.getProperties().isEmpty(), "Unsupported table properties: %s", tableMetadata.getProperties());
String columnName = columns.get(0).split(" ")[0];
return format("CREATE TABLE %s (%s) COMMENT %s DISTRIBUTED by hash(%s) PROPERTIES (\"replication_num\" = \"1\")", quoted(remoteTableName), join(", ", columns), mysqlVarcharLiteral(tableMetadata.getComment().orElse(NO_COMMENT)), columnName);
return format("CREATE TABLE %s (%s) COMMENT %s DISTRIBUTED by hash(%s) PROPERTIES (\"replication_num\" = \"1\")", quoted(remoteTableName), join(", ", columns), starRocksVarcharLiteral(tableMetadata.getComment().orElse(NO_COMMENT)), columnName);
}
private static String mysqlVarcharLiteral(String value)
private static String starRocksVarcharLiteral(String value)
{
requireNonNull(value, "value is null");
return "'" + value.replace("'", "''").replace("\\", "\\\\") + "'";
@ -439,7 +413,6 @@ public class StarRocksClient
int scale = min(decimalDigits, getDecimalDefaultScale(session));
return Optional.of(decimalColumnMapping(createDecimalType(Decimals.MAX_PRECISION, scale), getDecimalRoundingMode(session)));
}
// TODO does mysql support negative scale?
precision = precision + max(-decimalDigits, 0); // Map decimal(p, -s) (negative scale) to decimal(p+s, 0).
if (precision > Decimals.MAX_PRECISION) {
break;
@ -465,7 +438,7 @@ public class StarRocksClient
return Optional.of(ColumnMapping.longMapping(
DATE,
dateReadFunctionUsingLocalDate(),
mySqlDateWriteFunctionUsingLocalDate()));
starRocksDateWriteFunctionUsingLocalDate()));
case Types.TIME:
TimeType timeType = createTimeType(getTimePrecision(typeHandle.getRequiredColumnSize()));
@ -473,7 +446,7 @@ public class StarRocksClient
checkArgument(timeType.getPrecision() <= 9, "Unsupported type precision: %s", timeType);
return Optional.of(ColumnMapping.longMapping(
timeType,
mySqlTimeReadFunction(timeType),
starRocksTimeReadFunction(timeType),
timeWriteFunction(timeType.getPrecision())));
case Types.TIMESTAMP:
@ -481,7 +454,7 @@ public class StarRocksClient
checkArgument(timestampType.getPrecision() <= TimestampType.MAX_SHORT_PRECISION, "Precision is out of range: %s", timestampType.getPrecision());
return Optional.of(ColumnMapping.longMapping(
timestampType,
mySqlTimestampReadFunction(timestampType),
starRocksTimestampReadFunction(timestampType),
timestampWriteFunction(timestampType)));
}
@ -491,7 +464,7 @@ public class StarRocksClient
return Optional.empty();
}
private LongWriteFunction mySqlDateWriteFunctionUsingLocalDate()
private LongWriteFunction starRocksDateWriteFunctionUsingLocalDate()
{
return new LongWriteFunction() {
@Override
@ -509,7 +482,7 @@ public class StarRocksClient
};
}
private static LongReadFunction mySqlTimestampReadFunction(TimestampType timestampType)
private static LongReadFunction starRocksTimestampReadFunction(TimestampType timestampType)
{
return new LongReadFunction()
{
@ -531,7 +504,7 @@ public class StarRocksClient
};
}
private static LongReadFunction mySqlTimeReadFunction(TimeType timeType)
private static LongReadFunction starRocksTimeReadFunction(TimeType timeType)
{
return new LongReadFunction()
{
@ -607,7 +580,7 @@ public class StarRocksClient
}
if (type == DATE) {
return WriteMapping.longMapping("date", mySqlDateWriteFunctionUsingLocalDate());
return WriteMapping.longMapping("date", starRocksDateWriteFunctionUsingLocalDate());
}
if (type instanceof TimestampType timestampType) {
@ -675,7 +648,7 @@ public class StarRocksClient
verify(tableHandle.getAuthorization().isEmpty(), "Unexpected authorization is required for table: %s".formatted(tableHandle));
try (Connection connection = connectionFactory.openConnection(session)) {
verify(connection.getAutoCommit());
connection.setAutoCommit(true);
String remoteSchema = getIdentifierMapping().toRemoteSchemaName(identity, connection, schemaTableName.getSchemaName());
String remoteTable = getIdentifierMapping().toRemoteTableName(identity, connection, remoteSchema, schemaTableName.getTableName());
String catalog = connection.getCatalog();
@ -746,7 +719,7 @@ public class StarRocksClient
closer.register(() -> dropTable(session, temporaryTable, true));
try (Connection connection = getConnection(session, handle)) {
verify(connection.getAutoCommit());
connection.setAutoCommit(true);
String columns = handle.getColumnNames().stream()
.map(this::quoted)
.collect(joining(", "));
@ -791,8 +764,6 @@ public class StarRocksClient
@Override
public void renameTable(ConnectorSession session, JdbcTableHandle handle, SchemaTableName newTableName)
{
// MySQL doesn't support specifying the catalog name in a rename. By setting the
// catalogName parameter to null, it will be omitted in the ALTER TABLE statement.
RemoteTableName remoteTableName = handle.asPlainTable().getRemoteTableName();
verify(remoteTableName.getSchemaName().isEmpty());
renameTable(session, null, remoteTableName.getCatalogName().orElse(null), remoteTableName.getTableName(), newTableName);
@ -834,9 +805,7 @@ public class StarRocksClient
switch (sortItem.getSortOrder()) {
case ASC_NULLS_FIRST:
// In MySQL ASC implies NULLS FIRST
case DESC_NULLS_LAST:
// In MySQL DESC implies NULLS LAST
return Stream.of(columnSorting);
case ASC_NULLS_LAST:
@ -873,7 +842,6 @@ public class StarRocksClient
JoinStatistics statistics)
{
if (joinType == JoinType.FULL_OUTER) {
// Not supported in MySQL
return Optional.empty();
}
return implementJoinCostAware(
@ -889,7 +857,6 @@ public class StarRocksClient
protected boolean isSupportedJoinCondition(ConnectorSession session, JdbcJoinCondition joinCondition)
{
if (joinCondition.getOperator() == JoinCondition.Operator.IS_DISTINCT_FROM) {
// Not supported in MySQL
return false;
}
@ -1090,7 +1057,6 @@ public class StarRocksClient
}
catch (UnableToExecuteStatementException e) {
if (e.getCause() instanceof SQLSyntaxErrorException && ((SQLSyntaxErrorException) e.getCause()).getErrorCode() == ER_UNKNOWN_TABLE) {
// The table is available since MySQL 8
log.debug("INFORMATION_SCHEMA.COLUMN_STATISTICS table is not available: %s", e);
return ImmutableMap.of();
}
@ -1133,7 +1099,6 @@ public class StarRocksClient
}
}
// See https://dev.mysql.com/doc/refman/8.0/en/optimizer-statistics.html
public static class ColumnHistogram
{
private final Optional<Double> nullFraction;

View File

@ -19,7 +19,6 @@ import com.google.inject.Provides;
import com.google.inject.Scopes;
import com.google.inject.Singleton;
import com.google.inject.multibindings.Multibinder;
import com.mysql.jdbc.Driver;
import com.starrocks.data.load.stream.StreamLoadDataFormat;
import com.starrocks.data.load.stream.properties.StreamLoadProperties;
import com.starrocks.data.load.stream.properties.StreamLoadTableProperties;
@ -42,9 +41,12 @@ import io.trino.spi.ptf.ConnectorTableFunction;
import java.sql.SQLException;
import java.util.Properties;
import org.mariadb.jdbc.Driver;
import static com.google.inject.multibindings.Multibinder.newSetBinder;
import static com.google.inject.multibindings.OptionalBinder.newOptionalBinder;
import static io.airlift.configuration.ConfigBinder.configBinder;
import static io.trino.plugin.starrocks.StarRocksJdbcConfig.transConnectionUrl;
public class StarRocksClientModule
extends AbstractConfigurationAwareModule
@ -84,7 +86,7 @@ public class StarRocksClientModule
{
return new DriverConnectionFactory(
new Driver(),
config.getConnectionUrl(),
transConnectionUrl(config.getConnectionUrl()),
getConnectionProperties(starRocksJdbcConfig),
credentialProvider);
}
@ -124,7 +126,7 @@ public class StarRocksClientModule
.build();
return StreamLoadProperties.builder()
.loadUrls(starRocksConfig.getLoadUrls().toArray(new String[0]))
.jdbcUrl(baseJdbcConfig.getConnectionUrl())
.jdbcUrl(transConnectionUrl(baseJdbcConfig.getConnectionUrl()))
.tableProperties(streamLoadTableProperties)
.cacheMaxBytes(starRocksConfig.getMaxCacheBytes())
.connectTimeout(starRocksConfig.getConnectTimeout())

View File

@ -14,33 +14,34 @@
package io.trino.plugin.starrocks;
import com.mysql.cj.conf.ConnectionUrlParser;
import com.mysql.cj.exceptions.CJException;
import com.mysql.jdbc.Driver;
import io.airlift.configuration.Config;
import io.airlift.configuration.ConfigDescription;
import io.airlift.units.Duration;
import io.trino.plugin.jdbc.BaseJdbcConfig;
import java.io.UnsupportedEncodingException;
import java.net.URLDecoder;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeUnit;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import javax.validation.constraints.AssertTrue;
import javax.validation.constraints.Min;
import java.sql.SQLException;
import java.util.concurrent.TimeUnit;
import org.mariadb.jdbc.Driver;
import static com.google.common.base.Strings.isNullOrEmpty;
import static com.mysql.cj.conf.ConnectionUrlParser.parseConnectionString;
public class StarRocksJdbcConfig
extends BaseJdbcConfig
{
public static final Pattern CONNECTION_STRING_PTRN = Pattern.compile("(?<scheme>[\\w\\+:%]+)\\s*"
+ "(?://(?<authority>[^/?#]*))?\\s*"
+ "(?:/(?!\\s*/)(?<path>[^?#]*))?"
+ "(?:\\?(?!\\s*\\?)(?<query>[^#]*))?"
+ "(?:\\s*#(?<fragment>.*))?");
private boolean autoReconnect = true;
private int maxReconnects = 3;
private Duration connectionTimeout = new Duration(10, TimeUnit.SECONDS);
// Using `useInformationSchema=true` prevents race condition inside MySQL driver's java.sql.DatabaseMetaData.getColumns
// implementation, which throw SQL exception when a table disappears during listing.
// Using `useInformationSchema=false` may provide more diagnostic information (see https://github.com/trinodb/trino/issues/1597)
private boolean driverUseInformationSchema = true;
public boolean isAutoReconnect()
@ -86,34 +87,48 @@ public class StarRocksJdbcConfig
}
@Config("starrocks.jdbc.use-information-schema")
@ConfigDescription("Value of useInformationSchema MySQL JDBC driver connection property")
@ConfigDescription("Value of useInformationSchema StarRocks JDBC driver connection property")
public StarRocksJdbcConfig setDriverUseInformationSchema(boolean driverUseInformationSchema)
{
this.driverUseInformationSchema = driverUseInformationSchema;
return this;
}
@AssertTrue(message = "Invalid JDBC URL for MySQL connector")
@AssertTrue(message = "Invalid JDBC URL for MariaDB connector")
public boolean isUrlValid()
{
try {
Driver driver = new Driver();
return driver.acceptsURL(getConnectionUrl());
}
catch (SQLException e) {
throw new RuntimeException(e);
}
Driver driver = new Driver();
return driver.acceptsURL(transConnectionUrl(getConnectionUrl()));
}
@AssertTrue(message = "Database (catalog) must not be specified in JDBC URL for MySQL connector")
@AssertTrue(message = "Database (catalog) must not be specified in JDBC URL for StarRocks connector")
public boolean isUrlWithoutDatabase()
{
try {
ConnectionUrlParser parser = parseConnectionString(getConnectionUrl());
return isNullOrEmpty(parser.getPath());
}
catch (CJException ignore) {
Matcher matcher = CONNECTION_STRING_PTRN.matcher(transConnectionUrl(getConnectionUrl()));
if (!matcher.matches()) {
return false;
}
String path = matcher.group("path") == null ? null : decode(matcher.group("path")).trim();
return isNullOrEmpty(path);
}
private static String decode(String text) {
if (isNullOrEmpty(text)) {
return text;
}
try {
return URLDecoder.decode(text, StandardCharsets.UTF_8.name());
} catch (UnsupportedEncodingException e) {
// Won't happen.
}
return "";
}
public static String transConnectionUrl(String connectionUrl) {
// use org.mariadb.jdbc.Driver for mysql because of gpl protocol
if (connectionUrl.contains("mysql")) {
connectionUrl = connectionUrl.replace("mysql", "mariadb");
}
return connectionUrl;
}
}