ATLAS-4733 : Download Basic and DSL search results

Signed-off-by: Mandar Ambawane <mandar.ambawane@freestoneinfotech.com>
This commit is contained in:
Mandar Ambawane 2023-04-14 13:26:59 +05:30
parent 19dc92d604
commit 205b975cce
9 changed files with 829 additions and 31 deletions

View File

@ -219,6 +219,7 @@ public enum AtlasErrorCode {
GLOSSARY_CATEGORY_ALREADY_EXISTS(409, "ATLAS-409-00-00A", "Glossary category with qualifiedName {0} already exists"),
GLOSSARY_IMPORT_FAILED(409, "ATLAS-409-00-011", "Glossary import failed"),
METRICSSTAT_ALREADY_EXISTS(409, "ATLAS-409-00-012", "Metric Statistics already collected at {0}"),
PENDING_TASKS_ALREADY_IN_PROGRESS(409, "ATLAS-409-00-013", "There are already {0} pending tasks in queue"),
// All internal errors go here
INTERNAL_ERROR(500, "ATLAS-500-00-001", "Internal server error {0}"),

View File

@ -0,0 +1,134 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.model.discovery;
import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.databind.annotation.JsonSerialize;
import org.apache.atlas.model.tasks.AtlasTask;
import javax.xml.bind.annotation.XmlAccessType;
import javax.xml.bind.annotation.XmlAccessorType;
import javax.xml.bind.annotation.XmlRootElement;
import java.io.Serializable;
import java.util.Date;
import java.util.List;
import static com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility.NONE;
import static com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility.PUBLIC_ONLY;
@JsonAutoDetect(getterVisibility = PUBLIC_ONLY, setterVisibility = PUBLIC_ONLY, fieldVisibility = NONE)
@JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL)
@JsonIgnoreProperties(ignoreUnknown = true)
public class AtlasSearchResultDownloadStatus implements Serializable {
private List<AtlasSearchDownloadRecord> searchDownloadRecords;
public List<AtlasSearchDownloadRecord> getSearchDownloadRecords() {
return searchDownloadRecords;
}
public void setSearchDownloadRecords(List<AtlasSearchDownloadRecord> searchDownloadRecords) {
this.searchDownloadRecords = searchDownloadRecords;
}
@JsonAutoDetect(getterVisibility = PUBLIC_ONLY, setterVisibility = PUBLIC_ONLY, fieldVisibility = NONE)
@JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL)
@JsonIgnoreProperties(ignoreUnknown = true)
@XmlRootElement
@XmlAccessorType(XmlAccessType.PROPERTY)
public static class AtlasSearchDownloadRecord implements Serializable {
private AtlasTask.Status status;
private String fileName;
private String createdBy;
private Date createdTime;
private Date startTime;
public AtlasSearchDownloadRecord(AtlasTask.Status status, String fileName, String createdBy, Date createdTime, Date startTime) {
this.status = status;
this.fileName = fileName;
this.createdBy = createdBy;
this.createdTime = createdTime;
this.startTime = startTime;
}
public AtlasSearchDownloadRecord(AtlasTask.Status status, String fileName, String createdBy, Date createdTime) {
this(status, fileName, createdBy, createdTime, null);
}
public AtlasTask.Status getStatus() {
return status;
}
public void setStatus(AtlasTask.Status status) {
this.status = status;
}
public String getFileName() {
return fileName;
}
public void setFileName(String fileName) {
this.fileName = fileName;
}
public String getCreatedBy() {
return createdBy;
}
public void setCreatedBy(String createdBy) {
this.createdBy = createdBy;
}
public Date getCreatedTime() {
return createdTime;
}
public void setCreatedTime(Date createdTime) {
this.createdTime = createdTime;
}
public Date getStartTime() {
return startTime;
}
public void setStartTime(Date startTime) {
this.startTime = startTime;
}
public StringBuilder toString(StringBuilder sb) {
if (sb == null) {
sb = new StringBuilder();
}
sb.append("AtlasSearchDownloadRecord{");
sb.append("status=").append(status);
sb.append(", fileName=").append(fileName);
sb.append(", createdBy=").append(createdBy);
sb.append(", createTime=").append(createdTime);
sb.append(", startTime=").append(startTime);
sb.append("}");
return sb;
}
@Override
public String toString() {
return toString(new StringBuilder()).toString();
}
}
}

View File

@ -19,13 +19,13 @@
package org.apache.atlas.discovery;
import org.apache.atlas.SortOrder;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.discovery.*;
import org.apache.atlas.model.profile.AtlasUserSavedSearch;
import java.io.IOException;
import java.util.List;
import java.util.Set;
import java.util.Map;
public interface AtlasDiscoveryService {
/**
@ -160,4 +160,17 @@ public interface AtlasDiscoveryService {
* @return top 5 suggestion strings for the given prefix.
*/
AtlasSuggestionsResult getSuggestions(String prefixString, String fieldName);
/**
* Creates task to search and download the results of Basic and DSL search
* @param taskParams parameters of AtlasTask
*/
void createAndQueueSearchResultDownloadTask(Map<String, Object> taskParams) throws AtlasBaseException;
/**
*
* @return AtlasSearchResultDownloadStatus
* @throws IOException
*/
AtlasSearchResultDownloadStatus getSearchResultDownloadStatus() throws IOException;
}

