diff --git a/be/CMakeLists.txt b/be/CMakeLists.txt index 02e507e1f2b..a8e55859a66 100644 --- a/be/CMakeLists.txt +++ b/be/CMakeLists.txt @@ -52,7 +52,6 @@ message(STATUS "Build target arch is ${CMAKE_BUILD_TARGET_ARCH}") # Set dirs set(BASE_DIR "${CMAKE_CURRENT_SOURCE_DIR}") -set(ENV{STARROCKS_HOME} "${BASE_DIR}/../") set(BUILD_DIR "${CMAKE_CURRENT_BINARY_DIR}") set(THIRDPARTY_DIR "$ENV{STARROCKS_THIRDPARTY}/installed/") set(STARROCKS_THIRDPARTY "$ENV{STARROCKS_THIRDPARTY}") @@ -116,6 +115,11 @@ if(NOT ${MAKE_GENSRC_RESULT} EQUAL 0 AND NOT APPLE) message(FATAL_ERROR "Failed to build ${BASE_DIR}/../gensrc/") endif() +# Add common cmake prefix path and link library path +list(APPEND CMAKE_PREFIX_PATH ${THIRDPARTY_DIR}/lib/cmake) +list(APPEND CMAKE_PREFIX_PATH ${THIRDPARTY_DIR}/lib64/cmake) +link_directories(${THIRDPARTY_DIR}/lib ${THIRDPARTY_DIR}/lib64) + # Set Boost set(Boost_DEBUG FALSE) set(Boost_USE_MULTITHREADED ON) @@ -188,12 +192,22 @@ if (WITH_HDFS) set_target_properties(hdfs PROPERTIES IMPORTED_LOCATION ${THIRDPARTY_DIR}/lib/libhdfs.a) endif() -add_library(libevent STATIC IMPORTED) -set_target_properties(libevent PROPERTIES IMPORTED_LOCATION ${THIRDPARTY_DIR}/lib/libevent.a) - add_library(crypto STATIC IMPORTED) set_target_properties(crypto PROPERTIES IMPORTED_LOCATION ${THIRDPARTY_DIR}/lib/libcrypto.a) +if (WITH_AWS) + set(AWSSDK_ROOT_DIR ${THIRDPARTY_DIR}) + set(AWSSDK_COMMON_RUNTIME_LIBS "aws-crt-cpp;aws-c-auth;aws-c-cal;aws-c-common;aws-c-compression;aws-c-event-stream;aws-c-http;aws-c-io;aws-c-mqtt;aws-c-s3;aws-checksums;s2n") + foreach(lib IN ITEMS ${AWSSDK_COMMON_RUNTIME_LIBS}) + list(APPEND CMAKE_PREFIX_PATH ${THIRDPARTY_DIR}/lib64/${lib}/cmake) + endforeach() + find_package(AWSSDK REQUIRED COMPONENTS core s3 s3-crt transfer) + include_directories(${AWSSDK_INCLUDE_DIRS}) +endif() + +add_library(libevent STATIC IMPORTED) +set_target_properties(libevent PROPERTIES IMPORTED_LOCATION ${THIRDPARTY_DIR}/lib/libevent.a) + add_library(openssl STATIC IMPORTED) set_target_properties(openssl PROPERTIES IMPORTED_LOCATION ${THIRDPARTY_DIR}/lib/libssl.a) @@ -328,6 +342,10 @@ if (WITH_HDFS) set(CXX_COMMON_FLAGS "${CXX_COMMON_FLAGS} -DSTARROCKS_WITH_HDFS") endif() +if (WITH_AWS) + set(CXX_COMMON_FLAGS "${CXX_COMMON_FLAGS} -DSTARROCKS_WITH_AWS") +endif() + if (CMAKE_CXX_COMPILER_VERSION VERSION_GREATER 7.0) set(CXX_COMMON_FLAGS "${CXX_COMMON_FLAGS} -faligned-new") endif() @@ -447,6 +465,7 @@ set(STARROCKS_LINK_LIBS Tools Geo Plugin + ObjectStore ${WL_END_GROUP} ) @@ -467,6 +486,13 @@ if (WITH_HDFS) include_directories(${JAVA_HOME}/include/linux) endif() +if (WITH_AWS) + set(STARROCKS_DEPENDENCIES ${STARROCKS_DEPENDENCIES} + ${AWSSDK_LINK_LIBRARIES} + ${AWSSDK_PLATFORM_DEPS} + ) +endif() + # Set thirdparty libraries set(STARROCKS_DEPENDENCIES ${STARROCKS_DEPENDENCIES} @@ -587,6 +613,7 @@ add_subdirectory(${SRC_DIR}/geo) add_subdirectory(${SRC_DIR}/gutil) add_subdirectory(${SRC_DIR}/http) add_subdirectory(${SRC_DIR}/io) +add_subdirectory(${SRC_DIR}/object_store) add_subdirectory(${SRC_DIR}/storage) add_subdirectory(${SRC_DIR}/runtime) add_subdirectory(${SRC_DIR}/serde) diff --git a/be/src/common/compiler_util.h b/be/src/common/compiler_util.h index 4ab8894a73a..f65e826aa70 100644 --- a/be/src/common/compiler_util.h +++ b/be/src/common/compiler_util.h @@ -20,6 +20,7 @@ // under the License. #pragma once + // Compiler hint that this branch is likely or unlikely to // be taken. Take from the "What all programmers should know // about memory" paper. 
@@ -48,6 +49,7 @@ #define ALIGN_CACHE_LINE __attribute__((aligned(CACHE_LINE_SIZE))) +#ifndef DIAGNOSTIC_PUSH #ifdef __clang__ #define DIAGNOSTIC_PUSH _Pragma("clang diagnostic push") #define DIAGNOSTIC_POP _Pragma("clang diagnostic pop") @@ -60,9 +62,10 @@ #else #error("Unknown compiler") #endif +#endif // ifndef DIAGNOSTIC_PUSH +#ifndef DIAGNOSTIC_IGNORE #define PRAGMA(TXT) _Pragma(#TXT) - #ifdef __clang__ #define DIAGNOSTIC_IGNORE(XXX) PRAGMA(clang diagnostic ignored XXX) #elif defined(__GNUC__) @@ -72,3 +75,4 @@ #else #define DIAGNOSTIC_IGNORE(XXX) #endif +#endif // ifndef DIAGNOSTIC_IGNORE \ No newline at end of file diff --git a/be/src/common/config.h b/be/src/common/config.h index 3da84bc549e..32be1052be4 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -681,6 +681,12 @@ CONF_Bool(use_hdfs_pread, "true"); // default: true CONF_Bool(rewrite_partial_segment, "true"); +// properties to access aws s3 +CONF_String(aws_access_key_id, ""); +CONF_String(aws_secret_access_key, ""); +CONF_String(aws_s3_endpoint, ""); +CONF_Int64(aws_s3_max_connection, "102400"); + } // namespace config } // namespace starrocks diff --git a/be/src/env/env_hdfs.cpp b/be/src/env/env_hdfs.cpp index db1d17e044c..5c180bd81d0 100644 --- a/be/src/env/env_hdfs.cpp +++ b/be/src/env/env_hdfs.cpp @@ -10,8 +10,10 @@ namespace starrocks { -HdfsRandomAccessFile::HdfsRandomAccessFile(hdfsFS fs, std::string filename, bool usePread) - : _opened(false), _fs(fs), _file(nullptr), _filename(std::move(filename)), _usePread(usePread) {} +// ================================== HdfsRandomAccessFile ========================================== + +HdfsRandomAccessFile::HdfsRandomAccessFile(hdfsFS fs, const std::string& file_name, size_t file_size, bool usePread) + : _opened(false), _fs(fs), _file(nullptr), _file_name(file_name), _file_size(file_size), _usePread(usePread) {} HdfsRandomAccessFile::~HdfsRandomAccessFile() noexcept { close(); @@ -20,16 +22,16 @@ HdfsRandomAccessFile::~HdfsRandomAccessFile() noexcept { Status HdfsRandomAccessFile::open() { DCHECK(!_opened); if (_fs) { - _file = hdfsOpenFile(_fs, _filename.c_str(), O_RDONLY, 0, 0, 0); + _file = hdfsOpenFile(_fs, _file_name.c_str(), O_RDONLY, 0, 0, 0); if (_file == nullptr) { - return Status::InternalError(fmt::format("open file failed, file={}", _filename)); + return Status::InternalError(fmt::format("open file failed, file={}", _file_name)); } } _opened = true; return Status::OK(); } -void HdfsRandomAccessFile::close() noexcept { +void HdfsRandomAccessFile::close() { if (_opened) { if (_fs && _file) { hdfsCloseFile(_fs, _file); @@ -38,56 +40,58 @@ void HdfsRandomAccessFile::close() noexcept { } } -static Status read_at_internal(hdfsFS fs, hdfsFile file, const std::string& file_name, int64_t offset, Slice* res, - bool usePread) { - if (usePread) { - if (hdfsPreadFully(fs, file, offset, res->data, res->size) == -1) { - return Status::IOError(strings::Substitute("fail to hdfsPreadFully file, file=$0, error=$1", file_name, +Status HdfsRandomAccessFile::_read_at(int64_t offset, char* data, size_t size, size_t* read_size) const { + if (_usePread) { + *read_size = size; + if (hdfsPreadFully(_fs, _file, offset, data, size) == -1) { + return Status::IOError(strings::Substitute("fail to hdfsPreadFully file, file=$0, error=$1", _file_name, get_hdfs_err_msg())); } } else { - auto cur_offset = hdfsTell(fs, file); + auto cur_offset = hdfsTell(_fs, _file); if (cur_offset == -1) { return Status::IOError( - strings::Substitute("fail to get offset, file=$0, error=$1", 
file_name, get_hdfs_err_msg())); + strings::Substitute("fail to get offset, file=$0, error=$1", _file_name, get_hdfs_err_msg())); } if (cur_offset != offset) { - if (hdfsSeek(fs, file, offset)) { + if (hdfsSeek(_fs, _file, offset)) { return Status::IOError(strings::Substitute("fail to seek offset, file=$0, offset=$1, error=$2", - file_name, offset, get_hdfs_err_msg())); + _file_name, offset, get_hdfs_err_msg())); } } size_t bytes_read = 0; - while (bytes_read < res->size) { - size_t to_read = res->size - bytes_read; - auto hdfs_res = hdfsRead(fs, file, res->data + bytes_read, to_read); + while (bytes_read < size) { + size_t to_read = size - bytes_read; + auto hdfs_res = hdfsRead(_fs, _file, data + bytes_read, to_read); if (hdfs_res < 0) { - return Status::IOError( - strings::Substitute("fail to hdfsRead file, file=$0, error=$1", file_name, get_hdfs_err_msg())); + return Status::IOError(strings::Substitute("fail to hdfsRead file, file=$0, error=$1", _file_name, + get_hdfs_err_msg())); } else if (hdfs_res == 0) { break; } bytes_read += hdfs_res; } - res->size = bytes_read; + *read_size = bytes_read; } return Status::OK(); } Status HdfsRandomAccessFile::read(uint64_t offset, Slice* res) const { DCHECK(_opened); - RETURN_IF_ERROR(read_at_internal(_fs, _file, _filename, offset, res, _usePread)); + size_t read_size = 0; + Status st = _read_at(offset, res->data, res->size, &read_size); + if (!st.ok()) return st; + res->size = read_size; return Status::OK(); } Status HdfsRandomAccessFile::read_at(uint64_t offset, const Slice& res) const { - DCHECK(_opened); - Slice slice = res; - RETURN_IF_ERROR(read_at_internal(_fs, _file, _filename, offset, &slice, _usePread)); - if (slice.size != res.size) { + size_t read_size = 0; + RETURN_IF_ERROR(_read_at(offset, res.data, res.size, &read_size)); + if (read_size != res.size) { return Status::InternalError( - strings::Substitute("fail to read enough data, file=$0, offset=$1, size=$2, expect=$3", _filename, - offset, slice.size, res.size)); + strings::Substitute("fail to read enough data, file=$0, offset=$1, size=$2, expect=$3", _file_name, + offset, read_size, res.size)); } return Status::OK(); } @@ -97,8 +101,53 @@ Status HdfsRandomAccessFile::readv_at(uint64_t offset, const Slice* res, size_t return Status::InternalError("HdfsRandomAccessFile::readv_at not implement"); } -Status HdfsRandomAccessFile::size(uint64_t* size) const { - // TODO: implement - return Status::InternalError("HdfsRandomAccessFile::size not implement"); +// =================================== S3RandomAccessFile ========================================= + +S3RandomAccessFile::S3RandomAccessFile(S3Client* client, const std::string& bucket, const std::string& object, + size_t object_size) + : _client(client), _bucket(bucket), _object(object), _object_size(object_size) { + _file_name = "s3://" + _bucket + "/" + _object; } + +Status S3RandomAccessFile::read(uint64_t offset, Slice* res) const { + size_t read_size = 0; + Status st = _client->get_object_range(_bucket, _object, offset, res->size, res->data, &read_size); + if (!st.ok()) return st; + res->size = read_size; + return st; +} + +Status S3RandomAccessFile::read_at(uint64_t offset, const Slice& res) const { + size_t read_size = 0; + RETURN_IF_ERROR(_client->get_object_range(_bucket, _object, offset, res.size, res.data, &read_size)); + if (read_size != res.size) { + return Status::InternalError( + strings::Substitute("fail to read enough data, file=$0, offset=$1, size=$2, expect=$3", _file_name, + offset, read_size, res.size)); + } + 
return Status::OK(); +} + +Status S3RandomAccessFile::readv_at(uint64_t offset, const Slice* res, size_t res_cnt) const { + // TODO: implement + return Status::InternalError("S3RandomAccessFile::readv_at not implement"); +} + +std::shared_ptr<RandomAccessFile> create_random_access_hdfs_file(const HdfsFsHandle& handle, + const std::string& file_path, size_t file_size, + bool usePread) { + if (handle.type == HdfsFsHandle::Type::HDFS) { + return std::make_shared<HdfsRandomAccessFile>(handle.hdfs_fs, file_path, file_size, usePread); + } else if (handle.type == HdfsFsHandle::Type::S3) { + const std::string& nn = handle.namenode; + const std::string bucket = get_bucket_from_namenode(nn); + const std::string object = file_path.substr(nn.size(), file_path.size() - nn.size()); + return std::make_shared<S3RandomAccessFile>(handle.s3_client, bucket, object, file_size); + } else { + CHECK(false) << strings::Substitute("Unknown HdfsFsHandle::Type $0", static_cast<int>(handle.type)); + __builtin_unreachable(); + return nullptr; + } +} + } // namespace starrocks diff --git a/be/src/env/env_hdfs.h b/be/src/env/env_hdfs.h index 68c9a5889fc..6e5ef771cfd 100644 --- a/be/src/env/env_hdfs.h +++ b/be/src/env/env_hdfs.h @@ -5,33 +5,73 @@ #include #include "env/env.h" +#include "object_store/s3_client.h" namespace starrocks { +struct HdfsFsHandle { + enum class Type { LOCAL, HDFS, S3 }; + Type type; + std::string namenode; + hdfsFS hdfs_fs; + S3Client* s3_client; +}; + // class for remote read hdfs file // Now this is not thread-safe. class HdfsRandomAccessFile : public RandomAccessFile { public: - HdfsRandomAccessFile(hdfsFS fs, std::string filename, bool usePread); + HdfsRandomAccessFile(hdfsFS fs, const std::string& file_name, size_t file_size, bool usePread); virtual ~HdfsRandomAccessFile() noexcept; - + void close(); Status open(); - void close() noexcept; Status read(uint64_t offset, Slice* res) const override; Status read_at(uint64_t offset, const Slice& res) const override; Status readv_at(uint64_t offset, const Slice* res, size_t res_cnt) const override; - Status size(uint64_t* size) const override; - const std::string& file_name() const override { return _filename; } + Status size(uint64_t* size) const override { + *size = _file_size; + return Status::OK(); + } + const std::string& file_name() const override { return _file_name; } hdfsFile hdfs_file() const { return _file; } private: + Status _read_at(int64_t offset, char* data, size_t size, size_t* read_size) const; bool _opened; hdfsFS _fs; hdfsFile _file; - std::string _filename; + std::string _file_name; + size_t _file_size; bool _usePread; }; +class S3RandomAccessFile : public RandomAccessFile { +public: + S3RandomAccessFile(S3Client* client, const std::string& bucket, const std::string& object, size_t object_size = 0); + virtual ~S3RandomAccessFile() noexcept = default; + + Status read(uint64_t offset, Slice* res) const override; + Status read_at(uint64_t offset, const Slice& res) const override; + Status readv_at(uint64_t offset, const Slice* res, size_t res_cnt) const override; + + Status size(uint64_t* size) const override { + *size = _object_size; + return Status::OK(); + } + const std::string& file_name() const override { return _file_name; } + +private: + void _init(const HdfsFsHandle& handle); + S3Client* _client; + std::string _file_name; + std::string _bucket; + std::string _object; + size_t _object_size; +}; + +std::shared_ptr<RandomAccessFile> create_random_access_hdfs_file(const HdfsFsHandle& handle, + const std::string& file_path, size_t file_size, + bool usePread); } // namespace starrocks
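As a usage sketch of the new handle-based API (mirroring the hdfs_scan_node.cpp hunk later in this patch; `path` and `file_size` are placeholder values, and an HDFS-backed file still needs the explicit open() call shown in hdfs_scanner.cpp before reads):

    std::string namenode;
    RETURN_IF_ERROR(get_namenode_from_path(path, &namenode));

    HdfsFsHandle handle;
    HdfsFsCache::FileOpenLimitPtr open_limit = nullptr;
    RETURN_IF_ERROR(HdfsFsCache::instance()->get_connection(namenode, &handle, &open_limit));

    // Dispatches to HdfsRandomAccessFile or S3RandomAccessFile based on handle.type.
    std::shared_ptr<RandomAccessFile> file =
            create_random_access_hdfs_file(handle, path, file_size, /*usePread=*/true);

    uint64_t size = 0;
    RETURN_IF_ERROR(file->size(&size));

diff --git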
a/be/src/env/env_posix.cpp b/be/src/env/env_posix.cpp index 7291c5febe5..484a3a79442 100644 --- a/be/src/env/env_posix.cpp +++ b/be/src/env/env_posix.cpp @@ -254,7 +254,7 @@ public: PosixRandomAccessFile(std::string filename, int fd) : _filename(std::move(filename)), _fd(fd) {} ~PosixRandomAccessFile() override { int res; - RETRY_ON_EINTR(res, close(_fd)); + RETRY_ON_EINTR(res, ::close(_fd)); if (res != 0) { LOG(WARNING) << "close file failed, name=" << _filename << ", msg=" << errno_to_string(errno); } diff --git a/be/src/exec/vectorized/hdfs_scan_node.cpp b/be/src/exec/vectorized/hdfs_scan_node.cpp index 630a1f4f144..1e54b14cb2c 100644 --- a/be/src/exec/vectorized/hdfs_scan_node.cpp +++ b/be/src/exec/vectorized/hdfs_scan_node.cpp @@ -13,6 +13,7 @@ #include "exprs/vectorized/runtime_filter.h" #include "fmt/core.h" #include "glog/logging.h" +#include "gutil/strings/substitute.h" #include "runtime/current_thread.h" #include "runtime/exec_env.h" #include "runtime/hdfs/hdfs_fs_cache.h" @@ -205,6 +206,7 @@ Status HdfsScanNode::_create_and_init_scanner(RuntimeState* state, const HdfsFil scanner_params.runtime_filter_collector = &_runtime_filter_collector; scanner_params.scan_ranges = hdfs_file_desc.splits; scanner_params.fs = hdfs_file_desc.fs; + scanner_params.fs_handle_type = hdfs_file_desc.fs_handle_type; scanner_params.tuple_desc = _tuple_desc; scanner_params.materialize_slots = _materialize_slots; scanner_params.materialize_index_in_chunk = _materialize_index_in_chunk; @@ -619,7 +621,7 @@ Status HdfsScanNode::_find_and_insert_hdfs_file(const THdfsScanRange& scan_range // if found, add file splits to hdfs file desc // if not found, create for (auto& item : _hdfs_files) { - if (item->partition_id == scan_range.partition_id && item->path == scan_range_path) { + if (item->partition_id == scan_range.partition_id && item->scan_range_path == scan_range_path) { item->splits.emplace_back(&scan_range); return Status::OK(); } @@ -637,8 +639,7 @@ Status HdfsScanNode::_find_and_insert_hdfs_file(const THdfsScanRange& scan_range native_file_path = file_path.native(); } std::string namenode; - RETURN_IF_ERROR(get_name_node_from_path(native_file_path, &namenode)); - _is_hdfs_fs = is_hdfs_path(namenode.c_str()); + RETURN_IF_ERROR(get_namenode_from_path(native_file_path, &namenode)); bool usePread = starrocks::config::use_hdfs_pread || is_object_storage_path(namenode.c_str()); if (namenode.compare("default") == 0) { @@ -646,26 +647,26 @@ Status HdfsScanNode::_find_and_insert_hdfs_file(const THdfsScanRange& scan_range auto* env = Env::Default(); std::unique_ptr file; env->new_random_access_file(native_file_path, &file); - auto* hdfs_file_desc = _pool->add(new HdfsFileDesc()); - hdfs_file_desc->hdfs_fs = nullptr; hdfs_file_desc->fs = std::move(file); + hdfs_file_desc->fs_handle_type = HdfsFsHandle::Type::LOCAL; hdfs_file_desc->partition_id = scan_range.partition_id; - hdfs_file_desc->path = scan_range_path; + hdfs_file_desc->scan_range_path = scan_range_path; hdfs_file_desc->file_length = scan_range.file_length; hdfs_file_desc->splits.emplace_back(&scan_range); hdfs_file_desc->hdfs_file_format = scan_range.file_format; hdfs_file_desc->open_limit = nullptr; _hdfs_files.emplace_back(hdfs_file_desc); } else { - hdfsFS hdfs; + HdfsFsHandle handle; std::atomic* open_limit = nullptr; - RETURN_IF_ERROR(HdfsFsCache::instance()->get_connection(namenode, &hdfs, &open_limit)); + RETURN_IF_ERROR(HdfsFsCache::instance()->get_connection(namenode, &handle, &open_limit)); auto* hdfs_file_desc = _pool->add(new HdfsFileDesc()); 
- hdfs_file_desc->hdfs_fs = hdfs; - hdfs_file_desc->fs = std::make_shared(hdfs, native_file_path, usePread); + size_t file_size = scan_range.file_length; + hdfs_file_desc->fs_handle_type = handle.type; + hdfs_file_desc->fs = create_random_access_hdfs_file(handle, native_file_path, file_size, usePread); hdfs_file_desc->partition_id = scan_range.partition_id; - hdfs_file_desc->path = scan_range_path; + hdfs_file_desc->scan_range_path = scan_range_path; hdfs_file_desc->file_length = scan_range.file_length; hdfs_file_desc->splits.emplace_back(&scan_range); hdfs_file_desc->hdfs_file_format = scan_range.file_format; diff --git a/be/src/exec/vectorized/hdfs_scan_node.h b/be/src/exec/vectorized/hdfs_scan_node.h index b2980602afc..c49ddcb466d 100644 --- a/be/src/exec/vectorized/hdfs_scan_node.h +++ b/be/src/exec/vectorized/hdfs_scan_node.h @@ -16,12 +16,12 @@ namespace starrocks::vectorized { struct HdfsFileDesc { - hdfsFS hdfs_fs; THdfsFileFormat::type hdfs_file_format; + HdfsFsHandle::Type fs_handle_type; std::shared_ptr fs; int partition_id = 0; - std::string path; + std::string scan_range_path; int64_t file_length = 0; std::vector splits; @@ -153,8 +153,6 @@ private: mutable SpinLock _status_mutex; Status _status; RuntimeState* _runtime_state = nullptr; - bool _is_hdfs_fs = true; - std::atomic_bool _pending_token = true; std::atomic _scanner_submit_count = 0; diff --git a/be/src/exec/vectorized/hdfs_scanner.cpp b/be/src/exec/vectorized/hdfs_scanner.cpp index d210b8002cf..f28124f0037 100644 --- a/be/src/exec/vectorized/hdfs_scanner.cpp +++ b/be/src/exec/vectorized/hdfs_scanner.cpp @@ -116,7 +116,10 @@ Status HdfsScanner::open(RuntimeState* runtime_state) { return Status::OK(); } #ifndef BE_TEST - RETURN_IF_ERROR(down_cast(_scanner_params.fs.get())->open()); + if (_scanner_params.fs_handle_type == HdfsFsHandle::Type::HDFS) { + auto* file = down_cast(_scanner_params.fs.get()); + RETURN_IF_ERROR(file->open()); + } #endif _build_file_read_param(); auto status = do_open(runtime_state); @@ -142,7 +145,10 @@ void HdfsScanner::close(RuntimeState* runtime_state) noexcept { } do_close(runtime_state); #ifndef BE_TEST - down_cast(_scanner_params.fs.get())->close(); + if (_scanner_params.fs_handle_type == HdfsFsHandle::Type::HDFS) { + auto* file = down_cast(_scanner_params.fs.get()); + file->close(); + } if (_is_open) { (*_scanner_params.open_limit)--; } @@ -190,11 +196,12 @@ void HdfsScanner::update_counter() { if (_scanner_params.fs == nullptr) return; HdfsReadStats hdfs_stats; - auto hdfs_file = down_cast(_scanner_params.fs.get())->hdfs_file(); - if (hdfs_file == nullptr) return; - // Hdfslib only supports obtaining statistics of hdfs file system. - // For other systems such as s3, calling this function will cause be crash. - if (_scanner_params.parent->_is_hdfs_fs) { + + if (_scanner_params.fs_handle_type == HdfsFsHandle::Type::HDFS) { + auto hdfs_file = down_cast(_scanner_params.fs.get())->hdfs_file(); + if (hdfs_file == nullptr) return; + // Hdfslib only supports obtaining statistics of hdfs file system. + // For other systems such as s3, calling this function will cause be crash. 
get_hdfs_statistics(hdfs_file, &hdfs_stats); } diff --git a/be/src/exec/vectorized/hdfs_scanner.h b/be/src/exec/vectorized/hdfs_scanner.h index 758f1f4658e..4217375e538 100644 --- a/be/src/exec/vectorized/hdfs_scanner.h +++ b/be/src/exec/vectorized/hdfs_scanner.h @@ -7,9 +7,11 @@ #include "column/chunk.h" #include "env/env.h" +#include "env/env_hdfs.h" #include "exprs/expr_context.h" #include "runtime/descriptors.h" #include "util/runtime_profile.h" + namespace starrocks::parquet { class FileReader; } @@ -70,6 +72,7 @@ struct HdfsScannerParams { // file fd (local file or hdfs file) std::shared_ptr fs = nullptr; + HdfsFsHandle::Type fs_handle_type; const TupleDescriptor* tuple_desc; diff --git a/be/src/object_store/CMakeLists.txt b/be/src/object_store/CMakeLists.txt new file mode 100644 index 00000000000..10631236219 --- /dev/null +++ b/be/src/object_store/CMakeLists.txt @@ -0,0 +1,19 @@ +# This file is licensed under the Elastic License 2.0. Copyright 2021-present, StarRocks Limited. + + +# where to put generated libraries +set(LIBRARY_OUTPUT_PATH "${BUILD_DIR}/src/object_store") + +# where to put generated binaries +set(EXECUTABLE_OUTPUT_PATH "${BUILD_DIR}/src/object_store") + +set(EXEC_FILES placeholder.cpp) + +if (WITH_AWS) +set(EXEC_FILES ${EXEC_FILES} + s3_client.cpp) +endif() + +add_library(ObjectStore STATIC + ${EXEC_FILES} +) diff --git a/be/src/object_store/object_store_client.h b/be/src/object_store/object_store_client.h new file mode 100644 index 00000000000..8544543522f --- /dev/null +++ b/be/src/object_store/object_store_client.h @@ -0,0 +1,53 @@ +// This file is licensed under the Elastic License 2.0. Copyright 2021-present, StarRocks Limited. + +#pragma once + +#include + +#include "common/status.h" + +namespace starrocks { + +// base class for object store client +class ObjectStoreClient { +public: + ObjectStoreClient() = default; + virtual ~ObjectStoreClient() = default; + + /* + * Bucket Operation + */ + virtual Status create_bucket(const std::string& bucket_name) = 0; + + virtual Status delete_bucket(const std::string& bucket_name) = 0; + + /* + * Object Operation + */ + virtual Status put_object(const std::string& bucket_name, const std::string& object_key, + const std::string& object_path) = 0; + + virtual Status put_string_object(const std::string& bucket_name, const std::string& object_key, + const std::string& object_value) = 0; + + virtual Status get_object(const std::string& bucket_name, const std::string& object_key, + const std::string& object_path) = 0; + + virtual Status get_object_range(const std::string& bucket_name, const std::string& object_key, size_t offset, + size_t length, std::string* object_value, size_t* read_bytes) = 0; + + // `object_value` should already be allocated at least `length` bytes + virtual Status get_object_range(const std::string& bucket_name, const std::string& object_key, size_t offset, + size_t length, char* object_value, size_t* read_bytes) = 0; + + virtual Status exist_object(const std::string& bucket_name, const std::string& object_key) = 0; + + virtual Status get_object_size(const std::string& bucket_name, const std::string& object_key, size_t* size) = 0; + + virtual Status delete_object(const std::string& bucket_name, const std::string& object_key) = 0; + + virtual Status list_objects(const std::string& bucket_name, const std::string& object_prefix, + std::vector* result) = 0; +}; + +} // namespace starrocks diff --git a/be/src/object_store/placeholder.cpp b/be/src/object_store/placeholder.cpp new file mode 100644 index 
00000000000..e69de29bb2d diff --git a/be/src/object_store/s3_client.cpp b/be/src/object_store/s3_client.cpp new file mode 100644 index 00000000000..0d3d1be3aaf --- /dev/null +++ b/be/src/object_store/s3_client.cpp @@ -0,0 +1,356 @@ +// This file is licensed under the Elastic License 2.0. Copyright 2021-present, StarRocks Limited. + +#include "object_store/s3_client.h" + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include + +#include "common/logging.h" +#include "gutil/strings/strip.h" +#include "gutil/strings/substitute.h" + +namespace starrocks { + +static inline Aws::String to_aws_string(const std::string& s) { + return Aws::String(s.data(), s.size()); +} + +S3Client::S3Client(const Aws::Client::ClientConfiguration& config, const S3Credential* cred, bool use_transfer_manager) + : _config(config) { + const char* access_key_id = (cred == nullptr || cred->access_key_id.empty()) ? getenv("AWS_ACCESS_KEY_ID") + : cred->access_key_id.c_str(); + const char* secret_access_key = (cred == nullptr || cred->secret_access_key.empty()) + ? getenv("AWS_SECRET_ACCESS_KEY") + : cred->secret_access_key.c_str(); + std::shared_ptr credentials = + std::make_shared(access_key_id, secret_access_key); + + _client = std::make_shared(credentials, _config); + if (use_transfer_manager) { + Aws::Transfer::TransferManagerConfiguration transfer_config(_get_transfer_manager_executor()); + transfer_config.s3Client = _client; + transfer_config.transferBufferMaxHeapSize = kTransferManagerMaxBufferSize; + transfer_config.bufferSize = kTransferManagerSingleBufferSize; + _transfer_manager = Aws::Transfer::TransferManager::Create(transfer_config); + } else { + _transfer_manager = nullptr; + } +} + +Status S3Client::create_bucket(const std::string& bucket_name) { + Aws::S3::Model::BucketLocationConstraint constraint = + Aws::S3::Model::BucketLocationConstraintMapper::GetBucketLocationConstraintForName(_config.region); + Aws::S3::Model::CreateBucketConfiguration bucket_config; + bucket_config.SetLocationConstraint(constraint); + + Aws::S3::Model::CreateBucketRequest request; + request.SetCreateBucketConfiguration(bucket_config); + request.SetBucket(to_aws_string(bucket_name)); + + Aws::S3::Model::CreateBucketOutcome outcome = _client->CreateBucket(request); + + if (outcome.IsSuccess()) { + return Status::OK(); + } else { + std::string error = + strings::Substitute("Create Bucket $0 failed. $1.", bucket_name, outcome.GetError().GetMessage()); + LOG(ERROR) << error; + return Status::IOError(error); + } +} + +Status S3Client::delete_bucket(const std::string& bucket_name) { + Aws::S3::Model::DeleteBucketRequest request; + request.SetBucket(to_aws_string(bucket_name)); + + Aws::S3::Model::DeleteBucketOutcome outcome = _client->DeleteBucket(request); + + if (outcome.IsSuccess()) { + return Status::OK(); + } else { + std::string error = + strings::Substitute("Delete Bucket $0 failed. 
$1.", bucket_name, outcome.GetError().GetMessage()); + LOG(ERROR) << error; + return Status::IOError(error); + } +} + +Status S3Client::put_object(const std::string& bucket_name, const std::string& object_key, + const std::string& object_path) { + if (_transfer_manager) { + auto handle = _transfer_manager->UploadFile(to_aws_string(object_path), to_aws_string(bucket_name), + to_aws_string(object_key), Aws::DEFAULT_CONTENT_TYPE, + Aws::Map()); + handle->WaitUntilFinished(); + if (handle->GetStatus() != Aws::Transfer::TransferStatus::COMPLETED) { + // TODO: log error + return Status::IOError(strings::Substitute("Put Object $0 failed.", object_key)); + } else { + return Status::OK(); + } + } else { + Aws::S3::Model::PutObjectRequest request; + request.SetBucket(to_aws_string(bucket_name)); + request.SetKey(to_aws_string(object_key)); + std::shared_ptr stream = Aws::MakeShared( + Aws::Utils::ARRAY_ALLOCATION_TAG, object_path.c_str(), std::ios_base::in | std::ios_base::binary); + if (!stream->good()) { + std::string error = + strings::Substitute("Put Object $0 failed, fail to open local file $1.", object_key, object_path); + LOG(ERROR) << error; + return Status::IOError(error); + } + request.SetBody(stream); + Aws::S3::Model::PutObjectOutcome outcome = _client->PutObject(request); + + if (outcome.IsSuccess()) { + return Status::OK(); + } else { + std::string error = + strings::Substitute("Put Object $0 failed. $1.", object_key, outcome.GetError().GetMessage()); + LOG(ERROR) << error; + return Status::IOError(error); + } + } +} + +Status S3Client::put_string_object(const std::string& bucket_name, const std::string& object_key, + const std::string& object_value) { + std::shared_ptr stream = Aws::MakeShared("", object_value); + + Aws::S3::Model::PutObjectRequest request; + request.SetBucket(to_aws_string(bucket_name)); + request.SetKey(to_aws_string(object_key)); + request.SetBody(stream); + + Aws::S3::Model::PutObjectOutcome outcome = _client->PutObject(request); + + if (outcome.IsSuccess()) { + return Status::OK(); + } else { + std::string error = + strings::Substitute("Put Object $0 failed. $1.", object_key, outcome.GetError().GetMessage()); + LOG(ERROR) << error; + return Status::IOError(error); + } +} + +Status S3Client::get_object_range(const std::string& bucket_name, const std::string& object_key, size_t offset, + size_t length, std::string* object_value, size_t* read_bytes) { + object_value->resize(length); + return get_object_range(bucket_name, object_key, offset, length, (char*)object_value->data(), read_bytes); +} + +Status S3Client::get_object_range(const std::string& bucket_name, const std::string& object_key, size_t offset, + size_t length, char* object_value, size_t* read_bytes) { + *read_bytes = 0; + length = (length ? 
length : 1); + char buffer[128]; + int ret = snprintf(buffer, sizeof(buffer), "bytes=%lu-%lu", offset, offset + length - 1); + if (ret < 0) { + std::string error = strings::Substitute("Get Object Range $0 failed, fail to set range.", object_key); + LOG(ERROR) << error; + return Status::IOError(error); + } + Aws::String range(buffer); + + Aws::S3::Model::GetObjectRequest request; + request.SetBucket(to_aws_string(bucket_name)); + request.SetKey(to_aws_string(object_key)); + request.SetRange(range); + + Aws::S3::Model::GetObjectOutcome outcome = _client->GetObject(request); + + if (outcome.IsSuccess()) { + if (object_value) { + Aws::IOStream& body = outcome.GetResult().GetBody(); + body.read(object_value, length); + *read_bytes = body.gcount(); + if (body.gcount() != length) { + std::string error = strings::Substitute("Get Object Range $0 failed. expected($1), read($2).", + object_key, length, *read_bytes); + LOG(ERROR) << error; + return Status::IOError(error); + } + } + return Status::OK(); + } else { + std::string error = + strings::Substitute("Get Object Range $0 failed. $1.", object_key, outcome.GetError().GetMessage()); + LOG(ERROR) << error; + return Status::IOError(error); + } +} + +Status S3Client::get_object(const std::string& bucket_name, const std::string& object_key, + const std::string& object_path) { + if (object_path.empty()) { + return Status::IOError(strings::Substitute("Get Object $0 failed, path empty.", object_key)); + } + if (_transfer_manager) { + Aws::Transfer::CreateDownloadStreamCallback stream; + if (!object_path.empty()) { + stream = [=]() -> Aws::IOStream* { + return Aws::New(Aws::Utils::ARRAY_ALLOCATION_TAG, object_path, + std::ios_base::out | std::ios_base::trunc); + }; + } else { + stream = [=]() -> Aws::IOStream* { return Aws::New(""); }; + } + auto handle = _transfer_manager->DownloadFile(to_aws_string(bucket_name), to_aws_string(object_key), + std::move(stream)); + handle->WaitUntilFinished(); + if (handle->GetStatus() != Aws::Transfer::TransferStatus::COMPLETED) { + // TODO: log error + return Status::IOError(strings::Substitute("Get Object $0 failed.", object_key)); + } else { + return Status::OK(); + } + } else { + Aws::S3::Model::GetObjectRequest request; + request.SetBucket(to_aws_string(bucket_name)); + request.SetKey(to_aws_string(object_key)); + + if (!object_path.empty()) { + auto stream = [=]() -> Aws::IOStream* { + return Aws::New(Aws::Utils::ARRAY_ALLOCATION_TAG, object_path, + std::ios_base::out | std::ios_base::trunc); + }; + request.SetResponseStreamFactory(std::move(stream)); + } + + Aws::S3::Model::GetObjectOutcome outcome = _client->GetObject(request); + + if (outcome.IsSuccess()) { + return Status::OK(); + } else { + std::string error = + strings::Substitute("Get Object $0 failed. $1.", object_key, outcome.GetError().GetMessage()); + LOG(ERROR) << error; + return Status::IOError(error); + } + } +} + +Status S3Client::_head_object(const std::string& bucket_name, const std::string& object_key, size_t* size) { + Aws::S3::Model::HeadObjectRequest request; + request.SetBucket(to_aws_string(bucket_name)); + request.SetKey(to_aws_string(object_key)); + + Aws::S3::Model::HeadObjectOutcome outcome = _client->HeadObject(request); + if (!outcome.IsSuccess()) { + if (_is_not_found(outcome.GetError().GetErrorType())) { + return Status::NotFound(strings::Substitute("Object $0 not found.", object_key)); + } else { + std::string error = + strings::Substitute("Head Object $0 failed. 
$1.", object_key, outcome.GetError().GetMessage()); + LOG(ERROR) << error; + return Status::IOError(error); + } + } else { + if (size != nullptr) { + *size = outcome.GetResult().GetContentLength(); + } + return Status::OK(); + } +} + +Status S3Client::exist_object(const std::string& bucket_name, const std::string& object_key) { + return _head_object(bucket_name, object_key, nullptr /* size */); +} + +Status S3Client::get_object_size(const std::string& bucket_name, const std::string& object_key, size_t* size) { + return _head_object(bucket_name, object_key, size); +} + +Status S3Client::delete_object(const std::string& bucket_name, const std::string& object_key) { + Aws::S3::Model::DeleteObjectRequest request; + request.SetBucket(to_aws_string(bucket_name)); + request.SetKey(to_aws_string(object_key)); + + Aws::S3::Model::DeleteObjectOutcome outcome = _client->DeleteObject(request); + + if (outcome.IsSuccess()) { + return Status::OK(); + } else { + std::string error = + strings::Substitute("Delete Object $0 failed. $1.", object_key, outcome.GetError().GetMessage()); + LOG(ERROR) << error; + return Status::IOError(error); + } +} + +Status S3Client::list_objects(const std::string& bucket_name, const std::string& object_prefix, + std::vector* result) { + result->clear(); + // S3 paths don't start with '/' + std::string prefix = StripPrefixString(object_prefix, "/"); + // S3 paths better end with '/', otherwise we might also get a list of files + // in a directory for which our path is a prefix + if (prefix.size() > 0 && prefix.back() != '/') { + prefix.push_back('/'); + } + // the starting object marker + Aws::String marker; + + // get info of bucket+object + while (1) { + Aws::S3::Model::ListObjectsRequest request; + request.SetBucket(to_aws_string(bucket_name)); + request.SetMaxKeys(kListObjectMaxKeys); + request.SetPrefix(to_aws_string(prefix)); + request.SetMarker(marker); + + Aws::S3::Model::ListObjectsOutcome outcome = _client->ListObjects(request); + if (!outcome.IsSuccess()) { + if (_is_not_found(outcome.GetError().GetErrorType())) { + return Status::OK(); + } + std::string error = strings::Substitute("List Objects prefix $0 failed. $1.", object_prefix, + outcome.GetError().GetMessage()); + LOG(ERROR) << error; + return Status::IOError(error); + } + const Aws::S3::Model::ListObjectsResult& res = outcome.GetResult(); + const Aws::Vector& objs = res.GetContents(); + for (auto o : objs) { + const Aws::String& key = o.GetKey(); + // Our path should be a prefix of the fetched value + std::string keystr(key.c_str(), key.size()); + if (keystr.find(prefix) != 0) { + return Status::IOError(strings::Substitute("List Objects prefix $0 not match.", object_prefix)); + } + const std::string fname = keystr.substr(prefix.size()); + result->push_back(fname); + } + + // If there are no more entries, then we are done. 
+ if (!res.GetIsTruncated()) { + break; + } + // The new starting point + marker = res.GetNextMarker(); + if (marker.empty()) { + // If response does not include the NextMarker and it is + // truncated, you can use the value of the last Key in the response + // as the marker in the subsequent request because all objects + // are returned in alphabetical order + marker = objs.back().GetKey(); + } + } + return Status::OK(); +} + +} // namespace starrocks diff --git a/be/src/object_store/s3_client.h b/be/src/object_store/s3_client.h new file mode 100644 index 00000000000..6f83c252b46 --- /dev/null +++ b/be/src/object_store/s3_client.h @@ -0,0 +1,87 @@ +// This file is licensed under the Elastic License 2.0. Copyright 2021-present, StarRocks Limited. + +#pragma once + +#include +#include + +#include + +#include "object_store/object_store_client.h" + +namespace starrocks { + +class S3Credential { +public: + std::string access_key_id; + std::string secret_access_key; +}; + +class S3Client final : public ObjectStoreClient { +public: + S3Client(const Aws::Client::ClientConfiguration& config, const S3Credential* cred = nullptr, + bool use_transfer_manager = false); + ~S3Client() = default; + + /* + * Bucket Operation + */ + Status create_bucket(const std::string& bucket_name) override; + + Status delete_bucket(const std::string& bucket_name) override; + + /* + * Object Operation + */ + Status put_object(const std::string& bucket_name, const std::string& object_key, + const std::string& object_path) override; + + Status put_string_object(const std::string& bucket_name, const std::string& object_key, + const std::string& object_value) override; + + Status get_object(const std::string& bucket_name, const std::string& object_key, + const std::string& object_path) override; + + Status get_object_range(const std::string& bucket_name, const std::string& object_key, size_t offset, size_t length, + std::string* object_value, size_t* read_bytes) override; + + Status get_object_range(const std::string& bucket_name, const std::string& object_key, size_t offset, size_t length, + char* object_value, size_t* read_bytes) override; + + Status exist_object(const std::string& bucket_name, const std::string& object_key) override; + + Status get_object_size(const std::string& bucket_name, const std::string& object_key, size_t* size) override; + + Status delete_object(const std::string& bucket_name, const std::string& object_key) override; + + Status list_objects(const std::string& bucket_name, const std::string& object_prefix, + std::vector* result) override; + +private: + // number of threads in the transfer manager's thread pool. + static const int kThreadPoolNumber = 16; + // maximum size of the transfer manager's working buffer to use. + static const int kTransferManagerMaxBufferSize = 512 * 1024 * 1024; // 512MB + // maximum size that transfer manager will process in a single request. + static const int kTransferManagerSingleBufferSize = 32 * 1024 * 1024; // 32MB + // maximum number of keys returned by a single list_objects request.
+ static const int kListObjectMaxKeys = 1000; + + static Aws::Utils::Threading::Executor* _get_transfer_manager_executor() { + static Aws::Utils::Threading::PooledThreadExecutor executor(kThreadPoolNumber); + return &executor; + } + + Status _head_object(const std::string& bucket_name, const std::string& object_key, size_t* size); + + bool _is_not_found(const Aws::S3::S3Errors& err) { + return (err == Aws::S3::S3Errors::NO_SUCH_BUCKET || err == Aws::S3::S3Errors::NO_SUCH_KEY || + err == Aws::S3::S3Errors::RESOURCE_NOT_FOUND); + } + + Aws::Client::ClientConfiguration _config; + std::shared_ptr _client; + std::shared_ptr _transfer_manager; +}; + +} // namespace starrocks diff --git a/be/src/runtime/hdfs/hdfs_fs_cache.cpp b/be/src/runtime/hdfs/hdfs_fs_cache.cpp index 77950b38b1d..57d4e54af8a 100644 --- a/be/src/runtime/hdfs/hdfs_fs_cache.cpp +++ b/be/src/runtime/hdfs/hdfs_fs_cache.cpp @@ -5,77 +5,58 @@ #include #include +#include "common/config.h" #include "gutil/strings/substitute.h" #include "util/hdfs_util.h" namespace starrocks { -static Status parse_namenode(const std::string& path, std::string* namenode) { - const std::string local_fs("file:/"); - size_t n = path.find("://"); +static Status create_hdfs_fs_handle(const std::string& namenode, HdfsFsHandle* handle) { + const char* nn = namenode.c_str(); + if (is_hdfs_path(nn)) { + handle->type = HdfsFsHandle::Type::HDFS; + auto hdfs_builder = hdfsNewBuilder(); + hdfsBuilderSetNameNode(hdfs_builder, namenode.c_str()); + handle->hdfs_fs = hdfsBuilderConnect(hdfs_builder); + if (handle->hdfs_fs == nullptr) { + return Status::InternalError(strings::Substitute("fail to connect hdfs namenode, namenode=$0, err=$1", + namenode, get_hdfs_err_msg())); + } - if (n == std::string::npos) { - if (path.compare(0, local_fs.length(), local_fs) == 0) { - // Hadoop Path routines strip out consecutive /'s, so recognize 'file:/blah'. - *namenode = "file:///"; - } else { - // Path is not qualified, so use the default FS. - *namenode = "default"; - } - } else if (n == 0) { - return Status::InvalidArgument(strings::Substitute("Path missing scheme: $0", path)); + } else if (is_s3a_path(nn) || is_oss_path(nn)) { + handle->type = HdfsFsHandle::Type::S3; + Aws::Client::ClientConfiguration config; + config.scheme = Aws::Http::Scheme::HTTP; + config.endpointOverride = config::aws_s3_endpoint; + config.maxConnections = config::aws_s3_max_connection; + + S3Credential cred; + cred.access_key_id = config::aws_access_key_id; + cred.secret_access_key = config::aws_secret_access_key; + + S3Client* s3_client = new S3Client(config, &cred, false); + handle->s3_client = s3_client; } else { - // Path is qualified, i.e. "scheme://authority/path/to/file". Extract - // "scheme://authority/". - n = path.find('/', n + 3); - if (n == std::string::npos) { - return Status::InvalidArgument(strings::Substitute("Path missing '/' after authority: $0", path)); - } else { - // Include the trailing '/' for local filesystem case, i.e. "file:///". 
- *namenode = path.substr(0, n + 1); - } + return Status::InternalError(strings::Substitute("failed to make client, namenode=$0", namenode)); } return Status::OK(); } -Status HdfsFsCache::get_connection(const std::string& path, hdfsFS* fs, FileOpenLimitPtr* res, HdfsFsMap* local_cache) { - std::string namenode; - RETURN_IF_ERROR(parse_namenode(path, &namenode)); - - if (local_cache != nullptr) { - auto it = local_cache->find(namenode); - if (it != local_cache->end()) { - *fs = it->second.first; - *res = it->second.second.get(); - return Status::OK(); - } - } - +Status HdfsFsCache::get_connection(const std::string& namenode, HdfsFsHandle* handle, FileOpenLimitPtr* res) { { std::lock_guard l(_lock); - auto it = _cache.find(namenode); if (it != _cache.end()) { - *fs = it->second.first; + *handle = it->second.first; *res = it->second.second.get(); } else { - auto hdfs_builder = hdfsNewBuilder(); - hdfsBuilderSetNameNode(hdfs_builder, namenode.c_str()); + handle->namenode = namenode; + RETURN_IF_ERROR(create_hdfs_fs_handle(namenode, handle)); auto semaphore = std::make_unique(0); - *fs = hdfsBuilderConnect(hdfs_builder); *res = semaphore.get(); - if (*fs == nullptr) { - return Status::InternalError(strings::Substitute("fail to connect hdfs namenode, name=$0, err=$1", - namenode, get_hdfs_err_msg())); - } - _cache[namenode] = std::make_pair(*fs, std::move(semaphore)); + _cache[namenode] = std::make_pair(*handle, std::move(semaphore)); } } - - if (local_cache != nullptr) { - local_cache->emplace(namenode, std::make_pair(*fs, std::make_unique(0))); - } - return Status::OK(); } diff --git a/be/src/runtime/hdfs/hdfs_fs_cache.h b/be/src/runtime/hdfs/hdfs_fs_cache.h index c973a730c9a..4c7a6cf0850 100644 --- a/be/src/runtime/hdfs/hdfs_fs_cache.h +++ b/be/src/runtime/hdfs/hdfs_fs_cache.h @@ -2,8 +2,6 @@ #pragma once -#include - #include #include #include @@ -12,8 +10,7 @@ #include #include "common/status.h" -#include "gutil/macros.h" - +#include "env/env_hdfs.h" namespace starrocks { // Cache for HDFS file system @@ -21,7 +18,7 @@ class HdfsFsCache { public: using FileOpenLimit = std::atomic; using FileOpenLimitPtr = FileOpenLimit*; - using HdfsFsMap = std::unordered_map>>; + using HdfsFsMap = std::unordered_map>>; static HdfsFsCache* instance() { static HdfsFsCache s_instance; @@ -29,8 +26,7 @@ public: } // This function is thread-safe - Status get_connection(const std::string& path, hdfsFS* fs, FileOpenLimitPtr* semaphore = nullptr, - HdfsFsMap* map = nullptr); + Status get_connection(const std::string& namenode, HdfsFsHandle* handle, FileOpenLimitPtr* semaphore); private: std::mutex _lock; diff --git a/be/src/service/starrocks_main.cpp b/be/src/service/starrocks_main.cpp index fea32f5e28b..ab2b91cdca6 100644 --- a/be/src/service/starrocks_main.cpp +++ b/be/src/service/starrocks_main.cpp @@ -19,6 +19,7 @@ // specific language governing permissions and limitations // under the License. 
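The starrocks_main.cpp hunk below wires process-wide AWS SDK lifecycle around the server loop, while the S3 client itself is built in hdfs_fs_cache.cpp from the new be.conf properties (aws_access_key_id, aws_secret_access_key, aws_s3_endpoint, aws_s3_max_connection). A minimal sketch of the init/shutdown pairing, with the placement inside main() abbreviated:

    #ifdef STARROCKS_WITH_AWS
        Aws::SDKOptions aws_sdk_options;
        Aws::InitAPI(aws_sdk_options); // should run before any S3Client is constructed
    #endif
        // ... serve until starrocks::k_starrocks_exit ...
    #ifdef STARROCKS_WITH_AWS
        Aws::ShutdownAPI(aws_sdk_options); // paired with InitAPI, using the same options object
    #endif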
+#include #include #include #include @@ -271,13 +272,20 @@ int main(int argc, char** argv) { LOG(INFO) << "StarRocks BE HeartBeat Service started correctly."; } +#ifdef STARROCKS_WITH_AWS + Aws::SDKOptions aws_sdk_options; + Aws::InitAPI(aws_sdk_options); +#endif + while (!starrocks::k_starrocks_exit) { sleep(10); } - daemon->stop(); daemon.reset(); +#ifdef STARROCKS_WITH_AWS + Aws::ShutdownAPI(aws_sdk_options); +#endif heartbeat_thrift_server->stop(); heartbeat_thrift_server->join(); delete heartbeat_thrift_server; diff --git a/be/src/util/hdfs_util.cpp b/be/src/util/hdfs_util.cpp index 5ee73f0adfd..eb3fbea94ce 100644 --- a/be/src/util/hdfs_util.cpp +++ b/be/src/util/hdfs_util.cpp @@ -6,15 +6,15 @@ #include #include -using std::string; +#include "gutil/strings/substitute.h" #include "util/error_util.h" namespace starrocks { -const char* FILESYS_PREFIX_HDFS = "hdfs://"; -const char* FILESYS_PREFIX_S3 = "s3a://"; -const char* FILESYS_PREFIX_OSS = "oss://"; +static const char* kFileSysPrefixHdfs = "hdfs://"; +static const char* kFileSysPrefixS3 = "s3a://"; +static const char* KFileSysPrefixOSS = "oss://"; std::string get_hdfs_err_msg() { std::string error_msg = get_str_err_msg(); @@ -27,11 +27,11 @@ std::string get_hdfs_err_msg() { return ss.str(); } -Status get_name_node_from_path(const std::string& path, std::string* namenode) { - const string local_fs("file:/"); - size_t n = path.find("://"); +Status get_namenode_from_path(const std::string& path, std::string* namenode) { + const std::string local_fs("file:/"); + auto n = path.find("://"); - if (n == string::npos) { + if (n == std::string::npos) { if (path.compare(0, local_fs.length(), local_fs) == 0) { // Hadoop Path routines strip out consecutive /'s, so recognize 'file:/blah'. *namenode = "file:///"; @@ -40,35 +40,45 @@ Status get_name_node_from_path(const std::string& path, std::string* namenode) { *namenode = "default"; } } else if (n == 0) { - return Status::InternalError("Path missing schema"); + return Status::InvalidArgument(strings::Substitute("Path missing scheme: $0", path)); } else { // Path is qualified, i.e. "scheme://authority/path/to/file". Extract // "scheme://authority/". n = path.find('/', n + 3); - if (n == string::npos) { - return Status::InternalError("Path missing '/' after authority"); + if (n == std::string::npos) { + return Status::InvalidArgument(strings::Substitute("Path missing '/' after authority: $0", path)); + } else { + // Include the trailing '/' for local filesystem case, i.e. "file:///". + *namenode = path.substr(0, n + 1); } - // Include the trailing '/' for local filesystem case, i.e. "file:///". 
- *namenode = path.substr(0, n + 1); } return Status::OK(); } -bool is_specific_path(const char* path, const char* specific_prefix) { +std::string get_bucket_from_namenode(const std::string& namenode) { + auto n = namenode.find("://"); + if (n == std::string::npos) return ""; + n += 3; + auto n2 = namenode.find('/', n); + if (n2 == std::string::npos) return ""; + return namenode.substr(n, n2 - n); +} + +static bool is_specific_path(const char* path, const char* specific_prefix) { size_t prefix_len = strlen(specific_prefix); return strncmp(path, specific_prefix, prefix_len) == 0; } bool is_hdfs_path(const char* path) { - return is_specific_path(path, FILESYS_PREFIX_HDFS); + return is_specific_path(path, kFileSysPrefixHdfs); } bool is_s3a_path(const char* path) { - return is_specific_path(path, FILESYS_PREFIX_S3); + return is_specific_path(path, kFileSysPrefixS3); } bool is_oss_path(const char* path) { - return is_specific_path(path, FILESYS_PREFIX_OSS); + return is_specific_path(path, KFileSysPrefixOSS); } bool is_object_storage_path(const char* path) { diff --git a/be/src/util/hdfs_util.h b/be/src/util/hdfs_util.h index 647f196d336..9bd955febe3 100644 --- a/be/src/util/hdfs_util.h +++ b/be/src/util/hdfs_util.h @@ -10,12 +10,15 @@ namespace starrocks { std::string get_hdfs_err_msg(); -Status get_name_node_from_path(const std::string& path, std::string* namenode); +Status get_namenode_from_path(const std::string& path, std::string* namenode); +std::string get_bucket_from_namenode(const std::string& namenode); // Returns true if the path refers to a location on an HDFS filesystem. bool is_hdfs_path(const char* path); // Returns true if the path refers to a location on object storage filesystem. bool is_object_storage_path(const char* path); +bool is_s3a_path(const char* path); +bool is_oss_path(const char* path); } // namespace starrocks diff --git a/be/test/CMakeLists.txt b/be/test/CMakeLists.txt index 12a4e72c063..6e7fda396b1 100644 --- a/be/test/CMakeLists.txt +++ b/be/test/CMakeLists.txt @@ -273,6 +273,10 @@ if (WITH_HDFS) set(EXEC_FILES ${EXEC_FILES} ./exec/vectorized/hdfs_scan_node_test.cpp) endif () +if (WITH_AWS) + set(EXEC_FILES ${EXEC_FILES} ./object_store/s3_client_test.cpp) +endif () + add_executable(starrocks_test ${EXEC_FILES}) TARGET_LINK_LIBRARIES(starrocks_test ${TEST_LINK_LIBS}) diff --git a/be/test/object_store/s3_client_test.cpp b/be/test/object_store/s3_client_test.cpp new file mode 100644 index 00000000000..1a7bcdb8cef --- /dev/null +++ b/be/test/object_store/s3_client_test.cpp @@ -0,0 +1,103 @@ +// This file is licensed under the Elastic License 2.0. Copyright 2021-present, StarRocks Limited. 
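To make the S3 path handling concrete, this is how the hdfs_util helpers above and create_random_access_hdfs_file decompose an S3A location (the bucket and object names here are made-up examples):

    std::string namenode;
    Status st = get_namenode_from_path("s3a://my_bucket/warehouse/part-0.parquet", &namenode);
    // namenode == "s3a://my_bucket/"  (scheme://authority/ with the trailing '/')
    std::string bucket = get_bucket_from_namenode(namenode);
    // bucket == "my_bucket"; the object key handed to S3RandomAccessFile is the path with
    // the namenode prefix stripped: "warehouse/part-0.parquet"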
+ +#include "object_store/s3_client.h" + +#include +#include + +#include + +#include "util/file_utils.h" + +namespace starrocks { + +static const std::string bucket_name = "starrocks-cloud-test-2022"; + +class S3Test : public testing::Test { +public: + S3Test() {} + virtual ~S3Test() {} + void SetUp() override { Aws::InitAPI(_options); } + void TearDown() override { Aws::ShutdownAPI(_options); } + +private: + Aws::SDKOptions _options; +}; + +TEST_F(S3Test, bucket_operation) { + Aws::Client::ClientConfiguration config; + config.region = Aws::Region::AP_SOUTHEAST_1; + + S3Client client(config); + + // create bucket + ASSERT_TRUE(client.create_bucket(bucket_name).ok()); + + // delete bucket + ASSERT_TRUE(client.delete_bucket(bucket_name).ok()); +} + +TEST_F(S3Test, object_operation) { + Aws::Client::ClientConfiguration config; + config.region = Aws::Region::AP_SOUTHEAST_1; + + S3Client client(config); + + // create bucket + ASSERT_TRUE(client.create_bucket(bucket_name).ok()); + + // put object + const std::string object_key = "hello"; + const std::string object_value = "world"; + const std::string object_path = "./S3Test_hello"; + ASSERT_TRUE(client.put_string_object(bucket_name, object_key, object_value).ok()); + + // test object exist + ASSERT_TRUE(client.exist_object(bucket_name, object_key).ok()); + + // get object size + size_t size = 0; + ASSERT_TRUE(client.get_object_size(bucket_name, object_key, &size).ok()); + ASSERT_EQ(size, object_value.size()); + + // get object + ASSERT_FALSE(client.get_object(bucket_name, object_key, "").ok()); + ASSERT_TRUE(client.get_object(bucket_name, object_key, object_path).ok()); + std::ifstream file(object_path.c_str(), std::ios::in); + ASSERT_TRUE(file.good()); + std::string object_value_tmp; + std::getline(file, object_value_tmp); + ASSERT_EQ(object_value_tmp, object_value); + FileUtils::remove(object_path); + + // get object range + std::string object_value_range; + size_t read_bytes; + ASSERT_TRUE(client.get_object_range(bucket_name, object_key, 1 /* offset */, 2 /* length */, &object_value_range, + &read_bytes) + .ok()); + ASSERT_EQ(read_bytes, 2); + ASSERT_EQ(object_value_range, "or"); + + // list object + const std::string object_key2 = "test/hello"; + ASSERT_TRUE(client.put_string_object(bucket_name, object_key2, object_value).ok()); + std::vector vector; + ASSERT_TRUE(client.list_objects(bucket_name, "/" /* object_prefix */, &vector).ok()); + ASSERT_EQ(vector.size(), 2); + ASSERT_EQ(vector[0], "hello"); + ASSERT_EQ(vector[1], "test/hello"); + + // delete object + ASSERT_TRUE(client.delete_object(bucket_name, object_key).ok()); + ASSERT_TRUE(client.delete_object(bucket_name, object_key2).ok()); + + // test object not exist + ASSERT_TRUE(client.exist_object(bucket_name, object_key).is_not_found()); + ASSERT_TRUE(client.exist_object(bucket_name, object_key2).is_not_found()); + + // delete bucket + ASSERT_TRUE(client.delete_bucket(bucket_name).ok()); +} + +} // namespace starrocks diff --git a/build.sh b/build.sh index 4649deae506..cfb6f4e43f9 100755 --- a/build.sh +++ b/build.sh @@ -62,6 +62,8 @@ Usage: $0 --without-gcov build Backend without gcov(default) --with-hdfs enable hdfs support --without-hdfs disable hdfs support + --with-aws enable aws support + --without-aws disable aws support Eg. 
$0 build all @@ -85,6 +87,8 @@ OPTS=$(getopt \ -l 'without-gcov' \ -l 'with-hdfs' \ -l 'without-hdfs' \ + -l 'with-aws' \ + -l 'without-aws' \ -l 'help' \ -- "$@") @@ -101,6 +105,7 @@ CLEAN= RUN_UT= WITH_GCOV=OFF WITH_HDFS=ON +WITH_AWS=ON if [[ -z ${USE_AVX2} ]]; then USE_AVX2=ON fi @@ -131,6 +136,8 @@ else --without-gcov) WITH_GCOV=OFF; shift ;; --with-hdfs) WITH_HDFS=ON; shift ;; --without-hdfs) WITH_HDFS=OFF; shift ;; + --with-aws) WITH_AWS=ON; shift ;; + --without-aws) WITH_AWS=OFF; shift ;; -h) HELP=1; shift ;; --help) HELP=1; shift ;; --) shift ; break ;; @@ -157,6 +164,7 @@ echo "Get params: RUN_UT -- $RUN_UT WITH_GCOV -- $WITH_GCOV WITH_HDFS -- $WITH_HDFS + WITH_AWS -- $WITH_AWS USE_AVX2 -- $USE_AVX2 " @@ -194,7 +202,7 @@ if [ ${BUILD_BE} -eq 1 ] ; then mkdir -p ${CMAKE_BUILD_DIR} cd ${CMAKE_BUILD_DIR} ${CMAKE_CMD} .. -DSTARROCKS_THIRDPARTY=${STARROCKS_THIRDPARTY} -DSTARROCKS_HOME=${STARROCKS_HOME} -DCMAKE_CXX_COMPILER_LAUNCHER=ccache -DCMAKE_BUILD_TYPE=${CMAKE_BUILD_TYPE} \ - -DMAKE_TEST=OFF -DWITH_HDFS=${WITH_HDFS} -DWITH_GCOV=${WITH_GCOV} -DUSE_AVX2=$USE_AVX2 -DCMAKE_EXPORT_COMPILE_COMMANDS=ON + -DMAKE_TEST=OFF -DWITH_HDFS=${WITH_HDFS} -DWITH_AWS=${WITH_AWS} -DWITH_GCOV=${WITH_GCOV} -DUSE_AVX2=$USE_AVX2 -DCMAKE_EXPORT_COMPILE_COMMANDS=ON time make -j${PARALLEL} make install cd ${STARROCKS_HOME} diff --git a/fe/fe-core/src/main/java/com/starrocks/external/ObejctStorageUtils.java b/fe/fe-core/src/main/java/com/starrocks/external/ObjectStorageUtils.java similarity index 66% rename from fe/fe-core/src/main/java/com/starrocks/external/ObejctStorageUtils.java rename to fe/fe-core/src/main/java/com/starrocks/external/ObjectStorageUtils.java index 8b94b5ddf6a..dd26991d03b 100644 --- a/fe/fe-core/src/main/java/com/starrocks/external/ObejctStorageUtils.java +++ b/fe/fe-core/src/main/java/com/starrocks/external/ObjectStorageUtils.java @@ -2,10 +2,11 @@ package com.starrocks.external; -public class ObejctStorageUtils { +public class ObjectStorageUtils { private static final String SCHEME_S3A = "s3a://"; private static final String SCHEME_S3 = "s3://"; private static final String SCHEME_S3N = "s3n://"; + private static final String SCHEME_OSS = "oss://"; private static final String SCHEME_S3_PREFIX = "s3"; private static final String SCHEME_OSS_PREFIX = "oss"; @@ -15,12 +16,15 @@ public class ObejctStorageUtils { public static String formatObjectStoragePath(String path) { if (path.startsWith(SCHEME_S3)) { - return SCHEME_S3A + path.substring(5); + return SCHEME_S3A + path.substring(SCHEME_S3.length()); } if (path.startsWith(SCHEME_S3N)) { - return SCHEME_S3A + path.substring(6); + return SCHEME_S3A + path.substring(SCHEME_S3N.length()); + } + // use s3a to access oss. 
+ if (path.startsWith(SCHEME_OSS)) { + return SCHEME_S3A + path.substring(SCHEME_OSS.length()); } - return path; } } diff --git a/fe/fe-core/src/main/java/com/starrocks/external/hive/HiveMetaCache.java b/fe/fe-core/src/main/java/com/starrocks/external/hive/HiveMetaCache.java index ea5a9ac40d4..760d9eba361 100644 --- a/fe/fe-core/src/main/java/com/starrocks/external/hive/HiveMetaCache.java +++ b/fe/fe-core/src/main/java/com/starrocks/external/hive/HiveMetaCache.java @@ -13,7 +13,7 @@ import com.starrocks.catalog.Column; import com.starrocks.catalog.PartitionKey; import com.starrocks.common.Config; import com.starrocks.common.DdlException; -import com.starrocks.external.ObejctStorageUtils; +import com.starrocks.external.ObjectStorageUtils; import org.apache.hadoop.hive.metastore.api.StorageDescriptor; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -230,8 +230,8 @@ public class HiveMetaCache { private HivePartition getPartitionByEvent(StorageDescriptor sd) throws Exception { HdfsFileFormat format = HdfsFileFormat.fromHdfsInputFormatClass(sd.getInputFormat()); - String path = ObejctStorageUtils.formatObjectStoragePath(sd.getLocation()); - boolean isSplittable = ObejctStorageUtils.isObjectStorage(path) || + String path = ObjectStorageUtils.formatObjectStoragePath(sd.getLocation()); + boolean isSplittable = ObjectStorageUtils.isObjectStorage(path) || HdfsFileFormat.isSplittable(sd.getInputFormat()); List fileDescs = client.getHdfsFileDescs(path, isSplittable, sd); return new HivePartition(format, ImmutableList.copyOf(fileDescs), path); @@ -319,7 +319,8 @@ public class HiveMetaCache { public void refreshColumnStats(String dbName, String tableName, List partColumns, List columnNames) throws DdlException { try { - HiveTableColumnsKey hiveTableColumnsKey = HiveTableColumnsKey.gen(dbName, tableName, partColumns, columnNames); + HiveTableColumnsKey hiveTableColumnsKey = + HiveTableColumnsKey.gen(dbName, tableName, partColumns, columnNames); tableColumnStatsCache.put(hiveTableColumnsKey, loadTableColumnStats(hiveTableColumnsKey)); } catch (Exception e) { throw new DdlException("refresh table column statistic cached failed: " + e.getMessage()); diff --git a/fe/fe-core/src/main/java/com/starrocks/external/hive/HiveMetaClient.java b/fe/fe-core/src/main/java/com/starrocks/external/hive/HiveMetaClient.java index 6513929f07b..538516d2e58 100644 --- a/fe/fe-core/src/main/java/com/starrocks/external/hive/HiveMetaClient.java +++ b/fe/fe-core/src/main/java/com/starrocks/external/hive/HiveMetaClient.java @@ -14,7 +14,7 @@ import com.starrocks.catalog.PartitionKey; import com.starrocks.catalog.Type; import com.starrocks.common.Config; import com.starrocks.common.DdlException; -import com.starrocks.external.ObejctStorageUtils; +import com.starrocks.external.ObjectStorageUtils; import com.starrocks.external.hive.text.TextFileFormatDesc; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.BlockLocation; @@ -198,9 +198,9 @@ public class HiveMetaClient { throw new DdlException("unsupported file format [" + sd.getInputFormat() + "]"); } - String path = ObejctStorageUtils.formatObjectStoragePath(sd.getLocation()); + String path = ObjectStorageUtils.formatObjectStoragePath(sd.getLocation()); List fileDescs = getHdfsFileDescs(path, - ObejctStorageUtils.isObjectStorage(path) || HdfsFileFormat.isSplittable(sd.getInputFormat()), + ObjectStorageUtils.isObjectStorage(path) || HdfsFileFormat.isSplittable(sd.getInputFormat()), sd); return new HivePartition(format, 
                    ImmutableList.copyOf(fileDescs), path);
         } catch (NoSuchObjectException e) {
diff --git a/licenses-binary/LICENSE-aws-sdk-cpp.txt b/licenses-binary/LICENSE-aws-sdk-cpp.txt
new file mode 100644
index 00000000000..8dada3edaf5
--- /dev/null
+++ b/licenses-binary/LICENSE-aws-sdk-cpp.txt
@@ -0,0 +1,201 @@
+                                 Apache License
+                           Version 2.0, January 2004
+                        http://www.apache.org/licenses/
+
+   TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
+
+   1. Definitions.
+
+      "License" shall mean the terms and conditions for use, reproduction,
+      and distribution as defined by Sections 1 through 9 of this document.
+
+      "Licensor" shall mean the copyright owner or entity authorized by
+      the copyright owner that is granting the License.
+
+      "Legal Entity" shall mean the union of the acting entity and all
+      other entities that control, are controlled by, or are under common
+      control with that entity. For the purposes of this definition,
+      "control" means (i) the power, direct or indirect, to cause the
+      direction or management of such entity, whether by contract or
+      otherwise, or (ii) ownership of fifty percent (50%) or more of the
+      outstanding shares, or (iii) beneficial ownership of such entity.
+
+      "You" (or "Your") shall mean an individual or Legal Entity
+      exercising permissions granted by this License.
+
+      "Source" form shall mean the preferred form for making modifications,
+      including but not limited to software source code, documentation
+      source, and configuration files.
+
+      "Object" form shall mean any form resulting from mechanical
+      transformation or translation of a Source form, including but
+      not limited to compiled object code, generated documentation,
+      and conversions to other media types.
+
+      "Work" shall mean the work of authorship, whether in Source or
+      Object form, made available under the License, as indicated by a
+      copyright notice that is included in or attached to the work
+      (an example is provided in the Appendix below).
+
+      "Derivative Works" shall mean any work, whether in Source or Object
+      form, that is based on (or derived from) the Work and for which the
+      editorial revisions, annotations, elaborations, or other modifications
+      represent, as a whole, an original work of authorship. For the purposes
+      of this License, Derivative Works shall not include works that remain
+      separable from, or merely link (or bind by name) to the interfaces of,
+      the Work and Derivative Works thereof.
+
+      "Contribution" shall mean any work of authorship, including
+      the original version of the Work and any modifications or additions
+      to that Work or Derivative Works thereof, that is intentionally
+      submitted to Licensor for inclusion in the Work by the copyright owner
+      or by an individual or Legal Entity authorized to submit on behalf of
+      the copyright owner. For the purposes of this definition, "submitted"
+      means any form of electronic, verbal, or written communication sent
+      to the Licensor or its representatives, including but not limited to
+      communication on electronic mailing lists, source code control systems,
+      and issue tracking systems that are managed by, or on behalf of, the
+      Licensor for the purpose of discussing and improving the Work, but
+      excluding communication that is conspicuously marked or otherwise
+      designated in writing by the copyright owner as "Not a Contribution."
+
+      "Contributor" shall mean Licensor and any individual or Legal Entity
+      on behalf of whom a Contribution has been received by Licensor and
+      subsequently incorporated within the Work.
+
+   2. Grant of Copyright License. Subject to the terms and conditions of
+      this License, each Contributor hereby grants to You a perpetual,
+      worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+      copyright license to reproduce, prepare Derivative Works of,
+      publicly display, publicly perform, sublicense, and distribute the
+      Work and such Derivative Works in Source or Object form.
+
+   3. Grant of Patent License. Subject to the terms and conditions of
+      this License, each Contributor hereby grants to You a perpetual,
+      worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+      (except as stated in this section) patent license to make, have made,
+      use, offer to sell, sell, import, and otherwise transfer the Work,
+      where such license applies only to those patent claims licensable
+      by such Contributor that are necessarily infringed by their
+      Contribution(s) alone or by combination of their Contribution(s)
+      with the Work to which such Contribution(s) was submitted. If You
+      institute patent litigation against any entity (including a
+      cross-claim or counterclaim in a lawsuit) alleging that the Work
+      or a Contribution incorporated within the Work constitutes direct
+      or contributory patent infringement, then any patent licenses
+      granted to You under this License for that Work shall terminate
+      as of the date such litigation is filed.
+
+   4. Redistribution. You may reproduce and distribute copies of the
+      Work or Derivative Works thereof in any medium, with or without
+      modifications, and in Source or Object form, provided that You
+      meet the following conditions:
+
+      (a) You must give any other recipients of the Work or
+          Derivative Works a copy of this License; and
+
+      (b) You must cause any modified files to carry prominent notices
+          stating that You changed the files; and
+
+      (c) You must retain, in the Source form of any Derivative Works
+          that You distribute, all copyright, patent, trademark, and
+          attribution notices from the Source form of the Work,
+          excluding those notices that do not pertain to any part of
+          the Derivative Works; and
+
+      (d) If the Work includes a "NOTICE" text file as part of its
+          distribution, then any Derivative Works that You distribute must
+          include a readable copy of the attribution notices contained
+          within such NOTICE file, excluding those notices that do not
+          pertain to any part of the Derivative Works, in at least one
+          of the following places: within a NOTICE text file distributed
+          as part of the Derivative Works; within the Source form or
+          documentation, if provided along with the Derivative Works; or,
+          within a display generated by the Derivative Works, if and
+          wherever such third-party notices normally appear. The contents
+          of the NOTICE file are for informational purposes only and
+          do not modify the License. You may add Your own attribution
+          notices within Derivative Works that You distribute, alongside
+          or as an addendum to the NOTICE text from the Work, provided
+          that such additional attribution notices cannot be construed
+          as modifying the License.
+
+      You may add Your own copyright statement to Your modifications and
+      may provide additional or different license terms and conditions
+      for use, reproduction, or distribution of Your modifications, or
+      for any such Derivative Works as a whole, provided Your use,
+      reproduction, and distribution of the Work otherwise complies with
+      the conditions stated in this License.
+
+   5. Submission of Contributions. Unless You explicitly state otherwise,
+      any Contribution intentionally submitted for inclusion in the Work
+      by You to the Licensor shall be under the terms and conditions of
+      this License, without any additional terms or conditions.
+      Notwithstanding the above, nothing herein shall supersede or modify
+      the terms of any separate license agreement you may have executed
+      with Licensor regarding such Contributions.
+
+   6. Trademarks. This License does not grant permission to use the trade
+      names, trademarks, service marks, or product names of the Licensor,
+      except as required for reasonable and customary use in describing the
+      origin of the Work and reproducing the content of the NOTICE file.
+
+   7. Disclaimer of Warranty. Unless required by applicable law or
+      agreed to in writing, Licensor provides the Work (and each
+      Contributor provides its Contributions) on an "AS IS" BASIS,
+      WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+      implied, including, without limitation, any warranties or conditions
+      of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
+      PARTICULAR PURPOSE. You are solely responsible for determining the
+      appropriateness of using or redistributing the Work and assume any
+      risks associated with Your exercise of permissions under this License.
+
+   8. Limitation of Liability. In no event and under no legal theory,
+      whether in tort (including negligence), contract, or otherwise,
+      unless required by applicable law (such as deliberate and grossly
+      negligent acts) or agreed to in writing, shall any Contributor be
+      liable to You for damages, including any direct, indirect, special,
+      incidental, or consequential damages of any character arising as a
+      result of this License or out of the use or inability to use the
+      Work (including but not limited to damages for loss of goodwill,
+      work stoppage, computer failure or malfunction, or any and all
+      other commercial damages or losses), even if such Contributor
+      has been advised of the possibility of such damages.
+
+   9. Accepting Warranty or Additional Liability. While redistributing
+      the Work or Derivative Works thereof, You may choose to offer,
+      and charge a fee for, acceptance of support, warranty, indemnity,
+      or other liability obligations and/or rights consistent with this
+      License. However, in accepting such obligations, You may act only
+      on Your own behalf and on Your sole responsibility, not on behalf
+      of any other Contributor, and only if You agree to indemnify,
+      defend, and hold each Contributor harmless for any liability
+      incurred by, or claims asserted against, such Contributor by reason
+      of your accepting any such warranty or additional liability.
+
+   END OF TERMS AND CONDITIONS
+
+   APPENDIX: How to apply the Apache License to your work.
+
+      To apply the Apache License to your work, attach the following
+      boilerplate notice, with the fields enclosed by brackets "{}"
+      replaced with your own identifying information. (Don't include
+      the brackets!) The text should be enclosed in the appropriate
+      comment syntax for the file format. We also recommend that a
+      file or class name and description of purpose be included on the
+      same "printed page" as the copyright notice for easier
+      identification within third-party archives.
+
+   Copyright {yyyy} {name of copyright owner}
+
+   Licensed under the Apache License, Version 2.0 (the "License");
+   you may not use this file except in compliance with the License.
+   You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
diff --git a/run-ut.sh b/run-ut.sh
index 9e27aa61c93..cb29e951931 100755
--- a/run-ut.sh
+++ b/run-ut.sh
@@ -35,6 +35,8 @@ Usage: $0
    --clean            clean and build ut
    --run              build and run ut
    --gtest_filter     specify test cases
+    --with-hdfs        enable hdfs tests
+    --with-aws         enable aws tests

  Eg.
    $0                 build ut
@@ -53,6 +55,8 @@ OPTS=$(getopt \
  -l 'run' \
  -l 'clean' \
  -l "gtest_filter:" \
+  -l "with-hdfs" \
+  -l 'with-aws' \
  -l 'help' \
  -- "$@")

@@ -66,12 +70,16 @@ CLEAN=0
 RUN=0
 TEST_FILTER=*
 HELP=0
+WITH_AWS=OFF
+WITH_HDFS=OFF
 while true; do
    case "$1" in
        --clean) CLEAN=1 ; shift ;;
        --run) RUN=1 ; shift ;;
        --gtest_filter) TEST_FILTER=$2 ; shift 2;;
        --help) HELP=1 ; shift ;;
+        --with-aws) WITH_AWS=ON; shift ;;
+        --with-hdfs) WITH_HDFS=ON; shift ;;
        --) shift ; break ;;
        *) echo "Internal error" ; exit 1 ;;
    esac
@@ -102,7 +110,7 @@ fi

 cd ${CMAKE_BUILD_DIR}
 ${CMAKE_CMD} ../ -DSTARROCKS_THIRDPARTY=${STARROCKS_THIRDPARTY} -DSTARROCKS_HOME=${STARROCKS_HOME} -DCMAKE_CXX_COMPILER_LAUNCHER=ccache \
-    -DWITH_HDFS=OFF -DMAKE_TEST=ON -DCMAKE_BUILD_TYPE=${CMAKE_BUILD_TYPE} -DUSE_AVX2=$USE_AVX2 -DCMAKE_EXPORT_COMPILE_COMMANDS=ON
+    -DWITH_HDFS=${WITH_HDFS} -DWITH_AWS=${WITH_AWS} -DMAKE_TEST=ON -DCMAKE_BUILD_TYPE=${CMAKE_BUILD_TYPE} -DUSE_AVX2=$USE_AVX2 -DCMAKE_EXPORT_COMPILE_COMMANDS=ON

 time make -j${PARALLEL}
diff --git a/thirdparty/build-thirdparty.sh b/thirdparty/build-thirdparty.sh
index 8041ba5ce97..709cd157a64 100755
--- a/thirdparty/build-thirdparty.sh
+++ b/thirdparty/build-thirdparty.sh
@@ -737,7 +737,7 @@ build_aliyun_oss_jars() {
 build_aws_cpp_sdk() {
    check_if_source_exist $AWS_SDK_CPP_SOURCE
    cd $TP_SOURCE_DIR/$AWS_SDK_CPP_SOURCE
-    # only build s3 and s3-crt, you can add more components if you want.
+    # only build the core, s3, s3-crt and transfer components; add more if you need them.
    $CMAKE_CMD -Bbuild -DBUILD_ONLY="core;s3;s3-crt;transfer" -DCMAKE_BUILD_TYPE=RelWithDebInfo -DBUILD_SHARED_LIBS=OFF -DCMAKE_INSTALL_PREFIX=${TP_INSTALL_DIR} -DENABLE_TESTING=OFF
    cd build
    make -j$PARALLEL && make install
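Note for reviewers: the snippet below is not part of the patch. It is a minimal, standalone sanity check of the scheme rewriting done by the renamed helper com.starrocks.external.ObjectStorageUtils.formatObjectStoragePath. The class name ObjectStoragePathCheck and the sample paths are made up for illustration, and it assumes the fe-core classes are on the classpath.

import com.starrocks.external.ObjectStorageUtils;

public class ObjectStoragePathCheck {
    private static void check(String input, String expected) {
        String actual = ObjectStorageUtils.formatObjectStoragePath(input);
        if (!actual.equals(expected)) {
            throw new AssertionError(input + " -> " + actual + ", expected " + expected);
        }
    }

    public static void main(String[] args) {
        // s3://, s3n:// and oss:// locations are all rewritten to the s3a:// connector scheme.
        check("s3://bucket/db/table/file.parquet", "s3a://bucket/db/table/file.parquet");
        check("s3n://bucket/db/table/file.parquet", "s3a://bucket/db/table/file.parquet");
        check("oss://bucket/db/table/file.parquet", "s3a://bucket/db/table/file.parquet");
        // Anything else, such as a plain HDFS location, is passed through unchanged.
        check("hdfs://namenode:9000/warehouse/t/file.parquet", "hdfs://namenode:9000/warehouse/t/file.parquet");
    }
}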
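Also not part of the patch: a rough sketch of what the "use s3a to access oss" comment relies on at runtime. Once a metastore location has been rewritten to s3a://, the FE reads it through the Hadoop s3a connector, which is configured with the standard hadoop-aws properties (fs.s3a.access.key, fs.s3a.secret.key and, for an S3-compatible service such as OSS, fs.s3a.endpoint). The bucket name, endpoint and credentials below are placeholders, and hadoop-aws plus its AWS SDK dependency are assumed to be on the classpath; nothing here is taken from this diff.

import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import com.starrocks.external.ObjectStorageUtils;

public class S3aAccessSketch {
    public static void main(String[] args) throws Exception {
        // Placeholder credentials and endpoint, not values from this patch.
        Configuration conf = new Configuration();
        conf.set("fs.s3a.access.key", "<access-key>");
        conf.set("fs.s3a.secret.key", "<secret-key>");
        // For plain AWS S3 the default endpoint works; for an S3-compatible
        // store such as OSS, point the connector at that service's endpoint.
        conf.set("fs.s3a.endpoint", "<s3-or-oss-endpoint>");

        // A location such as oss://bucket/db/tbl is first rewritten to s3a://
        // by the helper above, then listed through the s3a connector.
        String location = ObjectStorageUtils.formatObjectStoragePath("oss://my-bucket/db/tbl");
        FileSystem fs = FileSystem.get(URI.create(location), conf);
        for (FileStatus status : fs.listStatus(new Path(location))) {
            System.out.println(status.getPath() + " " + status.getLen());
        }
    }
}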