# StarRocks Format Library
StarRocks Format Library is an SDK for reading and writing StarRocks data files. It provides C++ and Java versions of the tablet data file reader and writer. With the Format SDK, you can bypass the StarRocks BE node and directly read and write tablet data files stored in a remote file system (e.g. S3, HDFS) in shared-data mode, so that the same data can be read and written by multiple computing engines (e.g. Spark, StarRocks) at the same time.
## Why we do it?
StarRocks is a next-gen, high-performance analytical data warehouse. However, the limitations of its MPP architecture still cause problems in large-scale ETL scenarios. For example:
- **Low resource utilization**: StarRocks clusters mostly adopt a resource reservation mode. Large-scale ETL jobs need a large amount of resources within a relatively short period of time, so supporting them means planning redundant resources in advance, which lowers the overall resource utilization of the cluster.
- **Poor resource isolation**: StarRocks isolates resources at the resource group level rather than the query level. In large-scale ETL scenarios, a query with a large resource overhead may exhaust the group's resources and starve small queries.
- **Lack of failure tolerance**: Because there is no task-level failure tolerance mechanism, a failed ETL job usually fails again even when it is rerun manually.
To solve the above problems, we propose bypassing the StarRocks BE node and directly reading and writing tablet data files in shared-data mode. Taking Apache Spark as an example (the design is not bound to Spark; more computing engines can be supported), the overall architecture is as follows:

For complex query scenarios, users submit jobs through the spark-sql client. Spark interacts with StarRocks to obtain the table's metadata and manage the write transaction, and reads and writes data on shared storage directly through the Format SDK. As a result, the same StarRocks data files can be read and written by multiple computing engines.
## How to build?
The build process is divided into two steps. First, execute the following commands to compile the `be` and `format-lib` modules and generate the `libstarrocks_format.so` file:
```bash
# build be
./build.sh --clean --be --enable-shared-data

# build format-lib
./build.sh --format-lib --enable-shared-data
```
Then execute the following commands to compile the `format-sdk` module:
```bash
cd ./format-sdk
mvn clean package -DskipTests
```
After that, you will get two files: `libstarrocks_format_wrapper.so` and `format-sdk.jar`.
## How to use?
Assume there is a product sales table with the following schema:
```sql
CREATE TABLE `tb_sales`
(
    `product_id` BIGINT NOT NULL COMMENT "Product ID",
    `date`       DATE   NOT NULL COMMENT "Sale Date",
    `quantity`   INT    NOT NULL COMMENT "Sale Quantity",
    `amount`     BIGINT NOT NULL COMMENT "Sale Amount"
) ENGINE = OLAP PRIMARY KEY(`product_id`, `date`)
DISTRIBUTED BY HASH(`product_id`)
PROPERTIES (
    "enable_persistent_index" = "true"
);
```
Next, we will take a Java program as an example to show how to use the Format SDK to read and write data files of this table.
> **Note**: You need to add the `format-sdk.jar` dependency to your project.
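The jar is built locally, so one option is to install it into your local Maven repository and then declare it as a normal dependency. A minimal sketch; the coordinates (`com.starrocks:format-sdk:1.0.0`) and the jar path are illustrative assumptions, not published values:

```bash
# Install the locally built jar into the local Maven repository.
# The coordinates and jar path below are illustrative assumptions;
# adjust them to whatever your build actually produces.
mvn install:install-file \
    -Dfile=./format-sdk/target/format-sdk.jar \
    -DgroupId=com.starrocks \
    -DartifactId=format-sdk \
    -Dversion=1.0.0 \
    -Dpackaging=jar
```

Afterwards, reference the same coordinates as a dependency in your project's `pom.xml`.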
### Write by `StarRocksWriter`
You can use `StarRocksWriter` to perform write operations according to the following steps:
1. Request the metadata and partition information of the table via `RestClient`;
2. Begin a transaction;
3. Create and open a `StarRocksWriter` bound to the target tablet;
4. Create an Arrow `VectorSchemaRoot` object and fill it with custom data according to the table schema definition;
5. Call the `StarRocksWriter#write` method to write the filled `VectorSchemaRoot` object to StarRocks;
6. Batch as needed, and call the `StarRocksWriter#flush` method to flush the data to the file system;
7. Repeat steps 4 to 6 until all data is written;
8. Close and release the `StarRocksWriter` (in this example it is closed and released automatically by the try-with-resources statement);
9. Commit or roll back the transaction.
The example code is as follows:
```java
final String DEFAULT_CATALOG = "default_catalog";
final String DB_NAME = "bypass";
final String TB_NAME = "tb_sales";

Config config = Config.newBuilder()
        .feHttpUrl("http://127.0.0.1:8030")
        .feJdbcUrl("jdbc:mysql://127.0.0.1:9030")
        .database(DB_NAME)
        .username("root")
        .password("******")
        .s3Endpoint("https://tos-s3-cn-beijing.volces.com")
        .s3EndpointRegion("cn-beijing")
        .s3ConnectionSslEnabled(false)
        .s3PathStyleAccess(false)
        .s3AccessKey("******")
        .s3SecretKey("******")
        .build();

String label = String.format(
        "bypass_%s_%s_%s", DB_NAME, TB_NAME, RandomStringUtils.randomAlphabetic(8)
);

try (RestClient restClient = RestClient.newBuilder()
        .feHttpEndpoints("http://127.0.0.1:8030")
        .username("root")
        .password("******")
        .build()) {

    // Request the metadata and partition information of the table by RestClient
    TableSchema tableSchema = restClient.getTableSchema(DEFAULT_CATALOG, DB_NAME, TB_NAME);
    List<TablePartition> tablePartitions = restClient.listTablePartitions(DEFAULT_CATALOG, DB_NAME, TB_NAME, false);
    Schema arrowSchema = ArrowUtils.toArrowSchema(tableSchema, ZoneId.systemDefault());

    // Begin a transaction
    TransactionResult beginTxnResult = restClient.beginTransaction(DEFAULT_CATALOG, DB_NAME, TB_NAME, label);
    assertTrue(beginTxnResult.isOk());

    List<TabletCommitInfo> commitTablets = new ArrayList<>();
    TablePartition tablePartition = tablePartitions.get(0);
    for (int i = 0; i < tablePartition.getTablets().size(); i++) {
        TablePartition.Tablet tablet = tablePartition.getTablets().get(i);
        Long tabletId = tablet.getId();
        // Create and open a StarRocksWriter bound to the target tablet
        try (StarRocksWriter tabletWriter = new StarRocksWriter(
                tabletId, tablePartition.getStoragePath(), beginTxnResult.getTxnId(), arrowSchema, config)) {
            tabletWriter.open();
            try (VectorSchemaRoot root = VectorSchemaRoot.create(arrowSchema, tabletWriter.getAllocator())) {
                // Fill the VectorSchemaRoot object with custom data according to the table schema definition
                fillSampleData(i, root);
                // Write the filled VectorSchemaRoot object to StarRocks
                tabletWriter.write(root);
                // Flush the data to the file system
                tabletWriter.flush();
            }
            tabletWriter.finish();
        }
        commitTablets.add(new TabletCommitInfo(tabletId, tablet.getPrimaryComputeNodeId()));
    }

    // Commit or roll back the transaction
    TransactionResult prepareTxnResult = restClient.prepareTransaction(
            DEFAULT_CATALOG, DB_NAME, beginTxnResult.getLabel(), commitTablets, null);
    if (prepareTxnResult.isOk()) {
        TransactionResult commitTxnResult = restClient.commitTransaction(
                DEFAULT_CATALOG, DB_NAME, prepareTxnResult.getLabel());
        assertTrue(commitTxnResult.isOk());
    } else {
        TransactionResult rollbackTxnResult = restClient.rollbackTransaction(
                DEFAULT_CATALOG, DB_NAME, prepareTxnResult.getLabel(), null);
        assertTrue(rollbackTxnResult.isOk());
    }
}
```
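The example above calls a `fillSampleData` helper, which is not part of the SDK; it just fills the Arrow vectors with rows matching the `tb_sales` schema. A minimal sketch, assuming `ArrowUtils.toArrowSchema` maps `BIGINT` to `BigIntVector`, `DATE` to `DateDayVector`, and `INT` to `IntVector` (verify against the schema your build actually produces):

```java
import java.time.LocalDate;
import org.apache.arrow.vector.BigIntVector;
import org.apache.arrow.vector.DateDayVector;
import org.apache.arrow.vector.IntVector;
import org.apache.arrow.vector.VectorSchemaRoot;

// Illustrative helper, not part of the Format SDK: fills one small batch of
// rows for the tb_sales schema. The vector types are assumptions about how
// ArrowUtils.toArrowSchema maps StarRocks types to Arrow types.
private static void fillSampleData(int tabletIndex, VectorSchemaRoot root) {
    final int rowCount = 10;
    BigIntVector productId = (BigIntVector) root.getVector("product_id");
    DateDayVector saleDate = (DateDayVector) root.getVector("date");
    IntVector quantity = (IntVector) root.getVector("quantity");
    BigIntVector amount = (BigIntVector) root.getVector("amount");
    for (int row = 0; row < rowCount; row++) {
        productId.setSafe(row, tabletIndex * 1000L + row);
        // DateDayVector stores the number of days since the Unix epoch
        saleDate.setSafe(row, (int) LocalDate.of(2024, 1, 1).plusDays(row).toEpochDay());
        quantity.setSafe(row, row + 1);
        amount.setSafe(row, (row + 1) * 100L);
    }
    // Publish the row count so that write(root) sees the filled rows
    root.setRowCount(rowCount);
}
```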
### Read by `StarRocksReader`
You can use `StarRocksReader` to perform read operations according to the following steps:
1. Request the metadata and partition information of the table via `RestClient`;
2. Create and open a `StarRocksReader` bound to the target tablet;
3. Iterate over the data (`StarRocksReader` supports iterator-style access);
4. Close and release the `StarRocksReader` (in this example it is closed and released automatically by the try-with-resources statement).
The example code is as follows:
```java
final String DEFAULT_CATALOG = "default_catalog";
final String DB_NAME = "bypass";
final String TB_NAME = "tb_sales";

Config config = Config.newBuilder()
        .feHttpUrl("http://127.0.0.1:8030")
        .feJdbcUrl("jdbc:mysql://127.0.0.1:9030")
        .database(DB_NAME)
        .username("root")
        .password("******")
        .s3Endpoint("https://tos-s3-cn-beijing.volces.com")
        .s3EndpointRegion("cn-beijing")
        .s3ConnectionSslEnabled(false)
        .s3PathStyleAccess(false)
        .s3AccessKey("******")
        .s3SecretKey("******")
        .build();

try (RestClient restClient = RestClient.newBuilder()
        .feHttpEndpoints("http://127.0.0.1:8030")
        .username("root")
        .password("******")
        .build()) {

    // Request the metadata and partition information of the table by RestClient
    TableSchema tableSchema = restClient.getTableSchema(DEFAULT_CATALOG, DB_NAME, TB_NAME);
    List<TablePartition> tablePartitions = restClient.listTablePartitions(DEFAULT_CATALOG, DB_NAME, TB_NAME, false);
    Schema arrowSchema = ArrowUtils.toArrowSchema(tableSchema, ZoneId.systemDefault());

    TablePartition partition = tablePartitions.get(0);
    for (TablePartition.Tablet tablet : partition.getTablets()) {
        Long tabletId = tablet.getId();
        // Create and open a StarRocksReader bound to the target tablet
        try (StarRocksReader reader = new StarRocksReader(
                tabletId,
                partition.getStoragePath(),
                partition.getVisibleVersion(),
                arrowSchema,
                arrowSchema,
                config)) {
            reader.open();
            // StarRocksReader supports iterator access, so you can iterate and read the data
            while (reader.hasNext()) {
                try (VectorSchemaRoot root = reader.next()) {
                    System.out.println(root.contentToTSVString());
                }
            }
        }
    }
}
```
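`contentToTSVString()` is handy for debugging, but typed access is usually what you want. A minimal sketch of consuming one batch with plain Arrow APIs (everything below is standard Arrow, not Format SDK API; `consumeBatch` is a hypothetical name):

```java
import org.apache.arrow.vector.BigIntVector;
import org.apache.arrow.vector.FieldVector;
import org.apache.arrow.vector.VectorSchemaRoot;

// Illustrative consumer for one batch returned by reader.next(): reads
// product_id as a typed long and dumps the remaining columns generically.
static void consumeBatch(VectorSchemaRoot root) {
    BigIntVector productId = (BigIntVector) root.getVector("product_id");
    for (int row = 0; row < root.getRowCount(); row++) {
        StringBuilder line = new StringBuilder("product_id=").append(productId.get(row));
        for (FieldVector vector : root.getFieldVectors()) {
            // getObject returns a boxed Java value (or null) for any vector type
            line.append(' ').append(vector.getName()).append('=').append(vector.getObject(row));
        }
        System.out.println(line);
    }
}
```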