diff --git a/github/cmd/receive_adapter/main.go b/github/cmd/receive_adapter/main.go index 45140b4f4a..4f422dc29e 100644 --- a/github/cmd/receive_adapter/main.go +++ b/github/cmd/receive_adapter/main.go @@ -17,114 +17,11 @@ limitations under the License. package main import ( - "flag" - "fmt" - "log" - "net/http" - "os" + "knative.dev/eventing/pkg/adapter/v2" - gh "gopkg.in/go-playground/webhooks.v5/github" - github "knative.dev/eventing-contrib/github/pkg/adapter" + githubadapter "knative.dev/eventing-contrib/github/pkg/adapter" ) -const ( - // Environment variable containing the HTTP port - envPort = "PORT" - - // Environment variable containing GitHub secret token - envSecret = "GITHUB_SECRET_TOKEN" - - // Environment variable containing information about the origin of the event - envOwnerRepo = "GITHUB_OWNER_REPO" -) - -var validEvents = []gh.Event{ - gh.CheckSuiteEvent, - gh.CommitCommentEvent, - gh.CommitCommentEvent, - gh.CreateEvent, - gh.DeleteEvent, - gh.DeploymentEvent, - gh.DeploymentStatusEvent, - gh.ForkEvent, - gh.GollumEvent, - gh.InstallationEvent, - gh.IntegrationInstallationEvent, - gh.IssueCommentEvent, - gh.IssuesEvent, - gh.LabelEvent, - gh.MemberEvent, - gh.MembershipEvent, - gh.MilestoneEvent, - gh.OrganizationEvent, - gh.OrgBlockEvent, - gh.PageBuildEvent, - gh.PingEvent, - gh.ProjectCardEvent, - gh.ProjectColumnEvent, - gh.ProjectEvent, - gh.PublicEvent, - gh.PullRequestEvent, - gh.PullRequestReviewEvent, - gh.PullRequestReviewCommentEvent, - gh.PushEvent, - gh.ReleaseEvent, - gh.RepositoryEvent, - gh.StatusEvent, - gh.TeamEvent, - gh.TeamAddEvent, - gh.WatchEvent, -} - func main() { - sink := flag.String("sink", "", "uri to send events to") - - flag.Parse() - - if sink == nil || *sink == "" { - log.Fatalf("No sink given") - } - - port := os.Getenv(envPort) - if port == "" { - port = "8080" - } - - secretToken := os.Getenv(envSecret) - if secretToken == "" { - log.Fatalf("No secret token given") - } - - ownerRepo := os.Getenv(envOwnerRepo) - if ownerRepo == "" { - log.Fatalf("No ownerRepo given") - } - - log.Printf("Sink is: %q, OwnerRepo is: %q", *sink, ownerRepo) - - ra, err := github.New(*sink, ownerRepo) - if err != nil { - log.Fatalf("Failed to create github adapter: %s", err.Error()) - } - - hook, _ := gh.New(gh.Options.Secret(secretToken)) - - http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { - event, err := hook.Parse(r, validEvents...) - if err != nil { - if err == gh.ErrEventNotFound { - w.WriteHeader(http.StatusNotFound) - - log.Print("Event not found") - return - } - w.WriteHeader(http.StatusBadRequest) - log.Printf("Error processing request: %s", err) - return - } - - ra.HandleEvent(event, r.Header) - }) - addr := fmt.Sprintf(":%s", port) - http.ListenAndServe(addr, nil) + adapter.Main("githubsource", githubadapter.NewEnvConfig, githubadapter.NewAdapter) } diff --git a/github/pkg/adapter/adapter.go b/github/pkg/adapter/adapter.go index 74e569e9c2..e06b033a7d 100644 --- a/github/pkg/adapter/adapter.go +++ b/github/pkg/adapter/adapter.go @@ -19,79 +19,213 @@ package adapter import ( "context" "fmt" - "log" "net/http" "strconv" "strings" + "time" - cloudevents "github.com/cloudevents/sdk-go" + sourcesv1alpha1 "knative.dev/eventing-contrib/github/pkg/apis/sources/v1alpha1" + "knative.dev/eventing/pkg/adapter/v2" + "knative.dev/pkg/logging" + cloudevents "github.com/cloudevents/sdk-go/v2" + "go.uber.org/zap" gh "gopkg.in/go-playground/webhooks.v5/github" - sourcesv1alpha1 "knative.dev/eventing-contrib/github/pkg/apis/sources/v1alpha1" - "knative.dev/eventing/pkg/kncloudevents" ) const ( - GHHeaderEvent = "GitHub-Event" - GHHeaderDelivery = "GitHub-Delivery" + GHHeaderEvent = "X-GitHub-Event" + GHHeaderDelivery = "X-GitHub-Delivery" ) -// Adapter converts incoming GitHub webhook events to CloudEvents -type Adapter struct { - client cloudevents.Client - source string +var validEvents = []gh.Event{ + gh.CheckSuiteEvent, + gh.CommitCommentEvent, + gh.CommitCommentEvent, + gh.CreateEvent, + gh.DeleteEvent, + gh.DeploymentEvent, + gh.DeploymentStatusEvent, + gh.ForkEvent, + gh.GollumEvent, + gh.InstallationEvent, + gh.IntegrationInstallationEvent, + gh.IssueCommentEvent, + gh.IssuesEvent, + gh.LabelEvent, + gh.MemberEvent, + gh.MembershipEvent, + gh.MilestoneEvent, + gh.OrganizationEvent, + gh.OrgBlockEvent, + gh.PageBuildEvent, + gh.PingEvent, + gh.ProjectCardEvent, + gh.ProjectColumnEvent, + gh.ProjectEvent, + gh.PublicEvent, + gh.PullRequestEvent, + gh.PullRequestReviewEvent, + gh.PullRequestReviewCommentEvent, + gh.PushEvent, + gh.ReleaseEvent, + gh.RepositoryEvent, + gh.StatusEvent, + gh.TeamEvent, + gh.TeamAddEvent, + gh.WatchEvent, } -// New creates an adapter to convert incoming GitHub webhook events to CloudEvents and -// then sends them to the specified Sink -func New(sinkURI, ownerRepo string) (*Adapter, error) { - a := new(Adapter) - var err error - a.client, err = kncloudevents.NewDefaultClient(sinkURI) - if err != nil { - return nil, err +type envConfig struct { + adapter.EnvConfig + + // Environment variable containing GitHub secret token + EnvSecret string `envconfig:"GITHUB_SECRET_TOKEN" required:"true"` + // Environment variable containing the HTTP port + EnvPort string `envconfig:"PORT" default:"8080"` + // Environment variable containing information about the origin of the event + EnvOwnerRepo string `envconfig:"GITHUB_OWNER_REPO" required:"true"` +} + +// NewEnvConfig function reads env variables defined in envConfig structure and +// returns accessor interface +func NewEnvConfig() adapter.EnvConfigAccessor { + return &envConfig{} +} + +// gitHubAdapter converts incoming GitHub webhook events to CloudEvents +type gitHubAdapter struct { + logger *zap.SugaredLogger + client cloudevents.Client + secretToken string + port string + source string +} + +// NewAdapter returns the instance of gitHubReceiveAdapter that implements adapter.Adapter interface +func NewAdapter(ctx context.Context, processed adapter.EnvConfigAccessor, ceClient cloudevents.Client) adapter.Adapter { + logger := logging.FromContext(ctx) + env := processed.(*envConfig) + + return &gitHubAdapter{ + logger: logger, + client: ceClient, + port: env.EnvPort, + secretToken: env.EnvSecret, + source: sourcesv1alpha1.GitHubEventSource(env.EnvOwnerRepo), } - source := sourcesv1alpha1.GitHubEventSource(ownerRepo) - // Check at startup if it's not a URLRef and return an error in that case. - src := cloudevents.ParseURLRef(source) +} + +// Start implements adapter.Adapter +func (a *gitHubAdapter) Start(stopCh <-chan struct{}) error { + src := cloudevents.ParseURIRef(a.source) if src == nil { - return nil, fmt.Errorf("invalid source for github events: %s", source) + return fmt.Errorf("invalid source for github events: %s", a.source) + } + done := make(chan bool, 1) + hook, err := gh.New(gh.Options.Secret(a.secretToken)) + if err != nil { + return fmt.Errorf("cannot create gitHub hook: %v", err) } - a.source = source - return a, nil + + server := &http.Server{ + Addr: ":" + a.port, + Handler: a.newRouter(hook), + } + + go gracefullShutdown(server, a.logger, stopCh, done) + + a.logger.Infof("Server is ready to handle requests at %s", server.Addr) + if err := server.ListenAndServe(); err != nil && err != http.ErrServerClosed { + return fmt.Errorf("could not listen on %s: %v", server.Addr, err) + } + + <-done + a.logger.Infof("Server stopped") + return nil +} + +func (a *gitHubAdapter) newRouter(hook *gh.Webhook) *http.ServeMux { + router := http.NewServeMux() + router.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { + payload, err := hook.Parse(r, validEvents...) + if err != nil { + if err == gh.ErrEventNotFound { + w.WriteHeader(http.StatusNotFound) + a.logger.Info("Event not found") + return + } + w.WriteHeader(http.StatusBadRequest) + a.logger.Errorf("Error processing request: %v", err) + return + } + err = a.HandleEvent(payload, r.Header) + if err != nil { + a.logger.Errorf("Event handler error: %v", err) + w.WriteHeader(400) + w.Write([]byte(err.Error())) + return + } + a.logger.Infof("Event processed") + w.WriteHeader(202) + w.Write([]byte("accepted")) + }) + return router +} + +func gracefullShutdown(server *http.Server, logger *zap.SugaredLogger, stopCh <-chan struct{}, done chan<- bool) { + <-stopCh + logger.Info("Server is shutting down...") + + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + server.SetKeepAlivesEnabled(false) + if err := server.Shutdown(ctx); err != nil { + logger.Fatalf("Could not gracefully shutdown the server: %v", err) + } + close(done) } // HandleEvent is invoked whenever an event comes in from GitHub -func (a *Adapter) HandleEvent(payload interface{}, header http.Header) { +func (a *gitHubAdapter) HandleEvent(payload interface{}, header http.Header) error { hdr := http.Header(header) err := a.handleEvent(payload, hdr) - if err != nil { - log.Printf("unexpected error handling GitHub event: %s", err) - } + return err } -func (a *Adapter) handleEvent(payload interface{}, hdr http.Header) error { - gitHubEventType := hdr.Get("X-" + GHHeaderEvent) - eventID := hdr.Get("X-" + GHHeaderDelivery) +func (a *gitHubAdapter) handleEvent(payload interface{}, hdr http.Header) error { + gitHubEventType := hdr.Get(GHHeaderEvent) + if gitHubEventType == "" { + return fmt.Errorf("%q header is not set", GHHeaderEvent) + } + eventID := hdr.Get(GHHeaderDelivery) + if eventID == "" { + return fmt.Errorf("%q header is not set", GHHeaderDelivery) + } - log.Printf("Handling %s", gitHubEventType) + a.logger.Infof("Handling %s", gitHubEventType) cloudEventType := sourcesv1alpha1.GitHubEventType(gitHubEventType) - subject := subjectFromGitHubEvent(gh.Event(gitHubEventType), payload) + subject := subjectFromGitHubEvent(gh.Event(gitHubEventType), payload, a.logger) - event := cloudevents.NewEvent(cloudevents.VersionV1) + event := cloudevents.NewEvent() event.SetID(eventID) event.SetType(cloudEventType) event.SetSource(a.source) event.SetSubject(subject) - event.SetDataContentType(cloudevents.ApplicationJSON) - event.SetData(payload) + if err := event.SetData(cloudevents.ApplicationJSON, payload); err != nil { + return fmt.Errorf("failed to marshal event data: %w", err) + } - _, _, err := a.client.Send(context.Background(), event) - return err + result := a.client.Send(context.Background(), event) + if !cloudevents.IsACK(result) { + return result + } + return nil } -func subjectFromGitHubEvent(gitHubEvent gh.Event, payload interface{}) string { +func subjectFromGitHubEvent(gitHubEvent gh.Event, payload interface{}, logger *zap.SugaredLogger) string { // The decision of what to put in subject is somewhat arbitrary here (i.e., it's the author's opinion) // TODO check if we should be setting subject to these values. var subject string @@ -284,9 +418,9 @@ func subjectFromGitHubEvent(gitHubEvent gh.Event, payload interface{}) string { } } if !ok { - log.Printf("Invalid payload in github event %s", gitHubEvent) + logger.Errorf("Invalid payload in gitHub event %s", gitHubEvent) } else if subject == "" { - log.Printf("No subject found in github event %s", gitHubEvent) + logger.Warnf("No subject found in gitHub event %s", gitHubEvent) } return subject } diff --git a/github/pkg/adapter/adapter_test.go b/github/pkg/adapter/adapter_test.go index 70a1381350..57d43cf58b 100644 --- a/github/pkg/adapter/adapter_test.go +++ b/github/pkg/adapter/adapter_test.go @@ -18,27 +18,31 @@ package adapter import ( "bytes" - "context" - "errors" + "encoding/json" "fmt" - "io/ioutil" "net/http" "net/http/httptest" "strconv" - "strings" "testing" + "time" "knative.dev/eventing-contrib/github/pkg/apis/sources/v1alpha1" + "knative.dev/eventing/pkg/adapter/v2" + adaptertest "knative.dev/eventing/pkg/adapter/v2/test" + "knative.dev/pkg/logging" + pkgtesting "knative.dev/pkg/reconciler/testing" + cloudevents "github.com/cloudevents/sdk-go/v2" "github.com/google/go-cmp/cmp" - - cehttp "github.com/cloudevents/sdk-go/pkg/cloudevents/transport/http" + "go.uber.org/zap" gh "gopkg.in/go-playground/webhooks.v5/github" ) const ( testSubject = "1234" testOwnerRepo = "test-user/test-repo" + secretToken = "gitHubsecret" + eventID = "12345" ) var ( @@ -50,16 +54,6 @@ type testCase struct { // name is a descriptive name for this test suitable as a first argument to t.Run() name string - // sink the response from the fake sink - sink func(http.ResponseWriter, *http.Request) - - // wantErr is true when we expect the test to return an error. - wantErr bool - - // wantErrMsg contains the pattern to match the returned error message. - // Implies wantErr = true. - wantErrMsg string - // payload contains the GitHub event payload payload interface{} @@ -82,9 +76,10 @@ var testCases = []testCase{ payload: func() interface{} { pl := gh.CheckSuitePayload{} id, _ := strconv.ParseInt(testSubject, 10, 64) - pl.Repository.ID = id + pl.CheckSuite.ID = id return pl }(), + eventID: eventID, eventType: "check_suite", wantCloudEventSubject: testSubject, }, { @@ -94,7 +89,9 @@ var testCases = []testCase{ pl.Comment.HTMLURL = fmt.Sprintf("http://test/%s", testSubject) return pl }(), + eventID: eventID, eventType: "commit_comment", + wantCloudEventType: "dev.knative.source.github.commit_comment", wantCloudEventSubject: testSubject, }, { name: "valid create", @@ -103,6 +100,7 @@ var testCases = []testCase{ pl.RefType = testSubject return pl }(), + eventID: eventID, eventType: "create", wantCloudEventSubject: testSubject, }, { @@ -112,6 +110,7 @@ var testCases = []testCase{ pl.RefType = testSubject return pl }(), + eventID: eventID, eventType: "delete", wantCloudEventSubject: testSubject, }, { @@ -122,6 +121,7 @@ var testCases = []testCase{ pl.Deployment.ID = subject return pl }(), + eventID: eventID, eventType: "deployment", wantCloudEventSubject: testSubject, }, { @@ -132,6 +132,7 @@ var testCases = []testCase{ pl.Deployment.ID = subject return pl }(), + eventID: eventID, eventType: "deployment_status", wantCloudEventSubject: testSubject, }, { @@ -142,6 +143,7 @@ var testCases = []testCase{ pl.Forkee.ID = subject return pl }(), + eventID: eventID, eventType: "fork", wantCloudEventSubject: testSubject, }, { @@ -151,6 +153,7 @@ var testCases = []testCase{ // Leaving the subject as empty. return pl }(), + eventID: eventID, eventType: "gollum", wantCloudEventSubject: "", }, { @@ -161,6 +164,7 @@ var testCases = []testCase{ pl.Installation.ID = subject return pl }(), + eventID: eventID, eventType: "installation", wantCloudEventSubject: testSubject, }, { @@ -171,6 +175,7 @@ var testCases = []testCase{ pl.Installation.ID = subject return pl }(), + eventID: eventID, eventType: "integration_installation", wantCloudEventSubject: testSubject, }, { @@ -180,6 +185,7 @@ var testCases = []testCase{ pl.Comment.HTMLURL = fmt.Sprintf("http://test/%s", testSubject) return pl }(), + eventID: eventID, eventType: "issue_comment", wantCloudEventSubject: testSubject, }, { @@ -190,6 +196,7 @@ var testCases = []testCase{ pl.Issue.Number = subject return pl }(), + eventID: eventID, eventType: "issues", wantCloudEventSubject: testSubject, }, { @@ -199,6 +206,7 @@ var testCases = []testCase{ pl.Label.Name = testSubject return pl }(), + eventID: eventID, eventType: "label", wantCloudEventSubject: testSubject, }, { @@ -209,6 +217,7 @@ var testCases = []testCase{ pl.Member.ID = subject return pl }(), + eventID: eventID, eventType: "member", wantCloudEventSubject: testSubject, }, { @@ -219,6 +228,7 @@ var testCases = []testCase{ pl.Member.ID = subject return pl }(), + eventID: eventID, eventType: "membership", wantCloudEventSubject: testSubject, }, { @@ -229,6 +239,7 @@ var testCases = []testCase{ pl.Milestone.Number = subject return pl }(), + eventID: eventID, eventType: "milestone", wantCloudEventSubject: testSubject, }, { @@ -238,6 +249,7 @@ var testCases = []testCase{ pl.Action = testSubject return pl }(), + eventID: eventID, eventType: "organization", wantCloudEventSubject: testSubject, }, { @@ -247,6 +259,7 @@ var testCases = []testCase{ pl.Action = testSubject return pl }(), + eventID: eventID, eventType: "org_block", wantCloudEventSubject: testSubject, }, { @@ -257,6 +270,7 @@ var testCases = []testCase{ pl.ID = subject return pl }(), + eventID: eventID, eventType: "page_build", wantCloudEventSubject: testSubject, }, { @@ -267,6 +281,7 @@ var testCases = []testCase{ pl.HookID = subject return pl }(), + eventID: eventID, eventType: "ping", wantCloudEventSubject: testSubject, }, { @@ -276,6 +291,7 @@ var testCases = []testCase{ pl.Action = testSubject return pl }(), + eventID: eventID, eventType: "project_card", wantCloudEventSubject: testSubject, }, { @@ -285,6 +301,7 @@ var testCases = []testCase{ pl.Action = testSubject return pl }(), + eventID: eventID, eventType: "project_column", wantCloudEventSubject: testSubject, }, { @@ -294,6 +311,7 @@ var testCases = []testCase{ pl.Action = testSubject return pl }(), + eventID: eventID, eventType: "project", wantCloudEventSubject: testSubject, }, { @@ -304,8 +322,10 @@ var testCases = []testCase{ pl.Repository.ID = subject return pl }(), + eventID: eventID, eventType: "public", wantCloudEventSubject: testSubject, + wantCloudEventType: "dev.knative.source.github.public", }, { name: "valid pull_request", payload: func() interface{} { @@ -314,6 +334,7 @@ var testCases = []testCase{ pl.PullRequest.Number = subject return pl }(), + eventID: eventID, eventType: "pull_request", wantCloudEventType: "dev.knative.source.github.pull_request", wantCloudEventSubject: testSubject, @@ -325,6 +346,7 @@ var testCases = []testCase{ pl.Review.ID = subject return pl }(), + eventID: eventID, eventType: "pull_request_review", wantCloudEventSubject: testSubject, }, { @@ -335,6 +357,7 @@ var testCases = []testCase{ pl.Comment.ID = subject return pl }(), + eventID: eventID, eventType: "pull_request_review_comment", wantCloudEventSubject: testSubject, }, { @@ -344,6 +367,7 @@ var testCases = []testCase{ pl.Compare = fmt.Sprintf("http://test/%s", testSubject) return pl }(), + eventID: eventID, eventType: "push", wantCloudEventSubject: testSubject, }, { @@ -353,6 +377,7 @@ var testCases = []testCase{ pl.Release.TagName = testSubject return pl }(), + eventID: eventID, eventType: "release", wantCloudEventSubject: testSubject, }, { @@ -363,6 +388,7 @@ var testCases = []testCase{ pl.Repository.ID = subject return pl }(), + eventID: eventID, eventType: "repository", wantCloudEventSubject: testSubject, }, { @@ -372,6 +398,7 @@ var testCases = []testCase{ pl.Sha = testSubject return pl }(), + eventID: eventID, eventType: "status", wantCloudEventSubject: testSubject, }, { @@ -382,6 +409,7 @@ var testCases = []testCase{ pl.Team.ID = subject return pl }(), + eventID: eventID, eventType: "team", wantCloudEventSubject: testSubject, }, { @@ -392,6 +420,7 @@ var testCases = []testCase{ pl.Repository.ID = subject return pl }(), + eventID: eventID, eventType: "team_add", wantCloudEventSubject: testSubject, }, { @@ -402,236 +431,125 @@ var testCases = []testCase{ pl.Repository.ID = subject return pl }(), + eventID: eventID, eventType: "watch", wantCloudEventSubject: testSubject, }, } -// mockTransport is a simple fake HTTP transport -type mockTransport func(req *http.Request) (*http.Response, error) +func newTestAdapter(t *testing.T, ce cloudevents.Client) *gitHubAdapter { + env := envConfig{ + EnvConfig: adapter.EnvConfig{ + Namespace: "default", + }, + EnvSecret: secretToken, + EnvPort: "8080", + EnvOwnerRepo: "test.repo", + } + ctx, _ := pkgtesting.SetupFakeContext(t) + logger := zap.NewExample().Sugar() + ctx = logging.WithLogger(ctx, logger) -// RoundTrip implements the required RoundTripper interface for -// mockTransport -func (mt mockTransport) RoundTrip(req *http.Request) (*http.Response, error) { - return mt(req) + return NewAdapter(ctx, &env, ce).(*gitHubAdapter) } -// TestAllCases runs all the table tests -func TestAllCases(t *testing.T) { - for _, tc := range testCases { - h := &fakeHandler{ - //handler: tc.sink, - handler: sinkAccepted, // No tests expect the sink to do anything interesting - } - sinkServer := httptest.NewServer(h) - defer sinkServer.Close() +func TestGracefullShutdown(t *testing.T) { + ce := adaptertest.NewTestClient() + ra := newTestAdapter(t, ce) + stopCh := make(chan struct{}, 1) + + go func(stopCh chan struct{}) { + defer close(stopCh) + time.Sleep(time.Second) - ra, err := New(sinkServer.URL, testOwnerRepo) + }(stopCh) + + t.Logf("starting webhook server") + err := ra.Start(stopCh) + if err != nil { + t.Error(err) + } +} + +func TestServer(t *testing.T) { + for _, tc := range testCases { + ce := adaptertest.NewTestClient() + adapter := newTestAdapter(t, ce) + hook, err := gh.New() if err != nil { t.Fatal(err) } + router := adapter.newRouter(hook) + server := httptest.NewServer(router) + defer server.Close() - t.Run(tc.name, tc.runner(t, *ra)) + t.Run(tc.name, tc.runner(t, server.URL, ce)) } } // runner returns a testing func that can be passed to t.Run. -func (tc *testCase) runner(t *testing.T, ra Adapter) func(t *testing.T) { +func (tc *testCase) runner(t *testing.T, url string, ceClient *adaptertest.TestCloudEventsClient) func(t *testing.T) { return func(t *testing.T) { if tc.eventType == "" { t.Fatal("eventType is required for table tests") } - eventID := "12345" - if tc.eventID != "" { - eventID = tc.eventID + body, _ := json.Marshal(tc.payload) + req, err := http.NewRequest("POST", url, bytes.NewReader(body)) + if err != nil { + t.Fatal(err) } - hdr := http.Header{} - hdr.Set("X-GitHub-Event", tc.eventType) - hdr.Set("X-GitHub-Delivery", eventID) - evtErr := ra.handleEvent(tc.payload, hdr) - if err := tc.verifyErr(evtErr); err != nil { + req.Header.Set(GHHeaderEvent, tc.eventType) + req.Header.Set(GHHeaderDelivery, eventID) + resp, err := http.DefaultClient.Do(req) + if err != nil { t.Error(err) } - } -} - -func (tc *testCase) handleRequest(req *http.Request) (*http.Response, error) { - - codec := cehttp.Codec{} - - body, err := ioutil.ReadAll(req.Body) - if err != nil { - return nil, err - } - msg := &cehttp.Message{ - Header: req.Header, - Body: body, - } - - event, err := codec.Decode(context.Background(), msg) - if err != nil { - return nil, fmt.Errorf("unexpected error decoding cloudevent: %s", err) - } + defer resp.Body.Close() - if tc.wantCloudEventType != "" && tc.wantCloudEventType != event.Type() { - return nil, fmt.Errorf("want cloud event type %s, got %s", - tc.wantCloudEventType, event.Type()) + tc.validateAcceptedPayload(t, ceClient) } - - gotSource := event.Source() - if testSource != gotSource { - return nil, fmt.Errorf("want source %s, got %s", testSource, gotSource) - } - - gotSubject := event.Context.GetSubject() - if tc.wantCloudEventSubject != "" && tc.wantCloudEventSubject != gotSubject { - return nil, fmt.Errorf("want subject %s, got %s", tc.wantCloudEventSubject, gotSubject) - } - - return &http.Response{ - StatusCode: 200, - Body: ioutil.NopCloser(bytes.NewBufferString("")), - Header: make(http.Header), - }, nil } -func (tc *testCase) verifyErr(err error) error { - wantErr := tc.wantErr || tc.wantErrMsg != "" - - if wantErr && err == nil { - return errors.New("want error, got nil") +func (tc *testCase) validateAcceptedPayload(t *testing.T, ce *adaptertest.TestCloudEventsClient) { + t.Helper() + if len(ce.Sent()) != 1 { + return } - - if !wantErr && err != nil { - return fmt.Errorf("want no error, got %v", err) + eventSubject := ce.Sent()[0].Subject() + if eventSubject != tc.wantCloudEventSubject { + t.Fatalf("Expected %q event subject to be sent, got %q", tc.wantCloudEventSubject, eventSubject) } - if err != nil { - if diff := cmp.Diff(tc.wantErrMsg, err.Error()); diff != "" { - return fmt.Errorf("incorrect error (-want, +got): %v", diff) + if tc.wantCloudEventType != "" { + eventType := ce.Sent()[0].Type() + if eventType != tc.wantCloudEventType { + t.Fatalf("Expected %q event type to be sent, got %q", tc.wantCloudEventType, eventType) } } - return nil -} - -// Direct Unit tests - -var ( - // Headers that are added to the response, but we don't want to check in our assertions. - unimportantHeaders = map[string]struct{}{ - "accept-encoding": {}, - "content-length": {}, - "user-agent": {}, - "ce-time": {}, - "ce-traceparent": {}, - "traceparent": {}, - "x-b3-sampled": {}, - "x-b3-spanid": {}, - "x-b3-traceid": {}, - } -) - -type requestValidation struct { - Host string - Headers http.Header - Body string -} -func TestHandleEvent(t *testing.T) { - eventID := "12345" - eventType := "pull_request" - - expectedRequest := requestValidation{ - Headers: map[string][]string{ - "ce-specversion": {"1.0"}, - "ce-id": {"12345"}, - "ce-time": {"2019-01-29T09:35:10.69383396-08:00"}, - "ce-type": {"dev.knative.source.github.pull_request"}, - "ce-source": {testSource}, - "ce-subject": {testSubject}, - "content-type": {"application/json"}, - }, - Body: `{"action":"","number":0,"pull_request":{"url":"","id":0,"node_id":"","html_url":"","diff_url":"","patch_url":"","issue_url":"","number":1234,"state":"","locked":false,"title":"","user":{"login":"","id":0,"node_id":"","avatar_url":"","gravatar_id":"","url":"","html_url":"","followers_url":"","following_url":"","gists_url":"","starred_url":"","subscriptions_url":"","organizations_url":"","repos_url":"","events_url":"","received_events_url":"","type":"","site_admin":false},"body":"","created_at":"0001-01-01T00:00:00Z","updated_at":"0001-01-01T00:00:00Z","closed_at":null,"merged_at":null,"merge_commit_sha":null,"assignee":null,"assignees":null,"milestone":null,"commits_url":"","review_comments_url":"","review_comment_url":"","comments_url":"","statuses_url":"","labels":null,"head":{"label":"","ref":"","sha":"","user":{"login":"","id":0,"node_id":"","avatar_url":"","gravatar_id":"","url":"","html_url":"","followers_url":"","following_url":"","gists_url":"","starred_url":"","subscriptions_url":"","organizations_url":"","repos_url":"","events_url":"","received_events_url":"","type":"","site_admin":false},"repo":{"id":0,"node_id":"","name":"","full_name":"","owner":{"login":"","id":0,"node_id":"","avatar_url":"","gravatar_id":"","url":"","html_url":"","followers_url":"","following_url":"","gists_url":"","starred_url":"","subscriptions_url":"","organizations_url":"","repos_url":"","events_url":"","received_events_url":"","type":"","site_admin":false},"private":false,"html_url":"","description":"","fork":false,"url":"","forks_url":"","keys_url":"","collaborators_url":"","teams_url":"","hooks_url":"","issue_events_url":"","events_url":"","assignees_url":"","branches_url":"","tags_url":"","blobs_url":"","git_tags_url":"","git_refs_url":"","trees_url":"","statuses_url":"","languages_url":"","stargazers_url":"","contributors_url":"","subscribers_url":"","subscription_url":"","commits_url":"","git_commits_url":"","comments_url":"","issue_comment_url":"","contents_url":"","compare_url":"","merges_url":"","archive_url":"","downloads_url":"","issues_url":"","pulls_url":"","milestones_url":"","notifications_url":"","labels_url":"","releases_url":"","created_at":"0001-01-01T00:00:00Z","updated_at":"0001-01-01T00:00:00Z","pushed_at":"0001-01-01T00:00:00Z","git_url":"","ssh_url":"","clone_url":"","svn_url":"","homepage":null,"size":0,"stargazers_count":0,"watchers_count":0,"language":null,"has_issues":false,"has_downloads":false,"has_wiki":false,"has_pages":false,"forks_count":0,"mirror_url":null,"open_issues_count":0,"forks":0,"open_issues":0,"watchers":0,"default_branch":""}},"base":{"label":"","ref":"","sha":"","user":{"login":"","id":0,"node_id":"","avatar_url":"","gravatar_id":"","url":"","html_url":"","followers_url":"","following_url":"","gists_url":"","starred_url":"","subscriptions_url":"","organizations_url":"","repos_url":"","events_url":"","received_events_url":"","type":"","site_admin":false},"repo":{"id":0,"node_id":"","name":"","full_name":"","owner":{"login":"","id":0,"node_id":"","avatar_url":"","gravatar_id":"","url":"","html_url":"","followers_url":"","following_url":"","gists_url":"","starred_url":"","subscriptions_url":"","organizations_url":"","repos_url":"","events_url":"","received_events_url":"","type":"","site_admin":false},"private":false,"html_url":"","description":"","fork":false,"url":"","forks_url":"","keys_url":"","collaborators_url":"","teams_url":"","hooks_url":"","issue_events_url":"","events_url":"","assignees_url":"","branches_url":"","tags_url":"","blobs_url":"","git_tags_url":"","git_refs_url":"","trees_url":"","statuses_url":"","languages_url":"","stargazers_url":"","contributors_url":"","subscribers_url":"","subscription_url":"","commits_url":"","git_commits_url":"","comments_url":"","issue_comment_url":"","contents_url":"","compare_url":"","merges_url":"","archive_url":"","downloads_url":"","issues_url":"","pulls_url":"","milestones_url":"","notifications_url":"","labels_url":"","releases_url":"","created_at":"0001-01-01T00:00:00Z","updated_at":"0001-01-01T00:00:00Z","pushed_at":"0001-01-01T00:00:00Z","git_url":"","ssh_url":"","clone_url":"","svn_url":"","homepage":null,"size":0,"stargazers_count":0,"watchers_count":0,"language":null,"has_issues":false,"has_downloads":false,"has_wiki":false,"has_pages":false,"forks_count":0,"mirror_url":null,"open_issues_count":0,"forks":0,"open_issues":0,"watchers":0,"default_branch":""}},"_links":{"self":{"href":""},"html":{"href":""},"issue":{"href":""},"comments":{"href":""},"review_comments":{"href":""},"review_comment":{"href":""},"commits":{"href":""},"statuses":{"href":""}},"merged":false,"mergeable":null,"mergeable_state":"","merged_by":null,"comments":0,"review_comments":0,"commits":0,"additions":0,"deletions":0,"changed_files":0},"label":{"id":0,"node_id":"","description":"","url":"","name":"","color":"","default":false},"repository":{"id":0,"node_id":"","name":"","full_name":"","owner":{"login":"","node_id":"","id":0,"avatar_url":"","gravatar_id":"","url":"","html_url":"","followers_url":"","following_url":"","gists_url":"","starred_url":"","subscriptions_url":"","organizations_url":"","repos_url":"","events_url":"","received_events_url":"","type":"","site_admin":false},"private":false,"html_url":"","description":"","fork":false,"url":"","forks_url":"","keys_url":"","collaborators_url":"","teams_url":"","hooks_url":"","issue_events_url":"","events_url":"","assignees_url":"","branches_url":"","tags_url":"","blobs_url":"","git_tags_url":"","git_refs_url":"","trees_url":"","statuses_url":"","languages_url":"","stargazers_url":"","contributors_url":"","subscribers_url":"","subscription_url":"","commits_url":"","git_commits_url":"","comments_url":"","issue_comment_url":"","contents_url":"","compare_url":"","merges_url":"","archive_url":"","downloads_url":"","issues_url":"","pulls_url":"","milestones_url":"","notifications_url":"","labels_url":"","releases_url":"","created_at":"0001-01-01T00:00:00Z","updated_at":"0001-01-01T00:00:00Z","pushed_at":"0001-01-01T00:00:00Z","git_url":"","ssh_url":"","clone_url":"","svn_url":"","homepage":null,"size":0,"stargazers_count":0,"watchers_count":0,"language":null,"has_issues":false,"has_downloads":false,"has_wiki":false,"has_pages":false,"forks_count":0,"mirror_url":null,"open_issues_count":0,"forks":0,"open_issues":0,"watchers":0,"default_branch":""},"sender":{"login":"","id":0,"node_id":"","avatar_url":"","gravatar_id":"","url":"","html_url":"","followers_url":"","following_url":"","gists_url":"","starred_url":"","subscriptions_url":"","organizations_url":"","repos_url":"","events_url":"","received_events_url":"","type":"","site_admin":false},"assignee":null,"requested_reviewer":null,"requested_team":{"name":"","id":0,"node_id":"","slug":"","description":"","privacy":"","url":"","html_url":"","members_url":"","repositories_url":"","permission":""},"installation":{"id":0}}`, + eventID := ce.Sent()[0].ID() + if eventID != tc.eventID { + t.Fatalf("Expected %q event id to be sent, got %q", tc.eventID, eventID) } + data := ce.Sent()[0].Data() - h := &fakeHandler{ - //handler: tc.sink, - handler: sinkAccepted, // No tests expect the sink to do anything interesting - } - sinkServer := httptest.NewServer(h) - defer sinkServer.Close() + var got interface{} + var want interface{} - ra, err := New(sinkServer.URL, testOwnerRepo) + err := json.Unmarshal(data, &got) if err != nil { - t.Fatal(err) - } - - payload := gh.PullRequestPayload{} - subject, _ := strconv.ParseInt(testSubject, 10, 64) - payload.PullRequest.Number = subject - header := http.Header{} - header.Set("X-"+GHHeaderEvent, eventType) - header.Set("X-"+GHHeaderDelivery, eventID) - ra.HandleEvent(payload, http.Header(header)) - - // TODO(https://knative.dev/pkg/issues/250): clean this up when there is a shared test client. - - canonicalizeHeaders(expectedRequest) - if diff := cmp.Diff(expectedRequest.Headers, h.header); diff != "" { - t.Errorf("Unexpected difference (-want, +got): %v", diff) - } - - if diff := cmp.Diff(expectedRequest.Body, string(h.body)); diff != "" { - t.Errorf("Unexpected difference (-want, +got): %v", diff) + t.Fatalf("Could not unmarshal sent data: %v", err) } -} - -type fakeHandler struct { - body []byte - header http.Header - - handler func(http.ResponseWriter, *http.Request) -} - -func (h *fakeHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { - body, err := ioutil.ReadAll(r.Body) + payload, err := json.Marshal(tc.payload) if err != nil { - http.Error(w, "can not read body", http.StatusBadRequest) - return + t.Fatalf("Could not marshal sent payload: %v", err) } - h.body = body - h.header = make(map[string][]string) - - for n, v := range r.Header { - ln := strings.ToLower(n) - if _, present := unimportantHeaders[ln]; !present { - h.header[ln] = v - } + err = json.Unmarshal(payload, &want) + if err != nil { + t.Fatalf("Could not unmarshal sent payload: %v", err) } - - defer r.Body.Close() - h.handler(w, r) -} - -func sinkAccepted(writer http.ResponseWriter, req *http.Request) { - writer.WriteHeader(http.StatusOK) -} - -func sinkRejected(writer http.ResponseWriter, _ *http.Request) { - writer.WriteHeader(http.StatusRequestTimeout) -} - -func canonicalizeHeaders(rvs ...requestValidation) { - // HTTP header names are case-insensitive, so normalize them to lower case for comparison. - for _, rv := range rvs { - headers := rv.Headers - for n, v := range headers { - delete(headers, n) - ln := strings.ToLower(n) - if _, present := unimportantHeaders[ln]; !present { - headers[ln] = v - } - } + if diff := cmp.Diff(want, got); diff != "" { + t.Fatalf("unexpected event data (-want, +got) = %v", diff) } } diff --git a/github/pkg/reconciler/source/resources/service.go b/github/pkg/reconciler/source/resources/service.go index fe41df0bd8..4d00f7327e 100644 --- a/github/pkg/reconciler/source/resources/service.go +++ b/github/pkg/reconciler/source/resources/service.go @@ -47,13 +47,24 @@ func MakeService(args *ServiceArgs) *v1.Service { SecretKeyRef: args.Source.Spec.SecretToken.SecretKeyRef, }, }, { - Name: "SINK", + Name: "K_SINK", Value: sinkURI.String(), }, { Name: "GITHUB_OWNER_REPO", Value: args.Source.Spec.OwnerAndRepository, + }, { + Name: "NAMESPACE", + Value: args.Source.Namespace, + }, { + Name: "METRICS_DOMAIN", + Value: "knative.dev/eventing", + }, { + Name: "K_METRICS_CONFIG", + Value: "", + }, { + Name: "K_LOGGING_CONFIG", + Value: "", }} - containerArgs := []string{fmt.Sprintf("--sink=%s", sinkURI.String())} return &v1.Service{ ObjectMeta: metav1.ObjectMeta{ GenerateName: fmt.Sprintf("%s-", args.Source.Name), @@ -71,7 +82,6 @@ func MakeService(args *ServiceArgs) *v1.Service { Containers: []corev1.Container{{ Image: args.ReceiveAdapterImage, Env: env, - Args: containerArgs, }}, }, },