ATLAS-4830: Make ignore patterns generic for all the hooks

Signed-off-by: Radhika Kundam <radhika.kundam@gmail.com>
This commit is contained in:
Amruth S 2024-03-15 17:40:59 +05:30 committed by Radhika Kundam
parent 3879f46018
commit f3ef15afc9
5 changed files with 371 additions and 17 deletions

View File

@ -336,21 +336,6 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
return ret;
}
private boolean isMatch(String name, List<Pattern> patterns) {
boolean ret = false;
for (Pattern p : patterns) {
if (p.matcher(name).matches()) {
ret = true;
break;
}
}
return ret;
}
public static HiveHookObjectNamesCache getKnownObjects() {
if (knownObjects != null && knownObjects.isCacheExpired()) {
LOG.info("HiveHook.run(): purging cached databaseNames ({}) and tableNames ({})", knownObjects.getCachedDbCount(), knownObjects.getCachedTableCount());

View File

@ -24,10 +24,12 @@ import org.apache.atlas.ApplicationProperties;
import org.apache.atlas.AtlasConfiguration;
import org.apache.atlas.AtlasConstants;
import org.apache.atlas.kafka.NotificationProvider;
import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.model.notification.HookNotification;
import org.apache.atlas.notification.NotificationException;
import org.apache.atlas.notification.NotificationInterface;
import org.apache.atlas.utils.AtlasConfigurationUtil;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.configuration.Configuration;
import org.apache.atlas.model.notification.MessageSource;
import org.apache.commons.lang.StringUtils;
@ -38,11 +40,15 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
/**
@ -65,6 +71,8 @@ public abstract class AtlasHook {
public static final String CLUSTER_NAME_KEY = "atlas.cluster.name";
public static final String DEFAULT_CLUSTER_NAME = "primary";
public static final String CONF_ATLAS_HOOK_MESSAGES_SORT_ENABLED = "atlas.hook.messages.sort.enabled";
public static final String ATLAS_HOOK_ENTITY_IGNORE_PATTERN = "atlas.hook.entity.ignore.pattern";
public static final String ATTRIBUTE_QUALIFIED_NAME = "qualifiedName";
protected static Configuration atlasProperties;
protected static NotificationInterface notificationInterface;
@ -79,6 +87,8 @@ public abstract class AtlasHook {
private static ExecutorService executor = null;
public static final boolean isRESTNotificationEnabled;
public static final boolean isHookMsgsSortEnabled;
private static final List<Pattern> entitiesToIgnore = new ArrayList<>();
private static boolean shouldPreprocess = false;
static {
@ -106,6 +116,23 @@ public abstract class AtlasHook {
notificationRetryInterval = atlasProperties.getInt(ATLAS_NOTIFICATION_RETRY_INTERVAL, 1000);
notificationInterface = NotificationProvider.get();
String[] patternsToIgnoreEntities = atlasProperties.getStringArray(ATLAS_HOOK_ENTITY_IGNORE_PATTERN);
if (patternsToIgnoreEntities != null) {
for (String pattern: patternsToIgnoreEntities) {
try {
entitiesToIgnore.add(Pattern.compile(pattern));
} catch (Throwable t) {
LOG.warn("failed to compile pattern {}", pattern, t);
LOG.warn("Ignoring invalid pattern in configuration {}: {}", ATLAS_HOOK_ENTITY_IGNORE_PATTERN, pattern);
}
}
LOG.info("{}={}", ATLAS_HOOK_ENTITY_IGNORE_PATTERN, entitiesToIgnore);
}
shouldPreprocess = CollectionUtils.isNotEmpty(entitiesToIgnore);
String currentUser = "";
try {
@ -163,6 +190,60 @@ public abstract class AtlasHook {
public abstract String getMessageSource();
protected static boolean isMatch(String qualifiedName, List<Pattern> patterns) {
return patterns.stream().anyMatch((Pattern pattern) -> pattern.matcher(qualifiedName).matches());
}
private static AtlasEntity.AtlasEntitiesWithExtInfo getAtlasEntitiesWithExtInfo(HookNotification hookNotification) {
AtlasEntity.AtlasEntitiesWithExtInfo entitiesWithExtInfo = null;
switch (hookNotification.getType()) {
case ENTITY_CREATE_V2:
entitiesWithExtInfo = ((HookNotification.EntityCreateRequestV2) hookNotification).getEntities();
break;
case ENTITY_FULL_UPDATE_V2:
entitiesWithExtInfo = ((HookNotification.EntityUpdateRequestV2) hookNotification).getEntities();
break;
}
return entitiesWithExtInfo;
}
private static void preprocessEntities(List<HookNotification> hookNotifications) {
for (int i = 0; i < hookNotifications.size(); i++) {
HookNotification hookNotification = hookNotifications.get(i);
AtlasEntity.AtlasEntitiesWithExtInfo entitiesWithExtInfo = getAtlasEntitiesWithExtInfo(hookNotification);
if (entitiesWithExtInfo == null) {
return;
}
List<AtlasEntity> entities = entitiesWithExtInfo.getEntities();
entities = ((entities != null) ? entities : Collections.emptyList());
entities.removeIf((AtlasEntity entity) -> isMatch(entity.getAttribute(ATTRIBUTE_QUALIFIED_NAME).toString(), entitiesToIgnore));
Map<String, AtlasEntity> referredEntitiesMap = entitiesWithExtInfo.getReferredEntities();
referredEntitiesMap = ((referredEntitiesMap != null) ? referredEntitiesMap: Collections.emptyMap());
referredEntitiesMap.entrySet().removeIf((Map.Entry<String, AtlasEntity> entry) -> isMatch(entry.getValue().getAttribute(ATTRIBUTE_QUALIFIED_NAME).toString(), entitiesToIgnore));
if (CollectionUtils.isEmpty(entities) && CollectionUtils.isEmpty(referredEntitiesMap.values())) {
hookNotifications.remove(i--);
LOG.info("ignored message: {}", hookNotification);
}
}
}
private static void notifyEntitiesPostPreprocess(List<HookNotification> messages, UserGroupInformation ugi, int maxRetries, MessageSource source) {
if (shouldPreprocess) {
preprocessEntities(messages);
}
if (CollectionUtils.isNotEmpty(messages)) {
notifyEntitiesInternal(messages, maxRetries, ugi, notificationInterface, logFailedMessages, failedMessagesLogger, source);
}
}
/**
* Notify atlas of the entity through message. The entity can be a
* complex entity with reference to other entities.
@ -174,12 +255,12 @@ public abstract class AtlasHook {
*/
public static void notifyEntities(List<HookNotification> messages, UserGroupInformation ugi, int maxRetries, MessageSource source) {
if (executor == null) { // send synchronously
notifyEntitiesInternal(messages, maxRetries, ugi, notificationInterface, logFailedMessages, failedMessagesLogger, source);
notifyEntitiesPostPreprocess(messages, ugi, maxRetries, source);
} else {
executor.submit(new Runnable() {
@Override
public void run() {
notifyEntitiesInternal(messages, maxRetries, ugi, notificationInterface, logFailedMessages, failedMessagesLogger, source);
notifyEntitiesPostPreprocess(messages, ugi, maxRetries, source);
}
});
}

View File

@ -39,6 +39,7 @@ import org.apache.atlas.model.notification.HookNotification.EntityUpdateRequestV
import org.apache.atlas.model.notification.HookNotification.EntityPartialUpdateRequestV2;
import org.apache.atlas.notification.NotificationInterface.NotificationType;
import org.apache.atlas.notification.preprocessor.EntityPreprocessor;
import org.apache.atlas.notification.preprocessor.GenericEntityPreprocessor;
import org.apache.atlas.notification.preprocessor.PreprocessorContext;
import org.apache.atlas.notification.preprocessor.PreprocessorContext.PreprocessAction;
import org.apache.atlas.repository.store.graph.EntityCorrelationStore;
@ -149,6 +150,8 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
public static final String CONSUMER_SKIP_HIVE_COLUMN_LINEAGE_HIVE_20633 = "atlas.notification.consumer.skip.hive_column_lineage.hive-20633";
public static final String CONSUMER_SKIP_HIVE_COLUMN_LINEAGE_HIVE_20633_INPUTS_THRESHOLD = "atlas.notification.consumer.skip.hive_column_lineage.hive-20633.inputs.threshold";
public static final String CONSUMER_PREPROCESS_ENTITY_TYPE_IGNORE_PATTERN = "atlas.notification.consumer.preprocess.entity.type.ignore.pattern";
public static final String CONSUMER_PREPROCESS_ENTITY_IGNORE_PATTERN = "atlas.notification.consumer.preprocess.entity.ignore.pattern";
public static final String CONSUMER_PREPROCESS_HIVE_TABLE_IGNORE_PATTERN = "atlas.notification.consumer.preprocess.hive_table.ignore.pattern";
public static final String CONSUMER_PREPROCESS_HIVE_TABLE_PRUNE_PATTERN = "atlas.notification.consumer.preprocess.hive_table.prune.pattern";
public static final String CONSUMER_PREPROCESS_HIVE_TABLE_CACHE_SIZE = "atlas.notification.consumer.preprocess.hive_table.cache.size";
@ -182,6 +185,8 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
private final boolean updateHiveProcessNameWithQualifiedName;
private final int largeMessageProcessingTimeThresholdMs;
private final boolean consumerDisabled;
private final List<Pattern> entityTypesToIgnore = new ArrayList<>();
private final List<Pattern> entitiesToIgnore = new ArrayList<>();
private final List<Pattern> hiveTablesToIgnore = new ArrayList<>();
private final List<Pattern> hiveTablesToPrune = new ArrayList<>();
private final List<String> hiveDummyDatabasesToIgnore;
@ -246,9 +251,36 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
authnCache = (authorizeUsingMessageUser && authnCacheTtlSeconds > 0) ? new PassiveExpiringMap<>(authnCacheTtlSeconds * 1000) : null;
String[] patternEntityTypesToIgnore = applicationProperties.getStringArray(CONSUMER_PREPROCESS_ENTITY_TYPE_IGNORE_PATTERN);
String[] patternEntitiesToIgnore = applicationProperties.getStringArray(CONSUMER_PREPROCESS_ENTITY_IGNORE_PATTERN);
String[] patternHiveTablesToIgnore = applicationProperties.getStringArray(CONSUMER_PREPROCESS_HIVE_TABLE_IGNORE_PATTERN);
String[] patternHiveTablesToPrune = applicationProperties.getStringArray(CONSUMER_PREPROCESS_HIVE_TABLE_PRUNE_PATTERN);
if (patternEntityTypesToIgnore != null) {
for (String pattern: patternEntityTypesToIgnore) {
try {
this.entityTypesToIgnore.add(Pattern.compile(pattern));
} catch (Throwable t) {
LOG.warn("failed to compile pattern {}", pattern, t);
LOG.warn("Ignoring invalid pattern in configuration {}: {}", CONSUMER_PREPROCESS_ENTITY_TYPE_IGNORE_PATTERN, pattern);
}
}
LOG.info("{}={}", CONSUMER_PREPROCESS_ENTITY_TYPE_IGNORE_PATTERN, entityTypesToIgnore);
}
if (patternEntitiesToIgnore != null) {
for (String pattern: patternEntitiesToIgnore) {
try {
this.entitiesToIgnore.add(Pattern.compile(pattern));
} catch (Throwable t) {
LOG.warn("failed to compile pattern {}", pattern, t);
LOG.warn("Ignoring invalid pattern in configuration {}: {}", CONSUMER_PREPROCESS_ENTITY_IGNORE_PATTERN, pattern);
}
}
LOG.info("{}={}", CONSUMER_PREPROCESS_ENTITY_IGNORE_PATTERN, entitiesToIgnore);
}
if (patternHiveTablesToIgnore != null) {
for (String pattern : patternHiveTablesToIgnore) {
try {
@ -1073,6 +1105,36 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
}
}
private void preprocessEntities(PreprocessorContext context) {
GenericEntityPreprocessor genericEntityPreprocessor = new GenericEntityPreprocessor(this.entityTypesToIgnore, this.entitiesToIgnore);
List<AtlasEntity> entities = context.getEntities();
if (entities != null) {
for (int i = 0; i < entities.size(); i++) {
AtlasEntity entity = entities.get(i);
genericEntityPreprocessor.preprocess(entity, context);
if (context.isIgnoredEntity(entity.getGuid())) {
entities.remove(i--);
}
}
}
Map<String, AtlasEntity> referredEntities = context.getReferredEntities();
if (referredEntities != null) {
for (Iterator<Map.Entry<String, AtlasEntity>> iterator = referredEntities.entrySet().iterator(); iterator.hasNext(); ) {
AtlasEntity entity = iterator.next().getValue();
genericEntityPreprocessor.preprocess(entity, context);
if (context.isIgnoredEntity(entity.getGuid())) {
iterator.remove();
}
}
}
}
private PreprocessorContext preProcessNotificationMessage(AtlasKafkaMessage<HookNotification> kafkaMsg) {
PreprocessorContext context = null;
@ -1081,6 +1143,10 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
hiveDummyDatabasesToIgnore, hiveDummyTablesToIgnore, hiveTablePrefixesToIgnore, hiveTypesRemoveOwnedRefAttrs,
rdbmsTypesRemoveOwnedRefAttrs, s3V2DirectoryPruneObjectPrefix, updateHiveProcessNameWithQualifiedName, entityCorrelationManager);
if (CollectionUtils.isNotEmpty(this.entityTypesToIgnore) || CollectionUtils.isNotEmpty(this.entitiesToIgnore)) {
preprocessEntities(context);
}
if (context.isHivePreprocessEnabled()) {
preprocessHiveTypes(context);
}

View File

@ -0,0 +1,61 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p/>
* http://www.apache.org/licenses/LICENSE-2.0
* <p/>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.notification.preprocessor;
import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.commons.collections.CollectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.List;
import java.util.regex.Pattern;
public class GenericEntityPreprocessor extends EntityPreprocessor {
private static final Logger LOG = LoggerFactory.getLogger(GenericEntityPreprocessor.class);
private final List<Pattern> entitiesToIgnore;
private final List<Pattern> entityTypesToIgnore;
public GenericEntityPreprocessor(List<Pattern> entityTypesToIgnore, List<Pattern> entitiesToIgnore) {
super("Generic");
this.entityTypesToIgnore = entityTypesToIgnore;
this.entitiesToIgnore = entitiesToIgnore;
}
private boolean isMatch(String property, List<Pattern> patterns) {
return patterns.stream().anyMatch((Pattern pattern) -> pattern.matcher(property).matches());
}
private boolean isToBeIgnored(AtlasEntity entity) {
String qualifiedName = getQualifiedName(entity);
boolean decision = false;
if (CollectionUtils.isEmpty(this.entityTypesToIgnore)) { // Will Ignore all entities whose qualified name matches the ignore pattern.
decision = isMatch(qualifiedName, this.entitiesToIgnore);
} else if (CollectionUtils.isEmpty(this.entitiesToIgnore)) { // Will Ignore all entities whose type matches the regex given.
decision = isMatch(entity.getTypeName(), this.entityTypesToIgnore);
} else { // Combination of above 2 cases.
decision = isMatch(entity.getTypeName(), this.entityTypesToIgnore) && isMatch(qualifiedName, this.entitiesToIgnore);
}
return decision;
}
@Override
public void preprocess(AtlasEntity entity, PreprocessorContext context) {
if (entity != null && isToBeIgnored(entity)) {
context.addToIgnoredEntities(entity);
}
}
}

View File

@ -0,0 +1,161 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p/>
* http://www.apache.org/licenses/LICENSE-2.0
* <p/>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.notification.preprocessor;
import org.apache.atlas.kafka.AtlasKafkaMessage;
import org.apache.atlas.kafka.KafkaNotification;
import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.model.notification.HookNotification;
import org.apache.atlas.notification.hook.HookMessageDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.Test;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Predicate;
import java.util.regex.Pattern;
import static org.apache.atlas.notification.preprocessor.EntityPreprocessor.getQualifiedName;
public class GenericEntityPreprocessorTest {
private static final Logger LOG = LoggerFactory.getLogger(GenericEntityPreprocessorTest.class);
private final HookMessageDeserializer deserializer = new HookMessageDeserializer();
private PreprocessorContext getPreprocessorContext(String msgJson) {
HookNotification hookNotification = deserializer.deserialize(msgJson);
AtlasKafkaMessage<HookNotification> kafkaMsg = new AtlasKafkaMessage<>(hookNotification, -1, KafkaNotification.ATLAS_HOOK_TOPIC, -1);
PreprocessorContext context = new PreprocessorContext(kafkaMsg, null, Collections.emptyList(), Collections.emptyList(), Collections.emptyMap(), Collections.emptyList(),
Collections.emptyList(), Collections.emptyList(), false, false, true,
false, null);
return context;
}
private boolean isMatch(List<Pattern> patterns, String property) {
for (Pattern p : patterns) {
if (p.matcher(property).matches()) {
return true;
}
}
return false;
}
public void testEntityTypesToIgnore(String msgJson, List<Pattern> entityTypesToIgnore) {
PreprocessorContext context = getPreprocessorContext(msgJson);
List<AtlasEntity> entities = context.getEntities();
Set<String> filteredEntitiesActual = filterEntity(entities, (AtlasEntity entity) -> isMatch(entityTypesToIgnore, entity.getTypeName()));
if (context.getReferredEntities() != null) {
filteredEntitiesActual.addAll(filterEntity(new ArrayList<>(context.getReferredEntities().values()), (AtlasEntity entity) -> isMatch(entityTypesToIgnore, entity.getTypeName())));
}
GenericEntityPreprocessor entityPreprocessor = new GenericEntityPreprocessor(entityTypesToIgnore, Collections.emptyList());
preprocessEntities(entityPreprocessor, context);
Assert.assertEquals(filteredEntitiesActual, context.getIgnoredEntities());
}
private void preprocessEntities(GenericEntityPreprocessor genericEntityPreprocessor, PreprocessorContext context) {
List<AtlasEntity> entities = context.getEntities();
if (entities != null) {
for (int i = 0; i < entities.size(); i++) {
AtlasEntity entity = entities.get(i);
genericEntityPreprocessor.preprocess(entity, context);
if (context.isIgnoredEntity(entity.getGuid())) {
entities.remove(i--);
}
}
}
Map<String, AtlasEntity> referredEntities = context.getReferredEntities();
if (referredEntities != null) {
for (Iterator<Map.Entry<String, AtlasEntity>> iterator = referredEntities.entrySet().iterator(); iterator.hasNext(); ) {
AtlasEntity entity = iterator.next().getValue();
genericEntityPreprocessor.preprocess(entity, context);
if (context.isIgnoredEntity(entity.getGuid())) {
iterator.remove();
}
}
}
}
public void testEntitiesToIgnoreByQName(String msgJson, List<Pattern> entitiesToIgnore) {
PreprocessorContext context = getPreprocessorContext(msgJson);
List<AtlasEntity> entities = context.getEntities();
Set<String> filteredEntitiesActual = filterEntity(entities, (AtlasEntity entity) -> isMatch(entitiesToIgnore, getQualifiedName(entity)));
if (context.getReferredEntities() != null) {
filteredEntitiesActual.addAll(filterEntity(new ArrayList<>(context.getReferredEntities().values()), (AtlasEntity entity) -> isMatch(entitiesToIgnore, getQualifiedName(entity))));
}
GenericEntityPreprocessor entityPreprocessor = new GenericEntityPreprocessor(Collections.emptyList(), entitiesToIgnore);
preprocessEntities(entityPreprocessor, context);
Assert.assertEquals(filteredEntitiesActual, context.getIgnoredEntities());
}
private Set<String> filterEntity(List<AtlasEntity> entities, Predicate<AtlasEntity> predicate) {
Set<String> filteredEntitiesActual = new HashSet<>();
if (entities != null) {
for (AtlasEntity entity: entities) {
if (predicate.test(entity)) {
filteredEntitiesActual.add(entity.getGuid());
}
}
}
return filteredEntitiesActual;
}
public void testEntitiesToIgnoreByAndTypeQName(String msgJson, List<Pattern> entityTypesToIgnore, List<Pattern> entitiesToIgnore) {
PreprocessorContext context = getPreprocessorContext(msgJson);
List<AtlasEntity> entities = context.getEntities();
Set<String> filteredEntitiesActual = filterEntity(entities, (AtlasEntity entity) ->
isMatch(entityTypesToIgnore, entity.getTypeName()) && isMatch(entitiesToIgnore, getQualifiedName(entity)));
if (context.getReferredEntities() != null) {
filteredEntitiesActual.addAll(filterEntity(new ArrayList<>(context.getReferredEntities().values()), (AtlasEntity entity) ->
isMatch(entityTypesToIgnore, entity.getTypeName()) && isMatch(entitiesToIgnore, getQualifiedName(entity))));
}
GenericEntityPreprocessor entityPreprocessor = new GenericEntityPreprocessor(entityTypesToIgnore, entitiesToIgnore);
preprocessEntities(entityPreprocessor, context);
Assert.assertEquals(filteredEntitiesActual, context.getIgnoredEntities());
}
}