Encapsulate tools for stream load when importing a file with a big size #4031 (#5451)

Use Java and shell to implement tools for stream load

The stream load method is limited by `curl` performance. As a result, it is too slow to import a big file.
I want to implement a Java client that transfers the big file in parallel, which can increase the performance.

First, I need to read the big file sequentially; due to the characteristics of the disk, sequential read performance is largely independent of the number of threads.
Second, the data that was read is composed into a long string and put into the blocking queue; the main thread blocks when the size of the blocking queue reaches the configured limit.
Third, the worker threads consume the blocking queue and send the data over the network; a worker thread stops when it receives an empty string from the main thread.

Using curl with one thread, the transmission speed is limited by disk bandwidth to tens of megabytes per second.
With the new tools, the transmission can fill a 10 Gigabit network and saturate the disk bandwidth.
However, the transmission speed may also be limited by the sequential read/write performance of the hard disk. If you want a faster transmission speed, you need a hard disk with better performance.
This commit is contained in:
林郎俊赏丶 2022-04-25 21:48:17 +08:00 committed by GitHub
parent dfc0debed0
commit f74c7a6212
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 285 additions and 0 deletions

View File

@ -0,0 +1,39 @@
Stream load import tools (multi-threaded, parallel); expected memory use is within 500 MB regardless of file size.
### Use
First , you need to install jdk , and run the script
```shell
./stream-load-import.sh --url=http://{fe_ip}:{fe_http_port}/api/{database_name}/{table_name}/_stream_load \
--source_file=/file/path/name.csv \
--H=column_separator:, \
--u=sr_username:sr_password
```
### Params
Necessary:
- --source_file: the absolute path of import file
- --url: the `fe` url , it should contain protocol and so on, not `be` redirect url
- --u: the starrocks database user and password ,not server
Optional:
- --enable_debug: default false ,`--enable_debug=true`
- --max_threads: parallel thread number , default min(server_core_number,32) ,`--max_threads=16`
- --timeout: http protocol connect timeout and process timeout , default 60*1000ms, for example 5s `--timeout=5000`
- --queue_size: memory queue limit size , default 256 , not reset unless you have a lot of memory and you need reset `-Xmx`
- --H: http request header , for example `--H=column_separator:,`,column_separator as key,`,`as value
Other:
https://docs.starrocks.com/zh-cn/main/loading/StreamLoad
### Warn
When an error appears in a certain thread, the transactions of other normal threads will not be rolled back.
Currently, you need to ensure that your file contains no error data, and that the table can be cleared if needed.
We are implementing a unified transaction, which can resolve this problem.

View File

@ -0,0 +1,223 @@
// This file is licensed under the Elastic License 2.0. Copyright 2021-present, StarRocks Limited.
import java.io.*;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.*;
import java.util.concurrent.*;
/**
 * Multi-threaded stream-load import client for StarRocks.
 *
 * <p>The main thread reads the source file sequentially, batching
 * {@code queueSize} lines into one string per queue entry; worker threads
 * drain the queue and stream the data to the backend over HTTP. An empty
 * string is pushed once per worker as a poison pill to signal completion.
 */
public class StreamLoadImportUtils {
    // Thread pool running the upload workers.
    private static ExecutorService service;
    // Absolute path of the file to import (--source_file).
    private static String sourceFilePath;
    // FE stream-load endpoint (--url); the FE redirects each worker to a BE.
    private static String url;
    // Base64-encoded "user:password" for HTTP basic auth (--u).
    private static String auth;
    // Extra HTTP request headers in "key:value" form (--H, repeatable).
    private static List<String> headers = new ArrayList<>();
    // Lines batched per queue entry, and the back-pressure threshold
    // for the queue itself (--queue_size).
    private static Integer queueSize = 256;
    // Verbose logging switch (--enable_debug).
    private static Boolean enableDebug = Boolean.FALSE;
    // HTTP connect and read timeout in milliseconds (--timeout).
    private static Integer connectTimeout = 60 * 1000;
    // Worker thread count, capped at 32 (--max_threads).
    private static Integer maxThreads = Math.min(Runtime.getRuntime().availableProcessors(), 32);
    // Hand-off queue between the single reader (main) thread and the workers.
    private static volatile BlockingDeque<String> blockingQueue = new LinkedBlockingDeque<>();

