ATLAS-4881: minor improvements in notification processing

This commit is contained in:
Madhan Neethiraj 2024-06-18 00:18:19 -07:00
parent e9a36e75bc
commit 2369e7c8e0
6 changed files with 56 additions and 18 deletions

View File

@ -25,6 +25,7 @@ import java.util.Set;
public class AtlasPerfMetrics {
private final Map<String, Metric> metrics = new LinkedHashMap<>();
private long startTimeMs = -1;
public MetricRecorder getMetricRecorder(String name) {
@ -36,6 +37,10 @@ public class AtlasPerfMetrics {
final String name = recorder.name;
final long timeTaken = recorder.getElapsedTime();
if (startTimeMs == -1) {
startTimeMs = System.currentTimeMillis();
}
Metric metric = metrics.get(name);
if (metric == null) {
@ -51,6 +56,8 @@ public class AtlasPerfMetrics {
/**
 * Discards every recorded metric and sets the start time back to -1, the
 * value this class uses to mark the start time as not yet set. The recording
 * path assigns a fresh start time whenever it finds the field at -1.
 */
public void clear() {
metrics.clear();
startTimeMs = -1;
}
public boolean isEmpty() {
@ -76,7 +83,7 @@ public class AtlasPerfMetrics {
sb.append("\"").append(metric.getName()).append("\":{\"count\":").append(metric.getInvocations()).append(",\"timeTaken\":").append(metric.getTotalTimeMSecs()).append("},");
}
sb.setLength(sb.length() - 1); // remove last ","
sb.append("\"totalTime\":").append(System.currentTimeMillis() - startTimeMs);
}
sb.append("}");

View File

@ -84,6 +84,7 @@ import org.springframework.security.core.userdetails.UserDetails;
import org.springframework.stereotype.Component;
import javax.inject.Inject;
import javax.ws.rs.core.Response;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
@ -932,18 +933,34 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
RequestContext.get().resetEntityGuidUpdates();
exceptionClassName = e.getClass().getSimpleName();
if (numRetries == (maxRetries - 1)) {
String strMessage = AbstractNotification.getMessageJson(message);
// don't retry in following conditions:
// 1. number of retry attempts reached configured count
// 2. notification processing failed due to invalid data (non-existing type, entity, ..)
boolean maxRetriesReached = numRetries == (maxRetries - 1);
AtlasErrorCode errorCode = (e instanceof AtlasBaseException) ? ((AtlasBaseException) e).getAtlasErrorCode() : null;
boolean unrecoverableFailure = errorCode != null && (Response.Status.NOT_FOUND.equals(errorCode.getHttpCode()) || Response.Status.BAD_REQUEST.equals(errorCode.getHttpCode()));
LOG.warn("Max retries exceeded for message {}", strMessage, e);
if (maxRetriesReached || unrecoverableFailure) {
try {
String strMessage = AbstractNotification.getMessageJson(message);
stats.isFailedMsg = true;
if (unrecoverableFailure) {
LOG.warn("Unrecoverable failure while processing message {}", strMessage, e);
} else {
LOG.warn("Max retries exceeded for message {}", strMessage, e);
}
failedMessages.add(strMessage);
stats.isFailedMsg = true;
if (failedMessages.size() >= failedMsgCacheSize) {
recordFailedMessages();
failedMessages.add(strMessage);
if (failedMessages.size() >= failedMsgCacheSize) {
recordFailedMessages();
}
} catch (Throwable t) {
LOG.warn("error while recording failed message: type={}, topic={}, partition={}, offset={}", message.getType(), kafkaMsg.getTopic(), kafkaMsg.getPartition(), kafkaMsg.getOffset(), t);
}
return;
} else if (e instanceof org.apache.atlas.repository.graphdb.AtlasSchemaViolationException) {
LOG.warn("{}: Continuing: {}", exceptionClassName, e.getMessage());
@ -978,10 +995,14 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
metricsUtil.onNotificationProcessingComplete(kafkaMsg.getTopic(), kafkaMsg.getPartition(), kafkaMsg.getOffset(), stats);
if (stats.timeTakenMs > largeMessageProcessingTimeThresholdMs) {
String strMessage = AbstractNotification.getMessageJson(message);
try {
String strMessage = AbstractNotification.getMessageJson(message);
LOG.warn("msgProcessingTime={}, msgSize={}, topicOffset={}}", stats.timeTakenMs, strMessage.length(), kafkaMsg.getOffset());
LARGE_MESSAGES_LOG.warn("{\"msgProcessingTime\":{},\"msgSize\":{},\"topicOffset\":{},\"data\":{}}", stats.timeTakenMs, strMessage.length(), kafkaMsg.getOffset(), strMessage);
LOG.warn("msgProcessingTime={}, msgSize={}, topicOffset={}}", stats.timeTakenMs, strMessage.length(), kafkaMsg.getOffset());
LARGE_MESSAGES_LOG.warn("{\"msgProcessingTime\":{},\"msgSize\":{},\"topicOffset\":{},\"data\":{}}", stats.timeTakenMs, strMessage.length(), kafkaMsg.getOffset(), strMessage);
} catch (Throwable t) {
LOG.warn("error while recording large message: msgProcessingTime={}, type={}, topic={}, partition={}, offset={}", stats.timeTakenMs, message.getType(), kafkaMsg.getTopic(), kafkaMsg.getPartition(), kafkaMsg.getOffset(), t);
}
}
if (auditLog != null) {

View File

@ -47,6 +47,9 @@ public class HiveDbDDLPreprocessor extends EntityPreprocessor {
}
setObjectIdWithGuid(dbObject, guid);
LOG.info("{}: Preprocessor: Updated: {} -> {}", getTypeName(), qualifiedName, guid);
if (LOG.isDebugEnabled()) {
LOG.debug("{}: Preprocessor: Updated: {} -> {}", getTypeName(), qualifiedName, guid);
}
}
}

View File

@ -173,7 +173,9 @@ public class HivePreprocessor {
Object qualifiedName = entity.getAttribute(ATTRIBUTE_QUALIFIED_NAME);
if (!Objects.equals(name, qualifiedName)) {
LOG.info("setting {}.name={}. topic-offset={}, partition={}", entity.getTypeName(), qualifiedName, context.getKafkaMessageOffset(), context.getKafkaPartition());
if (LOG.isDebugEnabled()) {
LOG.debug("setting {}.name={}. topic-offset={}, partition={}", entity.getTypeName(), qualifiedName, context.getKafkaMessageOffset(), context.getKafkaPartition());
}
entity.setAttribute(ATTRIBUTE_NAME, qualifiedName);
}

View File

@ -47,6 +47,9 @@ public class HiveTableDDLPreprocessor extends EntityPreprocessor {
}
setObjectIdWithGuid(tableObject, guid);
LOG.info("{}: Preprocessor: Updated: {} -> {}", getTypeName(), qualifiedName, guid);
if (LOG.isDebugEnabled()) {
LOG.debug("{}: Preprocessor: Updated: {} -> {}", getTypeName(), qualifiedName, guid);
}
}
}

View File

@ -365,10 +365,12 @@ public class PreprocessorContext {
}
}
if (firstEntity != null) {
LOG.info("moved {} referred-entities to end of entities-list (firstEntity:typeName={}, qualifiedName={}). topic-offset={}, partition={}", referredEntitiesToMove.size(), firstEntity.getTypeName(), EntityPreprocessor.getQualifiedName(firstEntity), kafkaMessage.getOffset(), kafkaMessage.getPartition());
} else {
LOG.info("moved {} referred-entities to entities-list. topic-offset={}, partition={}", referredEntitiesToMove.size(), kafkaMessage.getOffset(), kafkaMessage.getPartition());
if (LOG.isDebugEnabled()) {
if (firstEntity != null) {
LOG.debug("moved {} referred-entities to end of entities-list (firstEntity:typeName={}, qualifiedName={}). topic-offset={}, partition={}", referredEntitiesToMove.size(), firstEntity.getTypeName(), EntityPreprocessor.getQualifiedName(firstEntity), kafkaMessage.getOffset(), kafkaMessage.getPartition());
} else {
LOG.debug("moved {} referred-entities to entities-list. topic-offset={}, partition={}", referredEntitiesToMove.size(), kafkaMessage.getOffset(), kafkaMessage.getPartition());
}
}
referredEntitiesToMove.clear();