[Enhancement] share file io in iceberg metadata scanner (#61012)

Signed-off-by: yan zhang <dirtysalt1987@gmail.com>
This commit is contained in:
yan zhang 2025-07-25 10:58:27 +08:00 committed by GitHub
parent 8a4ef5c2f1
commit f4626f289f
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
7 changed files with 73 additions and 7 deletions

View File

@ -61,11 +61,13 @@ import java.io.Serializable;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
public class SerializableTable implements Table, Serializable, HasTableOperations {
private final String name;
private final String location;
private final UUID uuid;
private final String metadataFileLocation;
private final Map<String, String> properties;
private final String schemaAsJson;
@ -85,6 +87,7 @@ public class SerializableTable implements Table, Serializable, HasTableOperation
public SerializableTable(Table table, FileIO fileIO) {
this.name = table.name();
this.location = table.location();
this.uuid = table.uuid();
this.metadataFileLocation = metadataFileLocation(table);
this.properties = SerializableMap.copyOf(table.properties());
this.schemaAsJson = SchemaParser.toJson(table.schema());
@ -154,6 +157,11 @@ public class SerializableTable implements Table, Serializable, HasTableOperation
return properties;
}
@Override
public UUID uuid() {
return uuid;
}
@Override
public Schema schema() {
if (lazySchema == null) {

View File

@ -20,6 +20,7 @@ import com.starrocks.jni.connector.ScannerHelper;
import com.starrocks.jni.connector.SelectedFields;
import com.starrocks.utils.loader.ThreadContextClassLoader;
import org.apache.iceberg.Table;
import org.apache.iceberg.io.FileIO;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@ -41,8 +42,13 @@ public abstract class AbstractIcebergMetadataScanner extends ConnectorScanner {
private final int fetchSize;
protected final ClassLoader classLoader;
protected Table table;
protected FileIO fileIO;
protected String timezone;
// Entries expire after 600 seconds — long enough for typical scans while staying
// within most credential-expiration windows; max 1000 entries covers most use cases.
private static final TableFileIOCache TABLE_FILE_IO_CACHE = new TableFileIOCache(600, 1000);
public AbstractIcebergMetadataScanner(int fetchSize, Map<String, String> params) {
this.fetchSize = fetchSize;
this.requiredFields = params.get("required_fields").split(",");
@ -58,6 +64,7 @@ public abstract class AbstractIcebergMetadataScanner extends ConnectorScanner {
public void open() throws IOException {
try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(classLoader)) {
this.table = deserializeFromBase64(serializedTable);
this.fileIO = TABLE_FILE_IO_CACHE.get(table);
parseRequiredTypes();
initOffHeapTableWriter(requiredTypes, requiredFields, fetchSize);
doOpen();

View File

@ -94,13 +94,13 @@ public class IcebergFilesTableScanner extends AbstractIcebergMetadataScanner {
List<String> scanColumns;
if (manifestFile.content() == ManifestContent.DATA) {
scanColumns = loadColumnStats ? SCAN_WITH_STATS_COLUMNS : SCAN_COLUMNS;
reader = ManifestFiles.read(manifestFile, table.io(), specs)
reader = ManifestFiles.read(manifestFile, fileIO, specs)
.select(scanColumns)
.caseSensitive(false)
.iterator();
} else {
scanColumns = loadColumnStats ? DELETE_SCAN_WITH_STATS_COLUMNS : DELETE_SCAN_COLUMNS;
reader = ManifestFiles.readDeleteManifest(manifestFile, table.io(), specs)
reader = ManifestFiles.readDeleteManifest(manifestFile, fileIO, specs)
.select(scanColumns)
.caseSensitive(false)
.iterator();

View File

@ -71,7 +71,7 @@ public class IcebergManifestsTableScanner extends AbstractIcebergMetadataScanner
@Override
protected void initReader() {
if (table.currentSnapshot() != null) {
reader = table.currentSnapshot().allManifests(table.io()).iterator();
reader = table.currentSnapshot().allManifests(fileIO).iterator();
}
}

View File

@ -146,14 +146,14 @@ public class IcebergMetadataScanner extends AbstractIcebergMetadataScanner {
List<String> scanColumns;
if (manifestFile.content() == ManifestContent.DATA) {
scanColumns = loadColumnStats ? SCAN_WITH_STATS_COLUMNS : SCAN_COLUMNS;
reader = ManifestFiles.read(manifestFile, table.io(), specs)
reader = ManifestFiles.read(manifestFile, fileIO, specs)
.select(scanColumns)
.filterRows(predicate)
.caseSensitive(false)
.iterator();
} else {
scanColumns = loadColumnStats ? DELETE_SCAN_WITH_STATS_COLUMNS : DELETE_SCAN_COLUMNS;
reader = ManifestFiles.readDeleteManifest(manifestFile, table.io(), specs)
reader = ManifestFiles.readDeleteManifest(manifestFile, fileIO, specs)
.select(scanColumns)
.filterRows(predicate)
.caseSensitive(false)

View File

@ -102,12 +102,12 @@ public class IcebergPartitionsTableScanner extends AbstractIcebergMetadataScanne
protected void initReader() {
Map<Integer, PartitionSpec> specs = table.specs();
if (manifestFile.content() == ManifestContent.DATA) {
reader = ManifestFiles.read(manifestFile, table.io(), specs)
reader = ManifestFiles.read(manifestFile, fileIO, specs)
.select(SCAN_COLUMNS)
.caseSensitive(false)
.iterator();
} else {
reader = ManifestFiles.readDeleteManifest(manifestFile, table.io(), specs)
reader = ManifestFiles.readDeleteManifest(manifestFile, fileIO, specs)
.select(SCAN_COLUMNS)
.caseSensitive(false)
.iterator();

View File

@ -0,0 +1,51 @@
// Copyright 2021-present StarRocks, Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package com.starrocks.connector.iceberg;

import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import org.apache.iceberg.Table;
import org.apache.iceberg.io.FileIO;

import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

/**
 * A small, expiring, size-bounded cache that shares {@link FileIO} instances
 * across Iceberg metadata scanner invocations, keyed by table name and UUID.
 *
 * <p>Thread-safety: backed by a Guava {@link Cache}, and lookups use the
 * atomic {@link Cache#get(Object, java.util.concurrent.Callable)} so that
 * concurrent callers for the same table observe a single shared FileIO.
 */
public class TableFileIOCache {
    private final Cache<String, FileIO> cache;

    /**
     * @param expireSeconds time-to-live for each entry after it is written
     * @param capacity      maximum number of cached FileIO instances
     */
    public TableFileIOCache(long expireSeconds, long capacity) {
        // note: reason why not use expireAfterAccess is that
        // fileio could be bound to credentials, and credentials could be expired.
        // some fileio implementations may not handle this well
        // so for safety, we use expireAfterWrite
        this.cache = CacheBuilder.newBuilder()
                .expireAfterWrite(expireSeconds, TimeUnit.SECONDS)
                .maximumSize(capacity)
                .build();
    }

    /**
     * Returns the cached {@link FileIO} for {@code table}, creating and caching
     * one via {@link Table#io()} on a miss.
     *
     * <p>Uses the atomic get-with-loader form rather than a getIfPresent/put
     * pair: the latter is a check-then-act race under which concurrent scanners
     * of the same table would each create (and cache over one another) their
     * own FileIO, defeating the sharing this cache exists for.
     */
    public FileIO get(Table table) {
        String cacheKey = generateCacheKey(table);
        try {
            return cache.get(cacheKey, table::io);
        } catch (ExecutionException e) {
            // Table#io() declares no checked exceptions, so an ExecutionException
            // here wraps an unexpected failure from the loader; rethrow its cause.
            Throwable cause = e.getCause() != null ? e.getCause() : e;
            throw new RuntimeException("failed to load FileIO for table " + table.name(), cause);
        }
    }

    // name alone is not unique across catalogs/recreations; include the UUID.
    private static String generateCacheKey(Table table) {
        return table.name() + "_" + table.uuid();
    }
}