Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Alert Name to Publishers #17108

Merged
merged 2 commits into from
Jul 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -122,8 +122,9 @@ private Map<UUID, Destination<ChangeEvent>> loadDestinationsMap(JobExecutionCont
context.getJobDetail().getJobDataMap().get(DESTINATION_MAP_KEY);
if (dMap == null) {
dMap = new HashMap<>();
for (SubscriptionDestination subscription : eventSubscription.getDestinations()) {
dMap.put(subscription.getId(), AlertFactory.getAlert(subscription));
for (SubscriptionDestination subscriptionDest : eventSubscription.getDestinations()) {
dMap.put(
subscriptionDest.getId(), AlertFactory.getAlert(eventSubscription, subscriptionDest));
}
context.getJobDetail().getJobDataMap().put(DESTINATION_MAP_KEY, dMap);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import static org.openmetadata.schema.api.events.CreateEventSubscription.AlertType.ACTIVITY_FEED;

import org.openmetadata.schema.entity.events.EventSubscription;
import org.openmetadata.schema.entity.events.SubscriptionDestination;
import org.openmetadata.schema.type.ChangeEvent;
import org.openmetadata.service.apps.bundles.changeEvent.email.EmailPublisher;
Expand All @@ -12,14 +13,15 @@
import org.openmetadata.service.apps.bundles.changeEvent.slack.SlackEventPublisher;

public class AlertFactory {
public static Destination<ChangeEvent> getAlert(SubscriptionDestination config) {
public static Destination<ChangeEvent> getAlert(
EventSubscription subscription, SubscriptionDestination config) {
return switch (config.getType()) {
case SLACK -> new SlackEventPublisher(config);
case MS_TEAMS -> new MSTeamsPublisher(config);
case G_CHAT -> new GChatPublisher(config);
case WEBHOOK -> new GenericPublisher(config);
case EMAIL -> new EmailPublisher(config);
case ACTIVITY_FEED -> new ActivityFeedPublisher(config);
case SLACK -> new SlackEventPublisher(subscription, config);
case MS_TEAMS -> new MSTeamsPublisher(subscription, config);
case G_CHAT -> new GChatPublisher(subscription, config);
case WEBHOOK -> new GenericPublisher(subscription, config);
case EMAIL -> new EmailPublisher(subscription, config);
case ACTIVITY_FEED -> new ActivityFeedPublisher(subscription, config);
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import static org.openmetadata.schema.entity.events.SubscriptionStatus.Status.AWAITING_RETRY;
import static org.openmetadata.schema.entity.events.SubscriptionStatus.Status.FAILED;

import org.openmetadata.schema.entity.events.EventSubscription;
import org.openmetadata.schema.entity.events.SubscriptionDestination;
import org.openmetadata.schema.entity.events.SubscriptionStatus;
import org.openmetadata.service.events.errors.EventPublisherException;
Expand All @@ -27,6 +28,8 @@ public interface Destination<T> {

SubscriptionDestination getSubscriptionDestination();

EventSubscription getEventSubscriptionForDestination();

void close();

boolean getEnabled();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.tuple.Pair;
import org.openmetadata.schema.alert.type.EmailAlertConfig;
import org.openmetadata.schema.entity.events.EventSubscription;
import org.openmetadata.schema.entity.events.SubscriptionDestination;
import org.openmetadata.schema.type.ChangeEvent;
import org.openmetadata.service.Entity;
Expand All @@ -40,12 +41,15 @@ public class EmailPublisher implements Destination<ChangeEvent> {
private final CollectionDAO daoCollection;

@Getter private final SubscriptionDestination subscriptionDestination;
private final EventSubscription eventSubscription;

public EmailPublisher(SubscriptionDestination subscription) {
if (subscription.getType() == EMAIL) {
this.subscriptionDestination = subscription;
public EmailPublisher(
EventSubscription eventSubscription, SubscriptionDestination subscriptionDestination) {
if (subscriptionDestination.getType() == EMAIL) {
this.eventSubscription = eventSubscription;
this.subscriptionDestination = subscriptionDestination;
this.emailAlertConfig =
JsonUtils.convertValue(subscription.getConfig(), EmailAlertConfig.class);
JsonUtils.convertValue(subscriptionDestination.getConfig(), EmailAlertConfig.class);
this.daoCollection = Entity.getCollectionDAO();
} else {
throw new IllegalArgumentException("Email Alert Invoked with Illegal Type and Settings.");
Expand All @@ -57,9 +61,11 @@ public void sendMessage(ChangeEvent event) throws EventPublisherException {
try {
Set<String> receivers =
getTargetsForAlert(emailAlertConfig, subscriptionDestination.getCategory(), EMAIL, event);
EmailMessage emailMessage = emailDecorator.buildOutgoingMessage(event);
EmailMessage emailMessage =
emailDecorator.buildOutgoingMessage(eventSubscription.getFullyQualifiedName(), event);
for (String email : receivers) {
EmailUtil.sendChangeEventMail(email, emailMessage);
EmailUtil.sendChangeEventMail(
eventSubscription.getFullyQualifiedName(), email, emailMessage);
}
setSuccessStatus(System.currentTimeMillis());
} catch (Exception e) {
Expand All @@ -71,6 +77,11 @@ public void sendMessage(ChangeEvent event) throws EventPublisherException {
}
}

@Override
public EventSubscription getEventSubscriptionForDestination() {
return eventSubscription;
}

@Override
public boolean getEnabled() {
return subscriptionDestination.getEnabled();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.tuple.Pair;
import org.openmetadata.schema.entity.events.EventSubscription;
import org.openmetadata.schema.entity.events.SubscriptionDestination;
import org.openmetadata.schema.entity.feed.Thread;
import org.openmetadata.schema.type.ChangeEvent;
Expand All @@ -38,10 +39,13 @@ public class ActivityFeedPublisher implements Destination<ChangeEvent> {
FeedRepository feedRepository = new FeedRepository();

@Getter private final SubscriptionDestination subscriptionDestination;
private final EventSubscription eventSubscription;

public ActivityFeedPublisher(SubscriptionDestination subscription) {
if (subscription.getType() == ACTIVITY_FEED) {
this.subscriptionDestination = subscription;
public ActivityFeedPublisher(
EventSubscription eventSubscription, SubscriptionDestination subscriptionDestination) {
if (subscriptionDestination.getType() == ACTIVITY_FEED) {
this.eventSubscription = eventSubscription;
this.subscriptionDestination = subscriptionDestination;
} else {
throw new IllegalArgumentException("Activity Alert Invoked with Illegal Type and Settings.");
}
Expand Down Expand Up @@ -73,6 +77,11 @@ public void sendMessage(ChangeEvent changeEvent) throws EventPublisherException
}
}

@Override
public EventSubscription getEventSubscriptionForDestination() {
return eventSubscription;
}

@Override
public boolean getEnabled() {
return subscriptionDestination.getEnabled();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.tuple.Pair;
import org.openmetadata.common.utils.CommonUtil;
import org.openmetadata.schema.entity.events.EventSubscription;
import org.openmetadata.schema.entity.events.SubscriptionDestination;
import org.openmetadata.schema.type.ChangeEvent;
import org.openmetadata.schema.type.Webhook;
Expand All @@ -45,13 +46,18 @@ public class GChatPublisher implements Destination<ChangeEvent> {

@Getter private final SubscriptionDestination subscriptionDestination;

public GChatPublisher(SubscriptionDestination subscription) {
if (subscription.getType() == G_CHAT) {
this.subscriptionDestination = subscription;
this.webhook = JsonUtils.convertValue(subscription.getConfig(), Webhook.class);
private final EventSubscription eventSubscription;

public GChatPublisher(
EventSubscription eventSubscription, SubscriptionDestination subscriptionDestination) {
if (subscriptionDestination.getType() == G_CHAT) {
this.eventSubscription = eventSubscription;
this.subscriptionDestination = subscriptionDestination;
this.webhook = JsonUtils.convertValue(subscriptionDestination.getConfig(), Webhook.class);

// Build Client
client = getClient(subscription.getTimeout(), subscription.getReadTimeout());
client =
getClient(subscriptionDestination.getTimeout(), subscriptionDestination.getReadTimeout());

// Build Target
if (webhook != null && webhook.getEndpoint() != null) {
Expand All @@ -68,7 +74,9 @@ public GChatPublisher(SubscriptionDestination subscription) {
@Override
public void sendMessage(ChangeEvent event) throws EventPublisherException {
try {
GChatMessage gchatMessage = gChatMessageMessageDecorator.buildOutgoingMessage(event);
GChatMessage gchatMessage =
gChatMessageMessageDecorator.buildOutgoingMessage(
eventSubscription.getFullyQualifiedName(), event);
List<Invocation.Builder> targets =
getTargetsForWebhookAlert(
webhook, subscriptionDestination.getCategory(), G_CHAT, client, event);
Expand All @@ -86,6 +94,11 @@ public void sendMessage(ChangeEvent event) throws EventPublisherException {
}
}

@Override
public EventSubscription getEventSubscriptionForDestination() {
return eventSubscription;
}

@Override
public boolean getEnabled() {
return subscriptionDestination.getEnabled();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.tuple.Pair;
import org.openmetadata.common.utils.CommonUtil;
import org.openmetadata.schema.entity.events.EventSubscription;
import org.openmetadata.schema.entity.events.SubscriptionDestination;
import org.openmetadata.schema.type.ChangeEvent;
import org.openmetadata.schema.type.Webhook;
Expand All @@ -43,14 +44,18 @@ public class GenericPublisher implements Destination<ChangeEvent> {
private final Webhook webhook;

@Getter private final SubscriptionDestination subscriptionDestination;
private final EventSubscription eventSubscription;

public GenericPublisher(SubscriptionDestination subscription) {
if (subscription.getType() == WEBHOOK) {
this.subscriptionDestination = subscription;
this.webhook = JsonUtils.convertValue(subscription.getConfig(), Webhook.class);
public GenericPublisher(
EventSubscription eventSubscription, SubscriptionDestination subscriptionDestination) {
if (subscriptionDestination.getType() == WEBHOOK) {
this.eventSubscription = eventSubscription;
this.subscriptionDestination = subscriptionDestination;
this.webhook = JsonUtils.convertValue(subscriptionDestination.getConfig(), Webhook.class);

// Build Client
this.client = getClient(subscription.getTimeout(), subscription.getReadTimeout());
this.client =
getClient(subscriptionDestination.getTimeout(), subscriptionDestination.getReadTimeout());
} else {
throw new IllegalArgumentException(
"GenericWebhook Alert Invoked with Illegal Type and Settings.");
Expand Down Expand Up @@ -104,6 +109,11 @@ private Invocation.Builder getTarget() {
return SecurityUtil.addHeaders(client.target(webhook.getEndpoint()), authHeaders);
}

@Override
public EventSubscription getEventSubscriptionForDestination() {
return eventSubscription;
}

@Override
public boolean getEnabled() {
return subscriptionDestination.getEnabled();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.tuple.Pair;
import org.openmetadata.common.utils.CommonUtil;
import org.openmetadata.schema.entity.events.EventSubscription;
import org.openmetadata.schema.entity.events.SubscriptionDestination;
import org.openmetadata.schema.type.ChangeEvent;
import org.openmetadata.schema.type.Webhook;
Expand All @@ -46,14 +47,18 @@ public class MSTeamsPublisher implements Destination<ChangeEvent> {
private final Client client;

@Getter private final SubscriptionDestination subscriptionDestination;
private final EventSubscription eventSubscription;

public MSTeamsPublisher(SubscriptionDestination subscription) {
if (subscription.getType() == MS_TEAMS) {
this.subscriptionDestination = subscription;
this.webhook = JsonUtils.convertValue(subscription.getConfig(), Webhook.class);
public MSTeamsPublisher(
EventSubscription eventSubscription, SubscriptionDestination subscriptionDestination) {
if (subscriptionDestination.getType() == MS_TEAMS) {
this.eventSubscription = eventSubscription;
this.subscriptionDestination = subscriptionDestination;
this.webhook = JsonUtils.convertValue(subscriptionDestination.getConfig(), Webhook.class);

// Build Client
client = getClient(subscription.getTimeout(), subscription.getReadTimeout());
client =
getClient(subscriptionDestination.getTimeout(), subscriptionDestination.getReadTimeout());

// Build Target
if (webhook != null && webhook.getEndpoint() != null) {
Expand All @@ -70,7 +75,9 @@ public MSTeamsPublisher(SubscriptionDestination subscription) {
@Override
public void sendMessage(ChangeEvent event) throws EventPublisherException {
try {
TeamsMessage teamsMessage = teamsMessageFormatter.buildOutgoingMessage(event);
TeamsMessage teamsMessage =
teamsMessageFormatter.buildOutgoingMessage(
eventSubscription.getFullyQualifiedName(), event);
List<Invocation.Builder> targets =
getTargetsForWebhookAlert(
webhook, subscriptionDestination.getCategory(), MS_TEAMS, client, event);
Expand All @@ -97,6 +104,11 @@ public void sendMessage(ChangeEvent event) throws EventPublisherException {
}
}

@Override
public EventSubscription getEventSubscriptionForDestination() {
return eventSubscription;
}

@Override
public boolean getEnabled() {
return subscriptionDestination.getEnabled();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.tuple.Pair;
import org.openmetadata.common.utils.CommonUtil;
import org.openmetadata.schema.entity.events.EventSubscription;
import org.openmetadata.schema.entity.events.SubscriptionDestination;
import org.openmetadata.schema.type.ChangeEvent;
import org.openmetadata.schema.type.Webhook;
Expand All @@ -44,14 +45,17 @@ public class SlackEventPublisher implements Destination<ChangeEvent> {
private Invocation.Builder target;
private final Client client;
@Getter private final SubscriptionDestination subscriptionDestination;
private final EventSubscription eventSubscription;

public SlackEventPublisher(SubscriptionDestination subscription) {
if (subscription.getType() == SLACK) {
this.subscriptionDestination = subscription;
this.webhook = JsonUtils.convertValue(subscription.getConfig(), Webhook.class);
public SlackEventPublisher(
EventSubscription eventSubscription, SubscriptionDestination subscriptionDest) {
if (subscriptionDest.getType() == SLACK) {
this.eventSubscription = eventSubscription;
this.subscriptionDestination = subscriptionDest;
this.webhook = JsonUtils.convertValue(subscriptionDest.getConfig(), Webhook.class);

// Build Client
client = getClient(subscription.getTimeout(), subscription.getReadTimeout());
client = getClient(subscriptionDest.getTimeout(), subscriptionDest.getReadTimeout());

// Build Target
if (webhook != null && webhook.getEndpoint() != null) {
Expand All @@ -68,7 +72,9 @@ public SlackEventPublisher(SubscriptionDestination subscription) {
@Override
public void sendMessage(ChangeEvent event) throws EventPublisherException {
try {
SlackMessage slackMessage = slackMessageFormatter.buildOutgoingMessage(event);
SlackMessage slackMessage =
slackMessageFormatter.buildOutgoingMessage(
eventSubscription.getFullyQualifiedName(), event);
List<Invocation.Builder> targets =
getTargetsForWebhookAlert(
webhook, subscriptionDestination.getCategory(), SLACK, client, event);
Expand All @@ -95,6 +101,11 @@ public void sendMessage(ChangeEvent event) throws EventPublisherException {
}
}

@Override
public EventSubscription getEventSubscriptionForDestination() {
return eventSubscription;
}

@Override
public boolean getEnabled() {
return subscriptionDestination.getEnabled();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,13 +64,13 @@ public String getEntityUrl(String prefix, String fqn, String additionalParams) {
}

@Override
public EmailMessage buildEntityMessage(ChangeEvent event) {
return getEmailMessage(createEntityMessage(event));
public EmailMessage buildEntityMessage(String publisherName, ChangeEvent event) {
return getEmailMessage(createEntityMessage(publisherName, event));
}

@Override
public EmailMessage buildThreadMessage(ChangeEvent event) {
return getEmailMessage(createThreadMessage(event));
public EmailMessage buildThreadMessage(String publisherName, ChangeEvent event) {
return getEmailMessage(createThreadMessage(publisherName, event));
}

public EmailMessage getEmailMessage(OutgoingMessage outgoingMessage) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,12 +61,12 @@ public String getEntityUrl(String prefix, String fqn, String additionalParams) {
}

@Override
public FeedMessage buildEntityMessage(ChangeEvent event) {
public FeedMessage buildEntityMessage(String publisherName, ChangeEvent event) {
return null;
}

@Override
public FeedMessage buildThreadMessage(ChangeEvent event) {
public FeedMessage buildThreadMessage(String publisherName, ChangeEvent event) {
return null;
}
}
Loading
Loading