Skip to content
This repository has been archived by the owner on Jun 19, 2022. It is now read-only.

Commit

Permalink
adding e2e test PubSubSource->Broker with PubSubChannel (#550)
Browse files Browse the repository at this point in the history
* adding e2e test PubSubSource->Broker with PubSubChannel

* refactoring

* address review comments
  • Loading branch information
Nicolas Lopez authored Feb 19, 2020
1 parent 7075f74 commit 818488f
Show file tree
Hide file tree
Showing 4 changed files with 191 additions and 72 deletions.
7 changes: 7 additions & 0 deletions test/e2e/e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,13 @@ func TestBrokerWithPubSubChannel(t *testing.T) {
BrokerWithPubSubChannelTestImpl(t)
}

// TestCloudPubSubSourceBrokerWithPubSubChannel tests we can knock a Knative Service from a broker with PubSub Channel from a CloudPubSubSource.
func TestCloudPubSubSourceBrokerWithPubSubChannel(t *testing.T) {
cancel := logstream.Start(t)
defer cancel()
PubSubSourceBrokerWithPubSubChannelTestImpl(t)
}

// TestCloudStorageSource tests we can knock down a target from a CloudStorageSource.
func TestCloudStorageSource(t *testing.T) {
cancel := logstream.Start(t)
Expand Down
79 changes: 79 additions & 0 deletions test/e2e/lib/pubsub.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
/*
Copyright 2020 Google LLC
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package lib

import (
"net/http"
"os"
"testing"
"time"

"github.com/google/knative-gcp/pkg/apis/events/v1alpha1"
kngcptesting "github.com/google/knative-gcp/pkg/reconciler/testing"
"github.com/google/knative-gcp/test/e2e/lib/metrics"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
pkgmetrics "knative.dev/pkg/metrics"
)

func MakePubSubOrDie(t *testing.T,
client *Client,
gvk metav1.GroupVersionKind,
psName, targetName, topicName string,
so ...kngcptesting.CloudPubSubSourceOption,
) {
so = append(so, kngcptesting.WithCloudPubSubSourceSink(gvk, targetName))
so = append(so, kngcptesting.WithCloudPubSubSourceTopic(topicName))
eventsPubsub := kngcptesting.NewCloudPubSubSource(psName, client.Namespace, so...)
client.CreatePubSubOrFail(eventsPubsub)

client.Core.WaitForResourceReadyOrFail(psName, CloudPubSubSourceTypeMeta)
}

func AssertMetrics(t *testing.T, client *Client, topicName, psName string) {
sleepTime := 1 * time.Minute
t.Logf("Sleeping %s to make sure metrics were pushed to stackdriver", sleepTime.String())
time.Sleep(sleepTime)

// If we reach this point, the projectID should have been set.
projectID := os.Getenv(ProwProjectKey)
f := map[string]interface{}{
"metric.type": EventCountMetricType,
"resource.type": GlobalMetricResourceType,
"metric.label.resource_group": PubsubResourceGroup,
"metric.label.event_type": v1alpha1.CloudPubSubSourcePublish,
"metric.label.event_source": v1alpha1.CloudPubSubSourceEventSource(projectID, topicName),
"metric.label.namespace_name": client.Namespace,
"metric.label.name": psName,
// We exit the target image before sending a response, thus check for 500.
"metric.label.response_code": http.StatusInternalServerError,
"metric.label.response_code_class": pkgmetrics.ResponseCodeClass(http.StatusInternalServerError),
}

filter := metrics.StringifyStackDriverFilter(f)
t.Logf("Filter expression: %s", filter)

actualCount, err := client.StackDriverEventCountMetricFor(client.Namespace, projectID, filter)
if err != nil {
t.Errorf("failed to get stackdriver event count metric: %v", err)
t.Fail()
}
expectedCount := int64(1)
if *actualCount != expectedCount {
t.Errorf("Actual count different than expected count, actual: %d, expected: %d", actualCount, expectedCount)
t.Fail()
}
}
120 changes: 99 additions & 21 deletions test/e2e/test_broker_pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,20 +17,25 @@ limitations under the License.
package e2e

import (
"context"
"encoding/json"
"net/url"
"testing"
"time"

"cloud.google.com/go/pubsub"
v1 "k8s.io/api/core/v1"
eventingv1alpha1 "knative.dev/eventing/pkg/apis/eventing/v1alpha1"
eventingtestlib "knative.dev/eventing/test/lib"
"knative.dev/eventing/test/lib/duck"
eventingtestresources "knative.dev/eventing/test/lib/resources"
"knative.dev/pkg/apis"
"knative.dev/pkg/test/helpers"

// The following line to load the gcp plugin (only required to authenticate against GKE clusters).
_ "k8s.io/client-go/plugin/pkg/client/auth/gcp"

kngcptesting "github.com/google/knative-gcp/pkg/reconciler/testing"
"github.com/google/knative-gcp/test/e2e/lib"
"github.com/google/knative-gcp/test/e2e/lib/resources"
)
Expand Down Expand Up @@ -61,6 +66,99 @@ func BrokerWithPubSubChannelTestImpl(t *testing.T) {
client := lib.Setup(t, true)
defer lib.TearDown(client)

u := createBrokerWithPubSubChannel(t,
client,
brokerName,
dummyTriggerName,
kserviceName,
respTriggerName,
targetName,
)

// Just to make sure all resources are ready.
time.Sleep(5 * time.Second)

// Create a sender Job to sender the event.
senderJob := resources.SenderJob(senderName, []v1.EnvVar{{
Name: "BROKER_URL",
Value: u.String(),
}})
client.CreateJobOrFail(senderJob)

// Check if dummy CloudEvent is sent out.
if done := jobDone(client, senderName, t); !done {
t.Error("dummy event wasn't sent to broker")
t.Failed()
}
// Check if resp CloudEvent hits the target Service.
if done := jobDone(client, targetName, t); !done {
t.Error("resp event didn't hit the target pod")
t.Failed()
}
}

func PubSubSourceBrokerWithPubSubChannelTestImpl(t *testing.T) {
topicName, deleteTopic := lib.MakeTopicOrDie(t)
defer deleteTopic()

brokerName := helpers.AppendRandomString("pubsub")
dummyTriggerName := "dummy-broker-" + brokerName
psName := helpers.AppendRandomString(topicName + "-pubsub")
respTriggerName := "resp-broker-" + brokerName
kserviceName := helpers.AppendRandomString("kservice")
targetName := helpers.AppendRandomString(topicName + "-target")

client := lib.Setup(t, true)
defer lib.TearDown(client)

u := createBrokerWithPubSubChannel(t,
client,
brokerName,
dummyTriggerName,
kserviceName,
respTriggerName,
targetName,
)
var url apis.URL = apis.URL(u)
// Just to make sure all resources are ready.
time.Sleep(5 * time.Second)

// Create the PubSub source.
lib.MakePubSubOrDie(t,
client,
lib.ServiceGVK,
psName,
targetName,
topicName,
kngcptesting.WithCloudPubSubSourceSinkURI(&url),
)

topic := lib.GetTopic(t, topicName)

r := topic.Publish(context.TODO(), &pubsub.Message{
Attributes: map[string]string{
"target": "falldown",
},
Data: []byte(`{"foo":bar}`),
})

_, err := r.Get(context.TODO())
if err != nil {
t.Logf("%s", err)
}

// Check if resp CloudEvent hits the target Service.
if done := jobDone(client, targetName, t); !done {
t.Error("resp event didn't hit the target pod")
t.Failed()
}
// TODO(nlopezgi): assert StackDriver metrics after https://github.com/google/knative-gcp/issues/317 is resolved
}

func createBrokerWithPubSubChannel(t *testing.T,
client *lib.Client,
brokerName, dummyTriggerName, kserviceName, respTriggerName, targetName string,
) url.URL {
// Create a new Broker.
// TODO(chizhg): maybe we don't need to create these RBAC resources as they will now be automatically created?
client.Core.CreateRBACResourcesForBrokers()
Expand Down Expand Up @@ -109,27 +207,7 @@ func BrokerWithPubSubChannelTestImpl(t *testing.T) {
if err != nil {
t.Error(err.Error())
}

// Just to make sure all resources are ready.
time.Sleep(5 * time.Second)

// Create a sender Job to sender the event.
senderJob := resources.SenderJob(senderName, []v1.EnvVar{{
Name: "BROKER_URL",
Value: u.String(),
}})
client.CreateJobOrFail(senderJob)

// Check if dummy CloudEvent is sent out.
if done := jobDone(client, senderName, t); !done {
t.Error("dummy event wasn't sent to broker")
t.Failed()
}
// Check if resp CloudEvent hits the target Service.
if done := jobDone(client, targetName, t); !done {
t.Error("resp event didn't hit the target pod")
t.Failed()
}
return u
}

func jobDone(client *lib.Client, podName string, t *testing.T) bool {
Expand Down
57 changes: 6 additions & 51 deletions test/e2e/test_pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,22 +19,16 @@ package e2e
import (
"context"
"encoding/json"
"net/http"
"os"
"testing"
"time"

"cloud.google.com/go/pubsub"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
pkgmetrics "knative.dev/pkg/metrics"
"knative.dev/pkg/test/helpers"

"github.com/google/knative-gcp/pkg/apis/events/v1alpha1"
kngcptesting "github.com/google/knative-gcp/pkg/reconciler/testing"
"github.com/google/knative-gcp/test/e2e/lib"
"github.com/google/knative-gcp/test/e2e/lib/metrics"
"github.com/google/knative-gcp/test/e2e/lib/resources"

// The following line to load the gcp plugin (only required to authenticate against GKE clusters).
_ "k8s.io/client-go/plugin/pkg/client/auth/gcp"
)
Expand All @@ -51,12 +45,9 @@ func SmokeCloudPubSubSourceTestImpl(t *testing.T) {
defer lib.TearDown(client)

// Create the PubSub source.
eventsPubsub := kngcptesting.NewCloudPubSubSource(psName, client.Namespace,
kngcptesting.WithCloudPubSubSourceSink(metav1.GroupVersionKind{
Version: "v1",
Kind: "Service"}, svcName),
kngcptesting.WithCloudPubSubSourceTopic(topic))
client.CreatePubSubOrFail(eventsPubsub)
lib.MakePubSubOrDie(t, client, metav1.GroupVersionKind{
Version: "v1",
Kind: "Service"}, psName, svcName, topic)

client.Core.WaitForResourceReadyOrFail(psName, lib.CloudPubSubSourceTypeMeta)

Expand Down Expand Up @@ -85,12 +76,7 @@ func CloudPubSubSourceWithTargetTestImpl(t *testing.T, assertMetrics bool) {
client.CreateJobOrFail(job, lib.WithServiceForJob(targetName))

// Create the PubSub source.
eventsPubsub := kngcptesting.NewCloudPubSubSource(psName, client.Namespace,
kngcptesting.WithCloudPubSubSourceSink(lib.ServiceGVK, targetName),
kngcptesting.WithCloudPubSubSourceTopic(topicName))
client.CreatePubSubOrFail(eventsPubsub)

client.Core.WaitForResourceReadyOrFail(psName, lib.CloudPubSubSourceTypeMeta)
lib.MakePubSubOrDie(t, client, lib.ServiceGVK, psName, targetName, topicName)

topic := lib.GetTopic(t, topicName)

Expand Down Expand Up @@ -135,37 +121,6 @@ func CloudPubSubSourceWithTargetTestImpl(t *testing.T, assertMetrics bool) {

// Assert that we are actually sending event counts to StackDriver.
if assertMetrics {
sleepTime := 1 * time.Minute
t.Logf("Sleeping %s to make sure metrics were pushed to stackdriver", sleepTime.String())
time.Sleep(sleepTime)

// If we reach this point, the projectID should have been set.
projectID := os.Getenv(lib.ProwProjectKey)
f := map[string]interface{}{
"metric.type": lib.EventCountMetricType,
"resource.type": lib.GlobalMetricResourceType,
"metric.label.resource_group": lib.PubsubResourceGroup,
"metric.label.event_type": v1alpha1.CloudPubSubSourcePublish,
"metric.label.event_source": v1alpha1.CloudPubSubSourceEventSource(projectID, topicName),
"metric.label.namespace_name": client.Namespace,
"metric.label.name": psName,
// We exit the target image before sending a response, thus check for 500.
"metric.label.response_code": http.StatusInternalServerError,
"metric.label.response_code_class": pkgmetrics.ResponseCodeClass(http.StatusInternalServerError),
}

filter := metrics.StringifyStackDriverFilter(f)
t.Logf("Filter expression: %s", filter)

actualCount, err := client.StackDriverEventCountMetricFor(client.Namespace, projectID, filter)
if err != nil {
t.Errorf("failed to get stackdriver event count metric: %v", err)
t.Fail()
}
expectedCount := int64(1)
if *actualCount != expectedCount {
t.Errorf("Actual count different than expected count, actual: %d, expected: %d", actualCount, expectedCount)
t.Fail()
}
lib.AssertMetrics(t, client, topicName, psName)
}
}

0 comments on commit 818488f

Please sign in to comment.