Skip to content

Commit

Permalink
Add GetDX processor (#15)
Browse files Browse the repository at this point in the history
  • Loading branch information
edigaryev authored Sep 3, 2024
1 parent a76f0e2 commit 331ef5c
Show file tree
Hide file tree
Showing 14 changed files with 519 additions and 160 deletions.
22 changes: 21 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,27 @@ The following command-line arguments are supported:
* `--http-path` (`string`) — HTTP path on which the webhook events will be expected (defaults to `/`)
* `--secret-token` (`string`) — if specified, this value will be used as a HMAC SHA-256 secret to verify the webhook events

### Example
## GetDX processor

This processor receives, enriches and streams Cirrus CI webhook events to DX's Data Cloud API.

### Usage

```
docker run -it --rm ghcr.io/cirruslabs/cirrus-webhooks-server:latest getdx
```

The following command-line arguments are supported:

* `--dx-instance` (`string`) — DX instance to use when sending webhook events as DX Pipeline events to the Data Cloud API
* `--dx-api-key` (`string`) — API key to use when sending webhook events as DX Pipeline events to the Data Cloud API
* `--http-addr` (`string`) — address on which the HTTP server will listen on (defaults to `:8080`)
* `--http-path` (`string`) — HTTP path on which the webhook events will be expected (defaults to `/`)
* `--secret-token` (`string`) — if specified, this value will be used as a HMAC SHA-256 secret to verify the webhook events

## Example

In this example, we'll receive Cirrus CI webhooks events using the Datadog processor.

The simplest way to try this processor is to use Docker and [ngrok](https://ngrok.com/).

Expand Down
35 changes: 31 additions & 4 deletions cmd/cws.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,21 +2,48 @@ package main

import (
"context"
"fmt"
"github.com/cirruslabs/cirrus-webhooks-server/internal/command"
"log"
"github.com/cirruslabs/cirrus-webhooks-server/internal/logginglevel"
"go.uber.org/zap"
"os"
"os/signal"
)

func main() {
if !mainImpl() {
os.Exit(1)
}
}

func mainImpl() bool {
// Set up a signal-interruptible context
ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt)
defer cancel()

// Initialize logger
cfg := zap.NewProductionConfig()
cfg.Level = logginglevel.Level
logger, err := cfg.Build()
if err != nil {
_, _ = fmt.Fprintln(os.Stderr, err)

return false
}
defer func() {
_ = logger.Sync()
}()

// Replace zap.L() and zap.S() to avoid
// propagating the *zap.Logger by hand
zap.ReplaceGlobals(logger)

// Run the command
if err := command.NewRootCmd().ExecuteContext(ctx); err != nil {
cancel()
log.Fatal(err)
logger.Sugar().Error(err)

return false
}

cancel()
return true
}
169 changes: 29 additions & 140 deletions internal/command/datadog/datadog.go
Original file line number Diff line number Diff line change
@@ -1,75 +1,47 @@
package datadog

import (
"crypto/hmac"
"crypto/sha256"
"encoding/hex"
"encoding/json"
"errors"
"fmt"
"github.com/brpaz/echozap"
payloadpkg "github.com/cirruslabs/cirrus-webhooks-server/internal/command/datadog/payload"
"github.com/cirruslabs/cirrus-webhooks-server/internal/datadogsender"
mapset "github.com/deckarep/golang-set/v2"
"github.com/cirruslabs/cirrus-webhooks-server/internal/server"
"github.com/labstack/echo/v4"
"github.com/spf13/cobra"
"go.uber.org/zap"
"io"
"net/http"
"strings"
"time"
)

var debug bool
var httpAddr string
var httpPath string
var eventTypes []string
var secretToken string
var dogstatsdAddr string
var apiKey string
var apiSite string

var (
ErrDatadogFailed = errors.New("failed to stream Cirrus CI events to Datadog")
ErrSignatureVerificationFailed = errors.New("event signature verification failed")
ErrDatadogFailed = errors.New("failed to stream Cirrus CI events to Datadog")
)

func NewCommand() *cobra.Command {
cmd := &cobra.Command{
Use: "datadog",
Short: "Stream Cirrus CI webhook events to Datadog",
RunE: runDatadog,
RunE: run,
}

cmd.PersistentFlags().BoolVar(&debug, "debug", false, "enable debug logging")
cmd.PersistentFlags().StringVar(&httpAddr, "http-addr", ":8080",
"address on which the HTTP server will listen on")
cmd.PersistentFlags().StringVar(&httpPath, "http-path", "/",
"HTTP path on which the webhook events will be expected")
cmd.PersistentFlags().StringSliceVar(&eventTypes, "event-types", []string{},
"comma-separated list of the event types to limit processing to "+
"(for example, --event-types=audit_event or --event-types=build,task")
cmd.PersistentFlags().StringVar(&secretToken, "secret-token", "",
"if specified, this value will be used as a HMAC SHA-256 secret to verify the webhook events")
server.AppendFlags(cmd)

cmd.PersistentFlags().StringVar(&dogstatsdAddr, "dogstatsd-addr", "",
"enables sending webhook events as Datadog events via the DogStatsD protocol to the specified address "+
"(for example, --dogstatsd-addr=127.0.0.1:8125)")
cmd.PersistentFlags().StringVar(&apiKey, "api-key", "",
"Enables sending webhook events as Datadog logs via the Datadog API using the specified API key")
"enables sending webhook events as Datadog logs via the Datadog API using the specified API key")
cmd.PersistentFlags().StringVar(&apiSite, "api-site", "datadoghq.com",
"specifies the Datadog site to use when sending webhook events as Datadog logs via the Datadog API")

return cmd
}

