[BugFix] fix iceberg table scan exception during scan range deploy (backport #62994) (#63018)

Signed-off-by: yan zhang <dirtysalt1987@gmail.com>
Co-authored-by: yan zhang <dirtysalt1987@gmail.com>
mergify[bot] 2025-09-11 11:05:18 +00:00 committed by GitHub
parent 2ac795296e
commit b6c2478a24
3 changed files with 30 additions and 16 deletions

HDFSBackendSelector.java

@@ -76,10 +76,13 @@ import java.util.Map;
 public class HDFSBackendSelector implements BackendSelector {
     public static final Logger LOG = LogManager.getLogger(HDFSBackendSelector.class);
-    // be -> assigned scans
-    Map<ComputeNode, Long> assignedScansPerComputeNode = Maps.newHashMap();
-    // be -> re-balance bytes
-    Map<ComputeNode, Long> reBalanceBytesPerComputeNode = Maps.newHashMap();
+    // be -> assigned bytes
+    Map<ComputeNode, Long> assignedBytesPerComputeNode = Maps.newHashMap();
+    // be -> re-balanced bytes
+    Map<ComputeNode, Long> reBalancedBytesPerComputeNode = Maps.newHashMap();
+    // be -> assigned scan ranges
+    Map<ComputeNode, Long> assignedScanRangesPerComputeNode = Maps.newHashMap();
     private final ScanNode scanNode;
     private final List<TScanRangeLocations> locations;
     private final FragmentScanRangeAssignment assignment;
@@ -211,7 +214,7 @@ public class HDFSBackendSelector implements BackendSelector {
         SessionVariable sessionVariable = connectContext.getSessionVariable();
         boolean forceReBalance = sessionVariable.getHdfsBackendSelectorForceRebalance();
         boolean enableDataCache = sessionVariable.isEnableScanDataCache();
-        // If force-rebalancing is not specified and cache is used, skip the rebalancing directly.
+        // If force re-balancing is not specified and cache is used, skip the rebalancing directly.
         if (!forceReBalance && enableDataCache) {
             return backends.get(0);
         }
@@ -219,7 +222,7 @@ public class HDFSBackendSelector implements BackendSelector {
         ComputeNode node = null;
         long addedScans = scanRangeLocations.scan_range.hdfs_scan_range.length;
         for (ComputeNode backend : backends) {
-            long assignedScanRanges = assignedScansPerComputeNode.get(backend);
+            long assignedScanRanges = assignedBytesPerComputeNode.get(backend);
             if (assignedScanRanges + addedScans < avgNodeScanRangeBytes * kMaxImbalanceRatio) {
                 node = backend;
                 break;
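
Note on the hunk above: avgNodeScanRangeBytes and kMaxImbalanceRatio are byte-denominated, so the map consulted here must hold bytes as well; the rename makes that explicit (the local variable name assignedScanRanges is a leftover that still reads as a count). A minimal, self-contained sketch of the same greedy cap check, with illustrative names (pickNode, MAX_IMBALANCE_RATIO) that are not part of the actual class:

    import java.util.List;
    import java.util.Map;

    class ImbalanceCheckSketch {
        // Illustrative cap; the real class defines its own kMaxImbalanceRatio.
        static final double MAX_IMBALANCE_RATIO = 1.1;

        // Walk the candidates in preference order and take the first one whose
        // byte load would stay under avg * ratio; null means every candidate
        // is already over the cap.
        static <N> N pickNode(List<N> candidates, Map<N, Long> assignedBytes,
                              long addedBytes, long avgNodeBytes) {
            for (N candidate : candidates) {
                long current = assignedBytes.get(candidate);
                if (current + addedBytes < avgNodeBytes * MAX_IMBALANCE_RATIO) {
                    return candidate;
                }
            }
            return null;
        }
    }
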
@@ -326,8 +329,9 @@ public class HDFSBackendSelector implements BackendSelector {
         long totalSize = computeTotalSize();
         long avgNodeScanRangeBytes = totalSize / Math.max(workerProvider.getAllWorkers().size(), 1) + 1;
         for (ComputeNode computeNode : workerProvider.getAllWorkers()) {
-            assignedScansPerComputeNode.put(computeNode, 0L);
-            reBalanceBytesPerComputeNode.put(computeNode, 0L);
+            assignedBytesPerComputeNode.put(computeNode, 0L);
+            assignedScanRangesPerComputeNode.put(computeNode, 0L);
+            reBalancedBytesPerComputeNode.put(computeNode, 0L);
         }
 
         // schedule scan ranges to co-located backends.
@@ -341,7 +345,7 @@ public class HDFSBackendSelector implements BackendSelector {
         }
 
         // use consistent hashing to schedule remote scan ranges
-        HashRing hashRing = makeHashRing(assignedScansPerComputeNode.keySet());
+        HashRing hashRing = makeHashRing(assignedBytesPerComputeNode.keySet());
         HashRing candidateHashRing = null;
         if (candidateWorkerProvider != null) {
             Collection<ComputeNode> candidateWorkers = candidateWorkerProvider.getAllWorkers();
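
For context, makeHashRing builds a consistent-hash ring over the worker set so that the same scan range keeps landing on the same node across queries, which is what makes the data cache effective. A generic sketch of such a ring with virtual nodes follows; it assumes nothing about StarRocks' actual HashRing beyond the get(key, k) shape visible in the next hunk, and the mixing function is illustrative:

    import java.util.ArrayList;
    import java.util.List;
    import java.util.SortedMap;
    import java.util.TreeMap;

    class HashRingSketch<N> {
        private final TreeMap<Long, N> ring = new TreeMap<>();

        HashRingSketch(Iterable<N> nodes, int virtualNodesPerNode) {
            // Each node occupies several positions on the ring to smooth the load.
            for (N node : nodes) {
                for (int i = 0; i < virtualNodesPerNode; i++) {
                    ring.put(hash(node.hashCode() * 31L + i), node);
                }
            }
        }

        // Return up to k distinct nodes clockwise from the key's position.
        List<N> get(Object key, int k) {
            List<N> result = new ArrayList<>();
            if (ring.isEmpty()) {
                return result;
            }
            SortedMap<Long, N> tail = ring.tailMap(hash(key.hashCode()));
            for (N node : tail.values()) {
                if (!result.contains(node)) {
                    result.add(node);
                }
                if (result.size() == k) {
                    return result;
                }
            }
            for (N node : ring.values()) { // wrap around the ring
                if (!result.contains(node)) {
                    result.add(node);
                }
                if (result.size() == k) {
                    return result;
                }
            }
            return result;
        }

        private static long hash(long x) { // simple 64-bit mix, illustrative only
            x ^= x >>> 33;
            x *= 0xff51afd7ed558ccdL;
            x ^= x >>> 33;
            return x;
        }
    }
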
@@ -365,7 +369,7 @@ public class HDFSBackendSelector implements BackendSelector {
             ComputeNode candidateNode = null;
             if (candidateHashRing != null) {
                 List<ComputeNode> candidateBackends = candidateHashRing.get(scanRangeLocations, kCandidateNumber);
-                // if datacache is enable, skip rebalance because it make the cache position undefined.
+                // if data cache is enabled, skip re-balancing because it makes the cache position undefined.
                 candidateNode = candidateBackends.get(0);
             }
             recordScanRangeAssignment(node, candidateNode, backends, scanRangeLocations);
@@ -381,11 +385,11 @@ public class HDFSBackendSelector implements BackendSelector {
         // update statistic
         long addedScans = scanRangeLocations.scan_range.hdfs_scan_range.length;
-        assignedScansPerComputeNode.put(worker, assignedScansPerComputeNode.get(worker) + addedScans);
+        assignedBytesPerComputeNode.put(worker, assignedBytesPerComputeNode.get(worker) + addedScans);
         // the first item in backends will be assigned if there is no re-balance, we compute re-balance bytes
         // if the worker is not the first item in backends.
         if (worker != backends.get(0)) {
-            reBalanceBytesPerComputeNode.put(worker, reBalanceBytesPerComputeNode.get(worker) + addedScans);
+            reBalancedBytesPerComputeNode.put(worker, reBalancedBytesPerComputeNode.get(worker) + addedScans);
         }
 
         // add scan range params
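
The convention in this hunk: backends.get(0) is the node the hash ring prefers, so bytes assigned to any other node count as re-balanced. A compact sketch of the same bookkeeping, including the new per-range counter; the names and the Map.merge phrasing are illustrative, not the class's actual code:

    import java.util.List;
    import java.util.Map;

    class AssignmentAccountingSketch {
        // Bytes always accrue to the chosen worker; they only count as
        // "re-balanced" when the consistent-hash favorite (backends.get(0))
        // had to be skipped. Ranges are counted once per assignment.
        static <N> void record(N worker, List<N> backends, long addedBytes,
                               Map<N, Long> assignedBytes,
                               Map<N, Long> reBalancedBytes,
                               Map<N, Long> assignedScanRanges) {
            assignedBytes.merge(worker, addedBytes, Long::sum);
            assignedScanRanges.merge(worker, 1L, Long::sum);
            if (!worker.equals(backends.get(0))) {
                reBalancedBytes.merge(worker, addedBytes, Long::sum);
            }
        }
    }
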
@@ -396,22 +400,31 @@ public class HDFSBackendSelector implements BackendSelector {
                     String.format("%s:%d", candidateWorker.getHost(), candidateWorker.getBrpcPort()));
         }
         assignment.put(worker.getId(), scanNode.getId().asInt(), scanRangeParams);
+        assignedScanRangesPerComputeNode.put(worker,
+                assignedScanRangesPerComputeNode.get(worker) + 1);
     }
 
     private void recordScanRangeStatistic() {
         // record scan range size for each backend
-        for (Map.Entry<ComputeNode, Long> entry : assignedScansPerComputeNode.entrySet()) {
+        for (Map.Entry<ComputeNode, Long> entry : assignedBytesPerComputeNode.entrySet()) {
             String host = entry.getKey().getAddress().hostname.replace('.', '_');
             long value = entry.getValue();
             String key = String.format("Placement.%s.assign.%s", scanNode.getTableName(), host);
             Tracers.count(Tracers.Module.EXTERNAL, key, value);
         }
         // record re-balance bytes for each backend
-        for (Map.Entry<ComputeNode, Long> entry : reBalanceBytesPerComputeNode.entrySet()) {
+        for (Map.Entry<ComputeNode, Long> entry : reBalancedBytesPerComputeNode.entrySet()) {
             String host = entry.getKey().getAddress().hostname.replace('.', '_');
             long value = entry.getValue();
             String key = String.format("Placement.%s.balance.%s", scanNode.getTableName(), host);
             Tracers.count(Tracers.Module.EXTERNAL, key, value);
         }
+        // record split number for each backend
+        for (Map.Entry<ComputeNode, Long> entry : assignedScanRangesPerComputeNode.entrySet()) {
+            String host = entry.getKey().getAddress().hostname.replace('.', '_');
+            long value = entry.getValue();
+            String key = String.format("Placement.%s.split.%s", scanNode.getTableName(), host);
+            Tracers.count(Tracers.Module.EXTERNAL, key, value);
+        }
     }
 }
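
After this hunk, recordScanRangeStatistic emits three counter families per table and host: Placement.<table>.assign.<host> (bytes), .balance.<host> (re-balanced bytes), and .split.<host> (scan-range counts). A hedged sketch that factors the shared loop; emit stands in for Tracers.count and hostnames for the address lookup, both hypothetical:

    import java.util.Map;
    import java.util.function.BiConsumer;

    class PlacementMetricsSketch {
        // kind is "assign", "balance", or "split"; dots in hostnames are
        // replaced so the metric key stays dot-delimited.
        static <N> void report(String table, String kind, Map<N, Long> perNode,
                               Map<N, String> hostnames, BiConsumer<String, Long> emit) {
            for (Map.Entry<N, Long> entry : perNode.entrySet()) {
                String host = hostnames.get(entry.getKey()).replace('.', '_');
                emit.accept(String.format("Placement.%s.%s.%s", table, kind, host),
                        entry.getValue());
            }
        }
    }
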

AllAtOnceExecutionSchedule.java

@@ -88,7 +88,8 @@ public class AllAtOnceExecutionSchedule implements ExecutionSchedule {
             for (DeployState state : states) {
                 deployer.deployFragments(state);
             }
-        } catch (StarRocksException | RpcException e) {
+        } catch (Exception e) {
+            // there could be a lot of reasons to fail, just cancel the query
             LOG.warn("Failed to assign incremental scan ranges to deploy states", e);
             coordinator.cancel(PPlanFragmentCancelReason.INTERNAL_ERROR, e.getMessage());
             throw new RuntimeException(e);
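
This is the core of the fix: the narrow multi-catch of StarRocksException | RpcException lets any unchecked exception raised while assigning incremental scan ranges (presumably the Iceberg table scan exception named in the title) bypass the handler, so the query was never cancelled. Catching Exception guarantees the cancel-and-rethrow path always runs. A self-contained sketch of the pattern with hypothetical names (DeployFailure, cancelQuery); it is not the coordinator's actual API:

    class DeploySketch {
        static class DeployFailure extends RuntimeException {
            DeployFailure(Throwable cause) { super(cause); }
        }

        void deployAll(Runnable deployStep) {
            try {
                deployStep.run();
            } catch (Exception e) {
                // Any failure mode, checked or unchecked, must cancel the query
                // before the error propagates, or fragments keep running.
                cancelQuery(e.getMessage());
                throw new DeployFailure(e);
            }
        }

        void cancelQuery(String reason) {
            // stand-in for coordinator.cancel(INTERNAL_ERROR, reason)
            System.err.println("cancelling query: " + reason);
        }
    }
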

HDFSBackendSelectorTest.java

@@ -225,7 +225,7 @@ public class HDFSBackendSelectorTest {
         variance = 0.4 / 100 * scanRangeNumber * scanRangeSize;
         double actual = 0;
-        for (Map.Entry<ComputeNode, Long> entry : selector.reBalanceBytesPerComputeNode.entrySet()) {
+        for (Map.Entry<ComputeNode, Long> entry : selector.reBalancedBytesPerComputeNode.entrySet()) {
             System.out.printf("%s -> %d bytes re-balance\n", entry.getKey(), entry.getValue());
             actual = actual + entry.getValue();
         }
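
The test above tolerates re-balancing up to 0.4% of the total workload bytes away from the hash-preferred nodes. The same check, factored as a self-contained helper with illustrative names:

    import java.util.Map;

    class VarianceCheckSketch {
        // Returns true when the total re-balanced bytes stay within
        // tolerancePercent of the whole workload (0.4% in the test above).
        static <N> boolean withinTolerance(Map<N, Long> reBalancedBytes,
                                           long totalBytes, double tolerancePercent) {
            long moved = 0;
            for (long v : reBalancedBytes.values()) {
                moved += v;
            }
            return moved <= tolerancePercent / 100 * totalBytes;
        }
    }
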