    /**
     * Entry point: parses CLI args, validates config, starts the worker pool,
     * then reads the source file and feeds the queue.
     *
     * @throws IOException          if the source file cannot be read
     * @throws InterruptedException if the reader is interrupted while throttled
     */
    public static void main(String[] args) throws IOException, InterruptedException {
        resetDefaultConfig(args);
        printConfig();
        initWorkerThread();
        // try-with-resources guarantees the reader is closed even on error
        // (the original closed streams only on the happy path).
        // Read as UTF-8 explicitly: the workers always send UTF-8 bytes, so
        // relying on the platform default charset could corrupt the data.
        try (BufferedReader reader = new BufferedReader(
                new InputStreamReader(new FileInputStream(sourceFilePath), StandardCharsets.UTF_8))) {
            StringBuilder batch = new StringBuilder();
            String line;
            int remaining = queueSize;
            while ((line = reader.readLine()) != null) {
                batch.append(line).append("\n");
                if (--remaining == 0) {
                    blockingQueue.addLast(batch.toString());
                    remaining = queueSize;
                    batch = new StringBuilder();
                    // Back-pressure: the queue itself is unbounded, so throttle
                    // the reader when the workers fall behind.
                    while (blockingQueue.size() > queueSize) {
                        if (enableDebug) {
                            System.out.println("The main thread is sleeping because the speed of reading file is too fast. If printing frequently, you should consider resetting the queue size");
                        }
                        Thread.sleep(30L);
                    }
                }
            }
            // Flush the final partial batch.
            if (batch.length() > 0) {
                blockingQueue.addLast(batch.toString());
            }
        }
        // One poison pill ("") per worker tells it to finish its request.
        for (int i = 0; i < maxThreads; i++) {
            blockingQueue.addLast("");
        }
        service.shutdown();
    }

    /**
     * Validates mandatory settings (exits if any is missing) and, in debug
     * mode, dumps the effective configuration to stdout.
     */
    private static void printConfig() {
        if (url == null) {
            System.out.println("url is empty , please set --url=xxx");
            System.exit(0);
        }
        if (auth == null) {
            System.out.println("auth is empty , please set --u=username:password");
            System.exit(0);
        }
        if (sourceFilePath == null) {
            System.out.println("source file path is empty , please set --source_file=/xxx/xx.csv");
            System.exit(0);
        }
        if (enableDebug) {
            System.out.println(String.format("%s=%s", "sourceFilePath", sourceFilePath));
            System.out.println(String.format("%s=%s", "url", url));
            System.out.println(String.format("%s=%s", "queueSize", queueSize));
            System.out.println(String.format("%s=%s", "timeout", connectTimeout));
            System.out.println(String.format("%s=%s", "maxThreads", maxThreads));
            System.out.println(String.format("%s=%s", "auth", auth));
            System.out.println("Header:");
            for (String header : headers) {
                System.out.println(String.format("%s", header));
            }
        }
    }

