Skip to content
This repository has been archived by the owner on Jun 19, 2022. It is now read-only.

Commit

Permalink
add e2e test for Storage>Broker with PubSub Channel
Browse files Browse the repository at this point in the history
  • Loading branch information
Nicolas Lopez committed Feb 20, 2020
1 parent dc8eb10 commit 85afea7
Show file tree
Hide file tree
Showing 4 changed files with 182 additions and 77 deletions.
7 changes: 7 additions & 0 deletions test/e2e/e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,13 @@ func TestCloudPubSubSourceBrokerWithPubSubChannel(t *testing.T) {
PubSubSourceBrokerWithPubSubChannelTestImpl(t)
}

// TestCloudStorageSourceBrokerWithPubSubChannel tests we can knock a Knative Service from a broker with PubSub Channel from a CloudPubSubSource.
func TestCloudStorageSourceBrokerWithPubSubChannel(t *testing.T) {
cancel := logstream.Start(t)
defer cancel()
StorageSourceBrokerWithPubSubChannelTestImpl(t)
}

// TestCloudStorageSource tests we can knock down a target from a CloudStorageSource.
func TestCloudStorageSource(t *testing.T) {
cancel := logstream.Start(t)
Expand Down
105 changes: 105 additions & 0 deletions test/e2e/lib/storage.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
/*
Copyright 2020 Google LLC
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package lib

import (
"context"
"fmt"
"testing"

"cloud.google.com/go/storage"
kngcptesting "github.com/google/knative-gcp/pkg/reconciler/testing"
"google.golang.org/api/iterator"
)

func MakeStorageOrDie(client *Client,
bucketName, storageName, targetName string,
so ...kngcptesting.CloudStorageSourceOption,
) {
so = append(so, kngcptesting.WithCloudStorageSourceBucket(bucketName))
so = append(so, kngcptesting.WithCloudStorageSourceSink(ServiceGVK, targetName))
eventsStorage := kngcptesting.NewCloudStorageSource(storageName, client.Namespace, so...)
client.CreateStorageOrFail(eventsStorage)

client.Core.WaitForResourceReadyOrFail(storageName, CloudStorageSourceTypeMeta)
}

func AddRandomFile(ctx context.Context, t *testing.T, bucketName, fileName, project string) {
// Add a random name file in the bucket
bucketHandle := getBucketHandle(ctx, t, bucketName, project)
wc := bucketHandle.Object(fileName).NewWriter(ctx)
// Write some text to object
if _, err := fmt.Fprintf(wc, "e2e test for storage importer.\n"); err != nil {
t.Error(err)
}
if err := wc.Close(); err != nil {
t.Error(err)
}

// Delete test file deferred
defer func() {
o := bucketHandle.Object(fileName)
if err := o.Delete(ctx); err != nil {
t.Error(err)
}
}()
}

// MakeBucket retrieves the bucket name for the test. If it does not exist, it will create it.
func MakeBucket(ctx context.Context, t *testing.T, project string) string {
if project == "" {
t.Fatalf("failed to find %q in envvars", ProwProjectKey)
}
client, err := storage.NewClient(ctx)
if err != nil {
t.Fatalf("failed to create storage client, %s", err.Error())
}
it := client.Buckets(ctx, project)
bucketName := "storage-e2e-test-" + project
// Iterate buckets to check if there has a bucket for e2e test
for {
bucketAttrs, err := it.Next()
if err == iterator.Done {
// Create a new bucket if there is no existing e2e test bucket
bucket := client.Bucket(bucketName)
if e := bucket.Create(ctx, project, &storage.BucketAttrs{}); e != nil {
t.Fatalf("failed to create bucket, %s", e.Error())
}
break
}
if err != nil {
t.Fatalf("failed to list buckets, %s", err.Error())
}
// Break iteration if there has a bucket for e2e test
if bucketAttrs.Name == bucketName {
break
}
}
return bucketName
}

func getBucketHandle(ctx context.Context, t *testing.T, bucketName, project string) *storage.BucketHandle {
// Prow sticks the project in this key
if project == "" {
t.Fatalf("failed to find %q in envvars", ProwProjectKey)
}
client, err := storage.NewClient(ctx)
if err != nil {
t.Fatalf("failed to create pubsub client, %s", err.Error())
}
return client.Bucket(bucketName)
}
71 changes: 67 additions & 4 deletions test/e2e/test_broker_pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"context"
"encoding/json"
"net/url"
"os"
"testing"
"time"

Expand Down Expand Up @@ -68,6 +69,10 @@ func BrokerWithPubSubChannelTestImpl(t *testing.T) {

u := createBrokerWithPubSubChannel(t,
client,
[]v1.EnvVar{{
Name: "TARGET",
Value: "falldown",
}},
brokerName,
dummyTriggerName,
kserviceName,
Expand Down Expand Up @@ -113,6 +118,10 @@ func PubSubSourceBrokerWithPubSubChannelTestImpl(t *testing.T) {

u := createBrokerWithPubSubChannel(t,
client,
[]v1.EnvVar{{
Name: "TARGET",
Value: "falldown",
}},
brokerName,
dummyTriggerName,
kserviceName,
Expand Down Expand Up @@ -155,8 +164,65 @@ func PubSubSourceBrokerWithPubSubChannelTestImpl(t *testing.T) {
// TODO(nlopezgi): assert StackDriver metrics after https://github.com/google/knative-gcp/issues/317 is resolved
}

func StorageSourceBrokerWithPubSubChannelTestImpl(t *testing.T) {
ctx := context.Background()
project := os.Getenv(lib.ProwProjectKey)

bucketName := lib.MakeBucket(ctx, t, project)
storageName := helpers.AppendRandomString(bucketName + "-storage")
targetName := helpers.AppendRandomString(bucketName + "-target")
fileName := helpers.AppendRandomString("test-file-for-storage")

brokerName := helpers.AppendRandomString("pubsub")
dummyTriggerName := "dummy-broker-" + brokerName
respTriggerName := "resp-broker-" + brokerName
kserviceName := helpers.AppendRandomString("kservice")

client := lib.Setup(t, true)
defer lib.TearDown(client)

u := createBrokerWithPubSubChannel(t,
client,
[]v1.EnvVar{{
Name: "SUBJECT",
Value: fileName,
}, {
Name: "TIME",
Value: "120",
}},
brokerName,
dummyTriggerName,
kserviceName,
respTriggerName,
targetName,
)
t.Logf("Created broker")
var url apis.URL = apis.URL(u)
// Just to make sure all resources are ready.
time.Sleep(5 * time.Second)

// Create the Storage source.
lib.MakeStorageOrDie(
client,
bucketName,
storageName,
targetName,
kngcptesting.WithCloudStorageSourceSinkURI(&url),
)

// Add a random name file in the bucket
lib.AddRandomFile(ctx, t, bucketName, fileName, project)

// Check if resp CloudEvent hits the target Service.
if done := jobDone(client, targetName, t); !done {
t.Error("resp event didn't hit the target pod")
t.Failed()
}
}

func createBrokerWithPubSubChannel(t *testing.T,
client *lib.Client,
targetJobEnvVars []v1.EnvVar,
brokerName, dummyTriggerName, kserviceName, respTriggerName, targetName string,
) url.URL {
// Create a new Broker.
Expand All @@ -180,10 +246,7 @@ func createBrokerWithPubSubChannel(t *testing.T,
)

// Create a target Job to receive the events.
job := resources.TargetJob(targetName, []v1.EnvVar{{
Name: "TARGET",
Value: "falldown",
}})
job := resources.StorageTargetJob(targetName, targetJobEnvVars)
client.CreateJobOrFail(job, lib.WithServiceForJob(targetName))

// Create a Trigger with the target Service subscriber.
Expand Down
76 changes: 3 additions & 73 deletions test/e2e/test_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,20 +19,16 @@ package e2e
import (
"context"
"encoding/json"
"fmt"
"net/http"
"os"
"testing"
"time"

"cloud.google.com/go/storage"
"google.golang.org/api/iterator"
v1 "k8s.io/api/core/v1"
pkgmetrics "knative.dev/pkg/metrics"
"knative.dev/pkg/test/helpers"

"github.com/google/knative-gcp/pkg/apis/events/v1alpha1"
kngcptesting "github.com/google/knative-gcp/pkg/reconciler/testing"
"github.com/google/knative-gcp/test/e2e/lib"
"github.com/google/knative-gcp/test/e2e/lib/metrics"
"github.com/google/knative-gcp/test/e2e/lib/resources"
Expand All @@ -41,56 +37,11 @@ import (
_ "k8s.io/client-go/plugin/pkg/client/auth/gcp"
)

// makeBucket retrieves the bucket name for the test. If it does not exist, it will create it.
func makeBucket(ctx context.Context, t *testing.T, project string) string {
if project == "" {
t.Fatalf("failed to find %q in envvars", lib.ProwProjectKey)
}
client, err := storage.NewClient(ctx)
if err != nil {
t.Fatalf("failed to create storage client, %s", err.Error())
}
it := client.Buckets(ctx, project)
bucketName := "storage-e2e-test-" + project
// Iterate buckets to check if there has a bucket for e2e test
for {
bucketAttrs, err := it.Next()
if err == iterator.Done {
// Create a new bucket if there is no existing e2e test bucket
bucket := client.Bucket(bucketName)
if e := bucket.Create(ctx, project, &storage.BucketAttrs{}); e != nil {
t.Fatalf("failed to create bucket, %s", e.Error())
}
break
}
if err != nil {
t.Fatalf("failed to list buckets, %s", err.Error())
}
// Break iteration if there has a bucket for e2e test
if bucketAttrs.Name == bucketName {
break
}
}
return bucketName
}

func getBucketHandle(ctx context.Context, t *testing.T, bucketName string, project string) *storage.BucketHandle {
// Prow sticks the project in this key
if project == "" {
t.Fatalf("failed to find %q in envvars", lib.ProwProjectKey)
}
client, err := storage.NewClient(ctx)
if err != nil {
t.Fatalf("failed to create pubsub client, %s", err.Error())
}
return client.Bucket(bucketName)
}

func CloudStorageSourceWithTestImpl(t *testing.T, assertMetrics bool) {
ctx := context.Background()
project := os.Getenv(lib.ProwProjectKey)

bucketName := makeBucket(ctx, t, project)
bucketName := lib.MakeBucket(ctx, t, project)
storageName := helpers.AppendRandomString(bucketName + "-storage")
targetName := helpers.AppendRandomString(bucketName + "-target")

Expand All @@ -113,31 +64,10 @@ func CloudStorageSourceWithTestImpl(t *testing.T, assertMetrics bool) {
client.CreateJobOrFail(job, lib.WithServiceForJob(targetName))

// Create the Storage source.
eventsStorage := kngcptesting.NewCloudStorageSource(storageName, client.Namespace,
kngcptesting.WithCloudStorageSourceBucket(bucketName),
kngcptesting.WithCloudStorageSourceSink(lib.ServiceGVK, targetName))
client.CreateStorageOrFail(eventsStorage)

client.Core.WaitForResourceReadyOrFail(storageName, lib.CloudStorageSourceTypeMeta)
lib.MakeStorageOrDie(client, bucketName, storageName, targetName)

// Add a random name file in the bucket
bucketHandle := getBucketHandle(ctx, t, bucketName, project)
wc := bucketHandle.Object(fileName).NewWriter(ctx)
// Write some text to object
if _, err := fmt.Fprintf(wc, "e2e test for storage importer.\n"); err != nil {
t.Error(err)
}
if err := wc.Close(); err != nil {
t.Error(err)
}

// Delete test file deferred
defer func() {
o := bucketHandle.Object(fileName)
if err := o.Delete(ctx); err != nil {
t.Error(err)
}
}()
lib.AddRandomFile(ctx, t, bucketName, fileName, project)

msg, err := client.WaitUntilJobDone(client.Namespace, targetName)
if err != nil {
Expand Down

0 comments on commit 85afea7

Please sign in to comment.