ATLAS-4768: Implement aging for audits stored by Atlas.

Signed-off-by: radhikakundam <radhikakundam@apache.org>
This commit is contained in:
radhikakundam 2023-09-29 10:39:51 -07:00
parent 5f38e92559
commit 79ef3cce1b
21 changed files with 1544 additions and 35 deletions

View File

@ -31,6 +31,7 @@ import com.sun.jersey.multipart.file.StreamDataBodyPart;
import org.apache.atlas.bulkimport.BulkImportResponse;
import org.apache.atlas.model.SearchFilter;
import org.apache.atlas.model.audit.AtlasAuditEntry;
import org.apache.atlas.model.audit.AuditReductionCriteria;
import org.apache.atlas.model.audit.AuditSearchParameters;
import org.apache.atlas.model.audit.EntityAuditEventV2;
import org.apache.atlas.model.discovery.AtlasQuickSearchResult;
@ -838,6 +839,14 @@ public class AtlasClientV2 extends AtlasBaseClient {
});
}
public void ageoutAtlasAudits(AuditReductionCriteria auditReductionCriteria, boolean useAuditConfig) throws AtlasServiceException {
MultivaluedMap<String, String> queryParams = new MultivaluedMapImpl();
queryParams.add("useAuditConfig", String.valueOf(useAuditConfig));
callAPI(API_V2.AGEOUT_ATLAS_AUDITS, List.class, auditReductionCriteria, queryParams);
}
// Glossary APIs
public List<AtlasGlossary> getAllGlossaries(String sortByAttribute, int limit, int offset) throws AtlasServiceException {
@ -1269,6 +1278,7 @@ public class AtlasClientV2 extends AtlasBaseClient {
// Admin APIs
public static final API_V2 GET_ATLAS_AUDITS = new API_V2(ATLAS_AUDIT_API, HttpMethod.POST, Response.Status.OK);
public static final API_V2 AGEOUT_ATLAS_AUDITS = new API_V2(ATLAS_AUDIT_API + "ageout/", HttpMethod.POST, Response.Status.OK);
// Glossary APIs
public static final API_V2 GET_ALL_GLOSSARIES = new API_V2(GLOSSARY_URI, HttpMethod.GET, Response.Status.OK);

View File

@ -239,6 +239,15 @@ public final class Constants {
public static final String PROPERTY_KEY_INDEX_RECOVERY_PREV_TIME = encodePropertyKey(INDEX_RECOVERY_PREFIX + "prevTime");
public static final String PROPERTY_KEY_INDEX_RECOVERY_CUSTOM_TIME = encodePropertyKey(INDEX_RECOVERY_PREFIX + "customTime");
/**
* Audit Reduction vertex property keys.
*/
public static final String AUDIT_REDUCTION_PREFIX = INTERNAL_PROPERTY_KEY_PREFIX + "auditReduction_";
public static final String PROPERTY_KEY_AUDIT_REDUCTION_NAME = encodePropertyKey(AUDIT_REDUCTION_PREFIX + "name");
public static final String PROPERTY_KEY_GUIDS_TO_AGEOUT_BY_DEFAULT = encodePropertyKey(AUDIT_REDUCTION_PREFIX + "default");
public static final String PROPERTY_KEY_GUIDS_TO_AGEOUT_BY_CUSTOM = encodePropertyKey(AUDIT_REDUCTION_PREFIX + "custom");
public static final String PROPERTY_KEY_GUIDS_TO_SWEEPOUT = encodePropertyKey(AUDIT_REDUCTION_PREFIX + "sweepout");
public static final String SQOOP_SOURCE = "sqoop";
public static final String FALCON_SOURCE = "falcon";
public static final String HBASE_SOURCE = "hbase";
@ -248,6 +257,20 @@ public final class Constants {
public static final String STORM_SOURCE = "storm";
public static final String FILE_SPOOL_SOURCE = "file_spool";
/**
* Audit Reduction related constants
*/
public enum AtlasAuditAgingType { DEFAULT, CUSTOM, SWEEP }
public static final String AUDIT_REDUCTION_TYPE_NAME = "__auditReductionInfo";
public static final String AUDIT_AGING_TYPE_KEY = "auditAgingType";
public static final String AUDIT_AGING_TTL_KEY = "ttl";
public static final String AUDIT_AGING_COUNT_KEY = "auditCount";
public static final String AUDIT_AGING_ENTITY_TYPES_KEY = "entityTypes";
public static final String AUDIT_AGING_ACTION_TYPES_KEY = "actionTypes";
public static final String AUDIT_AGING_EXCLUDE_ENTITY_TYPES_KEY = "excludeEntityTypes";
public static final String CREATE_EVENTS_AGEOUT_ALLOWED_KEY = "createEventsAgeoutAllowed";
public static final String AUDIT_AGING_SUBTYPES_INCLUDED_KEY = "subTypesIncluded";
/*
* All supported file-format extensions for Bulk Imports through file upload
*/

View File

@ -93,7 +93,23 @@ public enum AtlasConfiguration {
SESSION_TIMEOUT_SECS("atlas.session.timeout.secs", -1),
UPDATE_COMPOSITE_INDEX_STATUS("atlas.update.composite.index.status", true),
METRICS_TIME_TO_LIVE_HOURS( "atlas.metrics.ttl.hours", 336), // 14 days default
SOLR_INDEX_TX_LOG_TTL_CONF("write.ahead.log.ttl.in.hours", 240); //10 days default
SOLR_INDEX_TX_LOG_TTL_CONF("write.ahead.log.ttl.in.hours", 240), //10 days default
ATLAS_AUDIT_AGING_ENABLED("atlas.audit.aging.enabled", false),
ATLAS_AUDIT_DEFAULT_AGEOUT_ENABLED("atlas.audit.default.ageout.enabled", true),
ATLAS_AUDIT_DEFAULT_AGEOUT_TTL("atlas.audit.default.ageout.ttl.in.days", 90),
ATLAS_AUDIT_DEFAULT_AGEOUT_COUNT("atlas.audit.default.ageout.count", 0),
ATLAS_AUDIT_CUSTOM_AGEOUT_TTL("atlas.audit.custom.ageout.ttl.in.days", 0),
ATLAS_AUDIT_CUSTOM_AGEOUT_COUNT("atlas.audit.custom.ageout.count", 0),
ATLAS_AUDIT_SWEEP_OUT("atlas.audit.sweep.out.enabled", false),
ATLAS_AUDIT_CREATE_EVENTS_AGEOUT_ALLOWED("atlas.audit.create.events.ageout.allowed", false),
ATLAS_AUDIT_AGING_SCHEDULER_FREQUENCY("atlas.audit.aging.scheduler.frequency.in.days", 30),
ATLAS_AUDIT_AGING_SUBTYPES_INCLUDED("atlas.audit.aging.subtypes.included", true),
MIN_TTL_TO_MAINTAIN("atlas.audit.min.ttl.to.maintain", 7),
MIN_AUDIT_COUNT_TO_MAINTAIN("atlas.audit.min.count.to.maintain", 50),
ATLAS_AUDIT_AGING_SEARCH_MAX_LIMIT("atlas.audit.aging.search.maxlimit", 10000),
ATLAS_AUDIT_DEFAULT_AGEOUT_IGNORE_TTL("atlas.audit.default.ageout.ignore.ttl", false),
ATLAS_AUDIT_AGING_TTL_TEST_AUTOMATION("atlas.audit.aging.ttl.test.automation", false); //Only for test automation
private static final Configuration APPLICATION_PROPERTIES;

View File

@ -0,0 +1,226 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.model.audit;
import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.databind.annotation.JsonSerialize;
import java.io.Serializable;
import java.util.*;
import static com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility.NONE;
import static com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility.PUBLIC_ONLY;
@JsonAutoDetect(getterVisibility = PUBLIC_ONLY, setterVisibility = PUBLIC_ONLY, fieldVisibility = NONE)
@JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL)
@JsonIgnoreProperties(ignoreUnknown = true)
public class AuditReductionCriteria implements Serializable {
private static final long serialVersionUID = 1L;
private boolean auditAgingEnabled = false;
private boolean defaultAgeoutEnabled = false;
private boolean auditSweepoutEnabled = false;
private boolean createEventsAgeoutAllowed = false;
private boolean subTypesIncluded = true;
private boolean ignoreDefaultAgeoutTTL = false;
private int defaultAgeoutAuditCount;
private int defaultAgeoutTTLInDays;
private int customAgeoutAuditCount;
private int customAgeoutTTLInDays;
private String customAgeoutEntityTypes;
private String customAgeoutActionTypes;
private String sweepoutEntityTypes;
private String sweepoutActionTypes;
public boolean isAuditAgingEnabled() {
return auditAgingEnabled;
}
public void setAuditAgingEnabled(boolean auditAgingEnabled) {
this.auditAgingEnabled = auditAgingEnabled;
}
public boolean isDefaultAgeoutEnabled() {
return defaultAgeoutEnabled;
}
public void setDefaultAgeoutEnabled(boolean defaultAgeoutEnabled) {
this.defaultAgeoutEnabled = defaultAgeoutEnabled;
}
public boolean isAuditSweepoutEnabled() {
return auditSweepoutEnabled;
}
public void setAuditSweepoutEnabled(boolean auditSweepoutEnabled) {
this.auditSweepoutEnabled = auditSweepoutEnabled;
}
public boolean isCreateEventsAgeoutAllowed() {
return createEventsAgeoutAllowed;
}
public void setCreateEventsAgeoutAllowed(boolean createEventsAgeoutAllowed) {
this.createEventsAgeoutAllowed = createEventsAgeoutAllowed;
}
public boolean isSubTypesIncluded() {
return subTypesIncluded;
}
public void setSubTypesIncluded(boolean subTypesIncluded) {
this.subTypesIncluded = subTypesIncluded;
}
public boolean ignoreDefaultAgeoutTTL() {
return ignoreDefaultAgeoutTTL;
}
public void setIgnoreDefaultAgeoutTTL(boolean ignoreDefaultAgeoutTTL) {
this.ignoreDefaultAgeoutTTL = ignoreDefaultAgeoutTTL;
}
public int getDefaultAgeoutTTLInDays() {
return defaultAgeoutTTLInDays;
}
public void setDefaultAgeoutTTLInDays(int defaultAgeoutTTLInDays) {
this.defaultAgeoutTTLInDays = defaultAgeoutTTLInDays;
}
public int getDefaultAgeoutAuditCount() {
return defaultAgeoutAuditCount;
}
public void setDefaultAgeoutAuditCount(int defaultAgeoutAuditCount) {
this.defaultAgeoutAuditCount = defaultAgeoutAuditCount;
}
public int getCustomAgeoutTTLInDays() {
return customAgeoutTTLInDays;
}
public void setCustomAgeoutTTLInDays(int customAgeoutTTLInDays) {
this.customAgeoutTTLInDays = customAgeoutTTLInDays;
}
public int getCustomAgeoutAuditCount() {
return customAgeoutAuditCount;
}
public void setCustomAgeoutAuditCount(int customAgeoutAuditCount) {
this.customAgeoutAuditCount = customAgeoutAuditCount;
}
public String getCustomAgeoutEntityTypes() {
return customAgeoutEntityTypes;
}
public void setCustomAgeoutEntityTypes(String customAgeoutEntityTypes) {
this.customAgeoutEntityTypes = customAgeoutEntityTypes;
}
public String getCustomAgeoutActionTypes() {
return customAgeoutActionTypes;
}
public void setCustomAgeoutActionTypes(String customAgeoutActionTypes) {
this.customAgeoutActionTypes = customAgeoutActionTypes;
}
public String getSweepoutEntityTypes() {
return sweepoutEntityTypes;
}
public void setSweepoutEntityTypes(String sweepoutEntityTypes) {
this.sweepoutEntityTypes = sweepoutEntityTypes;
}
public String getSweepoutActionTypes() {
return sweepoutActionTypes;
}
public void setSweepoutActionTypes(String sweepoutActionTypes) {
this.sweepoutActionTypes = sweepoutActionTypes;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
AuditReductionCriteria that = (AuditReductionCriteria) o;
return auditAgingEnabled == that.auditAgingEnabled &&
defaultAgeoutEnabled == that.defaultAgeoutEnabled &&
auditSweepoutEnabled == that.auditSweepoutEnabled &&
createEventsAgeoutAllowed == that.createEventsAgeoutAllowed &&
subTypesIncluded == that.subTypesIncluded &&
ignoreDefaultAgeoutTTL == that.ignoreDefaultAgeoutTTL &&
defaultAgeoutAuditCount == that.defaultAgeoutAuditCount &&
defaultAgeoutTTLInDays == that.defaultAgeoutTTLInDays &&
customAgeoutAuditCount == that.customAgeoutAuditCount &&
customAgeoutTTLInDays == that.customAgeoutTTLInDays &&
Objects.equals(customAgeoutEntityTypes, that.customAgeoutEntityTypes) &&
Objects.equals(customAgeoutActionTypes, that.customAgeoutActionTypes) &&
Objects.equals(sweepoutEntityTypes, that.sweepoutEntityTypes) &&
Objects.equals(sweepoutActionTypes, that.sweepoutActionTypes);
}
@Override
public int hashCode() {
return Objects.hash(auditAgingEnabled, defaultAgeoutEnabled, auditSweepoutEnabled, createEventsAgeoutAllowed, subTypesIncluded, ignoreDefaultAgeoutTTL, defaultAgeoutAuditCount, defaultAgeoutTTLInDays, customAgeoutAuditCount, customAgeoutTTLInDays,
customAgeoutEntityTypes, customAgeoutActionTypes, sweepoutEntityTypes, sweepoutActionTypes);
}
public StringBuilder toString(StringBuilder sb) {
if (sb == null) {
sb = new StringBuilder();
}
sb.append('{');
sb.append("auditAgingEnabled='").append(auditAgingEnabled).append('\'');
sb.append(", createEventsAgeoutAllowed='").append(createEventsAgeoutAllowed).append('\'');
sb.append(", subTypesIncluded='").append(subTypesIncluded).append('\'');
sb.append(", defaultAgeoutEnabled='").append(defaultAgeoutEnabled).append('\'');
sb.append(", ignoreDefaultAgeoutTTL='").append(ignoreDefaultAgeoutTTL).append('\'');
sb.append(", defaultAgeoutTTLInDays='").append(defaultAgeoutTTLInDays).append('\'');
sb.append(", defaultAgeoutAuditCount='").append(defaultAgeoutAuditCount).append('\'');
sb.append(", auditSweepoutEnabled='").append(auditSweepoutEnabled).append('\'');
sb.append(", customAgeoutAuditCount='").append(customAgeoutAuditCount).append('\'');
sb.append(", customAgeoutTTLInDays='").append(customAgeoutTTLInDays).append('\'');
sb.append(", customAgeoutEntityTypes=").append(customAgeoutEntityTypes);
sb.append(", customAgeoutActionTypes=").append(customAgeoutActionTypes);
sb.append(", sweepoutEntityTypes=").append(sweepoutEntityTypes);
sb.append(", sweepoutActionTypes=").append(sweepoutActionTypes);
sb.append('}');
return sb;
}
@Override
public String toString() {
return toString(new StringBuilder()).toString();
}
}

View File

@ -22,10 +22,13 @@ package org.apache.atlas.discovery;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.discovery.*;
import org.apache.atlas.model.profile.AtlasUserSavedSearch;
import org.apache.atlas.model.tasks.AtlasTask;
import org.apache.atlas.repository.Constants.AtlasAuditAgingType;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Set;
public interface AtlasDiscoveryService {
/**
@ -79,6 +82,14 @@ public interface AtlasDiscoveryService {
*/
AtlasSearchResult searchWithParameters(SearchParameters searchParameters) throws AtlasBaseException;
/**
* Search for guids of entities matching the search criteria
* @param searchParameters Search criteria
* @return GUIDs of matching entities
* @throws AtlasBaseException
*/
Set<String> searchGUIDsWithParameters(AtlasAuditAgingType auditAgingType, Set<String> entityTypes, SearchParameters searchParameters) throws AtlasBaseException;
/**
* Search for relations (edges) matching the search criteria
* @param searchParameters Search criteria
@ -173,4 +184,11 @@ public interface AtlasDiscoveryService {
* @throws IOException
*/
AtlasSearchResultDownloadStatus getSearchResultDownloadStatus() throws IOException;
/**
* Creates task to age out audits
* @param taskParams parameters of AtlasTask
* @return Task created to perform audit aging
*/
AtlasTask createAndQueueAuditReductionTask(Map<String, Object> taskParams, String taskType) throws AtlasBaseException;
}

View File

@ -50,6 +50,7 @@ import org.apache.atlas.query.executors.DSLQueryExecutor;
import org.apache.atlas.query.executors.ScriptEngineBasedExecutor;
import org.apache.atlas.query.executors.TraversalBasedExecutor;
import org.apache.atlas.repository.Constants;
import org.apache.atlas.repository.Constants.AtlasAuditAgingType;
import org.apache.atlas.repository.graph.GraphBackedSearchIndexer;
import org.apache.atlas.repository.graph.GraphHelper;
import org.apache.atlas.repository.graphdb.AtlasEdge;
@ -60,6 +61,7 @@ import org.apache.atlas.repository.graphdb.AtlasIndexQuery.Result;
import org.apache.atlas.repository.graphdb.AtlasVertex;
import org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2;
import org.apache.atlas.repository.store.graph.v2.EntityGraphRetriever;
import org.apache.atlas.repository.store.graph.v2.tasks.AuditReductionTaskFactory;
import org.apache.atlas.repository.store.graph.v2.tasks.searchdownload.SearchResultDownloadTask;
import org.apache.atlas.repository.store.graph.v2.tasks.searchdownload.SearchResultDownloadTaskFactory;
import org.apache.atlas.repository.userprofile.UserProfileService;
@ -467,6 +469,12 @@ public class EntityDiscoveryService implements AtlasDiscoveryService {
return searchWithSearchContext(new SearchContext(searchParameters, typeRegistry, graph, indexer.getVertexIndexKeys()));
}
@Override
@GraphTransaction
public Set<String> searchGUIDsWithParameters(AtlasAuditAgingType auditAgingType, Set<String> entityTypes, SearchParameters searchParameters) throws AtlasBaseException {
return searchEntityGUIDs(auditAgingType, entityTypes, new SearchContext(searchParameters, typeRegistry, graph, indexer.getVertexIndexKeys()));
}
@Override
@GraphTransaction
public void createAndQueueSearchResultDownloadTask(Map<String, Object> taskParams) throws AtlasBaseException {
@ -509,6 +517,19 @@ public class EntityDiscoveryService implements AtlasDiscoveryService {
return result;
}
@Override
@GraphTransaction
public AtlasTask createAndQueueAuditReductionTask(Map<String, Object> taskParams, String taskType) throws AtlasBaseException {
List<AtlasTask> pendingTasks = taskManagement.getPendingTasksByType(taskType);
if (CollectionUtils.isNotEmpty(pendingTasks) && pendingTasks.size() > AuditReductionTaskFactory.MAX_PENDING_TASKS_ALLOWED) {
throw new AtlasBaseException(PENDING_TASKS_ALREADY_IN_PROGRESS, String.valueOf(pendingTasks.size()));
}
AtlasTask task = taskManagement.createTask(taskType, RequestContext.getCurrentUser(), taskParams);
RequestContext.get().queueTask(task);
return task;
}
@Override
@GraphTransaction
public AtlasSearchResult searchRelationsWithParameters(RelationshipSearchParameters searchParameters) throws AtlasBaseException {
@ -549,6 +570,45 @@ public class EntityDiscoveryService implements AtlasDiscoveryService {
return ret;
}
public Set<String> searchEntityGUIDs(AtlasAuditAgingType auditAgingType, Set<String> entityTypes, SearchContext searchContext) throws AtlasBaseException {
SearchParameters searchParameters = searchContext.getSearchParameters();
final QueryParams params = QueryParams.getNormalizedParams(searchParameters.getLimit(),searchParameters.getOffset());
String searchID = searchTracker.add(searchContext); // For future cancellations
searchParameters.setLimit(params.limit());
searchParameters.setOffset(params.offset());
Set<String> guids = new HashSet<>();
try {
List<AtlasVertex> resultList = searchContext.getSearchProcessor().execute();
do {
for (AtlasVertex atlasVertex : resultList) {
if (atlasVertex != null && checkVertexMatchesSearchCriteria(atlasVertex, auditAgingType, entityTypes)) {
guids.add(atlasVertex.getProperty(Constants.GUID_PROPERTY_KEY, String.class));
}
}
searchParameters.setOffset(searchParameters.getOffset() + searchParameters.getLimit());
resultList = searchContext.getSearchProcessor().execute();
} while (CollectionUtils.isNotEmpty(resultList));
LOG.info("Total {} entities are eligible for Audit aging", guids.size());
} catch (Throwable t) {
LOG.error("Error while retrieving eligible entities for audit aging");
} finally {
searchTracker.remove(searchID);
}
return guids;
}
private boolean checkVertexMatchesSearchCriteria(AtlasVertex vertex, AtlasAuditAgingType auditAgingType, Set<String> entityTypes) {
if (CollectionUtils.isEmpty(entityTypes)) {
return true;
}
String typeName = vertex.getProperty(Constants.ENTITY_TYPE_PROPERTY_KEY, String.class);
boolean typeNameMatchesCriteria = entityTypes.contains(typeName);
return (auditAgingType == AtlasAuditAgingType.DEFAULT) ? !typeNameMatchesCriteria : typeNameMatchesCriteria;
}
private AtlasSearchResult searchWithSearchContext(SearchContext searchContext) throws AtlasBaseException {
SearchParameters searchParameters = searchContext.getSearchParameters();
AtlasSearchResult ret = new AtlasSearchResult(searchParameters);

View File

@ -24,6 +24,7 @@ import org.apache.atlas.EntityAuditEvent;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.listener.ActiveStateChangeHandler;
import org.apache.atlas.model.audit.EntityAuditEventV2;
import org.apache.atlas.repository.Constants.AtlasAuditAgingType;
import org.apache.atlas.service.Service;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.configuration.Configuration;
@ -31,10 +32,7 @@ import org.apache.hadoop.hbase.util.Bytes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.*;
/**
* This abstract base class should be used when adding support for an audit storage backend.
@ -162,4 +160,9 @@ public abstract class AbstractStorageBasedAuditRepository implements Service, En
return Bytes.toBytes(keyStr);
}
@Override
public List<EntityAuditEventV2> deleteEventsV2(String entityId, Set<EntityAuditEventV2.EntityAuditActionV2> entityAuditActions, short auditCount, int ttlInDays, boolean createEventsAgeoutAllowed, AtlasAuditAgingType auditAgingType) throws AtlasBaseException, AtlasException {
return null;
}
}

View File

@ -0,0 +1,363 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.repository.audit;
import org.apache.atlas.ApplicationProperties;
import org.apache.atlas.AtlasConfiguration;
import org.apache.atlas.AtlasException;
import org.apache.atlas.discovery.AtlasDiscoveryService;
import org.apache.atlas.model.audit.AuditReductionCriteria;
import org.apache.atlas.model.audit.EntityAuditEventV2;
import org.apache.atlas.model.tasks.AtlasTask;
import org.apache.atlas.repository.Constants.AtlasAuditAgingType;
import org.apache.atlas.repository.graphdb.AtlasGraph;
import org.apache.atlas.type.AtlasTypeRegistry;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.configuration.Configuration;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang.time.DateUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.scheduling.annotation.SchedulingConfigurer;
import org.springframework.scheduling.config.IntervalTask;
import org.springframework.scheduling.config.ScheduledTaskRegistrar;
import org.springframework.stereotype.Component;
import javax.inject.Inject;
import java.util.*;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import static org.apache.atlas.repository.Constants.*;
import static org.apache.atlas.repository.store.graph.v2.tasks.AuditReductionTaskFactory.ATLAS_AUDIT_REDUCTION_ENTITY_RETRIEVAL;
@Component
public class AtlasAuditReductionService implements SchedulingConfigurer {
private static final Logger LOG = LoggerFactory.getLogger(AtlasAuditReductionService.class);
private final AtlasGraph graph;
private final AtlasDiscoveryService discoveryService;
private final AtlasTypeRegistry typeRegistry;
private static final String VALUE_DELIMITER = ",";
private static final String ATLAS_AUDIT_SWEEP_OUT_ENTITY_TYPES = "atlas.audit.sweep.out.entity.types";
private static final String ATLAS_AUDIT_SWEEP_OUT_ACTION_TYPES = "atlas.audit.sweep.out.action.types";
private static final String ATLAS_AUDIT_CUSTOM_AGEOUT_ENTITY_TYPES = "atlas.audit.custom.ageout.entity.types";
private static final String ATLAS_AUDIT_CUSTOM_AGEOUT_ACTION_TYPES = "atlas.audit.custom.ageout.action.types";
private static final String ATLAS_AUDIT_AGING_SCHEDULER_INITIAL_DELAY = "atlas.audit.aging.scheduler.initial.delay.in.min";
private static final int MIN_TTL_TO_MAINTAIN = AtlasConfiguration.MIN_TTL_TO_MAINTAIN.getInt();
private static final int MIN_AUDIT_COUNT_TO_MAINTAIN = AtlasConfiguration.MIN_AUDIT_COUNT_TO_MAINTAIN.getInt();
private Configuration atlasConfiguration;
private AuditReductionCriteria ageoutCriteriaByConfig;
private List<Map<String, Object>> ageoutTypeCriteriaMap;
@Inject
public AtlasAuditReductionService(Configuration config, AtlasGraph graph, AtlasDiscoveryService discoveryService, AtlasTypeRegistry typeRegistry) {
this.atlasConfiguration = config;
this.graph = graph;
this.discoveryService = discoveryService;
this.typeRegistry = typeRegistry;
}
public List<AtlasTask> startAuditAgingByConfig() {
List<AtlasTask> tasks = null;
try {
if (ageoutCriteriaByConfig == null) {
ageoutCriteriaByConfig = convertConfigToAuditReductionCriteria();
LOG.info("Audit aging is enabled by configuration");
}
LOG.info("Audit aging is triggered with configuration: {}", ageoutCriteriaByConfig.toString());
if (ageoutTypeCriteriaMap == null) {
ageoutTypeCriteriaMap = buildAgeoutCriteriaForAllAgingTypes(ageoutCriteriaByConfig);
}
tasks = startAuditAgingByCriteria(ageoutTypeCriteriaMap);
} catch (Exception e) {
LOG.error("Error while aging out audits by configuration: ", e);
}
return tasks;
}
public List<AtlasTask> startAuditAgingByCriteria(List<Map<String, Object>> ageoutTypeCriteriaMap) {
if (CollectionUtils.isEmpty(ageoutTypeCriteriaMap)) {
return null;
}
List<AtlasTask> tasks = new ArrayList<>();
try {
for (Map<String, Object> eachCriteria : ageoutTypeCriteriaMap) {
AtlasTask auditAgingTask = discoveryService.createAndQueueAuditReductionTask(eachCriteria, ATLAS_AUDIT_REDUCTION_ENTITY_RETRIEVAL);
if (auditAgingTask != null) {
tasks.add(auditAgingTask);
}
}
} catch (Exception e) {
LOG.error("Error while aging out audits by criteria: ", e);
}
return tasks;
}
private AuditReductionCriteria convertConfigToAuditReductionCriteria() {
boolean auditAgingEnabled = AtlasConfiguration.ATLAS_AUDIT_AGING_ENABLED.getBoolean();
boolean createAuditsAgeoutAllowed = AtlasConfiguration.ATLAS_AUDIT_CREATE_EVENTS_AGEOUT_ALLOWED.getBoolean();
boolean subTypesIncluded = AtlasConfiguration.ATLAS_AUDIT_AGING_SUBTYPES_INCLUDED.getBoolean();
boolean ignoreDefaultAgeoutTTL = AtlasConfiguration.ATLAS_AUDIT_DEFAULT_AGEOUT_IGNORE_TTL.getBoolean();
int defaultAgeoutTTLConfigured = AtlasConfiguration.ATLAS_AUDIT_DEFAULT_AGEOUT_TTL.getInt();
int defaultAgeoutAuditCountConfigured = AtlasConfiguration.ATLAS_AUDIT_DEFAULT_AGEOUT_COUNT.getInt();
int customAgeoutTTLConfigured = AtlasConfiguration.ATLAS_AUDIT_CUSTOM_AGEOUT_TTL.getInt();
int customAgeoutAuditCountConfigured = AtlasConfiguration.ATLAS_AUDIT_CUSTOM_AGEOUT_COUNT.getInt();
boolean defaultAgeoutEnabled = AtlasConfiguration.ATLAS_AUDIT_DEFAULT_AGEOUT_ENABLED.getBoolean();
int defaultAgeoutTTL = getGuaranteedMinValueOf(AtlasConfiguration.ATLAS_AUDIT_DEFAULT_AGEOUT_TTL, defaultAgeoutTTLConfigured, MIN_TTL_TO_MAINTAIN);
int defaultAgeoutAuditCount = defaultAgeoutAuditCountConfigured <= 0 ? defaultAgeoutAuditCountConfigured
: getGuaranteedMinValueOf(AtlasConfiguration.ATLAS_AUDIT_DEFAULT_AGEOUT_COUNT, defaultAgeoutAuditCountConfigured, MIN_AUDIT_COUNT_TO_MAINTAIN);
int customAgeoutTTL = customAgeoutTTLConfigured <= 0 ? customAgeoutTTLConfigured
: getGuaranteedMinValueOf(AtlasConfiguration.ATLAS_AUDIT_CUSTOM_AGEOUT_TTL, customAgeoutTTLConfigured, MIN_TTL_TO_MAINTAIN);
int customAgeoutAuditCount = customAgeoutAuditCountConfigured <= 0 ? customAgeoutAuditCountConfigured
: getGuaranteedMinValueOf(AtlasConfiguration.ATLAS_AUDIT_CUSTOM_AGEOUT_COUNT, customAgeoutAuditCountConfigured, MIN_AUDIT_COUNT_TO_MAINTAIN);
String customAgeoutEntityTypes = getStringOf(ATLAS_AUDIT_CUSTOM_AGEOUT_ENTITY_TYPES);
String customAgeoutActionTypes = getStringOf(ATLAS_AUDIT_CUSTOM_AGEOUT_ACTION_TYPES);
AuditReductionCriteria auditReductionCriteria = new AuditReductionCriteria();
auditReductionCriteria.setAuditAgingEnabled(auditAgingEnabled);
auditReductionCriteria.setCreateEventsAgeoutAllowed(createAuditsAgeoutAllowed);
auditReductionCriteria.setSubTypesIncluded(subTypesIncluded);
auditReductionCriteria.setIgnoreDefaultAgeoutTTL(ignoreDefaultAgeoutTTL);
auditReductionCriteria.setDefaultAgeoutEnabled(defaultAgeoutEnabled);
auditReductionCriteria.setDefaultAgeoutTTLInDays(defaultAgeoutTTL);
auditReductionCriteria.setDefaultAgeoutAuditCount(defaultAgeoutAuditCount);
auditReductionCriteria.setCustomAgeoutTTLInDays(customAgeoutTTL);
auditReductionCriteria.setCustomAgeoutAuditCount(customAgeoutAuditCount);
auditReductionCriteria.setCustomAgeoutEntityTypes(customAgeoutEntityTypes);
auditReductionCriteria.setCustomAgeoutActionTypes(customAgeoutActionTypes);
boolean isSweepOutEnabled = AtlasConfiguration.ATLAS_AUDIT_SWEEP_OUT.getBoolean();
auditReductionCriteria.setAuditSweepoutEnabled(isSweepOutEnabled);
if (isSweepOutEnabled) {
String sweepoutEntityTypes = getStringOf(ATLAS_AUDIT_SWEEP_OUT_ENTITY_TYPES);
String sweepoutActionTypes = getStringOf(ATLAS_AUDIT_SWEEP_OUT_ACTION_TYPES);
auditReductionCriteria.setSweepoutEntityTypes(sweepoutEntityTypes);
auditReductionCriteria.setSweepoutActionTypes(sweepoutActionTypes);
}
return auditReductionCriteria;
}
public List<Map<String, Object>> buildAgeoutCriteriaForAllAgingTypes(AuditReductionCriteria auditReductionCriteria) {
if (auditReductionCriteria == null || !auditReductionCriteria.isAuditAgingEnabled()) {
return null;
}
List<Map<String, Object>> auditAgeoutCriteriaByType = new ArrayList<>();
Set<String> defaultAgeoutEntityTypesToExclude = new HashSet<>();
Set<String> defaultAgeoutActionTypes = Arrays.stream(EntityAuditEventV2.EntityAuditActionV2.values()).map(x -> x.toString()).collect(Collectors.toSet());
boolean createEventsAgeoutAllowed = auditReductionCriteria.isCreateEventsAgeoutAllowed();
boolean subTypesIncluded = auditReductionCriteria.isSubTypesIncluded();
boolean ignoreDefaultAgeoutTTL = auditReductionCriteria.ignoreDefaultAgeoutTTL();
boolean defaultAgeoutEnabled = auditReductionCriteria.isDefaultAgeoutEnabled();
int defaultAgeoutTTL = ignoreDefaultAgeoutTTL ? 0 : getGuaranteedMinValueOf(AtlasConfiguration.ATLAS_AUDIT_DEFAULT_AGEOUT_TTL, auditReductionCriteria.getDefaultAgeoutTTLInDays(), MIN_TTL_TO_MAINTAIN);
int defaultAgeoutAuditCount = auditReductionCriteria.getDefaultAgeoutAuditCount() <= 0 ? auditReductionCriteria.getDefaultAgeoutAuditCount()
: getGuaranteedMinValueOf(AtlasConfiguration.ATLAS_AUDIT_DEFAULT_AGEOUT_COUNT, auditReductionCriteria.getDefaultAgeoutAuditCount(), MIN_AUDIT_COUNT_TO_MAINTAIN);
int customAgeoutTTL = auditReductionCriteria.getCustomAgeoutTTLInDays() <= 0 ? auditReductionCriteria.getCustomAgeoutTTLInDays()
: getGuaranteedMinValueOf(AtlasConfiguration.ATLAS_AUDIT_CUSTOM_AGEOUT_TTL, auditReductionCriteria.getCustomAgeoutTTLInDays(), MIN_TTL_TO_MAINTAIN);
int customAgeoutAuditCount = auditReductionCriteria.getCustomAgeoutAuditCount() <= 0 ? auditReductionCriteria.getCustomAgeoutAuditCount()
: getGuaranteedMinValueOf(AtlasConfiguration.ATLAS_AUDIT_CUSTOM_AGEOUT_COUNT, auditReductionCriteria.getCustomAgeoutAuditCount(), MIN_AUDIT_COUNT_TO_MAINTAIN);
Set<String> customAgeoutEntityTypes = getUniqueListOf(auditReductionCriteria.getCustomAgeoutEntityTypes());
Set<String> customAgeoutActionTypes = getValidActionTypes(AtlasAuditAgingType.CUSTOM, getUniqueListOf(auditReductionCriteria.getCustomAgeoutActionTypes()));
defaultAgeoutEntityTypesToExclude.addAll(customAgeoutEntityTypes);
if (CollectionUtils.isEmpty(customAgeoutEntityTypes)) {
defaultAgeoutActionTypes.removeAll(customAgeoutActionTypes);
}
boolean isSweepOutEnabled = auditReductionCriteria.isAuditSweepoutEnabled();
if (isSweepOutEnabled) {
Set<String> sweepOutEntityTypes = getUniqueListOf(auditReductionCriteria.getSweepoutEntityTypes());
Set<String> sweepOutActionTypes = getValidActionTypes(AtlasAuditAgingType.SWEEP, getUniqueListOf(auditReductionCriteria.getSweepoutActionTypes()));
if (CollectionUtils.isNotEmpty(sweepOutEntityTypes) || CollectionUtils.isNotEmpty(sweepOutActionTypes)) {
Map<String, Object> sweepAgeoutCriteria = getAgeoutCriteriaMap(AtlasAuditAgingType.SWEEP, 0, 0, sweepOutEntityTypes, sweepOutActionTypes, createEventsAgeoutAllowed, subTypesIncluded);
auditAgeoutCriteriaByType.add(sweepAgeoutCriteria);
} else {
LOG.error("Sweepout of audits is skipped.At least one of two properties: entity types/action types should be configured.");
}
defaultAgeoutEntityTypesToExclude.addAll(sweepOutEntityTypes);
customAgeoutEntityTypes.removeAll(sweepOutEntityTypes);
if (CollectionUtils.isEmpty(sweepOutEntityTypes)) {
defaultAgeoutActionTypes.removeAll(sweepOutActionTypes);
customAgeoutActionTypes.removeAll(sweepOutActionTypes);
}
}
if ((customAgeoutTTL > 0 || customAgeoutAuditCount > 0) && (CollectionUtils.isNotEmpty(customAgeoutEntityTypes) || CollectionUtils.isNotEmpty(customAgeoutActionTypes))) {
Map<String, Object> customAgeoutCriteria = getAgeoutCriteriaMap(AtlasAuditAgingType.CUSTOM, customAgeoutTTL, customAgeoutAuditCount, customAgeoutEntityTypes, customAgeoutActionTypes, createEventsAgeoutAllowed, subTypesIncluded);
auditAgeoutCriteriaByType.add(customAgeoutCriteria);
} else if (customAgeoutTTL <= 0 && customAgeoutAuditCount <= 0 && CollectionUtils.isEmpty(customAgeoutEntityTypes) && CollectionUtils.isEmpty(customAgeoutActionTypes)) {
//Do Nothing
} else if (customAgeoutTTL <= 0 && customAgeoutAuditCount <= 0) {
LOG.error("Custom Audit aging is skipped.At least one of two properties: TTL/Audit Count should be configured.");
} else {
LOG.error("Custom Audit aging is skipped.At least one of two properties: entity types/action types should be configured.");
}
if (defaultAgeoutEnabled) {
if (ignoreDefaultAgeoutTTL) {
LOG.info("'{}' config property or 'ignoreDefaultAgeoutTTL' property in API configured as: {}, Default audit aging will be done by audit count only", AtlasConfiguration.ATLAS_AUDIT_DEFAULT_AGEOUT_IGNORE_TTL.getPropertyName(), ignoreDefaultAgeoutTTL);
}
/**In case of default ageout with all available audit actions, query to ATLAS_ENTITY_AUDIT_EVENTS table
* without any action type provides data for all audit actions and is more performant than
* multiple queries to ATLAS_ENTITY_AUDIT_EVENTS with each action type
*/
if (defaultAgeoutActionTypes.size() == EntityAuditEventV2.EntityAuditActionV2.values().length) {
defaultAgeoutActionTypes.clear();
}
if (!ignoreDefaultAgeoutTTL || defaultAgeoutAuditCount > 0) {
Map<String, Object> defaultAgeoutCriteria = getAgeoutCriteriaMap(AtlasAuditAgingType.DEFAULT, defaultAgeoutTTL, defaultAgeoutAuditCount, defaultAgeoutEntityTypesToExclude, defaultAgeoutActionTypes, createEventsAgeoutAllowed, subTypesIncluded);
auditAgeoutCriteriaByType.add(defaultAgeoutCriteria);
} else {
LOG.error("Default Audit aging is skipped. Valid audit count should be configured when TTL criteria is ignored.");
}
}
return auditAgeoutCriteriaByType;
}
private Map<String, Object> getAgeoutCriteriaMap(AtlasAuditAgingType agingOption, int ttl, int minCount, Set<String> entityTypes, Set<String> actionTypes, boolean createEventsAgeoutAllowed, boolean subTypesIncluded) {
Map<String, Object> auditAgingOptions = new HashMap<>();
auditAgingOptions.put(AUDIT_AGING_TYPE_KEY, agingOption);
auditAgingOptions.put(AUDIT_AGING_TTL_KEY, ttl);
auditAgingOptions.put(AUDIT_AGING_COUNT_KEY, minCount);
auditAgingOptions.put(AUDIT_AGING_ENTITY_TYPES_KEY, entityTypes);
auditAgingOptions.put(AUDIT_AGING_ACTION_TYPES_KEY, actionTypes);
auditAgingOptions.put(CREATE_EVENTS_AGEOUT_ALLOWED_KEY, createEventsAgeoutAllowed);
auditAgingOptions.put(AUDIT_AGING_SUBTYPES_INCLUDED_KEY, subTypesIncluded);
return auditAgingOptions;
}
private int getGuaranteedMinValueOf(AtlasConfiguration configuration, int configuredValue, int minValueToMaintain) {
if (configuredValue < minValueToMaintain) {
LOG.info("Minimum value for '{}' should be {}", configuration.getPropertyName(), minValueToMaintain);
}
return configuredValue < minValueToMaintain ? minValueToMaintain : configuredValue;
}
private String getStringOf(String configProperty) {
String configuredValue = null;
if (StringUtils.isNotEmpty(configProperty)) {
configuredValue = String.join(VALUE_DELIMITER , (List) atlasConfiguration.getList(configProperty));
}
return configuredValue;
}
private Set<String> getUniqueListOf(String value) {
Set<String> configuredValues = null;
if (StringUtils.isNotEmpty(value)) {
configuredValues = Stream.of(value.split(VALUE_DELIMITER)).map(String::trim).collect(Collectors.toSet());
}
return configuredValues == null ? new HashSet<>() : configuredValues;
}
private Set<String> getValidActionTypes(AtlasAuditAgingType auditAgingType, Set<String> actionTypes) {
if (CollectionUtils.isEmpty(actionTypes)) {
return Collections.emptySet();
}
Set<String> allActionTypes = Arrays.stream(EntityAuditEventV2.EntityAuditActionV2.values()).map(x -> x.toString()).collect(Collectors.toSet());
Set<String> entityAuditActions = new HashSet<>();
Set<String> invalidActionTypes = new HashSet<>();
for (String actionType : actionTypes) {
Set<String> matchedActionTypes;
final String actionTypeToMatch = actionType.contains("*") ? actionType.replace("*", "") : actionType;
if (actionTypeToMatch.startsWith("*")) {
matchedActionTypes = allActionTypes.stream().filter(x -> x.contains(actionTypeToMatch)).collect(Collectors.toSet());
} else {
matchedActionTypes = allActionTypes.stream().filter(x -> x.startsWith(actionTypeToMatch)).collect(Collectors.toSet());
}
if (CollectionUtils.isEmpty(matchedActionTypes)) {
invalidActionTypes.add(actionType);
} else {
entityAuditActions.addAll(matchedActionTypes);
}
}
if (CollectionUtils.isNotEmpty(actionTypes) && CollectionUtils.isEmpty(entityAuditActions)) {
throw new IllegalArgumentException("No enum constant " + EntityAuditEventV2.EntityAuditActionV2.class.getCanonicalName() + "." + String.join(VALUE_DELIMITER, invalidActionTypes));
} else {
LOG.info("Action type name(s) {} provided for aging type-{}", String.join(VALUE_DELIMITER, entityAuditActions), auditAgingType);
}
if (CollectionUtils.isNotEmpty(invalidActionTypes)){
LOG.warn("Invalid action type name(s) {} provided for aging type-{}", String.join(VALUE_DELIMITER, invalidActionTypes), auditAgingType);
}
return entityAuditActions;
}
@Override
public void configureTasks(ScheduledTaskRegistrar taskRegistrar) {
if (!AtlasConfiguration.ATLAS_AUDIT_AGING_ENABLED.getBoolean()) {
LOG.warn("Audit aging is not enabled");
return;
}
IntervalTask task = new IntervalTask(new Runnable() {
@Override
public void run() {
startAuditAgingByConfig();
}
}, getAuditAgingFrequencyInMillis(), getAuditAgingInitialDelayInMillis());
taskRegistrar.addFixedRateTask(task);
}
private long getAuditAgingFrequencyInMillis() {
int frequencyInDays = getGuaranteedMinValueOf(AtlasConfiguration.ATLAS_AUDIT_AGING_SCHEDULER_FREQUENCY, AtlasConfiguration.ATLAS_AUDIT_AGING_SCHEDULER_FREQUENCY.getInt(), 1);
return frequencyInDays * DateUtils.MILLIS_PER_DAY;
}
private long getAuditAgingInitialDelayInMillis() {
int initialDelayInMins = 1;
try {
initialDelayInMins = ApplicationProperties.get().getInt(ATLAS_AUDIT_AGING_SCHEDULER_INITIAL_DELAY, 1);
} catch (AtlasException ex) {
LOG.error("Error while fetching application properties", ex);
}
return (initialDelayInMins < 1 ? 1 : initialDelayInMins) * DateUtils.MILLIS_PER_MINUTE;
}
}

View File

@ -22,6 +22,7 @@ import org.apache.atlas.AtlasException;
import org.apache.atlas.EntityAuditEvent;
import org.apache.atlas.model.audit.EntityAuditEventV2;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.repository.Constants.AtlasAuditAgingType;
import java.util.List;
import java.util.Set;
@ -92,6 +93,17 @@ public interface EntityAuditRepository {
*/
List<EntityAuditEventV2> listEventsV2(String entityId, EntityAuditEventV2.EntityAuditActionV2 auditAction, String sortByColumn, boolean sortOrderDesc, int offset, short limit) throws AtlasBaseException;
/**
* Delete all events for the given entity id by keeping only auditCount number of events with entityAuditActions
* @param entityId entity id
* @param entityAuditActions operation(s) to be used to filter at HBase column
* @param auditCount Max numbers of events to keep without deleting
* @param ttlInDays time-to-live of events
* @return list of events
* @throws AtlasBaseException
*/
List<EntityAuditEventV2> deleteEventsV2(String entityId, Set<EntityAuditEventV2.EntityAuditActionV2> entityAuditActions, short auditCount, int ttlInDays, boolean createEventsAgeoutAllowed, AtlasAuditAgingType auditAgingType) throws AtlasBaseException, AtlasException;
/***
* List events for given time range where classifications have been added, deleted or updated.
* @param fromTimestamp from timestamp

View File

@ -20,6 +20,7 @@ package org.apache.atlas.repository.audit;
import com.google.common.annotations.VisibleForTesting;
import org.apache.atlas.ApplicationProperties;
import org.apache.atlas.AtlasConfiguration;
import org.apache.atlas.AtlasException;
import org.apache.atlas.EntityAuditEvent;
import org.apache.atlas.RequestContext;
@ -28,10 +29,12 @@ import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.ha.HAConfiguration;
import org.apache.atlas.model.audit.EntityAuditEventV2;
import org.apache.atlas.model.audit.EntityAuditEventV2.EntityAuditActionV2;
import org.apache.atlas.repository.Constants.AtlasAuditAgingType;
import org.apache.atlas.utils.AtlasPerfMetrics;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.configuration.Configuration;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang.time.DateUtils;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
@ -39,6 +42,7 @@ import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
@ -48,6 +52,7 @@ import org.apache.hadoop.hbase.filter.BinaryComparator;
import org.apache.hadoop.hbase.filter.BinaryPrefixComparator;
import org.apache.hadoop.hbase.filter.CompareFilter;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.FilterList;
import org.apache.hadoop.hbase.filter.MultiRowRangeFilter;
import org.apache.hadoop.hbase.filter.PageFilter;
import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
@ -63,14 +68,18 @@ import org.springframework.core.annotation.Order;
import javax.inject.Singleton;
import java.io.Closeable;
import java.io.IOException;
import java.sql.Timestamp;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.stream.Collectors;
/**
@ -329,8 +338,13 @@ public class HBaseBasedAuditRepository extends AbstractStorageBasedAuditReposito
@Override
public List<EntityAuditEventV2> listEventsV2(String entityId, EntityAuditEventV2.EntityAuditActionV2 auditAction, String sortByColumn, boolean sortOrderDesc, int offset, short limit) throws AtlasBaseException {
return listEventsV2(entityId, auditAction, sortByColumn, sortOrderDesc, offset, limit, false, true, false, null);
}
private List<EntityAuditEventV2> listEventsV2(String entityId, EntityAuditEventV2.EntityAuditActionV2 auditAction, String sortByColumn, boolean sortOrderDesc, int offset, short limit, boolean isAgeoutTransaction, boolean createEventsAgeoutAllowed, boolean allowAgeoutByAuditCount, List<EntityAuditEventV2> eventsToKeep) throws AtlasBaseException {
if (LOG.isDebugEnabled()) {
LOG.debug("==> HBaseBasedAuditRepository.listEventsV2(entityId={}, auditAction={}, sortByColumn={}, sortOrderDesc={}, offset={}, limit={})", entityId, auditAction, sortByColumn, offset, limit);
LOG.debug("==> HBaseBasedAuditRepository.listEventsV2(entityId={}, auditAction={}, sortByColumn={}, sortOrderDesc={}, offset={}, limit={})",
entityId, auditAction, sortByColumn, sortOrderDesc, offset, limit);
}
AtlasPerfMetrics.MetricRecorder metric = RequestContext.get().startMetricRecord("listEventsV2");
@ -343,7 +357,7 @@ public class HBaseBasedAuditRepository extends AbstractStorageBasedAuditReposito
offset = 0;
}
if (limit < 0) {
if (!isAgeoutTransaction && limit < 0) {
limit = 100;
}
@ -356,19 +370,29 @@ public class HBaseBasedAuditRepository extends AbstractStorageBasedAuditReposito
* MultiRowRangeFilter and then again scan the table to get all the columns from the table this time.
*/
Scan scan = new Scan().setReversed(true)
.setCaching(DEFAULT_CACHING)
.setSmall(true)
.setStopRow(Bytes.toBytes(entityId))
.setStartRow(getKey(entityId, Long.MAX_VALUE, Integer.MAX_VALUE))
.addColumn(COLUMN_FAMILY, COLUMN_ACTION)
.addColumn(COLUMN_FAMILY, COLUMN_USER);
.setCaching(DEFAULT_CACHING)
.setSmall(true)
.setStopRow(Bytes.toBytes(entityId))
.setStartRow(getKey(entityId, Long.MAX_VALUE, Integer.MAX_VALUE))
.addColumn(COLUMN_FAMILY, COLUMN_ACTION)
.addColumn(COLUMN_FAMILY, COLUMN_USER);
FilterList filterList = new FilterList();
if (auditAction != null) {
Filter filterAction = new SingleColumnValueFilter(COLUMN_FAMILY, COLUMN_ACTION, CompareFilter.CompareOp.EQUAL, new BinaryComparator(Bytes.toBytes(auditAction.toString())));
scan.setFilter(filterAction);
filterList.addFilter(filterAction);
}
if (!createEventsAgeoutAllowed) {
FilterList createEventFilterList = new FilterList(FilterList.Operator.MUST_PASS_ALL);
Filter filterByCreateActionType = new SingleColumnValueFilter(COLUMN_FAMILY, COLUMN_ACTION, CompareFilter.CompareOp.NOT_EQUAL, new BinaryComparator(Bytes.toBytes(EntityAuditActionV2.ENTITY_CREATE.toString())));
Filter filterByImportCreateActionType = new SingleColumnValueFilter(COLUMN_FAMILY, COLUMN_ACTION, CompareFilter.CompareOp.NOT_EQUAL, new BinaryComparator(Bytes.toBytes(EntityAuditActionV2.ENTITY_IMPORT_CREATE.toString())));
createEventFilterList.addFilter(filterByCreateActionType);
createEventFilterList.addFilter(filterByImportCreateActionType);
filterList.addFilter(createEventFilterList);
}
scan.setFilter(filterList);
List<EntityAuditEventV2> events = new ArrayList<>();
try (ResultScanner scanner = table.getScanner(scan)) {
@ -383,10 +407,25 @@ public class HBaseBasedAuditRepository extends AbstractStorageBasedAuditReposito
}
EntityAuditEventV2.sortEvents(events, sortByColumn, sortOrderDesc);
int fromIndex = Math.min(events.size(), offset);
int endIndex = events.size();
if (limit > 0) {
endIndex = Math.min(events.size(), offset + limit);
}
if (isAgeoutTransaction) {
if (!allowAgeoutByAuditCount) { //No audit events allowed to age-out by audit count
eventsToKeep.addAll(events);
return Collections.emptyList();
}
eventsToKeep.addAll(events.subList(0, fromIndex));
}
events = events.subList(fromIndex, endIndex);
events = events.subList(Math.min(events.size(), offset), Math.min(events.size(), offset + limit));
if (events.size() > 0) {
List<MultiRowRangeFilter.RowRange> ranges = new ArrayList<>();
events.forEach(e -> {
@ -394,11 +433,11 @@ public class HBaseBasedAuditRepository extends AbstractStorageBasedAuditReposito
});
scan = new Scan().setReversed(true)
.setCaching(DEFAULT_CACHING)
.setSmall(true)
.setStopRow(Bytes.toBytes(entityId))
.setStartRow(getKey(entityId, Long.MAX_VALUE, Integer.MAX_VALUE))
.setFilter(new MultiRowRangeFilter(ranges));
.setCaching(DEFAULT_CACHING)
.setSmall(true)
.setStopRow(Bytes.toBytes(entityId))
.setStartRow(getKey(entityId, Long.MAX_VALUE, Integer.MAX_VALUE))
.setFilter(new MultiRowRangeFilter(ranges));
try (ResultScanner scanner = table.getScanner(scan)) {
events = new ArrayList<>();
@ -426,7 +465,8 @@ public class HBaseBasedAuditRepository extends AbstractStorageBasedAuditReposito
}
if (LOG.isDebugEnabled()) {
LOG.debug("<== HBaseBasedAuditRepository.listEventsV2(entityId={}, auditAction={}, sortByColumn={}, sortOrderDesc={}, offset={}, limit={}): #recored returned {}", entityId, auditAction, sortByColumn, offset, limit, events.size());
LOG.debug("<== HBaseBasedAuditRepository.listEventsV2(entityId={}, auditAction={}, sortByColumn={}, sortOrderDesc={}, offset={}, limit={}): #records returned {}",
entityId, auditAction, sortByColumn, sortOrderDesc, offset, limit, events.size());
}
return events;
@ -452,6 +492,88 @@ public class HBaseBasedAuditRepository extends AbstractStorageBasedAuditReposito
return ret;
}
@Override
public List<EntityAuditEventV2> deleteEventsV2(String entityId, Set<EntityAuditEventV2.EntityAuditActionV2> entityAuditActions, short allowedAuditCount, int ttlInDays, boolean createEventsAgeoutAllowed, AtlasAuditAgingType auditAgingType) throws AtlasBaseException, AtlasException {
final String SORT_BY_COLUMN = EntityAuditEventV2.SORT_COLUMN_TIMESTAMP;
final boolean SORT_ORDER_DESC = true;
if (LOG.isDebugEnabled()) {
LOG.debug("==> HBaseBasedAuditRepository.deleteEventsV2(entityId={}, auditActions={}, sortByColumn={}, sortOrderDesc={}, offset={}, limit={})",
entityId, Arrays.toString(entityAuditActions.toArray()), SORT_BY_COLUMN, SORT_ORDER_DESC, allowedAuditCount, -1);
}
AtlasPerfMetrics.MetricRecorder metric = RequestContext.get().startMetricRecord("deleteEventsV2");
Table table = null;
List<EntityAuditEventV2> eventsEligibleForAgeout = new ArrayList<>();
try {
table = connection.getTable(tableName);
List<EntityAuditEventV2> eventsToKeep = new ArrayList<>();
boolean allowAgeoutByAuditCount = allowedAuditCount > 0 || (auditAgingType == AtlasAuditAgingType.SWEEP);
if (CollectionUtils.isEmpty(entityAuditActions)) {
eventsEligibleForAgeout.addAll(listEventsV2(entityId, null, SORT_BY_COLUMN, SORT_ORDER_DESC,
allowedAuditCount, (short) -1, true, createEventsAgeoutAllowed, allowAgeoutByAuditCount, eventsToKeep));
} else {
for (EntityAuditActionV2 eachAuditAction : entityAuditActions) {
List<EntityAuditEventV2> eventsByEachAuditAction = listEventsV2(entityId, eachAuditAction, SORT_BY_COLUMN, SORT_ORDER_DESC,
allowedAuditCount, (short) -1, true, createEventsAgeoutAllowed, allowAgeoutByAuditCount, eventsToKeep);
if (CollectionUtils.isNotEmpty(eventsByEachAuditAction)) {
eventsEligibleForAgeout.addAll(eventsByEachAuditAction);
}
}
}
if (CollectionUtils.isNotEmpty(eventsToKeep)) {
//Limit events based on configured audit count by grouping events of all action types
if (allowAgeoutByAuditCount && (auditAgingType == AtlasAuditAgingType.DEFAULT || CollectionUtils.isEmpty(entityAuditActions))) {
LOG.debug("Aging out audit events by audit count for entity: {}", entityId);
EntityAuditEventV2.sortEvents(eventsToKeep, SORT_BY_COLUMN, SORT_ORDER_DESC);
if (allowedAuditCount < eventsToKeep.size()) {
eventsEligibleForAgeout.addAll(eventsToKeep.subList(allowedAuditCount, eventsToKeep.size()));
eventsToKeep = eventsToKeep.subList(0, allowedAuditCount);
}
}
//TTL based aging
LocalDateTime now = LocalDateTime.now();
boolean isTTLTestAutomation = AtlasConfiguration.ATLAS_AUDIT_AGING_TTL_TEST_AUTOMATION.getBoolean();
if (ttlInDays > 0) {
LOG.debug("Aging out audit events by TTL for entity: {}", entityId);
long ttlTimestamp = Timestamp.valueOf(isTTLTestAutomation ? now.minusMinutes(ttlInDays) : now.minusDays(ttlInDays)).getTime();
eventsToKeep.forEach(e -> {
if (e.getTimestamp() < ttlTimestamp) {
eventsEligibleForAgeout.add(e);
}
});
}
}
List<Delete> eventsToDelete = new ArrayList<>();
for (EntityAuditEventV2 event : eventsEligibleForAgeout) {
Delete delete = new Delete(Bytes.toBytes(event.getEventKey()));
eventsToDelete.add(delete);
}
if (CollectionUtils.isNotEmpty(eventsToDelete)) {
if (LOG.isDebugEnabled()) {
LOG.debug("Deleting events from table:{} are {}", tableName, Arrays.toString(eventsToDelete.toArray()));
}
table.delete(eventsToDelete);
}
if (LOG.isDebugEnabled()) {
LOG.debug("<== HBaseBasedAuditRepository.deleteEventsV2(entityId={}, auditAction={}, sortByColumn={}, sortOrderDesc={}, offset={}, limit={}): ",
entityId, Arrays.toString(entityAuditActions.toArray()), SORT_BY_COLUMN, SORT_ORDER_DESC, allowedAuditCount, -1);
}
} catch (IOException e) {
LOG.error("Failed deleting audit events for guid:{}", entityId);
} finally {
RequestContext.get().endMetricRecord(metric);
close(table);
}
return eventsEligibleForAgeout;
}
private <T> void addColumn(Put put, byte[] columnName, T columnValue) {
if (columnValue != null && !columnValue.toString().isEmpty()) {
put.addColumn(COLUMN_FAMILY, columnName, Bytes.toBytes(columnValue.toString()));

View File

@ -23,10 +23,13 @@ import org.apache.atlas.EntityAuditEvent;
import org.apache.atlas.annotation.ConditionalOnAtlasProperty;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.audit.EntityAuditEventV2;
import org.apache.atlas.repository.Constants.AtlasAuditAgingType;
import org.apache.commons.collections.CollectionUtils;
import org.springframework.stereotype.Component;
import javax.inject.Singleton;
import java.sql.Timestamp;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
@ -103,17 +106,59 @@ public class InMemoryEntityAuditRepository implements EntityAuditRepository {
@Override
public List<EntityAuditEventV2> listEventsV2(String entityId, EntityAuditEventV2.EntityAuditActionV2 auditAction, String sortByColumn, boolean sortOrderDesc, int offset, short limit) throws AtlasBaseException {
return listEventsV2(entityId, auditAction, sortByColumn, sortOrderDesc, 0, offset, limit, true, true);
}
private List<EntityAuditEventV2> listEventsV2(String entityId, EntityAuditEventV2.EntityAuditActionV2 auditAction, String sortByColumn, boolean sortOrderDesc, int ttlInDays, int offset, short limit, boolean allowMaxResults, boolean createEventsAgeoutAllowed) throws AtlasBaseException {
List<EntityAuditEventV2> events = new ArrayList<>();
SortedMap<String, EntityAuditEventV2> subMap = auditEventsV2.tailMap(entityId);
for (EntityAuditEventV2 event : subMap.values()) {
if (event.getEntityId().equals(entityId)) {
events.add(event);
if (auditAction == null || event.getAction() == auditAction) {
if (event.getAction() == EntityAuditEventV2.EntityAuditActionV2.ENTITY_CREATE && !createEventsAgeoutAllowed) {
continue;
}
events.add(event);
}
}
}
if (allowMaxResults && limit == -1) {
limit = (short) events.size();
}
EntityAuditEventV2.sortEvents(events, sortByColumn, sortOrderDesc);
events = events.subList(
Math.min(events.size(), offset),
Math.min(events.size(), offset + limit));
int fromIndex = Math.min(events.size(), offset);
int endIndex = Math.min(events.size(), offset + limit);
List<EntityAuditEventV2> possibleExpiredEvents = events.subList(0, fromIndex);
events = new ArrayList<>(events.subList(fromIndex, endIndex));
// This is only for Audit Aging, including expired audit events to result
if (CollectionUtils.isNotEmpty(possibleExpiredEvents) && ttlInDays > 0 ) {
LocalDateTime now = LocalDateTime.now();
long ttlTimestamp = Timestamp.valueOf(now.minusDays(ttlInDays)).getTime();
possibleExpiredEvents.removeIf(e -> (auditAction!= null && e.getAction() != auditAction) || e.getTimestamp() > ttlTimestamp);
if (CollectionUtils.isNotEmpty(possibleExpiredEvents)) {
events.addAll(possibleExpiredEvents);
}
}
return events;
}
@Override
public List<EntityAuditEventV2> deleteEventsV2(String entityId, Set<EntityAuditEventV2.EntityAuditActionV2> entityAuditActions, short auditCount, int ttlInDays, boolean createEventsAgeoutAllowed, AtlasAuditAgingType auditAgingType) throws AtlasBaseException, AtlasException {
List<EntityAuditEventV2> events = new ArrayList<>();
if (CollectionUtils.isEmpty(entityAuditActions)) {
events = listEventsV2(entityId, null, "timestamp", true, ttlInDays, auditCount, (short) -1, true, createEventsAgeoutAllowed);
} else {
for (EntityAuditEventV2.EntityAuditActionV2 auditAction : entityAuditActions) {
List<EntityAuditEventV2> eventsByAction = listEventsV2(entityId, auditAction, "timestamp", true, ttlInDays, auditCount, (short) -1, true, createEventsAgeoutAllowed);
if (CollectionUtils.isNotEmpty(eventsByAction)) {
events.addAll(eventsByAction);
}
}
}
return events;
}

View File

@ -18,10 +18,12 @@
package org.apache.atlas.repository.audit;
import org.apache.atlas.AtlasException;
import org.apache.atlas.EntityAuditEvent;
import org.apache.atlas.annotation.ConditionalOnAtlasProperty;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.audit.EntityAuditEventV2;
import org.apache.atlas.repository.Constants.AtlasAuditAgingType;
import org.springframework.stereotype.Component;
import javax.inject.Singleton;
@ -67,6 +69,11 @@ public class NoopEntityAuditRepository implements EntityAuditRepository {
return Collections.emptyList();
}
@Override
public List<EntityAuditEventV2> deleteEventsV2(String entityId, Set<EntityAuditEventV2.EntityAuditActionV2> entityAuditActions, short auditCount, int ttlInDays, boolean createEventsAgeoutAllowed, AtlasAuditAgingType auditAgingType) throws AtlasBaseException, AtlasException {
return null;
}
@Override
public List<EntityAuditEventV2> listEventsV2(String entityId, EntityAuditEventV2.EntityAuditActionV2 auditAction, String startKey, short maxResultCount) {
return Collections.emptyList();

View File

@ -398,6 +398,9 @@ public class GraphBackedSearchIndexer implements SearchIndexer, ActiveStateChang
// index recovery
createCommonVertexIndex(management, PROPERTY_KEY_INDEX_RECOVERY_NAME, UniqueKind.GLOBAL_UNIQUE, String.class, SINGLE, true, false);
// audit reduction
createCommonVertexIndex(management, PROPERTY_KEY_AUDIT_REDUCTION_NAME, UniqueKind.GLOBAL_UNIQUE, String.class, SINGLE, true, false);
//metrics
createCommonVertexIndex(management," __AtlasMetricsStat.metricsId", UniqueKind.GLOBAL_UNIQUE, String.class, SINGLE, true, false);
createCommonVertexIndex(management," __AtlasMetricsStat.__u_metricsId", UniqueKind.GLOBAL_UNIQUE, String.class, SINGLE, true, false);

View File

@ -0,0 +1,231 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.repository.store.graph.v2.tasks;
import org.apache.atlas.AtlasConfiguration;
import org.apache.atlas.AtlasException;
import org.apache.atlas.RequestContext;
import org.apache.atlas.annotation.GraphTransaction;
import org.apache.atlas.discovery.AtlasDiscoveryService;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.discovery.SearchParameters;
import org.apache.atlas.model.tasks.AtlasTask;
import org.apache.atlas.repository.Constants.*;
import org.apache.atlas.repository.graphdb.AtlasGraph;
import org.apache.atlas.repository.graphdb.AtlasGraphQuery;
import org.apache.atlas.repository.graphdb.AtlasVertex;
import org.apache.atlas.tasks.AbstractTask;
import org.apache.atlas.type.AtlasEntityType;
import org.apache.atlas.type.AtlasTypeRegistry;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.collections4.MapUtils;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.*;
import java.util.stream.Collectors;
import static org.apache.atlas.model.tasks.AtlasTask.Status.COMPLETE;
import static org.apache.atlas.model.tasks.AtlasTask.Status.FAILED;
import static org.apache.atlas.repository.Constants.*;
import static org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2.setEncodedProperty;
import static org.apache.atlas.repository.store.graph.v2.tasks.AuditReductionTaskFactory.AGING_TYPE_PROPERTY_KEY_MAP;
import static org.apache.atlas.repository.store.graph.v2.tasks.AuditReductionTaskFactory.ATLAS_AUDIT_REDUCTION;
public class AuditReductionEntityRetrievalTask extends AbstractTask {
private static final Logger LOG = LoggerFactory.getLogger(AuditReductionEntityRetrievalTask.class);
private static final String VALUE_DELIMITER = ",";
private final AtlasDiscoveryService discoveryService;
private final AtlasTypeRegistry typeRegistry;
private final AtlasGraph graph;
public AuditReductionEntityRetrievalTask(AtlasTask task, AtlasGraph graph, AtlasDiscoveryService discoveryService, AtlasTypeRegistry typeRegistry) {
super(task);
this.graph = graph;
this.discoveryService = discoveryService;
this.typeRegistry = typeRegistry;
}
@Override
public AtlasTask.Status perform() throws Exception {
RequestContext.clear();
Map<String, Object> params = getTaskDef().getParameters();
if (MapUtils.isEmpty(params)) {
LOG.warn("Task: {}: Unable to process task: Parameters is not readable!", getTaskGuid());
return FAILED;
}
String userName = getTaskDef().getCreatedBy();
if (StringUtils.isEmpty(userName)) {
LOG.warn("Task: {}: Unable to process task as user name is empty!", getTaskGuid());
return FAILED;
}
RequestContext.get().setUser(userName, null);
try {
run(params);
setStatus(COMPLETE);
} catch (Exception e) {
LOG.error("Task: {}: Error performing task!", getTaskGuid(), e);
setStatus(FAILED);
throw e;
} finally {
RequestContext.clear();
}
return getStatus();
}
protected void run(Map<String, Object> parameters) throws AtlasBaseException, IOException, AtlasException {
try {
AtlasTask auditAgingTask = createAgingTaskWithEligibleGUIDs(parameters);
if (auditAgingTask != null) {
LOG.info("{} task created for audit aging type-{}", ATLAS_AUDIT_REDUCTION, parameters.get(AUDIT_AGING_TYPE_KEY));
}
} catch (Exception e) {
LOG.error("Error while retrieving entities eligible for audit aging and creating audit aging tasks", e.getMessage());
}
}
protected AtlasTask createAgingTaskWithEligibleGUIDs(Map<String, Object> parameters) throws AtlasBaseException {
final String ALL_ENTITY_TYPES = "_ALL_ENTITY_TYPES";
final int SEARCH_OFFSET = 0;
final int SEARCH_LIMIT = AtlasConfiguration.ATLAS_AUDIT_AGING_SEARCH_MAX_LIMIT.getInt();
Set<String> entityTypes = ((Collection<String>) parameters.get(AUDIT_AGING_ENTITY_TYPES_KEY)).stream().collect(Collectors.toSet());
AtlasAuditAgingType auditAgingType = (AtlasAuditAgingType)parameters.get(AUDIT_AGING_TYPE_KEY);
boolean subTypesIncluded = (boolean)parameters.get(AUDIT_AGING_SUBTYPES_INCLUDED_KEY);
SearchParameters searchEntitiesToReduceAudit = new SearchParameters();
searchEntitiesToReduceAudit.setTypeName(ALL_ENTITY_TYPES);
searchEntitiesToReduceAudit.setOffset(SEARCH_OFFSET);
searchEntitiesToReduceAudit.setLimit(SEARCH_LIMIT);
searchEntitiesToReduceAudit.setIncludeSubTypes(subTypesIncluded);
if (CollectionUtils.isNotEmpty(entityTypes)) {
if (!validateTypesAndIncludeSubTypes(entityTypes, auditAgingType, subTypesIncluded)) {
LOG.error("All entity type names provided for audit aging type-{} are invalid", auditAgingType);
return null;
}
String queryString = String.join(VALUE_DELIMITER, entityTypes);
if (auditAgingType == AtlasAuditAgingType.DEFAULT && StringUtils.isNotEmpty(queryString)) {
queryString = new StringBuilder().append("!").append(queryString).toString();
}
searchEntitiesToReduceAudit.setQuery(queryString);
}
LOG.info("Getting GUIDs eligible for Audit aging type-{} with SearchParameters: {}", auditAgingType.toString(), searchEntitiesToReduceAudit.toString());
Set<String> guids = discoveryService.searchGUIDsWithParameters(auditAgingType, entityTypes, searchEntitiesToReduceAudit);
AtlasVertex auditReductionVertex = getOrCreateVertex();
AtlasTask ageoutTask = updateVertexWithGuidsAndCreateAgingTask(auditReductionVertex, AGING_TYPE_PROPERTY_KEY_MAP.get(auditAgingType), guids, parameters);
/** For DEFAULT audit aging, "entityTypes" should be excluded from _ALL_ENTITY_TYPES i.e., negating the queryString
* Including AUDIT_AGING_EXCLUDE_ENTITY_TYPES_KEY to indicate the same in AtlasTask response to user
*/
if (ageoutTask != null) {
if (auditAgingType == AtlasAuditAgingType.DEFAULT && CollectionUtils.isNotEmpty(entityTypes)) {
ageoutTask.getParameters().put(AUDIT_AGING_EXCLUDE_ENTITY_TYPES_KEY, true);
} else {
ageoutTask.getParameters().put(AUDIT_AGING_EXCLUDE_ENTITY_TYPES_KEY, false);
}
}
return ageoutTask;
}
private boolean validateTypesAndIncludeSubTypes(Set<String> entityTypes, AtlasAuditAgingType auditAgingType, boolean subTypesIncluded) throws AtlasBaseException {
Collection<String> allEntityTypeNames = typeRegistry.getAllEntityDefNames();
Set<String> entityTypesToSearch = new HashSet<>();
Set<String> invalidEntityTypeNames = new HashSet<>();
entityTypes.stream().forEach(entityType -> {
if (entityType.endsWith("*")) {
String suffix = entityType.replace("*", "");
entityTypesToSearch.addAll(allEntityTypeNames.stream().filter(e -> e.startsWith(suffix)).collect(Collectors.toSet()));
} else if (allEntityTypeNames.contains(entityType)) {
entityTypesToSearch.add(entityType);
} else {
invalidEntityTypeNames.add(entityType);
}
});
if (auditAgingType != AtlasAuditAgingType.DEFAULT) {
if (CollectionUtils.isNotEmpty(invalidEntityTypeNames)) {
LOG.warn("Invalid entity type name(s) {} provided for aging type-{}", String.join(VALUE_DELIMITER, invalidEntityTypeNames), auditAgingType);
}
if (CollectionUtils.isEmpty(entityTypesToSearch)) {
return false;
}
}
entityTypes.clear();
entityTypes.addAll(subTypesIncluded ? AtlasEntityType.getEntityTypesAndAllSubTypes(entityTypesToSearch, typeRegistry) : entityTypesToSearch);
return true;
}
@GraphTransaction
private AtlasTask updateVertexWithGuidsAndCreateAgingTask(AtlasVertex vertex, String vertexProperty, Set<String> guids, Map<String, Object> params) throws AtlasBaseException {
List<String> guidsEligibleForAuditReduction = vertex.getProperty(vertexProperty, List.class);
if (CollectionUtils.isEmpty(guidsEligibleForAuditReduction) && CollectionUtils.isEmpty(guids)) {
return null;
}
if (CollectionUtils.isEmpty(guidsEligibleForAuditReduction)) {
guidsEligibleForAuditReduction = new ArrayList<>();
}
if (CollectionUtils.isNotEmpty(guids)) {
guidsEligibleForAuditReduction.addAll(guids);
setEncodedProperty(vertex, vertexProperty, guidsEligibleForAuditReduction);
}
return discoveryService.createAndQueueAuditReductionTask(params, ATLAS_AUDIT_REDUCTION);
}
private AtlasVertex getOrCreateVertex() {
AtlasGraphQuery query = graph.query().has(PROPERTY_KEY_AUDIT_REDUCTION_NAME, AUDIT_REDUCTION_TYPE_NAME);
Iterator<AtlasVertex> results = query.vertices().iterator();
AtlasVertex auditReductionVertex = results.hasNext() ? results.next() : null;
if (auditReductionVertex == null) {
auditReductionVertex = graph.addVertex();
setEncodedProperty(auditReductionVertex, PROPERTY_KEY_AUDIT_REDUCTION_NAME, AUDIT_REDUCTION_TYPE_NAME);
}
return auditReductionVertex;
}
}

View File

@ -0,0 +1,150 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.repository.store.graph.v2.tasks;
import org.apache.atlas.AtlasException;
import org.apache.atlas.RequestContext;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.audit.EntityAuditEventV2;
import org.apache.atlas.model.tasks.AtlasTask;
import org.apache.atlas.repository.Constants.AtlasAuditAgingType;
import org.apache.atlas.repository.audit.EntityAuditRepository;
import org.apache.atlas.repository.graphdb.AtlasGraph;
import org.apache.atlas.repository.graphdb.AtlasGraphQuery;
import org.apache.atlas.repository.graphdb.AtlasVertex;
import org.apache.atlas.tasks.AbstractTask;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.collections4.MapUtils;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.*;
import java.util.stream.Collectors;
import static org.apache.atlas.model.tasks.AtlasTask.Status.COMPLETE;
import static org.apache.atlas.model.tasks.AtlasTask.Status.FAILED;
import static org.apache.atlas.repository.Constants.*;
import static org.apache.atlas.repository.store.graph.v2.tasks.AuditReductionTaskFactory.AGING_TYPE_PROPERTY_KEY_MAP;
public class AuditReductionTask extends AbstractTask {
private static final Logger LOG = LoggerFactory.getLogger(AuditReductionTask.class);
private static final int GUID_BATCH_SIZE_PER_AGE_OUT_TASK = 100;
private final EntityAuditRepository auditRepository;
private final AtlasGraph graph;
public AuditReductionTask(AtlasTask task, EntityAuditRepository auditRepository, AtlasGraph graph) {
super(task);
this.auditRepository = auditRepository;
this.graph = graph;
}
@Override
public AtlasTask.Status perform() throws Exception {
RequestContext.clear();
Map<String, Object> params = getTaskDef().getParameters();
if (MapUtils.isEmpty(params)) {
LOG.warn("Task: {}: Unable to process task: Parameters is not readable!", getTaskGuid());
return FAILED;
}
String userName = getTaskDef().getCreatedBy();
if (StringUtils.isEmpty(userName)) {
LOG.warn("Task: {}: Unable to process task as user name is empty!", getTaskGuid());
return FAILED;
}
RequestContext.get().setUser(userName, null);
try {
run(params);
setStatus(COMPLETE);
} catch (Exception e) {
LOG.error("Task: {}: Error performing task!", getTaskGuid(), e);
setStatus(FAILED);
throw e;
} finally {
RequestContext.clear();
}
return getStatus();
}
protected void run(Map<String, Object> parameters) throws AtlasBaseException, IOException, AtlasException {
AtlasVertex vertex = findVertex();
if (vertex == null) {
return;
}
Map<String, List<EntityAuditEventV2>> entitiesWithSucceededAgeout = new HashMap<>();
AtlasAuditAgingType auditAgingType = AtlasAuditAgingType.valueOf(String.valueOf(parameters.get(AUDIT_AGING_TYPE_KEY)));
Set<String> actionTypes = ((Collection<String>) parameters.get(AUDIT_AGING_ACTION_TYPES_KEY)).stream().collect(Collectors.toSet());
int auditCountInput = (int) parameters.get(AUDIT_AGING_COUNT_KEY);
short auditCount = auditCountInput > Short.MAX_VALUE ? Short.MAX_VALUE : auditCountInput < Short.MIN_VALUE ? Short.MIN_VALUE : (short)auditCountInput;
int ttl = (int) parameters.get(AUDIT_AGING_TTL_KEY);
boolean createEventsAgeoutAllowed = (boolean) parameters.get(CREATE_EVENTS_AGEOUT_ALLOWED_KEY);
String vertexPropertyKeyForGuids = AGING_TYPE_PROPERTY_KEY_MAP.get(auditAgingType);
List<String> entityGuidsEligibleForAuditAgeout = vertex.getProperty(vertexPropertyKeyForGuids, List.class);
int guidsCount = CollectionUtils.isNotEmpty(entityGuidsEligibleForAuditAgeout) ? entityGuidsEligibleForAuditAgeout.size() : 0;
int batchIndex = 1;
Set<EntityAuditEventV2.EntityAuditActionV2> entityAuditActions = actionTypes.stream().map(x -> EntityAuditEventV2.EntityAuditActionV2.fromString(x)).collect(Collectors.toSet());
for (int startIndex = 0; startIndex < guidsCount; ) {
int endIndex = startIndex + GUID_BATCH_SIZE_PER_AGE_OUT_TASK < guidsCount ? startIndex + GUID_BATCH_SIZE_PER_AGE_OUT_TASK : guidsCount;
List<String> guidsBatch = entityGuidsEligibleForAuditAgeout.subList(startIndex, endIndex);
for (String guid : guidsBatch) {
List<EntityAuditEventV2> deletedAuditEvents = auditRepository.deleteEventsV2(guid, entityAuditActions, auditCount, ttl, createEventsAgeoutAllowed, auditAgingType);
entitiesWithSucceededAgeout.put(guid, deletedAuditEvents);
}
if (LOG.isDebugEnabled()) {
LOG.debug("{} Audit aging completed for batch-{} with guids: {}", auditAgingType.toString(), batchIndex, Arrays.toString(entitiesWithSucceededAgeout.keySet().toArray()));
}
entitiesWithSucceededAgeout.clear();
startIndex = endIndex;
batchIndex++;
List<String> remainingGuids = startIndex < guidsCount ? new ArrayList<>(entityGuidsEligibleForAuditAgeout.subList(startIndex, guidsCount)) : null;
vertex.setProperty(vertexPropertyKeyForGuids, remainingGuids);
}
}
public AtlasVertex findVertex() {
AtlasGraphQuery query = graph.query().has(PROPERTY_KEY_AUDIT_REDUCTION_NAME, AUDIT_REDUCTION_TYPE_NAME);
Iterator<AtlasVertex> results = query.vertices().iterator();
return results.hasNext() ? results.next() : null;
}
}

View File

@ -0,0 +1,113 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.repository.store.graph.v2.tasks;
import org.apache.atlas.ApplicationProperties;
import org.apache.atlas.discovery.AtlasDiscoveryService;
import org.apache.atlas.model.tasks.AtlasTask;
import org.apache.atlas.repository.Constants.AtlasAuditAgingType;
import org.apache.atlas.repository.audit.EntityAuditRepository;
import org.apache.atlas.repository.graphdb.AtlasGraph;
import org.apache.atlas.tasks.AbstractTask;
import org.apache.atlas.tasks.TaskFactory;
import org.apache.atlas.type.AtlasTypeRegistry;
import org.apache.commons.configuration.Configuration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import javax.inject.Inject;
import java.util.*;
import static org.apache.atlas.repository.Constants.PROPERTY_KEY_GUIDS_TO_AGEOUT_BY_DEFAULT;
import static org.apache.atlas.repository.Constants.PROPERTY_KEY_GUIDS_TO_AGEOUT_BY_CUSTOM;
import static org.apache.atlas.repository.Constants.PROPERTY_KEY_GUIDS_TO_SWEEPOUT;
@Component
public class AuditReductionTaskFactory implements TaskFactory {
private static final Logger LOG = LoggerFactory.getLogger(AuditReductionTaskFactory.class);
private static Configuration configuration;
public static final int MAX_PENDING_TASKS_ALLOWED;
private static final int MAX_PENDING_TASKS_ALLOWED_DEFAULT = 50;
private static final String MAX_PENDING_TASKS_ALLOWED_KEY = "atlas.audit.reduction.max.pending.tasks";
public static final String ATLAS_AUDIT_REDUCTION = "ATLAS_AUDIT_REDUCTION";
public static final String ATLAS_AUDIT_REDUCTION_ENTITY_RETRIEVAL = "AUDIT_REDUCTION_ENTITY_RETRIEVAL";
static {
try {
configuration = ApplicationProperties.get();
} catch (Exception e) {
LOG.info("Failed to load application properties", e);
}
if (configuration != null) {
MAX_PENDING_TASKS_ALLOWED = configuration.getInt(MAX_PENDING_TASKS_ALLOWED_KEY, MAX_PENDING_TASKS_ALLOWED_DEFAULT);
} else {
MAX_PENDING_TASKS_ALLOWED = MAX_PENDING_TASKS_ALLOWED_DEFAULT;
}
}
private static final List<String> supportedTypes = new ArrayList<String>() {{
add(ATLAS_AUDIT_REDUCTION);
add(ATLAS_AUDIT_REDUCTION_ENTITY_RETRIEVAL);
}};
public static final Map<AtlasAuditAgingType, String> AGING_TYPE_PROPERTY_KEY_MAP = new HashMap<AtlasAuditAgingType, String>() {
{
put(AtlasAuditAgingType.DEFAULT, PROPERTY_KEY_GUIDS_TO_AGEOUT_BY_DEFAULT);
put(AtlasAuditAgingType.SWEEP, PROPERTY_KEY_GUIDS_TO_SWEEPOUT);
put(AtlasAuditAgingType.CUSTOM, PROPERTY_KEY_GUIDS_TO_AGEOUT_BY_CUSTOM);
}
};
private final EntityAuditRepository auditRepository;
private final AtlasGraph graph;
private final AtlasDiscoveryService discoveryService;
private final AtlasTypeRegistry typeRegistry;
@Inject
public AuditReductionTaskFactory(EntityAuditRepository auditRepository, AtlasGraph graph, AtlasDiscoveryService discoveryService, AtlasTypeRegistry typeRegistry) {
this.auditRepository = auditRepository;
this.graph = graph;
this.discoveryService = discoveryService;
this.typeRegistry = typeRegistry;
}
@Override
public AbstractTask create(AtlasTask task) {
String taskType = task.getType();
String taskGuid = task.getGuid();
switch (taskType) {
case ATLAS_AUDIT_REDUCTION:
return new AuditReductionTask(task, auditRepository, graph);
case ATLAS_AUDIT_REDUCTION_ENTITY_RETRIEVAL:
return new AuditReductionEntityRetrievalTask(task, graph, discoveryService, typeRegistry);
default:
LOG.warn("Type: {} - {} not found!. The task will be ignored.", taskType, taskGuid);
return null;
}
}
@Override
public List<String> getSupportedTypes() {
return this.supportedTypes;
}
}

View File

@ -22,15 +22,20 @@ import org.apache.atlas.EntityAuditEvent;
import org.apache.atlas.TestUtilsV2;
import org.apache.atlas.model.audit.EntityAuditEventV2;
import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.repository.Constants.AtlasAuditAgingType;
import org.apache.atlas.v1.model.instance.Referenceable;
import org.apache.commons.lang.time.DateUtils;
import org.testng.annotations.BeforeTest;
import org.testng.annotations.Test;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.stream.Collectors;
import static org.testng.Assert.*;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
public class AuditRepositoryTestBase {
protected EntityAuditRepository eventRepository;
@ -203,4 +208,62 @@ public class AuditRepositoryTestBase {
assertEquals(actual.getDetails(), expected.getDetails());
}
@Test
public void testDeleteEventsV2() throws Exception {
String id1 = "id1" + rand();
int ttlInDays = 1;
long ts = System.currentTimeMillis() - (ttlInDays * DateUtils.MILLIS_PER_DAY);
AtlasEntity entity = new AtlasEntity(rand());
int j = 0;
List<EntityAuditEventV2> expectedEvents = new ArrayList<>();
expectedEvents.add(new EntityAuditEventV2(id1, ts + j++, "user-a", EntityAuditEventV2.EntityAuditActionV2.ENTITY_CREATE, "details" + j, entity));
expectedEvents.add(new EntityAuditEventV2(id1, ts + j++, "user-C", EntityAuditEventV2.EntityAuditActionV2.ENTITY_UPDATE, "details" + j, entity));
for (int i = 0; i < 5; i++) {
expectedEvents.add(new EntityAuditEventV2(id1, ts + j++, "user" + i, EntityAuditEventV2.EntityAuditActionV2.ENTITY_UPDATE, "details" + j, entity));
expectedEvents.add(new EntityAuditEventV2(id1, ts + j++, "user" + i, EntityAuditEventV2.EntityAuditActionV2.CLASSIFICATION_ADD, "details" + j, entity));
}
expectedEvents.add(new EntityAuditEventV2(id1, ts+ j++, "User-b", EntityAuditEventV2.EntityAuditActionV2.ENTITY_DELETE, "details" + j, entity));
for(EntityAuditEventV2 event : expectedEvents) {
eventRepository.putEventsV2(event);
}
List<EntityAuditEventV2> events = eventRepository.listEventsV2(id1, null, "timestamp", false, 0, (short) -1);
assertEquals(events.size(), 13);
assertEventV2Equals(events.get(0), expectedEvents.get(0));
assertEventV2Equals(events.get(1), expectedEvents.get(1));
short expectedUpdateEventsCount = 2;
List<EntityAuditEventV2> deletedUpdateEvents = eventRepository.deleteEventsV2(id1, new HashSet<>(Arrays.asList(EntityAuditEventV2.EntityAuditActionV2.ENTITY_UPDATE)), expectedUpdateEventsCount , 0, false, AtlasAuditAgingType.CUSTOM);
List<EntityAuditEventV2> remainingEvents = events.stream().filter(x -> !deletedUpdateEvents.contains(x)).collect(Collectors.toList());
List<EntityAuditEventV2> remainingUpdateEvents = remainingEvents.stream().filter(x -> x.getAction() == EntityAuditEventV2.EntityAuditActionV2.ENTITY_UPDATE).collect(Collectors.toList());
assertEquals(remainingUpdateEvents.size(), expectedUpdateEventsCount);
short expectedEventsCount = 4;
List<EntityAuditEventV2> deletedEvents = eventRepository.deleteEventsV2(id1, null, expectedEventsCount , 0, false, AtlasAuditAgingType.DEFAULT);
remainingEvents = events.stream().filter(x -> !deletedEvents.contains(x)).collect(Collectors.toList());
assertEquals(remainingEvents.size(), expectedEventsCount + 1);
assertEventV2Equals(remainingEvents.get(0), events.get(0));
assertTrue(remainingEvents.contains(events.get(0)));
List<EntityAuditEventV2> deletedEventsIncludingCreate = eventRepository.deleteEventsV2(id1, null, expectedEventsCount , 0, true, AtlasAuditAgingType.DEFAULT);
remainingEvents = events.stream().filter(x -> !deletedEventsIncludingCreate.contains(x)).collect(Collectors.toList());
assertEquals(remainingEvents.size(), expectedEventsCount);
assertNotEquals(remainingEvents.get(3), events.get(0));
assertFalse(remainingEvents.contains(events.get(0)));
EntityAuditEventV2 latestEvent = new EntityAuditEventV2(id1, System.currentTimeMillis(), "User-b", EntityAuditEventV2.EntityAuditActionV2.ENTITY_DELETE, "details" + j++, entity);
eventRepository.putEventsV2(latestEvent);
List<EntityAuditEventV2> allEvents = eventRepository.listEventsV2(id1, null, "timestamp", false, 0, (short) -1);
List<EntityAuditEventV2> deletedEventsByTTL = eventRepository.deleteEventsV2(id1, null, expectedUpdateEventsCount , ttlInDays, true, AtlasAuditAgingType.DEFAULT);
assertEquals(deletedEventsByTTL.size(), allEvents.size() - 1);
List<EntityAuditEventV2> remainingEventsByTTL = allEvents.stream().filter(x -> !deletedEventsByTTL.contains(x)).collect(Collectors.toList());
assertEquals(remainingEventsByTTL.size(), 1);
assertEquals(latestEvent, remainingEventsByTTL.get(0));
}
}

View File

@ -445,7 +445,7 @@
-->
<lst name="defaults">
<str name="defType">edismax</str>
<str name="qf">35x_t 5j9_t 7wl_t a9x_t but_t dfp_l f0l_t i6d_l iyt_l jr9_t kjp_s lc5_t m4l_s mx1_t ohx_t xz9_i 1151_t 12px_t 14at_l 15vp_t 1891_t 19tx_t 1bet_t 1czp_t 1ekl_t 1gxx_t 1iit_l 1k3p_t 1lol_t 1o1x_t 1qf9_t 1ssl_t 1udh_t 1wqt_t 4eth_t 4rgl_s 4pvp_s 4nid_s 4lxh_s 4p39_t 4wzp_t 4t1h_t 4umd_l 4vet_t 51qd_t 505h_t 53b9_t 5af9_t 5fyd_t 5kp1_t 5ibp_t 5j45_t 5hj9_t 5r0l_t 5q85_t 5y4l_l 5zph_l 5rt1_t 5xc5_t 68ed_l 69z9_l 66th_t 658l_t 6bk5_t 61ad_t 622t_t 63np_t 6ltx_t 6mmd_l 6io5_t 6fid_l 6h39_l 6ccl_t 6k91_t 9bt1_t 93wl_t 9ddx_t 99fp_t 9clh_t 9b0l_t 9mv9_t 9eyt_t 9p8l_t 9khx_t 9m2t_t 9og5_t 9ypx_t 9qth_t 9wcl_t 9xxh_t ebyd_t e4ud_t e9l1_t eb5x_t e805_t e39h_l e6f9_t e8sl_t e41x_l edj9_l ei9x_t ej2d_t ecqt_l eebp_t ef45_i ehhh_t em85_t elfp_t en0l_t eolh_i esjp_t eu4l_t ey2t_t eyv9_i ewhx_t f0g5_t eznp_t f56t_t f7k5_t f2th_t f5z9_t f4ed_t f18l_l f6rp_t f211_l gwed_t h891_l h9tx_i h2px_t hamd_i hbet_i hc79_f hiit_d hhqd_l iex1_t ikg5_l inlx_t iqrp_l iscl_t ip6t_t itxh_l ivid_t</str>
<str name="qf">35x_t 5j9_t 7wl_t a9x_t but_t dfp_l f0l_t i6d_l iyt_l jr9_t kjp_s lc5_t m4l_s mx1_t ohx_t xz9_i 1151_t 12px_t 14at_l 15vp_t 1891_t 19tx_t 1bet_t 1czp_t 1ekl_t 1gxx_t 1iit_l 1k3p_t 1lol_t 1o1x_t 1qf9_t 1ssl_t 1v5x_t 1wqt_t 1z45_t 4h6t_t 4ttx_s 4s91_s 4pvp_s 4oat_s 4rgl_t 4zd1_t 4vet_t 4wzp_l 4xs5_t 543p_t 52it_t 55ol_t 5csl_t 5ibp_t 5n2d_t 5kp1_t 5lhh_t 5jwl_t 5tdx_t 5slh_t 60hx_l 622t_l 5u6d_t 5zph_t 6arp_l 6ccl_l 696t_t 67lx_t 6dxh_t 63np_t 64g5_t 6611_t 6o79_t 6ozp_l 6l1h_t 6hvp_l 6jgl_l 6epx_t 6mmd_t 9e6d_t 969x_t 9fr9_t 9bt1_t 9eyt_t 9ddx_t 9p8l_t 9hc5_t 9rlx_t 9mv9_t 9og5_t 9qth_t a139_t 9t6t_t 9ypx_t a0at_t eebp_t e77p_t ebyd_t edj9_t eadh_t e5mt_l e8sl_t eb5x_t e6f9_l efwl_l ekn9_t elfp_t ef45_l egp1_t ehhh_i ejut_t eolh_t ent1_t epdx_t eqyt_i eux1_t ewhx_t f0g5_t f18l_i eyv9_t f2th_t f211_t f7k5_t f9xh_t f56t_t f8cl_t f6rp_t f3lx_l f951_t f4ed_l gyrp_t hamd_l hc79_i h539_t hczp_i hds5_i hekl_f hkw5_d hk3p_l</str>
<str name="hl.fl">*</str>
<bool name="hl.requireFieldMatch">true</bool>
<bool name="lowercaseOperators">true</bool>

View File

@ -111,7 +111,7 @@ public class ActiveServerFilter implements Filter {
final String adminUriNotFiltered[] = { "/admin/export", "/admin/import", "/admin/importfile", "/admin/audits",
"/admin/purge", "/admin/expimp/audit", "/admin/metrics", "/admin/server", "/admin/audit/", "admin/tasks",
"/admin/debug/metrics"};
"/admin/debug/metrics", "/admin/audits/ageout"};
private boolean isFilteredURI(ServletRequest servletRequest) {
HttpServletRequest httpServletRequest = (HttpServletRequest) servletRequest;
String requestURI = httpServletRequest.getRequestURI();

View File

@ -31,6 +31,7 @@ import org.apache.atlas.discovery.SearchContext;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.audit.AtlasAuditEntry;
import org.apache.atlas.model.audit.AtlasAuditEntry.AuditOperation;
import org.apache.atlas.model.audit.AuditReductionCriteria;
import org.apache.atlas.model.audit.AuditSearchParameters;
import org.apache.atlas.model.audit.EntityAuditEventV2;
import org.apache.atlas.model.audit.EntityAuditEventV2.EntityAuditActionV2;
@ -52,6 +53,7 @@ import org.apache.atlas.model.metrics.AtlasMetricsStat;
import org.apache.atlas.model.patches.AtlasPatch.AtlasPatches;
import org.apache.atlas.model.tasks.AtlasTask;
import org.apache.atlas.repository.audit.AtlasAuditService;
import org.apache.atlas.repository.audit.AtlasAuditReductionService;
import org.apache.atlas.repository.audit.EntityAuditRepository;
import org.apache.atlas.repository.impexp.AtlasServerService;
import org.apache.atlas.repository.impexp.ExportImportAuditService;
@ -187,6 +189,8 @@ public class AdminResource {
private final boolean isOnDemandLineageEnabled;
private final int defaultLineageNodeCount;
private AtlasAuditReductionService auditReductionService;
static {
try {
atlasProperties = ApplicationProperties.get();
@ -202,7 +206,7 @@ public class AdminResource {
AtlasServerService serverService,
ExportImportAuditService exportImportAuditService, AtlasEntityStore entityStore,
AtlasPatchManager patchManager, AtlasAuditService auditService, EntityAuditRepository auditRepository,
TaskManagement taskManagement, AtlasDebugMetricsSink debugMetricsRESTSink) {
TaskManagement taskManagement, AtlasDebugMetricsSink debugMetricsRESTSink, AtlasAuditReductionService atlasAuditReductionService) {
this.serviceState = serviceState;
this.metricsService = metricsService;
this.exportService = exportService;
@ -219,6 +223,7 @@ public class AdminResource {
this.auditRepository = auditRepository;
this.taskManagement = taskManagement;
this.debugMetricsRESTSink = debugMetricsRESTSink;
this.auditReductionService = atlasAuditReductionService;
if (atlasProperties != null) {
this.defaultUIVersion = atlasProperties.getString(DEFAULT_UI_VERSION, UI_VERSION_V2);
@ -820,6 +825,45 @@ public class AdminResource {
}
}
@POST
@Path("/audits/ageout")
@Consumes(Servlets.JSON_MEDIA_TYPE)
@Produces(Servlets.JSON_MEDIA_TYPE)
public List<AtlasTask> ageoutAuditData(AuditReductionCriteria auditReductionCriteria, @QueryParam("useAuditConfig") @DefaultValue("false") Boolean useAuditConfig) throws AtlasBaseException {
AtlasPerfTracer perf = null;
try {
AtlasAuthorizationUtils.verifyAccess(new AtlasAdminAccessRequest(AtlasPrivilege.ADMIN_AUDITS), "Admin Audits Ageout");
if (useAuditConfig) {
return auditReductionService.startAuditAgingByConfig();
}
if (!auditReductionCriteria.isAuditAgingEnabled()) {
LOG.warn("Audit aging should be enabled");
return null;
}
if (AtlasPerfTracer.isPerfTraceEnabled(PERF_LOG)) {
perf = AtlasPerfTracer.getPerfTracer(PERF_LOG, "AdminResource.ageoutAuditData(" + auditReductionCriteria + ")");
}
updateCriteriaWithDefaultValues(auditReductionCriteria);
List<Map<String, Object>> ageoutTypeCriteriaMap = auditReductionService.buildAgeoutCriteriaForAllAgingTypes(auditReductionCriteria);
return auditReductionService.startAuditAgingByCriteria(ageoutTypeCriteriaMap);
} finally {
AtlasPerfTracer.log(perf);
}
}
private void updateCriteriaWithDefaultValues(AuditReductionCriteria auditReductionCriteria) {
if (auditReductionCriteria.getDefaultAgeoutTTLInDays() <= 0) {
auditReductionCriteria.setDefaultAgeoutTTLInDays(AtlasConfiguration.ATLAS_AUDIT_DEFAULT_AGEOUT_TTL.getInt());
}
}
@POST
@Path("/audits")
@Consumes(Servlets.JSON_MEDIA_TYPE)

View File

@ -51,7 +51,7 @@ public class AdminResourceTest {
when(serviceState.getState()).thenReturn(ServiceState.ServiceStateValue.ACTIVE);
AdminResource adminResource = new AdminResource(serviceState, null, null, null, null, null, null, null, null, null, null, null, null, null, null);
AdminResource adminResource = new AdminResource(serviceState, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null);
Response response = adminResource.getStatus();
assertEquals(response.getStatus(), HttpServletResponse.SC_OK);
JsonNode entity = AtlasJson.parseToV1JsonNode((String) response.getEntity());
@ -62,7 +62,7 @@ public class AdminResourceTest {
public void testResourceGetsValueFromServiceState() throws IOException {
when(serviceState.getState()).thenReturn(ServiceState.ServiceStateValue.PASSIVE);
AdminResource adminResource = new AdminResource(serviceState, null, null, null, null, null, null, null, null, null, null, null, null, null, null);
AdminResource adminResource = new AdminResource(serviceState, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null);
Response response = adminResource.getStatus();
verify(serviceState).getState();