    /**
     * Worker loop: opens one stream-load connection (following a single
     * FE -> BE redirect), streams queue entries into the request body until a
     * poison pill ("") arrives, then reads and checks the whole response.
     */
    public static void executeGetAndSend() {
        HttpURLConnection conn = null;
        try {
            // The FE answers a stream load with a 3xx redirect to a BE.
            conn = getConnection(url);
            if (conn.getResponseCode() > 300 && conn.getResponseCode() < 400) {
                String redirectUrl = conn.getHeaderField("Location");
                conn.disconnect();
                conn = getConnection(redirectUrl);
            }
            // Drain the queue into the request body; takeFirst() blocks until
            // data (or the poison pill) is available.
            try (OutputStream outputStream = conn.getOutputStream()) {
                String data;
                while ((data = blockingQueue.takeFirst()) != null) {
                    if (data.isEmpty()) {
                        break; // poison pill from main()
                    }
                    outputStream.write(data.getBytes(StandardCharsets.UTF_8));
                }
            }
            // Read the response body to EOF. The original used
            // InputStream.available(), which reports only the bytes readable
            // without blocking and can truncate the response, silently
            // skipping the "Fail" status check below.
            String result;
            try (InputStream inputStream = conn.getInputStream()) {
                ByteArrayOutputStream buffer = new ByteArrayOutputStream();
                byte[] chunk = new byte[8192];
                int n;
                while ((n = inputStream.read(chunk)) != -1) {
                    buffer.write(chunk, 0, n);
                }
                result = new String(buffer.toByteArray(), StandardCharsets.UTF_8);
            }
            if (enableDebug) {
                System.out.println(result);
            }
            if (result.contains("\"Status\": \"Fail\"")) {
                System.out.println("stream load status is fail \n" + result);
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if (conn != null) {
                conn.disconnect();
            }
        }
    }

    /**
     * Builds a chunked PUT connection with the stream-load headers and a
     * fresh random label, without following redirects (the FE redirect is
     * handled manually so auth headers can be re-sent to the BE).
     *
     * @throws IOException if the connection cannot be established
     */
    private static HttpURLConnection getConnection(String loadUrl) throws IOException {
        URL url = new URL(loadUrl);
        HttpURLConnection conn = (HttpURLConnection) url.openConnection();
        conn.setDoOutput(true);
        conn.setDoInput(true);
        conn.setUseCaches(false);
        conn.setReadTimeout(connectTimeout);
        conn.setConnectTimeout(connectTimeout);
        conn.setRequestMethod("PUT");
        // A unique label lets the server deduplicate retried loads.
        conn.setRequestProperty("label", UUID.randomUUID().toString());
        conn.setInstanceFollowRedirects(false);
        conn.setRequestProperty("Expect", "100-continue");
        conn.setRequestProperty("Content-Type", "multipart/form-data;");
        conn.setRequestProperty("Authorization", "Basic " + auth);
        conn.setRequestProperty("Connection", "Keep-Alive");
        conn.setRequestProperty("Accept", "*/*");
        conn.setRequestProperty("Accept-Encoding", "gzip, deflate");
        conn.setRequestProperty("Cache-Control", "no-cache");
        // Chunked mode streams the body without buffering it in memory.
        conn.setChunkedStreamingMode(8192);
        for (String header : headers) {
            // Split on the first ':' only, so header values may contain ':'
            // (e.g. "timezone:UTC+08:00").
            String[] split = header.split(":", 2);
            if (split.length > 1) {
                conn.setRequestProperty(split[0], split[1]);
            }
        }
        conn.connect();
        return conn;
    }

    /**
     * Parses {@code --name=value} CLI arguments into the static config fields.
     * Malformed arguments (no '=') are skipped instead of throwing.
     */
    public static void resetDefaultConfig(String[] args) {
        for (String arg : args) {
            // Strip only the leading "--"; replace("--","") would also mangle
            // values that legitimately contain "--" (e.g. inside a URL).
            String param = arg.startsWith("--") ? arg.substring(2) : arg;
            int eq = param.indexOf('=');
            if (eq < 0) {
                continue; // no '=' present; ignore the malformed argument
            }
            String name = param.substring(0, eq);
            String value = param.substring(eq + 1);
            switch (name) {
                case "url":
                    url = value;
                    break;
                case "max_threads":
                    maxThreads = Integer.valueOf(value);
                    break;
                case "queue_size":
                    queueSize = Integer.valueOf(value);
                    break;
                case "enable_debug":
                    enableDebug = Boolean.valueOf(value);
                    break;
                case "timeout":
                    connectTimeout = Integer.valueOf(value);
                    break;
                case "u":
                    // Pre-encode "user:password" for the Basic auth header.
                    auth = Base64.getEncoder().encodeToString(value.getBytes(StandardCharsets.UTF_8));
                    break;
                case "source_file":
                    sourceFilePath = value;
                    break;
                case "H":
                    headers.add(value);
                    break;
            }
        }
    }

    /** Starts {@code maxThreads} workers, each running one upload request. */
    public static void initWorkerThread() {
        service = Executors.newFixedThreadPool(maxThreads);
        for (int i = 0; i < maxThreads; i++) {
            service.submit(StreamLoadImportUtils::executeGetAndSend);
        }
    }
}

View File

@ -0,0 +1,23 @@
#!/usr/bin/env bash
# This file is licensed under the Elastic License 2.0. Copyright 2021-present, StarRocks Limited.
#
# Compile and run the stream-load import tool, forwarding all CLI arguments.
# NOTE: the shebang must be the very first line of the file to take effect;
# the original placed it after the license comment, making it inert.

# A JDK is required (javac is needed for on-the-fly compilation).
if [ -z "$JAVA_HOME" ]; then
    echo "Error: JAVA_HOME is not set."
    exit 1
fi
JAVA="$JAVA_HOME/bin/java"
JAVAC="$JAVA_HOME/bin/javac"

# Compile first; abort if compilation fails rather than running a stale class.
"$JAVAC" StreamLoadImportUtils.java || exit 1

# "$@" forwards every argument verbatim (preserving quoting and spaces),
# unlike the original $*-based concatenation which re-split on whitespace.
exec "$JAVA" -Xmx512m -XX:+UseG1GC StreamLoadImportUtils "$@"