ATLAS-4335: Hook Notifications through Rest Interface

Signed-off-by: radhikakundam <radhikakundam@apache.org>
This commit is contained in:
radhikakundam 2022-12-06 11:23:22 -08:00
parent 990bbfc25c
commit 8d8169e83a
27 changed files with 789 additions and 54 deletions

View File

@ -185,11 +185,6 @@
<artifactId>jersey-json</artifactId>
<version>${jersey.version}</version>
</artifactItem>
<artifactItem>
<groupId>javax.ws.rs</groupId>
<artifactId>jsr311-api</artifactId>
<version>${jsr.version}</version>
</artifactItem>
</artifactItems>
</configuration>
</execution>

View File

@ -63,7 +63,7 @@
<dependency>
<groupId>com.sun.jersey</groupId>
<artifactId>jersey-bundle</artifactId>
<version>1.19</version>
<version>${jersey.version}</version>
<scope>test</scope>
</dependency>
@ -399,11 +399,6 @@
<artifactId>jersey-bundle</artifactId>
<version>${jersey.version}</version>
</artifactItem>
<artifactItem>
<groupId>javax.ws.rs</groupId>
<artifactId>jsr311-api</artifactId>
<version>${jsr.version}</version>
</artifactItem>
</artifactItems>
</configuration>
</execution>
@ -621,7 +616,6 @@
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>

View File

@ -311,6 +311,11 @@
<artifactId>jersey-json</artifactId>
<version>${jersey.version}</version>
</artifactItem>
<artifactItem>
<groupId>com.sun.jersey</groupId>
<artifactId>jersey-client</artifactId>
<version>${jersey.version}</version>
</artifactItem>
</artifactItems>
</configuration>
</execution>

View File

@ -325,11 +325,6 @@
<artifactId>jersey-json</artifactId>
<version>${jersey.version}</version>
</artifactItem>
<artifactItem>
<groupId>javax.ws.rs</groupId>
<artifactId>jsr311-api</artifactId>
<version>${jsr.version}</version>
</artifactItem>
</artifactItems>
</configuration>
</execution>

View File

@ -249,11 +249,6 @@
<artifactId>jersey-json</artifactId>
<version>${jersey.version}</version>
</artifactItem>
<artifactItem>
<groupId>javax.ws.rs</groupId>
<artifactId>jsr311-api</artifactId>
<version>${jsr.version}</version>
</artifactItem>
</artifactItems>
</configuration>
</execution>

View File

