Signed-off-by: zihe.liu <ziheliu1024@gmail.com>
Co-authored-by: zihe.liu <ziheliu1024@gmail.com>
parent bd48411e3e
commit bdb0b0e467
@@ -83,35 +83,69 @@ void BinaryColumnBase<T>::append(const Column& src, size_t offset, size_t count)
 }
 
 template <typename T>
-void BinaryColumnBase<T>::append_selective(const Column& src, const uint32_t* indexes, uint32_t from, uint32_t size) {
+void BinaryColumnBase<T>::append_selective(const Column& src, const uint32_t* indexes, uint32_t from,
+                                           const uint32_t size) {
     if (src.is_binary_view()) {
         down_cast<const ColumnView*>(&src)->append_to(*this, indexes, from, size);
         return;
     }
 
+    indexes += from;
+
     const auto& src_column = down_cast<const BinaryColumnBase<T>&>(src);
-    const auto& src_offsets = src_column.get_offset();
-    const auto& src_bytes = src_column.get_bytes();
+    const auto* __restrict src_offsets = src_column.get_offset().data();
+    const auto* __restrict src_bytes = src_column.get_bytes().data();
 
-    size_t cur_row_count = _offsets.size() - 1;
-    size_t cur_byte_size = _bytes.size();
+    const size_t prev_num_offsets = _offsets.size();
+    const size_t prev_num_rows = prev_num_offsets - 1;
 
-    _offsets.resize(cur_row_count + size + 1);
+    _offsets.resize(prev_num_offsets + size * 2);
+    auto* __restrict new_offsets = _offsets.data() + prev_num_offsets;
+
+    // Buffer the i-th row's start and end offsets in new_offsets[i * 2] and new_offsets[i * 2 + 1].
     for (size_t i = 0; i < size; i++) {
-        uint32_t row_idx = indexes[from + i];
-        T str_size = src_offsets[row_idx + 1] - src_offsets[row_idx];
-        _offsets[cur_row_count + i + 1] = _offsets[cur_row_count + i] + str_size;
-        cur_byte_size += str_size;
+        const uint32_t src_idx = indexes[i];
+        new_offsets[i * 2] = src_offsets[src_idx];
+        new_offsets[i * 2 + 1] = src_offsets[src_idx + 1];
     }
-    _bytes.resize(cur_byte_size);
 
-    auto* dest_bytes = _bytes.data();
-    for (size_t i = 0; i < size; i++) {
-        uint32_t row_idx = indexes[from + i];
-        T str_size = src_offsets[row_idx + 1] - src_offsets[row_idx];
-        strings::memcpy_inlined(dest_bytes + _offsets[cur_row_count + i], src_bytes.data() + src_offsets[row_idx],
-                                str_size);
-    }
+    // Write bytes.
+    {
+        size_t num_bytes = _bytes.size();
+        for (size_t i = 0; i < size; i++) {
+            num_bytes += new_offsets[i * 2 + 1] - new_offsets[i * 2];
+        }
+        _bytes.resize(num_bytes);
+        auto* __restrict dest_bytes = _bytes.data();
+        size_t cur_offset = _offsets[prev_num_rows];
+
+        if (src_column.get_bytes().size() > 32 * 1024 * 1024ull) {
+            for (size_t i = 0; i < size; i++) {
+                if (i + 16 < size) {
+                    // If the source column is large enough, use prefetch to speed up copying.
+                    __builtin_prefetch(src_bytes + new_offsets[i * 2 + 32]);
+                }
+                const T str_size = new_offsets[i * 2 + 1] - new_offsets[i * 2];
+                strings::memcpy_inlined(dest_bytes + cur_offset, src_bytes + new_offsets[i * 2], str_size);
+                cur_offset += str_size;
+            }
+        } else {
+            for (size_t i = 0; i < size; i++) {
+                const T str_size = new_offsets[i * 2 + 1] - new_offsets[i * 2];
+                // Over-copy 16 bytes only when src_column is small enough: for a large
+                // column the trailing bytes are likely out of cache, so the over-copy becomes expensive.
+                strings::memcpy_inlined_overflow16(dest_bytes + cur_offset, src_bytes + new_offsets[i * 2], str_size);
+                cur_offset += str_size;
+            }
+        }
+    }
+
+    // Write offsets. Note that new_offsets[-1] is the last offset written before this append.
+    for (int64_t i = 0; i < size; i++) {
+        new_offsets[i] = new_offsets[i - 1] + (new_offsets[i * 2 + 1] - new_offsets[i * 2]);
+    }
+    _offsets.resize(prev_num_offsets + size);
+
     _slices_cache = false;
 }
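The rewrite above is a two-pass gather: pass one stashes each selected row's (start, end) offset pair in the spare tail of _offsets, pass two sizes _bytes once and runs a tight copy loop over the buffered pairs, and a final pass collapses the pairs into running offsets in place. Below is a minimal, self-contained sketch of the same pattern over plain std::vector; the BinaryBuf struct and all names are illustrative, not StarRocks API.

#include <cassert>
#include <cstdint>
#include <cstring>
#include <string>
#include <vector>

struct BinaryBuf {
    std::vector<uint8_t> bytes;
    std::vector<uint32_t> offsets{0}; // row i spans [offsets[i], offsets[i + 1])
};

void append_selective(BinaryBuf& dst, const BinaryBuf& src, const uint32_t* indexes, size_t n) {
    const size_t prev_num_offsets = dst.offsets.size();

    // Pass 1: buffer each selected row's (start, end) pair in the tail of dst.offsets.
    dst.offsets.resize(prev_num_offsets + n * 2);
    uint32_t* pairs = dst.offsets.data() + prev_num_offsets;
    for (size_t i = 0; i < n; i++) {
        pairs[i * 2] = src.offsets[indexes[i]];
        pairs[i * 2 + 1] = src.offsets[indexes[i] + 1];
    }

    // Pass 2: one resize of the byte buffer, then a tight copy loop over the pairs.
    size_t cur = dst.bytes.size();
    size_t num_bytes = cur;
    for (size_t i = 0; i < n; i++) {
        num_bytes += pairs[i * 2 + 1] - pairs[i * 2];
    }
    dst.bytes.resize(num_bytes);
    for (size_t i = 0; i < n; i++) {
        const uint32_t len = pairs[i * 2 + 1] - pairs[i * 2];
        std::memcpy(dst.bytes.data() + cur, src.bytes.data() + pairs[i * 2], len);
        cur += len;
    }

    // Pass 3: collapse pairs into running offsets in place; pairs[-1] is the last
    // pre-existing offset. Writing pairs[i] is safe because the pair in slot i was
    // last read in iteration i / 2 <= i (and at i == 0 the reads precede the write).
    for (int64_t i = 0; i < static_cast<int64_t>(n); i++) {
        pairs[i] = pairs[i - 1] + (pairs[i * 2 + 1] - pairs[i * 2]);
    }
    dst.offsets.resize(prev_num_offsets + n);
}

int main() {
    BinaryBuf src;
    for (std::string s : {"alpha", "bravo", "charlie", "delta"}) {
        src.bytes.insert(src.bytes.end(), s.begin(), s.end());
        src.offsets.push_back(static_cast<uint32_t>(src.bytes.size()));
    }
    BinaryBuf dst;
    const uint32_t idx[] = {3, 1};
    append_selective(dst, src, idx, 2);
    assert(dst.offsets == (std::vector<uint32_t>{0, 5, 10})); // "delta" + "bravo"
    return 0;
}

Buffering the pairs first means the copy loop touches only two arrays sequentially, which is what makes the prefetch and the over-copying memcpy variants in the committed code effective.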
@@ -255,7 +255,7 @@ std::unique_ptr<Chunk> Chunk::clone_empty_with_slot(size_t size) const {
         columns[i] = _columns[i]->clone_empty();
         columns[i]->reserve(size);
     }
-    return std::make_unique<Chunk>(columns, _slot_id_to_index);
+    return std::make_unique<Chunk>(std::move(columns), _slot_id_to_index);
 }
 
 std::unique_ptr<Chunk> Chunk::clone_empty_with_schema() const {
@@ -37,8 +37,12 @@ StatusOr<ColumnPtr> FixedLengthColumnBase<T>::upgrade_if_overflow() {
 
 template <typename T>
 void FixedLengthColumnBase<T>::append(const Column& src, size_t offset, size_t count) {
-    const auto& num_src = down_cast<const FixedLengthColumnBase<T>&>(src);
-    _data.insert(_data.end(), num_src._data.begin() + offset, num_src._data.begin() + offset + count);
+    const T* src_data = reinterpret_cast<const T*>(src.raw_data());
+
+    const size_t orig_size = _data.size();
+    raw::stl_vector_resize_uninitialized(&_data, orig_size + count);
+
+    strings::memcpy_inlined(_data.data() + orig_size, src_data + offset, count * sizeof(T));
 }
 
 template <typename T>
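raw::stl_vector_resize_uninitialized grows the vector without value-initializing the new elements, so the memcpy_inlined that follows is the only write to that memory. A sketch of one common way to get that behavior in standard C++ (the real StarRocks helper may be implemented differently): an allocator whose no-argument construct() performs default-initialization, which is a no-op for trivial types.

#include <cstddef>
#include <memory>
#include <type_traits>
#include <utility>
#include <vector>

// Allocator that default-initializes new elements (leaving trivial types
// uninitialized) instead of value-initializing them, so resize() skips the zero-fill.
template <typename T, typename Alloc = std::allocator<T>>
class default_init_allocator : public Alloc {
public:
    using Alloc::Alloc;

    template <typename U>
    struct rebind {
        using other = default_init_allocator<U, typename std::allocator_traits<Alloc>::template rebind_alloc<U>>;
    };

    template <typename U>
    void construct(U* p) noexcept(std::is_nothrow_default_constructible<U>::value) {
        ::new (static_cast<void*>(p)) U; // default-init: no zeroing for trivial U
    }
    template <typename U, typename... Args>
    void construct(U* p, Args&&... args) {
        std::allocator_traits<Alloc>::construct(static_cast<Alloc&>(*this), p, std::forward<Args>(args)...);
    }
};

int main() {
    std::vector<int, default_init_allocator<int>> v;
    v.resize(1024); // elements are indeterminate; no O(n) zero-fill
    v[0] = 42;      // write before read, as the append paths above do
    return v[0] == 42 ? 0 : 1;
}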
@@ -48,7 +52,7 @@ void FixedLengthColumnBase<T>::append_selective(const Column& src, const uint32_
     const T* src_data = reinterpret_cast<const T*>(src.raw_data());
 
     const size_t orig_size = _data.size();
-    _data.resize(orig_size + size);
+    raw::stl_vector_resize_uninitialized(&_data, orig_size + size);
     auto* dest_data = _data.data() + orig_size;
 
     SIMDGather::gather(dest_data, src_data, indexes, size);
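For reference, the assumed semantics of SIMDGather::gather written as a scalar loop (the in-tree version is expected to do the same element gather with SIMD instructions where available):

#include <cstddef>
#include <cstdint>

// Scalar reference for the assumed contract of SIMDGather::gather:
// dst[i] = src[indexes[i]] for i in [0, n).
template <typename T>
void gather_scalar(T* dst, const T* src, const uint32_t* indexes, size_t n) {
    for (size_t i = 0; i < n; i++) {
        dst[i] = src[indexes[i]];
    }
}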
@@ -188,4 +188,25 @@ ALWAYS_INLINE inline void memcpy_inlined(void* __restrict _dst, const void* __re
 #endif
 }
 }
+
+ALWAYS_INLINE inline void memcpy_inlined_overflow16(void* __restrict _dst, const void* __restrict _src, ssize_t size) {
+    auto* __restrict dst = static_cast<uint8_t*>(_dst);
+    const auto* __restrict src = static_cast<const uint8_t*>(_src);
+
+    while (size > 0) {
+#ifdef __SSE2__
+        _mm_storeu_si128(reinterpret_cast<__m128i*>(dst), _mm_loadu_si128(reinterpret_cast<const __m128i*>(src)));
+#elif defined(__ARM_NEON) && defined(__aarch64__)
+        vst1q_u8(dst, vld1q_u8(src));
+#else
+        std::memcpy(dst, src, 16);
+#endif
+        dst += 16;
+        src += 16;
+        size -= 16;
+        // Inhibit the compiler's loop-idiom optimization, which would collapse
+        // the per-16-byte copies into a single memcpy call.
+        __asm__ __volatile__("" : : : "memory");
+    }
+}
+
 } // namespace strings
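memcpy_inlined_overflow16 always moves whole 16-byte chunks, so it may read up to 15 bytes past src + size and write up to 15 bytes past dst + size; callers must guarantee that much tail slack in both buffers (the column byte buffers above are assumed to be padded accordingly). A hypothetical helper making the contract explicit — make_padded and kOverflowSlack are illustrative names, and the header defining strings::memcpy_inlined_overflow16 above is assumed to be included:

#include <cstddef>
#include <cstdint>
#include <string>
#include <vector>

static constexpr size_t kOverflowSlack = 15; // max over-read/over-write of the chunked copy

// Build a byte buffer with explicit tail slack so the 16-byte-chunk copy
// can neither read nor write past the allocation.
std::vector<uint8_t> make_padded(const std::string& s) {
    std::vector<uint8_t> buf(s.begin(), s.end());
    buf.resize(buf.size() + kOverflowSlack);
    return buf;
}

int main() {
    const std::string payload = "hello, overflow16";
    std::vector<uint8_t> src = make_padded(payload);
    std::vector<uint8_t> dst(payload.size() + kOverflowSlack);
    strings::memcpy_inlined_overflow16(dst.data(), src.data(), static_cast<ssize_t>(payload.size()));
    // Only the first payload.size() bytes of dst are meaningful; the slack is scratch.
    return 0;
}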
@@ -233,13 +233,14 @@ public:
 /// A counter that keeps track of the highest/lowest value seen (reporting that
 /// as value()) and the current value.
 template <bool is_high>
-class WaterMarkCounter : public Counter {
+class WaterMarkCounter final : public Counter {
 public:
-    explicit WaterMarkCounter(TUnit::type type, int64_t value = 0) : Counter(type, value) { _set_init_value(); }
+    explicit WaterMarkCounter(TUnit::type type, const TCounterStrategy& strategy, int64_t value = 0)
+            : Counter(type, strategy, value) {
+        _set_init_value();
+    }
     ~WaterMarkCounter() override = default;
 
     virtual void add(int64_t delta) {
         int64_t new_val = current_value_.fetch_add(delta, std::memory_order_relaxed) + delta;
@@ -680,4 +680,44 @@ PARALLEL_TEST(BinaryColumnTest, test_reference_memory_usage) {
     ASSERT_EQ(0, column->Column::reference_memory_usage());
 }
 
+class BinaryColumnAppendSelectiveTestFixture : public ::testing::TestWithParam<std::tuple<uint32_t>> {};
+
+TEST_P(BinaryColumnAppendSelectiveTestFixture, test_append_selective) {
+    const uint32_t num_rows = std::get<0>(GetParam());
+
+    BinaryColumn::Ptr src_col = BinaryColumn::create();
+    for (uint32_t i = 0; i < num_rows; i++) {
+        const size_t str_len = i % 16 + 8; // Length between 8 and 23.
+        std::string str(str_len, 'a' + (i % 26));
+        src_col->append(str);
+    }
+
+    std::vector<uint32_t> indexes;
+    indexes.reserve(num_rows / 16);
+    for (uint32_t i = 0; i < num_rows; i++) {
+        if (i % 16 == 0) {
+            indexes.push_back(i);
+        }
+    }
+
+    BinaryColumn::Ptr dst_col = BinaryColumn::create();
+
+    dst_col->append_selective(*src_col, indexes.data(), 0, static_cast<uint32_t>(indexes.size()));
+    const size_t num_dst_rows = dst_col->size();
+    ASSERT_EQ(indexes.size(), num_dst_rows);
+    for (uint32_t i = 0; i < num_dst_rows; i++) {
+        ASSERT_EQ(src_col->get_slice(indexes[i]), dst_col->get_slice(i));
+    }
+
+    // Append again starting at from=10; size must exclude the skipped prefix.
+    dst_col->append_selective(*src_col, indexes.data(), 10, static_cast<uint32_t>(indexes.size()) - 10);
+    ASSERT_EQ(num_dst_rows + indexes.size() - 10, dst_col->size());
+    for (uint32_t i = 10; i < indexes.size(); i++) {
+        ASSERT_EQ(src_col->get_slice(indexes[i]), dst_col->get_slice(num_dst_rows + i - 10));
+    }
+}
+
+INSTANTIATE_TEST_SUITE_P(BinaryColumnAppendSelectiveTest, BinaryColumnAppendSelectiveTestFixture,
+                         ::testing::Values(std::make_tuple(2048), std::make_tuple(4096), std::make_tuple(40960),
+                                           std::make_tuple(32 * 1024 * 1024 + 100)));
+
 } // namespace starrocks