Skip to content
This repository has been archived by the owner on Jun 4, 2021. It is now read-only.

Port Github source to adapter/v2 #1230

Merged
merged 1 commit into from
May 25, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
109 changes: 3 additions & 106 deletions github/cmd/receive_adapter/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,114 +17,11 @@ limitations under the License.
package main

import (
"flag"
"fmt"
"log"
"net/http"
"os"
"knative.dev/eventing/pkg/adapter/v2"

gh "gopkg.in/go-playground/webhooks.v5/github"
github "knative.dev/eventing-contrib/github/pkg/adapter"
githubadapter "knative.dev/eventing-contrib/github/pkg/adapter"
)

const (
// Environment variable containing the HTTP port
envPort = "PORT"

// Environment variable containing GitHub secret token
envSecret = "GITHUB_SECRET_TOKEN"

// Environment variable containing information about the origin of the event
envOwnerRepo = "GITHUB_OWNER_REPO"
)

var validEvents = []gh.Event{
gh.CheckSuiteEvent,
gh.CommitCommentEvent,
gh.CommitCommentEvent,
gh.CreateEvent,
gh.DeleteEvent,
gh.DeploymentEvent,
gh.DeploymentStatusEvent,
gh.ForkEvent,
gh.GollumEvent,
gh.InstallationEvent,
gh.IntegrationInstallationEvent,
gh.IssueCommentEvent,
gh.IssuesEvent,
gh.LabelEvent,
gh.MemberEvent,
gh.MembershipEvent,
gh.MilestoneEvent,
gh.OrganizationEvent,
gh.OrgBlockEvent,
gh.PageBuildEvent,
gh.PingEvent,
gh.ProjectCardEvent,
gh.ProjectColumnEvent,
gh.ProjectEvent,
gh.PublicEvent,
gh.PullRequestEvent,
gh.PullRequestReviewEvent,
gh.PullRequestReviewCommentEvent,
gh.PushEvent,
gh.ReleaseEvent,
gh.RepositoryEvent,
gh.StatusEvent,
gh.TeamEvent,
gh.TeamAddEvent,
gh.WatchEvent,
}

func main() {
sink := flag.String("sink", "", "uri to send events to")

flag.Parse()

if sink == nil || *sink == "" {
log.Fatalf("No sink given")
}

port := os.Getenv(envPort)
if port == "" {
port = "8080"
}

secretToken := os.Getenv(envSecret)
if secretToken == "" {
log.Fatalf("No secret token given")
}

ownerRepo := os.Getenv(envOwnerRepo)
if ownerRepo == "" {
log.Fatalf("No ownerRepo given")
}

log.Printf("Sink is: %q, OwnerRepo is: %q", *sink, ownerRepo)

ra, err := github.New(*sink, ownerRepo)
if err != nil {
log.Fatalf("Failed to create github adapter: %s", err.Error())
}

hook, _ := gh.New(gh.Options.Secret(secretToken))

http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
event, err := hook.Parse(r, validEvents...)
if err != nil {
if err == gh.ErrEventNotFound {
w.WriteHeader(http.StatusNotFound)

log.Print("Event not found")
return
}
w.WriteHeader(http.StatusBadRequest)
log.Printf("Error processing request: %s", err)
return
}

ra.HandleEvent(event, r.Header)
})
addr := fmt.Sprintf(":%s", port)
http.ListenAndServe(addr, nil)
adapter.Main("githubsource", githubadapter.NewEnvConfig, githubadapter.NewAdapter)
}
216 changes: 175 additions & 41 deletions github/pkg/adapter/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,79 +19,213 @@ package adapter
import (
"context"
"fmt"
"log"
"net/http"
"strconv"
"strings"
"time"

cloudevents "github.com/cloudevents/sdk-go"
sourcesv1alpha1 "knative.dev/eventing-contrib/github/pkg/apis/sources/v1alpha1"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's group this import together with the other third-party packages. Or even better: group all knative.dev imports together.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank for your review. I wil think about them. :)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

"knative.dev/eventing/pkg/adapter/v2"
"knative.dev/pkg/logging"

cloudevents "github.com/cloudevents/sdk-go/v2"
"go.uber.org/zap"
gh "gopkg.in/go-playground/webhooks.v5/github"
sourcesv1alpha1 "knative.dev/eventing-contrib/github/pkg/apis/sources/v1alpha1"
"knative.dev/eventing/pkg/kncloudevents"
)

