[Enhancement] add segment write time in lake compaction (#60891)
Signed-off-by: starrocks-xupeng <xupeng@starrocks.com>
This commit is contained in:
parent
b84d2051e4
commit
b0f5cbbbb1
|
|
@ -35,6 +35,8 @@ public:
|
|||
// Append slices to the shared file, and return the first offset of the slices.
|
||||
StatusOr<int64_t> appendv(const std::vector<Slice>& slices, const FileEncryptionInfo& info);
|
||||
|
||||
WritableFile* get_writable_file() { return _bundle_file.get(); }
|
||||
|
||||
private:
|
||||
Status _close();
|
||||
|
||||
|
|
@ -72,6 +74,10 @@ public:
|
|||
|
||||
int64_t bundle_file_offset() const override { return _bundle_file_offset; }
|
||||
|
||||
StatusOr<std::unique_ptr<io::NumericStatistics>> get_numeric_statistics() override {
|
||||
return _context->get_writable_file()->get_numeric_statistics();
|
||||
}
|
||||
|
||||
protected:
|
||||
// It will be shared with lots of threads.
|
||||
BundleWritableFileContext* _context = nullptr;
|
||||
|
|
|
|||
|
|
@ -457,6 +457,11 @@ public:
|
|||
virtual int64_t bundle_file_offset() const { return -1; }
|
||||
|
||||
virtual void set_encryption_info(const FileEncryptionInfo& info) {}
|
||||
|
||||
// Return statistics about file written, like how many time is spent on IO
|
||||
virtual StatusOr<std::unique_ptr<io::NumericStatistics>> get_numeric_statistics() {
|
||||
return Status::NotSupported("get_numeric_statistics");
|
||||
}
|
||||
};
|
||||
|
||||
} // namespace starrocks
|
||||
|
|
|
|||
|
|
@ -202,7 +202,7 @@ public:
|
|||
stats->append(kIOCountRemote, read_stats.io_count_remote);
|
||||
stats->append(kIONsReadLocalDisk, read_stats.io_ns_read_local_disk);
|
||||
stats->append(kIONsWriteLocalDisk, read_stats.io_ns_write_local_disk);
|
||||
stats->append(kIONsRemote, read_stats.io_ns_read_remote);
|
||||
stats->append(kIONsReadRemote, read_stats.io_ns_read_remote);
|
||||
stats->append(kPrefetchHitCount, read_stats.prefetch_hit_count);
|
||||
stats->append(kPrefetchWaitFinishNs, read_stats.prefetch_wait_finish_ns);
|
||||
stats->append(kPrefetchPendingNs, read_stats.prefetch_pending_ns);
|
||||
|
|
@ -269,6 +269,19 @@ public:
|
|||
return to_status((*stream_st)->close());
|
||||
}
|
||||
|
||||
StatusOr<std::unique_ptr<io::NumericStatistics>> get_numeric_statistics() override {
|
||||
auto stream_st = _file_ptr->stream();
|
||||
if (!stream_st.ok()) {
|
||||
return to_status(stream_st.status());
|
||||
}
|
||||
const auto& io_stats = (*stream_st)->get_io_stats();
|
||||
auto stats = std::make_unique<io::NumericStatistics>();
|
||||
stats->reserve(2);
|
||||
stats->append(kIONsWriteRemote, io_stats.io_ns_write_remote);
|
||||
stats->append(kBytesWriteRemote, io_stats.bytes_write_remote);
|
||||
return std::move(stats);
|
||||
}
|
||||
|
||||
private:
|
||||
WritableFilePtr _file_ptr;
|
||||
};
|
||||
|
|
|
|||
|
|
@ -55,6 +55,10 @@ public:
|
|||
|
||||
const std::string& filename() const override { return _name; }
|
||||
|
||||
StatusOr<std::unique_ptr<io::NumericStatistics>> get_numeric_statistics() override {
|
||||
return _os->get_numeric_statistics();
|
||||
}
|
||||
|
||||
private:
|
||||
std::unique_ptr<io::OutputStream> _os;
|
||||
std::string _name;
|
||||
|
|
|
|||
|
|
@ -16,6 +16,7 @@
|
|||
|
||||
#include "common/ownership.h"
|
||||
#include "common/statusor.h"
|
||||
#include "io/input_stream.h"
|
||||
#include "io/writable.h"
|
||||
#include "util/slice.h"
|
||||
|
||||
|
|
@ -51,6 +52,11 @@ public:
|
|||
// returns NULL. The return pointer is invalidated as soon as any other
|
||||
// non-const method of OutputStream is called.
|
||||
virtual StatusOr<Position> get_direct_buffer_and_advance(int64_t size) = 0;
|
||||
|
||||
// Return statistics about file written, like how many time is spent on IO
|
||||
virtual StatusOr<std::unique_ptr<NumericStatistics>> get_numeric_statistics() {
|
||||
return Status::NotSupported("get_numeric_statistics");
|
||||
}
|
||||
};
|
||||
|
||||
class OutputStreamWrapper : public OutputStream {
|
||||
|
|
@ -85,6 +91,10 @@ public:
|
|||
return _impl->get_direct_buffer_and_advance(size);
|
||||
}
|
||||
|
||||
StatusOr<std::unique_ptr<NumericStatistics>> get_numeric_statistics() override {
|
||||
return _impl->get_numeric_statistics();
|
||||
}
|
||||
|
||||
private:
|
||||
OutputStream* _impl;
|
||||
Ownership _ownership;
|
||||
|
|
|
|||
|
|
@ -116,13 +116,14 @@ void CompactionTaskCallback::finish_task(std::unique_ptr<CompactionTaskContext>&
|
|||
// process compact stat
|
||||
auto compact_stat = _response->add_compact_stats();
|
||||
compact_stat->set_tablet_id(context->tablet_id);
|
||||
compact_stat->set_read_time_remote(context->stats->io_ns_remote);
|
||||
compact_stat->set_read_time_remote(context->stats->io_ns_read_remote);
|
||||
compact_stat->set_read_bytes_remote(context->stats->io_bytes_read_remote);
|
||||
compact_stat->set_read_time_local(context->stats->io_ns_local_disk);
|
||||
compact_stat->set_read_time_local(context->stats->io_ns_read_local_disk);
|
||||
compact_stat->set_read_bytes_local(context->stats->io_bytes_read_local_disk);
|
||||
compact_stat->set_read_segment_count(context->stats->read_segment_count);
|
||||
compact_stat->set_write_segment_count(context->stats->write_segment_count);
|
||||
compact_stat->set_write_segment_bytes(context->stats->write_segment_bytes);
|
||||
compact_stat->set_write_time_remote(context->stats->io_ns_write_remote);
|
||||
compact_stat->set_in_queue_time_sec(context->stats->in_queue_time_sec);
|
||||
compact_stat->set_sub_task_count(_request->tablet_ids_size());
|
||||
compact_stat->set_total_compact_input_file_size(context->stats->input_file_size);
|
||||
|
|
|
|||
|
|
@ -26,8 +26,8 @@ static constexpr long TIME_UNIT_NS_PER_SECOND = 1000000000;
|
|||
static constexpr long BYTES_UNIT_MB = 1048576;
|
||||
|
||||
void CompactionTaskStats::collect(const OlapReaderStatistics& reader_stats) {
|
||||
io_ns_remote = reader_stats.io_ns_remote;
|
||||
io_ns_local_disk = reader_stats.io_ns_read_local_disk;
|
||||
io_ns_read_remote = reader_stats.io_ns_remote;
|
||||
io_ns_read_local_disk = reader_stats.io_ns_read_local_disk;
|
||||
io_bytes_read_remote = reader_stats.compressed_bytes_read_remote;
|
||||
io_bytes_read_local_disk = reader_stats.compressed_bytes_read_local_disk;
|
||||
segment_init_ns = reader_stats.segment_init_ns;
|
||||
|
|
@ -40,13 +40,14 @@ void CompactionTaskStats::collect(const OlapReaderStatistics& reader_stats) {
|
|||
|
||||
void CompactionTaskStats::collect(const OlapWriterStatistics& writer_stats) {
|
||||
write_segment_count = writer_stats.segment_count;
|
||||
write_segment_bytes = writer_stats.bytes_write;
|
||||
write_segment_bytes = writer_stats.bytes_write_remote;
|
||||
io_ns_write_remote = writer_stats.write_remote_ns;
|
||||
}
|
||||
|
||||
CompactionTaskStats CompactionTaskStats::operator+(const CompactionTaskStats& that) const {
|
||||
CompactionTaskStats diff = *this;
|
||||
diff.io_ns_remote += that.io_ns_remote;
|
||||
diff.io_ns_local_disk += that.io_ns_local_disk;
|
||||
diff.io_ns_read_remote += that.io_ns_read_remote;
|
||||
diff.io_ns_read_local_disk += that.io_ns_read_local_disk;
|
||||
diff.io_bytes_read_remote += that.io_bytes_read_remote;
|
||||
diff.io_bytes_read_local_disk += that.io_bytes_read_local_disk;
|
||||
diff.segment_init_ns += that.segment_init_ns;
|
||||
|
|
@ -57,6 +58,7 @@ CompactionTaskStats CompactionTaskStats::operator+(const CompactionTaskStats& th
|
|||
// diff.read_segment_count += that.read_segment_count;
|
||||
diff.write_segment_count += that.write_segment_count;
|
||||
diff.write_segment_bytes += that.write_segment_bytes;
|
||||
diff.io_ns_write_remote += that.io_ns_write_remote;
|
||||
diff.in_queue_time_sec += that.in_queue_time_sec;
|
||||
diff.pk_sst_merge_ns += that.pk_sst_merge_ns;
|
||||
diff.input_file_size += that.input_file_size;
|
||||
|
|
@ -65,8 +67,8 @@ CompactionTaskStats CompactionTaskStats::operator+(const CompactionTaskStats& th
|
|||
|
||||
CompactionTaskStats CompactionTaskStats::operator-(const CompactionTaskStats& that) const {
|
||||
CompactionTaskStats diff = *this;
|
||||
diff.io_ns_remote -= that.io_ns_remote;
|
||||
diff.io_ns_local_disk -= that.io_ns_local_disk;
|
||||
diff.io_ns_read_remote -= that.io_ns_read_remote;
|
||||
diff.io_ns_read_local_disk -= that.io_ns_read_local_disk;
|
||||
diff.io_bytes_read_remote -= that.io_bytes_read_remote;
|
||||
diff.io_bytes_read_local_disk -= that.io_bytes_read_local_disk;
|
||||
diff.segment_init_ns -= that.segment_init_ns;
|
||||
|
|
@ -77,6 +79,7 @@ CompactionTaskStats CompactionTaskStats::operator-(const CompactionTaskStats& th
|
|||
// diff.read_segment_count -= that.read_segment_count;
|
||||
diff.write_segment_count -= that.write_segment_count;
|
||||
diff.write_segment_bytes -= that.write_segment_bytes;
|
||||
diff.io_ns_write_remote -= that.io_ns_write_remote;
|
||||
diff.in_queue_time_sec -= that.in_queue_time_sec;
|
||||
diff.pk_sst_merge_ns -= that.pk_sst_merge_ns;
|
||||
diff.input_file_size -= that.input_file_size;
|
||||
|
|
@ -88,9 +91,9 @@ std::string CompactionTaskStats::to_json_stats() {
|
|||
root.SetObject();
|
||||
auto& allocator = root.GetAllocator();
|
||||
// add stats
|
||||
root.AddMember("read_local_sec", rapidjson::Value(io_ns_local_disk / TIME_UNIT_NS_PER_SECOND), allocator);
|
||||
root.AddMember("read_local_sec", rapidjson::Value(io_ns_read_local_disk / TIME_UNIT_NS_PER_SECOND), allocator);
|
||||
root.AddMember("read_local_mb", rapidjson::Value(io_bytes_read_local_disk / BYTES_UNIT_MB), allocator);
|
||||
root.AddMember("read_remote_sec", rapidjson::Value(io_ns_remote / TIME_UNIT_NS_PER_SECOND), allocator);
|
||||
root.AddMember("read_remote_sec", rapidjson::Value(io_ns_read_remote / TIME_UNIT_NS_PER_SECOND), allocator);
|
||||
root.AddMember("read_remote_mb", rapidjson::Value(io_bytes_read_remote / BYTES_UNIT_MB), allocator);
|
||||
root.AddMember("read_remote_count", rapidjson::Value(io_count_remote), allocator);
|
||||
root.AddMember("read_local_count", rapidjson::Value(io_count_local_disk), allocator);
|
||||
|
|
@ -99,7 +102,8 @@ std::string CompactionTaskStats::to_json_stats() {
|
|||
allocator);
|
||||
root.AddMember("read_segment_count", rapidjson::Value(read_segment_count), allocator);
|
||||
root.AddMember("write_segment_count", rapidjson::Value(write_segment_count), allocator);
|
||||
root.AddMember("write_segment_mb", rapidjson::Value(write_segment_bytes / BYTES_UNIT_MB), allocator);
|
||||
root.AddMember("write_remote_mb", rapidjson::Value(write_segment_bytes / BYTES_UNIT_MB), allocator);
|
||||
root.AddMember("write_remote_sec", rapidjson::Value(io_ns_write_remote / TIME_UNIT_NS_PER_SECOND), allocator);
|
||||
root.AddMember("in_queue_sec", rapidjson::Value(in_queue_time_sec), allocator);
|
||||
root.AddMember("pk_sst_merge_sec", rapidjson::Value(pk_sst_merge_ns / TIME_UNIT_NS_PER_SECOND), allocator);
|
||||
root.AddMember("input_file_size", rapidjson::Value(input_file_size), allocator);
|
||||
|
|
|
|||
|
|
@ -42,8 +42,8 @@ private:
|
|||
};
|
||||
|
||||
struct CompactionTaskStats {
|
||||
int64_t io_ns_remote = 0;
|
||||
int64_t io_ns_local_disk = 0;
|
||||
int64_t io_ns_read_remote = 0;
|
||||
int64_t io_ns_read_local_disk = 0;
|
||||
int64_t io_bytes_read_remote = 0;
|
||||
int64_t io_bytes_read_local_disk = 0;
|
||||
int64_t segment_init_ns = 0;
|
||||
|
|
@ -54,6 +54,7 @@ struct CompactionTaskStats {
|
|||
int64_t read_segment_count = 0;
|
||||
int64_t write_segment_count = 0;
|
||||
int64_t write_segment_bytes = 0;
|
||||
int64_t io_ns_write_remote = 0;
|
||||
int64_t pk_sst_merge_ns = 0;
|
||||
int64_t input_file_size = 0;
|
||||
|
||||
|
|
|
|||
|
|
@ -31,6 +31,28 @@
|
|||
|
||||
namespace starrocks::lake {
|
||||
|
||||
void collect_writer_stats(OlapWriterStatistics& writer_stats, SegmentWriter* segment_writer) {
|
||||
if (segment_writer == nullptr) {
|
||||
return;
|
||||
}
|
||||
auto stats_or = segment_writer->get_numeric_statistics();
|
||||
if (!stats_or.ok()) {
|
||||
VLOG(3) << "failed to get statistics: " << stats_or.status();
|
||||
return;
|
||||
}
|
||||
|
||||
std::unique_ptr<io::NumericStatistics> stats = std::move(stats_or).value();
|
||||
for (int64_t i = 0, sz = (stats ? stats->size() : 0); i < sz; ++i) {
|
||||
auto&& name = stats->name(i);
|
||||
auto&& value = stats->value(i);
|
||||
if (name == kBytesWriteRemote) {
|
||||
writer_stats.bytes_write_remote += value;
|
||||
} else if (name == kIONsWriteRemote) {
|
||||
writer_stats.write_remote_ns += value;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
HorizontalGeneralTabletWriter::HorizontalGeneralTabletWriter(TabletManager* tablet_mgr, int64_t tablet_id,
|
||||
std::shared_ptr<const TabletSchema> schema, int64_t txn_id,
|
||||
bool is_compaction, ThreadPool* flush_pool,
|
||||
|
|
@ -137,7 +159,7 @@ Status HorizontalGeneralTabletWriter::flush_segment_writer(SegmentPB* segment) {
|
|||
}
|
||||
_files.emplace_back(file_info);
|
||||
_data_size += segment_size;
|
||||
_stats.bytes_write += segment_size;
|
||||
collect_writer_stats(_stats, _seg_writer.get());
|
||||
_stats.segment_count++;
|
||||
if (segment) {
|
||||
segment->set_data_size(segment_size);
|
||||
|
|
@ -284,7 +306,7 @@ Status VerticalGeneralTabletWriter::finish(SegmentPB* segment) {
|
|||
std::string segment_name = std::string(basename(segment_path));
|
||||
_files.emplace_back(FileInfo{segment_name, segment_size, segment_writer->encryption_meta()});
|
||||
_data_size += segment_size;
|
||||
_stats.bytes_write += segment_size;
|
||||
collect_writer_stats(_stats, segment_writer.get());
|
||||
_stats.segment_count++;
|
||||
segment_writer.reset();
|
||||
}
|
||||
|
|
|
|||
|
|
@ -147,4 +147,6 @@ protected:
|
|||
std::vector<std::future<Status>> _futures;
|
||||
};
|
||||
|
||||
void collect_writer_stats(OlapWriterStatistics& writer_stats, SegmentWriter* segment_writer);
|
||||
|
||||
} // namespace starrocks::lake
|
||||
|
|
|
|||
|
|
@ -101,7 +101,7 @@ Status HorizontalPkTabletWriter::flush_segment_writer(SegmentPB* segment) {
|
|||
}
|
||||
_files.emplace_back(file_info);
|
||||
_data_size += segment_size;
|
||||
_stats.bytes_write += segment_size;
|
||||
collect_writer_stats(_stats, _seg_writer.get());
|
||||
_stats.segment_count++;
|
||||
if (segment) {
|
||||
segment->set_data_size(segment_size);
|
||||
|
|
|
|||
|
|
@ -329,19 +329,21 @@ struct OlapReaderStatistics {
|
|||
|
||||
// OlapWriterStatistics used to collect statistics when write data to storage
|
||||
struct OlapWriterStatistics {
|
||||
int64_t bytes_write_ns = 0; // how much time is spent on write
|
||||
int64_t bytes_write = 0; // how many bytes are written
|
||||
int64_t segment_count = 0; // how many files are written
|
||||
int64_t write_remote_ns = 0; // how much time is spent on write
|
||||
int64_t bytes_write_remote = 0; // how many bytes are written
|
||||
int64_t segment_count = 0; // how many files are written
|
||||
};
|
||||
|
||||
const char* const kBytesReadLocalDisk = "bytes_read_local_disk";
|
||||
const char* const kBytesWriteLocalDisk = "bytes_write_local_disk";
|
||||
const char* const kBytesReadRemote = "bytes_read_remote";
|
||||
const char* const kBytesWriteRemote = "bytes_write_remote";
|
||||
const char* const kIOCountLocalDisk = "io_count_local_disk";
|
||||
const char* const kIOCountRemote = "io_count_remote";
|
||||
const char* const kIONsReadLocalDisk = "io_ns_read_local_disk";
|
||||
const char* const kIONsWriteLocalDisk = "io_ns_write_local_disk";
|
||||
const char* const kIONsRemote = "io_ns_remote";
|
||||
const char* const kIONsReadRemote = "io_ns_read_remote";
|
||||
const char* const kIONsWriteRemote = "io_ns_write_remote";
|
||||
const char* const kPrefetchHitCount = "prefetch_hit_count";
|
||||
const char* const kPrefetchWaitFinishNs = "prefetch_wait_finish_ns";
|
||||
const char* const kPrefetchPendingNs = "prefetch_pending_ns";
|
||||
|
|
|
|||
|
|
@ -2542,7 +2542,7 @@ void SegmentIterator::_update_stats(io::SeekableInputStream* rfile) {
|
|||
_opts.stats->io_ns_read_local_disk += value;
|
||||
} else if (name == kIONsWriteLocalDisk) {
|
||||
_opts.stats->io_ns_write_local_disk += value;
|
||||
} else if (name == kIONsRemote) {
|
||||
} else if (name == kIONsReadRemote) {
|
||||
_opts.stats->io_ns_remote += value;
|
||||
} else if (name == kPrefetchHitCount) {
|
||||
_opts.stats->prefetch_hit_count += value;
|
||||
|
|
|
|||
|
|
@ -445,4 +445,8 @@ int64_t SegmentWriter::bundle_file_offset() const {
|
|||
return _wfile->bundle_file_offset();
|
||||
}
|
||||
|
||||
StatusOr<std::unique_ptr<io::NumericStatistics>> SegmentWriter::get_numeric_statistics() {
|
||||
return _wfile->get_numeric_statistics();
|
||||
}
|
||||
|
||||
} // namespace starrocks
|
||||
|
|
|
|||
|
|
@ -44,6 +44,7 @@
|
|||
#include "common/status.h"
|
||||
#include "gen_cpp/segment.pb.h"
|
||||
#include "gutil/macros.h"
|
||||
#include "io/input_stream.h"
|
||||
#include "runtime/global_dict/types.h"
|
||||
#include "storage/row_store_encoder_factory.h"
|
||||
#include "storage/tablet_schema.h"
|
||||
|
|
@ -152,6 +153,8 @@ public:
|
|||
|
||||
int64_t bundle_file_offset() const;
|
||||
|
||||
StatusOr<std::unique_ptr<io::NumericStatistics>> get_numeric_statistics();
|
||||
|
||||
private:
|
||||
Status _write_short_key_index();
|
||||
Status _write_footer();
|
||||
|
|
|
|||
|
|
@ -161,12 +161,17 @@ TEST_P(StarletFileSystemTest, test_write_and_read) {
|
|||
EXPECT_OK(wf->append("hello"));
|
||||
EXPECT_OK(wf->append(" world!"));
|
||||
EXPECT_OK(wf->sync());
|
||||
ASSIGN_OR_ABORT(auto stats, wf->get_numeric_statistics());
|
||||
EXPECT_EQ((*stats).size(), 2);
|
||||
EXPECT_EQ((*stats).value(1), 12); // write bytes
|
||||
EXPECT_OK(wf->close());
|
||||
EXPECT_EQ(sizeof("hello world!"), wf->size() + 1);
|
||||
|
||||
char buf[1024];
|
||||
ASSIGN_OR_ABORT(auto rf, fs->new_random_access_file(uri));
|
||||
ASSIGN_OR_ABORT(auto nr, rf->read_at(0, buf, sizeof(buf)));
|
||||
ASSIGN_OR_ABORT(auto stats2, rf->get_numeric_statistics());
|
||||
EXPECT_EQ((*stats2).size(), 11);
|
||||
EXPECT_EQ("hello world!", std::string_view(buf, nr));
|
||||
|
||||
ASSIGN_OR_ABORT(nr, rf->read_at(3, buf, sizeof(buf)));
|
||||
|
|
|
|||
|
|
@ -45,6 +45,9 @@ public:
|
|||
StatusOr<int64_t> get_size() override { return _contents.size(); }
|
||||
|
||||
Status touch_cache(int64_t offset, size_t length) override { return Status::InvalidArgument("TestInputStream"); }
|
||||
StatusOr<std::unique_ptr<NumericStatistics>> get_numeric_statistics() override {
|
||||
return Status::InvalidArgument("TestInputStream");
|
||||
}
|
||||
|
||||
private:
|
||||
std::string _contents;
|
||||
|
|
@ -128,4 +131,11 @@ PARALLEL_TEST(SeekableInputStreamTest, test_touch_cache) {
|
|||
ASSERT_TRUE(wrapper.touch_cache(0, 0).is_invalid_argument());
|
||||
}
|
||||
|
||||
// NOLINTNEXTLINE
|
||||
PARALLEL_TEST(SeekableInputStreamTest, test_get_numeric_statistics) {
|
||||
TestInputStream in("0123456789", 5);
|
||||
SeekableInputStreamWrapper wrapper(&in, Ownership::kDontTakeOwnership);
|
||||
ASSERT_TRUE(wrapper.get_numeric_statistics().status().is_invalid_argument());
|
||||
}
|
||||
|
||||
} // namespace starrocks::io
|
||||
|
|
|
|||
|
|
@ -73,8 +73,8 @@ TEST_F(CompactionTaskContextTest, test_calculation) {
|
|||
stats.pk_sst_merge_ns = 5;
|
||||
stats.collect(reader_stats);
|
||||
|
||||
EXPECT_EQ(stats.io_ns_remote, 200);
|
||||
EXPECT_EQ(stats.io_ns_local_disk, 300);
|
||||
EXPECT_EQ(stats.io_ns_read_remote, 200);
|
||||
EXPECT_EQ(stats.io_ns_read_local_disk, 300);
|
||||
EXPECT_EQ(stats.segment_init_ns, 400);
|
||||
EXPECT_EQ(stats.column_iterator_init_ns, 500);
|
||||
EXPECT_EQ(stats.io_count_local_disk, 600);
|
||||
|
|
@ -86,8 +86,8 @@ TEST_F(CompactionTaskContextTest, test_calculation) {
|
|||
|
||||
CompactionTaskStats after_add = stats + stats;
|
||||
|
||||
EXPECT_EQ(after_add.io_ns_remote, 400);
|
||||
EXPECT_EQ(after_add.io_ns_local_disk, 600);
|
||||
EXPECT_EQ(after_add.io_ns_read_remote, 400);
|
||||
EXPECT_EQ(after_add.io_ns_read_local_disk, 600);
|
||||
EXPECT_EQ(after_add.segment_init_ns, 800);
|
||||
EXPECT_EQ(after_add.column_iterator_init_ns, 1000);
|
||||
EXPECT_EQ(after_add.io_count_local_disk, 1200);
|
||||
|
|
@ -99,8 +99,8 @@ TEST_F(CompactionTaskContextTest, test_calculation) {
|
|||
|
||||
CompactionTaskStats after_minus = stats - stats;
|
||||
|
||||
EXPECT_EQ(after_minus.io_ns_remote, 0);
|
||||
EXPECT_EQ(after_minus.io_ns_local_disk, 0);
|
||||
EXPECT_EQ(after_minus.io_ns_read_remote, 0);
|
||||
EXPECT_EQ(after_minus.io_ns_read_local_disk, 0);
|
||||
EXPECT_EQ(after_minus.segment_init_ns, 0);
|
||||
EXPECT_EQ(after_minus.column_iterator_init_ns, 0);
|
||||
EXPECT_EQ(after_minus.io_count_local_disk, 0);
|
||||
|
|
@ -109,6 +109,15 @@ TEST_F(CompactionTaskContextTest, test_calculation) {
|
|||
EXPECT_EQ(after_minus.io_bytes_read_local_disk, 0);
|
||||
EXPECT_EQ(after_minus.in_queue_time_sec, 0);
|
||||
EXPECT_EQ(after_minus.pk_sst_merge_ns, 0);
|
||||
|
||||
OlapWriterStatistics writer_stats;
|
||||
writer_stats.write_remote_ns = 10;
|
||||
writer_stats.bytes_write_remote = 100;
|
||||
writer_stats.segment_count = 1000;
|
||||
stats.collect(writer_stats);
|
||||
EXPECT_EQ(stats.io_ns_write_remote, 10);
|
||||
EXPECT_EQ(stats.write_segment_bytes, 100);
|
||||
EXPECT_EQ(stats.write_segment_count, 1000);
|
||||
}
|
||||
|
||||
TEST_F(CompactionTaskContextTest, test_to_json_stats) {
|
||||
|
|
@ -117,13 +126,16 @@ TEST_F(CompactionTaskContextTest, test_to_json_stats) {
|
|||
// Set up some stats to test the JSON output
|
||||
context.stats->io_bytes_read_remote = 1 * 1048576;
|
||||
context.stats->io_bytes_read_local_disk = 1 * 1048576;
|
||||
context.stats->io_ns_remote = 1 * TIME_UNIT_NS_PER_SECOND;
|
||||
context.stats->io_ns_local_disk = 9 * TIME_UNIT_NS_PER_SECOND;
|
||||
context.stats->io_ns_read_remote = 1 * TIME_UNIT_NS_PER_SECOND;
|
||||
context.stats->io_ns_read_local_disk = 9 * TIME_UNIT_NS_PER_SECOND;
|
||||
context.stats->segment_init_ns = 2 * TIME_UNIT_NS_PER_SECOND;
|
||||
context.stats->io_count_remote = 3;
|
||||
context.stats->io_count_local_disk = 2;
|
||||
context.stats->segment_init_ns = 3 * TIME_UNIT_NS_PER_SECOND;
|
||||
context.stats->column_iterator_init_ns = 4 * TIME_UNIT_NS_PER_SECOND;
|
||||
context.stats->write_segment_count = 2;
|
||||
context.stats->write_segment_bytes = 1 * 1048576;
|
||||
context.stats->io_ns_write_remote = 3 * TIME_UNIT_NS_PER_SECOND;
|
||||
context.stats->in_queue_time_sec = 5;
|
||||
context.stats->pk_sst_merge_ns = 5 * TIME_UNIT_NS_PER_SECOND;
|
||||
|
||||
|
|
@ -137,6 +149,9 @@ TEST_F(CompactionTaskContextTest, test_to_json_stats) {
|
|||
EXPECT_THAT(json_stats, testing::HasSubstr(R"("read_local_sec":9)"));
|
||||
EXPECT_THAT(json_stats, testing::HasSubstr(R"("read_remote_count":3)"));
|
||||
EXPECT_THAT(json_stats, testing::HasSubstr(R"("read_local_count":2)"));
|
||||
EXPECT_THAT(json_stats, testing::HasSubstr(R"("write_segment_count":2)"));
|
||||
EXPECT_THAT(json_stats, testing::HasSubstr(R"("write_remote_mb":1)"));
|
||||
EXPECT_THAT(json_stats, testing::HasSubstr(R"("write_remote_sec":3)"));
|
||||
EXPECT_THAT(json_stats, testing::HasSubstr(R"("in_queue_sec":5)"));
|
||||
EXPECT_THAT(json_stats, testing::HasSubstr(R"("pk_sst_merge_sec":5)"));
|
||||
}
|
||||
|
|
|
|||
|
|
@ -193,6 +193,7 @@ public class CompactionJob {
|
|||
stat.readSegmentCount = 0L;
|
||||
stat.writeSegmentCount = 0L;
|
||||
stat.writeSegmentBytes = 0L;
|
||||
stat.writeTimeRemote = 0L;
|
||||
stat.inQueueTimeSec = 0;
|
||||
for (CompactionTask task : tasks) {
|
||||
List<CompactStat> subStats = task.getCompactStats();
|
||||
|
|
@ -228,6 +229,9 @@ public class CompactionJob {
|
|||
if (subStat.writeSegmentBytes != null) {
|
||||
stat.writeSegmentBytes += subStat.writeSegmentBytes;
|
||||
}
|
||||
if (subStat.writeTimeRemote != null) {
|
||||
stat.writeTimeRemote += subStat.writeTimeRemote;
|
||||
}
|
||||
}
|
||||
stat.subTaskCount += subTaskCount;
|
||||
}
|
||||
|
|
|
|||
|
|
@ -37,6 +37,8 @@ class CompactionProfile {
|
|||
private long writeSegmentCount;
|
||||
@SerializedName(value = "write_segment_mb")
|
||||
private long writeSegmentMb;
|
||||
@SerializedName(value = "write_remote_sec")
|
||||
private long writeRemoteSec;
|
||||
@SerializedName(value = "in_queue_sec")
|
||||
private int inQueueSec;
|
||||
|
||||
|
|
@ -50,6 +52,7 @@ class CompactionProfile {
|
|||
readSegmentCount = stat.readSegmentCount;
|
||||
writeSegmentCount = stat.writeSegmentCount;
|
||||
writeSegmentMb = stat.writeSegmentBytes / 1048576;
|
||||
writeRemoteSec = stat.writeTimeRemote / 1000000000L;
|
||||
inQueueSec = stat.inQueueTimeSec;
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -108,7 +108,8 @@ public class CompactionJobTest {
|
|||
stat.readSegmentCount = 6L;
|
||||
stat.writeSegmentCount = 7L;
|
||||
stat.writeSegmentBytes = 8L;
|
||||
stat.inQueueTimeSec = 9;
|
||||
stat.writeTimeRemote = 9L;
|
||||
stat.inQueueTimeSec = 10;
|
||||
list.add(stat);
|
||||
return list;
|
||||
}
|
||||
|
|
|
|||
|
|
@ -31,6 +31,7 @@ class CompactionProfileTest {
|
|||
stat.readSegmentCount = 7L;
|
||||
stat.writeSegmentCount = 8L;
|
||||
stat.writeSegmentBytes = 9L;
|
||||
stat.writeTimeRemote = 10L;
|
||||
|
||||
CompactionProfile profile = new CompactionProfile(stat);
|
||||
|
||||
|
|
@ -43,6 +44,7 @@ class CompactionProfileTest {
|
|||
Assertions.assertTrue(s.contains("read_segment_count"));
|
||||
Assertions.assertTrue(s.contains("write_segment_count"));
|
||||
Assertions.assertTrue(s.contains("write_segment_mb"));
|
||||
Assertions.assertTrue(s.contains("write_remote_sec"));
|
||||
Assertions.assertTrue(s.contains("in_queue_sec"));
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -137,6 +137,7 @@ message CompactStat {
|
|||
// write
|
||||
optional int64 write_segment_count = 31;
|
||||
optional int64 write_segment_bytes = 32;
|
||||
optional int64 write_time_remote = 33; // ns
|
||||
|
||||
// other
|
||||
optional int32 sub_task_count = 51; // tablet count in this compaction request
|
||||
|
|
|
|||
Loading…
Reference in New Issue