ATLAS-4254 : Basic Search : Optimize pagination
Signed-off-by: Pinal <pinal-shah>
This commit is contained in:
parent
f3afbdb395
commit
35419d6c5a
|
|
@ -53,6 +53,7 @@ public class AtlasSearchResult implements Serializable {
|
|||
private List<AtlasFullTextResult> fullTextResult;
|
||||
private Map<String, AtlasEntityHeader> referredEntities;
|
||||
private long approximateCount = -1;
|
||||
private String nextMarker;
|
||||
|
||||
public AtlasSearchResult() {}
|
||||
|
||||
|
|
@ -131,8 +132,12 @@ public class AtlasSearchResult implements Serializable {
|
|||
|
||||
public void setApproximateCount(long approximateCount) { this.approximateCount = approximateCount; }
|
||||
|
||||
public String getNextMarker() { return nextMarker; }
|
||||
|
||||
public void setNextMarker(String nextMarker) { this.nextMarker = nextMarker; }
|
||||
|
||||
@Override
|
||||
public int hashCode() { return Objects.hash(queryType, searchParameters, queryText, type, classification, entities, attributes, fullTextResult, referredEntities); }
|
||||
public int hashCode() { return Objects.hash(queryType, searchParameters, queryText, type, classification, entities, attributes, fullTextResult, referredEntities, nextMarker); }
|
||||
|
||||
@Override
|
||||
public boolean equals(Object o) {
|
||||
|
|
@ -147,7 +152,8 @@ public class AtlasSearchResult implements Serializable {
|
|||
Objects.equals(entities, that.entities) &&
|
||||
Objects.equals(attributes, that.attributes) &&
|
||||
Objects.equals(fullTextResult, that.fullTextResult) &&
|
||||
Objects.equals(referredEntities, that.referredEntities);
|
||||
Objects.equals(referredEntities, that.referredEntities) &&
|
||||
Objects.equals(nextMarker, that.nextMarker);
|
||||
}
|
||||
|
||||
public void addEntity(AtlasEntityHeader newEntity) {
|
||||
|
|
@ -190,6 +196,7 @@ public class AtlasSearchResult implements Serializable {
|
|||
", fullTextResult=" + fullTextResult +
|
||||
", referredEntities=" + referredEntities +
|
||||
", approximateCount=" + approximateCount +
|
||||
", nextMarker=" + nextMarker +
|
||||
'}';
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -52,6 +52,7 @@ public class SearchParameters implements Serializable {
|
|||
private boolean includeSubClassifications = true;
|
||||
private int limit;
|
||||
private int offset;
|
||||
private String marker;
|
||||
|
||||
private FilterCriteria entityFilters;
|
||||
private FilterCriteria tagFilters;
|
||||
|
|
@ -215,6 +216,16 @@ public class SearchParameters implements Serializable {
|
|||
this.offset = offset;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return marker (offset of the next page)
|
||||
*/
|
||||
public String getMarker() { return marker; }
|
||||
|
||||
/**
|
||||
* @param marker
|
||||
*/
|
||||
public void setMarker(String marker) { this.marker = marker; }
|
||||
|
||||
/**
|
||||
* Entity attribute filters for the type (if type name is specified)
|
||||
* @return
|
||||
|
|
@ -294,6 +305,8 @@ public class SearchParameters implements Serializable {
|
|||
SearchParameters that = (SearchParameters) o;
|
||||
return excludeDeletedEntities == that.excludeDeletedEntities &&
|
||||
includeClassificationAttributes == that.includeClassificationAttributes &&
|
||||
includeSubTypes == that.includeSubTypes &&
|
||||
includeSubClassifications == that.includeSubClassifications &&
|
||||
limit == that.limit &&
|
||||
offset == that.offset &&
|
||||
Objects.equals(query, that.query) &&
|
||||
|
|
@ -309,8 +322,9 @@ public class SearchParameters implements Serializable {
|
|||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return Objects.hash(query, typeName, classification, termName, excludeDeletedEntities, includeClassificationAttributes,
|
||||
limit, offset, entityFilters, tagFilters, attributes, sortBy, sortOrder);
|
||||
return Objects.hash(query, typeName, classification, termName, includeSubTypes, includeSubClassifications,
|
||||
excludeDeletedEntities, includeClassificationAttributes, limit, offset, entityFilters,
|
||||
tagFilters, attributes, sortBy, sortOrder);
|
||||
}
|
||||
|
||||
public StringBuilder toString(StringBuilder sb) {
|
||||
|
|
@ -323,6 +337,8 @@ public class SearchParameters implements Serializable {
|
|||
sb.append(", typeName='").append(typeName).append('\'');
|
||||
sb.append(", classification='").append(classification).append('\'');
|
||||
sb.append(", termName='").append(termName).append('\'');
|
||||
sb.append(", includeSubTypes='").append(includeSubTypes).append('\'');
|
||||
sb.append(", includeSubClassifications='").append(includeSubClassifications).append('\'');
|
||||
sb.append(", excludeDeletedEntities=").append(excludeDeletedEntities);
|
||||
sb.append(", includeClassificationAttributes=").append(includeClassificationAttributes);
|
||||
sb.append(", limit=").append(limit);
|
||||
|
|
|
|||
|
|
@ -27,6 +27,7 @@ import org.apache.atlas.type.AtlasClassificationType;
|
|||
import org.apache.atlas.util.SearchPredicateUtil;
|
||||
import org.apache.atlas.utils.AtlasPerfTracer;
|
||||
import org.apache.commons.collections.CollectionUtils;
|
||||
import org.apache.commons.collections.MapUtils;
|
||||
import org.apache.commons.collections.Predicate;
|
||||
import org.apache.commons.collections.PredicateUtils;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
|
|
@ -212,26 +213,32 @@ public class ClassificationSearchProcessor extends SearchProcessor {
|
|||
}
|
||||
|
||||
try {
|
||||
final int startIdx = context.getSearchParameters().getOffset();
|
||||
final int limit = context.getSearchParameters().getLimit();
|
||||
Integer marker = context.getMarker();
|
||||
|
||||
// query to start at 0, even though startIdx can be higher - because few results in earlier retrieval could
|
||||
// have been dropped: like non-active-entities or duplicate-entities (same entity pointed to by multiple
|
||||
// classifications in the result)
|
||||
//
|
||||
// first 'startIdx' number of entries will be ignored
|
||||
int qryOffset = 0;
|
||||
//marker functionality will not work when there is need to fetch classificationVertices and get entities from it
|
||||
if (indexQuery == null) {
|
||||
marker = null;
|
||||
}
|
||||
// if marker is provided, start query with marker offset
|
||||
int startIdx = marker != null ? marker : context.getSearchParameters().getOffset();
|
||||
int qryOffset = marker != null ? marker : 0;
|
||||
int resultIdx = qryOffset;
|
||||
|
||||
final Set<String> processedGuids = new HashSet<>();
|
||||
final List<AtlasVertex> entityVertices = new ArrayList<>();
|
||||
final List<AtlasVertex> classificationVertices = new ArrayList<>();
|
||||
final Set<String> processedGuids = new HashSet<>();
|
||||
LinkedHashMap<Integer, AtlasVertex> offsetEntityVertexMap = new LinkedHashMap<>();
|
||||
final List<AtlasVertex> classificationVertices = new ArrayList<>();
|
||||
|
||||
final String sortBy = context.getSearchParameters().getSortBy();
|
||||
final SortOrder sortOrder = context.getSearchParameters().getSortOrder();
|
||||
|
||||
for (; ret.size() < limit; qryOffset += limit) {
|
||||
entityVertices.clear();
|
||||
offsetEntityVertexMap.clear();
|
||||
classificationVertices.clear();
|
||||
|
||||
if (context.terminateSearch()) {
|
||||
|
|
@ -251,12 +258,12 @@ public class ClassificationSearchProcessor extends SearchProcessor {
|
|||
queryResult = indexQuery.vertices(qryOffset, limit);
|
||||
}
|
||||
|
||||
getVerticesFromIndexQueryResult(queryResult, entityVertices);
|
||||
isLastResultPage = entityVertices.size() < limit;
|
||||
offsetEntityVertexMap = getVerticesFromIndexQueryResult(queryResult, offsetEntityVertexMap, qryOffset);
|
||||
isLastResultPage = offsetEntityVertexMap.size() < limit;
|
||||
|
||||
// Do in-memory filtering
|
||||
CollectionUtils.filter(entityVertices, traitPredicate);
|
||||
CollectionUtils.filter(entityVertices, isEntityPredicate);
|
||||
offsetEntityVertexMap = super.filter(offsetEntityVertexMap, traitPredicate);
|
||||
offsetEntityVertexMap = super.filter(offsetEntityVertexMap, isEntityPredicate);
|
||||
|
||||
} else {
|
||||
if (classificationIndexQuery != null) {
|
||||
|
|
@ -283,11 +290,14 @@ public class ClassificationSearchProcessor extends SearchProcessor {
|
|||
// Since tag filters are present, we need to collect the entity vertices after filtering the classification
|
||||
// vertex results (as these might be lower in number)
|
||||
if (CollectionUtils.isNotEmpty(classificationVertices)) {
|
||||
int resultCount = 0;
|
||||
|
||||
for (AtlasVertex classificationVertex : classificationVertices) {
|
||||
Iterable<AtlasEdge> edges = classificationVertex.getEdges(AtlasEdgeDirection.IN, Constants.CLASSIFICATION_LABEL);
|
||||
|
||||
for (AtlasEdge edge : edges) {
|
||||
AtlasVertex entityVertex = edge.getOutVertex();
|
||||
resultCount++;
|
||||
|
||||
String guid = AtlasGraphUtilsV2.getIdFromVertex(entityVertex);
|
||||
|
||||
|
|
@ -295,7 +305,7 @@ public class ClassificationSearchProcessor extends SearchProcessor {
|
|||
continue;
|
||||
}
|
||||
|
||||
entityVertices.add(entityVertex);
|
||||
offsetEntityVertexMap.put((qryOffset + resultCount) - 1, entityVertex);
|
||||
|
||||
processedGuids.add(guid);
|
||||
}
|
||||
|
|
@ -303,22 +313,28 @@ public class ClassificationSearchProcessor extends SearchProcessor {
|
|||
}
|
||||
|
||||
if (whiteSpaceFilter) {
|
||||
filterWhiteSpaceClassification(entityVertices);
|
||||
offsetEntityVertexMap = filterWhiteSpaceClassification(offsetEntityVertexMap);
|
||||
}
|
||||
// Do in-memory filtering
|
||||
CollectionUtils.filter(entityVertices, isEntityPredicate);
|
||||
// Do in-memory filtering
|
||||
offsetEntityVertexMap = super.filter(offsetEntityVertexMap, isEntityPredicate);
|
||||
if (activePredicate != null) {
|
||||
CollectionUtils.filter(entityVertices, activePredicate);
|
||||
offsetEntityVertexMap = super.filter(offsetEntityVertexMap, activePredicate);
|
||||
}
|
||||
|
||||
super.filter(entityVertices);
|
||||
offsetEntityVertexMap = super.filter(offsetEntityVertexMap);
|
||||
|
||||
resultIdx = collectResultVertices(ret, startIdx, limit, resultIdx, entityVertices);
|
||||
resultIdx = collectResultVertices(ret, startIdx, limit, resultIdx, offsetEntityVertexMap, marker);
|
||||
|
||||
if (isLastResultPage) {
|
||||
resultIdx = SearchContext.MarkerUtil.MARKER_END - 1;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if (marker != null) {
|
||||
nextOffset = resultIdx + 1;
|
||||
}
|
||||
|
||||
} finally {
|
||||
AtlasPerfTracer.log(perf);
|
||||
}
|
||||
|
|
@ -331,20 +347,23 @@ public class ClassificationSearchProcessor extends SearchProcessor {
|
|||
}
|
||||
|
||||
@Override
|
||||
public void filter(List<AtlasVertex> entityVertices) {
|
||||
public LinkedHashMap<Integer, AtlasVertex> filter(LinkedHashMap<Integer, AtlasVertex> offsetEntityVertexMap) {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("==> ClassificationSearchProcessor.filter({})", entityVertices.size());
|
||||
LOG.debug("==> ClassificationSearchProcessor.filter({})", offsetEntityVertexMap.size());
|
||||
}
|
||||
|
||||
if (inMemoryPredicate != null) {
|
||||
//in case of classification type + index attributes
|
||||
CollectionUtils.filter(entityVertices, traitPredicate);
|
||||
offsetEntityVertexMap = super.filter(offsetEntityVertexMap, traitPredicate);
|
||||
|
||||
//filter attributes (filterCriteria). Find classification vertex(typeName = classification) from entity vertex (traitName = classification)
|
||||
final Set<String> processedGuids = new HashSet<>();
|
||||
List<AtlasVertex> matchEntityVertices = new ArrayList<>();
|
||||
if (CollectionUtils.isNotEmpty(entityVertices)) {
|
||||
for (AtlasVertex entityVertex : entityVertices) {
|
||||
LinkedHashMap<Integer, AtlasVertex> matchEntityVertices = new LinkedHashMap<>();
|
||||
|
||||
if (MapUtils.isNotEmpty(offsetEntityVertexMap)) {
|
||||
for (Map.Entry<Integer, AtlasVertex> offsetToEntity : offsetEntityVertexMap.entrySet()) {
|
||||
|
||||
AtlasVertex entityVertex = offsetToEntity.getValue();
|
||||
Iterable<AtlasEdge> edges = entityVertex.getEdges(AtlasEdgeDirection.OUT, Constants.CLASSIFICATION_LABEL);
|
||||
|
||||
for (AtlasEdge edge : edges) {
|
||||
|
|
@ -358,7 +377,7 @@ public class ClassificationSearchProcessor extends SearchProcessor {
|
|||
continue;
|
||||
}
|
||||
|
||||
matchEntityVertices.add(entityVertex);
|
||||
matchEntityVertices.put(offsetToEntity.getKey(), entityVertex);
|
||||
processedGuids.add(guid);
|
||||
break;
|
||||
|
||||
|
|
@ -366,20 +385,22 @@ public class ClassificationSearchProcessor extends SearchProcessor {
|
|||
}
|
||||
}
|
||||
}
|
||||
entityVertices.clear();
|
||||
entityVertices.addAll(matchEntityVertices);
|
||||
offsetEntityVertexMap.clear();
|
||||
offsetEntityVertexMap.putAll(matchEntityVertices);
|
||||
|
||||
} else {
|
||||
//in case of only classsification type
|
||||
CollectionUtils.filter(entityVertices, traitPredicate);
|
||||
CollectionUtils.filter(entityVertices, isEntityPredicate);
|
||||
offsetEntityVertexMap = super.filter(offsetEntityVertexMap, traitPredicate);
|
||||
offsetEntityVertexMap = super.filter(offsetEntityVertexMap, isEntityPredicate);
|
||||
}
|
||||
|
||||
super.filter(entityVertices);
|
||||
offsetEntityVertexMap = super.filter(offsetEntityVertexMap);
|
||||
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("<== ClassificationSearchProcessor.filter(): ret.size()={}", entityVertices.size());
|
||||
LOG.debug("<== ClassificationSearchProcessor.filter(): ret.size()={}", offsetEntityVertexMap.size());
|
||||
}
|
||||
|
||||
return offsetEntityVertexMap;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
|||
|
|
@ -463,6 +463,11 @@ public class EntityDiscoveryService implements AtlasDiscoveryService {
|
|||
|
||||
ret.setApproximateCount(searchContext.getSearchProcessor().getResultCount());
|
||||
|
||||
String nextMarker = searchContext.getSearchProcessor().getNextMarker();
|
||||
if (StringUtils.isNotEmpty(nextMarker)) {
|
||||
ret.setNextMarker(nextMarker);
|
||||
}
|
||||
|
||||
// By default any attribute that shows up in the search parameter should be sent back in the response
|
||||
// If additional values are requested then the entityAttributes will be a superset of the all search attributes
|
||||
// and the explicitly requested attribute(s)
|
||||
|
|
|
|||
|
|
@ -240,32 +240,26 @@ public class EntitySearchProcessor extends SearchProcessor {
|
|||
}
|
||||
|
||||
try {
|
||||
final int startIdx = context.getSearchParameters().getOffset();
|
||||
final int limit = context.getSearchParameters().getLimit();
|
||||
final int limit = context.getSearchParameters().getLimit();
|
||||
final Integer marker = context.getMarker();
|
||||
final int startIdx = marker != null ? marker : context.getSearchParameters().getOffset();
|
||||
|
||||
// when subsequent filtering stages are involved, query should start at 0 even though startIdx can be higher
|
||||
//
|
||||
// first 'startIdx' number of entries will be ignored
|
||||
int qryOffset = (nextProcessor != null || (graphQuery != null && indexQuery != null)) ? 0 : startIdx;
|
||||
// if marker is provided, start query with marker offset
|
||||
int qryOffset;
|
||||
if (marker != null) {
|
||||
qryOffset = marker;
|
||||
} else {
|
||||
qryOffset = (nextProcessor != null || (graphQuery != null && indexQuery != null)) ? 0 : startIdx;
|
||||
}
|
||||
int resultIdx = qryOffset;
|
||||
|
||||
final List<AtlasVertex> entityVertices = new ArrayList<>();
|
||||
|
||||
SortOrder sortOrder = context.getSearchParameters().getSortOrder();
|
||||
String sortBy = context.getSearchParameters().getSortBy();
|
||||
|
||||
final AtlasEntityType entityType = context.getEntityTypes().iterator().next();
|
||||
AtlasAttribute sortByAttribute = entityType.getAttribute(sortBy);
|
||||
if (sortByAttribute == null) {
|
||||
sortBy = null;
|
||||
} else {
|
||||
sortBy = sortByAttribute.getVertexPropertyName();
|
||||
}
|
||||
|
||||
if (sortOrder == null) { sortOrder = ASCENDING; }
|
||||
LinkedHashMap<Integer, AtlasVertex> offsetEntityVertexMap = new LinkedHashMap<>();
|
||||
|
||||
for (; ret.size() < limit; qryOffset += limit) {
|
||||
entityVertices.clear();
|
||||
offsetEntityVertexMap.clear();
|
||||
|
||||
if (context.terminateSearch()) {
|
||||
LOG.warn("query terminated: {}", context.getSearchParameters());
|
||||
|
|
@ -277,41 +271,36 @@ public class EntitySearchProcessor extends SearchProcessor {
|
|||
|
||||
if (indexQuery != null) {
|
||||
Iterator<AtlasIndexQuery.Result> idxQueryResult = executeIndexQuery(context, indexQuery, qryOffset, limit);
|
||||
offsetEntityVertexMap = getVerticesFromIndexQueryResult(idxQueryResult, offsetEntityVertexMap, qryOffset);
|
||||
|
||||
getVerticesFromIndexQueryResult(idxQueryResult, entityVertices);
|
||||
|
||||
isLastResultPage = entityVertices.size() < limit;
|
||||
|
||||
// Do in-memory filtering before the graph query
|
||||
CollectionUtils.filter(entityVertices, inMemoryPredicate);
|
||||
|
||||
if (graphQueryPredicate != null) {
|
||||
CollectionUtils.filter(entityVertices, graphQueryPredicate);
|
||||
}
|
||||
} else {
|
||||
Iterator<AtlasVertex> queryResult = graphQuery.vertices(qryOffset, limit).iterator();
|
||||
|
||||
getVertices(queryResult, entityVertices);
|
||||
|
||||
isLastResultPage = entityVertices.size() < limit;
|
||||
|
||||
// Do in-memory filtering
|
||||
CollectionUtils.filter(entityVertices, inMemoryPredicate);
|
||||
|
||||
//incase when operator is NEQ in pipeSeperatedSystemAttributes
|
||||
if (graphQueryPredicate != null) {
|
||||
CollectionUtils.filter(entityVertices, graphQueryPredicate);
|
||||
}
|
||||
offsetEntityVertexMap = getVertices(queryResult, offsetEntityVertexMap, qryOffset);
|
||||
}
|
||||
|
||||
super.filter(entityVertices);
|
||||
isLastResultPage = offsetEntityVertexMap.size() < limit;
|
||||
// Do in-memory filtering
|
||||
offsetEntityVertexMap = super.filter(offsetEntityVertexMap, inMemoryPredicate);
|
||||
|
||||
resultIdx = collectResultVertices(ret, startIdx, limit, resultIdx, entityVertices);
|
||||
//incase when operator is NEQ in pipeSeperatedSystemAttributes
|
||||
if (graphQueryPredicate != null) {
|
||||
offsetEntityVertexMap = super.filter(offsetEntityVertexMap, graphQueryPredicate);
|
||||
}
|
||||
|
||||
offsetEntityVertexMap = super.filter(offsetEntityVertexMap);
|
||||
|
||||
resultIdx = collectResultVertices(ret, startIdx, limit, resultIdx, offsetEntityVertexMap, marker);
|
||||
|
||||
if (isLastResultPage) {
|
||||
resultIdx = MarkerUtil.MARKER_END - 1;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if (marker != null) {
|
||||
nextOffset = resultIdx + 1;
|
||||
}
|
||||
|
||||
} finally {
|
||||
AtlasPerfTracer.log(perf);
|
||||
}
|
||||
|
|
@ -324,23 +313,25 @@ public class EntitySearchProcessor extends SearchProcessor {
|
|||
}
|
||||
|
||||
@Override
|
||||
public void filter(List<AtlasVertex> entityVertices) {
|
||||
public LinkedHashMap<Integer, AtlasVertex> filter(LinkedHashMap<Integer,AtlasVertex> offsetEntityVertexMap) {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("==> EntitySearchProcessor.filter({})", entityVertices.size());
|
||||
LOG.debug("==> EntitySearchProcessor.filter({})", offsetEntityVertexMap.size());
|
||||
}
|
||||
|
||||
// Since we already have the entity vertices, a in-memory filter will be faster than fetching the same
|
||||
// vertices again with the required filtering
|
||||
if (filterGraphQueryPredicate != null) {
|
||||
LOG.debug("Filtering in-memory");
|
||||
CollectionUtils.filter(entityVertices, filterGraphQueryPredicate);
|
||||
offsetEntityVertexMap = super.filter(offsetEntityVertexMap, filterGraphQueryPredicate);
|
||||
}
|
||||
|
||||
super.filter(entityVertices);
|
||||
offsetEntityVertexMap = super.filter(offsetEntityVertexMap);
|
||||
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("<== EntitySearchProcessor.filter(): ret.size()={}", entityVertices.size());
|
||||
LOG.debug("<== EntitySearchProcessor.filter(): ret.size()={}", offsetEntityVertexMap.size());
|
||||
}
|
||||
|
||||
return offsetEntityVertexMap;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
|||
|
|
@ -96,20 +96,23 @@ public class FreeTextSearchProcessor extends SearchProcessor {
|
|||
}
|
||||
|
||||
try {
|
||||
final int startIdx = context.getSearchParameters().getOffset();
|
||||
final int limit = context.getSearchParameters().getLimit();
|
||||
final int limit = context.getSearchParameters().getLimit();
|
||||
final Integer marker = context.getMarker();
|
||||
final int startIdx = marker != null ? marker : context.getSearchParameters().getOffset();
|
||||
|
||||
// query to start at 0, even though startIdx can be higher - because few results in earlier retrieval could
|
||||
// have been dropped: like vertices of non-entity or non-active-entity
|
||||
//
|
||||
// first 'startIdx' number of entries will be ignored
|
||||
int qryOffset = 0;
|
||||
// if marker is provided, start query with marker offset
|
||||
int qryOffset = marker != null ? marker : 0;
|
||||
int resultIdx = qryOffset;
|
||||
|
||||
final List<AtlasVertex> entityVertices = new ArrayList<>();
|
||||
LinkedHashMap<Integer, AtlasVertex> offsetEntityVertexMap = new LinkedHashMap<>();
|
||||
|
||||
try {
|
||||
for (; ret.size() < limit; qryOffset += limit) {
|
||||
entityVertices.clear();
|
||||
offsetEntityVertexMap.clear();
|
||||
|
||||
if (context.terminateSearch()) {
|
||||
LOG.warn("query terminated: {}", context.getSearchParameters());
|
||||
|
|
@ -150,22 +153,27 @@ public class FreeTextSearchProcessor extends SearchProcessor {
|
|||
}
|
||||
}
|
||||
|
||||
entityVertices.add(vertex);
|
||||
offsetEntityVertexMap.put((qryOffset + resultCount) - 1, vertex);
|
||||
}
|
||||
|
||||
isLastResultPage = resultCount < limit;
|
||||
isLastResultPage = resultCount < limit;
|
||||
|
||||
super.filter(entityVertices);
|
||||
|
||||
resultIdx = collectResultVertices(ret, startIdx, limit, resultIdx, entityVertices);
|
||||
offsetEntityVertexMap = super.filter(offsetEntityVertexMap);
|
||||
resultIdx = collectResultVertices(ret, startIdx, limit, resultIdx, offsetEntityVertexMap, marker);
|
||||
|
||||
if (isLastResultPage) {
|
||||
resultIdx = SearchContext.MarkerUtil.MARKER_END - 1;
|
||||
break;
|
||||
}
|
||||
}
|
||||
} catch (Throwable t) {
|
||||
throw t;
|
||||
}
|
||||
|
||||
if (marker != null) {
|
||||
nextOffset = resultIdx + 1;
|
||||
}
|
||||
|
||||
} finally {
|
||||
AtlasPerfTracer.log(perf);
|
||||
}
|
||||
|
|
|
|||
|
|
@ -30,6 +30,7 @@ import org.slf4j.LoggerFactory;
|
|||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Iterator;
|
||||
import java.util.LinkedHashMap;
|
||||
import java.util.List;
|
||||
|
||||
import static org.apache.atlas.discovery.SearchContext.MATCH_ALL_NOT_CLASSIFIED;
|
||||
|
|
@ -96,21 +97,23 @@ public class FullTextSearchProcessor extends SearchProcessor {
|
|||
}
|
||||
|
||||
try {
|
||||
final int startIdx = context.getSearchParameters().getOffset();
|
||||
final int limit = context.getSearchParameters().getLimit();
|
||||
final boolean activeOnly = context.getSearchParameters().getExcludeDeletedEntities();
|
||||
final Integer marker = context.getMarker();
|
||||
final int startIdx = marker != null ? marker : context.getSearchParameters().getOffset();
|
||||
|
||||
// query to start at 0, even though startIdx can be higher - because few results in earlier retrieval could
|
||||
// have been dropped: like vertices of non-entity or non-active-entity
|
||||
//
|
||||
// first 'startIdx' number of entries will be ignored
|
||||
int qryOffset = 0;
|
||||
// if marker is provided, start query with marker offset
|
||||
int qryOffset = marker != null ? marker : 0;
|
||||
int resultIdx = qryOffset;
|
||||
|
||||
final List<AtlasVertex> entityVertices = new ArrayList<>();
|
||||
LinkedHashMap<Integer, AtlasVertex> offsetEntityVertexMap = new LinkedHashMap<>();
|
||||
|
||||
for (; ret.size() < limit; qryOffset += limit) {
|
||||
entityVertices.clear();
|
||||
offsetEntityVertexMap.clear();
|
||||
|
||||
if (context.terminateSearch()) {
|
||||
LOG.warn("query terminated: {}", context.getSearchParameters());
|
||||
|
|
@ -141,19 +144,25 @@ public class FullTextSearchProcessor extends SearchProcessor {
|
|||
continue;
|
||||
}
|
||||
|
||||
entityVertices.add(vertex);
|
||||
offsetEntityVertexMap.put((qryOffset + resultCount) - 1, vertex);
|
||||
}
|
||||
|
||||
isLastResultPage = resultCount < limit;
|
||||
isLastResultPage = resultCount < limit;
|
||||
|
||||
super.filter(entityVertices);
|
||||
offsetEntityVertexMap = super.filter(offsetEntityVertexMap);
|
||||
|
||||
resultIdx = collectResultVertices(ret, startIdx, limit, resultIdx, entityVertices);
|
||||
resultIdx = collectResultVertices(ret,startIdx, limit, resultIdx, offsetEntityVertexMap, marker);
|
||||
|
||||
if (isLastResultPage) {
|
||||
resultIdx = SearchContext.MarkerUtil.MARKER_END - 1 ;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if (marker != null) {
|
||||
nextOffset = resultIdx + 1;
|
||||
}
|
||||
|
||||
} finally {
|
||||
AtlasPerfTracer.log(perf);
|
||||
}
|
||||
|
|
|
|||
|
|
@ -18,10 +18,10 @@
|
|||
package org.apache.atlas.discovery;
|
||||
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import org.apache.atlas.AtlasErrorCode;
|
||||
import org.apache.atlas.exception.AtlasBaseException;
|
||||
import org.apache.atlas.model.discovery.SearchParameters;
|
||||
import org.apache.atlas.model.discovery.SearchParameters.*;
|
||||
import org.apache.atlas.model.instance.AtlasEntity;
|
||||
import org.apache.atlas.model.typedef.AtlasClassificationDef;
|
||||
import org.apache.atlas.repository.Constants;
|
||||
|
|
@ -43,11 +43,25 @@ import org.apache.commons.lang3.StringUtils;
|
|||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.util.*;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Base64;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.HashSet;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.Objects;
|
||||
import java.util.Set;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import static org.apache.atlas.discovery.SearchProcessor.ALL_TYPE_QUERY;
|
||||
import static org.apache.atlas.model.discovery.SearchParameters.*;
|
||||
import static org.apache.atlas.model.discovery.SearchParameters.ALL_CLASSIFICATIONS;
|
||||
import static org.apache.atlas.model.discovery.SearchParameters.ALL_CLASSIFICATION_TYPES;
|
||||
import static org.apache.atlas.model.discovery.SearchParameters.ALL_ENTITY_TYPES;
|
||||
import static org.apache.atlas.model.discovery.SearchParameters.FilterCriteria;
|
||||
import static org.apache.atlas.model.discovery.SearchParameters.NO_CLASSIFICATIONS;
|
||||
import static org.apache.atlas.model.discovery.SearchParameters.WILDCARD_CLASSIFICATIONS;
|
||||
|
||||
/*
|
||||
* Search context captures elements required for performing a basic search
|
||||
|
|
@ -71,6 +85,7 @@ public class SearchContext {
|
|||
private final String classificationTypeAndSubTypesQryStr;
|
||||
private boolean terminateSearch = false;
|
||||
private SearchProcessor searchProcessor;
|
||||
private Integer marker;
|
||||
|
||||
public final static AtlasClassificationType MATCH_ALL_WILDCARD_CLASSIFICATION = new AtlasClassificationType(new AtlasClassificationDef(WILDCARD_CLASSIFICATIONS));
|
||||
public final static AtlasClassificationType MATCH_ALL_CLASSIFIED = new AtlasClassificationType(new AtlasClassificationDef(ALL_CLASSIFICATIONS));
|
||||
|
|
@ -124,6 +139,10 @@ public class SearchContext {
|
|||
}
|
||||
}
|
||||
|
||||
if (StringUtils.isNotEmpty(searchParameters.getMarker())) {
|
||||
marker = MarkerUtil.decodeMarker(searchParameters);
|
||||
}
|
||||
|
||||
//remove other types if builtin type is present
|
||||
filterStructTypes();
|
||||
|
||||
|
|
@ -231,6 +250,8 @@ public class SearchContext {
|
|||
|
||||
public Set<String> getClassificationNames() {return classificationNames;}
|
||||
|
||||
public Integer getMarker() { return marker; }
|
||||
|
||||
public boolean includeEntityType(String entityType) {
|
||||
return typeAndSubTypes.isEmpty() || typeAndSubTypes.contains(entityType);
|
||||
}
|
||||
|
|
@ -238,9 +259,7 @@ public class SearchContext {
|
|||
public boolean includeClassificationTypes(Collection<String> traitNames) {
|
||||
final boolean ret;
|
||||
|
||||
if (CollectionUtils.isEmpty(classificationTypes) || classificationTypeAndSubTypes.isEmpty()) {
|
||||
ret = true;
|
||||
} else if (classificationTypes.iterator().next() == MATCH_ALL_NOT_CLASSIFIED) {
|
||||
if (classificationTypes.iterator().next() == MATCH_ALL_NOT_CLASSIFIED) {
|
||||
ret = CollectionUtils.isEmpty(traitNames);
|
||||
} else if (classificationTypes.iterator().next() == MATCH_ALL_CLASSIFICATION_TYPES) {
|
||||
ret = CollectionUtils.isNotEmpty(traitNames);
|
||||
|
|
@ -503,4 +522,64 @@ public class SearchContext {
|
|||
private AtlasEntityType getTermEntityType() {
|
||||
return typeRegistry.getEntityTypeByName(TermSearchProcessor.ATLAS_GLOSSARY_TERM_ENTITY_TYPE);
|
||||
}
|
||||
|
||||
public static class MarkerUtil {
|
||||
private final static int IDX_HASH_CODE = 0;
|
||||
private final static int IDX_OFFSET = 1;
|
||||
|
||||
private final static String MARKER_DELIMITER = ":";
|
||||
|
||||
@VisibleForTesting
|
||||
final static String MARKER_START = "*";
|
||||
|
||||
@VisibleForTesting
|
||||
final static int MARKER_END = -1;
|
||||
|
||||
public static String getNextEncMarker(SearchParameters searchParameters, Integer nextOffset) {
|
||||
if (nextOffset == null) {
|
||||
return null;
|
||||
}
|
||||
|
||||
if (nextOffset == MARKER_END) {
|
||||
return String.valueOf(nextOffset);
|
||||
}
|
||||
|
||||
String value = searchParameters.hashCode() + MARKER_DELIMITER + nextOffset;
|
||||
return Base64.getEncoder().encodeToString(value.getBytes());
|
||||
}
|
||||
|
||||
public static Integer decodeMarker(SearchParameters searchParameters) throws AtlasBaseException {
|
||||
if (searchParameters == null || searchParameters.getOffset() > 0) {
|
||||
throw new AtlasBaseException(AtlasErrorCode.BAD_REQUEST, "Marker can be used only if offset=0.");
|
||||
}
|
||||
|
||||
String encodedMarker = searchParameters.getMarker();
|
||||
if (StringUtils.equals(encodedMarker, MARKER_START)) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
try {
|
||||
byte[] inputMarkerBytes = Base64.getDecoder().decode(encodedMarker);
|
||||
String inputMarker = new String(inputMarkerBytes);
|
||||
if (StringUtils.isEmpty(inputMarker) || !inputMarker.contains(MARKER_DELIMITER)) {
|
||||
throw new AtlasBaseException(AtlasErrorCode.BAD_REQUEST, "Invalid marker found! Marker does not contain delimiter: " + MARKER_DELIMITER);
|
||||
}
|
||||
|
||||
String[] str = inputMarker.split(MARKER_DELIMITER);
|
||||
if (str == null || str.length != 2) {
|
||||
throw new AtlasBaseException(AtlasErrorCode.BAD_REQUEST, "Invalid marker found! Decoding using delimiter did not yield correct result!");
|
||||
}
|
||||
|
||||
int hashCode = Integer.parseInt(str[IDX_HASH_CODE]);
|
||||
int currentHashCode = searchParameters.hashCode();
|
||||
if (hashCode == currentHashCode && Integer.parseInt(str[IDX_OFFSET]) >= 0) {
|
||||
return Integer.parseInt(str[IDX_OFFSET]);
|
||||
}
|
||||
|
||||
throw new AtlasBaseException(AtlasErrorCode.BAD_REQUEST, "Invalid Marker! Parsing resulted in error.");
|
||||
} catch (Exception e) {
|
||||
throw new AtlasBaseException(AtlasErrorCode.BAD_REQUEST, "Invalid marker!");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -39,6 +39,7 @@ import org.apache.atlas.util.AtlasGremlinQueryProvider;
|
|||
import org.apache.atlas.util.SearchPredicateUtil;
|
||||
import org.apache.atlas.util.SearchPredicateUtil.*;
|
||||
import org.apache.commons.collections.CollectionUtils;
|
||||
import org.apache.commons.collections.MapUtils;
|
||||
import org.apache.commons.collections.Predicate;
|
||||
import org.apache.commons.collections.PredicateUtils;
|
||||
import org.apache.commons.lang.StringUtils;
|
||||
|
|
@ -52,6 +53,7 @@ import java.sql.Timestamp;
|
|||
import java.time.LocalDateTime;
|
||||
import java.util.*;
|
||||
import java.util.regex.Pattern;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import static org.apache.atlas.SortOrder.ASCENDING;
|
||||
import static org.apache.atlas.discovery.SearchContext.MATCH_ALL_CLASSIFICATION_TYPES;
|
||||
|
|
@ -137,6 +139,7 @@ public abstract class SearchProcessor {
|
|||
protected SearchProcessor nextProcessor;
|
||||
protected Predicate inMemoryPredicate;
|
||||
protected GraphIndexQueryBuilder graphIndexQueryBuilder;
|
||||
protected Integer nextOffset;
|
||||
|
||||
protected SearchProcessor(SearchContext context) {
|
||||
this.context = context;
|
||||
|
|
@ -151,6 +154,10 @@ public abstract class SearchProcessor {
|
|||
}
|
||||
}
|
||||
|
||||
public String getNextMarker() {
|
||||
return SearchContext.MarkerUtil.getNextEncMarker(context.getSearchParameters(), nextOffset);
|
||||
}
|
||||
|
||||
public abstract List<AtlasVertex> execute();
|
||||
public abstract long getResultCount();
|
||||
|
||||
|
|
@ -181,28 +188,41 @@ public abstract class SearchProcessor {
|
|||
StringUtils.equals(attrName, CUSTOM_ATTRIBUTES_PROPERTY_KEY);
|
||||
}
|
||||
|
||||
protected int collectResultVertices(final List<AtlasVertex> ret, final int startIdx, final int limit, int resultIdx, final List<AtlasVertex> entityVertices) {
|
||||
for (AtlasVertex entityVertex : entityVertices) {
|
||||
resultIdx++;
|
||||
protected int collectResultVertices(final List<AtlasVertex> ret, final int startIdx, final int limit, int resultIdx, final Map<Integer, AtlasVertex> offsetEntityVertexMap, Integer marker) {
|
||||
int lastOffset = resultIdx;
|
||||
|
||||
if (resultIdx <= startIdx) {
|
||||
continue;
|
||||
for (Map.Entry<Integer, AtlasVertex> offsetToEntity : offsetEntityVertexMap.entrySet()) {
|
||||
resultIdx++;
|
||||
|
||||
if (resultIdx <= startIdx) {
|
||||
continue;
|
||||
}
|
||||
|
||||
lastOffset = offsetToEntity.getKey();
|
||||
ret.add(offsetToEntity.getValue());
|
||||
|
||||
if (ret.size() == limit) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
ret.add(entityVertex);
|
||||
|
||||
if (ret.size() == limit) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
return resultIdx;
|
||||
return marker == null ? resultIdx : lastOffset;
|
||||
}
|
||||
|
||||
public void filter(List<AtlasVertex> entityVertices) {
|
||||
if (nextProcessor != null && CollectionUtils.isNotEmpty(entityVertices)) {
|
||||
nextProcessor.filter(entityVertices);
|
||||
public LinkedHashMap<Integer, AtlasVertex> filter(LinkedHashMap<Integer, AtlasVertex> offsetEntityVertexMap) {
|
||||
if (nextProcessor != null && MapUtils.isNotEmpty(offsetEntityVertexMap)) {
|
||||
return nextProcessor.filter(offsetEntityVertexMap);
|
||||
}
|
||||
return offsetEntityVertexMap;
|
||||
}
|
||||
|
||||
public LinkedHashMap<Integer, AtlasVertex> filter(LinkedHashMap<Integer, AtlasVertex> offsetEntityVertexMap, Predicate predicate) {
|
||||
if (predicate != null) {
|
||||
offsetEntityVertexMap = offsetEntityVertexMap.entrySet()
|
||||
.stream()
|
||||
.filter(x -> predicate.evaluate(x.getValue()))
|
||||
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue, (x, y) -> y, LinkedHashMap::new));
|
||||
}
|
||||
return offsetEntityVertexMap;
|
||||
}
|
||||
|
||||
protected Predicate buildTraitPredict(Set<AtlasClassificationType> classificationTypes) {
|
||||
|
|
@ -361,13 +381,13 @@ public abstract class SearchProcessor {
|
|||
return ret;
|
||||
}
|
||||
|
||||
protected void filterWhiteSpaceClassification(List<AtlasVertex> entityVertices) {
|
||||
if (CollectionUtils.isNotEmpty(entityVertices)) {
|
||||
final Iterator<AtlasVertex> it = entityVertices.iterator();
|
||||
final Set<String> typeAndSubTypes = context.getClassificationTypeNames();
|
||||
protected LinkedHashMap<Integer,AtlasVertex> filterWhiteSpaceClassification(LinkedHashMap<Integer,AtlasVertex> offsetEntityVertexMap) {
|
||||
if (offsetEntityVertexMap != null) {
|
||||
final Iterator<Map.Entry<Integer, AtlasVertex>> it = offsetEntityVertexMap.entrySet().iterator();
|
||||
final Set<String> typeAndSubTypes = context.getClassificationTypeNames();
|
||||
|
||||
while (it.hasNext()) {
|
||||
AtlasVertex entityVertex = it.next();
|
||||
AtlasVertex entityVertex = it.next().getValue();
|
||||
List<String> classificationNames = AtlasGraphUtilsV2.getClassificationNames(entityVertex);
|
||||
|
||||
if (CollectionUtils.isNotEmpty(classificationNames)) {
|
||||
|
|
@ -387,6 +407,7 @@ public abstract class SearchProcessor {
|
|||
it.remove();
|
||||
}
|
||||
}
|
||||
return offsetEntityVertexMap;
|
||||
}
|
||||
|
||||
protected void constructFilterQuery(StringBuilder indexQuery, Set<? extends AtlasStructType> structTypes, FilterCriteria filterCriteria, Set<String> indexAttributes) {
|
||||
|
|
@ -1205,6 +1226,18 @@ public abstract class SearchProcessor {
|
|||
return vertices;
|
||||
}
|
||||
|
||||
protected LinkedHashMap<Integer, AtlasVertex> getVerticesFromIndexQueryResult(Iterator<AtlasIndexQuery.Result> idxQueryResult, LinkedHashMap<Integer, AtlasVertex> offsetEntityVertexMap, int qryOffset) {
|
||||
if (idxQueryResult != null) {
|
||||
while (idxQueryResult.hasNext()) {
|
||||
AtlasVertex vertex = idxQueryResult.next().getVertex();
|
||||
|
||||
offsetEntityVertexMap.put(qryOffset++, vertex);
|
||||
}
|
||||
}
|
||||
|
||||
return offsetEntityVertexMap;
|
||||
}
|
||||
|
||||
protected Collection<AtlasVertex> getVertices(Iterator<AtlasVertex> iterator, Collection<AtlasVertex> vertices) {
|
||||
if (iterator != null) {
|
||||
while (iterator.hasNext()) {
|
||||
|
|
@ -1217,6 +1250,18 @@ public abstract class SearchProcessor {
|
|||
return vertices;
|
||||
}
|
||||
|
||||
protected LinkedHashMap<Integer, AtlasVertex> getVertices(Iterator<AtlasVertex> iterator, LinkedHashMap<Integer, AtlasVertex> offsetEntityVertexMap, int qryOffset) {
|
||||
if (iterator != null) {
|
||||
while (iterator.hasNext()) {
|
||||
AtlasVertex vertex = iterator.next();
|
||||
|
||||
offsetEntityVertexMap.put(qryOffset++, vertex);
|
||||
}
|
||||
}
|
||||
|
||||
return offsetEntityVertexMap;
|
||||
}
|
||||
|
||||
protected Set<String> getGuids(List<AtlasVertex> vertices) {
|
||||
Set<String> ret = new HashSet<>();
|
||||
|
||||
|
|
|
|||
|
|
@ -20,12 +20,15 @@ package org.apache.atlas.discovery;
|
|||
import org.apache.atlas.repository.graphdb.AtlasVertex;
|
||||
import org.apache.atlas.utils.AtlasPerfTracer;
|
||||
import org.apache.commons.collections.CollectionUtils;
|
||||
import org.apache.commons.collections.MapUtils;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.LinkedHashMap;
|
||||
import java.util.List;
|
||||
|
||||
import java.util.Map;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
|
||||
public class TermSearchProcessor extends SearchProcessor {
|
||||
|
|
@ -58,15 +61,22 @@ public class TermSearchProcessor extends SearchProcessor {
|
|||
perf = AtlasPerfTracer.getPerfTracer(PERF_LOG, "TermSearchProcessor.execute(" + context + ")");
|
||||
}
|
||||
|
||||
//marker functionality will not work when there is need to fetch Term vertices and get entities from it
|
||||
try {
|
||||
if (CollectionUtils.isNotEmpty(assignedEntities)) {
|
||||
LinkedHashMap<Integer, AtlasVertex> offsetEntityVertexMap = new LinkedHashMap<>();
|
||||
|
||||
final int startIdx = context.getSearchParameters().getOffset();
|
||||
final int limit = context.getSearchParameters().getLimit();
|
||||
final List<AtlasVertex> tmpList = new ArrayList<>(assignedEntities);
|
||||
|
||||
super.filter(tmpList);
|
||||
for (int i = 0; i < tmpList.size(); i++) {
|
||||
offsetEntityVertexMap.put(i, tmpList.get(i));
|
||||
}
|
||||
|
||||
collectResultVertices(ret, startIdx, limit, 0, tmpList);
|
||||
offsetEntityVertexMap = super.filter(offsetEntityVertexMap);
|
||||
|
||||
collectResultVertices(ret, startIdx, limit, 0, offsetEntityVertexMap, null);
|
||||
}
|
||||
} finally {
|
||||
AtlasPerfTracer.log(perf);
|
||||
|
|
@ -79,37 +89,41 @@ public class TermSearchProcessor extends SearchProcessor {
|
|||
return ret;
|
||||
}
|
||||
|
||||
//this filter is never used
|
||||
@Override
|
||||
public void filter(List<AtlasVertex> entityVertices) {
|
||||
public LinkedHashMap<Integer, AtlasVertex> filter(LinkedHashMap<Integer, AtlasVertex> offsetEntityVertexMap) {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("==> TermSearchProcessor.filter({})", entityVertices.size());
|
||||
LOG.debug("==> TermSearchProcessor.filter({})", offsetEntityVertexMap.size());
|
||||
}
|
||||
|
||||
if (CollectionUtils.isNotEmpty(entityVertices)) {
|
||||
if (MapUtils.isNotEmpty(offsetEntityVertexMap)) {
|
||||
if (CollectionUtils.isEmpty(assignedEntities)) {
|
||||
entityVertices.clear();
|
||||
offsetEntityVertexMap.clear();
|
||||
} else {
|
||||
CollectionUtils.filter(entityVertices, o -> {
|
||||
if (o instanceof AtlasVertex) {
|
||||
AtlasVertex entityVertex = (AtlasVertex) o;
|
||||
offsetEntityVertexMap.entrySet().stream().
|
||||
filter(o -> {
|
||||
if (o instanceof AtlasVertex) {
|
||||
AtlasVertex entityVertex = (AtlasVertex) o;
|
||||
|
||||
for (AtlasVertex assignedEntity : assignedEntities) {
|
||||
if (assignedEntity.getId().equals(entityVertex.getId())) {
|
||||
return true;
|
||||
for (AtlasVertex assignedEntity : assignedEntities) {
|
||||
if (assignedEntity.getId().equals(entityVertex.getId())) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return false;
|
||||
});
|
||||
}).collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue, (x, y) -> y, LinkedHashMap::new));
|
||||
}
|
||||
}
|
||||
|
||||
super.filter(entityVertices);
|
||||
offsetEntityVertexMap = super.filter(offsetEntityVertexMap);
|
||||
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("<== TermSearchProcessor.filter(): ret.size()={}", entityVertices.size());
|
||||
LOG.debug("<== TermSearchProcessor.filter(): ret.size()={}", offsetEntityVertexMap.size());
|
||||
}
|
||||
|
||||
return offsetEntityVertexMap;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
|||
|
|
@ -34,6 +34,7 @@ import org.apache.atlas.model.instance.EntityMutationResponse;
|
|||
import org.apache.atlas.repository.graph.AtlasGraphProvider;
|
||||
import org.apache.atlas.repository.store.graph.v2.AtlasEntityStream;
|
||||
import org.apache.commons.collections.CollectionUtils;
|
||||
import org.apache.commons.lang.StringUtils;
|
||||
import org.testng.Assert;
|
||||
import org.testng.annotations.*;
|
||||
|
||||
|
|
@ -73,67 +74,51 @@ public class AtlasDiscoveryServiceTest extends BasicTestSetup {
|
|||
SearchParameters params = new SearchParameters();
|
||||
params.setTermName(SALES_TERM+"@"+SALES_GLOSSARY);
|
||||
|
||||
List<AtlasEntityHeader> entityHeaders = discoveryService.searchWithParameters(params).getEntities();
|
||||
|
||||
Assert.assertTrue(CollectionUtils.isNotEmpty(entityHeaders));
|
||||
assertEquals(entityHeaders.size(), 10);
|
||||
assertSearchProcessorWithoutMarker(params, 10);
|
||||
}
|
||||
|
||||
// TSP execute and CSP,ESP filter
|
||||
@Test
|
||||
public void term_tag() throws AtlasBaseException {
|
||||
public void termTag() throws AtlasBaseException {
|
||||
SearchParameters params = new SearchParameters();
|
||||
params.setTermName(SALES_TERM+"@"+SALES_GLOSSARY);
|
||||
params.setClassification(METRIC_CLASSIFICATION);
|
||||
|
||||
List<AtlasEntityHeader> entityHeaders = discoveryService.searchWithParameters(params).getEntities();
|
||||
|
||||
Assert.assertTrue(CollectionUtils.isNotEmpty(entityHeaders));
|
||||
for(AtlasEntityHeader e : entityHeaders){
|
||||
System.out.println(e.toString());
|
||||
}
|
||||
assertEquals(entityHeaders.size(), 4);
|
||||
assertSearchProcessorWithoutMarker(params, 4);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void term_entity() throws AtlasBaseException {
|
||||
public void termEntity() throws AtlasBaseException {
|
||||
SearchParameters params = new SearchParameters();
|
||||
params.setTermName(SALES_TERM+"@"+SALES_GLOSSARY);
|
||||
params.setTypeName(HIVE_TABLE_TYPE);
|
||||
|
||||
List<AtlasEntityHeader> entityHeaders = discoveryService.searchWithParameters(params).getEntities();
|
||||
|
||||
Assert.assertTrue(CollectionUtils.isNotEmpty(entityHeaders));
|
||||
assertEquals(entityHeaders.size(), 10);
|
||||
assertSearchProcessorWithoutMarker(params, 10);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void term_entity_tag() throws AtlasBaseException {
|
||||
public void termEntityTag() throws AtlasBaseException {
|
||||
SearchParameters params = new SearchParameters();
|
||||
params.setTermName(SALES_TERM+"@"+SALES_GLOSSARY);
|
||||
params.setTypeName(HIVE_TABLE_TYPE);
|
||||
params.setClassification(DIMENSIONAL_CLASSIFICATION);
|
||||
|
||||
List<AtlasEntityHeader> entityHeaders = discoveryService.searchWithParameters(params).getEntities();
|
||||
|
||||
Assert.assertTrue(CollectionUtils.isEmpty(entityHeaders));
|
||||
}
|
||||
|
||||
//FSP execute and CSP,ESP filter
|
||||
@Test
|
||||
public void query_ALLTag() throws AtlasBaseException {
|
||||
public void queryALLTag() throws AtlasBaseException {
|
||||
SearchParameters params = new SearchParameters();
|
||||
params.setClassification(ALL_CLASSIFICATION_TYPES);
|
||||
params.setQuery("sales");
|
||||
|
||||
List<AtlasEntityHeader> entityHeaders = discoveryService.searchWithParameters(params).getEntities();
|
||||
|
||||
Assert.assertTrue(CollectionUtils.isNotEmpty(entityHeaders));
|
||||
assertEquals(entityHeaders.size(), 5);
|
||||
assertSearchProcessorWithoutMarker(params, 5);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void query_ALLTag_tagFilter() throws AtlasBaseException {
|
||||
public void queryALLTagTagFilter() throws AtlasBaseException {
|
||||
SearchParameters params = new SearchParameters();
|
||||
params.setClassification(ALL_CLASSIFICATION_TYPES);
|
||||
//typeName will check for only classification name not propogated classification
|
||||
|
|
@ -141,103 +126,79 @@ public class AtlasDiscoveryServiceTest extends BasicTestSetup {
|
|||
params.setTagFilters(fc);
|
||||
params.setQuery("sales");
|
||||
|
||||
List<AtlasEntityHeader> entityHeaders = discoveryService.searchWithParameters(params).getEntities();
|
||||
|
||||
Assert.assertTrue(CollectionUtils.isNotEmpty(entityHeaders));
|
||||
assertEquals(entityHeaders.size(), 4);
|
||||
assertSearchProcessorWithoutMarker(params, 4);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void query_NOTCLASSIFIEDTag() throws AtlasBaseException {
|
||||
public void queryNOTCLASSIFIEDTag() throws AtlasBaseException {
|
||||
SearchParameters params = new SearchParameters();
|
||||
params.setClassification(NO_CLASSIFICATIONS);
|
||||
params.setQuery("sales");
|
||||
|
||||
List<AtlasEntityHeader> entityHeaders = discoveryService.searchWithParameters(params).getEntities();
|
||||
|
||||
Assert.assertTrue(CollectionUtils.isNotEmpty(entityHeaders));
|
||||
assertEquals(entityHeaders.size(), 11);
|
||||
assertSearchProcessorWithoutMarker(params, 11);
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
public void query_ALLWildcardTag() throws AtlasBaseException {
|
||||
public void queryALLWildcardTag() throws AtlasBaseException {
|
||||
SearchParameters params = new SearchParameters();
|
||||
params.setClassification("*");
|
||||
params.setQuery("sales");
|
||||
|
||||
List<AtlasEntityHeader> entityHeaders = discoveryService.searchWithParameters(params).getEntities();
|
||||
|
||||
Assert.assertTrue(CollectionUtils.isNotEmpty(entityHeaders));
|
||||
assertEquals(entityHeaders.size(), 5);
|
||||
assertSearchProcessorWithoutMarker(params, 5);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void query_wildcardTag() throws AtlasBaseException {
|
||||
public void queryWildcardTag() throws AtlasBaseException {
|
||||
SearchParameters params = new SearchParameters();
|
||||
params.setClassification("Dimen*on");
|
||||
params.setQuery("sales");
|
||||
|
||||
List<AtlasEntityHeader> entityHeaders = discoveryService.searchWithParameters(params).getEntities();
|
||||
|
||||
Assert.assertTrue(CollectionUtils.isNotEmpty(entityHeaders));
|
||||
assertEquals(entityHeaders.size(), 2);
|
||||
assertSearchProcessorWithoutMarker(params, 2);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void query_tag() throws AtlasBaseException {
|
||||
public void queryTag() throws AtlasBaseException {
|
||||
SearchParameters params = new SearchParameters();
|
||||
params.setClassification(METRIC_CLASSIFICATION);
|
||||
params.setQuery("sales");
|
||||
|
||||
List<AtlasEntityHeader> entityHeaders = discoveryService.searchWithParameters(params).getEntities();
|
||||
|
||||
Assert.assertTrue(CollectionUtils.isNotEmpty(entityHeaders));
|
||||
assertEquals(entityHeaders.size(), 3);
|
||||
assertSearchProcessorWithoutMarker(params, 3);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void query_tag_tagFilter() throws AtlasBaseException {
|
||||
public void queryTagTagFilter() throws AtlasBaseException {
|
||||
SearchParameters params = new SearchParameters();
|
||||
params.setClassification(METRIC_CLASSIFICATION);
|
||||
SearchParameters.FilterCriteria fc = getSingleFilterCondition("__timestamp", SearchParameters.Operator.LT, String.valueOf(System.currentTimeMillis()));
|
||||
params.setTagFilters(fc);
|
||||
params.setQuery("sales");
|
||||
|
||||
List<AtlasEntityHeader> entityHeaders = discoveryService.searchWithParameters(params).getEntities();
|
||||
|
||||
Assert.assertTrue(CollectionUtils.isNotEmpty(entityHeaders));
|
||||
assertEquals(entityHeaders.size(), 3);
|
||||
assertSearchProcessorWithoutMarker(params, 3);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void query_entity() throws AtlasBaseException {
|
||||
public void queryEntity() throws AtlasBaseException {
|
||||
SearchParameters params = new SearchParameters();
|
||||
params.setTypeName(HIVE_TABLE_TYPE);
|
||||
params.setQuery("sales");
|
||||
|
||||
List<AtlasEntityHeader> entityHeaders = discoveryService.searchWithParameters(params).getEntities();
|
||||
|
||||
Assert.assertTrue(CollectionUtils.isNotEmpty(entityHeaders));
|
||||
assertEquals(entityHeaders.size(), 4);
|
||||
assertSearchProcessorWithoutMarker(params, 4);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void query_entity_entityFilter() throws AtlasBaseException {
|
||||
public void queryEntityEntityFilter() throws AtlasBaseException {
|
||||
SearchParameters params = new SearchParameters();
|
||||
params.setTypeName(HIVE_TABLE_TYPE);
|
||||
SearchParameters.FilterCriteria fc = getSingleFilterCondition("tableType", Operator.NOT_NULL, "null");
|
||||
params.setEntityFilters(fc);
|
||||
params.setQuery("sales");
|
||||
|
||||
List<AtlasEntityHeader> entityHeaders = discoveryService.searchWithParameters(params).getEntities();
|
||||
|
||||
Assert.assertTrue(CollectionUtils.isNotEmpty(entityHeaders));
|
||||
assertEquals(entityHeaders.size(), 3);
|
||||
assertSearchProcessorWithoutMarker(params, 3);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void query_entity_entityFilter_tag() throws AtlasBaseException {
|
||||
public void queryEntityEntityFilterTag() throws AtlasBaseException {
|
||||
SearchParameters params = new SearchParameters();
|
||||
params.setTypeName(HIVE_TABLE_TYPE);
|
||||
SearchParameters.FilterCriteria fc = getSingleFilterCondition("tableType", Operator.IS_NULL, "null");
|
||||
|
|
@ -245,14 +206,11 @@ public class AtlasDiscoveryServiceTest extends BasicTestSetup {
|
|||
params.setClassification(DIMENSIONAL_CLASSIFICATION);
|
||||
params.setQuery("sales");
|
||||
|
||||
List<AtlasEntityHeader> entityHeaders = discoveryService.searchWithParameters(params).getEntities();
|
||||
|
||||
Assert.assertTrue(CollectionUtils.isNotEmpty(entityHeaders));
|
||||
assertEquals(entityHeaders.size(), 1);
|
||||
assertSearchProcessorWithoutMarker(params, 1);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void query_entity_entityFilter_tag_tagFilter() throws AtlasBaseException {
|
||||
public void queryEntityEntityFilterTagTagFilter() throws AtlasBaseException {
|
||||
SearchParameters params = new SearchParameters();
|
||||
params.setTypeName(HIVE_TABLE_TYPE);
|
||||
SearchParameters.FilterCriteria fcE = getSingleFilterCondition("tableType", Operator.IS_NULL, "null");
|
||||
|
|
@ -262,14 +220,11 @@ public class AtlasDiscoveryServiceTest extends BasicTestSetup {
|
|||
SearchParameters.FilterCriteria fcC = getSingleFilterCondition("attr1", Operator.EQ, "value1");
|
||||
params.setTagFilters(fcC);
|
||||
|
||||
List<AtlasEntityHeader> entityHeaders = discoveryService.searchWithParameters(params).getEntities();
|
||||
|
||||
Assert.assertTrue(CollectionUtils.isNotEmpty(entityHeaders));
|
||||
assertEquals(entityHeaders.size(), 1);
|
||||
assertSearchProcessorWithoutMarker(params, 1);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void query_entity_tag_tagFilter() throws AtlasBaseException {
|
||||
public void queryEntityTagTagFilter() throws AtlasBaseException {
|
||||
SearchParameters params = new SearchParameters();
|
||||
params.setTypeName(HIVE_TABLE_TYPE);
|
||||
params.setClassification(METRIC_CLASSIFICATION);
|
||||
|
|
@ -277,29 +232,22 @@ public class AtlasDiscoveryServiceTest extends BasicTestSetup {
|
|||
params.setTagFilters(fc);
|
||||
params.setQuery("sales");
|
||||
|
||||
List<AtlasEntityHeader> entityHeaders = discoveryService.searchWithParameters(params).getEntities();
|
||||
|
||||
Assert.assertTrue(CollectionUtils.isNotEmpty(entityHeaders));
|
||||
assertEquals(entityHeaders.size(), 2);
|
||||
|
||||
assertSearchProcessorWithoutMarker(params, 2);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void query_entity_tag() throws AtlasBaseException {
|
||||
public void queryEntityTag() throws AtlasBaseException {
|
||||
SearchParameters params = new SearchParameters();
|
||||
params.setTypeName(HIVE_TABLE_TYPE);
|
||||
params.setClassification(METRIC_CLASSIFICATION);
|
||||
params.setQuery("sales");
|
||||
|
||||
List<AtlasEntityHeader> entityHeaders = discoveryService.searchWithParameters(params).getEntities();
|
||||
|
||||
Assert.assertTrue(CollectionUtils.isNotEmpty(entityHeaders));
|
||||
assertEquals(entityHeaders.size(), 2);
|
||||
assertSearchProcessorWithoutMarker(params, 2);
|
||||
}
|
||||
|
||||
// CSP Execute and ESP filter
|
||||
@Test
|
||||
public void entity_entityFilter_tag_tagFilter() throws AtlasBaseException {
|
||||
public void entityEntityFilterTagTagFilter() throws AtlasBaseException {
|
||||
SearchParameters params = new SearchParameters();
|
||||
params.setTypeName(HIVE_TABLE_TYPE);
|
||||
SearchParameters.FilterCriteria fcE = getSingleFilterCondition("tableType", Operator.EQ, "Managed");
|
||||
|
|
@ -308,27 +256,171 @@ public class AtlasDiscoveryServiceTest extends BasicTestSetup {
|
|||
SearchParameters.FilterCriteria fcC = getSingleFilterCondition("__timestamp", SearchParameters.Operator.LT, String.valueOf(System.currentTimeMillis()));
|
||||
params.setTagFilters(fcC);
|
||||
|
||||
List<AtlasEntityHeader> entityHeaders = discoveryService.searchWithParameters(params).getEntities();
|
||||
|
||||
Assert.assertTrue(CollectionUtils.isNotEmpty(entityHeaders));
|
||||
assertEquals(entityHeaders.size(), 4);
|
||||
|
||||
assertSearchProcessorWithoutMarker(params, 4);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void entity_tag_tagFilter() throws AtlasBaseException {
|
||||
public void entityTagTagFilter() throws AtlasBaseException {
|
||||
SearchParameters params = new SearchParameters();
|
||||
params.setTypeName(HIVE_TABLE_TYPE);
|
||||
params.setClassification(METRIC_CLASSIFICATION);
|
||||
SearchParameters.FilterCriteria fc = getSingleFilterCondition("__timestamp", SearchParameters.Operator.LT, String.valueOf(System.currentTimeMillis()));
|
||||
params.setTagFilters(fc);
|
||||
|
||||
assertSearchProcessorWithoutMarker(params, 4);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void searchWith0offsetMarker() throws AtlasBaseException {
|
||||
SearchParameters params = new SearchParameters();
|
||||
params.setTypeName(HIVE_TABLE_TYPE);
|
||||
params.setOffset(0);
|
||||
params.setMarker(SearchContext.MarkerUtil.MARKER_START);
|
||||
params.setLimit(5);
|
||||
|
||||
assertSearchProcessorWithMarker(params, 5);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void searchWithNoOffsetMarker() throws AtlasBaseException {
|
||||
SearchParameters params = new SearchParameters();
|
||||
params.setTypeName(HIVE_TABLE_TYPE);
|
||||
params.setMarker(SearchContext.MarkerUtil.MARKER_START);
|
||||
params.setLimit(5);
|
||||
|
||||
assertSearchProcessorWithMarker(params, 5);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void searchWithGreaterThan0OffsetBlankMarker() throws AtlasBaseException {
|
||||
SearchParameters params = new SearchParameters();
|
||||
params.setTypeName(HIVE_TABLE_TYPE);
|
||||
params.setOffset(1);
|
||||
params.setMarker("");
|
||||
params.setLimit(5);
|
||||
|
||||
assertSearchProcessorWithoutMarker(params, 5);
|
||||
}
|
||||
|
||||
@Test(expectedExceptions = AtlasBaseException.class, expectedExceptionsMessageRegExp = "Marker can be used only if offset=0.")
|
||||
public void searchWithGreaterThan0OffsetMarker() throws AtlasBaseException {
|
||||
SearchParameters params = new SearchParameters();
|
||||
params.setTypeName(HIVE_TABLE_TYPE);
|
||||
params.setOffset(1);
|
||||
params.setMarker(SearchContext.MarkerUtil.MARKER_START);
|
||||
params.setLimit(5);
|
||||
List<AtlasEntityHeader> entityHeaders = discoveryService.searchWithParameters(params).getEntities();
|
||||
|
||||
Assert.assertTrue(CollectionUtils.isNotEmpty(entityHeaders));
|
||||
assertEquals(entityHeaders.size(), 4);
|
||||
assertNotNull(entityHeaders);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void searchWithMarkerSet() throws AtlasBaseException {
|
||||
SearchParameters params = new SearchParameters();
|
||||
params.setTypeName(HIVE_TABLE_TYPE);
|
||||
params.setMarker(SearchContext.MarkerUtil.MARKER_START);
|
||||
params.setLimit(5);
|
||||
AtlasSearchResult searchResult = discoveryService.searchWithParameters(params);
|
||||
List<AtlasEntityHeader> entityHeaders = searchResult.getEntities();
|
||||
|
||||
Assert.assertTrue(CollectionUtils.isNotEmpty(entityHeaders));
|
||||
assertEquals(entityHeaders.size(), 5);
|
||||
Assert.assertTrue(StringUtils.isNotEmpty(searchResult.getNextMarker()));
|
||||
|
||||
//get next marker and set in marker of subsequent request
|
||||
params.setMarker(searchResult.getNextMarker());
|
||||
AtlasSearchResult nextsearchResult = discoveryService.searchWithParameters(params);
|
||||
List<AtlasEntityHeader> nextentityHeaders = nextsearchResult.getEntities();
|
||||
|
||||
Assert.assertTrue(CollectionUtils.isNotEmpty(nextentityHeaders));
|
||||
Assert.assertTrue(StringUtils.isNotEmpty(nextsearchResult.getNextMarker()));
|
||||
|
||||
if (entityHeaders.size() < params.getLimit()) {
|
||||
Assert.assertTrue(nextsearchResult.getNextMarker() == String.valueOf(-1));
|
||||
}
|
||||
}
|
||||
|
||||
@Test(expectedExceptions = AtlasBaseException.class, expectedExceptionsMessageRegExp = "Invalid marker!")
|
||||
public void searchWithInvalidMarker() throws AtlasBaseException {
|
||||
SearchParameters params = new SearchParameters();
|
||||
params.setTypeName(HIVE_TABLE_TYPE);
|
||||
params.setMarker(SearchContext.MarkerUtil.MARKER_START);
|
||||
params.setLimit(5);
|
||||
AtlasSearchResult searchResult = discoveryService.searchWithParameters(params);
|
||||
List<AtlasEntityHeader> entityHeaders = searchResult.getEntities();
|
||||
|
||||
Assert.assertTrue(CollectionUtils.isNotEmpty(entityHeaders));
|
||||
assertEquals(entityHeaders.size(), 5);
|
||||
Assert.assertTrue(StringUtils.isNotEmpty(searchResult.getNextMarker()));
|
||||
|
||||
//get next marker and set in marker of subsequent request
|
||||
params.setMarker(searchResult.getNextMarker()+"abc");
|
||||
AtlasSearchResult nextsearchResult = discoveryService.searchWithParameters(params);
|
||||
|
||||
}
|
||||
|
||||
@Test
|
||||
public void searchWithLastPageMarker() throws AtlasBaseException {
|
||||
SearchParameters params = new SearchParameters();
|
||||
params.setTypeName(HIVE_TABLE_TYPE);
|
||||
params.setExcludeDeletedEntities(true);
|
||||
params.setMarker(SearchContext.MarkerUtil.MARKER_START);
|
||||
params.setLimit(5);
|
||||
AtlasSearchResult searchResult = discoveryService.searchWithParameters(params);
|
||||
List<AtlasEntityHeader> entityHeaders = searchResult.getEntities();
|
||||
|
||||
Assert.assertTrue(CollectionUtils.isNotEmpty(entityHeaders));
|
||||
assertEquals(entityHeaders.size(), 5);
|
||||
Assert.assertTrue(StringUtils.isNotEmpty(searchResult.getNextMarker()));
|
||||
|
||||
long maxEntities = searchResult.getApproximateCount();
|
||||
|
||||
//get next marker and set in marker of subsequent request
|
||||
params.setMarker(SearchContext.MarkerUtil.MARKER_START);
|
||||
params.setLimit((int)maxEntities + 10);
|
||||
AtlasSearchResult nextsearchResult = discoveryService.searchWithParameters(params);
|
||||
|
||||
Assert.assertTrue(nextsearchResult.getNextMarker().equals("-1"));
|
||||
}
|
||||
|
||||
|
||||
@Test //marker functionality is not supported
|
||||
public void termMarker() throws AtlasBaseException {
|
||||
SearchParameters params = new SearchParameters();
|
||||
params.setTermName(SALES_TERM+"@"+SALES_GLOSSARY);
|
||||
params.setMarker("*");
|
||||
|
||||
assertSearchProcessorWithoutMarker(params, 10);
|
||||
|
||||
}
|
||||
|
||||
@Test
|
||||
public void queryEntityTagMarker() throws AtlasBaseException {
|
||||
SearchParameters params = new SearchParameters();
|
||||
params.setTypeName(HIVE_TABLE_TYPE);
|
||||
params.setClassification(METRIC_CLASSIFICATION);
|
||||
params.setQuery("sales");
|
||||
params.setMarker("*");
|
||||
params.setLimit(5);
|
||||
|
||||
assertSearchProcessorWithMarker(params, 2);
|
||||
}
|
||||
|
||||
// CSP Execute and ESP filter
|
||||
@Test
|
||||
public void entityEntityFilterTagTagFilterMarker() throws AtlasBaseException {
|
||||
SearchParameters params = new SearchParameters();
|
||||
params.setTypeName(HIVE_TABLE_TYPE);
|
||||
SearchParameters.FilterCriteria fcE = getSingleFilterCondition("tableType", Operator.EQ, "Managed");
|
||||
params.setEntityFilters(fcE);
|
||||
params.setClassification(METRIC_CLASSIFICATION);
|
||||
SearchParameters.FilterCriteria fcC = getSingleFilterCondition("__timestamp", SearchParameters.Operator.LT, String.valueOf(System.currentTimeMillis()));
|
||||
params.setTagFilters(fcC);
|
||||
params.setMarker("*");
|
||||
assertSearchProcessorWithoutMarker(params, 4);
|
||||
}
|
||||
|
||||
|
||||
String spChar1 = "default.test_dot_name";
|
||||
String spChar2 = "default.test_dot_name@db.test_db";
|
||||
String spChar3 = "default.test_dot_name_12.col1@db1";
|
||||
|
|
@ -794,6 +886,29 @@ public class AtlasDiscoveryServiceTest extends BasicTestSetup {
|
|||
entityStore.addClassification(Arrays.asList(guid), new AtlasClassification(DIMENSIONAL_CLASSIFICATION, attr));
|
||||
}
|
||||
|
||||
|
||||
private void assertSearchProcessorWithoutMarker(SearchParameters params, int expected) throws AtlasBaseException {
|
||||
assertSearchProcessor(params, expected, false);
|
||||
}
|
||||
|
||||
private void assertSearchProcessorWithMarker(SearchParameters params, int expected) throws AtlasBaseException {
|
||||
assertSearchProcessor(params, expected, true);
|
||||
}
|
||||
|
||||
private void assertSearchProcessor(SearchParameters params, int expected, boolean checkMarker) throws AtlasBaseException {
|
||||
AtlasSearchResult searchResult = discoveryService.searchWithParameters(params);
|
||||
List<AtlasEntityHeader> entityHeaders = searchResult.getEntities();
|
||||
|
||||
Assert.assertTrue(CollectionUtils.isNotEmpty(entityHeaders));
|
||||
assertEquals(entityHeaders.size(), expected);
|
||||
|
||||
if (checkMarker) {
|
||||
Assert.assertTrue(StringUtils.isNotEmpty(searchResult.getNextMarker()));
|
||||
} else {
|
||||
Assert.assertTrue(StringUtils.isEmpty(searchResult.getNextMarker()));
|
||||
}
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
public void teardown() throws Exception {
|
||||
AtlasGraphProvider.cleanup();
|
||||
|
|
|
|||
|
|
@ -33,6 +33,7 @@ import org.apache.atlas.repository.graphdb.AtlasVertex;
|
|||
import org.apache.atlas.repository.store.graph.v2.AtlasEntityStream;
|
||||
import org.apache.atlas.repository.store.graph.v2.EntityGraphRetriever;
|
||||
import org.apache.commons.collections.CollectionUtils;
|
||||
import org.apache.commons.lang.StringUtils;
|
||||
import org.testng.Assert;
|
||||
import org.testng.annotations.AfterClass;
|
||||
import org.testng.annotations.BeforeClass;
|
||||
|
|
@ -263,6 +264,62 @@ public class ClassificationSearchProcessorTest extends BasicTestSetup {
|
|||
|
||||
}
|
||||
|
||||
@Test
|
||||
public void searchByWildcardTagMarker() throws AtlasBaseException {
|
||||
final String LAST_MARKER = "-1";
|
||||
SearchParameters params = new SearchParameters();
|
||||
params.setClassification("*");
|
||||
int limit = 5;
|
||||
String marker = "*";
|
||||
params.setLimit(limit);
|
||||
|
||||
while (!StringUtils.equals(marker, LAST_MARKER)) {
|
||||
params.setMarker(marker);
|
||||
SearchContext context = new SearchContext(params, typeRegistry, graph, indexer.getVertexIndexKeys());
|
||||
ClassificationSearchProcessor processor = new ClassificationSearchProcessor(context);
|
||||
List<AtlasVertex> vertices = processor.execute();
|
||||
long totalCount = vertices.size();
|
||||
marker = processor.getNextMarker();
|
||||
|
||||
if (totalCount < limit) {
|
||||
assertEquals(marker, LAST_MARKER);
|
||||
break;
|
||||
} else {
|
||||
Assert.assertNotNull(marker);
|
||||
assertEquals(vertices.size(), 5);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Test //marker functionality is not supported in this case
|
||||
public void searchByTagAndGraphSysFiltersMarker() throws AtlasBaseException {
|
||||
SearchParameters params = new SearchParameters();
|
||||
params.setClassification(DIMENSION_CLASSIFICATION);
|
||||
FilterCriteria filterCriteria = getSingleFilterCondition("__entityStatus", Operator.EQ, "DELETED");
|
||||
params.setTagFilters(filterCriteria);
|
||||
params.setExcludeDeletedEntities(false);
|
||||
params.setLimit(20);
|
||||
params.setMarker("*");
|
||||
|
||||
SearchContext context = new SearchContext(params, typeRegistry, graph, indexer.getVertexIndexKeys());
|
||||
ClassificationSearchProcessor processor = new ClassificationSearchProcessor(context);
|
||||
List<AtlasVertex> vertices = processor.execute();
|
||||
|
||||
Assert.assertTrue(CollectionUtils.isNotEmpty(vertices));
|
||||
assertEquals(vertices.size(), 1);
|
||||
List<String> guids = vertices.stream().map(g -> {
|
||||
try {
|
||||
return entityRetriever.toAtlasEntityHeader(g).getGuid();
|
||||
} catch (AtlasBaseException e) {
|
||||
fail("Failure in mapping vertex to AtlasEntityHeader");
|
||||
}
|
||||
return "";
|
||||
}).collect(Collectors.toList());
|
||||
Assert.assertTrue(guids.contains(dimensionTagDeleteGuid));
|
||||
|
||||
Assert.assertNull(processor.getNextMarker());
|
||||
}
|
||||
|
||||
private void createDimensionTaggedEntityAndDelete() throws AtlasBaseException {
|
||||
AtlasEntity entityToDelete = new AtlasEntity(HIVE_TABLE_TYPE);
|
||||
entityToDelete.setAttribute("name", "entity to be deleted");
|
||||
|
|
|
|||
|
|
@ -197,7 +197,8 @@ public class DiscoveryREST {
|
|||
@QueryParam("sortOrder") SortOrder sortOrder,
|
||||
@QueryParam("excludeDeletedEntities") boolean excludeDeletedEntities,
|
||||
@QueryParam("limit") int limit,
|
||||
@QueryParam("offset") int offset) throws AtlasBaseException {
|
||||
@QueryParam("offset") int offset,
|
||||
@QueryParam("marker") String marker) throws AtlasBaseException {
|
||||
Servlets.validateQueryParamLength("typeName", typeName);
|
||||
Servlets.validateQueryParamLength("classification", classification);
|
||||
Servlets.validateQueryParamLength("sortBy", sortByAttribute);
|
||||
|
|
@ -220,6 +221,7 @@ public class DiscoveryREST {
|
|||
searchParameters.setExcludeDeletedEntities(excludeDeletedEntities);
|
||||
searchParameters.setLimit(limit);
|
||||
searchParameters.setOffset(offset);
|
||||
searchParameters.setMarker(marker);
|
||||
searchParameters.setSortBy(sortByAttribute);
|
||||
searchParameters.setSortOrder(sortOrder);
|
||||
|
||||
|
|
|
|||
Loading…
Reference in New Issue