[BugFix] Fix set_tablet_schema for partition_morsel_queue(split_morsel_queue) (backport #62034) (#62118)
Signed-off-by: zihe.liu <ziheliu1024@gmail.com> Co-authored-by: zihe.liu <ziheliu1024@gmail.com>
This commit is contained in:
parent
b68721abdc
commit
7fb868e211
|
|
@ -251,6 +251,9 @@ StatusOr<MorselPtr> BucketSequenceMorselQueue::try_get() {
|
|||
}
|
||||
ASSIGN_OR_RETURN(auto morsel, _morsel_queue->try_get());
|
||||
auto* m = down_cast<ScanMorsel*>(morsel.get());
|
||||
if (m == nullptr) {
|
||||
return nullptr;
|
||||
}
|
||||
DCHECK(m->has_owner_id());
|
||||
auto owner_id = m->owner_id();
|
||||
ASSIGN_OR_RETURN(int64_t next_owner_id, _peek_sequence_id());
|
||||
|
|
@ -309,10 +312,10 @@ void PhysicalSplitMorselQueue::set_key_ranges(const std::vector<std::unique_ptr<
|
|||
}
|
||||
}
|
||||
|
||||
void PhysicalSplitMorselQueue::set_key_ranges(TabletReaderParams::RangeStartOperation range_start_op,
|
||||
TabletReaderParams::RangeEndOperation range_end_op,
|
||||
std::vector<OlapTuple> range_start_key,
|
||||
std::vector<OlapTuple> range_end_key) {
|
||||
void PhysicalSplitMorselQueue::set_key_ranges(const TabletReaderParams::RangeStartOperation& range_start_op,
|
||||
const TabletReaderParams::RangeEndOperation& range_end_op,
|
||||
const std::vector<OlapTuple>& range_start_key,
|
||||
const std::vector<OlapTuple>& range_end_key) {
|
||||
_range_start_op = range_start_op;
|
||||
_range_end_op = range_end_op;
|
||||
_range_start_key = range_start_key;
|
||||
|
|
@ -575,10 +578,10 @@ void LogicalSplitMorselQueue::set_key_ranges(const std::vector<std::unique_ptr<O
|
|||
}
|
||||
}
|
||||
|
||||
void LogicalSplitMorselQueue::set_key_ranges(TabletReaderParams::RangeStartOperation range_start_op,
|
||||
TabletReaderParams::RangeEndOperation range_end_op,
|
||||
std::vector<OlapTuple> range_start_key,
|
||||
std::vector<OlapTuple> range_end_key) {
|
||||
void LogicalSplitMorselQueue::set_key_ranges(const TabletReaderParams::RangeStartOperation& range_start_op,
|
||||
const TabletReaderParams::RangeEndOperation& range_end_op,
|
||||
const std::vector<OlapTuple>& range_start_key,
|
||||
const std::vector<OlapTuple>& range_end_key) {
|
||||
_range_start_op = range_start_op;
|
||||
_range_end_op = range_end_op;
|
||||
_range_start_key = range_start_key;
|
||||
|
|
|
|||
|
|
@ -340,11 +340,15 @@ public:
|
|||
MorselQueue(Morsels&& morsels) : _morsels(std::move(morsels)), _num_morsels(_morsels.size()) {}
|
||||
virtual ~MorselQueue() = default;
|
||||
|
||||
// NOTE: some subclasses of MorselQueue nest another MorselQueue, such as BucketSequenceMorselQueue.
|
||||
// When adding a new virtual method, DO NOT forget to invoke it on the nested MorselQueue as well.
|
||||
|
||||
virtual std::vector<TInternalScanRange*> prepare_olap_scan_ranges() const;
|
||||
virtual void set_key_ranges(const std::vector<std::unique_ptr<OlapScanRange>>& key_ranges) {}
|
||||
virtual void set_key_ranges(TabletReaderParams::RangeStartOperation _range_start_op,
|
||||
TabletReaderParams::RangeEndOperation _range_end_op,
|
||||
std::vector<OlapTuple> _range_start_key, std::vector<OlapTuple> _range_end_key) {}
|
||||
virtual void set_key_ranges(const TabletReaderParams::RangeStartOperation& range_start_op,
|
||||
const TabletReaderParams::RangeEndOperation& range_end_op,
|
||||
const std::vector<OlapTuple>& range_start_key,
|
||||
const std::vector<OlapTuple>& range_end_key) {}
|
||||
virtual void set_tablets(const std::vector<BaseTabletSharedPtr>& tablets) { _tablets = tablets; }
|
||||
virtual void set_tablet_rowsets(const std::vector<std::vector<BaseRowsetSharedPtr>>& tablet_rowsets) {
|
||||
_tablet_rowsets = tablet_rowsets;
|
||||
|
|
@ -361,7 +365,7 @@ public:
|
|||
virtual StatusOr<bool> ready_for_next() const { return true; }
|
||||
virtual Status append_morsels(Morsels&& morsels);
|
||||
virtual Type type() const = 0;
|
||||
void set_tablet_schema(TabletSchemaCSPtr tablet_schema) {
|
||||
virtual void set_tablet_schema(const TabletSchemaCSPtr& tablet_schema) {
|
||||
DCHECK(tablet_schema != nullptr);
|
||||
_tablet_schema = tablet_schema;
|
||||
}
|
||||
|
|
@ -402,6 +406,13 @@ public:
|
|||
_morsel_queue->set_key_ranges(key_ranges);
|
||||
}
|
||||
|
||||
void set_key_ranges(const TabletReaderParams::RangeStartOperation& range_start_op,
|
||||
const TabletReaderParams::RangeEndOperation& range_end_op,
|
||||
const std::vector<OlapTuple>& range_start_key,
|
||||
const std::vector<OlapTuple>& range_end_key) override {
|
||||
_morsel_queue->set_key_ranges(range_start_op, range_end_op, range_start_key, range_end_key);
|
||||
}
|
||||
|
||||
void set_tablets(const std::vector<BaseTabletSharedPtr>& tablets) override { _morsel_queue->set_tablets(tablets); }
|
||||
|
||||
void set_tablet_rowsets(const std::vector<std::vector<BaseRowsetSharedPtr>>& tablet_rowsets) override {
|
||||
|
|
@ -422,6 +433,11 @@ public:
|
|||
Status append_morsels(Morsels&& morsels) override { return _morsel_queue->append_morsels(std::move(morsels)); }
|
||||
Type type() const override { return BUCKET_SEQUENCE; }
|
||||
|
||||
void set_tablet_schema(const TabletSchemaCSPtr& tablet_schema) override {
|
||||
MorselQueue::set_tablet_schema(tablet_schema);
|
||||
_morsel_queue->set_tablet_schema(tablet_schema);
|
||||
}
|
||||
|
||||
private:
|
||||
StatusOr<int64_t> _peek_sequence_id() const;
|
||||
mutable std::mutex _mutex;
|
||||
|
|
@ -470,9 +486,10 @@ public:
|
|||
~PhysicalSplitMorselQueue() override = default;
|
||||
|
||||
void set_key_ranges(const std::vector<std::unique_ptr<OlapScanRange>>& key_ranges) override;
|
||||
void set_key_ranges(TabletReaderParams::RangeStartOperation _range_start_op,
|
||||
TabletReaderParams::RangeEndOperation _range_end_op, std::vector<OlapTuple> _range_start_key,
|
||||
std::vector<OlapTuple> _range_end_key) override;
|
||||
void set_key_ranges(const TabletReaderParams::RangeStartOperation& range_start_op,
|
||||
const TabletReaderParams::RangeEndOperation& range_end_op,
|
||||
const std::vector<OlapTuple>& range_start_key,
|
||||
const std::vector<OlapTuple>& range_end_key) override;
|
||||
bool empty() const override { return _unget_morsel == nullptr && _tablet_idx >= _tablets.size(); }
|
||||
StatusOr<MorselPtr> try_get() override;
|
||||
|
||||
|
|
@ -527,9 +544,10 @@ public:
|
|||
~LogicalSplitMorselQueue() override = default;
|
||||
|
||||
void set_key_ranges(const std::vector<std::unique_ptr<OlapScanRange>>& key_ranges) override;
|
||||
void set_key_ranges(TabletReaderParams::RangeStartOperation range_start_op,
|
||||
TabletReaderParams::RangeEndOperation range_end_op, std::vector<OlapTuple> range_start_key,
|
||||
std::vector<OlapTuple> range_end_key) override;
|
||||
void set_key_ranges(const TabletReaderParams::RangeStartOperation& range_start_op,
|
||||
const TabletReaderParams::RangeEndOperation& range_end_op,
|
||||
const std::vector<OlapTuple>& range_start_key,
|
||||
const std::vector<OlapTuple>& range_end_key) override;
|
||||
bool empty() const override { return _unget_morsel == nullptr && _tablet_idx >= _tablets.size(); }
|
||||
StatusOr<MorselPtr> try_get() override;
|
||||
|
||||
|
|
|
|||
|
|
@ -83,9 +83,7 @@ StatusOr<ChunkPtr> OlapScanPrepareOperator::pull_chunk(RuntimeState* state) {
|
|||
}
|
||||
_morsel_queue->set_tablet_rowsets(std::move(tablet_rowsets));
|
||||
|
||||
if ((_morsel_queue->type() == MorselQueue::Type::LOGICAL_SPLIT ||
|
||||
_morsel_queue->type() == MorselQueue::Type::PHYSICAL_SPLIT) &&
|
||||
!tablets.empty()) {
|
||||
if (!tablets.empty()) {
|
||||
_morsel_queue->set_tablet_schema(tablets[0]->tablet_schema());
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -25,6 +25,12 @@ select /*+ SET_VAR(enable_per_bucket_optimize=false,enable_query_cache=false) */
|
|||
2 5
|
||||
3 4
|
||||
-- !result
|
||||
select /*+ SET_VAR(enable_per_bucket_optimize=true,enable_query_cache=false,tablet_internal_parallel_mode='force_split',pipeline_dop=2) */c2, count() from t0 group by c2 order by c2;
|
||||
-- result:
|
||||
1 5
|
||||
2 5
|
||||
3 4
|
||||
-- !result
|
||||
CREATE TABLE t1 (
|
||||
c1 int,
|
||||
c2 int
|
||||
|
|
@ -51,6 +57,12 @@ select /*+ SET_VAR(enable_per_bucket_optimize=false,enable_query_cache=false) */
|
|||
2 6
|
||||
3 4
|
||||
-- !result
|
||||
select /*+ SET_VAR(enable_per_bucket_optimize=true,enable_query_cache=false,tablet_internal_parallel_mode='force_split',pipeline_dop=2) */c2, count() from t1 group by c2 order by c2;
|
||||
-- result:
|
||||
1 5
|
||||
2 6
|
||||
3 4
|
||||
-- !result
|
||||
CREATE TABLE t2 (
|
||||
c1 int,
|
||||
c2 int
|
||||
|
|
@ -77,6 +89,12 @@ select /*+ SET_VAR(enable_per_bucket_optimize=false,enable_query_cache=false) */
|
|||
2 6
|
||||
3 4
|
||||
-- !result
|
||||
select /*+ SET_VAR(enable_per_bucket_optimize=true,enable_query_cache=false,tablet_internal_parallel_mode='force_split',pipeline_dop=2) */c2, count() from t2 group by c2 order by c2;
|
||||
-- result:
|
||||
1 5
|
||||
2 6
|
||||
3 4
|
||||
-- !result
|
||||
CREATE TABLE t3 (
|
||||
c1 int,
|
||||
c2 int
|
||||
|
|
@ -105,6 +123,13 @@ select /*+ SET_VAR(enable_per_bucket_optimize=false,enable_query_cache=false) */
|
|||
3 4
|
||||
11 1
|
||||
-- !result
|
||||
select /*+ SET_VAR(enable_per_bucket_optimize=true,enable_query_cache=false,tablet_internal_parallel_mode='force_split',pipeline_dop=2) */c2, count() from t3 group by c2 order by c2;
|
||||
-- result:
|
||||
1 5
|
||||
2 6
|
||||
3 4
|
||||
11 1
|
||||
-- !result
|
||||
CREATE TABLE t4 (
|
||||
c1 int, c2 int
|
||||
) DUPLICATE KEY(c1, c2)
|
||||
|
|
@ -126,4 +151,10 @@ select /*+ SET_VAR(enable_per_bucket_optimize=false,enable_query_cache=false) */
|
|||
1 5
|
||||
2 6
|
||||
3 4
|
||||
-- !result
|
||||
select /*+ SET_VAR(enable_per_bucket_optimize=true,enable_query_cache=false,tablet_internal_parallel_mode='force_split',pipeline_dop=2) */c2, count() from t4 group by c2 order by c2;
|
||||
-- result:
|
||||
1 5
|
||||
2 6
|
||||
3 4
|
||||
-- !result
|
||||
|
|
@ -12,6 +12,7 @@ PROPERTIES( "replication_num"="1", "colocate_with"="5a5fd327dsdb_2806" );
|
|||
insert into t0 (c1, c2) values (1, 1), (2, 1), (3, 1), (4, 1), (11, 1), (11, 2), (1, 2), (2, 2), (3, 2), (11, 2), (12, 2), (1, 3), (2, 3), (3, 3), (11, 3);
|
||||
select /*+ SET_VAR(enable_per_bucket_optimize=true,enable_query_cache=false) */c2, count() from t0 group by c2 order by c2;
|
||||
select /*+ SET_VAR(enable_per_bucket_optimize=false,enable_query_cache=false) */c2, count() from t0 group by c2 order by c2;
|
||||
select /*+ SET_VAR(enable_per_bucket_optimize=true,enable_query_cache=false,tablet_internal_parallel_mode='force_split',pipeline_dop=2) */c2, count() from t0 group by c2 order by c2;
|
||||
|
||||
CREATE TABLE t1 (
|
||||
c1 int,
|
||||
|
|
@ -25,6 +26,7 @@ PROPERTIES( "replication_num"="1", "colocate_with"="5a5fd327dsdb_2806" );
|
|||
insert into t1 (c1, c2) values (1, 1), (2, 1), (3, 1), (4, 1), (11, 1), (11, 2), (1, 2), (2, 2), (3, 2), (11, 2), (12, 2), (1, 3), (2, 3), (3, 3), (11, 3);
|
||||
select /*+ SET_VAR(enable_per_bucket_optimize=true,enable_query_cache=false) */c2, count() from t1 group by c2 order by c2;
|
||||
select /*+ SET_VAR(enable_per_bucket_optimize=false,enable_query_cache=false) */c2, count() from t1 group by c2 order by c2;
|
||||
select /*+ SET_VAR(enable_per_bucket_optimize=true,enable_query_cache=false,tablet_internal_parallel_mode='force_split',pipeline_dop=2) */c2, count() from t1 group by c2 order by c2;
|
||||
|
||||
|
||||
CREATE TABLE t2 (
|
||||
|
|
@ -40,6 +42,7 @@ PROPERTIES( "replication_num"="1", "colocate_with"="5a5fd327dsdb_2806" );
|
|||
insert into t2 (c1, c2)values (1, 1), (2, 1), (3, 1), (4, 1), (11, 1), (11, 2), (1, 2), (2, 2), (3, 2), (11, 2), (12, 2), (1, 3), (2, 3), (3, 3), (11, 3);
|
||||
select /*+ SET_VAR(enable_per_bucket_optimize=true,enable_query_cache=false) */c2, count() from t2 group by c2 order by c2;
|
||||
select /*+ SET_VAR(enable_per_bucket_optimize=false,enable_query_cache=false) */c2, count() from t2 group by c2 order by c2;
|
||||
select /*+ SET_VAR(enable_per_bucket_optimize=true,enable_query_cache=false,tablet_internal_parallel_mode='force_split',pipeline_dop=2) */c2, count() from t2 group by c2 order by c2;
|
||||
|
||||
|
||||
CREATE TABLE t3 (
|
||||
|
|
@ -54,6 +57,7 @@ PROPERTIES( "replication_num"="1", "colocate_with"="5a5fd327dsdb_2806" );
|
|||
insert into t3 (c1, c2)values (1, 1), (2, 1), (3, 1), (4, 1), (11, 1), (11, 2), (1, 2), (2, 2), (3, 2), (11, 2), (12, 2), (1, 3), (2, 3), (3, 3), (11, 3),(1,11);
|
||||
select /*+ SET_VAR(enable_per_bucket_optimize=true,enable_query_cache=false) */c2, count() from t3 group by c2 order by c2;
|
||||
select /*+ SET_VAR(enable_per_bucket_optimize=false,enable_query_cache=false) */c2, count() from t3 group by c2 order by c2;
|
||||
select /*+ SET_VAR(enable_per_bucket_optimize=true,enable_query_cache=false,tablet_internal_parallel_mode='force_split',pipeline_dop=2) */c2, count() from t3 group by c2 order by c2;
|
||||
|
||||
|
||||
CREATE TABLE t4 (
|
||||
|
|
@ -64,3 +68,4 @@ PROPERTIES( "replication_num"="1", "colocate_with"="5a5fd327dsdb_2806" );
|
|||
insert into t4 (c1, c2)values (1, 1), (2, 1), (3, 1), (4, 1), (11, 1), (11, 2), (1, 2), (2, 2), (3, 2), (11, 2), (12, 2), (1, 3), (2, 3), (3, 3), (11, 3);
|
||||
select /*+ SET_VAR(enable_per_bucket_optimize=true,enable_query_cache=false) */c2, count() from t4 group by c2 order by c2;
|
||||
select /*+ SET_VAR(enable_per_bucket_optimize=false,enable_query_cache=false) */c2, count() from t4 group by c2 order by c2;
|
||||
select /*+ SET_VAR(enable_per_bucket_optimize=true,enable_query_cache=false,tablet_internal_parallel_mode='force_split',pipeline_dop=2) */c2, count() from t4 group by c2 order by c2;
|
||||
|
|
|
|||
Loading…
Reference in New Issue