Skip to content
This repository has been archived by the owner on Jun 4, 2021. It is now read-only.

Commit

Permalink
Port Github source to adapter/v2
Browse files Browse the repository at this point in the history
Fixes: #1199
  • Loading branch information
MIBc committed May 25, 2020
1 parent 5f84699 commit eb54651
Show file tree
Hide file tree
Showing 4 changed files with 314 additions and 355 deletions.
109 changes: 3 additions & 106 deletions github/cmd/receive_adapter/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,114 +17,11 @@ limitations under the License.
package main

import (
"flag"
"fmt"
"log"
"net/http"
"os"
"knative.dev/eventing/pkg/adapter/v2"

gh "gopkg.in/go-playground/webhooks.v5/github"
github "knative.dev/eventing-contrib/github/pkg/adapter"
githubadapter "knative.dev/eventing-contrib/github/pkg/adapter"
)

const (
// Environment variable containing the HTTP port
envPort = "PORT"

// Environment variable containing GitHub secret token
envSecret = "GITHUB_SECRET_TOKEN"

// Environment variable containing information about the origin of the event
envOwnerRepo = "GITHUB_OWNER_REPO"
)

var validEvents = []gh.Event{
gh.CheckSuiteEvent,
gh.CommitCommentEvent,
gh.CommitCommentEvent,
gh.CreateEvent,
gh.DeleteEvent,
gh.DeploymentEvent,
gh.DeploymentStatusEvent,
gh.ForkEvent,
gh.GollumEvent,
gh.InstallationEvent,
gh.IntegrationInstallationEvent,
gh.IssueCommentEvent,
gh.IssuesEvent,
gh.LabelEvent,
gh.MemberEvent,
gh.MembershipEvent,
gh.MilestoneEvent,
gh.OrganizationEvent,
gh.OrgBlockEvent,
gh.PageBuildEvent,
gh.PingEvent,
gh.ProjectCardEvent,
gh.ProjectColumnEvent,
gh.ProjectEvent,
gh.PublicEvent,
gh.PullRequestEvent,
gh.PullRequestReviewEvent,
gh.PullRequestReviewCommentEvent,
gh.PushEvent,
gh.ReleaseEvent,
gh.RepositoryEvent,
gh.StatusEvent,
gh.TeamEvent,
gh.TeamAddEvent,
gh.WatchEvent,
}

func main() {
sink := flag.String("sink", "", "uri to send events to")

flag.Parse()

if sink == nil || *sink == "" {
log.Fatalf("No sink given")
}

port := os.Getenv(envPort)
if port == "" {
port = "8080"
}

secretToken := os.Getenv(envSecret)
if secretToken == "" {
log.Fatalf("No secret token given")
}

ownerRepo := os.Getenv(envOwnerRepo)
if ownerRepo == "" {
log.Fatalf("No ownerRepo given")
}

log.Printf("Sink is: %q, OwnerRepo is: %q", *sink, ownerRepo)

ra, err := github.New(*sink, ownerRepo)
if err != nil {
log.Fatalf("Failed to create github adapter: %s", err.Error())
}

hook, _ := gh.New(gh.Options.Secret(secretToken))

http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
event, err := hook.Parse(r, validEvents...)
if err != nil {
if err == gh.ErrEventNotFound {
w.WriteHeader(http.StatusNotFound)

log.Print("Event not found")
return
}
w.WriteHeader(http.StatusBadRequest)
log.Printf("Error processing request: %s", err)
return
}

ra.HandleEvent(event, r.Header)
})
addr := fmt.Sprintf(":%s", port)
http.ListenAndServe(addr, nil)
adapter.Main("githubsource", githubadapter.NewEnvConfig, githubadapter.NewAdapter)
}
216 changes: 175 additions & 41 deletions github/pkg/adapter/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,79 +19,213 @@ package adapter
import (
"context"
"fmt"
"log"
"net/http"
"strconv"
"strings"
"time"

cloudevents "github.com/cloudevents/sdk-go"
sourcesv1alpha1 "knative.dev/eventing-contrib/github/pkg/apis/sources/v1alpha1"
"knative.dev/eventing/pkg/adapter/v2"
"knative.dev/pkg/logging"

cloudevents "github.com/cloudevents/sdk-go/v2"
"go.uber.org/zap"
gh "gopkg.in/go-playground/webhooks.v5/github"
sourcesv1alpha1 "knative.dev/eventing-contrib/github/pkg/apis/sources/v1alpha1"
"knative.dev/eventing/pkg/kncloudevents"
)

