Co-authored-by: Murphy <96611012+murphyatwork@users.noreply.github.com> Co-authored-by: Murphy <mofei@starrocks.com>
This commit is contained in:
parent
7d30a58f88
commit
fadb8061f3
|
|
@ -127,106 +127,21 @@ protected:
|
|||
private:
|
||||
struct ScanContext {
|
||||
ScanContext() = default;
|
||||
|
||||
~ScanContext() = default;
|
||||
|
||||
void close() {
|
||||
_read_chunk.reset();
|
||||
_dict_chunk.reset();
|
||||
_final_chunk.reset();
|
||||
_adapt_global_dict_chunk.reset();
|
||||
}
|
||||
|
||||
Status seek_columns(ordinal_t pos) {
|
||||
for (auto iter : _column_iterators) {
|
||||
RETURN_IF_ERROR(iter->seek_to_ordinal(pos));
|
||||
}
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
Status read_columns(Chunk* chunk, const SparseRange<>& range) {
|
||||
bool may_has_del_row = chunk->delete_state() != DEL_NOT_SATISFIED;
|
||||
std::vector<size_t> pruned_cols;
|
||||
size_t pruned_col_size = 0;
|
||||
for (size_t i = 0; i < _column_iterators.size(); i++) {
|
||||
ColumnPtr& col = chunk->get_column_by_index(i);
|
||||
if (_prune_column_after_index_filter && _prune_cols.count(i)) {
|
||||
pruned_cols.push_back(i);
|
||||
continue;
|
||||
}
|
||||
RETURN_IF_ERROR(_column_iterators[i]->next_batch(range, col.get()));
|
||||
if (pruned_col_size == 0) {
|
||||
pruned_col_size = col->size();
|
||||
}
|
||||
DCHECK_EQ(pruned_col_size, col->size());
|
||||
may_has_del_row |= (col->delete_state() != DEL_NOT_SATISFIED);
|
||||
}
|
||||
for (size_t i : pruned_cols) {
|
||||
ColumnPtr& col = chunk->get_column_by_index(i);
|
||||
// make sure each pruned column has the same size as the unpruneable one.
|
||||
col->resize(pruned_col_size);
|
||||
}
|
||||
chunk->set_delete_state(may_has_del_row ? DEL_PARTIAL_SATISFIED : DEL_NOT_SATISFIED);
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
int64_t memory_usage() const {
|
||||
int64_t usage = 0;
|
||||
usage += (_read_chunk != nullptr) ? _read_chunk->memory_usage() : 0;
|
||||
usage += (_dict_chunk.get() != _read_chunk.get()) ? _dict_chunk->memory_usage() : 0;
|
||||
usage += (_final_chunk.get() != _dict_chunk.get()) ? _final_chunk->memory_usage() : 0;
|
||||
usage += (_adapt_global_dict_chunk.get() != _final_chunk.get()) ? _adapt_global_dict_chunk->memory_usage()
|
||||
: 0;
|
||||
return usage;
|
||||
}
|
||||
|
||||
size_t column_size() { return _column_iterators.size(); }
|
||||
|
||||
std::string to_string() const {
|
||||
std::ostringstream oss;
|
||||
oss << "{";
|
||||
oss << "\"_read_schema\": " << _read_schema.to_string() << ",";
|
||||
oss << "\"_dict_decode_schema\": " << _dict_decode_schema.to_string() << ",";
|
||||
oss << "\"_is_dict_column\": [";
|
||||
for (size_t i = 0; i < _is_dict_column.size(); ++i) {
|
||||
oss << (_is_dict_column[i] ? "true" : "false");
|
||||
if (i + 1 < _is_dict_column.size()) oss << ", ";
|
||||
}
|
||||
oss << "],";
|
||||
oss << "\"_column_iterators\": " << _column_iterators.size() << ",";
|
||||
oss << "\"_subfield_columns\": [";
|
||||
for (size_t i = 0; i < _subfield_columns.size(); ++i) {
|
||||
oss << _subfield_columns[i];
|
||||
if (i + 1 < _subfield_columns.size()) oss << ", ";
|
||||
}
|
||||
oss << "],";
|
||||
oss << "\"_subfield_iterators\": " << _subfield_iterators.size() << ",";
|
||||
oss << "\"_skip_dict_decode_indexes\": [";
|
||||
for (size_t i = 0; i < _skip_dict_decode_indexes.size(); ++i) {
|
||||
oss << (_skip_dict_decode_indexes[i] ? "true" : "false");
|
||||
if (i + 1 < _skip_dict_decode_indexes.size()) oss << ", ";
|
||||
}
|
||||
oss << "],";
|
||||
oss << "\"_read_index_map\": [";
|
||||
for (size_t i = 0; i < _read_index_map.size(); ++i) {
|
||||
oss << _read_index_map[i];
|
||||
if (i + 1 < _read_index_map.size()) oss << ", ";
|
||||
}
|
||||
oss << "],";
|
||||
oss << "\"_has_dict_column\": " << (_has_dict_column ? "true" : "false") << ",";
|
||||
oss << "\"_late_materialize\": " << (_late_materialize ? "true" : "false") << ",";
|
||||
oss << "\"_has_force_dict_encode\": " << (_has_force_dict_encode ? "true" : "false") << ",";
|
||||
oss << "\"_prune_cols\": [";
|
||||
size_t prune_count = 0;
|
||||
for (auto idx : _prune_cols) {
|
||||
if (prune_count++ > 0) oss << ", ";
|
||||
oss << idx;
|
||||
}
|
||||
oss << "],";
|
||||
oss << "\"_prune_column_after_index_filter\": " << (_prune_column_after_index_filter ? "true" : "false");
|
||||
oss << "}";
|
||||
return oss.str();
|
||||
}
|
||||
// Release all chunk resources to free memory
|
||||
void close();
|
||||
// Seek all column iterators to the specified ordinal position
|
||||
Status seek_columns(ordinal_t pos);
|
||||
// Read column data from the specified range into the chunk
|
||||
// Handles column pruning and delete state tracking
|
||||
Status read_columns(Chunk* chunk, const SparseRange<>& range);
|
||||
// Calculate total memory usage of all chunks in this context
|
||||
int64_t memory_usage() const;
|
||||
// Get the number of column iterators in this context
|
||||
size_t column_size();
|
||||
// Generate a JSON string representation of the context state for debugging
|
||||
std::string to_string() const;
|
||||
|
||||
Schema _read_schema;
|
||||
Schema _dict_decode_schema;
|
||||
|
|
@ -265,6 +180,56 @@ private:
|
|||
bool _prune_column_after_index_filter = false;
|
||||
};
|
||||
|
||||
// Vector index related context, only created when needed
|
||||
struct VectorIndexContext {
|
||||
VectorIndexContext() = default;
|
||||
~VectorIndexContext() = default;
|
||||
|
||||
// Vector index parameters
|
||||
int64_t k = 0;
|
||||
bool use_vector_index = false;
|
||||
std::string vector_distance_column_name;
|
||||
int vector_column_id = -1;
|
||||
SlotId vector_slot_id = -1;
|
||||
std::unordered_map<rowid_t, float> id2distance_map;
|
||||
std::map<std::string, std::string> query_params;
|
||||
double vector_range = -1.0;
|
||||
int result_order = 0;
|
||||
bool use_ivfpq = false;
|
||||
|
||||
#ifdef WITH_TENANN
|
||||
tenann::PrimitiveSeqView query_view;
|
||||
std::shared_ptr<tenann::IndexMeta> index_meta;
|
||||
#endif
|
||||
|
||||
std::shared_ptr<VectorIndexReader> ann_reader;
|
||||
|
||||
// Helper method to check if rowid should always be built
|
||||
bool always_build_rowid() const { return use_vector_index && !use_ivfpq; }
|
||||
};
|
||||
|
||||
// Inverted index related context, only created when needed
|
||||
struct InvertedIndexContext {
|
||||
InvertedIndexContext() = default;
|
||||
~InvertedIndexContext() = default;
|
||||
|
||||
// Inverted index state
|
||||
bool has_inverted_index = false;
|
||||
std::vector<InvertedIndexIterator*> inverted_index_iterators;
|
||||
std::unordered_set<ColumnId> prune_cols_candidate_by_inverted_index;
|
||||
|
||||
// Cleanup method to properly delete iterators
|
||||
void cleanup() {
|
||||
for (auto* iter : inverted_index_iterators) {
|
||||
if (iter != nullptr) {
|
||||
delete iter;
|
||||
}
|
||||
}
|
||||
inverted_index_iterators.clear();
|
||||
has_inverted_index = false;
|
||||
}
|
||||
};
|
||||
|
||||
Status _init();
|
||||
Status _try_to_update_ranges_by_runtime_filter();
|
||||
Status _do_get_next(Chunk* result, vector<rowid_t>* rowid);
|
||||
|
|
@ -368,16 +333,19 @@ private:
|
|||
|
||||
IndexReadOptions _index_read_options(ColumnId cid) const;
|
||||
|
||||
Status _init_reader_from_file(const std::string& index_path, const std::shared_ptr<TabletIndex>& tablet_index_meta,
|
||||
const std::map<std::string, std::string>& query_params);
|
||||
|
||||
private:
|
||||
using RawColumnIterators = std::vector<std::unique_ptr<ColumnIterator>>;
|
||||
using ColumnDecoders = std::vector<ColumnDecoder>;
|
||||
|
||||
std::shared_ptr<Segment> _segment;
|
||||
std::unordered_map<std::string, std::shared_ptr<Segment>> _dcg_segments;
|
||||
SegmentReadOptions _opts;
|
||||
RawColumnIterators _column_iterators;
|
||||
std::vector<int> _io_coalesce_column_index;
|
||||
ColumnDecoders _column_decoders;
|
||||
std::shared_ptr<VectorIndexReader> _ann_reader;
|
||||
BitmapIndexEvaluator _bitmap_index_evaluator;
|
||||
// delete predicates
|
||||
std::map<ColumnId, ColumnOrPredicate> _del_predicates;
|
||||
|
|
@ -425,65 +393,146 @@ private:
|
|||
int _reserve_chunk_size = 0;
|
||||
|
||||
bool _inited = false;
|
||||
bool _has_inverted_index = false;
|
||||
|
||||
std::vector<InvertedIndexIterator*> _inverted_index_iterators;
|
||||
|
||||
std::unordered_map<ColumnId, ColumnAccessPath*> _column_access_paths;
|
||||
std::unordered_map<ColumnId, ColumnAccessPath*> _predicate_column_access_paths;
|
||||
|
||||
std::unordered_set<ColumnId> _prune_cols_candidate_by_inverted_index;
|
||||
// Vector index context - only created when needed
|
||||
std::unique_ptr<VectorIndexContext> _vector_index_ctx;
|
||||
|
||||
// vector index params
|
||||
int64_t _k;
|
||||
#ifdef WITH_TENANN
|
||||
tenann::PrimitiveSeqView _query_view;
|
||||
std::shared_ptr<tenann::IndexMeta> _index_meta;
|
||||
#endif
|
||||
|
||||
bool _always_build_rowid() const { return _use_vector_index && !_use_ivfpq; }
|
||||
|
||||
bool _use_vector_index;
|
||||
std::string _vector_distance_column_name;
|
||||
int _vector_column_id;
|
||||
SlotId _vector_slot_id;
|
||||
std::unordered_map<rowid_t, float> _id2distance_map;
|
||||
std::map<std::string, std::string> _query_params;
|
||||
double _vector_range;
|
||||
int _result_order;
|
||||
bool _use_ivfpq;
|
||||
|
||||
Status _init_reader_from_file(const std::string& index_path, const std::shared_ptr<TabletIndex>& tablet_index_meta,
|
||||
const std::map<std::string, std::string>& query_params);
|
||||
// Inverted index context - only created when needed
|
||||
std::unique_ptr<InvertedIndexContext> _inverted_index_ctx;
|
||||
};
|
||||
|
||||
// ScanContext method implementations
|
||||
|
||||
void SegmentIterator::ScanContext::close() {
|
||||
_read_chunk.reset();
|
||||
_dict_chunk.reset();
|
||||
_final_chunk.reset();
|
||||
_adapt_global_dict_chunk.reset();
|
||||
}
|
||||
|
||||
Status SegmentIterator::ScanContext::seek_columns(ordinal_t pos) {
|
||||
for (auto iter : _column_iterators) {
|
||||
RETURN_IF_ERROR(iter->seek_to_ordinal(pos));
|
||||
}
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
Status SegmentIterator::ScanContext::read_columns(Chunk* chunk, const SparseRange<>& range) {
|
||||
bool may_has_del_row = chunk->delete_state() != DEL_NOT_SATISFIED;
|
||||
std::vector<size_t> pruned_cols;
|
||||
size_t pruned_col_size = 0;
|
||||
size_t num_rows = chunk->num_rows();
|
||||
for (size_t i = 0; i < _column_iterators.size(); i++) {
|
||||
ColumnPtr& col = chunk->get_column_by_index(i);
|
||||
if (_prune_column_after_index_filter && _prune_cols.count(i)) {
|
||||
pruned_cols.push_back(i);
|
||||
continue;
|
||||
}
|
||||
RETURN_IF_ERROR(_column_iterators[i]->next_batch(range, col.get()));
|
||||
if (pruned_col_size == 0) {
|
||||
pruned_col_size = col->size();
|
||||
}
|
||||
DCHECK_EQ(pruned_col_size, col->size());
|
||||
DCHECK_EQ(num_rows + range.span_size(), col->size());
|
||||
may_has_del_row |= (col->delete_state() != DEL_NOT_SATISFIED);
|
||||
}
|
||||
for (size_t i : pruned_cols) {
|
||||
ColumnPtr& col = chunk->get_column_by_index(i);
|
||||
// make sure each pruned column has the same size as the unpruneable one.
|
||||
col->resize(pruned_col_size);
|
||||
}
|
||||
chunk->set_delete_state(may_has_del_row ? DEL_PARTIAL_SATISFIED : DEL_NOT_SATISFIED);
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
int64_t SegmentIterator::ScanContext::memory_usage() const {
|
||||
int64_t usage = 0;
|
||||
usage += (_read_chunk != nullptr) ? _read_chunk->memory_usage() : 0;
|
||||
usage += (_dict_chunk.get() != _read_chunk.get()) ? _dict_chunk->memory_usage() : 0;
|
||||
usage += (_final_chunk.get() != _dict_chunk.get()) ? _final_chunk->memory_usage() : 0;
|
||||
usage += (_adapt_global_dict_chunk.get() != _final_chunk.get()) ? _adapt_global_dict_chunk->memory_usage() : 0;
|
||||
return usage;
|
||||
}
|
||||
|
||||
size_t SegmentIterator::ScanContext::column_size() {
|
||||
return _column_iterators.size();
|
||||
}
|
||||
|
||||
std::string SegmentIterator::ScanContext::to_string() const {
|
||||
std::ostringstream oss;
|
||||
oss << "{";
|
||||
oss << "\"_read_schema\": " << _read_schema.to_string() << ",";
|
||||
oss << "\"_dict_decode_schema\": " << _dict_decode_schema.to_string() << ",";
|
||||
oss << "\"_is_dict_column\": [";
|
||||
for (size_t i = 0; i < _is_dict_column.size(); ++i) {
|
||||
oss << (_is_dict_column[i] ? "true" : "false");
|
||||
if (i + 1 < _is_dict_column.size()) oss << ", ";
|
||||
}
|
||||
oss << "],";
|
||||
oss << "\"_column_iterators\": " << _column_iterators.size() << ",";
|
||||
oss << "\"_subfield_columns\": [";
|
||||
for (size_t i = 0; i < _subfield_columns.size(); ++i) {
|
||||
oss << _subfield_columns[i];
|
||||
if (i + 1 < _subfield_columns.size()) oss << ", ";
|
||||
}
|
||||
oss << "],";
|
||||
oss << "\"_subfield_iterators\": " << _subfield_iterators.size() << ",";
|
||||
oss << "\"_skip_dict_decode_indexes\": [";
|
||||
for (size_t i = 0; i < _skip_dict_decode_indexes.size(); ++i) {
|
||||
oss << (_skip_dict_decode_indexes[i] ? "true" : "false");
|
||||
if (i + 1 < _skip_dict_decode_indexes.size()) oss << ", ";
|
||||
}
|
||||
oss << "],";
|
||||
oss << "\"_read_index_map\": [";
|
||||
for (size_t i = 0; i < _read_index_map.size(); ++i) {
|
||||
oss << _read_index_map[i];
|
||||
if (i + 1 < _read_index_map.size()) oss << ", ";
|
||||
}
|
||||
oss << "],";
|
||||
oss << "\"_has_dict_column\": " << (_has_dict_column ? "true" : "false") << ",";
|
||||
oss << "\"_late_materialize\": " << (_late_materialize ? "true" : "false") << ",";
|
||||
oss << "\"_has_force_dict_encode\": " << (_has_force_dict_encode ? "true" : "false") << ",";
|
||||
oss << "\"_prune_cols\": [";
|
||||
size_t prune_count = 0;
|
||||
for (auto idx : _prune_cols) {
|
||||
if (prune_count++ > 0) oss << ", ";
|
||||
oss << idx;
|
||||
}
|
||||
oss << "],";
|
||||
oss << "\"_prune_column_after_index_filter\": " << (_prune_column_after_index_filter ? "true" : "false");
|
||||
oss << "}";
|
||||
return oss.str();
|
||||
}
|
||||
|
||||
SegmentIterator::SegmentIterator(std::shared_ptr<Segment> segment, Schema schema, SegmentReadOptions options)
|
||||
: ChunkIterator(std::move(schema), options.chunk_size),
|
||||
_segment(std::move(segment)),
|
||||
_opts(std::move(options)),
|
||||
_bitmap_index_evaluator(_schema, _opts.pred_tree),
|
||||
_predicate_columns(_opts.pred_tree.num_columns()),
|
||||
_use_vector_index(_opts.use_vector_index) {
|
||||
if (_use_vector_index) {
|
||||
// The K in front of Fe is long, which can be changed to uint32. This can be a problem,
|
||||
// but this k is wasted memory allocation, so it should not exceed the accuracy of uint32
|
||||
// options.query_vector is a string, passed to tenann as a float string, see if you need to use the stof function to convert
|
||||
// and consider precision loss
|
||||
_vector_distance_column_name = _opts.vector_search_option->vector_distance_column_name;
|
||||
_vector_column_id = _opts.vector_search_option->vector_column_id;
|
||||
_vector_slot_id = _opts.vector_search_option->vector_slot_id;
|
||||
_vector_range = _opts.vector_search_option->vector_range;
|
||||
_result_order = _opts.vector_search_option->result_order;
|
||||
_use_ivfpq = _opts.vector_search_option->use_ivfpq;
|
||||
_query_params = _opts.vector_search_option->query_params;
|
||||
if (_vector_range >= 0 && _use_ivfpq) {
|
||||
_k = _opts.vector_search_option->k * _opts.vector_search_option->pq_refine_factor *
|
||||
_opts.vector_search_option->k_factor;
|
||||
_predicate_columns(_opts.pred_tree.num_columns()) {
|
||||
// Initialize vector index context only when needed
|
||||
if (_opts.use_vector_index) {
|
||||
_vector_index_ctx = std::make_unique<VectorIndexContext>();
|
||||
_vector_index_ctx->use_vector_index = true;
|
||||
_vector_index_ctx->vector_distance_column_name = _opts.vector_search_option->vector_distance_column_name;
|
||||
_vector_index_ctx->vector_column_id = _opts.vector_search_option->vector_column_id;
|
||||
_vector_index_ctx->vector_slot_id = _opts.vector_search_option->vector_slot_id;
|
||||
_vector_index_ctx->vector_range = _opts.vector_search_option->vector_range;
|
||||
_vector_index_ctx->result_order = _opts.vector_search_option->result_order;
|
||||
_vector_index_ctx->use_ivfpq = _opts.vector_search_option->use_ivfpq;
|
||||
_vector_index_ctx->query_params = _opts.vector_search_option->query_params;
|
||||
|
||||
if (_vector_index_ctx->vector_range >= 0 && _vector_index_ctx->use_ivfpq) {
|
||||
_vector_index_ctx->k = _opts.vector_search_option->k * _opts.vector_search_option->pq_refine_factor *
|
||||
_opts.vector_search_option->k_factor;
|
||||
} else {
|
||||
_k = _opts.vector_search_option->k * _opts.vector_search_option->k_factor;
|
||||
_vector_index_ctx->k = _opts.vector_search_option->k * _opts.vector_search_option->k_factor;
|
||||
}
|
||||
#ifdef WITH_TENANN
|
||||
_query_view = tenann::PrimitiveSeqView{
|
||||
_vector_index_ctx->query_view = tenann::PrimitiveSeqView{
|
||||
.data = reinterpret_cast<uint8_t*>(_opts.vector_search_option->query_vector.data()),
|
||||
.size = static_cast<uint32_t>(_opts.vector_search_option->query_vector.size()),
|
||||
.elem_type = tenann::PrimitiveType::kFloatType};
|
||||
|
|
@ -610,13 +659,17 @@ inline Status SegmentIterator::_init_reader_from_file(const std::string& index_p
|
|||
const std::shared_ptr<TabletIndex>& tablet_index_meta,
|
||||
const std::map<std::string, std::string>& query_params) {
|
||||
#ifdef WITH_TENANN
|
||||
if (!_vector_index_ctx) {
|
||||
return Status::OK();
|
||||
}
|
||||
ASSIGN_OR_RETURN(auto meta, get_vector_meta(tablet_index_meta, query_params))
|
||||
_index_meta = std::make_shared<tenann::IndexMeta>(std::move(meta));
|
||||
RETURN_IF_ERROR(VectorIndexReaderFactory::create_from_file(index_path, _index_meta, &_ann_reader));
|
||||
auto status = _ann_reader->init_searcher(*_index_meta.get(), index_path);
|
||||
_vector_index_ctx->index_meta = std::make_shared<tenann::IndexMeta>(std::move(meta));
|
||||
RETURN_IF_ERROR(VectorIndexReaderFactory::create_from_file(index_path, _vector_index_ctx->index_meta,
|
||||
&_vector_index_ctx->ann_reader));
|
||||
auto status = _vector_index_ctx->ann_reader->init_searcher(*_vector_index_ctx->index_meta.get(), index_path);
|
||||
// means empty ann reader
|
||||
if (status.is_not_supported()) {
|
||||
_use_vector_index = false;
|
||||
_vector_index_ctx->use_vector_index = false;
|
||||
return Status::OK();
|
||||
}
|
||||
return status;
|
||||
|
|
@ -627,7 +680,9 @@ inline Status SegmentIterator::_init_reader_from_file(const std::string& index_p
|
|||
|
||||
Status SegmentIterator::_init_ann_reader() {
|
||||
#ifdef WITH_TENANN
|
||||
RETURN_IF(!_use_vector_index, Status::OK());
|
||||
if (!_vector_index_ctx || !_vector_index_ctx->use_vector_index) {
|
||||
return Status::OK();
|
||||
}
|
||||
std::unordered_map<int32_t, TabletIndex> col_map_index;
|
||||
for (const auto& index : *_segment->tablet_schema().indexes()) {
|
||||
if (index.index_type() == VECTOR) {
|
||||
|
|
@ -654,7 +709,7 @@ Status SegmentIterator::_init_ann_reader() {
|
|||
std::string index_path = IndexDescriptor::vector_index_file_path(_opts.rowset_path, _opts.rowsetid.to_string(),
|
||||
segment_id(), tablet_index_meta->index_id());
|
||||
|
||||
return _init_reader_from_file(index_path, tablet_index_meta, _query_params);
|
||||
return _init_reader_from_file(index_path, tablet_index_meta, _vector_index_ctx->query_params);
|
||||
#else
|
||||
return Status::OK();
|
||||
#endif
|
||||
|
|
@ -662,7 +717,9 @@ Status SegmentIterator::_init_ann_reader() {
|
|||
|
||||
Status SegmentIterator::_get_row_ranges_by_vector_index() {
|
||||
#ifdef WITH_TENANN
|
||||
RETURN_IF(!_use_vector_index, Status::OK());
|
||||
if (!_vector_index_ctx || !_vector_index_ctx->use_vector_index) {
|
||||
return Status::OK();
|
||||
}
|
||||
RETURN_IF(_scan_range.empty(), Status::OK());
|
||||
|
||||
SCOPED_RAW_TIMER(&_opts.stats->get_row_ranges_by_vector_index_timer);
|
||||
|
|
@ -676,14 +733,16 @@ Status SegmentIterator::_get_row_ranges_by_vector_index() {
|
|||
|
||||
{
|
||||
SCOPED_RAW_TIMER(&_opts.stats->vector_search_timer);
|
||||
if (_vector_range >= 0) {
|
||||
st = _ann_reader->range_search(_query_view, _k, &result_ids, &result_distances, &del_id_filter,
|
||||
static_cast<float>(_vector_range), _result_order);
|
||||
if (_vector_index_ctx->vector_range >= 0) {
|
||||
st = _vector_index_ctx->ann_reader->range_search(
|
||||
_vector_index_ctx->query_view, _vector_index_ctx->k, &result_ids, &result_distances, &del_id_filter,
|
||||
static_cast<float>(_vector_index_ctx->vector_range), _vector_index_ctx->result_order);
|
||||
} else {
|
||||
result_ids.resize(_k);
|
||||
result_distances.resize(_k);
|
||||
st = _ann_reader->search(_query_view, _k, (result_ids.data()),
|
||||
reinterpret_cast<uint8_t*>(result_distances.data()), &del_id_filter);
|
||||
result_ids.resize(_vector_index_ctx->k);
|
||||
result_distances.resize(_vector_index_ctx->k);
|
||||
st = _vector_index_ctx->ann_reader->search(
|
||||
_vector_index_ctx->query_view, _vector_index_ctx->k, (result_ids.data()),
|
||||
reinterpret_cast<uint8_t*>(result_distances.data()), &del_id_filter);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -713,9 +772,10 @@ Status SegmentIterator::_get_row_ranges_by_vector_index() {
|
|||
}
|
||||
}
|
||||
|
||||
_id2distance_map.reserve(filtered_result_ids.size());
|
||||
_vector_index_ctx->id2distance_map.reserve(filtered_result_ids.size());
|
||||
for (size_t i = 0; i < filtered_result_ids.size(); i++) {
|
||||
_id2distance_map[static_cast<rowid_t>(filtered_result_ids[i])] = id2distance_map[filtered_result_ids[i]];
|
||||
_vector_index_ctx->id2distance_map[static_cast<rowid_t>(filtered_result_ids[i])] =
|
||||
id2distance_map[filtered_result_ids[i]];
|
||||
}
|
||||
return Status::OK();
|
||||
#else
|
||||
|
|
@ -1442,7 +1502,8 @@ Status SegmentIterator::do_get_next(Chunk* chunk) {
|
|||
|
||||
Status st;
|
||||
std::vector<uint32_t> rowids;
|
||||
std::vector<uint32_t>* p_rowids = _always_build_rowid() ? &rowids : nullptr;
|
||||
std::vector<uint32_t>* p_rowids =
|
||||
(_vector_index_ctx && _vector_index_ctx->always_build_rowid()) ? &rowids : nullptr;
|
||||
do {
|
||||
st = _do_get_next(chunk, p_rowids);
|
||||
} while (st.ok() && chunk->num_rows() == 0);
|
||||
|
|
@ -1638,13 +1699,13 @@ Status SegmentIterator::_do_get_next(Chunk* result, vector<rowid_t>* rowid) {
|
|||
chunk = _context->_adapt_global_dict_chunk.get();
|
||||
}
|
||||
|
||||
if (_use_vector_index && !_use_ivfpq) {
|
||||
if (_vector_index_ctx && _vector_index_ctx->use_vector_index && !_vector_index_ctx->use_ivfpq) {
|
||||
DCHECK(rowid != nullptr);
|
||||
FloatColumn::MutablePtr distance_column = FloatColumn::create();
|
||||
vector<rowid_t> rowids;
|
||||
for (const auto& rid : *rowid) {
|
||||
auto it = _id2distance_map.find(rid);
|
||||
if (LIKELY(it != _id2distance_map.end())) {
|
||||
auto it = _vector_index_ctx->id2distance_map.find(rid);
|
||||
if (LIKELY(it != _vector_index_ctx->id2distance_map.end())) {
|
||||
rowids.emplace_back(it->first);
|
||||
} else {
|
||||
DCHECK(false) << "not found row id:" << rid << " in distance map";
|
||||
|
|
@ -1652,11 +1713,12 @@ Status SegmentIterator::_do_get_next(Chunk* result, vector<rowid_t>* rowid) {
|
|||
}
|
||||
}
|
||||
for (const auto& vrid : rowids) {
|
||||
distance_column->append(_id2distance_map[vrid]);
|
||||
distance_column->append(_vector_index_ctx->id2distance_map[vrid]);
|
||||
}
|
||||
|
||||
// TODO: plan vector column in FE Planner
|
||||
chunk->append_vector_column(std::move(distance_column), _make_field(_vector_column_id), _vector_slot_id);
|
||||
chunk->append_vector_column(std::move(distance_column), _make_field(_vector_index_ctx->vector_column_id),
|
||||
_vector_index_ctx->vector_slot_id);
|
||||
}
|
||||
|
||||
result->swap_chunk(*chunk);
|
||||
|
|
@ -1669,7 +1731,8 @@ Status SegmentIterator::_do_get_next(Chunk* result, vector<rowid_t>* rowid) {
|
|||
}
|
||||
|
||||
FieldPtr SegmentIterator::_make_field(size_t i) {
|
||||
return std::make_shared<Field>(i, _vector_distance_column_name, get_type_info(TYPE_FLOAT), false);
|
||||
DCHECK(_vector_index_ctx != nullptr);
|
||||
return std::make_shared<Field>(i, _vector_index_ctx->vector_distance_column_name, get_type_info(TYPE_FLOAT), false);
|
||||
}
|
||||
|
||||
Status SegmentIterator::_switch_context(ScanContext* to) {
|
||||
|
|
@ -1868,7 +1931,7 @@ Status SegmentIterator::_build_context(ScanContext* ctx) {
|
|||
ctx->_skip_dict_decode_indexes.push_back(false);
|
||||
} else {
|
||||
ctx->_skip_dict_decode_indexes.push_back(true);
|
||||
if (_prune_cols_candidate_by_inverted_index.count(f->id())) {
|
||||
if (_inverted_index_ctx && _inverted_index_ctx->prune_cols_candidate_by_inverted_index.count(f->id())) {
|
||||
// The column is pruneable if and only if:
|
||||
// 1. column in _prune_cols_candidate_by_inverted_index
|
||||
// 2. column not in output schema
|
||||
|
|
@ -2372,7 +2435,11 @@ Status SegmentIterator::_apply_del_vector() {
|
|||
}
|
||||
|
||||
Status SegmentIterator::_init_inverted_index_iterators() {
|
||||
_inverted_index_iterators.resize(ChunkHelper::max_column_id(_schema) + 1, nullptr);
|
||||
if (!_inverted_index_ctx) {
|
||||
_inverted_index_ctx = std::make_unique<InvertedIndexContext>();
|
||||
}
|
||||
|
||||
_inverted_index_ctx->inverted_index_iterators.resize(ChunkHelper::max_column_id(_schema) + 1, nullptr);
|
||||
std::unordered_map<ColumnId, ColumnUID> cid_2_ucid;
|
||||
|
||||
for (auto& field : _schema.fields()) {
|
||||
|
|
@ -2382,9 +2449,10 @@ Status SegmentIterator::_init_inverted_index_iterators() {
|
|||
ColumnId cid = pair.first;
|
||||
ColumnUID ucid = cid_2_ucid[cid];
|
||||
|
||||
if (_inverted_index_iterators[cid] == nullptr) {
|
||||
RETURN_IF_ERROR(_segment->new_inverted_index_iterator(ucid, &_inverted_index_iterators[cid], _opts));
|
||||
_has_inverted_index |= (_inverted_index_iterators[cid] != nullptr);
|
||||
if (_inverted_index_ctx->inverted_index_iterators[cid] == nullptr) {
|
||||
RETURN_IF_ERROR(_segment->new_inverted_index_iterator(
|
||||
ucid, &_inverted_index_ctx->inverted_index_iterators[cid], _opts));
|
||||
_inverted_index_ctx->has_inverted_index |= (_inverted_index_ctx->inverted_index_iterators[cid] != nullptr);
|
||||
}
|
||||
}
|
||||
return Status::OK();
|
||||
|
|
@ -2395,7 +2463,9 @@ Status SegmentIterator::_apply_inverted_index() {
|
|||
RETURN_IF(!_opts.enable_gin_filter, Status::OK());
|
||||
|
||||
RETURN_IF_ERROR(_init_inverted_index_iterators());
|
||||
RETURN_IF(!_has_inverted_index, Status::OK());
|
||||
if (!_inverted_index_ctx || !_inverted_index_ctx->has_inverted_index) {
|
||||
return Status::OK();
|
||||
}
|
||||
SCOPED_RAW_TIMER(&_opts.stats->gin_index_filter_ns);
|
||||
|
||||
roaring::Roaring row_bitmap = range2roaring(_scan_range);
|
||||
|
|
@ -2409,7 +2479,7 @@ Status SegmentIterator::_apply_inverted_index() {
|
|||
}
|
||||
|
||||
for (const auto& [cid, pred_list] : _opts.pred_tree.get_immediate_column_predicate_map()) {
|
||||
InvertedIndexIterator* inverted_iter = _inverted_index_iterators[cid];
|
||||
InvertedIndexIterator* inverted_iter = _inverted_index_ctx->inverted_index_iterators[cid];
|
||||
if (inverted_iter == nullptr) {
|
||||
continue;
|
||||
}
|
||||
|
|
@ -2418,8 +2488,10 @@ Status SegmentIterator::_apply_inverted_index() {
|
|||
Status::InternalError(strings::Substitute("No fid can be mapped by cid $0", cid)));
|
||||
std::string column_name(_schema.field(it->second)->name());
|
||||
for (const ColumnPredicate* pred : pred_list) {
|
||||
if (_inverted_index_iterators[cid]->is_untokenized() || pred->type() == PredicateType::kExpr) {
|
||||
Status res = pred->seek_inverted_index(column_name, _inverted_index_iterators[cid], &row_bitmap);
|
||||
if (_inverted_index_ctx->inverted_index_iterators[cid]->is_untokenized() ||
|
||||
pred->type() == PredicateType::kExpr) {
|
||||
Status res = pred->seek_inverted_index(column_name, _inverted_index_ctx->inverted_index_iterators[cid],
|
||||
&row_bitmap);
|
||||
if (res.ok()) {
|
||||
erased_preds.emplace(pred);
|
||||
erased_pred_col_ids.emplace(cid);
|
||||
|
|
@ -2441,7 +2513,7 @@ Status SegmentIterator::_apply_inverted_index() {
|
|||
if (!new_cid_to_predicates.contains(cid)) {
|
||||
// predicate for pred->column_id() has been total erased by
|
||||
// inverted index filtering.These columns may can be pruned.
|
||||
_prune_cols_candidate_by_inverted_index.insert(cid);
|
||||
_inverted_index_ctx->prune_cols_candidate_by_inverted_index.insert(cid);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -2668,10 +2740,8 @@ void SegmentIterator::close() {
|
|||
|
||||
_bitmap_index_evaluator.close();
|
||||
|
||||
for (auto* iter : _inverted_index_iterators) {
|
||||
if (iter != nullptr) {
|
||||
delete iter;
|
||||
}
|
||||
if (_inverted_index_ctx) {
|
||||
_inverted_index_ctx->cleanup();
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
Loading…
Reference in New Issue