View File

@ -22,6 +22,7 @@ import org.apache.atlas.ApplicationProperties;
import org.apache.atlas.AtlasConfiguration;
import org.apache.atlas.AtlasErrorCode;
import org.apache.atlas.AtlasException;
import org.apache.atlas.RequestContext;
import org.apache.atlas.SortOrder;
import org.apache.atlas.annotation.GraphTransaction;
import org.apache.atlas.authorize.AtlasAuthorizationUtils;
@ -32,6 +33,8 @@ import org.apache.atlas.model.discovery.AtlasQuickSearchResult;
import org.apache.atlas.model.discovery.AtlasSearchResult;
import org.apache.atlas.model.discovery.AtlasSearchResult.AtlasFullTextResult;
import org.apache.atlas.model.discovery.AtlasSearchResult.AtlasQueryType;
import org.apache.atlas.model.discovery.AtlasSearchResultDownloadStatus;
import org.apache.atlas.model.discovery.AtlasSearchResultDownloadStatus.AtlasSearchDownloadRecord;
import org.apache.atlas.model.discovery.AtlasSuggestionsResult;
import org.apache.atlas.model.discovery.QuickSearchParameters;
import org.apache.atlas.model.discovery.RelationshipSearchParameters;
@ -41,6 +44,7 @@ import org.apache.atlas.model.instance.AtlasEntityHeader;
import org.apache.atlas.model.instance.AtlasObjectId;
import org.apache.atlas.model.instance.AtlasRelationshipHeader;
import org.apache.atlas.model.profile.AtlasUserSavedSearch;
import org.apache.atlas.model.tasks.AtlasTask;
import org.apache.atlas.query.QueryParams;
import org.apache.atlas.query.executors.DSLQueryExecutor;
import org.apache.atlas.query.executors.ScriptEngineBasedExecutor;
@ -56,7 +60,10 @@ import org.apache.atlas.repository.graphdb.AtlasIndexQuery.Result;
import org.apache.atlas.repository.graphdb.AtlasVertex;
import org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2;
import org.apache.atlas.repository.store.graph.v2.EntityGraphRetriever;
import org.apache.atlas.repository.store.graph.v2.tasks.searchdownload.SearchResultDownloadTask;
import org.apache.atlas.repository.store.graph.v2.tasks.searchdownload.SearchResultDownloadTaskFactory;
import org.apache.atlas.repository.userprofile.UserProfileService;
import org.apache.atlas.tasks.TaskManagement;
import org.apache.atlas.type.AtlasArrayType;
import org.apache.atlas.type.AtlasBuiltInTypes.AtlasObjectIdType;
import org.apache.atlas.type.AtlasClassificationType;
@ -83,15 +90,21 @@ import org.springframework.stereotype.Component;
import javax.inject.Inject;
import javax.script.ScriptEngine;
import javax.script.ScriptException;
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.attribute.BasicFileAttributes;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import static org.apache.atlas.AtlasErrorCode.*;
import static org.apache.atlas.SortOrder.ASCENDING;
@ -120,13 +133,15 @@ public class EntityDiscoveryService implements AtlasDiscoveryService {
private final UserProfileService userProfileService;
private final SuggestionsProvider suggestionsProvider;
private final DSLQueryExecutor dslQueryExecutor;
private final TaskManagement taskManagement;
@Inject
EntityDiscoveryService(AtlasTypeRegistry typeRegistry,
AtlasGraph graph,
GraphBackedSearchIndexer indexer,
SearchTracker searchTracker,
UserProfileService userProfileService) throws AtlasException {
UserProfileService userProfileService,
TaskManagement taskManagement) throws AtlasException {
this.graph = graph;
this.entityRetriever = new EntityGraphRetriever(this.graph, typeRegistry);
this.indexer = indexer;
@ -142,6 +157,7 @@ public class EntityDiscoveryService implements AtlasDiscoveryService {
this.dslQueryExecutor = AtlasConfiguration.DSL_EXECUTOR_TRAVERSAL.getBoolean()
? new TraversalBasedExecutor(typeRegistry, graph, entityRetriever)
: new ScriptEngineBasedExecutor(typeRegistry, graph, entityRetriever);
this.taskManagement = taskManagement;
LOG.info("DSL Executor: {}", this.dslQueryExecutor.getClass().getSimpleName());
}
@ -451,6 +467,48 @@ public class EntityDiscoveryService implements AtlasDiscoveryService {
return searchWithSearchContext(new SearchContext(searchParameters, typeRegistry, graph, indexer.getVertexIndexKeys()));
}
@Override
@GraphTransaction
public void createAndQueueSearchResultDownloadTask(Map<String, Object> taskParams) throws AtlasBaseException {
List<AtlasTask> pendingTasks = taskManagement.getPendingTasksByType(SearchResultDownloadTaskFactory.SEARCH_RESULT_DOWNLOAD);
if (CollectionUtils.isNotEmpty(pendingTasks) && pendingTasks.size() > SearchResultDownloadTaskFactory.MAX_PENDING_TASKS_ALLOWED) {
throw new AtlasBaseException(PENDING_TASKS_ALREADY_IN_PROGRESS, String.valueOf(pendingTasks.size()));
}
AtlasTask task = taskManagement.createTask(SearchResultDownloadTaskFactory.SEARCH_RESULT_DOWNLOAD, RequestContext.getCurrentUser(), taskParams);
RequestContext.get().queueTask(task);
}
@Override
public AtlasSearchResultDownloadStatus getSearchResultDownloadStatus() throws IOException {
List<AtlasTask> pendingTasks = taskManagement.getPendingTasksByType(SearchResultDownloadTaskFactory.SEARCH_RESULT_DOWNLOAD);
List<AtlasTask> currentUserPendingTasks = pendingTasks.stream().filter(task -> task.getCreatedBy()
.equals(RequestContext.getCurrentUser())).collect(Collectors.toList());
List<AtlasSearchDownloadRecord> searchDownloadRecords = new ArrayList<>();
for (AtlasTask pendingTask : currentUserPendingTasks) {
String fileName = (String) pendingTask.getParameters().get(SearchResultDownloadTask.CSV_FILE_NAME_KEY);
AtlasSearchDownloadRecord searchDownloadRecord = new AtlasSearchDownloadRecord(pendingTask.getStatus(), fileName, pendingTask.getCreatedBy(), pendingTask.getCreatedTime(), pendingTask.getStartTime());
searchDownloadRecords.add(searchDownloadRecord);
}
File fileDir = new File(SearchResultDownloadTask.DOWNLOAD_DIR_PATH, RequestContext.getCurrentUser());
if (fileDir.exists()) {
File[] currentUserFiles = fileDir.listFiles();
if (currentUserFiles != null) {
for (File file : currentUserFiles) {
BasicFileAttributes attr = Files.readAttributes(file.toPath(), BasicFileAttributes.class);
Date createdTime = new Date(attr.creationTime().toMillis());
AtlasSearchDownloadRecord searchDownloadRecord = new AtlasSearchDownloadRecord(AtlasTask.Status.COMPLETE, file.getName(), RequestContext.getCurrentUser(), createdTime);
searchDownloadRecords.add(searchDownloadRecord);
}
}
}
AtlasSearchResultDownloadStatus result = new AtlasSearchResultDownloadStatus();
result.setSearchDownloadRecords(searchDownloadRecords);
return result;
}
@Override
@GraphTransaction
public AtlasSearchResult searchRelationsWithParameters(RelationshipSearchParameters searchParameters) throws AtlasBaseException {

View File

@ -0,0 +1,270 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.repository.store.graph.v2.tasks.searchdownload;
import com.opencsv.CSVWriter;
import org.apache.atlas.ApplicationProperties;
import org.apache.atlas.AtlasConfiguration;
import org.apache.atlas.AtlasException;
import org.apache.atlas.RequestContext;
import org.apache.atlas.discovery.AtlasDiscoveryService;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.discovery.AtlasSearchResult;
import org.apache.atlas.model.discovery.SearchParameters;
import org.apache.atlas.model.instance.AtlasEntityHeader;
import org.apache.atlas.model.instance.AtlasObjectId;
import org.apache.atlas.model.tasks.AtlasTask;
import org.apache.atlas.tasks.AbstractTask;
import org.apache.atlas.type.AtlasTypeRegistry;
import org.apache.atlas.utils.AtlasJson;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.collections4.MapUtils;
import org.apache.commons.configuration.Configuration;
import org.apache.commons.lang.ArrayUtils;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import static org.apache.atlas.model.discovery.AtlasSearchResult.AtlasQueryType.BASIC;
import static org.apache.atlas.model.discovery.AtlasSearchResult.AtlasQueryType.DSL;
import static org.apache.atlas.model.tasks.AtlasTask.Status.COMPLETE;
import static org.apache.atlas.model.tasks.AtlasTask.Status.FAILED;
public class SearchResultDownloadTask extends AbstractTask {
private static final Logger LOG = LoggerFactory.getLogger(SearchResultDownloadTask.class);
public static final String SEARCH_PARAMETERS_JSON_KEY = "search_parameters_json";
public static final String CSV_FILE_NAME_KEY = "csv_file_Name";
public static final String SEARCH_TYPE_KEY = "search_type";
public static final String ATTRIBUTE_LABEL_MAP_KEY = "attribute_label_map";
public static final String QUERY_KEY = "query";
public static final String TYPE_NAME_KEY = "type_name";
public static final String CLASSIFICATION_KEY = "classification";
public static final String LIMIT_KEY = "limit";
public static final String OFFSET_KEY = "offset";
public static final String CSV_FILE_EXTENSION = ".csv";
public static String DOWNLOAD_DIR_PATH;
private static final String EMPTY_STRING = "";
private static final String DOWNLOAD_DIR_PATH_KEY = "atlas.download.search.dir.path";
private static final String DOWNLOAD_DIR_PATH_DEFAULT = StringUtils.isEmpty(System.getProperty("atlas.home")) ? System.getProperty("user.dir") : System.getProperty("atlas.home");
private static final String CSV_DOWNLOAD_DIR = "search_result_downloads";
private static Configuration configuration;
static {
try {
configuration = ApplicationProperties.get();
} catch (AtlasException e) {
LOG.error("Failed to load application properties", e);
}
if (configuration != null) {
DOWNLOAD_DIR_PATH = configuration.getString(DOWNLOAD_DIR_PATH_KEY, DOWNLOAD_DIR_PATH_DEFAULT) + File.separator + CSV_DOWNLOAD_DIR;
} else {
DOWNLOAD_DIR_PATH = DOWNLOAD_DIR_PATH_DEFAULT + File.separator + CSV_DOWNLOAD_DIR;
}
}
private final AtlasDiscoveryService discoveryService;
private final AtlasTypeRegistry typeRegistry;
public SearchResultDownloadTask(AtlasTask task, AtlasDiscoveryService discoveryService, AtlasTypeRegistry typeRegistry) {
super(task);
this.discoveryService = discoveryService;
this.typeRegistry = typeRegistry;
}
@Override
public AtlasTask.Status perform() throws Exception {
RequestContext.clear();
Map<String, Object> params = getTaskDef().getParameters();
if (MapUtils.isEmpty(params)) {
LOG.warn("Task: {}: Unable to process task: Parameters is not readable!", getTaskGuid());
return FAILED;
}
String userName = getTaskDef().getCreatedBy();
if (StringUtils.isEmpty(userName)) {
LOG.warn("Task: {}: Unable to process task as user name is empty!", getTaskGuid());
return FAILED;
}
RequestContext.get().setUser(userName, null);
try {
run(params);
setStatus(COMPLETE);
} catch (Exception e) {
LOG.error("Task: {}: Error performing task!", getTaskGuid(), e);
setStatus(FAILED);
throw e;
} finally {
RequestContext.clear();
}
return getStatus();
}
protected void run(Map<String, Object> parameters) throws AtlasBaseException, IOException {
Map<String, String> attributeLabelMap;
AtlasSearchResult searchResult = null;
AtlasSearchResult.AtlasQueryType queryType = null;
if (parameters.get(SEARCH_TYPE_KEY) == BASIC) {
String searchParametersJson = (String) parameters.get(SEARCH_PARAMETERS_JSON_KEY);
SearchParameters searchParameters = AtlasJson.fromJson(searchParametersJson, SearchParameters.class);
searchParameters.setLimit(AtlasConfiguration.SEARCH_MAX_LIMIT.getInt());
searchResult = discoveryService.searchWithParameters(searchParameters);
queryType = BASIC;
} else if (parameters.get(SEARCH_TYPE_KEY) == DSL) {
String query = (String) parameters.get(QUERY_KEY);
String typeName = (String) parameters.get(TYPE_NAME_KEY);
String classification = (String) parameters.get(CLASSIFICATION_KEY);
int offset = (int) parameters.get(OFFSET_KEY);
String queryStr = discoveryService.getDslQueryUsingTypeNameClassification(query, typeName, classification);
searchResult = discoveryService.searchUsingDslQuery(queryStr, AtlasConfiguration.SEARCH_MAX_LIMIT.getInt(), offset);
queryType = DSL;
}
String attributeLabelMapJson = (String) parameters.get(ATTRIBUTE_LABEL_MAP_KEY);
attributeLabelMap = AtlasJson.fromJson(attributeLabelMapJson, Map.class);
generateCSVFileFromSearchResult(searchResult, attributeLabelMap, queryType);
}
private void generateCSVFileFromSearchResult(AtlasSearchResult searchResult, Map<String, String> attributeLabelMap, AtlasSearchResult.AtlasQueryType queryType) throws IOException {
List<AtlasEntityHeader> allEntityHeaders = searchResult.getEntities();
AtlasSearchResult.AttributeSearchResult attributeSearchResult = searchResult.getAttributes();
String fileName = (String) getTaskDef().getParameters().get(CSV_FILE_NAME_KEY);
if ((queryType == BASIC && CollectionUtils.isEmpty(allEntityHeaders))
|| (queryType == DSL && (CollectionUtils.isEmpty(allEntityHeaders) && attributeSearchResult == null))) {
LOG.info("No result found. Not generating csv file: {}", fileName);
return;
}
File dir = new File(DOWNLOAD_DIR_PATH, RequestContext.getCurrentUser());
File csvFile;
if (!dir.exists()) {
dir.mkdirs();
}
csvFile = new File(dir, fileName);
try (FileWriter fileWriter = new FileWriter(csvFile);
CSVWriter csvWriter = new CSVWriter(fileWriter)) {
String[] defaultHeaders = new String[]{"Type name", "Name", "Classifications", "Terms"};
String[] attributeHeaders;
int attrSize = 0;
if (attributeLabelMap == null) {
attributeLabelMap = new HashMap<>();
}
attributeLabelMap.put("Owner", "owner");
attributeLabelMap.put("Description", "description");
Collection<String> attributeHeaderLabels = attributeLabelMap.keySet();
if (queryType == DSL && (CollectionUtils.isEmpty(allEntityHeaders) && attributeSearchResult != null)) {
attributeHeaderLabels = attributeSearchResult.getName();
defaultHeaders = new String[0];
}
attrSize = (attributeHeaderLabels == null) ? 0 : attributeHeaderLabels.size();
attributeHeaders = new String[attrSize];
if (attributeHeaderLabels != null) {
attributeHeaders = attributeHeaderLabels.toArray(attributeHeaders);
}
int headerSize = attrSize + defaultHeaders.length;
String[] headers = new String[headerSize];
System.arraycopy(defaultHeaders, 0, headers, 0, defaultHeaders.length);
if (ArrayUtils.isNotEmpty(attributeHeaders)) {
System.arraycopy(attributeHeaders, 0, headers, defaultHeaders.length, attrSize);
}
csvWriter.writeNext(headers);
String[] entityRecords = new String[headerSize];
if (CollectionUtils.isNotEmpty(allEntityHeaders)) {
for (AtlasEntityHeader entityHeader : allEntityHeaders) {
entityRecords[0] = entityHeader.getTypeName();
entityRecords[1] = entityHeader.getDisplayText() != null ? entityHeader.getDisplayText() : entityHeader.getGuid();
entityRecords[2] = String.join(",", entityHeader.getClassificationNames());
entityRecords[3] = String.join(",", entityHeader.getMeaningNames());
if (MapUtils.isNotEmpty(entityHeader.getAttributes())) {
for (int i = defaultHeaders.length; i < headerSize; i++) {
Object attrValue = entityHeader.getAttribute(attributeLabelMap.get(headers[i]));
if (attrValue instanceof AtlasObjectId) {
entityRecords[i] = String.valueOf(((AtlasObjectId) attrValue).getUniqueAttributes().get("qualifiedName"));
} else if (attrValue instanceof List) {
if (CollectionUtils.isNotEmpty((List<?>) attrValue)) {
List<String> valueList = new ArrayList<>();
for (Object attrVal : (List) attrValue) {
if (attrVal instanceof AtlasObjectId) {
String value = String.valueOf(((AtlasObjectId) attrVal).getUniqueAttributes().get("qualifiedName"));
valueList.add(value);
} else {
valueList.add(String.valueOf(attrVal));
}
}
entityRecords[i] = String.join(",", valueList);
}
} else {
entityRecords[i] = attrValue == null ? EMPTY_STRING : String.valueOf(attrValue);
}
}
}
csvWriter.writeNext(entityRecords);
}
}
if (queryType == DSL && attributeSearchResult != null) {
for (List<Object> resultSet : attributeSearchResult.getValues()) {
for (int i = defaultHeaders.length; i < headerSize; i++) {
entityRecords[i] = resultSet.get(i) == null ? EMPTY_STRING : String.valueOf(resultSet.get(i));
}
csvWriter.writeNext(entityRecords);
}
}
}
}
}

View File

@ -0,0 +1,152 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.repository.store.graph.v2.tasks.searchdownload;
import org.apache.atlas.ApplicationProperties;
import org.apache.atlas.discovery.AtlasDiscoveryService;
import org.apache.atlas.model.tasks.AtlasTask;
import org.apache.atlas.tasks.AbstractTask;
import org.apache.atlas.tasks.TaskFactory;
import org.apache.atlas.type.AtlasTypeRegistry;
import org.apache.commons.configuration.Configuration;
import org.apache.commons.lang.ArrayUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
import javax.inject.Inject;
import javax.inject.Singleton;
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.attribute.BasicFileAttributes;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
@Singleton
@Service
@EnableScheduling
public class SearchResultDownloadTaskFactory implements TaskFactory {
private static final Logger LOG = LoggerFactory.getLogger(SearchResultDownloadTaskFactory.class);
public static final String SEARCH_RESULT_DOWNLOAD = "SEARCH_RESULT_DOWNLOAD";
public static int MAX_PENDING_TASKS_ALLOWED;
private static final int MAX_PENDING_TASKS_ALLOWED_DEFAULT = 50;
private static final String MAX_PENDING_TASKS_ALLOWED_KEY = "atlas.download.search.max.pending.tasks";
private static final String FILES_CLEANUP_INTERVAL = "0 0/1 * * * *";
private static final long FILE_EXP_DURATION_IN_MILLIS_DEFAULT = 24 * 60 * 60 * 1000;
private static long FILE_EXP_DURATION_IN_MILLIS;
private static final String FILE_EXP_DURATION_IN_MILLIS_KEY = "atlas.download.search.file.expiry.millis";
private static Configuration configuration;
static {
try {
configuration = ApplicationProperties.get();
} catch (Exception e) {
LOG.info("Failed to load application properties", e);
}
if (configuration != null) {
MAX_PENDING_TASKS_ALLOWED = configuration.getInt(MAX_PENDING_TASKS_ALLOWED_KEY, MAX_PENDING_TASKS_ALLOWED_DEFAULT);
FILE_EXP_DURATION_IN_MILLIS = configuration.getLong(FILE_EXP_DURATION_IN_MILLIS_KEY, FILE_EXP_DURATION_IN_MILLIS_DEFAULT);
} else {
MAX_PENDING_TASKS_ALLOWED = MAX_PENDING_TASKS_ALLOWED_DEFAULT;
FILE_EXP_DURATION_IN_MILLIS = FILE_EXP_DURATION_IN_MILLIS_DEFAULT;
}
}
private static final List<String> supportedTypes = new ArrayList<String>() {{
add(SEARCH_RESULT_DOWNLOAD);
}};
private final AtlasDiscoveryService discoveryService;
private final AtlasTypeRegistry typeRegistry;
@Inject
public SearchResultDownloadTaskFactory(AtlasDiscoveryService discoveryService, AtlasTypeRegistry typeRegistry) {
this.discoveryService = discoveryService;
this.typeRegistry = typeRegistry;
}
@Override
public AbstractTask create(AtlasTask task) {
String taskType = task.getType();
String taskGuid = task.getGuid();
switch (taskType) {
case SEARCH_RESULT_DOWNLOAD:
return new SearchResultDownloadTask(task, discoveryService, typeRegistry);
default:
LOG.warn("Type: {} - {} not found!. The task will be ignored.", taskType, taskGuid);
return null;
}
}
@Override
public List<String> getSupportedTypes() {
return this.supportedTypes;
}
@Scheduled(cron = "#{getCronExpressionForCleanup}")
public void cleanupExpiredFiles() {
File csvDir = new File(SearchResultDownloadTask.DOWNLOAD_DIR_PATH);
deleteFiles(csvDir);
}
@Bean
private String getCronExpressionForCleanup() {
return FILES_CLEANUP_INTERVAL;
}
private void deleteFiles(File downloadDir) {
File[] subDirs = downloadDir.listFiles();
if (ArrayUtils.isNotEmpty(subDirs)) {
for (File subDir : subDirs) {
File[] csvFiles = subDir.listFiles();
if (ArrayUtils.isNotEmpty(csvFiles)) {
for (File csv : csvFiles) {
BasicFileAttributes attr;
try {
attr = Files.readAttributes(csv.toPath(), BasicFileAttributes.class);
} catch (IOException e) {
throw new RuntimeException(e);
}
Date now = new Date();
long fileAgeInMillis = now.getTime() - attr.creationTime().toMillis();
if (FILE_EXP_DURATION_IN_MILLIS < fileAgeInMillis) {
if (LOG.isDebugEnabled()) {
LOG.debug("deleting file: {}", csv.getName());
}
csv.delete();
}
}
}
}
}
}
}

View File

@ -120,6 +120,10 @@ public class TaskManagement implements Service, ActiveStateChangeHandler {
return this.registry.getPendingTasks();
}
public List<AtlasTask> getPendingTasksByType(String type) {
return this.registry.getPendingTasksByType(type);
}
public List<AtlasTask> getAll() {
return this.registry.getAll();
}

View File

@ -84,6 +84,31 @@ public class TaskRegistry {
return ret;
}
@GraphTransaction
public List<AtlasTask> getPendingTasksByType(String type) {
List<AtlasTask> ret = new ArrayList<>();
try {
AtlasGraphQuery query = graph.query()
.has(Constants.TASK_TYPE_PROPERTY_KEY, Constants.TASK_TYPE_NAME)
.has(Constants.TASK_STATUS, AtlasTask.Status.PENDING)
.has(Constants.TASK_TYPE, type)
.orderBy(Constants.TASK_CREATED_TIME, AtlasGraphQuery.SortOrder.ASC);
Iterator<AtlasVertex> results = query.vertices().iterator();
while (results.hasNext()) {
AtlasVertex vertex = results.next();
ret.add(toAtlasTask(vertex));
}
} catch (Exception exception) {
LOG.error("Error fetching pending tasks by type!", exception);
}
return ret;
}
@GraphTransaction
public void updateStatus(AtlasVertex taskVertex, AtlasTask task) {
if (taskVertex == null) {

View File

@ -19,6 +19,7 @@ package org.apache.atlas.web.rest;
import org.apache.atlas.AtlasClient;
import org.apache.atlas.AtlasErrorCode;
import org.apache.atlas.RequestContext;
import org.apache.atlas.SortOrder;
import org.apache.atlas.annotation.Timed;
import org.apache.atlas.authorize.AtlasAuthorizationUtils;
@ -29,9 +30,11 @@ import org.apache.atlas.model.discovery.*;
import org.apache.atlas.model.discovery.SearchParameters.FilterCriteria;
import org.apache.atlas.model.profile.AtlasUserSavedSearch;
import org.apache.atlas.repository.Constants;
import org.apache.atlas.repository.store.graph.v2.tasks.searchdownload.SearchResultDownloadTask;
import org.apache.atlas.type.AtlasEntityType;
import org.apache.atlas.type.AtlasStructType;
import org.apache.atlas.type.AtlasTypeRegistry;
import org.apache.atlas.utils.AtlasJson;
import org.apache.atlas.utils.AtlasPerfTracer;
import org.apache.atlas.web.util.Servlets;
import org.apache.commons.collections.CollectionUtils;
@ -54,10 +57,21 @@ import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import java.io.File;
import java.io.IOException;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import static org.apache.atlas.model.discovery.AtlasSearchResult.AtlasQueryType.BASIC;
import static org.apache.atlas.model.discovery.AtlasSearchResult.AtlasQueryType.DSL;
import static org.apache.atlas.repository.store.graph.v2.tasks.searchdownload.SearchResultDownloadTask.*;
/**
* REST interface for data discovery using dsl or full text search
*/
@ -107,18 +121,10 @@ public class DiscoveryREST {
@QueryParam("classification") String classification,
@QueryParam("limit") int limit,
@QueryParam("offset") int offset) throws AtlasBaseException {
Servlets.validateQueryParamLength("typeName", typeName);
Servlets.validateQueryParamLength("classification", classification);
if (StringUtils.isNotEmpty(query)) {
if (query.length() > maxDslQueryLength) {
throw new AtlasBaseException(AtlasErrorCode.INVALID_QUERY_LENGTH, Constants.MAX_DSL_QUERY_STR_LENGTH);
}
query = Servlets.decodeQueryString(query);
}
validateDSLSearchParameters(query, typeName, classification);
AtlasPerfTracer perf = null;
try {
if (AtlasPerfTracer.isPerfTraceEnabled(PERF_LOG)) {
perf = AtlasPerfTracer.getPerfTracer(PERF_LOG, "DiscoveryREST.searchUsingDSL(" + query + "," + typeName
@ -134,6 +140,23 @@ public class DiscoveryREST {
}
/**
*
* @param parameterMap
* @throws AtlasBaseException
*/
@POST
@Timed
@Path("dsl/download/create_file")
public void dslSearchCreateFile(Map<String, Object> parameterMap) throws AtlasBaseException {
SearchParameters parameters = AtlasJson.fromLinkedHashMap(parameterMap.get("searchParameters"), SearchParameters.class);
validateDSLSearchParameters(parameters.getQuery(), parameters.getTypeName(), parameters.getClassification());
Map<String, Object> taskParams = populateTaskParams(parameters.getQuery(), parameters.getTypeName(), parameters.getClassification(), parameters.getLimit(), parameters.getOffset());
discoveryService.createAndQueueSearchResultDownloadTask(taskParams);
}
/**
* Retrieve data for the specified fulltext query
@ -330,24 +353,7 @@ public class DiscoveryREST {
perf = AtlasPerfTracer.getPerfTracer(PERF_LOG, "DiscoveryREST.searchWithParameters(" + parameters + ")");
}
if (parameters.getLimit() < 0 || parameters.getOffset() < 0) {
throw new AtlasBaseException(AtlasErrorCode.BAD_REQUEST, "Limit/offset should be non-negative");
}
if (StringUtils.isEmpty(parameters.getTypeName()) && !isEmpty(parameters.getEntityFilters())) {
throw new AtlasBaseException(AtlasErrorCode.BAD_REQUEST, "EntityFilters specified without Type name");
}
if (StringUtils.isEmpty(parameters.getClassification()) && !isEmpty(parameters.getTagFilters())) {
throw new AtlasBaseException(AtlasErrorCode.BAD_REQUEST, "TagFilters specified without tag name");
}
if (StringUtils.isEmpty(parameters.getTypeName()) && StringUtils.isEmpty(parameters.getClassification()) &&
StringUtils.isEmpty(parameters.getQuery()) && StringUtils.isEmpty(parameters.getTermName())) {
throw new AtlasBaseException(AtlasErrorCode.INVALID_SEARCH_PARAMS);
}
validateSearchParameters(parameters);
validateBasicSearchParameters(parameters);
return discoveryService.searchWithParameters(parameters);
} finally {
@ -355,6 +361,67 @@ public class DiscoveryREST {
}
}
/**
*
* @param parameterMap
* @throws AtlasBaseException
*/
@POST
@Timed
@Path("basic/download/create_file")
public void basicSearchCreateFile(Map<String, Object> parameterMap) throws AtlasBaseException {
AtlasPerfTracer perf = null;
try {
if (AtlasPerfTracer.isPerfTraceEnabled(PERF_LOG)) {
perf = AtlasPerfTracer.getPerfTracer(PERF_LOG, "DiscoveryREST.basicSearchCreateFile(" + parameterMap + ")");
}
Map<String, String> attributeLabelMap = (Map<String, String>) parameterMap.get("attributeLabelMap");
SearchParameters parameters = AtlasJson.fromLinkedHashMap(parameterMap.get("searchParameters"), SearchParameters.class);
validateBasicSearchParameters(parameters);
Map<String, Object> taskParams = populateTaskParams(parameters, attributeLabelMap);
discoveryService.createAndQueueSearchResultDownloadTask(taskParams);
} finally {
AtlasPerfTracer.log(perf);
}
}
@GET
@Timed
@Path("download/status")
public AtlasSearchResultDownloadStatus getSearchResultDownloadStatus() throws IOException {
return discoveryService.getSearchResultDownloadStatus();
}
/**
*
* @param fileName
* @return
*/
@GET
@Timed
@Path("download/{filename}")
@Produces(MediaType.APPLICATION_OCTET_STREAM)
public Response downloadSearchResultFile(@PathParam("filename") String fileName) {
File dir = new File(SearchResultDownloadTask.DOWNLOAD_DIR_PATH, RequestContext.getCurrentUser());
File csvFile = new File(dir, fileName);
if (!csvFile.exists()) {
return Response.noContent().build();
}
Response.ResponseBuilder response = Response.ok(csvFile);
response.header("Content-Disposition", "attachment; filename=\"" + fileName + "\"");
return response.build();
}
/**
* Relationship search to search for relations(edges)
*
@ -827,4 +894,78 @@ public class DiscoveryREST {
validateSearchParameters(EntityDiscoveryService.createSearchParameters(parameters));
}
}
}
private void validateBasicSearchParameters(SearchParameters parameters) throws AtlasBaseException {
if (parameters.getLimit() < 0 || parameters.getOffset() < 0) {
throw new AtlasBaseException(AtlasErrorCode.BAD_REQUEST, "Limit/offset should be non-negative");
}
if (StringUtils.isEmpty(parameters.getTypeName()) && !isEmpty(parameters.getEntityFilters())) {
throw new AtlasBaseException(AtlasErrorCode.BAD_REQUEST, "EntityFilters specified without Type name");
}
if (StringUtils.isEmpty(parameters.getClassification()) && !isEmpty(parameters.getTagFilters())) {
throw new AtlasBaseException(AtlasErrorCode.BAD_REQUEST, "TagFilters specified without tag name");
}
if (StringUtils.isEmpty(parameters.getTypeName()) && StringUtils.isEmpty(parameters.getClassification()) &&
StringUtils.isEmpty(parameters.getQuery()) && StringUtils.isEmpty(parameters.getTermName())) {
throw new AtlasBaseException(AtlasErrorCode.INVALID_SEARCH_PARAMS);
}
validateSearchParameters(parameters);
}
private void validateDSLSearchParameters(String query, String typeName, String classification) throws AtlasBaseException {
Servlets.validateQueryParamLength("typeName", typeName);
Servlets.validateQueryParamLength("classification", classification);
if (StringUtils.isNotEmpty(query)) {
if (query.length() > maxDslQueryLength) {
throw new AtlasBaseException(AtlasErrorCode.INVALID_QUERY_LENGTH, Constants.MAX_DSL_QUERY_STR_LENGTH);
}
Servlets.decodeQueryString(query);
}
}
private Map<String, Object> populateTaskParams(SearchParameters parameters, Map<String, String> attributeLabelMap) {
String searchParametersJson = AtlasJson.toJson(parameters);
String attrLabelMapJson = AtlasJson.toJson(attributeLabelMap);
Map<String, Object> taskParams = new HashMap<>();
taskParams.put(SEARCH_TYPE_KEY, BASIC);
taskParams.put(SEARCH_PARAMETERS_JSON_KEY, searchParametersJson);
taskParams.put(ATTRIBUTE_LABEL_MAP_KEY, attrLabelMapJson);
String csvFileName = RequestContext.getCurrentUser() + "_" + BASIC + "_" + getDateTimeString() + CSV_FILE_EXTENSION;
taskParams.put(CSV_FILE_NAME_KEY, csvFileName);
return taskParams;
}
private Map<String, Object> populateTaskParams(String query, String typeName, String classification, int limit, int offset) {
Map<String, Object> taskParams = new HashMap<>();
taskParams.put(SEARCH_TYPE_KEY, DSL);
taskParams.put(QUERY_KEY, query);
taskParams.put(TYPE_NAME_KEY, typeName);
taskParams.put(CLASSIFICATION_KEY, classification);
taskParams.put(LIMIT_KEY, limit);
taskParams.put(OFFSET_KEY, offset);
String csvFileName = RequestContext.getCurrentUser() + "_" + DSL + "_" + getDateTimeString() + CSV_FILE_EXTENSION;
taskParams.put(CSV_FILE_NAME_KEY, csvFileName);
return taskParams;
}
private String getDateTimeString() {
LocalDateTime localDateTime = LocalDateTime.now();
DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd_HH-mm-ss.SSS");
return formatter.format(localDateTime);
}
}