[Feature] Add StarRocks Connector for Trino (#16240)

Signed-off-by: predator4ann <yunlong.sun@hotmail.com>
This commit is contained in:
predator4ann 2024-01-30 20:05:23 +08:00 committed by GitHub
parent f1315e1ae4
commit ff2b6f54a7
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
27 changed files with 5260 additions and 0 deletions

View File

@ -0,0 +1,262 @@
<?xml version="1.0"?>
<modernizer>
<violation>
<name>java/lang/Class.newInstance:()Ljava/lang/Object;</name>
<version>1.1</version>
<comment>Prefer Class.getConstructor().newInstance()</comment>
</violation>
<violation>
<name>java/lang/String."&lt;init&gt;":([B)V</name>
<version>1.1</version>
<comment>Prefer new String(byte[], Charset)</comment>
</violation>
<violation>
<name>java/lang/String.getBytes:()[B</name>
<version>1.1</version>
<comment>Prefer String.getBytes(Charset)</comment>
</violation>
<violation>
<name>java/lang/String.toLowerCase:()Ljava/lang/String;</name>
<version>1.1</version>
<comment>Prefer String.toLowerCase(java.util.Locale)</comment>
</violation>
<violation>
<name>java/lang/String.toUpperCase:()Ljava/lang/String;</name>
<version>1.1</version>
<comment>Prefer String.toUpperCase(java.util.Locale)</comment>
</violation>
<violation>
<!-- File.getPath() is the canonical way to convert File to its String representation suitable for passing to new File() -->
<name>java/io/File.toString:()Ljava/lang/String;</name>
<version>1.1</version>
<comment>Prefer File.getPath()</comment>
</violation>
<violation>
<name>com/google/common/primitives/Ints.checkedCast:(J)I</name>
<version>1.8</version>
<comment>Prefer Math.toIntExact(long)</comment>
</violation>
<violation>
<name>com/google/common/collect/ImmutableMap$Builder.build:()Lcom/google/common/collect/ImmutableMap;</name>
<version>1.8</version>
<comment>Use buildOrThrow() instead, as it makes it clear that it will throw on duplicated values</comment>
</violation>
<violation>
<name>com/google/common/collect/ImmutableTable$Builder.build:()Lcom/google/common/collect/ImmutableTable;</name>
<version>1.8</version>
<comment>Use buildOrThrow() instead, as it makes it clear that it will throw on duplicated values</comment>
</violation>
<violation>
<name>com/google/common/collect/ImmutableBiMap$Builder."&lt;init&gt;":()V</name>
<version>1.8</version>
<comment>Use builder() static factory method instead</comment>
</violation>
<violation>
<name>com/google/common/collect/ImmutableList$Builder."&lt;init&gt;":()V</name>
<version>1.8</version>
<comment>Use builder() static factory method instead</comment>
</violation>
<violation>
<name>com/google/common/collect/ImmutableMap$Builder."&lt;init&gt;":()V</name>
<version>1.8</version>
<comment>Use builder() static factory method instead</comment>
</violation>
<violation>
<name>com/google/common/collect/ImmutableMultimap$Builder."&lt;init&gt;":()V</name>
<version>1.8</version>
<comment>Use builder() static factory method instead</comment>
</violation>
<violation>
<name>com/google/common/collect/ImmutableMultiset$Builder."&lt;init&gt;":()V</name>
<version>1.8</version>
<comment>Use builder() static factory method instead</comment>
</violation>
<violation>
<name>com/google/common/collect/ImmutableSet$Builder."&lt;init&gt;":()V</name>
<version>1.8</version>
<comment>Use builder() static factory method instead</comment>
</violation>
<violation>
<name>com/google/common/collect/ImmutableSortedMap$Builder."&lt;init&gt;":()V</name>
<version>1.8</version>
<comment>Use orderedBy() static factory method instead</comment>
</violation>
<violation>
<name>com/google/common/collect/ImmutableSortedSet$Builder."&lt;init&gt;":()V</name>
<version>1.8</version>
<comment>Use orderedBy() static factory method instead</comment>
</violation>
<violation>
<name>com/google/common/collect/ImmutableTable$Builder."&lt;init&gt;":()V</name>
<version>1.8</version>
<comment>Use builder() static factory method instead</comment>
</violation>
<violation>
<name>com/google/common/cache/CacheBuilder.build:()Lcom/google/common/cache/Cache;</name>
<version>1.8</version>
<comment>Guava Cache has concurrency issues around invalidation and ongoing loads. Use EvictableCacheBuilder or SafeCaches to build caches.
See https://github.com/trinodb/trino/issues/10512 for more information and see https://github.com/trinodb/trino/issues/10512#issuecomment-1016221168
for why Caffeine does not solve the problem.</comment>
</violation>
<violation>
<name>com/google/common/cache/CacheBuilder.build:(Lcom/google/common/cache/CacheLoader;)Lcom/google/common/cache/LoadingCache;</name>
<version>1.8</version>
<comment>Guava LoadingCache has concurrency issues around invalidation and ongoing loads. Use EvictableCacheBuilder or SafeCaches to build caches.
See https://github.com/trinodb/trino/issues/10512 for more information and see https://github.com/trinodb/trino/issues/10512#issuecomment-1016221168
for why Caffeine does not solve the problem.</comment>
</violation>
<violation>
<name>org/testng/Assert.assertEquals:(Ljava/lang/Iterable;Ljava/lang/Iterable;)V</name>
<version>1.8</version>
<comment>Use io.trino.testing.assertions.Assert.assertEquals due to TestNG #543</comment>
</violation>
<violation>
<name>org/testng/Assert.assertEquals:(Ljava/lang/Iterable;Ljava/lang/Iterable;Ljava/lang/String;)V</name>
<version>1.8</version>
<comment>Use io.trino.testing.assertions.Assert.assertEquals due to TestNG #543</comment>
</violation>
<violation>
<name>org/testng/Assert.assertThrows:(Lorg/testng/Assert$ThrowingRunnable;)V</name>
<version>1.8</version>
<comment>Use AssertJ's assertThatThrownBy, see https://github.com/trinodb/trino/issues/5320 for rationale</comment>
</violation>
<violation>
<name>org/testng/Assert.assertThrows:(Ljava/lang/Class;Lorg/testng/Assert$ThrowingRunnable;)V</name>
<version>1.8</version>
<comment>Use AssertJ's assertThatThrownBy, see https://github.com/trinodb/trino/issues/5320 for rationale</comment>
</violation>
<violation>
<name>org/apache/hadoop/mapred/JobConf."&lt;init&gt;":()V</name>
<version>1.1</version>
<comment>This constructor reads default configuration resource files implicitly. Prefer new JobConf(Configuration)</comment>
</violation>
<violation>
<name>org/apache/hadoop/mapred/JobConf."&lt;init&gt;":(Ljava/lang/Class;)V</name>
<version>1.1</version>
<comment>This constructor reads default configuration resource files implicitly. Prefer new JobConf(Configuration)</comment>
</violation>
<violation>
<name>org/apache/hadoop/conf/Configuration."&lt;init&gt;":()V</name>
<version>1.1</version>
<comment>Prefer ConfigurationInstantiator.newEmptyConfiguration() for two reasons: (1) loading default resources is unlikely desired and (2) ConfigurationInstantiator adds additional safety checks</comment>
</violation>
<violation>
<name>org/apache/hadoop/conf/Configuration."&lt;init&gt;":(Z)V</name>
<version>1.1</version>
<comment>Prefer ConfigurationInstantiator.newEmptyConfiguration()</comment>
</violation>
<violation>
<name>java/util/TimeZone.getTimeZone:(Ljava/lang/String;)Ljava/util/TimeZone;</name>
<version>1.8</version>
<comment>Avoid TimeZone.getTimeZone as it returns GMT for a zone not supported by the JVM. Use TimeZone.getTimeZone(ZoneId.of(..)) instead, or TimeZone.getTimeZone(..., false).</comment>
</violation>
<violation>
<name>org/joda/time/DateTimeZone.toTimeZone:()Ljava/util/TimeZone;</name>
<version>1.8</version>
<comment>Avoid DateTimeZone.toTimeZone as it returns GMT for a zone not supported by the JVM. Use TimeZone.getTimeZone(ZoneId.of(dtz.getId())) instead.</comment>
</violation>
<violation>
<name>com/esri/core/geometry/ogc/OGCGeometry.equals:(Lcom/esri/core/geometry/ogc/OGCGeometry;)Z</name>
<version>1.6</version>
<comment>Prefer OGCGeometry.Equals(OGCGeometry)</comment>
</violation>
<violation>
<name>com/esri/core/geometry/ogc/OGCGeometry.equals:(Ljava/lang/Object;)Z</name>
<version>1.6</version>
<comment>Prefer OGCGeometry.Equals(OGCGeometry)</comment>
</violation>
<violation>
<name>io/airlift/units/DataSize."&lt;init&gt;":(DLio/airlift/units/DataSize$Unit;)V</name>
<version>1.8</version>
<comment>Use io.airlift.units.DataSize.of(long, DataSize.Unit)</comment>
</violation>
<violation>
<name>io/airlift/units/DataSize.succinctDataSize:(DLio/airlift/units/DataSize$Unit;)Lio/airlift/units/DataSize;</name>
<version>1.8</version>
<comment>Use io.airlift.units.DataSize.of(long, DataSize.Unit).succinct() -- Note that succinct conversion only affects toString() results</comment>
</violation>
<violation>
<name>io/airlift/units/DataSize.getValue:()D</name>
<version>1.8</version>
<comment>Use io.airlift.units.DataSize.toBytes() and Unit.inBytes() for conversion</comment>
</violation>
<violation>
<name>io/airlift/units/DataSize.getValue:(Lio/airlift/units/DataSize$Unit;)D</name>
<version>1.8</version>
<comment>Use io.airlift.units.DataSize.toBytes() and Unit.inBytes() for conversion</comment>
</violation>
<violation>
<name>io/airlift/units/DataSize.roundTo:(Lio/airlift/units/DataSize$Unit;)J</name>
<version>1.8</version>
<comment>Method is deprecated for removal</comment>
</violation>
<violation>
<name>io/airlift/units/DataSize.convertTo:(Lio/airlift/units/DataSize$Unit;)Lio/airlift/units/DataSize;</name>
<version>1.8</version>
<comment>Use io.airlift.units.DataSize.to(DataSize.Unit)</comment>
</violation>
<violation>
<name>io/airlift/units/DataSize.convertToMostSuccinctDataSize:()Lio/airlift/units/DataSize;</name>
<version>1.8</version>
<comment>Use io.airlift.units.DataSize.succinct()</comment>
</violation>
<violation>
<name>io/airlift/testing/Closeables.closeQuietly:([Ljava/io/Closeable;)V</name>
<version>1.0</version>
<comment>Use Closeables.closeAll() or Closer.</comment>
</violation>
<violation>
<name>com/google/inject/util/Modules.combine:(Ljava/lang/Iterable;)Lcom/google/inject/Module;</name>
<version>1.8</version>
<comment>Use io.airlift.configuration.ConfigurationAwareModule.combine</comment>
</violation>
<violation>
<name>com/google/inject/util/Modules.combine:([Lcom/google/inject/Module;)Lcom/google/inject/Module;</name>
<version>1.8</version>
<comment>Use io.airlift.configuration.ConfigurationAwareModule.combine</comment>
</violation>
<violation>
<name>io/jsonwebtoken/Jwts.builder:()Lio/jsonwebtoken/JwtBuilder;</name>
<version>1.8</version>
<comment>Use io.trino.server.security.jwt.JwtsUtil or equivalent</comment>
</violation>
<violation>
<name>io/jsonwebtoken/Jwts.parserBuilder:()Lio/jsonwebtoken/JwtParserBuilder;</name>
<version>1.8</version>
<comment>Use io.trino.server.security.jwt.JwtsUtil or equivalent</comment>
</violation>
</modernizer>

View File

@ -0,0 +1,133 @@
# StarRocks connector
This is a connector for StarRocks that is compatible with [Trino](https://trino.io/). The connector allows querying and inserting in an external StarRocks instance. This can be used to join data between different systems like StarRocks and Hive, or between two different StarRocks instances.
## Requirements
To connect to StarRocks, you need:
- StarRocks 2.5.0 or higher.
- Network access from the Trino coordinator and workers to StarRocks. Port 9030 and 8030 are the default port.
## Configuration
To configure the StarRocks connector, create a catalog properties file in `etc/catalog` named, for example, example.properties, to mount the StarRocks connector as the starrocks catalog. Create the file with the following contents, replacing the connection properties as appropriate for your setup:
```
connector.name=starrocks
connection-url=jdbc:mysql://starrocks:9030
connection-user=root
connection-password=
starrocks.client.load-url=starrocks:8080
```
The `connection-url` defines the connection information and parameters to pass to the MySQL JDBC driver. The `connection-user` and `connection-password` are typically required and determine the user credentials for the connection, often a service user. You can use secrets to avoid actual values in the catalog properties files.
The following table describes configuration properties for connection credentials:
| Property name | Required | Default value | Description |
|-|-|-|-|
| connector.name | Yes | NONE | Connector name. |
| connection-url | Yes | NONE | Connection information and parameters. |
| connection-user | Yes | NONE | Connection user name. |
| connection-password | Yes | NONE | Connection password. |
| starrocks.client.load-url | Yes | NONE | The address that is used to connect to the HTTP server of the FE. You can specify multiple addresses, which must be separated by a semicolon (;). Format: <fe_host1>:<fe_http_port1>;<fe_host2>:<fe_http_port2>. |
| starrocks.client.label-prefix | No | trino- | The label prefix used by Stream Load. |
| starrocks.client.max-cache-bytes | No | 268435456 | The maximum size of data that can be accumulated in memory before being sent to StarRocks at a time. The maximum value ranges from 64 MB to 10 GB, the default value is 256MB. |
| starrocks.client.connect-timeout | No | 30000 | The timeout for establishing HTTP connection. Valid values: 100 to 60000. Unit: ms, the default value is 30000. |
## Multiple StarRocks servers
You can have as many catalogs as you need, so if you have additional StarRocks servers, simply add another properties file to `etc/catalog` with a different name, making sure it ends in .properties. For example, if you name the property file sales.properties, Trino creates a catalog named sales using the configured connector.
## Type mapping
Because Trino and StarRocks each support types that the other does not, this connector modifies some types when reading or writing data. Data types may not map the same way in both directions between Trino and the data source. Refer to the following sections for type mapping in each direction.
### StarRocks to Trino type mapping
The connector maps StarRocks types to the corresponding Trino types following this table:
| StarRocks type | Trino type | Notes |
|-|-|-|
| `BOOLEAN` | `TINYINT ` | |
| `TINYINT` | `TINYINT` | |
| `SMALLINT` | `SMALLINT` | |
| `INT` | `INTEGER` | |
| `BIGINT` | `BIGINT` | |
| `LARGEINT` | `DECIMAL(20, 0)` | |
| `DECIMAL(p, s)` | `DECIMAL(p, s)` | |
| `DOUBLE` | `DOUBLE` | |
| `FLOAT` | `REAL` | |
| `CHAR(n)` | `CHAR(n)` | |
| `STRING` | `VARCHAR(65533)` | |
| `VARCHAR(n)` | `VARCHAR(n)` | |
| `BINARY, VARBINARY` | `VARBINARY` | |
| `DATE` | `DATE` | |
| `DATETIME` | `TIMESTAMP(0)` | |
| `JSON` | `JSON` | |
### Trino to StarRocks type mapping
The connector maps StarRocks types to the corresponding Trino types following this table:
| Trino type | StarRocks type | Notes |
|-|-|-|
| `BOOLEAN` | `BOOLEAN` | |
| `TINYINT` | `TINYINT` | |
| `SMALLINT` | `SMALLINT` | |
| `INTEGER` | `INT` | |
| `BIGINT` | `BIGINT` | |
| `REAL` | `FLOAT` | |
| `DOUBLE` | `DOUBLE` | |
| `DECIMAL(p, s)` | `DECIMAL(p, s)` | |
| `VARCHAR(n)` | `VARCHAR(n)` | |
| `VARCHAR(0)` | `VARCHAR(65533)` | |
| `CHAR(n)` | `CHAR(n)` | |
| `VARBINARY` | `VARBINARY` | |
| `JSON` | `JSON` | |
| `DATE` | `DATE` | |
| `TIMESTAMP(0)` | `DATETIME` | |
No other types are supported.
### Type mapping configuration properties
The following properties can be used to configure how data types from the connected data source are mapped to Trino data types and how the metadata is cached in Trino.
| Property name | Description | Default value |
|-|-|-|
| `upsupported-type-handling` | Configure how unsupported column data types are handled: <br>`IGNORE`, column is not accessible.<br>`CONVERT_TO_VARCHAR`, column is converted to unbounded VARCHAR.<br>The respective catalog session property is unsupported_type_handling. | `IGNORE` |
## Querying StarRocks
The StarRocks connector provides a schema for every StarRocks database. You can see the available StarRocks databases by running SHOW SCHEMAS:
```sql
SHOW SCHEMAS FROM starrocks;
```
If you have a StarRocks database named example, you can view the tables in this database by running SHOW TABLES:
```
SHOW TABLES FROM starrocks.example;
```
You can see a list of the columns in the clicks table in the example database using either of the following:
```sql
DESCRIBE starrocks.example.clicks;
SHOW COLUMNS FROM starrocks.example.clicks;
```
Finally, you can access the clicks table in the example database:
```sql
SELECT * FROM starrocks.example.clicks;
```
If you used a different name for your catalog properties file, use that catalog name instead of starrocks in the above examples.
## SQL support
The connector provides read access and write access to data and metadata in the StarRocks database. In addition to the globally available and read operation statements, the connector supports the following statements:
- [INSERT](https://trino.io/docs/current/sql/insert.html)
- [DELETE](https://trino.io/docs/current/sql/delete.html)
- [TRUNCATE](https://trino.io/docs/current/sql/truncate.html)
- [DROP TABLE](https://trino.io/docs/current/sql/drop-table.html)
- [CREATE SCHEMA](https://trino.io/docs/current/sql/create-schema.html)
- [DROP SCHEMA](https://trino.io/docs/current/sql/drop-schema.html)
### SQL DELETE
If a WHERE clause is specified, the DELETE operation only works if the predicate in the clause can be fully pushed down to the data source.
## Fault-tolerant execution support
The connector does not support [Fault-tolerant](https://trino.io/docs/current/admin/fault-tolerant-execution.html) execution of query processing.

View File

@ -0,0 +1,9 @@
connector.name=starrocks
connection-url=jdbc:mysql://starrocks:9030
connection-user=root
connection-password=
starrocks.client.load-url=starrocks:8080
starrocks.client.label-prefix=trino-
starrocks.client.max-cache-bytes=209715200
starrocks.client.connect-timeout=30000
starrocks.client.chunk-limit=314572800

View File

@ -0,0 +1,465 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.starrocks</groupId>
<artifactId>trino-starrocks</artifactId>
<version>418</version>
<description>Trino - StarRocks Connector</description>
<packaging>trino-plugin</packaging>
<properties>
<air.main.basedir>${project.basedir}</air.main.basedir>
<dep.airlift.version>230</dep.airlift.version>
<dep.airlift.units.version>1.8</dep.airlift.units.version>
<dep.jackson.version>2.15.0</dep.jackson.version>
<dep.guava.version>31.1-jre</dep.guava.version>
<dep.guice.version>6.0.0</dep.guice.version>
<dep.drift.version>1.14</dep.drift.version>
<dep.antlr.version>4.12.0</dep.antlr.version>
<dep.opentelemetry.version>1.26.0</dep.opentelemetry.version>
</properties>
<dependencies>
<dependency>
<groupId>io.trino</groupId>
<artifactId>trino-base-jdbc</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>io.trino</groupId>
<artifactId>trino-plugin-toolkit</artifactId>
<version>${project.version}</version>
<exclusions>
<exclusion>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>io.airlift</groupId>
<artifactId>configuration</artifactId>
<version>${dep.airlift.version}</version>
</dependency>
<dependency>
<groupId>io.airlift</groupId>
<artifactId>json</artifactId>
<version>${dep.airlift.version}</version>
</dependency>
<dependency>
<groupId>io.airlift</groupId>
<artifactId>log</artifactId>
<version>${dep.airlift.version}</version>
</dependency>
<dependency>
<groupId>io.airlift</groupId>
<artifactId>units</artifactId>
<version>${dep.airlift.units.version}</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
<version>${dep.jackson.version}</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>${dep.jackson.version}</version>
</dependency>
<dependency>
<groupId>com.google.code.findbugs</groupId>
<artifactId>jsr305</artifactId>
<version>3.0.2</version>
<optional>true</optional>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>${dep.guava.version}</version>
</dependency>
<dependency>
<groupId>com.google.inject</groupId>
<artifactId>guice</artifactId>
<version>${dep.guice.version}</version>
</dependency>
<dependency>
<groupId>javax.inject</groupId>
<artifactId>javax.inject</artifactId>
<version>1</version>
</dependency>
<dependency>
<groupId>javax.validation</groupId>
<artifactId>validation-api</artifactId>
<version>2.0.1.Final</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.22</version>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
<version>4.5.13</version>
<exclusions>
<exclusion>
<groupId>commons-codec</groupId>
<artifactId>commons-codec</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpcore</artifactId>
<version>4.4.15</version>
</dependency>
<dependency>
<groupId>org.jdbi</groupId>
<artifactId>jdbi3-core</artifactId>
<version>3.23.0</version>
</dependency>
<!-- used by tests but also needed transitively -->
<dependency>
<groupId>io.airlift</groupId>
<artifactId>log-manager</artifactId>
<version>${dep.airlift.version}</version>
<scope>runtime</scope>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>jcl-over-slf4j</artifactId>
</exclusion>
</exclusions>
</dependency>
<!-- Trino SPI -->
<dependency>
<groupId>io.trino</groupId>
<artifactId>trino-spi</artifactId>
<version>${project.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>io.airlift</groupId>
<artifactId>slice</artifactId>
<version>0.42</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-annotations</artifactId>
<version>${dep.jackson.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-api</artifactId>
<version>${dep.opentelemetry.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-context</artifactId>
<version>${dep.opentelemetry.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.openjdk.jol</groupId>
<artifactId>jol-core</artifactId>
<version>0.2</version>
<scope>provided</scope>
</dependency>
<!-- for testing -->
<dependency>
<groupId>io.trino</groupId>
<artifactId>trino-base-jdbc</artifactId>
<type>test-jar</type>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.trino</groupId>
<artifactId>trino-exchange-filesystem</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.trino</groupId>
<artifactId>trino-main</artifactId>
<version>${project.version}</version>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>net.java.dev.jna</groupId>
<artifactId>jna</artifactId>
</exclusion>
<exclusion>
<groupId>javax.xml.bind</groupId>
<artifactId>jaxb-api</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>io.trino</groupId>
<artifactId>trino-main</artifactId>
<type>test-jar</type>
<version>${project.version}</version>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>net.java.dev.jna</groupId>
<artifactId>jna</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>io.trino</groupId>
<artifactId>trino-testing</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.trino</groupId>
<artifactId>trino-testing-containers</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.trino</groupId>
<artifactId>trino-testing-services</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.trino</groupId>
<artifactId>trino-tpch</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.trino.tpch</groupId>
<artifactId>tpch</artifactId>
<version>1.1</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.airlift</groupId>
<artifactId>testing</artifactId>
<version>${dep.airlift.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.assertj</groupId>
<artifactId>assertj-core</artifactId>
<version>3.18.1</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.jetbrains</groupId>
<artifactId>annotations</artifactId>
<version>19.0.0</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>mysql</artifactId>
<version>1.16.3</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>testcontainers</artifactId>
<version>1.16.3</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.testng</groupId>
<artifactId>testng</artifactId>
<version>6.10</version>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.antlr</groupId>
<artifactId>antlr4-maven-plugin</artifactId>
<version>${dep.antlr.version}</version>
<executions>
<execution>
<goals>
<goal>antlr4</goal>
</goals>
</execution>
</executions>
<configuration>
<visitor>true</visitor>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.2.4</version>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-enforcer-plugin</artifactId>
<configuration>
<skip>true</skip>
</configuration>
</plugin>
<plugin>
<groupId>org.skife.maven</groupId>
<artifactId>really-executable-jar-maven-plugin</artifactId>
<version>2.1.1</version>
</plugin>
<plugin>
<groupId>org.codehaus.gmaven</groupId>
<artifactId>groovy-maven-plugin</artifactId>
<version>2.1.1</version>
</plugin>
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>exec-maven-plugin</artifactId>
<version>3.1.0</version>
</plugin>
<plugin>
<groupId>io.airlift.drift</groupId>
<artifactId>drift-maven-plugin</artifactId>
<version>${dep.drift.version}</version>
</plugin>
<plugin>
<groupId>org.gaul</groupId>
<artifactId>modernizer-maven-plugin</artifactId>
<configuration>
<violationsFiles>
<violationsFile>${air.main.basedir}/.mvn/modernizer/violations.xml</violationsFile>
</violationsFiles>
<exclusionPatterns>
<exclusionPattern>org/joda/time/.*</exclusionPattern>
</exclusionPatterns>
<exclusions>
<!-- getOnlyElement is more readable than the stream analogue -->
<exclusion>com/google/common/collect/Iterables.getOnlyElement:(Ljava/lang/Iterable;)Ljava/lang/Object;</exclusion>
<!-- getLast has lower complexity for array based lists than the stream analogue (O(1) vs O(log(N)) -->
<exclusion>com/google/common/collect/Iterables.getLast:(Ljava/lang/Iterable;)Ljava/lang/Object;</exclusion>
<exclusion>com/google/common/collect/Iterables.getLast:(Ljava/lang/Iterable;Ljava/lang/Object;)Ljava/lang/Object;</exclusion>
<!-- TODO: requires getting to common understanding which of those we want to enable -->
<exclusion>com/google/common/collect/Iterables.transform:(Ljava/lang/Iterable;Lcom/google/common/base/Function;)Ljava/lang/Iterable;</exclusion>
<exclusion>com/google/common/collect/Lists.transform:(Ljava/util/List;Lcom/google/common/base/Function;)Ljava/util/List;</exclusion>
<exclusion>com/google/common/collect/Iterables.isEmpty:(Ljava/lang/Iterable;)Z</exclusion>
<exclusion>com/google/common/collect/Iterables.concat:(Ljava/lang/Iterable;Ljava/lang/Iterable;)Ljava/lang/Iterable;</exclusion>
<exclusion>com/google/common/collect/Iterables.concat:(Ljava/lang/Iterable;Ljava/lang/Iterable;Ljava/lang/Iterable;)Ljava/lang/Iterable;</exclusion>
<exclusion>com/google/common/collect/Iterables.concat:(Ljava/lang/Iterable;Ljava/lang/Iterable;Ljava/lang/Iterable;Ljava/lang/Iterable;)Ljava/lang/Iterable;</exclusion>
<exclusion>com/google/common/collect/Iterables.concat:(Ljava/lang/Iterable;)Ljava/lang/Iterable;</exclusion>
<exclusion>com/google/common/collect/Iterables.all:(Ljava/lang/Iterable;Lcom/google/common/base/Predicate;)Z</exclusion>
<exclusion>com/google/common/collect/Iterables.any:(Ljava/lang/Iterable;Lcom/google/common/base/Predicate;)Z</exclusion>
<exclusion>com/google/common/collect/Iterables.skip:(Ljava/lang/Iterable;I)Ljava/lang/Iterable;</exclusion>
<exclusion>com/google/common/collect/Iterables.limit:(Ljava/lang/Iterable;I)Ljava/lang/Iterable;</exclusion>
<exclusion>com/google/common/collect/Iterables.get:(Ljava/lang/Iterable;I)Ljava/lang/Object;</exclusion>
<exclusion>com/google/common/collect/Iterables.getFirst:(Ljava/lang/Iterable;Ljava/lang/Object;)Ljava/lang/Object;</exclusion>
<exclusion>com/google/common/collect/Iterables.getLast:(Ljava/lang/Iterable;)Ljava/lang/Object;</exclusion>
<exclusion>com/google/common/collect/Iterables.cycle:(Ljava/lang/Iterable;)Ljava/lang/Iterable;</exclusion>
<exclusion>com/google/common/collect/Iterables.cycle:([Ljava/lang/Object;)Ljava/lang/Iterable;</exclusion>
<exclusion>com/google/common/collect/Iterables.getOnlyElement:(Ljava/lang/Iterable;Ljava/lang/Object;)Ljava/lang/Object;</exclusion>
</exclusions>
</configuration>
</plugin>
<plugin>
<groupId>ca.vanzyl.provisio.maven.plugins</groupId>
<artifactId>provisio-maven-plugin</artifactId>
<version>1.0.18</version>
</plugin>
<plugin>
<groupId>io.trino</groupId>
<artifactId>trino-maven-plugin</artifactId>
<version>12</version>
<extensions>true</extensions>
</plugin>
<plugin>
<groupId>ca.vanzyl.provisio.maven.plugins</groupId>
<artifactId>provisio-maven-plugin</artifactId>
<extensions>true</extensions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration combine.children="append">
<fork>false</fork>
<source>17</source>
<target>17</target>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<configuration>
<failIfNoTests>false</failIfNoTests>
<includes>
<!-- Tests classes should start with "Test", but we do also want to include tests incorrectly named, with "Test" at the end -->
<include>**/Test*.java</include>
<include>**/*Test.java</include>
<include>**/Benchmark*.java</include>
</includes>
<excludes>
<exclude>**/*jmhTest*.java</exclude>
<exclude>**/*jmhType*.java</exclude>
</excludes>
</configuration>
</plugin>
</plugins>
</build>
</project>

View File

@ -0,0 +1,58 @@
// Copyright 2021-present StarRocks, Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// This file is based on code available under the Apache license here:
// https://github.com/apache/incubator-doris/blob/master/fe/fe-core/src/main/java/org/apache/doris/StarRocksFE.java
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package com.starrocks.data.load.stream;
import java.time.format.DateTimeFormatter;
public class StreamLoadConstants
{
private StreamLoadConstants()
{}
public static final String RESULT_STATUS_OK = "OK";
public static final String RESULT_STATUS_SUCCESS = "Success";
public static final String RESULT_STATUS_FAILED = "Fail";
public static final String RESULT_STATUS_LABEL_EXISTED = "Label Already Exists";
public static final String RESULT_STATUS_TRANSACTION_PUBLISH_TIMEOUT = "Publish Timeout";
public static final String LABEL_STATE_VISIBLE = "VISIBLE";
public static final String LABEL_STATE_COMMITTED = "COMMITTED";
public static final String LABEL_STATE_PREPARED = "PREPARED";
public static final String LABEL_STATE_PREPARE = "PREPARE";
public static final String LABEL_STATE_ABORTED = "ABORTED";
public static final String LABEL_STATE_UNKNOWN = "UNKNOWN";
public static final DateTimeFormatter DATE_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd");
public static final DateTimeFormatter DATETIME_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
public static final String TABLE_MODEL_PRIMARY_KEYS = "PRIMARY_KEYS";
}

View File

@ -0,0 +1,114 @@
// Copyright 2021-present StarRocks, Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// This file is based on code available under the Apache license here:
// https://github.com/apache/incubator-doris/blob/master/fe/fe-core/src/main/java/org/apache/doris/StarRocksFE.java
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package com.starrocks.data.load.stream;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;
public interface StreamLoadDataFormat
{
StreamLoadDataFormat JSON = new JSONFormat();
StreamLoadDataFormat CSV = new CSVFormat();
byte[] first();
byte[] delimiter();
byte[] end();
class CSVFormat
implements StreamLoadDataFormat, Serializable
{
private static final byte[] NEW_LINE = "\n".getBytes(StandardCharsets.UTF_8);
private final byte[] delimiter;
public CSVFormat()
{
this("\n");
}
public CSVFormat(String rowDelimiter)
{
if (rowDelimiter == null) {
throw new IllegalArgumentException("row delimiter can not be null");
}
this.delimiter = rowDelimiter.getBytes(StandardCharsets.UTF_8);
}
@Override
public byte[] first()
{
return NEW_LINE;
}
@Override
public byte[] delimiter()
{
return delimiter;
}
@Override
public byte[] end()
{
return NEW_LINE;
}
}
class JSONFormat
implements StreamLoadDataFormat, Serializable
{
private static final byte[] first = "[".getBytes(StandardCharsets.UTF_8);
private static final byte[] delimiter = ",".getBytes(StandardCharsets.UTF_8);
private static final byte[] end = "]".getBytes(StandardCharsets.UTF_8);
@Override
public byte[] first()
{
return first;
}
@Override
public byte[] delimiter()
{
return delimiter;
}
@Override
public byte[] end()
{
return end;
}
}
}

View File

@ -0,0 +1,303 @@
// Copyright 2021-present StarRocks, Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// This file is based on code available under the Apache license here:
// https://github.com/apache/incubator-doris/blob/master/fe/fe-core/src/main/java/org/apache/doris/StarRocksFE.java
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package com.starrocks.data.load.stream;
import com.fasterxml.jackson.annotation.JsonProperty;
import java.io.Serializable;
public class StreamLoadResponse
implements Serializable
{
private boolean cancel;
private Long flushRows;
private Long flushBytes;
private Long costNanoTime;
private StreamLoadResponseBody body;
private Exception exception;
public void cancel()
{
this.cancel = true;
}
public boolean isCancel()
{
return cancel;
}
public Long getFlushRows()
{
return flushRows;
}
public Long getFlushBytes()
{
return flushBytes;
}
public Long getCostNanoTime()
{
return costNanoTime;
}
public StreamLoadResponseBody getBody()
{
return body;
}
public Exception getException()
{
return exception;
}
public void setFlushBytes(long flushBytes)
{
this.flushBytes = flushBytes;
}
public void setFlushRows(long flushRows)
{
this.flushRows = flushRows;
}
public void setCostNanoTime(long costNanoTime)
{
this.costNanoTime = costNanoTime;
}
public void setBody(StreamLoadResponseBody body)
{
this.body = body;
}
public void setException(Exception e)
{
this.exception = e;
}
public static class StreamLoadResponseBody
implements Serializable
{
@JsonProperty(value = "TxnId")
private Long txnId;
@JsonProperty(value = "Label")
private String label;
@JsonProperty(value = "State")
private String state;
@JsonProperty(value = "Status")
private String status;
@JsonProperty(value = "ExistingJobStatus")
private String existingJobStatus;
@JsonProperty(value = "Message")
private String message;
@JsonProperty(value = "Msg")
private String msg;
@JsonProperty(value = "NumberTotalRows")
private Long numberTotalRows;
@JsonProperty(value = "NumberLoadedRows")
private Long numberLoadedRows;
@JsonProperty(value = "NumberFilteredRows")
private Long numberFilteredRows;
@JsonProperty(value = "NumberUnselectedRows")
private Long numberUnselectedRows;
@JsonProperty(value = "ErrorURL")
private String errorURL;
@JsonProperty(value = "LoadBytes")
private Long loadBytes;
@JsonProperty(value = "LoadTimeMs")
private Long loadTimeMs;
@JsonProperty(value = "BeginTxnTimeMs")
private Long beginTxnTimeMs;
@JsonProperty(value = "StreamLoadPlanTimeMs")
private Long streamLoadPutTimeMs;
@JsonProperty(value = "ReadDataTimeMs")
private Long readDataTimeMs;
@JsonProperty(value = "WriteDataTimeMs")
private Long writeDataTimeMs;
@JsonProperty(value = "CommitAndPublishTimeMs")
private Long commitAndPublishTimeMs;
public Long getNumberTotalRows()
{
return numberTotalRows;
}
public Long getNumberLoadedRows()
{
return numberLoadedRows;
}
public void setTxnId(Long txnId)
{
this.txnId = txnId;
}
public void setLabel(String label)
{
this.label = label;
}
public void setState(String state)
{
this.state = state;
}
public void setStatus(String status)
{
this.status = status;
}
public void setExistingJobStatus(String existingJobStatus)
{
this.existingJobStatus = existingJobStatus;
}
public void setMessage(String message)
{
this.message = message;
}
public void setMsg(String msg)
{
this.msg = msg;
}
public void setNumberTotalRows(Long numberTotalRows)
{
this.numberTotalRows = numberTotalRows;
}
public void setNumberLoadedRows(Long numberLoadedRows)
{
this.numberLoadedRows = numberLoadedRows;
}
public void setNumberFilteredRows(Long numberFilteredRows)
{
this.numberFilteredRows = numberFilteredRows;
}
public void setNumberUnselectedRows(Long numberUnselectedRows)
{
this.numberUnselectedRows = numberUnselectedRows;
}
public void setErrorURL(String errorURL)
{
this.errorURL = errorURL;
}
public void setLoadBytes(Long loadBytes)
{
this.loadBytes = loadBytes;
}
public void setLoadTimeMs(Long loadTimeMs)
{
this.loadTimeMs = loadTimeMs;
}
public void setBeginTxnTimeMs(Long beginTxnTimeMs)
{
this.beginTxnTimeMs = beginTxnTimeMs;
}
public void setStreamLoadPutTimeMs(Long streamLoadPutTimeMs)
{
this.streamLoadPutTimeMs = streamLoadPutTimeMs;
}
public void setReadDataTimeMs(Long readDataTimeMs)
{
this.readDataTimeMs = readDataTimeMs;
}
public void setWriteDataTimeMs(Long writeDataTimeMs)
{
this.writeDataTimeMs = writeDataTimeMs;
}
public void setCommitAndPublishTimeMs(Long commitAndPublishTimeMs)
{
this.commitAndPublishTimeMs = commitAndPublishTimeMs;
}
public String getState()
{
return state;
}
public String getStatus()
{
return status;
}
public String getMsg()
{
return msg;
}
public Long getCommitAndPublishTimeMs()
{
return commitAndPublishTimeMs;
}
public Long getStreamLoadPutTimeMs()
{
return streamLoadPutTimeMs;
}
public Long getReadDataTimeMs()
{
return readDataTimeMs;
}
public Long getWriteDataTimeMs()
{
return writeDataTimeMs;
}
public Long getLoadTimeMs()
{
return loadTimeMs;
}
public Long getNumberFilteredRows()
{
return numberFilteredRows;
}
}
}

View File

@ -0,0 +1,116 @@
// Copyright 2021-present StarRocks, Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// This file is based on code available under the Apache license here:
// https://github.com/apache/incubator-doris/blob/master/fe/fe-core/src/main/java/org/apache/doris/StarRocksFE.java
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package com.starrocks.data.load.stream;
import io.trino.spi.block.Block;
import io.trino.spi.type.LongTimestamp;
import io.trino.spi.type.TimestampType;
import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.time.LocalDateTime;
import java.util.Base64;
import static io.trino.spi.type.TimestampType.MAX_SHORT_PRECISION;
import static io.trino.spi.type.Timestamps.MICROSECONDS_PER_SECOND;
import static io.trino.spi.type.Timestamps.NANOSECONDS_PER_MICROSECOND;
import static io.trino.spi.type.Timestamps.PICOSECONDS_PER_NANOSECOND;
import static io.trino.spi.type.Timestamps.roundDiv;
import static java.lang.Math.floorMod;
import static java.time.ZoneOffset.UTC;
public class StreamLoadUtils
{
private StreamLoadUtils()
{
}
public static String getTableUniqueKey(String database, String table)
{
return database + "-" + table;
}
public static String getSendUrl(String host, String database, String table)
{
if (host == null) {
throw new IllegalArgumentException("None of the hosts in `load_url` could be connected.");
}
return host + "/api/" + database + "/" + table + "/_stream_load";
}
public static String getBasicAuthHeader(String username, String password)
{
String auth = username + ":" + password;
byte[] encodedAuth = Base64.getEncoder().encode(auth.getBytes(StandardCharsets.UTF_8));
return "Basic " + new String(encodedAuth, StandardCharsets.UTF_8);
}
public static LocalDateTime toLocalDateTime(TimestampType type, Block block, int position)
{
int precision = type.getPrecision();
long epochMicros;
int picosOfMicro = 0;
if (precision <= MAX_SHORT_PRECISION) {
epochMicros = type.getLong(block, position);
}
else {
LongTimestamp timestamp = (LongTimestamp) type.getObject(block, position);
epochMicros = timestamp.getEpochMicros();
picosOfMicro = timestamp.getPicosOfMicro();
}
long epochSecond = scaleEpochMicrosToSeconds(epochMicros);
long nanoFraction = getMicrosOfSecond(epochMicros) * NANOSECONDS_PER_MICROSECOND + (roundToNearest(picosOfMicro, PICOSECONDS_PER_NANOSECOND) / PICOSECONDS_PER_NANOSECOND);
Instant instant = Instant.ofEpochSecond(epochSecond, nanoFraction);
return LocalDateTime.ofInstant(instant, UTC);
}
public static long scaleEpochMicrosToSeconds(long epochMicros)
{
return Math.floorDiv(epochMicros, MICROSECONDS_PER_SECOND);
}
public static long getMicrosOfSecond(long epochMicros)
{
return floorMod(epochMicros, MICROSECONDS_PER_SECOND);
}
public static long roundToNearest(long value, long bound)
{
return roundDiv(value, bound) * bound;
}
}

View File

@ -0,0 +1,256 @@
// Copyright 2021-present StarRocks, Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// This file is based on code available under the Apache license here:
// https://github.com/apache/incubator-doris/blob/master/fe/fe-core/src/main/java/org/apache/doris/StarRocksFE.java
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package com.starrocks.data.load.stream;
import com.starrocks.data.load.stream.http.StreamLoadEntityMeta;
import com.starrocks.data.load.stream.properties.StreamLoadTableProperties;
import io.airlift.log.Logger;
import io.trino.plugin.starrocks.StarRocksOperationApplier;
import java.io.Serializable;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedTransferQueue;
import java.util.concurrent.atomic.AtomicLong;
public class StreamTableRegion
implements Serializable
{
private static final Logger log = Logger.get(StreamTableRegion.class);
private static final byte[] END_STREAM = new byte[0];
private final String uniqueKey;
private final String database;
private final String table;
private final Optional<String> temporaryTableName;
private final StarRocksOperationApplier applier;
private final long chunkLimit;
private final String labelPrefix;
private final StreamLoadDataFormat dataFormat;
private final BlockingQueue<byte[]> buffer = new LinkedTransferQueue<>();
private final AtomicLong cacheBytes = new AtomicLong();
private final AtomicLong flushBytes = new AtomicLong();
private final AtomicLong flushRows = new AtomicLong();
private volatile String label;
private volatile boolean flushing;
public StreamTableRegion(String uniqueKey,
String database,
String table,
Optional<String> temporaryTableName,
String labelPrefix,
StarRocksOperationApplier applier,
StreamLoadTableProperties properties)
{
this.uniqueKey = uniqueKey;
this.database = database;
this.table = table;
this.temporaryTableName = temporaryTableName;
this.applier = applier;
this.dataFormat = properties.getDataFormat();
this.chunkLimit = properties.getChunkLimit();
this.labelPrefix = labelPrefix;
}
public String getUniqueKey()
{
return uniqueKey;
}
public String getDatabase()
{
return database;
}
public String getTable()
{
return table;
}
public Optional<String> getTemporaryTableName()
{
return temporaryTableName;
}
public void setLabel(String label)
{
this.label = label;
}
public String getLabel()
{
return label;
}
public StreamLoadEntityMeta getEntityMeta()
{
return StreamLoadEntityMeta.CHUNK_ENTITY_META;
}
public int write(byte[] row)
{
try {
buffer.put(row);
if (row != END_STREAM) {
cacheBytes.addAndGet(row.length);
}
else {
log.info("Write EOF");
}
return row.length;
}
catch (InterruptedException ignored) {
}
return 0;
}
private final AtomicLong totalFlushBytes = new AtomicLong();
private volatile boolean endStream;
private volatile byte[] next;
public byte[] read()
{
if (!flushing) {
flushing = true;
}
try {
byte[] row;
if (next == null) {
row = buffer.take();
}
else {
row = next;
}
if (row == END_STREAM) {
endStream = true;
log.info("Read EOF");
return null;
}
int delimiterL = dataFormat.delimiter() == null ? 0 : dataFormat.delimiter().length;
if (totalFlushBytes.get() + row.length + delimiterL > chunkLimit) {
next = row;
log.info("Read part EOF");
return null;
}
next = null;
totalFlushBytes.addAndGet(row.length + delimiterL);
cacheBytes.addAndGet(-row.length);
flushBytes.addAndGet(row.length);
flushRows.incrementAndGet();
return row;
}
catch (InterruptedException e) {
log.info("read queue interrupted, msg : %s", e.getMessage());
}
return null;
}
protected void flip()
{
flushBytes.set(0L);
flushRows.set(0L);
final int initSize = (dataFormat.first() == null ? 0 : dataFormat.first().length)
+ (dataFormat.end() == null ? 0 : dataFormat.end().length)
- (dataFormat.delimiter() == null ? 0 : dataFormat.delimiter().length);
totalFlushBytes.set(initSize);
endStream = false;
}
public boolean commit()
{
if (isReadable()) {
flip();
write(END_STREAM);
setLabel(genLabel());
return Boolean.TRUE;
}
return Boolean.FALSE;
}
private String genLabel()
{
if (labelPrefix != null) {
return labelPrefix + UUID.randomUUID();
}
return UUID.randomUUID().toString();
}
public void callback(StreamLoadResponse response)
{
applier.callback(response);
}
public void callback(Throwable e)
{
applier.callback(e);
}
public void complete(StreamLoadResponse response)
{
response.setFlushBytes(flushBytes.get());
response.setFlushRows(flushRows.get());
callback(response);
log.info("Stream load flushed, label : %s", label);
if (!endStream) {
log.info("Stream load continue");
streamLoad();
return;
}
log.info("Stream load completed");
}
public boolean isReadable()
{
return cacheBytes.get() > 0;
}
protected boolean streamLoad()
{
try {
flip();
applier.send();
return true;
}
catch (Exception e) {
callback(e);
}
return false;
}
}

View File

@ -0,0 +1,59 @@
// Copyright 2021-present StarRocks, Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// This file is based on code available under the Apache license here:
// https://github.com/apache/incubator-doris/blob/master/fe/fe-core/src/main/java/org/apache/doris/StarRocksFE.java
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package com.starrocks.data.load.stream.exception;
import com.starrocks.data.load.stream.StreamLoadResponse;
public class StreamLoadFailException
extends RuntimeException
{
private final StreamLoadResponse.StreamLoadResponseBody responseBody;
public StreamLoadFailException(StreamLoadResponse.StreamLoadResponseBody responseBody)
{
this(responseBody.toString(), responseBody);
}
public StreamLoadFailException(String message)
{
this(message, null);
}
public StreamLoadFailException(String message, StreamLoadResponse.StreamLoadResponseBody responseBody)
{
super(message);
this.responseBody = responseBody;
}
}

View File

@ -0,0 +1,135 @@
// Copyright 2021-present StarRocks, Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// This file is based on code available under the Apache license here:
// https://github.com/apache/incubator-doris/blob/master/fe/fe-core/src/main/java/org/apache/doris/StarRocksFE.java
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package com.starrocks.data.load.stream.http;
import com.starrocks.data.load.stream.StreamLoadDataFormat;
import com.starrocks.data.load.stream.StreamTableRegion;
import com.starrocks.data.load.stream.io.StreamLoadStream;
import io.airlift.log.Logger;
import org.apache.http.Header;
import org.apache.http.HttpHeaders;
import org.apache.http.entity.AbstractHttpEntity;
import org.apache.http.entity.ContentType;
import org.apache.http.message.BasicHeader;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
public class StreamLoadEntity
extends AbstractHttpEntity
{
private static final Logger log = Logger.get(StreamLoadEntity.class);
protected static final int OUTPUT_BUFFER_SIZE = 2048;
private static final Header CONTENT_TYPE =
new BasicHeader(HttpHeaders.CONTENT_TYPE, ContentType.APPLICATION_OCTET_STREAM.toString());
private final StreamTableRegion region;
private final InputStream content;
private final boolean chunked;
private final long contentLength;
public StreamLoadEntity(StreamTableRegion region,
StreamLoadDataFormat dataFormat,
StreamLoadEntityMeta meta)
{
this.region = region;
this.content = new StreamLoadStream(region, dataFormat);
this.chunked = meta.getBytes() == -1L;
this.contentLength = meta.getBytes();
}
@Override
public boolean isRepeatable()
{
return false;
}
@Override
public boolean isChunked()
{
return chunked;
}
@Override
public long getContentLength()
{
return contentLength;
}
@Override
public Header getContentType()
{
return CONTENT_TYPE;
}
@Override
public Header getContentEncoding()
{
return null;
}
@Override
public InputStream getContent()
throws IOException, UnsupportedOperationException
{
return content;
}
@Override
public void writeTo(OutputStream outputStream)
throws IOException
{
long total = 0;
try (InputStream inputStream = this.content) {
final byte[] buffer = new byte[OUTPUT_BUFFER_SIZE];
int l;
while ((l = inputStream.read(buffer)) != -1) {
total += l;
outputStream.write(buffer, 0, l);
}
}
log.info("Entity write end, contentLength : %s, total : %s", contentLength, total);
}
@Override
public boolean isStreaming()
{
return true;
}
}

View File

@ -0,0 +1,72 @@
// Copyright 2021-present StarRocks, Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// This file is based on code available under the Apache license here:
// https://github.com/apache/incubator-doris/blob/master/fe/fe-core/src/main/java/org/apache/doris/StarRocksFE.java
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package com.starrocks.data.load.stream.http;
import java.io.Serializable;
public class StreamLoadEntityMeta
implements Serializable
{
public static final StreamLoadEntityMeta CHUNK_ENTITY_META = new StreamLoadEntityMeta(-1, -1);
private long bytes;
private long rows;
public StreamLoadEntityMeta(long bytes, long rows)
{
this.bytes = bytes;
this.rows = rows;
}
public long getBytes()
{
return bytes;
}
public void setBytes(long bytes)
{
this.bytes = bytes;
}
public long getRows()
{
return rows;
}
public void setRows(long rows)
{
this.rows = rows;
}
}

View File

@ -0,0 +1,217 @@
// Copyright 2021-present StarRocks, Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// This file is based on code available under the Apache license here:
// https://github.com/apache/incubator-doris/blob/master/fe/fe-core/src/main/java/org/apache/doris/StarRocksFE.java
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package com.starrocks.data.load.stream.io;
import com.starrocks.data.load.stream.StreamLoadDataFormat;
import com.starrocks.data.load.stream.StreamTableRegion;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
public class StreamLoadStream
extends InputStream
{
private static final int DEFAULT_BUFFER_SIZE = 2048;
private final StreamTableRegion region;
private final StreamLoadDataFormat dataFormat;
private ByteBuffer buffer;
private byte[] cache;
private int pos;
private boolean endStream;
public StreamLoadStream(StreamTableRegion region, StreamLoadDataFormat dataFormat)
{
this.region = region;
this.dataFormat = dataFormat;
buffer = ByteBuffer.allocate(DEFAULT_BUFFER_SIZE);
buffer.position(buffer.capacity());
}
@Override
public int read()
throws IOException
{
byte[] bytes = new byte[1];
int ws = read(bytes);
if (ws == -1) {
return -1;
}
return bytes[0];
}
@Override
public int read(byte[] b)
throws IOException
{
return read(b, 0, b.length);
}
@Override
public int read(byte[] b, int off, int len)
throws IOException
{
if (len == 0) {
return 0;
}
if (!buffer.hasRemaining()) {
if (cache == null && endStream) {
return -1;
}
fillBuffer();
}
int size = len - off;
int ws = Math.min(size, buffer.remaining());
for (int pos = off; pos < off + ws; pos++) {
b[pos] = buffer.get();
}
return ws;
}
@Override
public void close()
throws IOException
{
buffer = null;
cache = null;
pos = 0;
endStream = true;
}
private void fillBuffer()
{
buffer.clear();
if (cache != null) {
writeBuffer(cache, pos);
}
if (endStream || !buffer.hasRemaining()) {
buffer.flip();
return;
}
byte[] bytes;
while ((bytes = readRegion()) != null) {
writeBuffer(bytes, 0);
bytes = null;
if (!buffer.hasRemaining()) {
break;
}
}
if (buffer.position() == 0) {
buffer.position(buffer.limit());
}
else {
buffer.flip();
}
}
private void writeBuffer(byte[] bytes, int pos)
{
int size = bytes.length - pos;
int remain = buffer.remaining();
int ws = Math.min(size, remain);
buffer.put(bytes, pos, ws);
if (size > remain) {
this.cache = bytes;
this.pos = pos + ws;
}
else {
this.cache = null;
this.pos = 0;
}
}
private static final int DATA_FIRST = 1;
private static final int DATA_BODY = 2;
private static final int DATA_END = 3;
private int state = DATA_FIRST;
private byte[] next;
private boolean first = true;
private byte[] readRegion()
{
switch (state) {
case DATA_FIRST:
state = DATA_BODY;
if (dataFormat.first() != null && dataFormat.first().length > 0) {
return dataFormat.first();
}
else {
return readRegion();
}
case DATA_BODY:
byte[] body;
if (next != null) {
body = next;
next = null;
return body;
}
body = region.read();
if (body == null) {
state = DATA_END;
return null;
}
if (!first) {
next = body;
body = dataFormat.delimiter();
}
else {
first = false;
}
return body;
case DATA_END:
if (endStream) {
return null;
}
endStream = true;
return dataFormat.end();
default:
return null;
}
}
}

View File

@ -0,0 +1,200 @@
// Copyright 2021-present StarRocks, Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// This file is based on code available under the Apache license here:
// https://github.com/apache/incubator-doris/blob/master/fe/fe-core/src/main/java/org/apache/doris/StarRocksFE.java
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package com.starrocks.data.load.stream.properties;
import java.io.Serializable;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
public class StreamLoadProperties
implements Serializable
{
private final String jdbcUrl;
private final String[] loadUrls;
private final String username;
private final String password;
private final String labelPrefix;
private final StreamLoadTableProperties tableProperties;
/**
* 最大缓存空间
*/
private final long maxCacheBytes;
/**
* http client settings ms
*/
private final int connectTimeout;
private final Map<String, String> headers;
private StreamLoadProperties(Builder builder)
{
this.jdbcUrl = builder.jdbcUrl;
this.loadUrls = builder.loadUrls;
this.username = builder.username;
this.password = builder.password;
this.labelPrefix = builder.labelPrefix;
this.tableProperties = builder.tableProperties;
this.maxCacheBytes = builder.maxCacheBytes;
this.connectTimeout = builder.connectTimeout;
this.headers = Collections.unmodifiableMap(builder.headers);
}
public String getJdbcUrl()
{
return jdbcUrl;
}
public String[] getLoadUrls()
{
return loadUrls;
}
public String getUsername()
{
return username;
}
public String getPassword()
{
return password;
}
public String getLabelPrefix()
{
return labelPrefix;
}
public StreamLoadTableProperties getTableProperties()
{
return tableProperties;
}
public long getMaxCacheBytes()
{
return maxCacheBytes;
}
public int getConnectTimeout()
{
return connectTimeout;
}
public Map<String, String> getHeaders()
{
return headers;
}
public static Builder builder()
{
return new Builder();
}
public static class Builder
{
private String jdbcUrl;
private String[] loadUrls;
private String username;
private String password;
private String labelPrefix = "";
private long maxCacheBytes = (long) (Runtime.getRuntime().freeMemory() * 0.7);
private StreamLoadTableProperties tableProperties;
private int connectTimeout = 60000;
private Map<String, String> headers = new HashMap<>();
public Builder jdbcUrl(String jdbcUrl)
{
this.jdbcUrl = jdbcUrl;
return this;
}
public Builder loadUrls(String... loadUrls)
{
this.loadUrls = loadUrls;
return this;
}
public Builder cacheMaxBytes(long maxCacheBytes)
{
if (maxCacheBytes <= 0) {
throw new IllegalArgumentException("cacheMaxBytes `" + maxCacheBytes + "` set failed, must greater to 0");
}
if (maxCacheBytes > Runtime.getRuntime().maxMemory()) {
throw new IllegalArgumentException("cacheMaxBytes `" + maxCacheBytes + "` set failed, current maxMemory is " + Runtime.getRuntime().maxMemory());
}
this.maxCacheBytes = maxCacheBytes;
return this;
}
public Builder connectTimeout(int connectTimeout)
{
this.connectTimeout = connectTimeout;
return this;
}
public Builder username(String username)
{
this.username = username;
return this;
}
public Builder password(String password)
{
this.password = password;
return this;
}
public Builder labelPrefix(String labelPrefix)
{
this.labelPrefix = labelPrefix;
return this;
}
public Builder tableProperties(StreamLoadTableProperties tableProperties)
{
this.tableProperties = tableProperties;
return this;
}
public StreamLoadProperties build()
{
return new StreamLoadProperties(this);
}
}
}

View File

@ -0,0 +1,212 @@
// Copyright 2021-present StarRocks, Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// This file is based on code available under the Apache license here:
// https://github.com/apache/incubator-doris/blob/master/fe/fe-core/src/main/java/org/apache/doris/StarRocksFE.java
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package com.starrocks.data.load.stream.properties;
import com.starrocks.data.load.stream.StreamLoadDataFormat;
import com.starrocks.data.load.stream.StreamLoadUtils;
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;
public class StreamLoadTableProperties
implements Serializable
{
private final String uniqueKey;
private final String database;
private final String table;
private final String[] columns;
private final StreamLoadDataFormat dataFormat;
private final Map<String, String> properties;
private final boolean enableUpsertDelete;
private final long chunkLimit;
private StreamLoadTableProperties(Builder builder)
{
this.database = builder.database;
this.table = builder.table;
this.uniqueKey = builder.uniqueKey == null
? StreamLoadUtils.getTableUniqueKey(database, table)
: builder.uniqueKey;
this.columns = builder.columns;
this.enableUpsertDelete = builder.enableUpsertDelete;
this.dataFormat = builder.dataFormat == null
? StreamLoadDataFormat.JSON
: builder.dataFormat;
if (dataFormat instanceof StreamLoadDataFormat.JSONFormat) {
chunkLimit = Math.min(3221225472L, builder.chunkLimit);
}
else {
chunkLimit = Math.min(10737418240L, builder.chunkLimit);
}
this.properties = builder.properties;
}
public String getUniqueKey()
{
return uniqueKey;
}
public String getDatabase()
{
return database;
}
public String getTable()
{
return table;
}
public String[] getColumns()
{
return columns;
}
public boolean isEnableUpsertDelete()
{
return enableUpsertDelete;
}
public StreamLoadDataFormat getDataFormat()
{
return dataFormat;
}
public Long getChunkLimit()
{
return chunkLimit;
}
public Map<String, String> getProperties()
{
return properties;
}
public static Builder builder()
{
return new Builder();
}
public static class Builder
{
private String uniqueKey;
private String database;
private String table;
private String[] columns;
private boolean enableUpsertDelete;
private StreamLoadDataFormat dataFormat;
private long chunkLimit;
private final Map<String, String> properties = new HashMap<>();
private Builder()
{
}
public Builder uniqueKey(String uniqueKey)
{
this.uniqueKey = uniqueKey;
return this;
}
public Builder database(String database)
{
this.database = database;
return this;
}
public Builder table(String table)
{
this.table = table;
return this;
}
public Builder columns(String... columns)
{
this.columns = columns;
return this;
}
public Builder enableUpsertDelete(boolean enableUpsertDelete)
{
this.enableUpsertDelete = enableUpsertDelete;
return this;
}
public Builder streamLoadDataFormat(StreamLoadDataFormat dataFormat)
{
this.dataFormat = dataFormat;
return this;
}
public Builder chunkLimit(long chunkLimit)
{
this.chunkLimit = chunkLimit;
return this;
}
public Builder addProperties(Map<String, String> properties)
{
this.properties.putAll(properties);
return this;
}
public Builder addProperty(String key, String value)
{
this.properties.put(key, value);
return this;
}
public StreamLoadTableProperties build()
{
if (database == null || table == null) {
throw new IllegalArgumentException(String.format("database `%s` or table `%s` can't be null", database, table));
}
addProperty("db", database);
addProperty("table", table);
if (columns != null && columns.length > 0) {
addProperty("columns", String.join(",", columns));
}
return new StreamLoadTableProperties(this);
}
}
}

View File

@ -0,0 +1,156 @@
// Copyright 2021-present StarRocks, Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// This file is based on code available under the Apache license here:
// https://github.com/apache/incubator-doris/blob/master/fe/fe-core/src/main/java/org/apache/doris/StarRocksFE.java
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package io.trino.plugin.starrocks;
import com.google.inject.Binder;
import com.google.inject.Provides;
import com.google.inject.Scopes;
import com.google.inject.Singleton;
import com.google.inject.multibindings.Multibinder;
import com.mysql.jdbc.Driver;
import com.starrocks.data.load.stream.StreamLoadDataFormat;
import com.starrocks.data.load.stream.properties.StreamLoadProperties;
import com.starrocks.data.load.stream.properties.StreamLoadTableProperties;
import io.airlift.configuration.AbstractConfigurationAwareModule;
import io.trino.plugin.base.session.SessionPropertiesProvider;
import io.trino.plugin.jdbc.BaseJdbcConfig;
import io.trino.plugin.jdbc.ConnectionFactory;
import io.trino.plugin.jdbc.DecimalModule;
import io.trino.plugin.jdbc.DriverConnectionFactory;
import io.trino.plugin.jdbc.ForBaseJdbc;
import io.trino.plugin.jdbc.JdbcClient;
import io.trino.plugin.jdbc.JdbcJoinPushdownSupportModule;
import io.trino.plugin.jdbc.JdbcStatisticsConfig;
import io.trino.plugin.jdbc.credential.CredentialConfig;
import io.trino.plugin.jdbc.credential.CredentialProvider;
import io.trino.plugin.jdbc.ptf.Query;
import io.trino.spi.connector.ConnectorPageSinkProvider;
import io.trino.spi.ptf.ConnectorTableFunction;
import java.sql.SQLException;
import java.util.Properties;
import static com.google.inject.multibindings.Multibinder.newSetBinder;
import static com.google.inject.multibindings.OptionalBinder.newOptionalBinder;
import static io.airlift.configuration.ConfigBinder.configBinder;
public class StarRocksClientModule
extends AbstractConfigurationAwareModule
{
public static final String EMPTY = "";
@Override
protected void setup(Binder binder)
{
binder.bind(JdbcClient.class).annotatedWith(ForBaseJdbc.class).to(StarRocksClient.class).in(Scopes.SINGLETON);
configBinder(binder).bindConfig(StarRocksJdbcConfig.class);
configBinder(binder).bindConfig(StarRocksConfig.class);
configBinder(binder).bindConfig(StarRocksWriteConfig.class);
configBinder(binder).bindConfig(JdbcStatisticsConfig.class);
bindSessionPropertiesProvider(binder, StarRocksWriteSessionProperties.class);
install(new DecimalModule());
install(new JdbcJoinPushdownSupportModule());
newSetBinder(binder, ConnectorTableFunction.class).addBinding().toProvider(Query.class).in(Scopes.SINGLETON);
newOptionalBinder(binder, ConnectorPageSinkProvider.class).setBinding().to(StarRocksPageSinkProvider.class).in(Scopes.SINGLETON);
}
public static Multibinder<SessionPropertiesProvider> sessionPropertiesProviderBinder(Binder binder)
{
return newSetBinder(binder, SessionPropertiesProvider.class);
}
public static void bindSessionPropertiesProvider(Binder binder, Class<? extends SessionPropertiesProvider> type)
{
sessionPropertiesProviderBinder(binder).addBinding().to(type).in(Scopes.SINGLETON);
}
@Provides
@Singleton
@ForBaseJdbc
public static ConnectionFactory createConnectionFactory(BaseJdbcConfig config, CredentialProvider credentialProvider, StarRocksJdbcConfig starRocksJdbcConfig)
throws SQLException
{
return new DriverConnectionFactory(
new Driver(),
config.getConnectionUrl(),
getConnectionProperties(starRocksJdbcConfig),
credentialProvider);
}
public static Properties getConnectionProperties(StarRocksJdbcConfig starRocksJdbcConfig)
{
Properties connectionProperties = new Properties();
connectionProperties.setProperty("useInformationSchema", Boolean.toString(starRocksJdbcConfig.isDriverUseInformationSchema()));
connectionProperties.setProperty("useUnicode", "true");
connectionProperties.setProperty("characterEncoding", "utf8");
connectionProperties.setProperty("tinyInt1isBit", "false");
connectionProperties.setProperty("rewriteBatchedStatements", "true");
if (starRocksJdbcConfig.isAutoReconnect()) {
connectionProperties.setProperty("autoReconnect", String.valueOf(starRocksJdbcConfig.isAutoReconnect()));
connectionProperties.setProperty("maxReconnects", String.valueOf(starRocksJdbcConfig.getMaxReconnects()));
}
if (starRocksJdbcConfig.getConnectionTimeout() != null) {
connectionProperties.setProperty("connectTimeout", String.valueOf(starRocksJdbcConfig.getConnectionTimeout().toMillis()));
}
return connectionProperties;
}
@Provides
@Singleton
public static StreamLoadProperties getStreamLoadProperties(StarRocksConfig starRocksConfig, BaseJdbcConfig baseJdbcConfig, CredentialConfig credentialConfig)
{
StreamLoadTableProperties streamLoadTableProperties = StreamLoadTableProperties.builder()
.database(EMPTY)
.table(EMPTY)
.streamLoadDataFormat(StreamLoadDataFormat.JSON)
.chunkLimit(starRocksConfig.getChunkLimit())
.enableUpsertDelete(Boolean.TRUE)
.addProperty("format", "json")
.addProperty("strict_mode", "true")
.addProperty("Expect", "100-continue")
.addProperty("strip_outer_array", "true")
.build();
return StreamLoadProperties.builder()
.loadUrls(starRocksConfig.getLoadUrls().toArray(new String[0]))
.jdbcUrl(baseJdbcConfig.getConnectionUrl())
.tableProperties(streamLoadTableProperties)
.cacheMaxBytes(starRocksConfig.getMaxCacheBytes())
.connectTimeout(starRocksConfig.getConnectTimeout())
.labelPrefix(starRocksConfig.getLabelPrefix())
.username(credentialConfig.getConnectionUser().orElse(EMPTY))
.password(credentialConfig.getConnectionPassword().orElse(EMPTY))
.build();
}
}

View File

@ -0,0 +1,116 @@
// Copyright 2021-present StarRocks, Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// This file is based on code available under the Apache license here:
// https://github.com/apache/incubator-doris/blob/master/fe/fe-core/src/main/java/org/apache/doris/StarRocksFE.java
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package io.trino.plugin.starrocks;
import com.google.common.base.Splitter;
import com.google.common.collect.ImmutableList;
import io.airlift.configuration.Config;
import javax.validation.constraints.NotNull;
import javax.validation.constraints.Size;
import java.util.List;
public class StarRocksConfig
{
private static final Splitter SPLITTER = Splitter.on(',').trimResults().omitEmptyStrings();
private List<String> loadUrls = ImmutableList.of();
private String labelPrefix = "trino-";
private long maxCacheBytes = 268435456L;
private int connectTimeout = 30000;
private long chunkLimit = Long.MAX_VALUE;
@NotNull
@Size(min = 1)
public List<String> getLoadUrls()
{
return loadUrls;
}
@Config("starrocks.client.load-url")
public StarRocksConfig setLoadUrls(String commaSeparatedList)
{
this.loadUrls = SPLITTER.splitToList(commaSeparatedList);
return this;
}
public String getLabelPrefix()
{
return labelPrefix;
}
@Config("starrocks.client.label-prefix")
public StarRocksConfig setLabelPrefix(String labelPrefix)
{
this.labelPrefix = labelPrefix;
return this;
}
public long getMaxCacheBytes()
{
return maxCacheBytes;
}
@Config("starrocks.client.max-cache-bytes")
public StarRocksConfig setMaxCacheBytes(long maxCacheBytes)
{
this.maxCacheBytes = maxCacheBytes;
return this;
}
public int getConnectTimeout()
{
return connectTimeout;
}
@Config("starrocks.client.connect-timeout")
public StarRocksConfig setConnectTimeout(int connectTimeout)
{
this.connectTimeout = connectTimeout;
return this;
}
public long getChunkLimit()
{
return chunkLimit;
}
@Config("starrocks.client.chunk-limit")
public StarRocksConfig setChunkLimit(long chunkLimit)
{
this.chunkLimit = chunkLimit;
return this;
}
}

View File

@ -0,0 +1,60 @@
// Copyright 2021-present StarRocks, Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// This file is based on code available under the Apache license here:
// https://github.com/apache/incubator-doris/blob/master/fe/fe-core/src/main/java/org/apache/doris/StarRocksFE.java
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package io.trino.plugin.starrocks;
import io.trino.spi.ErrorCode;
import io.trino.spi.ErrorCodeSupplier;
import io.trino.spi.ErrorType;
import static io.trino.spi.ErrorType.EXTERNAL;
public enum StarRocksErrorCode implements ErrorCodeSupplier
{
STAR_ROCKS_WRITE_ERROR(0, EXTERNAL)
/**/;
private final ErrorCode errorCode;
StarRocksErrorCode(int code, ErrorType type)
{
errorCode = new ErrorCode(code + 0x0102_0000, name(), type);
}
@Override
public ErrorCode toErrorCode()
{
return errorCode;
}
}

View File

@ -0,0 +1,139 @@
// Copyright 2021-present StarRocks, Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// This file is based on code available under the Apache license here:
// https://github.com/apache/incubator-doris/blob/master/fe/fe-core/src/main/java/org/apache/doris/StarRocksFE.java
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package io.trino.plugin.starrocks;
import com.mysql.cj.conf.ConnectionUrlParser;
import com.mysql.cj.exceptions.CJException;
import com.mysql.jdbc.Driver;
import io.airlift.configuration.Config;
import io.airlift.configuration.ConfigDescription;
import io.airlift.units.Duration;
import io.trino.plugin.jdbc.BaseJdbcConfig;
import javax.validation.constraints.AssertTrue;
import javax.validation.constraints.Min;
import java.sql.SQLException;
import java.util.concurrent.TimeUnit;
import static com.google.common.base.Strings.isNullOrEmpty;
import static com.mysql.cj.conf.ConnectionUrlParser.parseConnectionString;
public class StarRocksJdbcConfig
extends BaseJdbcConfig
{
private boolean autoReconnect = true;
private int maxReconnects = 3;
private Duration connectionTimeout = new Duration(10, TimeUnit.SECONDS);
// Using `useInformationSchema=true` prevents race condition inside MySQL driver's java.sql.DatabaseMetaData.getColumns
// implementation, which throw SQL exception when a table disappears during listing.
// Using `useInformationSchema=false` may provide more diagnostic information (see https://github.com/trinodb/trino/issues/1597)
private boolean driverUseInformationSchema = true;
public boolean isAutoReconnect()
{
return autoReconnect;
}
@Config("starrocks.auto-reconnect")
public StarRocksJdbcConfig setAutoReconnect(boolean autoReconnect)
{
this.autoReconnect = autoReconnect;
return this;
}
@Min(1)
public int getMaxReconnects()
{
return maxReconnects;
}
@Config("starrocks.max-reconnects")
public StarRocksJdbcConfig setMaxReconnects(int maxReconnects)
{
this.maxReconnects = maxReconnects;
return this;
}
public Duration getConnectionTimeout()
{
return connectionTimeout;
}
@Config("starrocks.connection-timeout")
public StarRocksJdbcConfig setConnectionTimeout(Duration connectionTimeout)
{
this.connectionTimeout = connectionTimeout;
return this;
}
public boolean isDriverUseInformationSchema()
{
return driverUseInformationSchema;
}
@Config("starrocks.jdbc.use-information-schema")
@ConfigDescription("Value of useInformationSchema MySQL JDBC driver connection property")
public StarRocksJdbcConfig setDriverUseInformationSchema(boolean driverUseInformationSchema)
{
this.driverUseInformationSchema = driverUseInformationSchema;
return this;
}
@AssertTrue(message = "Invalid JDBC URL for MySQL connector")
public boolean isUrlValid()
{
try {
Driver driver = new Driver();
return driver.acceptsURL(getConnectionUrl());
}
catch (SQLException e) {
throw new RuntimeException(e);
}
}
@AssertTrue(message = "Database (catalog) must not be specified in JDBC URL for MySQL connector")
public boolean isUrlWithoutDatabase()
{
try {
ConnectionUrlParser parser = parseConnectionString(getConnectionUrl());
return isNullOrEmpty(parser.getPath());
}
catch (CJException ignore) {
return false;
}
}
}

View File

@ -0,0 +1,325 @@
// Copyright 2021-present StarRocks, Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// This file is based on code available under the Apache license here:
// https://github.com/apache/incubator-doris/blob/master/fe/fe-core/src/main/java/org/apache/doris/StarRocksFE.java
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package io.trino.plugin.starrocks;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.starrocks.data.load.stream.StreamLoadConstants;
import com.starrocks.data.load.stream.StreamLoadDataFormat;
import com.starrocks.data.load.stream.StreamLoadResponse;
import com.starrocks.data.load.stream.StreamLoadUtils;
import com.starrocks.data.load.stream.StreamTableRegion;
import com.starrocks.data.load.stream.exception.StreamLoadFailException;
import com.starrocks.data.load.stream.http.StreamLoadEntity;
import com.starrocks.data.load.stream.properties.StreamLoadProperties;
import com.starrocks.data.load.stream.properties.StreamLoadTableProperties;
import io.airlift.log.Logger;
import io.trino.spi.TrinoException;
import org.apache.http.Header;
import org.apache.http.HttpHeaders;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpPut;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.message.BasicHeader;
import org.apache.http.util.EntityUtils;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import static com.starrocks.data.load.stream.StreamLoadUtils.getSendUrl;
import static io.trino.spi.StandardErrorCode.GENERIC_INTERNAL_ERROR;
public final class StarRocksOperationApplier
implements AutoCloseable
{
private static final Logger log = Logger.get(StarRocksOperationApplier.class);
private final StreamTableRegion region;
private final StreamLoadProperties properties;
private final long maxCacheBytes;
private Header[] defaultHeaders;
private final HttpClientBuilder clientBuilder;
private volatile long availableHostPos;
private final ObjectMapper objectMapper;
private final AtomicLong currentCacheBytes = new AtomicLong(0L);
private final AtomicLong totalFlushRows = new AtomicLong(0L);
private final AtomicLong numberTotalRows = new AtomicLong(0L);
private final AtomicLong numberLoadRows = new AtomicLong(0L);
public StarRocksOperationApplier(String database, String table, Optional<String> temporaryTableName, List<String> columns, Boolean isPkTable, StreamLoadProperties properties, HttpClientBuilder clientBuilder)
{
String uniqueKey = StreamLoadUtils.getTableUniqueKey(database, table);
StreamLoadTableProperties tableProperties = properties.getTableProperties();
this.region = new StreamTableRegion(uniqueKey, database, table, temporaryTableName, properties.getLabelPrefix(), this, tableProperties);
this.properties = properties;
this.maxCacheBytes = properties.getMaxCacheBytes();
initDefaultHeaders(isPkTable, columns, properties);
this.clientBuilder = clientBuilder;
this.objectMapper = new ObjectMapper();
}
/**
* Not thread safe
* Applies an operation without waiting for it to be flushed, operations are flushed in the background
*
* @param row row data
*/
public void applyOperationAsync(String row)
{
int bytes = region.write(row.getBytes(StandardCharsets.UTF_8));
if (currentCacheBytes.addAndGet(bytes) >= maxCacheBytes) {
try {
if (region.commit()) {
this.send();
}
}
catch (Exception e) {
throw new TrinoException(GENERIC_INTERNAL_ERROR, e);
}
}
}
protected void initDefaultHeaders(Boolean isPkTable, List<String> columns, StreamLoadProperties properties)
{
Map<String, String> headers = new HashMap<>(properties.getHeaders());
if (!headers.containsKey("timeout")) {
headers.put("timeout", "120");
}
headers.put(HttpHeaders.AUTHORIZATION, StreamLoadUtils.getBasicAuthHeader(properties.getUsername(), properties.getPassword()));
headers.put(HttpHeaders.EXPECT, "100-continue");
headers.put("ignore_json_size", "true");
if (isPkTable) {
headers.put("columns", String.join(",", columns));
headers.put("partial_update", "true");
}
this.defaultHeaders = headers.entrySet().stream()
.map(entry -> new BasicHeader(entry.getKey(), entry.getValue()))
.toArray(Header[]::new);
}
public StreamLoadResponse send()
{
StreamLoadTableProperties tableProperties = properties.getTableProperties();
try {
StreamLoadDataFormat dataFormat = tableProperties.getDataFormat();
String host = getAvailableHost();
String table = region.getTemporaryTableName().orElseGet(region::getTable);
String sendUrl = getSendUrl(host, region.getDatabase(), table);
String label = region.getLabel();
log.info("Stream loading, label : %s, region : %s", label, region.getUniqueKey());
HttpPut httpPut = new HttpPut(sendUrl);
httpPut.setConfig(RequestConfig.custom().setExpectContinueEnabled(true).setRedirectsEnabled(true).build());
httpPut.setEntity(new StreamLoadEntity(region, dataFormat, region.getEntityMeta()));
httpPut.setHeaders(defaultHeaders);
for (Map.Entry<String, String> entry : tableProperties.getProperties().entrySet()) {
httpPut.removeHeaders(entry.getKey());
httpPut.addHeader(entry.getKey(), entry.getValue());
}
httpPut.addHeader("label", label);
try (CloseableHttpClient client = clientBuilder.build()) {
log.info("Stream loading, label : %s, request : %s", label, httpPut);
long startNanoTime = System.currentTimeMillis();
CloseableHttpResponse response = client.execute(httpPut);
String responseBody = EntityUtils.toString(response.getEntity());
log.info("Stream load completed, label : %s, database : %s, table : %s, body : %s",
label, region.getDatabase(), table, responseBody);
StreamLoadResponse streamLoadResponse = new StreamLoadResponse();
StreamLoadResponse.StreamLoadResponseBody streamLoadBody = objectMapper.readValue(responseBody, StreamLoadResponse.StreamLoadResponseBody.class);
streamLoadResponse.setBody(streamLoadBody);
String status = streamLoadBody.getStatus();
if (StreamLoadConstants.RESULT_STATUS_SUCCESS.equals(status)
|| StreamLoadConstants.RESULT_STATUS_OK.equals(status)
|| StreamLoadConstants.RESULT_STATUS_TRANSACTION_PUBLISH_TIMEOUT.equals(status)) {
streamLoadResponse.setCostNanoTime(System.nanoTime() - startNanoTime);
region.complete(streamLoadResponse);
}
else if (StreamLoadConstants.RESULT_STATUS_LABEL_EXISTED.equals(status)) {
boolean succeed = checkLabelState(host, region.getDatabase(), label);
if (!succeed) {
throw new StreamLoadFailException("Stream load failed");
}
}
else {
throw new StreamLoadFailException(responseBody, streamLoadBody);
}
return streamLoadResponse;
}
catch (Exception e) {
log.error("Stream load failed unknown, label : " + label, e);
throw e;
}
}
catch (Exception e) {
log.error("Stream load failed, thread : " + Thread.currentThread().getName(), e);
region.callback(e);
}
return null;
}
protected String getAvailableHost()
{
String[] hosts = properties.getLoadUrls();
int size = hosts.length;
long pos = availableHostPos;
while (pos < pos + size) {
String host = "http://" + hosts[(int) (pos % size)];
if (testHttpConnection(host)) {
pos++;
availableHostPos = pos;
return host;
}
}
return null;
}
private boolean testHttpConnection(String host)
{
try {
URL url = new URL(host);
HttpURLConnection connection = (HttpURLConnection) url.openConnection();
connection.setConnectTimeout(properties.getConnectTimeout());
connection.connect();
connection.disconnect();
return true;
}
catch (Exception e) {
log.warn("Failed to connect to address: %s", host, e);
return false;
}
}
protected boolean checkLabelState(String host, String database, String label)
throws Exception
{
int idx = 0;
for (;; ) {
TimeUnit.SECONDS.sleep(Math.min(++idx, 5));
try (CloseableHttpClient client = HttpClients.createDefault()) {
String url = host + "/api/" + database + "/get_load_state?label=" + label;
HttpGet httpGet = new HttpGet(url);
httpGet.addHeader("Authorization", StreamLoadUtils.getBasicAuthHeader(properties.getUsername(), properties.getPassword()));
httpGet.setHeader("Connection", "close");
try (CloseableHttpResponse response = client.execute(httpGet)) {
String entityContent = EntityUtils.toString(response.getEntity());
if (response.getStatusLine().getStatusCode() != 200) {
throw new StreamLoadFailException("Failed to flush data to StarRocks, Error " +
"could not get the final state of label : `" + label + "`, body : " + entityContent);
}
log.info("Label `%s` check, body : %s", label, entityContent);
StreamLoadResponse.StreamLoadResponseBody responseBody =
objectMapper.readValue(entityContent, StreamLoadResponse.StreamLoadResponseBody.class);
String state = responseBody.getState();
if (state == null) {
log.error("Get label state failed, body : %s", objectMapper.writeValueAsString(responseBody));
throw new StreamLoadFailException(String.format("Failed to flush data to StarRocks, Error " +
"could not get the final state of label[%s]. response[%s]\n", label, entityContent));
}
switch (state) {
case StreamLoadConstants.LABEL_STATE_VISIBLE:
case StreamLoadConstants.LABEL_STATE_PREPARED:
case StreamLoadConstants.LABEL_STATE_COMMITTED:
return true;
case StreamLoadConstants.LABEL_STATE_PREPARE:
continue;
case StreamLoadConstants.LABEL_STATE_ABORTED:
return false;
case StreamLoadConstants.LABEL_STATE_UNKNOWN:
default:
throw new StreamLoadFailException(String.format("Failed to flush data to StarRocks, Error " +
"label[%s] state[%s]\n", label, state));
}
}
}
}
}
public void callback(StreamLoadResponse response)
{
long currentBytes = response.getFlushBytes() != null ? currentCacheBytes.getAndAdd(-response.getFlushBytes()) : currentCacheBytes.get();
if (response.getFlushRows() != null) {
totalFlushRows.addAndGet(response.getFlushRows());
}
log.info("pre bytes : %s, current bytes : %s, totalFlushRows : %s", currentBytes, currentCacheBytes.get(), totalFlushRows.get());
if (response.getBody() != null) {
if (response.getBody().getNumberTotalRows() != null) {
numberTotalRows.addAndGet(response.getBody().getNumberTotalRows());
}
if (response.getBody().getNumberLoadedRows() != null) {
numberLoadRows.addAndGet(response.getBody().getNumberLoadedRows());
}
}
}
public void callback(Throwable e)
{
log.error("Stream load failed", e);
throw new TrinoException(GENERIC_INTERNAL_ERROR, e);
}
@Override
public void close()
{
if (region.commit()) {
this.send();
log.info("Operation applier close, currentBytes : %s, flushRows : %s" +
", numberTotalRows : %s, numberLoadRows : %s",
currentCacheBytes.get(), totalFlushRows.get(), numberTotalRows.get(), numberLoadRows.get());
}
}
}

View File

@ -0,0 +1,76 @@
// Copyright 2021-present StarRocks, Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// This file is based on code available under the Apache license here:
// https://github.com/apache/incubator-doris/blob/master/fe/fe-core/src/main/java/org/apache/doris/StarRocksFE.java
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package io.trino.plugin.starrocks;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import io.trino.plugin.jdbc.JdbcOutputTableHandle;
import io.trino.plugin.jdbc.JdbcTypeHandle;
import io.trino.spi.type.Type;
import javax.annotation.Nullable;
import java.util.List;
import java.util.Optional;
import static java.util.Objects.requireNonNull;
public class StarRocksOutputTableHandle
extends JdbcOutputTableHandle
{
private final Boolean isPkTable;
@JsonCreator
public StarRocksOutputTableHandle(
@JsonProperty("catalogName") @Nullable String catalogName,
@Nullable @JsonProperty("schemaName") String schemaName,
@JsonProperty("tableName") String tableName,
@JsonProperty("columnNames") List<String> columnNames,
@JsonProperty("columnTypes") List<Type> columnTypes,
@JsonProperty("jdbcColumnTypes") Optional<List<JdbcTypeHandle>> jdbcColumnTypes,
@JsonProperty("temporaryTableName") Optional<String> temporaryTableName,
@JsonProperty("pageSinkIdColumnName") Optional<String> pageSinkIdColumnName,
@JsonProperty("isPkTable") Boolean isPkTable)
{
super(catalogName, schemaName, tableName, columnNames, columnTypes, jdbcColumnTypes, temporaryTableName, pageSinkIdColumnName);
this.isPkTable = requireNonNull(isPkTable, "isPkTable is null");
}
@JsonProperty
public Boolean getIsPkTable()
{
return isPkTable;
}
}

View File

@ -0,0 +1,157 @@
// Copyright 2021-present StarRocks, Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// This file is based on code available under the Apache license here:
// https://github.com/apache/incubator-doris/blob/master/fe/fe-core/src/main/java/org/apache/doris/StarRocksFE.java
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package io.trino.plugin.starrocks;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.collect.ImmutableList;
import io.airlift.json.ObjectMapperProvider;
import io.airlift.slice.Slice;
import io.airlift.slice.Slices;
import io.trino.plugin.jdbc.JdbcOutputTableHandle;
import io.trino.spi.Page;
import io.trino.spi.TrinoException;
import io.trino.spi.block.Block;
import io.trino.spi.connector.ConnectorPageSink;
import io.trino.spi.connector.ConnectorPageSinkId;
import io.trino.spi.type.TimestampType;
import io.trino.spi.type.Type;
import java.sql.SQLException;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import static com.starrocks.data.load.stream.StreamLoadConstants.DATETIME_FORMATTER;
import static com.starrocks.data.load.stream.StreamLoadConstants.DATE_FORMATTER;
import static com.starrocks.data.load.stream.StreamLoadUtils.toLocalDateTime;
import static io.trino.plugin.starrocks.StarRocksErrorCode.STAR_ROCKS_WRITE_ERROR;
import static io.trino.spi.type.DateType.DATE;
import static java.util.concurrent.CompletableFuture.completedFuture;
public class StarRocksPageSink
implements ConnectorPageSink
{
private final ObjectMapper objectMapper;
private final JdbcOutputTableHandle handle;
private final StarRocksOperationApplier applier;
private final ConnectorPageSinkId pageSinkId;
public StarRocksPageSink(JdbcOutputTableHandle handle, StarRocksOperationApplier applier, ConnectorPageSinkId pageSinkId)
{
this.handle = handle;
this.applier = applier;
this.objectMapper = new ObjectMapperProvider().get();
this.pageSinkId = pageSinkId;
}
@Override
public CompletableFuture<?> appendPage(Page page)
{
try {
for (int position = 0; position < page.getPositionCount(); position++) {
ObjectNode objectNode = objectMapper.createObjectNode();
for (int channel = 0; channel < page.getChannelCount(); channel++) {
appendColumn(page, position, channel, objectNode);
}
if (handle.getPageSinkIdColumnName().isPresent()) {
objectNode.put(handle.getPageSinkIdColumnName().get(), pageSinkId.getId());
}
String row = objectMapper.writeValueAsString(objectNode);
applier.applyOperationAsync(row);
}
}
catch (SQLException | JsonProcessingException e) {
throw new TrinoException(STAR_ROCKS_WRITE_ERROR, e);
}
return NOT_BLOCKED;
}
private void appendColumn(Page page, int position, int channel, ObjectNode objectNode)
throws SQLException
{
Block block = page.getBlock(channel);
if (block.isNull(position)) {
return;
}
List<Type> columnTypes = handle.getColumnTypes();
List<String> columnNames = handle.getColumnNames();
Type type = columnTypes.get(channel);
String columnName = columnNames.get(channel);
Class<?> javaType = type.getJavaType();
if (javaType == boolean.class) {
objectNode.put(columnName, type.getBoolean(block, position));
}
else if (javaType == long.class) {
long value = type.getLong(block, position);
if (type.equals(DATE)) {
objectNode.put(columnName, LocalDate.ofEpochDay(value).format(DATE_FORMATTER));
}
else if (type instanceof TimestampType) {
LocalDateTime dateTime = toLocalDateTime(((TimestampType) type), block, position);
objectNode.put(columnName, dateTime.format(DATETIME_FORMATTER));
}
else {
objectNode.put(columnName, value);
}
}
else if (javaType == double.class) {
objectNode.put(columnName, type.getDouble(block, position));
}
else if (javaType == Slice.class) {
objectNode.put(columnName, type.getSlice(block, position).toStringUtf8());
}
else {
objectNode.put(columnName, type.getObject(block, position).toString());
}
}
@Override
public CompletableFuture<Collection<Slice>> finish()
{
applier.close();
return completedFuture(ImmutableList.of(Slices.wrappedLongArray(pageSinkId.getId())));
}
@SuppressWarnings("unused")
@Override
public void abort()
{
}
}

View File

@ -0,0 +1,91 @@
// Copyright 2021-present StarRocks, Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// This file is based on code available under the Apache license here:
// https://github.com/apache/incubator-doris/blob/master/fe/fe-core/src/main/java/org/apache/doris/StarRocksFE.java
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package io.trino.plugin.starrocks;
import com.starrocks.data.load.stream.properties.StreamLoadProperties;
import io.trino.spi.connector.ConnectorInsertTableHandle;
import io.trino.spi.connector.ConnectorOutputTableHandle;
import io.trino.spi.connector.ConnectorPageSink;
import io.trino.spi.connector.ConnectorPageSinkId;
import io.trino.spi.connector.ConnectorPageSinkProvider;
import io.trino.spi.connector.ConnectorSession;
import io.trino.spi.connector.ConnectorTransactionHandle;
import org.apache.http.impl.client.DefaultRedirectStrategy;
import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.http.impl.client.HttpClients;
import javax.inject.Inject;
import static java.util.Objects.requireNonNull;
public class StarRocksPageSinkProvider
implements ConnectorPageSinkProvider
{
private final StreamLoadProperties streamLoadProperties;
private final HttpClientBuilder clientBuilder;
@Inject
public StarRocksPageSinkProvider(StreamLoadProperties streamLoadProperties)
{
this.streamLoadProperties = requireNonNull(streamLoadProperties, "streamLoadProperties is null");
this.clientBuilder = HttpClients.custom()
.setRedirectStrategy(new DefaultRedirectStrategy()
{
@Override
protected boolean isRedirectable(String method)
{
return true;
}
});
}
@Override
public ConnectorPageSink createPageSink(ConnectorTransactionHandle transactionHandle, ConnectorSession session, ConnectorOutputTableHandle tableHandle, ConnectorPageSinkId pageSinkId)
{
StarRocksOutputTableHandle starRocksOutputTableHandle = (StarRocksOutputTableHandle) tableHandle;
StarRocksOperationApplier applier = new StarRocksOperationApplier(
starRocksOutputTableHandle.getSchemaName(), starRocksOutputTableHandle.getTableName(), starRocksOutputTableHandle.getTemporaryTableName(), starRocksOutputTableHandle.getColumnNames(), starRocksOutputTableHandle.getIsPkTable(), streamLoadProperties, clientBuilder);
return new StarRocksPageSink(starRocksOutputTableHandle, applier, pageSinkId);
}
@Override
public ConnectorPageSink createPageSink(ConnectorTransactionHandle transactionHandle, ConnectorSession session, ConnectorInsertTableHandle tableHandle, ConnectorPageSinkId pageSinkId)
{
StarRocksOutputTableHandle starRocksOutputTableHandle = (StarRocksOutputTableHandle) tableHandle;
StarRocksOperationApplier applier = new StarRocksOperationApplier(
starRocksOutputTableHandle.getSchemaName(), starRocksOutputTableHandle.getTableName(), starRocksOutputTableHandle.getTemporaryTableName(), starRocksOutputTableHandle.getColumnNames(), starRocksOutputTableHandle.getIsPkTable(), streamLoadProperties, clientBuilder);
return new StarRocksPageSink(starRocksOutputTableHandle, applier, pageSinkId);
}
}

View File

@ -0,0 +1,46 @@
// Copyright 2021-present StarRocks, Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// This file is based on code available under the Apache license here:
// https://github.com/apache/incubator-doris/blob/master/fe/fe-core/src/main/java/org/apache/doris/StarRocksFE.java
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package io.trino.plugin.starrocks;
import io.trino.plugin.jdbc.JdbcPlugin;
public class StarRocksPlugin
extends JdbcPlugin
{
public StarRocksPlugin()
{
super("starrocks", new StarRocksClientModule());
}
}

View File

@ -0,0 +1,64 @@
// Copyright 2021-present StarRocks, Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// This file is based on code available under the Apache license here:
// https://github.com/apache/incubator-doris/blob/master/fe/fe-core/src/main/java/org/apache/doris/StarRocksFE.java
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package io.trino.plugin.starrocks;
import io.airlift.configuration.Config;
import io.airlift.configuration.ConfigDescription;
import javax.validation.constraints.Max;
import javax.validation.constraints.Min;
public class StarRocksWriteConfig
{
static final int MAX_ALLOWED_QUERY_TIMEOUT = 259_200;
static final int MIN_ALLOWED_QUERY_TIMEOUT = 1;
private int queryTimeout = 3000;
@Min(MIN_ALLOWED_QUERY_TIMEOUT)
@Max(MAX_ALLOWED_QUERY_TIMEOUT)
public int getQueryTimeout()
{
return queryTimeout;
}
@Config("write.query-timeout")
@ConfigDescription("The query timeout duration in seconds")
public StarRocksWriteConfig setQueryTimeout(int queryTimeout)
{
this.queryTimeout = queryTimeout;
return this;
}
}

View File

@ -0,0 +1,92 @@
// Copyright 2021-present StarRocks, Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// This file is based on code available under the Apache license here:
// https://github.com/apache/incubator-doris/blob/master/fe/fe-core/src/main/java/org/apache/doris/StarRocksFE.java
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package io.trino.plugin.starrocks;
import com.google.common.collect.ImmutableList;
import io.trino.plugin.base.session.SessionPropertiesProvider;
import io.trino.spi.TrinoException;
import io.trino.spi.connector.ConnectorSession;
import io.trino.spi.session.PropertyMetadata;
import javax.inject.Inject;
import java.util.List;
import static io.trino.plugin.starrocks.StarRocksWriteConfig.MAX_ALLOWED_QUERY_TIMEOUT;
import static io.trino.spi.StandardErrorCode.INVALID_SESSION_PROPERTY;
import static io.trino.spi.session.PropertyMetadata.integerProperty;
import static java.lang.String.format;
public class StarRocksWriteSessionProperties
implements SessionPropertiesProvider
{
public static final String QUERY_TIMEOUT = "query_timeout";
private final List<PropertyMetadata<?>> properties;
@Inject
public StarRocksWriteSessionProperties(StarRocksWriteConfig writeConfig)
{
properties = ImmutableList.<PropertyMetadata<?>>builder()
.add(integerProperty(
QUERY_TIMEOUT,
"The query timeout duration in seconds",
writeConfig.getQueryTimeout(),
StarRocksWriteSessionProperties::validateQueryTimeout,
false))
.build();
}
@Override
public List<PropertyMetadata<?>> getSessionProperties()
{
return properties;
}
public static int getQueryTimeout(ConnectorSession session)
{
return session.getProperty(QUERY_TIMEOUT, Integer.class);
}
private static void validateQueryTimeout(int queryTimeout)
{
if (queryTimeout < 1) {
throw new TrinoException(INVALID_SESSION_PROPERTY, format("%s must be greater than 0: %s", QUERY_TIMEOUT, queryTimeout));
}
if (queryTimeout > MAX_ALLOWED_QUERY_TIMEOUT) {
throw new TrinoException(INVALID_SESSION_PROPERTY, format("%s cannot exceed %s: %s", QUERY_TIMEOUT, MAX_ALLOWED_QUERY_TIMEOUT, queryTimeout));
}
}
}