const (
GHHeaderEvent = "GitHub-Event"
GHHeaderDelivery = "GitHub-Delivery"
GHHeaderEvent = "X-GitHub-Event"
GHHeaderDelivery = "X-GitHub-Delivery"
)

// Adapter converts incoming GitHub webhook events to CloudEvents
type Adapter struct {
client cloudevents.Client
source string
var validEvents = []gh.Event{
gh.CheckSuiteEvent,
gh.CommitCommentEvent,
gh.CommitCommentEvent,
gh.CreateEvent,
gh.DeleteEvent,
gh.DeploymentEvent,
gh.DeploymentStatusEvent,
gh.ForkEvent,
gh.GollumEvent,
gh.InstallationEvent,
gh.IntegrationInstallationEvent,
gh.IssueCommentEvent,
gh.IssuesEvent,
gh.LabelEvent,
gh.MemberEvent,
gh.MembershipEvent,
gh.MilestoneEvent,
gh.OrganizationEvent,
gh.OrgBlockEvent,
gh.PageBuildEvent,
gh.PingEvent,
gh.ProjectCardEvent,
gh.ProjectColumnEvent,
gh.ProjectEvent,
gh.PublicEvent,
gh.PullRequestEvent,
gh.PullRequestReviewEvent,
gh.PullRequestReviewCommentEvent,
gh.PushEvent,
gh.ReleaseEvent,
gh.RepositoryEvent,
gh.StatusEvent,
gh.TeamEvent,
gh.TeamAddEvent,
gh.WatchEvent,
}

// New creates an adapter to convert incoming GitHub webhook events to CloudEvents and
// then sends them to the specified Sink
func New(sinkURI, ownerRepo string) (*Adapter, error) {
a := new(Adapter)
var err error
a.client, err = kncloudevents.NewDefaultClient(sinkURI)
if err != nil {
return nil, err
type envConfig struct {
adapter.EnvConfig

// Environment variable containing GitHub secret token
EnvSecret string `envconfig:"GITHUB_SECRET_TOKEN" required:"true"`
// Environment variable containing the HTTP port
EnvPort string `envconfig:"PORT" default:"8080"`
// Environment variable containing information about the origin of the event
EnvOwnerRepo string `envconfig:"GITHUB_OWNER_REPO" required:"true"`
}

// NewEnvConfig function reads env variables defined in envConfig structure and
// returns accessor interface
func NewEnvConfig() adapter.EnvConfigAccessor {
return &envConfig{}
}

// gitHubAdapter converts incoming GitHub webhook events to CloudEvents
type gitHubAdapter struct {
logger *zap.SugaredLogger
client cloudevents.Client
secretToken string
port string
source string
}

// NewAdapter returns the instance of gitHubReceiveAdapter that implements adapter.Adapter interface
func NewAdapter(ctx context.Context, processed adapter.EnvConfigAccessor, ceClient cloudevents.Client) adapter.Adapter {
logger := logging.FromContext(ctx)
env := processed.(*envConfig)

return &gitHubAdapter{
logger: logger,
client: ceClient,
port: env.EnvPort,
secretToken: env.EnvSecret,
source: sourcesv1alpha1.GitHubEventSource(env.EnvOwnerRepo),
}
source := sourcesv1alpha1.GitHubEventSource(ownerRepo)
// Check at startup if it's not a URLRef and return an error in that case.
src := cloudevents.ParseURLRef(source)
}

// Start implements adapter.Adapter
func (a *gitHubAdapter) Start(stopCh <-chan struct{}) error {
src := cloudevents.ParseURIRef(a.source)
if src == nil {
return nil, fmt.Errorf("invalid source for github events: %s", source)
return fmt.Errorf("invalid source for github events: %s", a.source)
}
done := make(chan bool, 1)
hook, err := gh.New(gh.Options.Secret(a.secretToken))
if err != nil {
return fmt.Errorf("cannot create gitHub hook: %v", err)
}
a.source = source
return a, nil

server := &http.Server{
Addr: ":" + a.port,
Handler: a.newRouter(hook),
}

go gracefullShutdown(server, a.logger, stopCh, done)

a.logger.Infof("Server is ready to handle requests at %s", server.Addr)
if err := server.ListenAndServe(); err != nil && err != http.ErrServerClosed {
return fmt.Errorf("could not listen on %s: %v", server.Addr, err)
}

<-done
a.logger.Infof("Server stopped")
return nil
}

func (a *gitHubAdapter) newRouter(hook *gh.Webhook) *http.ServeMux {
router := http.NewServeMux()
router.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
payload, err := hook.Parse(r, validEvents...)
if err != nil {
if err == gh.ErrEventNotFound {
w.WriteHeader(http.StatusNotFound)
a.logger.Info("Event not found")
return
}
w.WriteHeader(http.StatusBadRequest)
a.logger.Errorf("Error processing request: %v", err)
return
}
err = a.HandleEvent(payload, r.Header)
if err != nil {
a.logger.Errorf("Event handler error: %v", err)
w.WriteHeader(400)
w.Write([]byte(err.Error()))
return
}
a.logger.Infof("Event processed")
w.WriteHeader(202)
w.Write([]byte("accepted"))
})
return router
}

func gracefullShutdown(server *http.Server, logger *zap.SugaredLogger, stopCh <-chan struct{}, done chan<- bool) {
<-stopCh
logger.Info("Server is shutting down...")

ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()

server.SetKeepAlivesEnabled(false)
if err := server.Shutdown(ctx); err != nil {
logger.Fatalf("Could not gracefully shutdown the server: %v", err)
}
close(done)
}

// HandleEvent is invoked whenever an event comes in from GitHub
func (a *Adapter) HandleEvent(payload interface{}, header http.Header) {
func (a *gitHubAdapter) HandleEvent(payload interface{}, header http.Header) error {
hdr := http.Header(header)
err := a.handleEvent(payload, hdr)
if err != nil {
log.Printf("unexpected error handling GitHub event: %s", err)
}
return err
}

func (a *Adapter) handleEvent(payload interface{}, hdr http.Header) error {
gitHubEventType := hdr.Get("X-" + GHHeaderEvent)
eventID := hdr.Get("X-" + GHHeaderDelivery)
func (a *gitHubAdapter) handleEvent(payload interface{}, hdr http.Header) error {
gitHubEventType := hdr.Get(GHHeaderEvent)
if gitHubEventType == "" {
return fmt.Errorf("%q header is not set", GHHeaderEvent)
}
eventID := hdr.Get(GHHeaderDelivery)
if eventID == "" {
return fmt.Errorf("%q header is not set", GHHeaderDelivery)
}

log.Printf("Handling %s", gitHubEventType)
a.logger.Infof("Handling %s", gitHubEventType)

cloudEventType := sourcesv1alpha1.GitHubEventType(gitHubEventType)
subject := subjectFromGitHubEvent(gh.Event(gitHubEventType), payload)
subject := subjectFromGitHubEvent(gh.Event(gitHubEventType), payload, a.logger)

event := cloudevents.NewEvent(cloudevents.VersionV1)
event := cloudevents.NewEvent()
event.SetID(eventID)
event.SetType(cloudEventType)
event.SetSource(a.source)
event.SetSubject(subject)
event.SetDataContentType(cloudevents.ApplicationJSON)
event.SetData(payload)
if err := event.SetData(cloudevents.ApplicationJSON, payload); err != nil {
return fmt.Errorf("failed to marshal event data: %w", err)
}

_, _, err := a.client.Send(context.Background(), event)
return err
result := a.client.Send(context.Background(), event)
if !cloudevents.IsACK(result) {
return result
}
return nil
}

func subjectFromGitHubEvent(gitHubEvent gh.Event, payload interface{}) string {
func subjectFromGitHubEvent(gitHubEvent gh.Event, payload interface{}, logger *zap.SugaredLogger) string {
// The decision of what to put in subject is somewhat arbitrary here (i.e., it's the author's opinion)
// TODO check if we should be setting subject to these values.
var subject string
Expand Down Expand Up @@ -284,9 +418,9 @@ func subjectFromGitHubEvent(gitHubEvent gh.Event, payload interface{}) string {
}
}
if !ok {
log.Printf("Invalid payload in github event %s", gitHubEvent)
logger.Errorf("Invalid payload in gitHub event %s", gitHubEvent)
} else if subject == "" {
log.Printf("No subject found in github event %s", gitHubEvent)
logger.Warnf("No subject found in gitHub event %s", gitHubEvent)
}
return subject
}
Expand Down
Loading