[BugFix] Fix csv header skip causing data loss in files() (#62719)

Signed-off-by: wyb <wybb86@gmail.com>
This commit is contained in:
wyb 2025-09-05 18:33:24 +08:00 committed by GitHub
parent b55a5247de
commit 353ee2e652
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 46 additions and 6 deletions

View File

@ -252,14 +252,13 @@ Status CSVScanner::_init_reader() {
_curr_reader = std::make_unique<ScannerCSVReader>(file, _state, _parse_options);
_curr_reader->set_counter(_counter);
if (_scan_range.ranges[_curr_file_index].size > 0 &&
_scan_range.ranges[_curr_file_index].format_type == TFileFormatType::FORMAT_CSV_PLAIN) {
if (range_desc.size > 0 && range_desc.format_type == TFileFormatType::FORMAT_CSV_PLAIN) {
// Does not set limit for compressed file.
_curr_reader->set_limit(_scan_range.ranges[_curr_file_index].size);
_curr_reader->set_limit(range_desc.size);
}
if (_scan_range.ranges[_curr_file_index].start_offset > 0) {
if (range_desc.start_offset > 0) {
// Skip the first record started from |start_offset|.
auto status = file->skip(_scan_range.ranges[_curr_file_index].start_offset);
auto status = file->skip(range_desc.start_offset);
if (status.is_time_out()) {
// open this file next time
--_curr_file_index;
@ -270,7 +269,8 @@ Status CSVScanner::_init_reader() {
RETURN_IF_ERROR(_curr_reader->next_record(&dummy));
}
if (_parse_options.skip_header) {
// only the first range needs to skip header
if (_parse_options.skip_header && range_desc.start_offset == 0) {
for (int64_t i = 0; i < _parse_options.skip_header; i++) {
CSVReader::Record dummy;
auto st = _curr_reader->next_record(&dummy);

View File

@ -792,6 +792,7 @@ TEST_P(CSVScannerTest, test_skip_header) {
std::vector<TBrokerRangeDesc> ranges;
TBrokerRangeDesc range;
range.__set_start_offset(0);
range.__set_num_of_columns_from_file(2);
range.__set_path("./be/test/exec/test_data/csv_scanner/csv_file15");
ranges.push_back(range);
@ -818,6 +819,45 @@ TEST_P(CSVScannerTest, test_skip_header) {
EXPECT_EQ(0, chunk->get(4)[1].get_int32());
}
TEST_P(CSVScannerTest, test_skip_header_start_offset_not_0) {
std::vector<TypeDescriptor> types{TypeDescriptor(TYPE_INT), TypeDescriptor(TYPE_INT)};
std::vector<TBrokerRangeDesc> ranges;
TBrokerRangeDesc range;
// the first line is not included
range.__set_start_offset(1);
range.__set_num_of_columns_from_file(2);
range.__set_path("./be/test/exec/test_data/csv_scanner/csv_file15");
ranges.push_back(range);
auto scanner = create_csv_scanner(types, ranges, "\n", "|", 4);
Status st = scanner->open();
ASSERT_TRUE(st.ok()) << st.to_string();
scanner->use_v2(_use_v2);
ChunkPtr chunk = scanner->get_next().value();
EXPECT_EQ(8, chunk->num_rows());
EXPECT_EQ(33, chunk->get(0)[0].get_int32());
EXPECT_EQ(55, chunk->get(1)[0].get_int32());
EXPECT_EQ(77, chunk->get(2)[0].get_int32());
EXPECT_EQ(1, chunk->get(3)[0].get_int32());
EXPECT_EQ(3, chunk->get(4)[0].get_int32());
EXPECT_EQ(5, chunk->get(5)[0].get_int32());
EXPECT_EQ(7, chunk->get(6)[0].get_int32());
EXPECT_EQ(9, chunk->get(7)[0].get_int32());
EXPECT_EQ(44, chunk->get(0)[1].get_int32());
EXPECT_EQ(66, chunk->get(1)[1].get_int32());
EXPECT_EQ(88, chunk->get(2)[1].get_int32());
EXPECT_EQ(2, chunk->get(3)[1].get_int32());
EXPECT_EQ(4, chunk->get(4)[1].get_int32());
EXPECT_EQ(6, chunk->get(5)[1].get_int32());
EXPECT_EQ(8, chunk->get(6)[1].get_int32());
EXPECT_EQ(0, chunk->get(7)[1].get_int32());
}
TEST_P(CSVScannerTrimSpaceTest, test_trim_space) {
std::vector<TypeDescriptor> types{TypeDescriptor(TYPE_INT), TypeDescriptor(TYPE_VARCHAR)};