Access s3/oss via new `S3Client` of aws-sdk-cpp (#3066)

yan.zhang 2022-02-15 11:14:24 +08:00 committed by GitHub
parent af0881614e
commit 36e2b4818f
29 changed files with 1131 additions and 154 deletions


@ -52,7 +52,6 @@ message(STATUS "Build target arch is ${CMAKE_BUILD_TARGET_ARCH}")
# Set dirs
set(BASE_DIR "${CMAKE_CURRENT_SOURCE_DIR}")
set(ENV{STARROCKS_HOME} "${BASE_DIR}/../")
set(BUILD_DIR "${CMAKE_CURRENT_BINARY_DIR}")
set(THIRDPARTY_DIR "$ENV{STARROCKS_THIRDPARTY}/installed/")
set(STARROCKS_THIRDPARTY "$ENV{STARROCKS_THIRDPARTY}")
@ -116,6 +115,11 @@ if(NOT ${MAKE_GENSRC_RESULT} EQUAL 0 AND NOT APPLE)
message(FATAL_ERROR "Failed to build ${BASE_DIR}/../gensrc/")
endif()
# Add common cmake prefix path and link library path
list(APPEND CMAKE_PREFIX_PATH ${THIRDPARTY_DIR}/lib/cmake)
list(APPEND CMAKE_PREFIX_PATH ${THIRDPARTY_DIR}/lib64/cmake)
link_directories(${THIRDPARTY_DIR}/lib ${THIRDPARTY_DIR}/lib64)
# Set Boost
set(Boost_DEBUG FALSE)
set(Boost_USE_MULTITHREADED ON)
@ -188,12 +192,22 @@ if (WITH_HDFS)
set_target_properties(hdfs PROPERTIES IMPORTED_LOCATION ${THIRDPARTY_DIR}/lib/libhdfs.a)
endif()
add_library(libevent STATIC IMPORTED)
set_target_properties(libevent PROPERTIES IMPORTED_LOCATION ${THIRDPARTY_DIR}/lib/libevent.a)
add_library(crypto STATIC IMPORTED)
set_target_properties(crypto PROPERTIES IMPORTED_LOCATION ${THIRDPARTY_DIR}/lib/libcrypto.a)
if (WITH_AWS)
set(AWSSDK_ROOT_DIR ${THIRDPARTY_DIR})
set(AWSSDK_COMMON_RUNTIME_LIBS "aws-crt-cpp;aws-c-auth;aws-c-cal;aws-c-common;aws-c-compression;aws-c-event-stream;aws-c-http;aws-c-io;aws-c-mqtt;aws-c-s3;aws-checksums;s2n")
foreach(lib IN ITEMS ${AWSSDK_COMMON_RUNTIME_LIBS})
list(APPEND CMAKE_PREFIX_PATH ${THIRDPARTY_DIR}/lib64/${lib}/cmake)
endforeach()
find_package(AWSSDK REQUIRED COMPONENTS core s3 s3-crt transfer)
include_directories(${AWSSDK_INCLUDE_DIRS})
endif()
add_library(libevent STATIC IMPORTED)
set_target_properties(libevent PROPERTIES IMPORTED_LOCATION ${THIRDPARTY_DIR}/lib/libevent.a)
add_library(openssl STATIC IMPORTED)
set_target_properties(openssl PROPERTIES IMPORTED_LOCATION ${THIRDPARTY_DIR}/lib/libssl.a)
@ -328,6 +342,10 @@ if (WITH_HDFS)
set(CXX_COMMON_FLAGS "${CXX_COMMON_FLAGS} -DSTARROCKS_WITH_HDFS")
endif()
if (WITH_AWS)
set(CXX_COMMON_FLAGS "${CXX_COMMON_FLAGS} -DSTARROCKS_WITH_AWS")
endif()
if (CMAKE_CXX_COMPILER_VERSION VERSION_GREATER 7.0)
set(CXX_COMMON_FLAGS "${CXX_COMMON_FLAGS} -faligned-new")
endif()
@ -447,6 +465,7 @@ set(STARROCKS_LINK_LIBS
Tools
Geo
Plugin
ObjectStore
${WL_END_GROUP}
)
@ -467,6 +486,13 @@ if (WITH_HDFS)
include_directories(${JAVA_HOME}/include/linux)
endif()
if (WITH_AWS)
set(STARROCKS_DEPENDENCIES ${STARROCKS_DEPENDENCIES}
${AWSSDK_LINK_LIBRARIES}
${AWSSDK_PLATFORM_DEPS}
)
endif()
# Set thirdparty libraries
set(STARROCKS_DEPENDENCIES
${STARROCKS_DEPENDENCIES}
@ -587,6 +613,7 @@ add_subdirectory(${SRC_DIR}/geo)
add_subdirectory(${SRC_DIR}/gutil)
add_subdirectory(${SRC_DIR}/http)
add_subdirectory(${SRC_DIR}/io)
add_subdirectory(${SRC_DIR}/object_store)
add_subdirectory(${SRC_DIR}/storage)
add_subdirectory(${SRC_DIR}/runtime)
add_subdirectory(${SRC_DIR}/serde)


@ -20,6 +20,7 @@
// under the License.
#pragma once
// Compiler hint that this branch is likely or unlikely to
// be taken. Take from the "What all programmers should know
// about memory" paper.
@ -48,6 +49,7 @@
#define ALIGN_CACHE_LINE __attribute__((aligned(CACHE_LINE_SIZE)))
#ifndef DIAGNOSTIC_PUSH
#ifdef __clang__
#define DIAGNOSTIC_PUSH _Pragma("clang diagnostic push")
#define DIAGNOSTIC_POP _Pragma("clang diagnostic pop")
@ -60,9 +62,10 @@
#else
#error("Unknown compiler")
#endif
#endif // ifndef DIAGNOSTIC_PUSH
#ifndef DIAGNOSTIC_IGNORE
#define PRAGMA(TXT) _Pragma(#TXT)
#ifdef __clang__
#define DIAGNOSTIC_IGNORE(XXX) PRAGMA(clang diagnostic ignored XXX)
#elif defined(__GNUC__)
@ -72,3 +75,4 @@
#else
#define DIAGNOSTIC_IGNORE(XXX)
#endif
#endif // ifndef DIAGNOSTIC_IGNORE
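
A minimal usage sketch of the macros defined above (not taken from this diff): wrap an include that triggers a known warning; the specific header and warning flag here are only illustrative.

DIAGNOSTIC_PUSH
DIAGNOSTIC_IGNORE("-Wdeprecated-declarations")
#include <aws/s3/S3Client.h> // example: any header whose warnings should be suppressed
DIAGNOSTIC_POP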


@ -681,6 +681,12 @@ CONF_Bool(use_hdfs_pread, "true");
// default: true
CONF_Bool(rewrite_partial_segment, "true");
// properties to access aws s3
CONF_String(aws_access_key_id, "");
CONF_String(aws_secret_access_key, "");
CONF_String(aws_s3_endpoint, "");
CONF_Int64(aws_s3_max_connection, "102400");
} // namespace config
} // namespace starrocks
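
A rough sketch of how these new config entries are consumed when building an S3 client (not part of this diff; it mirrors the hdfs_fs_cache.cpp hunk further down, and the helper name is hypothetical):

// #include <aws/core/client/ClientConfiguration.h>
// #include "common/config.h"
// #include "object_store/s3_client.h"
static starrocks::S3Client* make_s3_client_from_config() {
    Aws::Client::ClientConfiguration aws_config;
    aws_config.scheme = Aws::Http::Scheme::HTTP;
    aws_config.endpointOverride = starrocks::config::aws_s3_endpoint;
    aws_config.maxConnections = starrocks::config::aws_s3_max_connection;
    starrocks::S3Credential cred;
    cred.access_key_id = starrocks::config::aws_access_key_id;
    cred.secret_access_key = starrocks::config::aws_secret_access_key;
    return new starrocks::S3Client(aws_config, &cred, /*use_transfer_manager=*/false);
}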


@ -10,8 +10,10 @@
namespace starrocks {
HdfsRandomAccessFile::HdfsRandomAccessFile(hdfsFS fs, std::string filename, bool usePread)
: _opened(false), _fs(fs), _file(nullptr), _filename(std::move(filename)), _usePread(usePread) {}
// ================================== HdfsRandomAccessFile ==========================================
HdfsRandomAccessFile::HdfsRandomAccessFile(hdfsFS fs, const std::string& file_name, size_t file_size, bool usePread)
: _opened(false), _fs(fs), _file(nullptr), _file_name(file_name), _file_size(file_size), _usePread(usePread) {}
HdfsRandomAccessFile::~HdfsRandomAccessFile() noexcept {
close();
@ -20,16 +22,16 @@ HdfsRandomAccessFile::~HdfsRandomAccessFile() noexcept {
Status HdfsRandomAccessFile::open() {
DCHECK(!_opened);
if (_fs) {
_file = hdfsOpenFile(_fs, _filename.c_str(), O_RDONLY, 0, 0, 0);
_file = hdfsOpenFile(_fs, _file_name.c_str(), O_RDONLY, 0, 0, 0);
if (_file == nullptr) {
return Status::InternalError(fmt::format("open file failed, file={}", _filename));
return Status::InternalError(fmt::format("open file failed, file={}", _file_name));
}
}
_opened = true;
return Status::OK();
}
void HdfsRandomAccessFile::close() noexcept {
void HdfsRandomAccessFile::close() {
if (_opened) {
if (_fs && _file) {
hdfsCloseFile(_fs, _file);
@ -38,56 +40,58 @@ void HdfsRandomAccessFile::close() noexcept {
}
}
static Status read_at_internal(hdfsFS fs, hdfsFile file, const std::string& file_name, int64_t offset, Slice* res,
bool usePread) {
if (usePread) {
if (hdfsPreadFully(fs, file, offset, res->data, res->size) == -1) {
return Status::IOError(strings::Substitute("fail to hdfsPreadFully file, file=$0, error=$1", file_name,
Status HdfsRandomAccessFile::_read_at(int64_t offset, char* data, size_t size, size_t* read_size) const {
if (_usePread) {
*read_size = size;
if (hdfsPreadFully(_fs, _file, offset, data, size) == -1) {
return Status::IOError(strings::Substitute("fail to hdfsPreadFully file, file=$0, error=$1", _file_name,
get_hdfs_err_msg()));
}
} else {
auto cur_offset = hdfsTell(fs, file);
auto cur_offset = hdfsTell(_fs, _file);
if (cur_offset == -1) {
return Status::IOError(
strings::Substitute("fail to get offset, file=$0, error=$1", file_name, get_hdfs_err_msg()));
strings::Substitute("fail to get offset, file=$0, error=$1", _file_name, get_hdfs_err_msg()));
}
if (cur_offset != offset) {
if (hdfsSeek(fs, file, offset)) {
if (hdfsSeek(_fs, _file, offset)) {
return Status::IOError(strings::Substitute("fail to seek offset, file=$0, offset=$1, error=$2",
file_name, offset, get_hdfs_err_msg()));
_file_name, offset, get_hdfs_err_msg()));
}
}
size_t bytes_read = 0;
while (bytes_read < res->size) {
size_t to_read = res->size - bytes_read;
auto hdfs_res = hdfsRead(fs, file, res->data + bytes_read, to_read);
while (bytes_read < size) {
size_t to_read = size - bytes_read;
auto hdfs_res = hdfsRead(_fs, _file, data + bytes_read, to_read);
if (hdfs_res < 0) {
return Status::IOError(
strings::Substitute("fail to hdfsRead file, file=$0, error=$1", file_name, get_hdfs_err_msg()));
return Status::IOError(strings::Substitute("fail to hdfsRead file, file=$0, error=$1", _file_name,
get_hdfs_err_msg()));
} else if (hdfs_res == 0) {
break;
}
bytes_read += hdfs_res;
}
res->size = bytes_read;
*read_size = bytes_read;
}
return Status::OK();
}
Status HdfsRandomAccessFile::read(uint64_t offset, Slice* res) const {
DCHECK(_opened);
RETURN_IF_ERROR(read_at_internal(_fs, _file, _filename, offset, res, _usePread));
size_t read_size = 0;
Status st = _read_at(offset, res->data, res->size, &read_size);
if (!st.ok()) return st;
res->size = read_size;
return Status::OK();
}
Status HdfsRandomAccessFile::read_at(uint64_t offset, const Slice& res) const {
DCHECK(_opened);
Slice slice = res;
RETURN_IF_ERROR(read_at_internal(_fs, _file, _filename, offset, &slice, _usePread));
if (slice.size != res.size) {
size_t read_size = 0;
RETURN_IF_ERROR(_read_at(offset, res.data, res.size, &read_size));
if (read_size != res.size) {
return Status::InternalError(
strings::Substitute("fail to read enough data, file=$0, offset=$1, size=$2, expect=$3", _filename,
offset, slice.size, res.size));
strings::Substitute("fail to read enough data, file=$0, offset=$1, size=$2, expect=$3", _file_name,
offset, read_size, res.size));
}
return Status::OK();
}
@ -97,8 +101,53 @@ Status HdfsRandomAccessFile::readv_at(uint64_t offset, const Slice* res, size_t
return Status::InternalError("HdfsRandomAccessFile::readv_at not implement");
}
Status HdfsRandomAccessFile::size(uint64_t* size) const {
// TODO: implement
return Status::InternalError("HdfsRandomAccessFile::size not implement");
// =================================== S3RandomAccessFile =========================================
S3RandomAccessFile::S3RandomAccessFile(S3Client* client, const std::string& bucket, const std::string& object,
size_t object_size)
: _client(client), _bucket(bucket), _object(object), _object_size(object_size) {
_file_name = "s3://" + _bucket + "/" + _object;
}
Status S3RandomAccessFile::read(uint64_t offset, Slice* res) const {
size_t read_size = 0;
Status st = _client->get_object_range(_bucket, _object, offset, res->size, res->data, &read_size);
if (!st.ok()) return st;
res->size = read_size;
return st;
}
Status S3RandomAccessFile::read_at(uint64_t offset, const Slice& res) const {
size_t read_size = 0;
RETURN_IF_ERROR(_client->get_object_range(_bucket, _object, offset, res.size, res.data, &read_size));
if (read_size != res.size) {
return Status::InternalError(
strings::Substitute("fail to read enough data, file=$0, offset=$1, size=$2, expect=$3", _file_name,
offset, read_size, res.size));
}
return Status::OK();
}
Status S3RandomAccessFile::readv_at(uint64_t offset, const Slice* res, size_t res_cnt) const {
// TODO: implement
return Status::InternalError("S3RandomAccessFile::readv_at not implement");
}
std::shared_ptr<RandomAccessFile> create_random_access_hdfs_file(const HdfsFsHandle& handle,
const std::string& file_path, size_t file_size,
bool usePread) {
if (handle.type == HdfsFsHandle::Type::HDFS) {
return std::make_shared<HdfsRandomAccessFile>(handle.hdfs_fs, file_path, file_size, usePread);
} else if (handle.type == HdfsFsHandle::Type::S3) {
const std::string& nn = handle.namenode;
const std::string bucket = get_bucket_from_namenode(nn);
const std::string object = file_path.substr(nn.size(), file_path.size() - nn.size());
return std::make_shared<S3RandomAccessFile>(handle.s3_client, bucket, object, file_size);
} else {
CHECK(false) << strings::Substitute("Unknown HdfsFsHandle::Type $0", static_cast<int>(handle.type));
__builtin_unreachable();
return nullptr;
}
}
} // namespace starrocks
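
A minimal usage sketch of the factory added above (not from this diff): read a byte range through the RandomAccessFile interface, assuming `handle` was already filled in by HdfsFsCache::get_connection; the path, size, and bucket name are hypothetical.

starrocks::HdfsFsHandle handle; // assumed: type == Type::S3, namenode == "s3a://my-bucket/", s3_client set
std::shared_ptr<starrocks::RandomAccessFile> file = starrocks::create_random_access_hdfs_file(
        handle, "s3a://my-bucket/warehouse/part-0.parquet", /*file_size=*/1024, /*usePread=*/true);

char buf[128];
starrocks::Slice slice(buf, sizeof(buf));
starrocks::Status st = file->read(/*offset=*/0, &slice); // on success, slice.size holds the bytes actually read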

be/src/env/env_hdfs.h

@ -5,33 +5,73 @@
#include <hdfs/hdfs.h>
#include "env/env.h"
#include "object_store/s3_client.h"
namespace starrocks {
struct HdfsFsHandle {
enum class Type { LOCAL, HDFS, S3 };
Type type;
std::string namenode;
hdfsFS hdfs_fs;
S3Client* s3_client;
};
// class for remote read hdfs file
// Now this is not thread-safe.
class HdfsRandomAccessFile : public RandomAccessFile {
public:
HdfsRandomAccessFile(hdfsFS fs, std::string filename, bool usePread);
HdfsRandomAccessFile(hdfsFS fs, const std::string& file_name, size_t file_size, bool usePread);
virtual ~HdfsRandomAccessFile() noexcept;
void close();
Status open();
void close() noexcept;
Status read(uint64_t offset, Slice* res) const override;
Status read_at(uint64_t offset, const Slice& res) const override;
Status readv_at(uint64_t offset, const Slice* res, size_t res_cnt) const override;
Status size(uint64_t* size) const override;
const std::string& file_name() const override { return _filename; }
Status size(uint64_t* size) const override {
*size = _file_size;
return Status::OK();
}
const std::string& file_name() const override { return _file_name; }
hdfsFile hdfs_file() const { return _file; }
private:
Status _read_at(int64_t offset, char* data, size_t size, size_t* read_size) const;
bool _opened;
hdfsFS _fs;
hdfsFile _file;
std::string _filename;
std::string _file_name;
size_t _file_size;
bool _usePread;
};
class S3RandomAccessFile : public RandomAccessFile {
public:
S3RandomAccessFile(S3Client* client, const std::string& bucket, const std::string& object, size_t object_size = 0);
virtual ~S3RandomAccessFile() noexcept = default;
Status read(uint64_t offset, Slice* res) const override;
Status read_at(uint64_t offset, const Slice& res) const override;
Status readv_at(uint64_t offset, const Slice* res, size_t res_cnt) const override;
Status size(uint64_t* size) const override {
*size = _object_size;
return Status::OK();
}
const std::string& file_name() const override { return _file_name; }
private:
void _init(const HdfsFsHandle& handle);
S3Client* _client;
std::string _file_name;
std::string _bucket;
std::string _object;
size_t _object_size;
};
std::shared_ptr<RandomAccessFile> create_random_access_hdfs_file(const HdfsFsHandle& handle,
const std::string& file_path, size_t file_size,
bool usePread);
} // namespace starrocks


@ -254,7 +254,7 @@ public:
PosixRandomAccessFile(std::string filename, int fd) : _filename(std::move(filename)), _fd(fd) {}
~PosixRandomAccessFile() override {
int res;
RETRY_ON_EINTR(res, close(_fd));
RETRY_ON_EINTR(res, ::close(_fd));
if (res != 0) {
LOG(WARNING) << "close file failed, name=" << _filename << ", msg=" << errno_to_string(errno);
}


@ -13,6 +13,7 @@
#include "exprs/vectorized/runtime_filter.h"
#include "fmt/core.h"
#include "glog/logging.h"
#include "gutil/strings/substitute.h"
#include "runtime/current_thread.h"
#include "runtime/exec_env.h"
#include "runtime/hdfs/hdfs_fs_cache.h"
@ -205,6 +206,7 @@ Status HdfsScanNode::_create_and_init_scanner(RuntimeState* state, const HdfsFil
scanner_params.runtime_filter_collector = &_runtime_filter_collector;
scanner_params.scan_ranges = hdfs_file_desc.splits;
scanner_params.fs = hdfs_file_desc.fs;
scanner_params.fs_handle_type = hdfs_file_desc.fs_handle_type;
scanner_params.tuple_desc = _tuple_desc;
scanner_params.materialize_slots = _materialize_slots;
scanner_params.materialize_index_in_chunk = _materialize_index_in_chunk;
@ -619,7 +621,7 @@ Status HdfsScanNode::_find_and_insert_hdfs_file(const THdfsScanRange& scan_range
// if found, add file splits to hdfs file desc
// if not found, create
for (auto& item : _hdfs_files) {
if (item->partition_id == scan_range.partition_id && item->path == scan_range_path) {
if (item->partition_id == scan_range.partition_id && item->scan_range_path == scan_range_path) {
item->splits.emplace_back(&scan_range);
return Status::OK();
}
@ -637,8 +639,7 @@ Status HdfsScanNode::_find_and_insert_hdfs_file(const THdfsScanRange& scan_range
native_file_path = file_path.native();
}
std::string namenode;
RETURN_IF_ERROR(get_name_node_from_path(native_file_path, &namenode));
_is_hdfs_fs = is_hdfs_path(namenode.c_str());
RETURN_IF_ERROR(get_namenode_from_path(native_file_path, &namenode));
bool usePread = starrocks::config::use_hdfs_pread || is_object_storage_path(namenode.c_str());
if (namenode.compare("default") == 0) {
@ -646,26 +647,26 @@ Status HdfsScanNode::_find_and_insert_hdfs_file(const THdfsScanRange& scan_range
auto* env = Env::Default();
std::unique_ptr<RandomAccessFile> file;
env->new_random_access_file(native_file_path, &file);
auto* hdfs_file_desc = _pool->add(new HdfsFileDesc());
hdfs_file_desc->hdfs_fs = nullptr;
hdfs_file_desc->fs = std::move(file);
hdfs_file_desc->fs_handle_type = HdfsFsHandle::Type::LOCAL;
hdfs_file_desc->partition_id = scan_range.partition_id;
hdfs_file_desc->path = scan_range_path;
hdfs_file_desc->scan_range_path = scan_range_path;
hdfs_file_desc->file_length = scan_range.file_length;
hdfs_file_desc->splits.emplace_back(&scan_range);
hdfs_file_desc->hdfs_file_format = scan_range.file_format;
hdfs_file_desc->open_limit = nullptr;
_hdfs_files.emplace_back(hdfs_file_desc);
} else {
hdfsFS hdfs;
HdfsFsHandle handle;
std::atomic<int32_t>* open_limit = nullptr;
RETURN_IF_ERROR(HdfsFsCache::instance()->get_connection(namenode, &hdfs, &open_limit));
RETURN_IF_ERROR(HdfsFsCache::instance()->get_connection(namenode, &handle, &open_limit));
auto* hdfs_file_desc = _pool->add(new HdfsFileDesc());
hdfs_file_desc->hdfs_fs = hdfs;
hdfs_file_desc->fs = std::make_shared<HdfsRandomAccessFile>(hdfs, native_file_path, usePread);
size_t file_size = scan_range.file_length;
hdfs_file_desc->fs_handle_type = handle.type;
hdfs_file_desc->fs = create_random_access_hdfs_file(handle, native_file_path, file_size, usePread);
hdfs_file_desc->partition_id = scan_range.partition_id;
hdfs_file_desc->path = scan_range_path;
hdfs_file_desc->scan_range_path = scan_range_path;
hdfs_file_desc->file_length = scan_range.file_length;
hdfs_file_desc->splits.emplace_back(&scan_range);
hdfs_file_desc->hdfs_file_format = scan_range.file_format;


@ -16,12 +16,12 @@
namespace starrocks::vectorized {
struct HdfsFileDesc {
hdfsFS hdfs_fs;
THdfsFileFormat::type hdfs_file_format;
HdfsFsHandle::Type fs_handle_type;
std::shared_ptr<RandomAccessFile> fs;
int partition_id = 0;
std::string path;
std::string scan_range_path;
int64_t file_length = 0;
std::vector<const THdfsScanRange*> splits;
@ -153,8 +153,6 @@ private:
mutable SpinLock _status_mutex;
Status _status;
RuntimeState* _runtime_state = nullptr;
bool _is_hdfs_fs = true;
std::atomic_bool _pending_token = true;
std::atomic<int32_t> _scanner_submit_count = 0;


@ -116,7 +116,10 @@ Status HdfsScanner::open(RuntimeState* runtime_state) {
return Status::OK();
}
#ifndef BE_TEST
RETURN_IF_ERROR(down_cast<HdfsRandomAccessFile*>(_scanner_params.fs.get())->open());
if (_scanner_params.fs_handle_type == HdfsFsHandle::Type::HDFS) {
auto* file = down_cast<HdfsRandomAccessFile*>(_scanner_params.fs.get());
RETURN_IF_ERROR(file->open());
}
#endif
_build_file_read_param();
auto status = do_open(runtime_state);
@ -142,7 +145,10 @@ void HdfsScanner::close(RuntimeState* runtime_state) noexcept {
}
do_close(runtime_state);
#ifndef BE_TEST
down_cast<HdfsRandomAccessFile*>(_scanner_params.fs.get())->close();
if (_scanner_params.fs_handle_type == HdfsFsHandle::Type::HDFS) {
auto* file = down_cast<HdfsRandomAccessFile*>(_scanner_params.fs.get());
file->close();
}
if (_is_open) {
(*_scanner_params.open_limit)--;
}
@ -190,11 +196,12 @@ void HdfsScanner::update_counter() {
if (_scanner_params.fs == nullptr) return;
HdfsReadStats hdfs_stats;
auto hdfs_file = down_cast<HdfsRandomAccessFile*>(_scanner_params.fs.get())->hdfs_file();
if (hdfs_file == nullptr) return;
// Hdfslib only supports obtaining statistics of hdfs file system.
// For other systems such as s3, calling this function will cause be crash.
if (_scanner_params.parent->_is_hdfs_fs) {
if (_scanner_params.fs_handle_type == HdfsFsHandle::Type::HDFS) {
auto hdfs_file = down_cast<HdfsRandomAccessFile*>(_scanner_params.fs.get())->hdfs_file();
if (hdfs_file == nullptr) return;
// Hdfslib only supports obtaining statistics of hdfs file system.
// For other systems such as s3, calling this function will cause be crash.
get_hdfs_statistics(hdfs_file, &hdfs_stats);
}


@ -7,9 +7,11 @@
#include "column/chunk.h"
#include "env/env.h"
#include "env/env_hdfs.h"
#include "exprs/expr_context.h"
#include "runtime/descriptors.h"
#include "util/runtime_profile.h"
namespace starrocks::parquet {
class FileReader;
}
@ -70,6 +72,7 @@ struct HdfsScannerParams {
// file fd (local file or hdfs file)
std::shared_ptr<RandomAccessFile> fs = nullptr;
HdfsFsHandle::Type fs_handle_type;
const TupleDescriptor* tuple_desc;


@ -0,0 +1,19 @@
# This file is licensed under the Elastic License 2.0. Copyright 2021-present, StarRocks Limited.
# where to put generated libraries
set(LIBRARY_OUTPUT_PATH "${BUILD_DIR}/src/object_store")
# where to put generated binaries
set(EXECUTABLE_OUTPUT_PATH "${BUILD_DIR}/src/object_store")
set(EXEC_FILES placeholder.cpp)
if (WITH_AWS)
set(EXEC_FILES ${EXEC_FILES}
s3_client.cpp)
endif()
add_library(ObjectStore STATIC
${EXEC_FILES}
)


@ -0,0 +1,53 @@
// This file is licensed under the Elastic License 2.0. Copyright 2021-present, StarRocks Limited.
#pragma once
#include <string>
#include "common/status.h"
namespace starrocks {
// base class for object store client
class ObjectStoreClient {
public:
ObjectStoreClient() = default;
virtual ~ObjectStoreClient() = default;
/*
* Bucket Operation
*/
virtual Status create_bucket(const std::string& bucket_name) = 0;
virtual Status delete_bucket(const std::string& bucket_name) = 0;
/*
* Object Operation
*/
virtual Status put_object(const std::string& bucket_name, const std::string& object_key,
const std::string& object_path) = 0;
virtual Status put_string_object(const std::string& bucket_name, const std::string& object_key,
const std::string& object_value) = 0;
virtual Status get_object(const std::string& bucket_name, const std::string& object_key,
const std::string& object_path) = 0;
virtual Status get_object_range(const std::string& bucket_name, const std::string& object_key, size_t offset,
size_t length, std::string* object_value, size_t* read_bytes) = 0;
// `object_value` should already be allocated at least `length` bytes
virtual Status get_object_range(const std::string& bucket_name, const std::string& object_key, size_t offset,
size_t length, char* object_value, size_t* read_bytes) = 0;
virtual Status exist_object(const std::string& bucket_name, const std::string& object_key) = 0;
virtual Status get_object_size(const std::string& bucket_name, const std::string& object_key, size_t* size) = 0;
virtual Status delete_object(const std::string& bucket_name, const std::string& object_key) = 0;
virtual Status list_objects(const std::string& bucket_name, const std::string& object_prefix,
std::vector<std::string>* result) = 0;
};
} // namespace starrocks
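
A short sketch of coding against the abstract interface rather than a concrete client (not part of this diff; the helper name is hypothetical and it assumes the object fits in memory):

// #include "common/status.h"
// #include "object_store/object_store_client.h"
starrocks::Status fetch_whole_object(starrocks::ObjectStoreClient* client, const std::string& bucket,
                                     const std::string& key, std::string* out) {
    size_t size = 0;
    RETURN_IF_ERROR(client->get_object_size(bucket, key, &size));
    size_t read_bytes = 0;
    return client->get_object_range(bucket, key, /*offset=*/0, /*length=*/size, out, &read_bytes);
}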


@ -0,0 +1,356 @@
// This file is licensed under the Elastic License 2.0. Copyright 2021-present, StarRocks Limited.
#include "object_store/s3_client.h"
#include <aws/core/Aws.h>
#include <aws/core/auth/AWSCredentialsProvider.h>
#include <aws/core/utils/threading/Executor.h>
#include <aws/s3/model/BucketLocationConstraint.h>
#include <aws/s3/model/CreateBucketRequest.h>
#include <aws/s3/model/DeleteBucketRequest.h>
#include <aws/s3/model/DeleteObjectRequest.h>
#include <aws/s3/model/GetObjectRequest.h>
#include <aws/s3/model/HeadObjectRequest.h>
#include <aws/s3/model/ListObjectsRequest.h>
#include <aws/s3/model/PutObjectRequest.h>
#include <aws/transfer/TransferHandle.h>
#include <fstream>
#include "common/logging.h"
#include "gutil/strings/strip.h"
#include "gutil/strings/substitute.h"
namespace starrocks {
static inline Aws::String to_aws_string(const std::string& s) {
return Aws::String(s.data(), s.size());
}
S3Client::S3Client(const Aws::Client::ClientConfiguration& config, const S3Credential* cred, bool use_transfer_manager)
: _config(config) {
const char* access_key_id = (cred == nullptr || cred->access_key_id.empty()) ? getenv("AWS_ACCESS_KEY_ID")
: cred->access_key_id.c_str();
const char* secret_access_key = (cred == nullptr || cred->secret_access_key.empty())
? getenv("AWS_SECRET_ACCESS_KEY")
: cred->secret_access_key.c_str();
std::shared_ptr<Aws::Auth::AWSCredentialsProvider> credentials =
std::make_shared<Aws::Auth::SimpleAWSCredentialsProvider>(access_key_id, secret_access_key);
_client = std::make_shared<Aws::S3::S3Client>(credentials, _config);
if (use_transfer_manager) {
Aws::Transfer::TransferManagerConfiguration transfer_config(_get_transfer_manager_executor());
transfer_config.s3Client = _client;
transfer_config.transferBufferMaxHeapSize = kTransferManagerMaxBufferSize;
transfer_config.bufferSize = kTransferManagerSingleBufferSize;
_transfer_manager = Aws::Transfer::TransferManager::Create(transfer_config);
} else {
_transfer_manager = nullptr;
}
}
Status S3Client::create_bucket(const std::string& bucket_name) {
Aws::S3::Model::BucketLocationConstraint constraint =
Aws::S3::Model::BucketLocationConstraintMapper::GetBucketLocationConstraintForName(_config.region);
Aws::S3::Model::CreateBucketConfiguration bucket_config;
bucket_config.SetLocationConstraint(constraint);
Aws::S3::Model::CreateBucketRequest request;
request.SetCreateBucketConfiguration(bucket_config);
request.SetBucket(to_aws_string(bucket_name));
Aws::S3::Model::CreateBucketOutcome outcome = _client->CreateBucket(request);
if (outcome.IsSuccess()) {
return Status::OK();
} else {
std::string error =
strings::Substitute("Create Bucket $0 failed. $1.", bucket_name, outcome.GetError().GetMessage());
LOG(ERROR) << error;
return Status::IOError(error);
}
}
Status S3Client::delete_bucket(const std::string& bucket_name) {
Aws::S3::Model::DeleteBucketRequest request;
request.SetBucket(to_aws_string(bucket_name));
Aws::S3::Model::DeleteBucketOutcome outcome = _client->DeleteBucket(request);
if (outcome.IsSuccess()) {
return Status::OK();
} else {
std::string error =
strings::Substitute("Delete Bucket $0 failed. $1.", bucket_name, outcome.GetError().GetMessage());
LOG(ERROR) << error;
return Status::IOError(error);
}
}
Status S3Client::put_object(const std::string& bucket_name, const std::string& object_key,
const std::string& object_path) {
if (_transfer_manager) {
auto handle = _transfer_manager->UploadFile(to_aws_string(object_path), to_aws_string(bucket_name),
to_aws_string(object_key), Aws::DEFAULT_CONTENT_TYPE,
Aws::Map<Aws::String, Aws::String>());
handle->WaitUntilFinished();
if (handle->GetStatus() != Aws::Transfer::TransferStatus::COMPLETED) {
// TODO: log error
return Status::IOError(strings::Substitute("Put Object $0 failed.", object_key));
} else {
return Status::OK();
}
} else {
Aws::S3::Model::PutObjectRequest request;
request.SetBucket(to_aws_string(bucket_name));
request.SetKey(to_aws_string(object_key));
std::shared_ptr<Aws::IOStream> stream = Aws::MakeShared<Aws::FStream>(
Aws::Utils::ARRAY_ALLOCATION_TAG, object_path.c_str(), std::ios_base::in | std::ios_base::binary);
if (!stream->good()) {
std::string error =
strings::Substitute("Put Object $0 failed, fail to open local file $1.", object_key, object_path);
LOG(ERROR) << error;
return Status::IOError(error);
}
request.SetBody(stream);
Aws::S3::Model::PutObjectOutcome outcome = _client->PutObject(request);
if (outcome.IsSuccess()) {
return Status::OK();
} else {
std::string error =
strings::Substitute("Put Object $0 failed. $1.", object_key, outcome.GetError().GetMessage());
LOG(ERROR) << error;
return Status::IOError(error);
}
}
}
Status S3Client::put_string_object(const std::string& bucket_name, const std::string& object_key,
const std::string& object_value) {
std::shared_ptr<Aws::IOStream> stream = Aws::MakeShared<Aws::StringStream>("", object_value);
Aws::S3::Model::PutObjectRequest request;
request.SetBucket(to_aws_string(bucket_name));
request.SetKey(to_aws_string(object_key));
request.SetBody(stream);
Aws::S3::Model::PutObjectOutcome outcome = _client->PutObject(request);
if (outcome.IsSuccess()) {
return Status::OK();
} else {
std::string error =
strings::Substitute("Put Object $0 failed. $1.", object_key, outcome.GetError().GetMessage());
LOG(ERROR) << error;
return Status::IOError(error);
}
}
Status S3Client::get_object_range(const std::string& bucket_name, const std::string& object_key, size_t offset,
size_t length, std::string* object_value, size_t* read_bytes) {
object_value->resize(length);
return get_object_range(bucket_name, object_key, offset, length, (char*)object_value->data(), read_bytes);
}
Status S3Client::get_object_range(const std::string& bucket_name, const std::string& object_key, size_t offset,
size_t length, char* object_value, size_t* read_bytes) {
*read_bytes = 0;
length = (length ? length : 1);
char buffer[128];
int ret = snprintf(buffer, sizeof(buffer), "bytes=%lu-%lu", offset, offset + length - 1);
if (ret < 0) {
std::string error = strings::Substitute("Get Object Range $0 failed, fail to set range.", object_key);
LOG(ERROR) << error;
return Status::IOError(error);
}
Aws::String range(buffer);
Aws::S3::Model::GetObjectRequest request;
request.SetBucket(to_aws_string(bucket_name));
request.SetKey(to_aws_string(object_key));
request.SetRange(range);
Aws::S3::Model::GetObjectOutcome outcome = _client->GetObject(request);
if (outcome.IsSuccess()) {
if (object_value) {
Aws::IOStream& body = outcome.GetResult().GetBody();
body.read(object_value, length);
*read_bytes = body.gcount();
if (body.gcount() != length) {
std::string error = strings::Substitute("Get Object Range $0 failed. expected($1), read($2).",
object_key, length, *read_bytes);
LOG(ERROR) << error;
return Status::IOError(error);
}
}
return Status::OK();
} else {
std::string error =
strings::Substitute("Get Object Range $0 failed. $1.", object_key, outcome.GetError().GetMessage());
LOG(ERROR) << error;
return Status::IOError(error);
}
}
Status S3Client::get_object(const std::string& bucket_name, const std::string& object_key,
const std::string& object_path) {
if (object_path.empty()) {
return Status::IOError(strings::Substitute("Get Object $0 failed, path empty.", object_key));
}
if (_transfer_manager) {
Aws::Transfer::CreateDownloadStreamCallback stream;
if (!object_path.empty()) {
stream = [=]() -> Aws::IOStream* {
return Aws::New<Aws::FStream>(Aws::Utils::ARRAY_ALLOCATION_TAG, object_path,
std::ios_base::out | std::ios_base::trunc);
};
} else {
stream = [=]() -> Aws::IOStream* { return Aws::New<Aws::StringStream>(""); };
}
auto handle = _transfer_manager->DownloadFile(to_aws_string(bucket_name), to_aws_string(object_key),
std::move(stream));
handle->WaitUntilFinished();
if (handle->GetStatus() != Aws::Transfer::TransferStatus::COMPLETED) {
// TODO: log error
return Status::IOError(strings::Substitute("Get Object $0 failed.", object_key));
} else {
return Status::OK();
}
} else {
Aws::S3::Model::GetObjectRequest request;
request.SetBucket(to_aws_string(bucket_name));
request.SetKey(to_aws_string(object_key));
if (!object_path.empty()) {
auto stream = [=]() -> Aws::IOStream* {
return Aws::New<Aws::FStream>(Aws::Utils::ARRAY_ALLOCATION_TAG, object_path,
std::ios_base::out | std::ios_base::trunc);
};
request.SetResponseStreamFactory(std::move(stream));
}
Aws::S3::Model::GetObjectOutcome outcome = _client->GetObject(request);
if (outcome.IsSuccess()) {
return Status::OK();
} else {
std::string error =
strings::Substitute("Get Object $0 failed. $1.", object_key, outcome.GetError().GetMessage());
LOG(ERROR) << error;
return Status::IOError(error);
}
}
}
Status S3Client::_head_object(const std::string& bucket_name, const std::string& object_key, size_t* size) {
Aws::S3::Model::HeadObjectRequest request;
request.SetBucket(to_aws_string(bucket_name));
request.SetKey(to_aws_string(object_key));
Aws::S3::Model::HeadObjectOutcome outcome = _client->HeadObject(request);
if (!outcome.IsSuccess()) {
if (_is_not_found(outcome.GetError().GetErrorType())) {
return Status::NotFound(strings::Substitute("Object $0 not found.", object_key));
} else {
std::string error =
strings::Substitute("Head Object $0 failed. $1.", object_key, outcome.GetError().GetMessage());
LOG(ERROR) << error;
return Status::IOError(error);
}
} else {
if (size != nullptr) {
*size = outcome.GetResult().GetContentLength();
}
return Status::OK();
}
}
Status S3Client::exist_object(const std::string& bucket_name, const std::string& object_key) {
return _head_object(bucket_name, object_key, nullptr /* size */);
}
Status S3Client::get_object_size(const std::string& bucket_name, const std::string& object_key, size_t* size) {
return _head_object(bucket_name, object_key, size);
}
Status S3Client::delete_object(const std::string& bucket_name, const std::string& object_key) {
Aws::S3::Model::DeleteObjectRequest request;
request.SetBucket(to_aws_string(bucket_name));
request.SetKey(to_aws_string(object_key));
Aws::S3::Model::DeleteObjectOutcome outcome = _client->DeleteObject(request);
if (outcome.IsSuccess()) {
return Status::OK();
} else {
std::string error =
strings::Substitute("Delete Object $0 failed. $1.", object_key, outcome.GetError().GetMessage());
LOG(ERROR) << error;
return Status::IOError(error);
}
}
Status S3Client::list_objects(const std::string& bucket_name, const std::string& object_prefix,
std::vector<std::string>* result) {
result->clear();
// S3 paths don't start with '/'
std::string prefix = StripPrefixString(object_prefix, "/");
// S3 paths better end with '/', otherwise we might also get a list of files
// in a directory for which our path is a prefix
if (prefix.size() > 0 && prefix.back() != '/') {
prefix.push_back('/');
}
// the starting object marker
Aws::String marker;
// get info of bucket+object
while (1) {
Aws::S3::Model::ListObjectsRequest request;
request.SetBucket(to_aws_string(bucket_name));
request.SetMaxKeys(kListObjectMaxKeys);
request.SetPrefix(to_aws_string(prefix));
request.SetMarker(marker);
Aws::S3::Model::ListObjectsOutcome outcome = _client->ListObjects(request);
if (!outcome.IsSuccess()) {
if (_is_not_found(outcome.GetError().GetErrorType())) {
return Status::OK();
}
std::string error = strings::Substitute("List Objects prefix $0 failed. $1.", object_prefix,
outcome.GetError().GetMessage());
LOG(ERROR) << error;
return Status::IOError(error);
}
const Aws::S3::Model::ListObjectsResult& res = outcome.GetResult();
const Aws::Vector<Aws::S3::Model::Object>& objs = res.GetContents();
for (auto o : objs) {
const Aws::String& key = o.GetKey();
// Our path should be a prefix of the fetched value
std::string keystr(key.c_str(), key.size());
if (keystr.find(prefix) != 0) {
return Status::IOError(strings::Substitute("List Objects prefix $0 not match.", object_prefix));
}
const std::string fname = keystr.substr(prefix.size());
result->push_back(fname);
}
// If there are no more entries, then we are done.
if (!res.GetIsTruncated()) {
break;
}
// The new starting point
marker = res.GetNextMarker();
if (marker.empty()) {
// If the response does not include the NextMarker and it is
// truncated, you can use the value of the last Key in the response
// as the marker in the subsequent request because all objects
// are returned in alphabetical order
marker = objs.back().GetKey();
}
}
return Status::OK();
}
} // namespace starrocks
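
A worked note on the range request above (not part of this diff): the HTTP Range header is inclusive on both ends, so requesting offset 1 with length 2 produces "bytes=1-2", which is what lets the unit test below read "or" out of the stored value "world".

// #include <cstdio>  -- illustration only; mirrors the snprintf in get_object_range
char range[32];
snprintf(range, sizeof(range), "bytes=%lu-%lu", 1UL, 1UL + 2 - 1); // -> "bytes=1-2"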


@ -0,0 +1,87 @@
// This file is licensed under the Elastic License 2.0. Copyright 2021-present, StarRocks Limited.
#pragma once
#include <aws/s3/S3Client.h>
#include <aws/transfer/TransferManager.h>
#include <memory>
#include "object_store/object_store_client.h"
namespace starrocks {
class S3Credential {
public:
std::string access_key_id;
std::string secret_access_key;
};
class S3Client final : public ObjectStoreClient {
public:
S3Client(const Aws::Client::ClientConfiguration& config, const S3Credential* cred = nullptr,
bool use_transfer_manager = false);
~S3Client() = default;
/*
* Bucket Operation
*/
Status create_bucket(const std::string& bucket_name) override;
Status delete_bucket(const std::string& bucket_name) override;
/*
* Object Operation
*/
Status put_object(const std::string& bucket_name, const std::string& object_key,
const std::string& object_path) override;
Status put_string_object(const std::string& bucket_name, const std::string& object_key,
const std::string& object_value) override;
Status get_object(const std::string& bucket_name, const std::string& object_key,
const std::string& object_path) override;
Status get_object_range(const std::string& bucket_name, const std::string& object_key, size_t offset, size_t length,
std::string* object_value, size_t* read_bytes) override;
Status get_object_range(const std::string& bucket_name, const std::string& object_key, size_t offset, size_t length,
char* object_value, size_t* read_bytes) override;
Status exist_object(const std::string& bucket_name, const std::string& object_key) override;
Status get_object_size(const std::string& bucket_name, const std::string& object_key, size_t* size) override;
Status delete_object(const std::string& bucket_name, const std::string& object_key) override;
Status list_objects(const std::string& bucket_name, const std::string& object_prefix,
std::vector<std::string>* result) override;
private:
// size of the transfer manager's thread pool.
static const int kThreadPoolNumber = 16;
// maximum size of the transfer manager's working buffer.
static const int kTransferManagerMaxBufferSize = 512 * 1024 * 1024; // 512MB
// maximum size that the transfer manager will process in a single request.
static const int kTransferManagerSingleBufferSize = 32 * 1024 * 1024; // 32MB
// maximum number of keys returned by each list_objects call.
static const int kListObjectMaxKeys = 1000;
static Aws::Utils::Threading::Executor* _get_transfer_manager_executor() {
static Aws::Utils::Threading::PooledThreadExecutor executor(kThreadPoolNumber);
return &executor;
}
Status _head_object(const std::string& bucket_name, const std::string& object_key, size_t* size);
bool _is_not_found(const Aws::S3::S3Errors& err) {
return (err == Aws::S3::S3Errors::NO_SUCH_BUCKET || err == Aws::S3::S3Errors::NO_SUCH_KEY ||
err == Aws::S3::S3Errors::RESOURCE_NOT_FOUND);
}
Aws::Client::ClientConfiguration _config;
std::shared_ptr<Aws::S3::S3Client> _client;
std::shared_ptr<Aws::Transfer::TransferManager> _transfer_manager;
};
} // namespace starrocks
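
A minimal end-to-end sketch of using the client declared above with the transfer manager enabled (not part of this diff; bucket, key, and local path are hypothetical, and with cred == nullptr the keys come from the AWS_ACCESS_KEY_ID / AWS_SECRET_ACCESS_KEY environment variables, as in the constructor):

// #include <aws/core/Aws.h> and "object_store/s3_client.h"
Aws::SDKOptions options;
Aws::InitAPI(options);
{
    Aws::Client::ClientConfiguration config;
    config.region = Aws::Region::AP_SOUTHEAST_1; // same region as the unit test in this commit
    starrocks::S3Client client(config, /*cred=*/nullptr, /*use_transfer_manager=*/true);
    starrocks::Status st = client.put_object("my-bucket", "warehouse/part-0.parquet", "/tmp/part-0.parquet");
}
Aws::ShutdownAPI(options);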


@ -5,77 +5,58 @@
#include <memory>
#include <utility>
#include "common/config.h"
#include "gutil/strings/substitute.h"
#include "util/hdfs_util.h"
namespace starrocks {
static Status parse_namenode(const std::string& path, std::string* namenode) {
const std::string local_fs("file:/");
size_t n = path.find("://");
static Status create_hdfs_fs_handle(const std::string& namenode, HdfsFsHandle* handle) {
const char* nn = namenode.c_str();
if (is_hdfs_path(nn)) {
handle->type = HdfsFsHandle::Type::HDFS;
auto hdfs_builder = hdfsNewBuilder();
hdfsBuilderSetNameNode(hdfs_builder, namenode.c_str());
handle->hdfs_fs = hdfsBuilderConnect(hdfs_builder);
if (handle->hdfs_fs == nullptr) {
return Status::InternalError(strings::Substitute("fail to connect hdfs namenode, namenode=$0, err=$1",
namenode, get_hdfs_err_msg()));
}
if (n == std::string::npos) {
if (path.compare(0, local_fs.length(), local_fs) == 0) {
// Hadoop Path routines strip out consecutive /'s, so recognize 'file:/blah'.
*namenode = "file:///";
} else {
// Path is not qualified, so use the default FS.
*namenode = "default";
}
} else if (n == 0) {
return Status::InvalidArgument(strings::Substitute("Path missing scheme: $0", path));
} else if (is_s3a_path(nn) || is_oss_path(nn)) {
handle->type = HdfsFsHandle::Type::S3;
Aws::Client::ClientConfiguration config;
config.scheme = Aws::Http::Scheme::HTTP;
config.endpointOverride = config::aws_s3_endpoint;
config.maxConnections = config::aws_s3_max_connection;
S3Credential cred;
cred.access_key_id = config::aws_access_key_id;
cred.secret_access_key = config::aws_secret_access_key;
S3Client* s3_client = new S3Client(config, &cred, false);
handle->s3_client = s3_client;
} else {
// Path is qualified, i.e. "scheme://authority/path/to/file". Extract
// "scheme://authority/".
n = path.find('/', n + 3);
if (n == std::string::npos) {
return Status::InvalidArgument(strings::Substitute("Path missing '/' after authority: $0", path));
} else {
// Include the trailing '/' for local filesystem case, i.e. "file:///".
*namenode = path.substr(0, n + 1);
}
return Status::InternalError(strings::Substitute("failed to make client, namenode=$0", namenode));
}
return Status::OK();
}
Status HdfsFsCache::get_connection(const std::string& path, hdfsFS* fs, FileOpenLimitPtr* res, HdfsFsMap* local_cache) {
std::string namenode;
RETURN_IF_ERROR(parse_namenode(path, &namenode));
if (local_cache != nullptr) {
auto it = local_cache->find(namenode);
if (it != local_cache->end()) {
*fs = it->second.first;
*res = it->second.second.get();
return Status::OK();
}
}
Status HdfsFsCache::get_connection(const std::string& namenode, HdfsFsHandle* handle, FileOpenLimitPtr* res) {
{
std::lock_guard<std::mutex> l(_lock);
auto it = _cache.find(namenode);
if (it != _cache.end()) {
*fs = it->second.first;
*handle = it->second.first;
*res = it->second.second.get();
} else {
auto hdfs_builder = hdfsNewBuilder();
hdfsBuilderSetNameNode(hdfs_builder, namenode.c_str());
handle->namenode = namenode;
RETURN_IF_ERROR(create_hdfs_fs_handle(namenode, handle));
auto semaphore = std::make_unique<FileOpenLimit>(0);
*fs = hdfsBuilderConnect(hdfs_builder);
*res = semaphore.get();
if (*fs == nullptr) {
return Status::InternalError(strings::Substitute("fail to connect hdfs namenode, name=$0, err=$1",
namenode, get_hdfs_err_msg()));
}
_cache[namenode] = std::make_pair(*fs, std::move(semaphore));
_cache[namenode] = std::make_pair(*handle, std::move(semaphore));
}
}
if (local_cache != nullptr) {
local_cache->emplace(namenode, std::make_pair(*fs, std::make_unique<FileOpenLimit>(0)));
}
return Status::OK();
}


@ -2,8 +2,6 @@
#pragma once
#include <hdfs/hdfs.h>
#include <atomic>
#include <memory>
#include <mutex>
@ -12,8 +10,7 @@
#include <utility>
#include "common/status.h"
#include "gutil/macros.h"
#include "env/env_hdfs.h"
namespace starrocks {
// Cache for HDFS file system
@ -21,7 +18,7 @@ class HdfsFsCache {
public:
using FileOpenLimit = std::atomic<int32_t>;
using FileOpenLimitPtr = FileOpenLimit*;
using HdfsFsMap = std::unordered_map<std::string, std::pair<hdfsFS, std::unique_ptr<FileOpenLimit>>>;
using HdfsFsMap = std::unordered_map<std::string, std::pair<HdfsFsHandle, std::unique_ptr<FileOpenLimit>>>;
static HdfsFsCache* instance() {
static HdfsFsCache s_instance;
@ -29,8 +26,7 @@ public:
}
// This function is thread-safe
Status get_connection(const std::string& path, hdfsFS* fs, FileOpenLimitPtr* semaphore = nullptr,
HdfsFsMap* map = nullptr);
Status get_connection(const std::string& namenode, HdfsFsHandle* handle, FileOpenLimitPtr* semaphore);
private:
std::mutex _lock;


@ -19,6 +19,7 @@
// specific language governing permissions and limitations
// under the License.
#include <aws/core/Aws.h>
#include <gperftools/malloc_extension.h>
#include <sys/file.h>
#include <unistd.h>
@ -271,13 +272,20 @@ int main(int argc, char** argv) {
LOG(INFO) << "StarRocks BE HeartBeat Service started correctly.";
}
#ifdef STARROCKS_WITH_AWS
Aws::SDKOptions aws_sdk_options;
Aws::InitAPI(aws_sdk_options);
#endif
while (!starrocks::k_starrocks_exit) {
sleep(10);
}
daemon->stop();
daemon.reset();
#ifdef STARROCKS_WITH_AWS
Aws::ShutdownAPI(aws_sdk_options);
#endif
heartbeat_thrift_server->stop();
heartbeat_thrift_server->join();
delete heartbeat_thrift_server;


@ -6,15 +6,15 @@
#include <hdfs/hdfs.h>
#include <string>
using std::string;
#include "gutil/strings/substitute.h"
#include "util/error_util.h"
namespace starrocks {
const char* FILESYS_PREFIX_HDFS = "hdfs://";
const char* FILESYS_PREFIX_S3 = "s3a://";
const char* FILESYS_PREFIX_OSS = "oss://";
static const char* kFileSysPrefixHdfs = "hdfs://";
static const char* kFileSysPrefixS3 = "s3a://";
static const char* KFileSysPrefixOSS = "oss://";
std::string get_hdfs_err_msg() {
std::string error_msg = get_str_err_msg();
@ -27,11 +27,11 @@ std::string get_hdfs_err_msg() {
return ss.str();
}
Status get_name_node_from_path(const std::string& path, std::string* namenode) {
const string local_fs("file:/");
size_t n = path.find("://");
Status get_namenode_from_path(const std::string& path, std::string* namenode) {
const std::string local_fs("file:/");
auto n = path.find("://");
if (n == string::npos) {
if (n == std::string::npos) {
if (path.compare(0, local_fs.length(), local_fs) == 0) {
// Hadoop Path routines strip out consecutive /'s, so recognize 'file:/blah'.
*namenode = "file:///";
@ -40,35 +40,45 @@ Status get_name_node_from_path(const std::string& path, std::string* namenode) {
*namenode = "default";
}
} else if (n == 0) {
return Status::InternalError("Path missing schema");
return Status::InvalidArgument(strings::Substitute("Path missing scheme: $0", path));
} else {
// Path is qualified, i.e. "scheme://authority/path/to/file". Extract
// "scheme://authority/".
n = path.find('/', n + 3);
if (n == string::npos) {
return Status::InternalError("Path missing '/' after authority");
if (n == std::string::npos) {
return Status::InvalidArgument(strings::Substitute("Path missing '/' after authority: $0", path));
} else {
// Include the trailing '/' for local filesystem case, i.e. "file:///".
*namenode = path.substr(0, n + 1);
}
// Include the trailing '/' for local filesystem case, i.e. "file:///".
*namenode = path.substr(0, n + 1);
}
return Status::OK();
}
bool is_specific_path(const char* path, const char* specific_prefix) {
std::string get_bucket_from_namenode(const std::string& namenode) {
auto n = namenode.find("://");
if (n == std::string::npos) return "";
n += 3;
auto n2 = namenode.find('/', n);
if (n2 == std::string::npos) return "";
return namenode.substr(n, n2 - n);
}
static bool is_specific_path(const char* path, const char* specific_prefix) {
size_t prefix_len = strlen(specific_prefix);
return strncmp(path, specific_prefix, prefix_len) == 0;
}
bool is_hdfs_path(const char* path) {
return is_specific_path(path, FILESYS_PREFIX_HDFS);
return is_specific_path(path, kFileSysPrefixHdfs);
}
bool is_s3a_path(const char* path) {
return is_specific_path(path, FILESYS_PREFIX_S3);
return is_specific_path(path, kFileSysPrefixS3);
}
bool is_oss_path(const char* path) {
return is_specific_path(path, FILESYS_PREFIX_OSS);
return is_specific_path(path, KFileSysPrefixOSS);
}
bool is_object_storage_path(const char* path) {


@ -10,12 +10,15 @@ namespace starrocks {
std::string get_hdfs_err_msg();
Status get_name_node_from_path(const std::string& path, std::string* namenode);
Status get_namenode_from_path(const std::string& path, std::string* namenode);
std::string get_bucket_from_namenode(const std::string& namenode);
// Returns true if the path refers to a location on an HDFS filesystem.
bool is_hdfs_path(const char* path);
// Returns true if the path refers to a location on object storage filesystem.
bool is_object_storage_path(const char* path);
bool is_s3a_path(const char* path);
bool is_oss_path(const char* path);
} // namespace starrocks
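
A quick sketch of what the path helpers return for an S3A path, based on the implementations above (not part of this diff; the path is hypothetical):

std::string namenode;
starrocks::Status st = starrocks::get_namenode_from_path("s3a://my-bucket/warehouse/t/part-0.parquet", &namenode);
// namenode == "s3a://my-bucket/" (scheme://authority/, keeping the trailing '/')
std::string bucket = starrocks::get_bucket_from_namenode(namenode);
// bucket == "my-bucket"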


@ -273,6 +273,10 @@ if (WITH_HDFS)
set(EXEC_FILES ${EXEC_FILES} ./exec/vectorized/hdfs_scan_node_test.cpp)
endif ()
if (WITH_AWS)
set(EXEC_FILES ${EXEC_FILES} ./object_store/s3_client_test.cpp)
endif ()
add_executable(starrocks_test ${EXEC_FILES})
TARGET_LINK_LIBRARIES(starrocks_test ${TEST_LINK_LIBS})


@ -0,0 +1,103 @@
// This file is licensed under the Elastic License 2.0. Copyright 2021-present, StarRocks Limited.
#include "object_store/s3_client.h"
#include <aws/core/Aws.h>
#include <gtest/gtest.h>
#include <fstream>
#include "util/file_utils.h"
namespace starrocks {
static const std::string bucket_name = "starrocks-cloud-test-2022";
class S3Test : public testing::Test {
public:
S3Test() {}
virtual ~S3Test() {}
void SetUp() override { Aws::InitAPI(_options); }
void TearDown() override { Aws::ShutdownAPI(_options); }
private:
Aws::SDKOptions _options;
};
TEST_F(S3Test, bucket_operation) {
Aws::Client::ClientConfiguration config;
config.region = Aws::Region::AP_SOUTHEAST_1;
S3Client client(config);
// create bucket
ASSERT_TRUE(client.create_bucket(bucket_name).ok());
// delete bucket
ASSERT_TRUE(client.delete_bucket(bucket_name).ok());
}
TEST_F(S3Test, object_operation) {
Aws::Client::ClientConfiguration config;
config.region = Aws::Region::AP_SOUTHEAST_1;
S3Client client(config);
// create bucket
ASSERT_TRUE(client.create_bucket(bucket_name).ok());
// put object
const std::string object_key = "hello";
const std::string object_value = "world";
const std::string object_path = "./S3Test_hello";
ASSERT_TRUE(client.put_string_object(bucket_name, object_key, object_value).ok());
// test object exist
ASSERT_TRUE(client.exist_object(bucket_name, object_key).ok());
// get object size
size_t size = 0;
ASSERT_TRUE(client.get_object_size(bucket_name, object_key, &size).ok());
ASSERT_EQ(size, object_value.size());
// get object
ASSERT_FALSE(client.get_object(bucket_name, object_key, "").ok());
ASSERT_TRUE(client.get_object(bucket_name, object_key, object_path).ok());
std::ifstream file(object_path.c_str(), std::ios::in);
ASSERT_TRUE(file.good());
std::string object_value_tmp;
std::getline(file, object_value_tmp);
ASSERT_EQ(object_value_tmp, object_value);
FileUtils::remove(object_path);
// get object range
std::string object_value_range;
size_t read_bytes;
ASSERT_TRUE(client.get_object_range(bucket_name, object_key, 1 /* offset */, 2 /* length */, &object_value_range,
&read_bytes)
.ok());
ASSERT_EQ(read_bytes, 2);
ASSERT_EQ(object_value_range, "or");
// list object
const std::string object_key2 = "test/hello";
ASSERT_TRUE(client.put_string_object(bucket_name, object_key2, object_value).ok());
std::vector<std::string> vector;
ASSERT_TRUE(client.list_objects(bucket_name, "/" /* object_prefix */, &vector).ok());
ASSERT_EQ(vector.size(), 2);
ASSERT_EQ(vector[0], "hello");
ASSERT_EQ(vector[1], "test/hello");
// delete object
ASSERT_TRUE(client.delete_object(bucket_name, object_key).ok());
ASSERT_TRUE(client.delete_object(bucket_name, object_key2).ok());
// test object not exist
ASSERT_TRUE(client.exist_object(bucket_name, object_key).is_not_found());
ASSERT_TRUE(client.exist_object(bucket_name, object_key2).is_not_found());
// delete bucket
ASSERT_TRUE(client.delete_bucket(bucket_name).ok());
}
} // namespace starrocks


@ -62,6 +62,8 @@ Usage: $0 <options>
--without-gcov build Backend without gcov(default)
--with-hdfs enable hdfs support
--without-hdfs disable hdfs support
--with-aws enable aws support
--without-aws disable aws support
Eg.
$0 build all
@ -85,6 +87,8 @@ OPTS=$(getopt \
-l 'without-gcov' \
-l 'with-hdfs' \
-l 'without-hdfs' \
-l 'with-aws' \
-l 'without-aws' \
-l 'help' \
-- "$@")
@ -101,6 +105,7 @@ CLEAN=
RUN_UT=
WITH_GCOV=OFF
WITH_HDFS=ON
WITH_AWS=ON
if [[ -z ${USE_AVX2} ]]; then
USE_AVX2=ON
fi
@ -131,6 +136,8 @@ else
--without-gcov) WITH_GCOV=OFF; shift ;;
--with-hdfs) WITH_HDFS=ON; shift ;;
--without-hdfs) WITH_HDFS=OFF; shift ;;
--with-aws) WITH_AWS=ON; shift ;;
--without-aws) WITH_AWS=OFF; shift ;;
-h) HELP=1; shift ;;
--help) HELP=1; shift ;;
--) shift ; break ;;
@ -157,6 +164,7 @@ echo "Get params:
RUN_UT -- $RUN_UT
WITH_GCOV -- $WITH_GCOV
WITH_HDFS -- $WITH_HDFS
WITH_AWS -- $WITH_AWS
USE_AVX2 -- $USE_AVX2
"
@ -194,7 +202,7 @@ if [ ${BUILD_BE} -eq 1 ] ; then
mkdir -p ${CMAKE_BUILD_DIR}
cd ${CMAKE_BUILD_DIR}
${CMAKE_CMD} .. -DSTARROCKS_THIRDPARTY=${STARROCKS_THIRDPARTY} -DSTARROCKS_HOME=${STARROCKS_HOME} -DCMAKE_CXX_COMPILER_LAUNCHER=ccache -DCMAKE_BUILD_TYPE=${CMAKE_BUILD_TYPE} \
-DMAKE_TEST=OFF -DWITH_HDFS=${WITH_HDFS} -DWITH_GCOV=${WITH_GCOV} -DUSE_AVX2=$USE_AVX2 -DCMAKE_EXPORT_COMPILE_COMMANDS=ON
-DMAKE_TEST=OFF -DWITH_HDFS=${WITH_HDFS} -DWITH_AWS=${WITH_AWS} -DWITH_GCOV=${WITH_GCOV} -DUSE_AVX2=$USE_AVX2 -DCMAKE_EXPORT_COMPILE_COMMANDS=ON
time make -j${PARALLEL}
make install
cd ${STARROCKS_HOME}


@ -2,10 +2,11 @@
package com.starrocks.external;
public class ObejctStorageUtils {
public class ObjectStorageUtils {
private static final String SCHEME_S3A = "s3a://";
private static final String SCHEME_S3 = "s3://";
private static final String SCHEME_S3N = "s3n://";
private static final String SCHEME_OSS = "oss://";
private static final String SCHEME_S3_PREFIX = "s3";
private static final String SCHEME_OSS_PREFIX = "oss";
@ -15,12 +16,15 @@ public class ObejctStorageUtils {
public static String formatObjectStoragePath(String path) {
if (path.startsWith(SCHEME_S3)) {
return SCHEME_S3A + path.substring(5);
return SCHEME_S3A + path.substring(SCHEME_S3.length());
}
if (path.startsWith(SCHEME_S3N)) {
return SCHEME_S3A + path.substring(6);
return SCHEME_S3A + path.substring(SCHEME_S3N.length());
}
// use s3a to access oss.
if (path.startsWith(SCHEME_OSS)) {
return SCHEME_S3A + path.substring(SCHEME_OSS.length());
}
return path;
}
}


@ -13,7 +13,7 @@ import com.starrocks.catalog.Column;
import com.starrocks.catalog.PartitionKey;
import com.starrocks.common.Config;
import com.starrocks.common.DdlException;
import com.starrocks.external.ObejctStorageUtils;
import com.starrocks.external.ObjectStorageUtils;
import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@ -230,8 +230,8 @@ public class HiveMetaCache {
private HivePartition getPartitionByEvent(StorageDescriptor sd) throws Exception {
HdfsFileFormat format = HdfsFileFormat.fromHdfsInputFormatClass(sd.getInputFormat());
String path = ObejctStorageUtils.formatObjectStoragePath(sd.getLocation());
boolean isSplittable = ObejctStorageUtils.isObjectStorage(path) ||
String path = ObjectStorageUtils.formatObjectStoragePath(sd.getLocation());
boolean isSplittable = ObjectStorageUtils.isObjectStorage(path) ||
HdfsFileFormat.isSplittable(sd.getInputFormat());
List<HdfsFileDesc> fileDescs = client.getHdfsFileDescs(path, isSplittable, sd);
return new HivePartition(format, ImmutableList.copyOf(fileDescs), path);
@ -319,7 +319,8 @@ public class HiveMetaCache {
public void refreshColumnStats(String dbName, String tableName, List<Column> partColumns, List<String> columnNames)
throws DdlException {
try {
HiveTableColumnsKey hiveTableColumnsKey = HiveTableColumnsKey.gen(dbName, tableName, partColumns, columnNames);
HiveTableColumnsKey hiveTableColumnsKey =
HiveTableColumnsKey.gen(dbName, tableName, partColumns, columnNames);
tableColumnStatsCache.put(hiveTableColumnsKey, loadTableColumnStats(hiveTableColumnsKey));
} catch (Exception e) {
throw new DdlException("refresh table column statistic cached failed: " + e.getMessage());


@ -14,7 +14,7 @@ import com.starrocks.catalog.PartitionKey;
import com.starrocks.catalog.Type;
import com.starrocks.common.Config;
import com.starrocks.common.DdlException;
import com.starrocks.external.ObejctStorageUtils;
import com.starrocks.external.ObjectStorageUtils;
import com.starrocks.external.hive.text.TextFileFormatDesc;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
@ -198,9 +198,9 @@ public class HiveMetaClient {
throw new DdlException("unsupported file format [" + sd.getInputFormat() + "]");
}
String path = ObejctStorageUtils.formatObjectStoragePath(sd.getLocation());
String path = ObjectStorageUtils.formatObjectStoragePath(sd.getLocation());
List<HdfsFileDesc> fileDescs = getHdfsFileDescs(path,
ObejctStorageUtils.isObjectStorage(path) || HdfsFileFormat.isSplittable(sd.getInputFormat()),
ObjectStorageUtils.isObjectStorage(path) || HdfsFileFormat.isSplittable(sd.getInputFormat()),
sd);
return new HivePartition(format, ImmutableList.copyOf(fileDescs), path);
} catch (NoSuchObjectException e) {


@ -0,0 +1,201 @@
Apache License
Version 2.0, January 2004
http://www.apache.org/licenses/
TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
1. Definitions.
"License" shall mean the terms and conditions for use, reproduction,
and distribution as defined by Sections 1 through 9 of this document.
"Licensor" shall mean the copyright owner or entity authorized by
the copyright owner that is granting the License.
"Legal Entity" shall mean the union of the acting entity and all
other entities that control, are controlled by, or are under common
control with that entity. For the purposes of this definition,
"control" means (i) the power, direct or indirect, to cause the
direction or management of such entity, whether by contract or
otherwise, or (ii) ownership of fifty percent (50%) or more of the
outstanding shares, or (iii) beneficial ownership of such entity.
"You" (or "Your") shall mean an individual or Legal Entity
exercising permissions granted by this License.
"Source" form shall mean the preferred form for making modifications,
including but not limited to software source code, documentation
source, and configuration files.
"Object" form shall mean any form resulting from mechanical
transformation or translation of a Source form, including but
not limited to compiled object code, generated documentation,
and conversions to other media types.
"Work" shall mean the work of authorship, whether in Source or
Object form, made available under the License, as indicated by a
copyright notice that is included in or attached to the work
(an example is provided in the Appendix below).
"Derivative Works" shall mean any work, whether in Source or Object
form, that is based on (or derived from) the Work and for which the
editorial revisions, annotations, elaborations, or other modifications
represent, as a whole, an original work of authorship. For the purposes
of this License, Derivative Works shall not include works that remain
separable from, or merely link (or bind by name) to the interfaces of,
the Work and Derivative Works thereof.
"Contribution" shall mean any work of authorship, including
the original version of the Work and any modifications or additions
to that Work or Derivative Works thereof, that is intentionally
submitted to Licensor for inclusion in the Work by the copyright owner
or by an individual or Legal Entity authorized to submit on behalf of
the copyright owner. For the purposes of this definition, "submitted"
means any form of electronic, verbal, or written communication sent
to the Licensor or its representatives, including but not limited to
communication on electronic mailing lists, source code control systems,
and issue tracking systems that are managed by, or on behalf of, the
Licensor for the purpose of discussing and improving the Work, but
excluding communication that is conspicuously marked or otherwise
designated in writing by the copyright owner as "Not a Contribution."
"Contributor" shall mean Licensor and any individual or Legal Entity
on behalf of whom a Contribution has been received by Licensor and
subsequently incorporated within the Work.
2. Grant of Copyright License. Subject to the terms and conditions of
this License, each Contributor hereby grants to You a perpetual,
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
copyright license to reproduce, prepare Derivative Works of,
publicly display, publicly perform, sublicense, and distribute the
Work and such Derivative Works in Source or Object form.
3. Grant of Patent License. Subject to the terms and conditions of
this License, each Contributor hereby grants to You a perpetual,
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
(except as stated in this section) patent license to make, have made,
use, offer to sell, sell, import, and otherwise transfer the Work,
where such license applies only to those patent claims licensable
by such Contributor that are necessarily infringed by their
Contribution(s) alone or by combination of their Contribution(s)
with the Work to which such Contribution(s) was submitted. If You
institute patent litigation against any entity (including a
cross-claim or counterclaim in a lawsuit) alleging that the Work
or a Contribution incorporated within the Work constitutes direct
or contributory patent infringement, then any patent licenses
granted to You under this License for that Work shall terminate
as of the date such litigation is filed.
4. Redistribution. You may reproduce and distribute copies of the
Work or Derivative Works thereof in any medium, with or without
modifications, and in Source or Object form, provided that You
meet the following conditions:
(a) You must give any other recipients of the Work or
Derivative Works a copy of this License; and
(b) You must cause any modified files to carry prominent notices
stating that You changed the files; and
(c) You must retain, in the Source form of any Derivative Works
that You distribute, all copyright, patent, trademark, and
attribution notices from the Source form of the Work,
excluding those notices that do not pertain to any part of
the Derivative Works; and
(d) If the Work includes a "NOTICE" text file as part of its
distribution, then any Derivative Works that You distribute must
include a readable copy of the attribution notices contained
within such NOTICE file, excluding those notices that do not
pertain to any part of the Derivative Works, in at least one
of the following places: within a NOTICE text file distributed
as part of the Derivative Works; within the Source form or
documentation, if provided along with the Derivative Works; or,
within a display generated by the Derivative Works, if and
wherever such third-party notices normally appear. The contents
of the NOTICE file are for informational purposes only and
do not modify the License. You may add Your own attribution
notices within Derivative Works that You distribute, alongside
or as an addendum to the NOTICE text from the Work, provided
that such additional attribution notices cannot be construed
as modifying the License.
You may add Your own copyright statement to Your modifications and
may provide additional or different license terms and conditions
for use, reproduction, or distribution of Your modifications, or
for any such Derivative Works as a whole, provided Your use,
reproduction, and distribution of the Work otherwise complies with
the conditions stated in this License.
5. Submission of Contributions. Unless You explicitly state otherwise,
any Contribution intentionally submitted for inclusion in the Work
by You to the Licensor shall be under the terms and conditions of
this License, without any additional terms or conditions.
Notwithstanding the above, nothing herein shall supersede or modify
the terms of any separate license agreement you may have executed
with Licensor regarding such Contributions.
6. Trademarks. This License does not grant permission to use the trade
names, trademarks, service marks, or product names of the Licensor,
except as required for reasonable and customary use in describing the
origin of the Work and reproducing the content of the NOTICE file.
7. Disclaimer of Warranty. Unless required by applicable law or
agreed to in writing, Licensor provides the Work (and each
Contributor provides its Contributions) on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
implied, including, without limitation, any warranties or conditions
of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
PARTICULAR PURPOSE. You are solely responsible for determining the
appropriateness of using or redistributing the Work and assume any
risks associated with Your exercise of permissions under this License.
8. Limitation of Liability. In no event and under no legal theory,
whether in tort (including negligence), contract, or otherwise,
unless required by applicable law (such as deliberate and grossly
negligent acts) or agreed to in writing, shall any Contributor be
liable to You for damages, including any direct, indirect, special,
incidental, or consequential damages of any character arising as a
result of this License or out of the use or inability to use the
Work (including but not limited to damages for loss of goodwill,
work stoppage, computer failure or malfunction, or any and all
other commercial damages or losses), even if such Contributor
has been advised of the possibility of such damages.
9. Accepting Warranty or Additional Liability. While redistributing
the Work or Derivative Works thereof, You may choose to offer,
and charge a fee for, acceptance of support, warranty, indemnity,
or other liability obligations and/or rights consistent with this
License. However, in accepting such obligations, You may act only
on Your own behalf and on Your sole responsibility, not on behalf
of any other Contributor, and only if You agree to indemnify,
defend, and hold each Contributor harmless for any liability
incurred by, or claims asserted against, such Contributor by reason
of your accepting any such warranty or additional liability.
END OF TERMS AND CONDITIONS
APPENDIX: How to apply the Apache License to your work.
To apply the Apache License to your work, attach the following
boilerplate notice, with the fields enclosed by brackets "{}"
replaced with your own identifying information. (Don't include
the brackets!) The text should be enclosed in the appropriate
comment syntax for the file format. We also recommend that a
file or class name and description of purpose be included on the
same "printed page" as the copyright notice for easier
identification within third-party archives.
Copyright {yyyy} {name of copyright owner}
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.

View File

@ -35,6 +35,8 @@ Usage: $0 <options>
--clean clean and build ut
--run build and run ut
--gtest_filter specify test cases
--with-hdfs enable hdfs tests
--with-aws enable aws tests
Eg.
$0 build ut
@ -53,6 +55,8 @@ OPTS=$(getopt \
-l 'run' \
-l 'clean' \
-l "gtest_filter:" \
-l "with-hdfs" \
-l 'with-aws' \
-l 'help' \
-- "$@")
@ -66,12 +70,16 @@ CLEAN=0
RUN=0
TEST_FILTER=*
HELP=0
WITH_AWS=OFF
WITH_HDFS=OFF
while true; do
case "$1" in
--clean) CLEAN=1 ; shift ;;
--run) RUN=1 ; shift ;;
--gtest_filter) TEST_FILTER=$2 ; shift 2;;
--help) HELP=1 ; shift ;;
--with-aws) WITH_AWS=ON; shift ;;
--with-hdfs) WITH_HDFS=ON; shift ;;
--) shift ; break ;;
*) echo "Internal error" ; exit 1 ;;
esac
@ -102,7 +110,7 @@ fi
cd ${CMAKE_BUILD_DIR}
${CMAKE_CMD} ../ -DSTARROCKS_THIRDPARTY=${STARROCKS_THIRDPARTY} -DSTARROCKS_HOME=${STARROCKS_HOME} -DCMAKE_CXX_COMPILER_LAUNCHER=ccache \
-DWITH_HDFS=OFF -DMAKE_TEST=ON -DCMAKE_BUILD_TYPE=${CMAKE_BUILD_TYPE} -DUSE_AVX2=$USE_AVX2 -DCMAKE_EXPORT_COMPILE_COMMANDS=ON
-DWITH_HDFS=${WITH_HDFS} -DWITH_AWS=${WITH_AWS} -DMAKE_TEST=ON -DCMAKE_BUILD_TYPE=${CMAKE_BUILD_TYPE} -DUSE_AVX2=$USE_AVX2 -DCMAKE_EXPORT_COMPILE_COMMANDS=ON
time make -j${PARALLEL}

View File

@ -737,7 +737,7 @@ build_aliyun_oss_jars() {
build_aws_cpp_sdk() {
check_if_source_exist $AWS_SDK_CPP_SOURCE
cd $TP_SOURCE_DIR/$AWS_SDK_CPP_SOURCE
# only build s3 and s3-crt, you can add more components if you want.
# Only build core, s3, s3-crt and the transfer manager; you can add more components if you want.
$CMAKE_CMD -Bbuild -DBUILD_ONLY="core;s3;s3-crt;transfer" -DCMAKE_BUILD_TYPE=RelWithDebInfo -DBUILD_SHARED_LIBS=OFF -DCMAKE_INSTALL_PREFIX=${TP_INSTALL_DIR} -DENABLE_TESTING=OFF
cd build
make -j$PARALLEL && make install
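
The build step above installs the core, s3, s3-crt and transfer components of aws-sdk-cpp into ${TP_INSTALL_DIR} as static libraries. As a rough illustration of what a consumer of those libraries looks like, the minimal sketch below constructs an S3Client against an S3-compatible endpoint and issues a single GetObject call. It is not code from this commit, and the endpoint, bucket, object key and credential strings are hypothetical placeholders.

#include <aws/core/Aws.h>
#include <aws/core/auth/AWSAuthSigner.h>
#include <aws/core/auth/AWSCredentials.h>
#include <aws/core/client/ClientConfiguration.h>
#include <aws/s3/S3Client.h>
#include <aws/s3/model/GetObjectRequest.h>

#include <iostream>
#include <sstream>

int main() {
    Aws::SDKOptions options;
    Aws::InitAPI(options);  // must run before any other SDK call
    {
        // Point the client at an S3-compatible endpoint; endpoint, bucket,
        // key and credentials here are hypothetical placeholders.
        Aws::Client::ClientConfiguration config;
        config.scheme = Aws::Http::Scheme::HTTPS;
        config.endpointOverride = "oss-cn-hangzhou.aliyuncs.com";  // leave empty for AWS S3

        Aws::Auth::AWSCredentials credentials("<access_key_id>", "<secret_access_key>");
        Aws::S3::S3Client client(credentials, config,
                                 Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy::Never,
                                 /*useVirtualAddressing=*/true);

        Aws::S3::Model::GetObjectRequest request;
        request.SetBucket("my-bucket");
        request.SetKey("path/to/object");

        auto outcome = client.GetObject(request);
        if (outcome.IsSuccess()) {
            // Drain the response body and report its size.
            std::stringstream body;
            body << outcome.GetResult().GetBody().rdbuf();
            std::cout << "read " << body.str().size() << " bytes" << std::endl;
        } else {
            std::cerr << outcome.GetError().GetMessage() << std::endl;
        }
    }
    Aws::ShutdownAPI(options);
    return 0;
}

On the testing side, passing the new --with-aws flag to the BE ut script shown earlier is what turns the corresponding WITH_AWS build paths on.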