const (
GHHeaderEvent = "GitHub-Event"
GHHeaderDelivery = "GitHub-Delivery"
GHHeaderEvent = "X-GitHub-Event"
GHHeaderDelivery = "X-GitHub-Delivery"
)

// Adapter converts incoming GitHub webhook events to CloudEvents
type Adapter struct {
client cloudevents.Client
source string
var validEvents = []gh.Event{
gh.CheckSuiteEvent,
gh.CommitCommentEvent,
gh.CommitCommentEvent,
gh.CreateEvent,
gh.DeleteEvent,
gh.DeploymentEvent,
gh.DeploymentStatusEvent,
gh.ForkEvent,
gh.GollumEvent,
gh.InstallationEvent,
gh.IntegrationInstallationEvent,
gh.IssueCommentEvent,
gh.IssuesEvent,
gh.LabelEvent,
gh.MemberEvent,
gh.MembershipEvent,
gh.MilestoneEvent,
gh.OrganizationEvent,
gh.OrgBlockEvent,
gh.PageBuildEvent,
gh.PingEvent,
gh.ProjectCardEvent,
gh.ProjectColumnEvent,
gh.ProjectEvent,
gh.PublicEvent,
gh.PullRequestEvent,
gh.PullRequestReviewEvent,
gh.PullRequestReviewCommentEvent,
gh.PushEvent,
gh.ReleaseEvent,
gh.RepositoryEvent,
gh.StatusEvent,
gh.TeamEvent,
gh.TeamAddEvent,
gh.WatchEvent,
}

// New creates an adapter to convert incoming GitHub webhook events to CloudEvents and
// then sends them to the specified Sink
func New(sinkURI, ownerRepo string) (*Adapter, error) {
a := new(Adapter)
var err error
a.client, err = kncloudevents.NewDefaultClient(sinkURI)
if err != nil {
return nil, err
type envConfig struct {
adapter.EnvConfig

// Environment variable containing GitHub secret token
EnvSecret string `envconfig:"GITHUB_SECRET_TOKEN" required:"true"`
// Environment variable containing the HTTP port
EnvPort string `envconfig:"PORT" default:"8080"`
// Environment variable containing information about the origin of the event
EnvOwnerRepo string `envconfig:"GITHUB_OWNER_REPO" required:"true"`
}

// NewEnvConfig function reads env variables defined in envConfig structure and
// returns accessor interface
func NewEnvConfig() adapter.EnvConfigAccessor {
return &envConfig{}
}

// gitHubAdapter converts incoming GitHub webhook events to CloudEvents
type gitHubAdapter struct {
logger *zap.SugaredLogger
client cloudevents.Client
secretToken string
port string
source string
}

// NewAdapter returns the instance of gitHubReceiveAdapter that implements adapter.Adapter interface
func NewAdapter(ctx context.Context, processed adapter.EnvConfigAccessor, ceClient cloudevents.Client) adapter.Adapter {
logger := logging.FromContext(ctx)
env := processed.(*envConfig)

return &gitHubAdapter{
logger: logger,
client: ceClient,
port: env.EnvPort,
secretToken: env.EnvSecret,
source: sourcesv1alpha1.GitHubEventSource(env.EnvOwnerRepo),
}
source := sourcesv1alpha1.GitHubEventSource(ownerRepo)
// Check at startup if it's not a URLRef and return an error in that case.
src := cloudevents.ParseURLRef(source)
}

// Start implements adapter.Adapter
func (a *gitHubAdapter) Start(stopCh <-chan struct{}) error {
src := cloudevents.ParseURIRef(a.source)
if src == nil {
return nil, fmt.Errorf("invalid source for github events: %s", source)
return fmt.Errorf("invalid source for github events: %s", a.source)
}
done := make(chan bool, 1)
hook, err := gh.New(gh.Options.Secret(a.secretToken))
if err != nil {
return fmt.Errorf("cannot create gitHub hook: %v", err)
}
a.source = source
return a, nil

server := &http.Server{
Addr: ":" + a.port,
Handler: a.newRouter(hook),
}

go gracefullShutdown(server, a.logger, stopCh, done)

a.logger.Infof("Server is ready to handle requests at %s", server.Addr)
if err := server.ListenAndServe(); err != nil && err != http.ErrServerClosed {
return fmt.Errorf("could not listen on %s: %v", server.Addr, err)
}

<-done
a.logger.Infof("Server stopped")
return nil
}

func (a *gitHubAdapter) newRouter(hook *gh.Webhook) *http.ServeMux {
router := http.NewServeMux()
router.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
payload, err := hook.Parse(r, validEvents...)
if err != nil {
if err == gh.ErrEventNotFound {
w.WriteHeader(http.StatusNotFound)
a.logger.Info("Event not found")
return
}
w.WriteHeader(http.StatusBadRequest)
a.logger.Errorf("Error processing request: %v", err)
return
}
err = a.HandleEvent(payload, r.Header)
if err != nil {
a.logger.Errorf("Event handler error: %v", err)
w.WriteHeader(400)
w.Write([]byte(err.Error()))
return
}
a.logger.Infof("Event processed")
w.WriteHeader(202)
w.Write([]byte("accepted"))
})
return router
}

func gracefullShutdown(server *http.Server, logger *zap.SugaredLogger, stopCh <-chan struct{}, done chan<- bool) {
<-stopCh
logger.Info("Server is shutting down...")

ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()

server.SetKeepAlivesEnabled(false)
if err := server.Shutdown(ctx); err != nil {
logger.Fatalf("Could not gracefully shutdown the server: %v", err)
}
close(done)
}

// HandleEvent is invoked whenever an event comes in from GitHub
func (a *Adapter) HandleEvent(payload interface{}, header http.Header) {
func (a *gitHubAdapter) HandleEvent(payload interface{}, header http.Header) error {
hdr := http.Header(header)
err := a.handleEvent(payload, hdr)
if err != nil {
log.Printf("unexpected error handling GitHub event: %s", err)
}
return err
}

func (a *Adapter) handleEvent(payload interface{}, hdr http.Header) error {
gitHubEventType := hdr.Get("X-" + GHHeaderEvent)
eventID := hdr.Get("X-" + GHHeaderDelivery)
func (a *gitHubAdapter) handleEvent(payload interface{}, hdr http.Header) error {
gitHubEventType := hdr.Get(GHHeaderEvent)
if gitHubEventType == "" {
return fmt.Errorf("%q header is not set", GHHeaderEvent)
}
eventID := hdr.Get(GHHeaderDelivery)
if eventID == "" {
return fmt.Errorf("%q header is not set", GHHeaderDelivery)
}

log.Printf("Handling %s", gitHubEventType)
a.logger.Infof("Handling %s", gitHubEventType)

cloudEventType := sourcesv1alpha1.GitHubEventType(gitHubEventType)
subject := subjectFromGitHubEvent(gh.Event(gitHubEventType), payload)
subject := subjectFromGitHubEvent(gh.Event(gitHubEventType), payload, a.logger)

event := cloudevents.NewEvent(cloudevents.VersionV1)
event := cloudevents.NewEvent()
event.SetID(eventID)
event.SetType(cloudEventType)
event.SetSource(a.source)
event.SetSubject(subject)
event.SetDataContentType(cloudevents.ApplicationJSON)
event.SetData(payload)
if err := event.SetData(cloudevents.ApplicationJSON, payload); err != nil {
return fmt.Errorf("failed to marshal event data: %w", err)
}

_, _, err := a.client.Send(context.Background(), event)
return err
result := a.client.Send(context.Background(), event)
if !cloudevents.IsACK(result) {
return result
}
return nil
}

func subjectFromGitHubEvent(gitHubEvent gh.Event, payload interface{}) string {
func subjectFromGitHubEvent(gitHubEvent gh.Event, payload interface{}, logger *zap.SugaredLogger) string {
// The decision of what to put in subject is somewhat arbitrary here (i.e., it's the author's opinion)
// TODO check if we should be setting subject to these values.
var subject string
Expand Down Expand Up @@ -284,9 +418,9 @@ func subjectFromGitHubEvent(gitHubEvent gh.Event, payload interface{}) string {
}
}
if !ok {
log.Printf("Invalid payload in github event %s", gitHubEvent)
logger.Errorf("Invalid payload in gitHub event %s", gitHubEvent)
} else if subject == "" {
log.Printf("No subject found in github event %s", gitHubEvent)
logger.Warnf("No subject found in gitHub event %s", gitHubEvent)
}
return subject
}
Expand Down
Loading

0 comments on commit eb54651

Please sign in to comment.