Skip to content

Commit

Permalink
Merge pull request #1920 from HubSpot/sns_webhooks
Browse files Browse the repository at this point in the history
Add ability to do sns-based updates instead of webhooks
  • Loading branch information
ssalinas authored Apr 25, 2019
2 parents f0b0b7e + 90eef2e commit e1060d3
Show file tree
Hide file tree
Showing 26 changed files with 668 additions and 75 deletions.
2 changes: 1 addition & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ jdk:
- oraclejdk8

install: false
script: mvn -B -q -DskipSingularityWebUI verify
script: mvn -B -DskipSingularityWebUI verify
cache:
directories:
- $HOME/.m2
17 changes: 17 additions & 0 deletions Docs/reference/webhooks.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,23 @@ Singularity provides webhooks for changes to the three core types of objects in

Webhooks are managed via the [API](api.html) and a separate webhook should be added separately in order to receive updates about all three object types.

### SNS Topic Updates

By default, Singularity will use it's own internal queue of webhooks backed by zookeeper. If you would like to save load on zookeeper or have more flexibility in the consumption of task/deploy/request updates, you can instead configure Singularity to produce messages to SNS. In the configuration yaml, you can specify:

```yaml
webhookQueue:
queueType: SNS
awsAccessKey: {my key}
awsSecretKey: {my secret}
snsTopics:
TASK: singularity-task-updates
DEPLOY: singularity-deploy-updates
REQUEST: singularity-request-updates
```
This will cause Singularity to create these topics if they do not exist, and publish messages to SNS rather than sending its own webhooks. The content of these messages still follows the same format outlined below.
### Adding a Webhook
In order to create a new Webhook, post the json for the [SingularityWebhook](api.html) to the [webhook endpoint](api.html).
Expand Down
5 changes: 5 additions & 0 deletions SingularityService/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,11 @@
<artifactId>aws-java-sdk-s3</artifactId>
</dependency>

<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-sns</artifactId>
</dependency>