@ -46,7 +46,9 @@ public enum AtlasPrivilege {
TYPE_READ("type-read"),
ADMIN_AUDITS("admin-audits");
ADMIN_AUDITS("admin-audits"),
SERVICE_NOTIFICATION_POST("service-notification-post");
private final String type;

View File

@ -134,6 +134,9 @@ public class AtlasClientV2 extends AtlasBaseClient {
private static final String GLOSSARY_CATEGORY = GLOSSARY_URI + "/category";
private static final String GLOSSARY_CATEGORIES = GLOSSARY_URI + "/categories";
//Notification APIs
private static final String NOTIFICATION_URI = BASE_URI + "v2/notification";
public AtlasClientV2(String[] baseUrl, String[] basicAuthUserNamePassword) {
super(baseUrl, basicAuthUserNamePassword);
@ -173,7 +176,7 @@ public class AtlasClientV2 extends AtlasBaseClient {
}
@VisibleForTesting
AtlasClientV2(WebResource service, Configuration configuration) {
public AtlasClientV2(WebResource service, Configuration configuration) {
super(service, configuration);
}
@ -1024,6 +1027,14 @@ public class AtlasClientV2 extends AtlasBaseClient {
return callAPI(API_V2.IMPORT_GLOSSARY, BulkImportResponse.class, multipartEntity);
}
public void postNotificationToTopic(String topic, List<String> messages) throws AtlasServiceException {
callAPI(formatPathParameters(API_V2.POST_NOTIFICATIONS_TO_TOPIC, topic), (Class<?>) null, messages);
}
@VisibleForTesting
public API formatPathWithParameter(API api, String... params) {
return formatPathParameters(api, params);
}
@Override
protected API formatPathParameters(API api, String... params) {
@ -1199,6 +1210,8 @@ public class AtlasClientV2 extends AtlasBaseClient {
public static final API_V2 GET_BUSINESS_METADATA_TEMPLATE = new API_V2(ENTITY_API + "businessmetadata/import/template", HttpMethod.GET, Response.Status.OK, MediaType.APPLICATION_JSON, MediaType.APPLICATION_OCTET_STREAM);
public static final API_V2 IMPORT_BUSINESS_METADATA = new API_V2(ENTITY_API + "businessmetadata/import", HttpMethod.POST, Response.Status.OK, MediaType.MULTIPART_FORM_DATA, MediaType.APPLICATION_JSON);
public static final API_V2 POST_NOTIFICATIONS_TO_TOPIC = new API_V2(NOTIFICATION_URI + "/topic/%s", HttpMethod.POST, Response.Status.NO_CONTENT);
// labels APIs
public static final API_V2 ADD_LABELS = new API_V2(ENTITY_API + "guid/%s/labels", HttpMethod.PUT, Response.Status.NO_CONTENT);
public static final API_V2 ADD_LABELS_BY_UNIQUE_ATTRIBUTE = new API_V2(ENTITY_API + "uniqueAttribute/type/%s/labels", HttpMethod.PUT, Response.Status.NO_CONTENT);

View File

@ -40,9 +40,14 @@ public enum AtlasConfiguration {
NOTIFICATION_HOOK_TOPIC_NAME("atlas.notification.hook.topic.name", "ATLAS_HOOK"),
NOTIFICATION_ENTITIES_TOPIC_NAME("atlas.notification.entities.topic.name", "ATLAS_ENTITIES"),
NOTIFICATION_HOOK_CONSUMER_BUFFERING_INTERVAL("atlas.notification.consumer.message.buffering.interval.seconds", 15),
NOTIFICATION_HOOK_CONSUMER_BUFFERING_BATCH_SIZE("atlas.notification.consumer.message.buffering.batch.size", 100),
NOTIFICATION_HOOK_REST_ENABLED("atlas.hook.rest.notification.enabled", false),
NOTIFICATION_HOOK_CONSUMER_TOPIC_NAMES("atlas.notification.hook.consumer.topic.names", "ATLAS_HOOK"), // a comma separated list of topic names
NOTIFICATION_ENTITIES_CONSUMER_TOPIC_NAMES("atlas.notification.entities.consumer.topic.names", "ATLAS_ENTITIES"), // a comma separated list of topic names
NOTIFICATION_REST_BODY_MAX_LENGTH_BYTES("atlas.notification.rest.body.max.length.bytes", (1 * 1024 * 1024)),
NOTIFICATION_MESSAGE_MAX_LENGTH_BYTES("atlas.notification.message.max.length.bytes", (1000 * 1000)),
NOTIFICATION_MESSAGE_COMPRESSION_ENABLED("atlas.notification.message.compression.enabled", true),
NOTIFICATION_SPLIT_MESSAGE_SEGMENTS_WAIT_TIME_SECONDS("atlas.notification.split.message.segments.wait.time.seconds", 15 * 60),

View File

@ -176,6 +176,7 @@ public enum AtlasErrorCode {
ATTRIBUTE_NAME_ALREADY_EXISTS_IN_ANOTHER_PARENT_TYPE(400, "ATLAS-400-00-09E", "Invalid attribute name: {0}.{1}. Attribute already exists in another parent type: {2}"),
IMPORT_INVALID_ZIP_ENTRY(400, "ATLAS-400-00-09F", "{0}: invalid zip entry. Reason: {1}"),
LINEAGE_ON_DEMAND_NOT_ENABLED(400, "ATLAS-400-00-100", "Lineage on demand config: {0} is not enabled"),
INVALID_TOPIC_NAME(400, "ATLAS-400-00-101", "Unsupported topic name : {0}"),
UNAUTHORIZED_ACCESS(403, "ATLAS-403-00-001", "{0} is not authorized to perform {1}"),
@ -243,7 +244,8 @@ public enum AtlasErrorCode {
ENTITY_NOTIFICATION_FAILED(500, "ATLAS-500-00-014", "Notification failed for operation: {0} : {1}"),
FAILED_TO_UPLOAD(500, "ATLAS-500-00-015", "Error occurred while uploading the file: {0}"),
FAILED_TO_CREATE_GLOSSARY_TERM(500, "ATLAS-500-00-016", "Error occurred while creating glossary term: {0}"),
FAILED_TO_UPDATE_GLOSSARY_TERM(500, "ATLAS-500-00-017", "Error occurred while updating glossary term: {0}");
FAILED_TO_UPDATE_GLOSSARY_TERM(500, "ATLAS-500-00-017", "Error occurred while updating glossary term: {0}"),
NOTIFICATION_EXCEPTION(500, "ATLAS-500-00-018", "{0}");
private String errorCode;
private String errorMessage;

View File

@ -42,7 +42,9 @@
<dependency>
<groupId>org.apache.atlas</groupId>
<artifactId>atlas-server-api</artifactId>
<artifactId>atlas-client-v2</artifactId>
<version>${project.version}</version>
<scope>provided</scope>
</dependency>
<dependency>

View File

@ -21,6 +21,7 @@ package org.apache.atlas.hook;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.atlas.ApplicationProperties;
import org.apache.atlas.AtlasConfiguration;
import org.apache.atlas.AtlasConstants;
import org.apache.atlas.kafka.NotificationProvider;
import org.apache.atlas.model.notification.HookNotification;
@ -63,6 +64,7 @@ public abstract class AtlasHook {
public static final String CONF_METADATA_NAMESPACE = "atlas.metadata.namespace";
public static final String CLUSTER_NAME_KEY = "atlas.cluster.name";
public static final String DEFAULT_CLUSTER_NAME = "primary";
public static final String CONF_ATLAS_HOOK_MESSAGES_SORT_ENABLED = "atlas.hook.messages.sort.enabled";
protected static Configuration atlasProperties;
protected static NotificationInterface notificationInterface;
@ -75,6 +77,8 @@ public abstract class AtlasHook {
private static final int notificationMaxRetries;
private static final int notificationRetryInterval;
private static ExecutorService executor = null;
public static final boolean isRESTNotificationEnabled;
public static final boolean isHookMsgsSortEnabled;
static {
@ -95,6 +99,8 @@ public abstract class AtlasHook {
failedMessagesLogger = null;
}
isRESTNotificationEnabled = AtlasConfiguration.NOTIFICATION_HOOK_REST_ENABLED.getBoolean();
isHookMsgsSortEnabled = atlasProperties.getBoolean(CONF_ATLAS_HOOK_MESSAGES_SORT_ENABLED, isRESTNotificationEnabled);
metadataNamespace = getMetadataNamespace(atlasProperties);
notificationMaxRetries = atlasProperties.getInt(ATLAS_NOTIFICATION_MAX_RETRIES, 3);
notificationRetryInterval = atlasProperties.getInt(ATLAS_NOTIFICATION_RETRY_INTERVAL, 1000);

View File

@ -73,6 +73,11 @@ public class AtlasKafkaConsumer<T> extends AbstractNotificationConsumer<T> {
return receive(this.pollTimeoutMilliSeconds, lastCommittedPartitionOffset);
}
@Override
public List<AtlasKafkaMessage<T>> receiveRawRecordsWithCheckedCommit(Map<TopicPartition, Long> lastCommittedPartitionOffset) {
return receiveRawRecords(this.pollTimeoutMilliSeconds, lastCommittedPartitionOffset);
}
@Override
public void commit(TopicPartition partition, long offset) {
@ -98,7 +103,15 @@ public class AtlasKafkaConsumer<T> extends AbstractNotificationConsumer<T> {
}
}
private List<AtlasKafkaMessage<T>> receiveRawRecords(long timeoutMilliSeconds, Map<TopicPartition, Long> lastCommittedPartitionOffset) {
return receive(timeoutMilliSeconds, lastCommittedPartitionOffset, true);
}
private List<AtlasKafkaMessage<T>> receive(long timeoutMilliSeconds, Map<TopicPartition, Long> lastCommittedPartitionOffset) {
return receive(timeoutMilliSeconds, lastCommittedPartitionOffset, false);
}
private List<AtlasKafkaMessage<T>> receive(long timeoutMilliSeconds, Map<TopicPartition, Long> lastCommittedPartitionOffset, boolean isRawDataRequired) {
List<AtlasKafkaMessage<T>> messages = new ArrayList();
ConsumerRecords<?, ?> records = kafkaConsumer != null ? kafkaConsumer.poll(timeoutMilliSeconds) : null;
@ -134,8 +147,17 @@ public class AtlasKafkaConsumer<T> extends AbstractNotificationConsumer<T> {
continue;
}
messages.add(new AtlasKafkaMessage(message, record.offset(), record.topic(), record.partition(),
deserializer.getMsgCreated(), deserializer.getSpooled()));
AtlasKafkaMessage kafkaMessage = null;
if (isRawDataRequired) {
kafkaMessage = new AtlasKafkaMessage(message, record.offset(), record.topic(), record.partition(),
deserializer.getMsgCreated(), deserializer.getSpooled(), deserializer.getSource(), record.value().toString());
} else {
kafkaMessage = new AtlasKafkaMessage(message, record.offset(), record.topic(), record.partition(),
deserializer.getMsgCreated(), deserializer.getSpooled(), deserializer.getSource());
}
messages.add(kafkaMessage);
}
}

View File

@ -26,13 +26,25 @@ public class AtlasKafkaMessage<T> {
private final TopicPartition topicPartition;
private final boolean spooled;
private final long msgCreated;
private final String source;
private final String rawRecordData;
public AtlasKafkaMessage(T message, long offset, String topic, int partition, long msgCreated, boolean spooled) {
public AtlasKafkaMessage(T message, long offset, String topic, int partition, long msgCreated, boolean spooled, String source, String rawRecordData) {
this.message = message;
this.offset = offset;
this.topicPartition = new TopicPartition(topic, partition);
this.msgCreated = msgCreated;
this.spooled = spooled;
this.source = source;
this.rawRecordData = rawRecordData;
}
public AtlasKafkaMessage(T message, long offset, String topic, int partition, long msgCreated, boolean spooled, String source) {
this(message, offset, topic, partition, msgCreated, spooled, source, null);
}
public AtlasKafkaMessage(T message, long offset, String topic, int partition, long msgCreated, boolean spooled) {
this(message, offset, topic, partition, msgCreated, spooled, null);
}
public AtlasKafkaMessage(T message, long offset, String topic, int partition) {
@ -66,4 +78,8 @@ public class AtlasKafkaMessage<T> {
public long getMsgCreated() {
return this.msgCreated;
}
public String getSource() { return this.source; }
public String getRawRecordData() { return this.rawRecordData; }
}

View File

@ -58,6 +58,7 @@ public class KafkaNotification extends AbstractNotification implements Service {
public static final Logger LOG = LoggerFactory.getLogger(KafkaNotification.class);
public static final String PROPERTY_PREFIX = "atlas.kafka";
public static final String UNSORTED_POSTFIX = "_UNSORTED";
public static final String ATLAS_HOOK_TOPIC = AtlasConfiguration.NOTIFICATION_HOOK_TOPIC_NAME.getString();
public static final String ATLAS_ENTITIES_TOPIC = AtlasConfiguration.NOTIFICATION_ENTITIES_TOPIC_NAME.getString();
protected static final String CONSUMER_GROUP_ID_PROPERTY = "group.id";
@ -67,9 +68,27 @@ public class KafkaNotification extends AbstractNotification implements Service {
private static final String DEFAULT_CONSUMER_CLOSED_ERROR_MESSAGE = "This consumer has already been closed.";
public static String ATLAS_HOOK_TOPIC_UNSORTED;
public static String[] ATLAS_HOOK_UNSORTED_CONSUMER_TOPICS;
static {
try {
ATLAS_HOOK_TOPIC_UNSORTED = ATLAS_HOOK_TOPIC + UNSORTED_POSTFIX;
ATLAS_HOOK_UNSORTED_CONSUMER_TOPICS = ATLAS_HOOK_CONSUMER_TOPICS != null && ATLAS_HOOK_CONSUMER_TOPICS.length > 0
? new String[ATLAS_HOOK_CONSUMER_TOPICS.length] : new String[] {ATLAS_HOOK_TOPIC_UNSORTED};
for (int i = 0; i < ATLAS_HOOK_CONSUMER_TOPICS.length; i++) {
ATLAS_HOOK_UNSORTED_CONSUMER_TOPICS[i] = ATLAS_HOOK_CONSUMER_TOPICS[i] + UNSORTED_POSTFIX;
}
} catch (Exception e) {
LOG.error("Error while initializing Kafka Notification", e);
}
}
private static final Map<NotificationType, String> PRODUCER_TOPIC_MAP = new HashMap<NotificationType, String>() {
{
put(NotificationType.HOOK, ATLAS_HOOK_TOPIC);
put(NotificationType.HOOK_UNSORTED, ATLAS_HOOK_TOPIC_UNSORTED);
put(NotificationType.ENTITIES, ATLAS_ENTITIES_TOPIC);
}
};
@ -77,6 +96,7 @@ public class KafkaNotification extends AbstractNotification implements Service {
private static final Map<NotificationType, String[]> CONSUMER_TOPICS_MAP = new HashMap<NotificationType, String[]>() {
{
put(NotificationType.HOOK, trimAndPurge(ATLAS_HOOK_CONSUMER_TOPICS));
put(NotificationType.HOOK_UNSORTED, trimAndPurge(ATLAS_HOOK_UNSORTED_CONSUMER_TOPICS));
put(NotificationType.ENTITIES, trimAndPurge(ATLAS_ENTITIES_CONSUMER_TOPICS));
}
};
@ -86,6 +106,7 @@ public class KafkaNotification extends AbstractNotification implements Service {
private final Map<NotificationType, List<KafkaConsumer>> consumers = new HashMap<>();
private final Map<NotificationType, KafkaProducer> producers = new HashMap<>();
private String consumerClosedErrorMsg;
private final Map<String, KafkaProducer> producersByTopic = new HashMap<>();
// ----- Constructors ----------------------------------------------------
@ -255,6 +276,21 @@ public class KafkaNotification extends AbstractNotification implements Service {
LOG.info("<== KafkaNotification.close()");
}
//Sending messages received through HTTP or REST Notification Service to Producer
public void sendInternal(String topic, List<String> messages, boolean isSortNeeded) throws NotificationException {
KafkaProducer producer;
if (isSortNeeded) {
topic = topic + UNSORTED_POSTFIX;
}
producer = getOrCreateProducer(topic);
sendInternalToProducer(producer, topic, messages);
}
public void sendInternal(String topic, List<String> messages) throws NotificationException {
KafkaProducer producer = getOrCreateProducer(topic);
sendInternalToProducer(producer, topic, messages);
}
// ----- AbstractNotification --------------------------------------------
@Override
@ -266,7 +302,11 @@ public class KafkaNotification extends AbstractNotification implements Service {
@VisibleForTesting
void sendInternalToProducer(Producer p, NotificationType notificationType, List<String> messages) throws NotificationException {
String topic = PRODUCER_TOPIC_MAP.get(notificationType);
String topic = PRODUCER_TOPIC_MAP.get(notificationType);
sendInternalToProducer(p, topic, messages);
}
void sendInternalToProducer(Producer p, String topic , List<String> messages) throws NotificationException {
List<MessageContext> messageContexts = new ArrayList<>();
for (String message : messages) {
@ -308,6 +348,9 @@ public class KafkaNotification extends AbstractNotification implements Service {
public Properties getConsumerProperties(NotificationType notificationType) {
// find the configured group id for the given notification type
String groupId = properties.getProperty(notificationType.toString().toLowerCase() + "." + CONSUMER_GROUP_ID_PROPERTY);
if (StringUtils.isEmpty(groupId)) {
groupId = "atlas";
}
if (StringUtils.isEmpty(groupId)) {
throw new IllegalStateException("No configuration group id set for the notification type " + notificationType);
@ -346,21 +389,45 @@ public class KafkaNotification extends AbstractNotification implements Service {
private KafkaProducer getOrCreateProducer(NotificationType notificationType) {
LOG.debug("==> KafkaNotification.getOrCreateProducer()");
KafkaProducer ret = producers.get(notificationType);
KafkaProducer ret = getOrCreateProducerByCriteria(notificationType, producers, false);
LOG.debug("<== KafkaNotification.getOrCreateProducer()");
return ret;
}
private KafkaProducer getOrCreateProducer(String topic) {
LOG.debug("==> KafkaNotification.getOrCreateProducer() by Topic");
KafkaProducer ret = getOrCreateProducerByCriteria(topic, producersByTopic, true);
LOG.debug("<== KafkaNotification.getOrCreateProducer by Topic");
return ret;
}
private KafkaProducer getOrCreateProducerByCriteria(Object producerCriteria, Map producersByCriteria, boolean fetchByTopic) {
LOG.debug("==> KafkaNotification.getOrCreateProducerByCriteria()");
if ((fetchByTopic && !(producerCriteria instanceof String)) || (!fetchByTopic && !(producerCriteria instanceof NotificationType))) {
LOG.error("Error while retrieving Producer due to invalid criteria");
}
KafkaProducer ret = (KafkaProducer) producersByCriteria.get(producerCriteria);
if (ret == null) {
synchronized (this) {
ret = producers.get(notificationType);
ret = (KafkaProducer) producersByCriteria.get(producerCriteria);
if (ret == null) {
ret = new KafkaProducer(properties);
producers.put(notificationType, ret);
producersByCriteria.put(producerCriteria, ret);
}
}
}
LOG.debug("<== KafkaNotification.getOrCreateProducer()");
LOG.debug("<== KafkaNotification.getOrCreateProducerByCriteria()");
return ret;
}

View File

@ -17,26 +17,28 @@
*/
package org.apache.atlas.kafka;
import com.google.common.annotations.VisibleForTesting;
import org.apache.atlas.ApplicationProperties;
import org.apache.atlas.AtlasException;
import org.apache.atlas.notification.LogConfigUtils;
import org.apache.atlas.hook.AtlasHook;
import org.apache.atlas.notification.AbstractNotification;
import org.apache.atlas.notification.NotificationInterface;
import org.apache.atlas.notification.rest.RestNotification;
import org.apache.atlas.notification.spool.AtlasFileSpool;
import org.apache.commons.configuration.Configuration;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
/**
* Provider class for Notification interfaces
*/
public class NotificationProvider {
private static final Logger LOG = LoggerFactory.getLogger(NotificationProvider.class);
private static final String CONF_ATLAS_HOOK_SPOOL_ENABLED = "atlas.hook.spool.enabled";
private static final String CONF_ATLAS_HOOK_SPOOL_DIR = "atlas.hook.spool.dir";
@VisibleForTesting
public static final String CONF_ATLAS_HOOK_SPOOL_ENABLED = "atlas.hook.spool.enabled";
private static final String CONF_ATLAS_HOOK_SPOOL_DIR = "atlas.hook.spool.dir";
private static final boolean CONF_ATLAS_HOOK_SPOOL_ENABLED_DEFAULT = false;
@ -45,25 +47,32 @@ public class NotificationProvider {
public static NotificationInterface get() {
if (notificationProvider == null) {
try {
Configuration conf = ApplicationProperties.get();
KafkaNotification kafka = new KafkaNotification(conf);
String spoolDir = getSpoolDir(conf);
Configuration conf = ApplicationProperties.get();
String spoolDir = getSpoolDir(conf);
AbstractNotification absNotifier = null;
if (AtlasHook.isRESTNotificationEnabled) {
absNotifier = new RestNotification(conf);
} else {
absNotifier = new KafkaNotification(conf);
}
if (isSpoolingEnabled(conf) && StringUtils.isNotEmpty(spoolDir)) {
LOG.info("Notification spooling is enabled: spool directory={}", spoolDir);
conf.setProperty(CONF_ATLAS_HOOK_SPOOL_DIR, spoolDir);
notificationProvider = new AtlasFileSpool(conf, kafka);
notificationProvider = new AtlasFileSpool(conf, absNotifier);
} else {
LOG.info("Notification spooling is not enabled");
notificationProvider = kafka;
notificationProvider = absNotifier;
}
} catch (AtlasException e) {
throw new RuntimeException(e);
throw new RuntimeException("Error while initializing Notification interface", e);
}
}
LOG.debug("NotificationInterface of type {} is enabled", notificationProvider.getClass().getSimpleName());
return notificationProvider;
}

View File

@ -65,6 +65,7 @@ public abstract class AtlasNotificationMessageDeserializer<T> implements Message
private final AtomicLong messageCountSinceLastInterval = new AtomicLong(0);
private long msgCreated;
private boolean spooled;
private String source;
// ----- Constructors ----------------------------------------------------
/**
@ -112,6 +113,10 @@ public abstract class AtlasNotificationMessageDeserializer<T> implements Message
return this.spooled;
}
public String getSource() {
return this.source;
}
@Override
public T deserialize(String messageJson) {
final T ret;
@ -120,6 +125,7 @@ public abstract class AtlasNotificationMessageDeserializer<T> implements Message
messageCountSinceLastInterval.incrementAndGet();
this.msgCreated = 0;
this.spooled = false;
this.source = null;
AtlasNotificationBaseMessage msg = AtlasType.fromV1Json(messageJson, AtlasNotificationMessage.class);
@ -128,6 +134,7 @@ public abstract class AtlasNotificationMessageDeserializer<T> implements Message
} else {
this.msgCreated = ((AtlasNotificationMessage) msg).getMsgCreationTime();
this.spooled = ((AtlasNotificationMessage) msg).getSpooled();
this.source = msg.getSource() != null ? msg.getSource().getSource() : null;
String msgJson = messageJson;

View File

@ -63,4 +63,11 @@ public interface NotificationConsumer<T> {
* @return List containing kafka message and partionId and offset.
*/
List<AtlasKafkaMessage<T>> receiveWithCheckedCommit(Map<TopicPartition, Long> lastCommittedPartitionOffset);
/**
* Fetch raw data for the topics from Kafka, if lastCommittedOffset same as message
* received offset, it will proceed with commit.
* @return List containing kafka message and partitionId and offset.
*/
List<AtlasKafkaMessage<T>> receiveRawRecordsWithCheckedCommit(Map<TopicPartition, Long> lastCommittedPartitionOffset);
}

View File

@ -46,6 +46,9 @@ public interface NotificationInterface {
// Notifications from the Atlas integration hooks.
HOOK(new HookMessageDeserializer()),
// Notifications from the Atlas integration hooks - unsorted.
HOOK_UNSORTED(new HookMessageDeserializer()),
// Notifications to entity change consumers.
ENTITIES(new EntityMessageDeserializer());

View File

@ -0,0 +1,153 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.notification.rest;
import com.google.common.annotations.VisibleForTesting;
import org.apache.atlas.AtlasClientV2;
import org.apache.atlas.AtlasConfiguration;
import org.apache.atlas.AtlasErrorCode;
import org.apache.atlas.AtlasException;
import org.apache.atlas.AtlasServiceException;
import org.apache.atlas.model.notification.AtlasNotificationBaseMessage;
import org.apache.atlas.notification.AbstractNotification;
import org.apache.atlas.notification.NotificationConsumer;
import org.apache.atlas.notification.NotificationException;
import org.apache.atlas.utils.AuthenticationUtil;
import org.apache.commons.configuration.Configuration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import static org.apache.atlas.kafka.KafkaNotification.ATLAS_ENTITIES_TOPIC;
import static org.apache.atlas.kafka.KafkaNotification.ATLAS_HOOK_TOPIC;
public class RestNotification extends AbstractNotification {
private static final Logger LOG = LoggerFactory.getLogger(RestNotification.class);
private static final int BATCH_MAX_LENGTH_BYTES = AtlasConfiguration.NOTIFICATION_REST_BODY_MAX_LENGTH_BYTES.getInt();
private static final String ATLAS_ENDPOINT = "atlas.rest.address";
private static final String BASIC_AUTH_USERNAME = "atlas.rest.basic.auth.username";
private static final String BASIC_AUTH_PASSWORD = "atlas.rest.basic.auth.password";
private static final String DEFAULT_ATLAS_URL = "http://localhost:31000/";
private static final Map<NotificationType, String> PRODUCER_TOPIC_MAP = new HashMap<NotificationType, String>() {
{
put(NotificationType.HOOK, ATLAS_HOOK_TOPIC);
put(NotificationType.ENTITIES, ATLAS_ENTITIES_TOPIC);
}
};
@VisibleForTesting
public AtlasClientV2 atlasClientV2;
public RestNotification(Configuration configuration) throws AtlasException {
super();
setupAtlasClientV2(configuration);
}
private AtlasClientV2 setupAtlasClientV2(Configuration configuration) throws AtlasException {
if (atlasClientV2 != null) {
return atlasClientV2;
}
try {
String[] atlasEndPoint = configuration.getStringArray(ATLAS_ENDPOINT);
if (atlasEndPoint == null || atlasEndPoint.length == 0) {
atlasEndPoint = new String[] { DEFAULT_ATLAS_URL };
}
if (!AuthenticationUtil.isKerberosAuthenticationEnabled()) {
String fileAuthUsername = configuration.getString(BASIC_AUTH_USERNAME, "admin");
String fileAuthPassword = configuration.getString(BASIC_AUTH_PASSWORD, "admin123");
String[] basicAuthUsernamePassword = (fileAuthUsername == null || fileAuthPassword == null)
? AuthenticationUtil.getBasicAuthenticationInput()
: new String[]{fileAuthUsername, fileAuthPassword};
atlasClientV2 = new AtlasClientV2(atlasEndPoint, basicAuthUsernamePassword);
} else {
atlasClientV2 = new AtlasClientV2(atlasEndPoint);
}
} catch (AtlasException e) {
throw new AtlasException(e);
}
return atlasClientV2;
}
@Override
public void sendInternal(NotificationType type, List<String> messages) throws NotificationException {
String topic = PRODUCER_TOPIC_MAP.get(type);
List<List<String>> batches = getBatches(messages);
int batchCounter = 0;
try {
for (List<String> batch: batches) {
batchCounter++;
atlasClientV2.postNotificationToTopic(topic, batch);
}
} catch (AtlasServiceException e) {
if (e.getMessage().contains(AtlasErrorCode.NOTIFICATION_EXCEPTION.getErrorCode())) {
LOG.error("Sending notifications through REST interface failed starting from batch# {}", batchCounter);
throw new NotificationException(e);
} else {
throw new RuntimeException(e);
}
}
}
private List<List<String>> getBatches(List<String> messages) {
List<List<String>> batches = new ArrayList();
List<String> batch = new ArrayList();
int batchSize = 0;
for (String message : messages) {
byte[] msgBytes = AtlasNotificationBaseMessage.getBytesUtf8(message);
if (batchSize > 0 && batchSize + msgBytes.length > BATCH_MAX_LENGTH_BYTES) {
batches.add(batch);
batch = new ArrayList();
batchSize = 0;
}
batch.add(message);
batchSize += msgBytes.length;
}
batches.add(batch);
return batches;
}
@Override
public <T> List<NotificationConsumer<T>> createConsumers(NotificationType notificationType, int numConsumers) {
return null;
}
@Override
public void close() {
}
@Override
public boolean isReady(NotificationType type) {
return true;
}
}

View File

@ -229,6 +229,11 @@ public class AbstractNotificationConsumerTest {
public List<AtlasKafkaMessage<TestMessage>> receiveWithCheckedCommit(Map<TopicPartition, Long> lastCommittedPartitionOffset) {
return receive();
}
@Override
public List<AtlasKafkaMessage<TestMessage>> receiveRawRecordsWithCheckedCommit(Map<TopicPartition, Long> lastCommittedPartitionOffset) {
return null;
}
}
public static class TestMessageDeserializer extends AbstractMessageDeserializer<TestMessage> {

View File

@ -0,0 +1,136 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.notification;
import com.sun.jersey.api.client.ClientResponse;
import com.sun.jersey.api.client.WebResource;
import org.apache.atlas.ApplicationProperties;
import org.apache.atlas.AtlasBaseClient;
import org.apache.atlas.AtlasClientV2;
import org.apache.atlas.AtlasConfiguration;
import org.apache.atlas.AtlasErrorCode;
import org.apache.atlas.kafka.NotificationProvider;
import org.apache.atlas.notification.rest.RestNotification;
import org.apache.commons.configuration.Configuration;
import org.mockito.Matchers;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import org.testng.Assert;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import java.util.ArrayList;
import java.util.Arrays;
import static org.apache.atlas.kafka.KafkaNotification.ATLAS_HOOK_TOPIC;
import static org.mockito.ArgumentMatchers.anyList;
import static org.mockito.Matchers.anyString;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
public class RestNotificationTest {
private NotificationInterface notifier;
private Configuration conf;
@Mock
private WebResource service;
@Mock
private WebResource.Builder resourceBuilderMock;
@BeforeClass
public void setup() throws Exception {
MockitoAnnotations.initMocks(this);
conf = ApplicationProperties.get();
conf.setProperty(AtlasConfiguration.NOTIFICATION_HOOK_REST_ENABLED.getPropertyName(), true);
conf.setProperty(NotificationProvider.CONF_ATLAS_HOOK_SPOOL_ENABLED, false);
notifier = NotificationProvider.get();
}
private WebResource.Builder setupBuilder(AtlasClientV2.API api, WebResource webResource) {
when(webResource.path(api.getPath())).thenReturn(service);
when(webResource.path(api.getNormalizedPath())).thenReturn(service);
return getBuilder(service);
}
private WebResource.Builder getBuilder(WebResource resourceObject) {
when(resourceObject.getRequestBuilder()).thenReturn(resourceBuilderMock);
when(resourceObject.path(anyString())).thenReturn(resourceObject);
when(resourceBuilderMock.accept(MediaType.APPLICATION_JSON)).thenReturn(resourceBuilderMock);
when(resourceBuilderMock.type(MediaType.MULTIPART_FORM_DATA)).thenReturn(resourceBuilderMock);
when(resourceBuilderMock.type(MediaType.APPLICATION_JSON + "; charset=UTF-8")).thenReturn(resourceBuilderMock);
return resourceBuilderMock;
}
@Test
public void testNotificationProvider () throws Exception {
assertEquals(notifier.getClass(), RestNotification.class);
}
@Test
public void testPostNotificationToTopic () throws Exception {
AtlasClientV2 client = new AtlasClientV2(service, conf);
AtlasBaseClient.API api = client.formatPathWithParameter(AtlasClientV2.API_V2.POST_NOTIFICATIONS_TO_TOPIC, ATLAS_HOOK_TOPIC);
WebResource.Builder builder = setupBuilder(api, service);
ClientResponse response = mock(ClientResponse.class);
when(response.getStatus()).thenReturn(Response.Status.NO_CONTENT.getStatusCode());
when(builder.method(anyString(), Matchers.<Class>any(), anyList())).thenReturn(response);
((RestNotification)notifier).atlasClientV2 = client;
try {
((RestNotification)notifier).sendInternal(NotificationInterface.NotificationType.HOOK, new ArrayList<String>(Arrays.asList("Dummy")));
} catch (NotificationException e) {
Assert.fail("Failed with Exception");
}
}
@Test
public void testNotificationException () throws Exception {
AtlasClientV2 client = new AtlasClientV2(service, conf);
AtlasBaseClient.API api = client.formatPathWithParameter(AtlasClientV2.API_V2.POST_NOTIFICATIONS_TO_TOPIC, ATLAS_HOOK_TOPIC);
WebResource.Builder builder = setupBuilder(api, service);
ClientResponse response = mock(ClientResponse.class);
when(response.getStatus()).thenReturn(AtlasErrorCode.NOTIFICATION_EXCEPTION.getHttpCode().getStatusCode());
when(response.getEntity(String.class)).thenReturn(AtlasErrorCode.NOTIFICATION_EXCEPTION.getErrorCode());
when(builder.method(anyString(), Matchers.<Class>any(), anyList())).thenReturn(response);
((RestNotification)notifier).atlasClientV2 = client;
try {
((RestNotification)notifier).sendInternal(NotificationInterface.NotificationType.HOOK, new ArrayList<String>(Arrays.asList("Dummy")));
} catch (NotificationException e) {
Assert.assertTrue(e.getMessage().contains(AtlasErrorCode.NOTIFICATION_EXCEPTION.getErrorCode()));
}
}
}

View File

@ -22,7 +22,9 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.atlas.*;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.ha.HAConfiguration;
import org.apache.atlas.hook.AtlasHook;
import org.apache.atlas.kafka.AtlasKafkaMessage;
import org.apache.atlas.kafka.KafkaNotification;
import org.apache.atlas.listener.ActiveStateChangeHandler;
import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo;
@ -83,6 +85,7 @@ import org.springframework.stereotype.Component;
import javax.inject.Inject;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
@ -91,6 +94,7 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
@ -123,6 +127,9 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
private static final String EXCEPTION_CLASS_NAME_PERMANENTLOCKING_EXCEPTION = "PermanentLockingException";
private static final int KAFKA_CONSUMER_SHUTDOWN_WAIT = 30000;
private static final String ATLAS_HOOK_CONSUMER_THREAD_NAME = "atlas-hook-consumer-thread";
private static final String ATLAS_HOOK_UNSORTED_CONSUMER_THREAD_NAME = "atlas-hook-unsorted-consumer-thread";
// from org.apache.hadoop.hive.ql.parse.SemanticAnalyzer
public static final String DUMMY_DATABASE = "_dummy_database";
public static final String DUMMY_TABLE = "_dummy_table";
@ -195,6 +202,8 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
private Instant nextStatsLogTime = AtlasMetricsCounter.getNextHourStartTime(Instant.now());
private final Map<TopicPartition, Long> lastCommittedPartitionOffset;
private final EntityCorrelationManager entityCorrelationManager;
private final long consumerMsgBufferingIntervalMS;
private final int consumerMsgBufferingBatchSize;
@VisibleForTesting
final int consumerRetryInterval;
@ -230,6 +239,8 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
largeMessageProcessingTimeThresholdMs = applicationProperties.getInt("atlas.notification.consumer.large.message.processing.time.threshold.ms", 60 * 1000); // 60 sec by default
createShellEntityForNonExistingReference = AtlasConfiguration.NOTIFICATION_CREATE_SHELL_ENTITY_FOR_NON_EXISTING_REF.getBoolean();
authorizeUsingMessageUser = applicationProperties.getBoolean(CONSUMER_AUTHORIZE_USING_MESSAGE_USER, false);
consumerMsgBufferingIntervalMS = AtlasConfiguration.NOTIFICATION_HOOK_CONSUMER_BUFFERING_INTERVAL.getInt() * 1000;
consumerMsgBufferingBatchSize = AtlasConfiguration.NOTIFICATION_HOOK_CONSUMER_BUFFERING_BATCH_SIZE.getInt();
int authnCacheTtlSeconds = applicationProperties.getInt(CONSUMER_AUTHORIZE_AUTHN_CACHE_TTL_SECONDS, 300);
@ -350,17 +361,35 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
}
private void startConsumers(ExecutorService executorService) {
int numThreads = applicationProperties.getInt(CONSUMER_THREADS_PROPERTY, 1);
int numThreads = applicationProperties.getInt(CONSUMER_THREADS_PROPERTY, 1);
Map<NotificationConsumer<HookNotification>, NotificationType> notificationConsumersByType = new HashMap<>();
List<NotificationConsumer<HookNotification>> notificationConsumers = notificationInterface.createConsumers(NotificationType.HOOK, numThreads);
for (NotificationConsumer<HookNotification> notificationConsumer : notificationConsumers) {
notificationConsumersByType.put(notificationConsumer, NotificationType.HOOK);
}
if (AtlasHook.isHookMsgsSortEnabled) {
List<NotificationConsumer<HookNotification>> unsortedNotificationConsumers = notificationInterface.createConsumers(NotificationType.HOOK_UNSORTED, numThreads);
for (NotificationConsumer<HookNotification> unsortedNotificationConsumer : unsortedNotificationConsumers) {
notificationConsumersByType.put(unsortedNotificationConsumer, NotificationType.HOOK_UNSORTED);
}
}
if (executorService == null) {
executorService = Executors.newFixedThreadPool(notificationConsumers.size(), new ThreadFactoryBuilder().setNameFormat(THREADNAME_PREFIX + " thread-%d").build());
executorService = Executors.newFixedThreadPool(notificationConsumersByType.size(), new ThreadFactoryBuilder().setNameFormat(THREADNAME_PREFIX + " thread-%d").build());
}
executors = executorService;
for (final NotificationConsumer<HookNotification> consumer : notificationConsumers) {
HookConsumer hookConsumer = new HookConsumer(consumer);
for (final NotificationConsumer<HookNotification> consumer : notificationConsumersByType.keySet()) {
String hookConsumerName = ATLAS_HOOK_CONSUMER_THREAD_NAME;
if (notificationConsumersByType.get(consumer).equals(NotificationType.HOOK_UNSORTED)) {
hookConsumerName = ATLAS_HOOK_UNSORTED_CONSUMER_THREAD_NAME;
}
HookConsumer hookConsumer = new HookConsumer(hookConsumerName, consumer);
consumers.add(hookConsumer);
executors.submit(hookConsumer);
@ -529,8 +558,16 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
private final List<String> failedMessages = new ArrayList<>();
private final AdaptiveWaiter adaptiveWaiter = new AdaptiveWaiter(minWaitDuration, maxWaitDuration, minWaitDuration);
private int duplicateKeyCounter = 1;
public HookConsumer(NotificationConsumer<HookNotification> consumer) {
super("atlas-hook-consumer-thread");
super(ATLAS_HOOK_CONSUMER_THREAD_NAME);
this.consumer = consumer;
}
public HookConsumer(String consumerThreadName, NotificationConsumer<HookNotification> consumer) {
super(consumerThreadName);
this.consumer = consumer;
}
@ -548,10 +585,15 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
try {
while (shouldRun.get()) {
try {
List<AtlasKafkaMessage<HookNotification>> messages = consumer.receiveWithCheckedCommit(lastCommittedPartitionOffset);
for (AtlasKafkaMessage<HookNotification> msg : messages) {
handleMessage(msg);
if (StringUtils.equals(ATLAS_HOOK_UNSORTED_CONSUMER_THREAD_NAME, this.getName())) {
long msgBufferingStartTime = System.currentTimeMillis();
Map<String,AtlasKafkaMessage<HookNotification>> msgBuffer = new TreeMap<>();
sortAndPublishMsgsToAtlasHook(msgBufferingStartTime, msgBuffer);
} else {
List<AtlasKafkaMessage<HookNotification>> messages = consumer.receiveWithCheckedCommit(lastCommittedPartitionOffset);
for (AtlasKafkaMessage<HookNotification> msg : messages) {
handleMessage(msg);
}
}
} catch (IllegalStateException ex) {
adaptiveWaiter.pause(ex);
@ -576,6 +618,63 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
}
}
private void resetDuplicateKeyCounter() {
duplicateKeyCounter = 1;
}
private String getKey(String msgCreated, String source) {
return String.format("%s_%s", msgCreated, source);
}
private void sortMessages(AtlasKafkaMessage<HookNotification> msg, Map<String,AtlasKafkaMessage<HookNotification>> msgBuffer) {
String key = getKey(Long.toString(msg.getMsgCreated()), msg.getSource());
if (msgBuffer.containsKey(key)) { //Duplicate key can possible for messages from same source with same msgCreationTime
key = getKey(key, Integer.toString(duplicateKeyCounter));
duplicateKeyCounter++;
}
msgBuffer.put(key, msg);
}
void sortAndPublishMsgsToAtlasHook(long msgBufferingStartTime, Map<String,AtlasKafkaMessage<HookNotification>> msgBuffer) throws NotificationException {
List<AtlasKafkaMessage<HookNotification>> messages = consumer.receiveRawRecordsWithCheckedCommit(lastCommittedPartitionOffset);
AtlasKafkaMessage<HookNotification> maxOffsetMsg = null;
long maxOffsetProcessed = 0;
messages.stream().forEach(x -> sortMessages(x, msgBuffer));
if (msgBuffer.size() < consumerMsgBufferingBatchSize && System.currentTimeMillis() - msgBufferingStartTime < consumerMsgBufferingIntervalMS) {
sortAndPublishMsgsToAtlasHook(msgBufferingStartTime, msgBuffer);
return;
}
for (AtlasKafkaMessage<HookNotification> msg : msgBuffer.values()) {
String hookTopic = StringUtils.isNotEmpty(msg.getTopic()) ? msg.getTopic().split(KafkaNotification.UNSORTED_POSTFIX)[0] : KafkaNotification.ATLAS_HOOK_TOPIC;
if (maxOffsetProcessed == 0 || maxOffsetProcessed < msg.getOffset()) {
maxOffsetMsg = msg;
maxOffsetProcessed = msg.getOffset();
}
((KafkaNotification)notificationInterface).sendInternal(hookTopic,
StringUtils.isNotEmpty(msg.getRawRecordData()) ? Arrays.asList(msg.getRawRecordData()) : Arrays.asList(msg.getMessage().toString()));
}
/** In case of failure while publishing sorted messages(above for loop), consuming of unsorted messages should start from the initial offset
* Explicitly keeping this for loop separate to commit messages only after sending all batch messages to hook topic
*/
for (AtlasKafkaMessage<HookNotification> msg : msgBuffer.values()) {
commit(msg);
}
if (maxOffsetMsg != null) {
commit(maxOffsetMsg);
}
msgBuffer.clear();
resetDuplicateKeyCounter();
}
@VisibleForTesting
void handleMessage(AtlasKafkaMessage<HookNotification> kafkaMsg) {
AtlasPerfTracer perf = null;

View File

@ -0,0 +1,121 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.web.rest;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ArrayNode;
import org.apache.atlas.AtlasConfiguration;
import org.apache.atlas.AtlasErrorCode;
import org.apache.atlas.authorize.AtlasAdminAccessRequest;
import org.apache.atlas.authorize.AtlasAuthorizationUtils;
import org.apache.atlas.authorize.AtlasPrivilege;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.hook.AtlasHook;
import org.apache.atlas.kafka.KafkaNotification;
import org.apache.atlas.notification.NotificationException;
import org.apache.atlas.notification.NotificationInterface;
import org.apache.atlas.utils.AtlasJson;
import org.apache.atlas.web.util.Servlets;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;
import javax.inject.Inject;
import javax.inject.Singleton;
import javax.servlet.http.HttpServletRequest;
import javax.ws.rs.*;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.MediaType;
import java.io.IOException;
import java.util.*;
@Path("v2/notification")
@Singleton
@Service
@Consumes({Servlets.JSON_MEDIA_TYPE, MediaType.APPLICATION_JSON})
@Produces({Servlets.JSON_MEDIA_TYPE, MediaType.APPLICATION_JSON})
public class NotificationREST {
private static final Logger LOG = LoggerFactory.getLogger(NotificationREST.class);
public static final String ATLAS_HOOK_TOPIC = AtlasConfiguration.NOTIFICATION_HOOK_TOPIC_NAME.getString();
public static final String ATLAS_ENTITIES_TOPIC = AtlasConfiguration.NOTIFICATION_ENTITIES_TOPIC_NAME.getString();
private static final String[] ATLAS_HOOK_CONSUMER_TOPICS = AtlasConfiguration.NOTIFICATION_HOOK_CONSUMER_TOPIC_NAMES.getStringArray(ATLAS_HOOK_TOPIC);
private static final String[] ATLAS_ENTITIES_CONSUMER_TOPICS = AtlasConfiguration.NOTIFICATION_ENTITIES_CONSUMER_TOPIC_NAMES.getStringArray(ATLAS_ENTITIES_TOPIC);
private static final Set<String> TOPICS = new HashSet<>();
private final NotificationInterface notificationInterface;
static {
TOPICS.addAll(Arrays.asList(ATLAS_HOOK_CONSUMER_TOPICS));
TOPICS.addAll(Arrays.asList(ATLAS_ENTITIES_CONSUMER_TOPICS));
}
@Inject
public NotificationREST(NotificationInterface notificationInterface) {
this.notificationInterface = notificationInterface;
}
/**
* Publish notifications on Kafka topic
*
* @param topicName - nameOfTheQueue
* @throws AtlasBaseException
*/
@POST
@Path("/topic/{topicName}")
@Consumes({Servlets.JSON_MEDIA_TYPE, MediaType.APPLICATION_JSON})
public void handleNotifications(@PathParam("topicName") String topicName, @Context HttpServletRequest request) throws AtlasBaseException, IOException {
LOG.debug("Handling notifications for topic {}", topicName);
AtlasAuthorizationUtils.verifyAccess(new AtlasAdminAccessRequest(AtlasPrivilege.SERVICE_NOTIFICATION_POST), "post on rest notification service");
if (!TOPICS.contains(topicName)) {
throw new AtlasBaseException(AtlasErrorCode.INVALID_TOPIC_NAME, topicName);
}
String messagesAsJson = Servlets.getRequestPayload(request);
List<String> messages = getMessagesToNotify(messagesAsJson);
try {
KafkaNotification notifier = (KafkaNotification) notificationInterface;
notifier.sendInternal(topicName, messages, AtlasHook.isHookMsgsSortEnabled);
} catch (NotificationException exception) {
List<String> failedMessages = exception.getFailedMessages();
String concatenatedMessage = StringUtils.join(failedMessages, "\n");
throw new AtlasBaseException(AtlasErrorCode.NOTIFICATION_EXCEPTION, exception, concatenatedMessage);
}
}
private List<String> getMessagesToNotify(String messagesAsJson) {
List<String> messages = new ArrayList<>();
try {
ArrayNode messageNodes = AtlasJson.parseToV1ArrayNode(messagesAsJson);
for (JsonNode messageNode : messageNodes) {
messages.add(AtlasJson.toV1Json(messageNode));
}
} catch (IOException e) {
messages.add(messagesAsJson);
}
return messages;
}
}

View File

@ -0,0 +1,73 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.web.integration;
import com.fasterxml.jackson.databind.node.ArrayNode;
import org.apache.atlas.AtlasClientV2;
import org.apache.atlas.AtlasServiceException;
import org.apache.atlas.utils.TestResourceFileUtils;
import org.testng.annotations.Test;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Date;
import static org.apache.atlas.kafka.KafkaNotification.ATLAS_HOOK_TOPIC;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertNull;
public class NotificationRestIT extends BaseResourceIT {
@Test
public void unAuthPostNotification() throws IOException {
AtlasClientV2 unAuthClient = new AtlasClientV2(atlasUrls, new String[]{"admin", "wr0ng_pa55w0rd"});
try {
unAuthClient.postNotificationToTopic(ATLAS_HOOK_TOPIC, new ArrayList<String>(Arrays.asList("Dummy")));
} catch(AtlasServiceException e) {
assertNotNull(e.getStatus(), "expected server error code in the status");
}
}
@Test
public void postNotificationBasicTest() throws Exception {
String db_name = "db_" + randomString();
String cluster_name = "cl" + randomString();
String qualifiedName = db_name + "@" + cluster_name;
String notificationString = TestResourceFileUtils.getJson("notifications/create-db")
.replaceAll("--name--", db_name).replaceAll("--clName--", cluster_name)
.replace("\"--ts--\"", String.valueOf((new Date()).getTime()));
try {
atlasClientV2.postNotificationToTopic(ATLAS_HOOK_TOPIC, new ArrayList<String>(Arrays.asList(notificationString)));
waitFor(MAX_WAIT_TIME, new Predicate() {
@Override
public boolean evaluate() throws Exception {
ArrayNode results = searchByDSL(String.format("%s where qualifiedName='%s'", DATABASE_TYPE_BUILTIN, qualifiedName));
return results.size() == 1;
}
});
} catch(AtlasServiceException e) {
assertNull(e.getStatus(), "expected no server error code in the status");
}
}
}

View File

@ -0,0 +1 @@
{"version":{"version":"1.0.0","versionParts":[1]},"msgCompressionKind":"NONE","msgSplitIdx":1,"msgSplitCount":1,"msgSourceIP":"172.27.100.78","msgCreatedBy":"hive","msgCreationTime":"--ts--","spooled":false,"message":{"type":"ENTITY_CREATE_V2","user":"root","entities":{"entities":[{"typeName":"hive_db_ddl","attributes":{"serviceType":"hive","qualifiedName":"--name--@--clName--:--execTime--","execTime":"--execTime--","queryText":"create database --name--","name":"create database --name--","userName":"root"},"guid":"-34524943520905972","isIncomplete":false,"provenanceType":0,"version":0,"relationshipAttributes":{"db":{"typeName":"hive_db","uniqueAttributes":{"qualifiedName":"--name--@--clName--"},"relationshipType":"hive_db_ddl_queries"}},"proxy":false}]}}}

View File

@ -0,0 +1 @@
{"version":{"version":"1.0.0","versionParts":[1]},"msgCompressionKind":"NONE","msgSplitIdx":1,"msgSplitCount":1,"msgSourceIP":"172.27.10.4","msgCreatedBy":"hive","msgCreationTime":"--ts--","message":{"type":"ENTITY_CREATE_V2","user":"hive","entities":{"referredEntities":{},"entities":[{"typeName":"hive_db","attributes":{"owner":"admin","ownerType":"USER","managedLocation":null,"qualifiedName":"--name--@--clName--","clusterName":"--clName--","name":"--name--","location":"some_location","parameters":{}},"guid":"-72583599150177287","isIncomplete":false,"provenanceType":0,"version":0,"proxy":false}]}}}

View File

@ -0,0 +1 @@
{"version":{"version":"1.0.0","versionParts":[1]},"msgCompressionKind":"NONE","msgSplitIdx":1,"msgSplitCount":1,"msgSourceIP":"172.27.72.140","msgCreatedBy":"hive","msgCreationTime":"--ts--","spooled":false,"message":{"type":"ENTITY_DELETE_V2","user":"hive","entities":[{"typeName":"hive_db","uniqueAttributes":{"qualifiedName":"--name--@--clName--"}}]}}