func runDatadog(cmd *cobra.Command, args []string) error {
// Initialize the logger
config := zap.NewProductionConfig()
if debug {
config.Level = zap.NewAtomicLevelAt(zap.DebugLevel)
}
logger := zap.Must(config.Build()).Sugar()

func run(cmd *cobra.Command, _ []string) error {
// Initialize a Datadog sender
var sender datadogsender.Sender
var err error
Expand All @@ -88,99 +60,43 @@ func runDatadog(cmd *cobra.Command, args []string) error {
return err
}

// Convert event types to a set for faster lookup
eventTypesSet := mapset.NewSet[string](eventTypes...)

// Configure HTTP server
e := echo.New()

e.Use(echozap.ZapLogger(logger.Desugar()))

e.POST(httpPath, func(ctx echo.Context) error {
return processWebhookEvent(ctx, logger, sender, eventTypesSet)
})

server := &http.Server{
Addr: httpAddr,
Handler: e,
ReadHeaderTimeout: 10 * time.Second,
}

logger.Infof("starting HTTP server on %s", httpAddr)

httpServerErrCh := make(chan error, 1)

go func() {
httpServerErrCh <- server.ListenAndServe()
}()

select {
case <-cmd.Context().Done():
if err := server.Close(); err != nil {
return err
}
case httpServerErr := <-httpServerErrCh:
return httpServerErr
}

return <-httpServerErrCh
return server.New(func(ctx echo.Context, presentedEventType string, body []byte, logger *zap.SugaredLogger) error {
return processWebhookEvent(ctx, presentedEventType, body, sender, logger)
}, zap.S()).Run(cmd.Context())
}

func processWebhookEvent(
ctx echo.Context,
logger *zap.SugaredLogger,
presentedEventType string,
body []byte,
sender datadogsender.Sender,
eventTypesSet mapset.Set[string],
logger *zap.SugaredLogger,
) error {
// Make sure this is an event we're looking for
presentedEventType := ctx.Request().Header.Get("X-Cirrus-Event")

if eventTypesSet.Cardinality() != 0 && !eventTypesSet.Contains(presentedEventType) {
logger.Debugf("skipping event of type %q because we only process events of types %s",
presentedEventType, strings.Join(eventTypesSet.ToSlice(), ", "))

return ctx.String(http.StatusOK, fmt.Sprintf("skipping event of type %q", presentedEventType))
}

body, err := io.ReadAll(ctx.Request().Body)
if err != nil {
logger.Warnf("failed to read request's body: %v", err)
// Decode the event
var payload payloadpkg.Payload

return ctx.NoContent(http.StatusBadRequest)
switch presentedEventType {
case "audit_event":
payload = &payloadpkg.AuditEvent{}
case "build", "task":
payload = &payloadpkg.BuildOrTask{}
default:
return nil
}

// Verify that this event comes from the Cirrus CI
if err := verifyEvent(ctx, body); err != nil {
logger.Warnf("%v", err)

return ctx.NoContent(http.StatusBadRequest)
if err := json.Unmarshal(body, payload); err != nil {
return fmt.Errorf("failed to enrich Datadog event with tags: "+
"failed to parse the webhook event of type %q as JSON: %v", presentedEventType, err)
}

// Log this event into the Datadog
// Create a new Datadog event and enrich it with tags
evt := &datadogsender.Event{
Title: "Webhook event",
Text: string(body),
Tags: []string{fmt.Sprintf("webhook_event_type:%s", presentedEventType)},
}

// Enrich the event with tags
var payload payloadpkg.Payload

switch presentedEventType {
case "audit_event":
payload = &payloadpkg.AuditEvent{}
case "build", "task":
payload = &payloadpkg.BuildOrTask{}
}

if payload != nil {
if err = json.Unmarshal(body, payload); err != nil {
logger.Warnf("failed to enrich Datadog event with tags: "+
"failed to parse the webhook event of type %q as JSON: %v", presentedEventType, err)
} else {
payload.Enrich(ctx.Request().Header, evt, logger)
}
}
payload.Enrich(ctx.Request().Header, evt, logger)

// Datadog silently discards log events submitted with a
// timestamp that is more than 18 hours in the past, sigh.
Expand All @@ -191,37 +107,10 @@ func processWebhookEvent(
"18 hours in the past, it'll likely going to be discarded", presentedEventType)
}

message, err := sender.SendEvent(ctx.Request().Context(), evt)
if err != nil {
// Log this event to Datadog
if err := sender.SendEvent(ctx.Request().Context(), evt); err != nil {
return fmt.Errorf("%w: %v", ErrDatadogFailed, err)
}

return ctx.String(http.StatusCreated, message)
}

func verifyEvent(ctx echo.Context, body []byte) error {
// Nothing to do
if secretToken == "" {
return nil
}

// Calculate the expected signature
hmacSHA256 := hmac.New(sha256.New, []byte(secretToken))
hmacSHA256.Write(body)
expectedSignature := hmacSHA256.Sum(nil)

// Prepare the presented signature
presentedSignatureRaw := ctx.Request().Header.Get("X-Cirrus-Signature")
presentedSignature, err := hex.DecodeString(presentedSignatureRaw)
if err != nil {
return fmt.Errorf("%w: failed to hex-decode the signature %q: %v",
ErrSignatureVerificationFailed, presentedSignatureRaw, err)
}

// Compare signatures
if !hmac.Equal(expectedSignature, presentedSignature) {
return fmt.Errorf("%w: signature is not valid", ErrSignatureVerificationFailed)
}

return nil
}
17 changes: 10 additions & 7 deletions internal/command/datadog/payload/buildortask.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,22 +10,25 @@ import (

type BuildOrTask struct {
Build struct {
ID *int64 `json:"id"`
Status *string `json:"status"`
Branch *string `json:"branch"`
PullRequest *int64 `json:"pullRequest"`
User struct {
ID *int64 `json:"id"`
Status *string `json:"status"`
Branch *string `json:"branch"`
PullRequest *int64 `json:"pullRequest"`
ChangeIDInRepo *string `json:"changeIdInRepo"`
User struct {
Username *string `json:"username"`
} `json:"user"`
}
} `json:"build"`
Task struct {
ID *int64 `json:"id"`
Name *string `json:"name"`
Status *string `json:"status"`
StatusTimestamp *int64 `json:"statusTimestamp"`
InstanceType *string `json:"instanceType"`
UniqueLabels []string `json:"uniqueLabels"`
ManualRerunCount *int64 `json:"manualRerunCount"`
}
LocalGroupID *int64 `json:"localGroupId"`
} `json:"task"`

common
}
Expand Down
Loading

0 comments on commit 331ef5c

Please sign in to comment.