<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-annotations</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,10 +56,14 @@
import com.hubspot.singularity.guice.DropwizardObjectMapperProvider;
import com.hubspot.singularity.helpers.SingularityS3Service;
import com.hubspot.singularity.helpers.SingularityS3Services;
import com.hubspot.singularity.hooks.AbstractWebhookChecker;
import com.hubspot.singularity.hooks.LoadBalancerClient;
import com.hubspot.singularity.hooks.LoadBalancerClientImpl;
import com.hubspot.singularity.hooks.SingularityWebhookPoller;
import com.hubspot.singularity.hooks.SingularityWebhookSender;
import com.hubspot.singularity.hooks.SnsWebhookManager;
import com.hubspot.singularity.hooks.SnsWebhookRetryer;
import com.hubspot.singularity.hooks.WebhookQueueType;
import com.hubspot.singularity.managed.SingularityLifecycleManaged;
import com.hubspot.singularity.mesos.OfferCache;
import com.hubspot.singularity.mesos.SingularityMesosStatusUpdateHandler;
Expand Down Expand Up @@ -149,7 +153,12 @@ public void configure(Binder binder) {

binder.bind(SingularityAbort.class).in(Scopes.SINGLETON);
binder.bind(SingularityExceptionNotifierManaged.class).in(Scopes.SINGLETON);
binder.bind(SingularityWebhookSender.class).in(Scopes.SINGLETON);
if (configuration.getWebhookQueueConfiguration().getQueueType() == WebhookQueueType.SNS) {
binder.bind(SnsWebhookManager.class).in(Scopes.SINGLETON);
binder.bind(AbstractWebhookChecker.class).to(SnsWebhookRetryer.class).in(Scopes.SINGLETON);
} else {
binder.bind(AbstractWebhookChecker.class).to(SingularityWebhookSender.class).in(Scopes.SINGLETON);
}

binder.bind(SingularityUsageHelper.class).in(Scopes.SINGLETON);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ public void configure(Binder binder) {
// API Docs
getEnvironment().jersey().register(SingularityOpenApiResource.class);

binder.install(new SingularityEventModule());
binder.install(new SingularityEventModule(getConfiguration().getWebhookQueueConfiguration()));
}

@Provides
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -323,6 +323,10 @@ public class SingularityConfiguration extends Configuration {
@Valid
private WebhookAuthConfiguration webhookAuthConfiguration = new WebhookAuthConfiguration();

@JsonProperty("webhookQueue")
@Valid
private WebhookQueueConfiguration webhookQueueConfiguration = new WebhookQueueConfiguration();

private int maxConcurrentWebhooks = 100;

@JsonProperty("auth")
Expand Down Expand Up @@ -1272,6 +1276,14 @@ public void setWebhookAuthConfiguration(WebhookAuthConfiguration webhookAuthConf
this.webhookAuthConfiguration = webhookAuthConfiguration;
}

public WebhookQueueConfiguration getWebhookQueueConfiguration() {
return webhookQueueConfiguration;
}

public void setWebhookQueueConfiguration(WebhookQueueConfiguration webhookQueueConfiguration) {
this.webhookQueueConfiguration = webhookQueueConfiguration;
}

public int getMaxConcurrentWebhooks() {
return maxConcurrentWebhooks;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
package com.hubspot.singularity.config;

import java.util.Map;

import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Optional;
import com.google.common.collect.ImmutableMap;
import com.hubspot.singularity.WebhookType;
import com.hubspot.singularity.hooks.WebhookQueueType;

public class WebhookQueueConfiguration {
@JsonProperty
private WebhookQueueType queueType = WebhookQueueType.ZOOKEEPER;

@JsonProperty
private Map<WebhookType, String> snsTopics = ImmutableMap.of(
WebhookType.TASK, "singularity-task-updates",
WebhookType.DEPLOY, "singularity-deploy-updates",
WebhookType.REQUEST, "singularity-request-updates"
);

@JsonProperty
private Optional<String> awsAccessKey = Optional.absent();

@JsonProperty
private Optional<String> awsSecretKey = Optional.absent();

@JsonProperty
private Optional<String> awsRegion = Optional.absent();

private int snsRequestTimeout = 3000;

private int snsSocketTimeout = 3000;

private int snsConnectTimeout = 2000;

private int snsTotalTimeout = 5000;

// Protection for zookeeper so large list children calls will not take it down
private int maxZkQueuedWebhooksPerParentNode = 3000;

public WebhookQueueType getQueueType() {
return queueType;
}

public void setQueueType(WebhookQueueType queueType) {
this.queueType = queueType;
}

public Map<WebhookType, String> getSnsTopics() {
return snsTopics;
}

public void setSnsTopics(Map<WebhookType, String> snsTopics) {
this.snsTopics = snsTopics;
}

public Optional<String> getAwsAccessKey() {
return awsAccessKey;
}

public void setAwsAccessKey(Optional<String> awsAccessKey) {
this.awsAccessKey = awsAccessKey;
}

public Optional<String> getAwsSecretKey() {
return awsSecretKey;
}

public void setAwsSecretKey(Optional<String> awsSecretKey) {
this.awsSecretKey = awsSecretKey;
}

public Optional<String> getAwsRegion() {
return awsRegion;
}

public void setAwsRegion(Optional<String> awsRegion) {
this.awsRegion = awsRegion;
}

public int getSnsRequestTimeout() {
return snsRequestTimeout;
}

public void setSnsRequestTimeout(int snsRequestTimeout) {
this.snsRequestTimeout = snsRequestTimeout;
}

public int getSnsSocketTimeout() {
return snsSocketTimeout;
}

public void setSnsSocketTimeout(int snsSocketTimeout) {
this.snsSocketTimeout = snsSocketTimeout;
}

public int getSnsConnectTimeout() {
return snsConnectTimeout;
}

public void setSnsConnectTimeout(int snsConnectTimeout) {
this.snsConnectTimeout = snsConnectTimeout;
}

public int getSnsTotalTimeout() {
return snsTotalTimeout;
}

public void setSnsTotalTimeout(int snsTotalTimeout) {
this.snsTotalTimeout = snsTotalTimeout;
}

public int getMaxZkQueuedWebhooksPerParentNode() {
return maxZkQueuedWebhooksPerParentNode;
}

public void setMaxZkQueuedWebhooksPerParentNode(int maxZkQueuedWebhooksPerParentNode) {
this.maxZkQueuedWebhooksPerParentNode = maxZkQueuedWebhooksPerParentNode;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ protected void configure() {
bind(SingularityValidator.class).in(Scopes.SINGLETON);
bind(UserManager.class).in(Scopes.SINGLETON);
bind(UsageManager.class).in(Scopes.SINGLETON);
bind(WebhookManager.class).in(Scopes.SINGLETON);

bind(NotificationsManager.class).in(Scopes.SINGLETON);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
import com.hubspot.singularity.SingularityTaskShellCommandRequestId;
import com.hubspot.singularity.SingularityTaskShellCommandUpdate;
import com.hubspot.singularity.SingularityTaskStatusHolder;
import com.hubspot.singularity.SingularityTaskWebhook;
import com.hubspot.singularity.config.SingularityConfiguration;
import com.hubspot.singularity.data.transcoders.IdTranscoder;
import com.hubspot.singularity.data.transcoders.StringTranscoder;
Expand Down Expand Up @@ -583,7 +584,10 @@ public SingularityCreateResult saveTaskHistoryUpdate(SingularityTaskHistoryUpdat

@Timed
public SingularityCreateResult saveTaskHistoryUpdate(SingularityTaskHistoryUpdate taskHistoryUpdate, boolean overwriteExisting) {
singularityEventListener.taskHistoryUpdateEvent(taskHistoryUpdate);
Optional<SingularityTask> task = getTask(taskHistoryUpdate.getTaskId());
if (task.isPresent()) {
singularityEventListener.taskHistoryUpdateEvent(new SingularityTaskWebhook(task.get(), taskHistoryUpdate));
}

if (overwriteExisting) {
Optional<SingularityTaskHistoryUpdate> maybeExisting = getTaskHistoryUpdate(taskHistoryUpdate.getTaskId(), taskHistoryUpdate.getTaskState());
Expand Down Expand Up @@ -612,7 +616,10 @@ public SingularityCreateResult saveTaskHistoryUpdate(SingularityTaskHistoryUpdat

public SingularityDeleteResult deleteTaskHistoryUpdate(SingularityTaskId taskId, ExtendedTaskState state, Optional<SingularityTaskHistoryUpdate> previousStateUpdate) {
if (previousStateUpdate.isPresent()) {
singularityEventListener.taskHistoryUpdateEvent(previousStateUpdate.get());
Optional<SingularityTask> task = getTask(previousStateUpdate.get().getTaskId());
if (task.isPresent()) {
singularityEventListener.taskHistoryUpdateEvent(new SingularityTaskWebhook(task.get(), previousStateUpdate.get()));
}
}
if (leaderCache.active()) {
leaderCache.deleteTaskHistoryUpdate(taskId, state);
Expand Down
Loading

0 comments on commit e1060d3

Please sign in to comment.