[BugFix] Fix Iceberg manifest cache NPE under a data race condition (backport #63043) (#63051)

Signed-off-by: yan zhang <dirtysalt1987@gmail.com>
Co-authored-by: yan zhang <dirtysalt1987@gmail.com>
This commit is contained in:
mergify[bot] 2025-09-12 06:09:33 +00:00 committed by GitHub
parent fcb46895e3
commit 70280d3da6
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
1 changed file with 8 additions and 8 deletions

View File

@@ -17,6 +17,7 @@ package org.apache.iceberg;
import com.github.benmanes.caffeine.cache.Cache;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.starrocks.common.Pair;
import com.starrocks.common.profile.Tracers;
import com.starrocks.connector.PlanMode;
import com.starrocks.connector.exception.StarRocksConnectorException;
@@ -256,12 +257,12 @@ public class StarRocksIcebergTableScan
}
private CloseableIterable<FileScanTask> planTaskWithCache(List<ManifestFile> dataManifests) {
List<ManifestFile> dataManifestWithCache = new ArrayList<>();
List<Pair<ManifestFile, Set<DataFile>>> dataManifestWithCache = new ArrayList<>();
List<ManifestFile> dataManifestWithoutCache = new ArrayList<>();
for (ManifestFile manifestFile : dataManifests) {
Set<DataFile> dataFiles = dataFileCache.getIfPresent(manifestFile.path());
if (dataFiles != null && !dataFiles.isEmpty()) {
dataManifestWithCache.add(manifestFile);
dataManifestWithCache.add(new Pair(manifestFile, dataFiles));
scanMetrics().scannedDataManifests().increment();
} else {
if (!onlyReadCache) {
@@ -284,20 +285,19 @@ public class StarRocksIcebergTableScan
}
}
private CloseableIterable<FileScanTask> filterDataFiles(ManifestFile manifestFile) {
CloseableIterable<DataFile> matchedDataFiles = CloseableIterable.withNoopClose(
dataFileCache.getIfPresent(manifestFile.path()));
private CloseableIterable<FileScanTask> filterDataFiles(Pair<ManifestFile, Set<DataFile>> manifestFile) {
CloseableIterable<DataFile> matchedDataFiles = CloseableIterable.withNoopClose(manifestFile.second);
if (filter() != Expressions.alwaysTrue()) {
matchedDataFiles = CloseableIterable.filter(
matchedDataFiles = CloseableIterable.filter(
scanMetrics().skippedDataFiles(),
CloseableIterable.withNoopClose(dataFileCache.getIfPresent(manifestFile.path())),
CloseableIterable.withNoopClose(manifestFile.second),
file -> partitionEvaluatorCache.get(file.specId()).eval(file.partition()));
}
if (dataFileCacheWithMetrics ||
(!tableSchema().identifierFieldIds().isEmpty() && enableCacheDataFileIdentifierColumnMetrics)) {
matchedDataFiles = CloseableIterable.filter(
matchedDataFiles = CloseableIterable.filter(
scanMetrics().skippedDataFiles(),
matchedDataFiles,
file -> inclusiveMetricsEvaluatorCache.get(file.specId()).eval(file));