diff --git a/test/e2e/e2e_test.go b/test/e2e/e2e_test.go index f1d6e62f7c..39c6b40aaa 100644 --- a/test/e2e/e2e_test.go +++ b/test/e2e/e2e_test.go @@ -209,6 +209,13 @@ func TestCloudPubSubSourceBrokerWithPubSubChannel(t *testing.T) { PubSubSourceBrokerWithPubSubChannelTestImpl(t) } +// TestCloudStorageSourceBrokerWithPubSubChannel tests we can knock a Knative Service from a broker with PubSub Channel from a CloudPubSubSource. +func TestCloudStorageSourceBrokerWithPubSubChannel(t *testing.T) { + cancel := logstream.Start(t) + defer cancel() + StorageSourceBrokerWithPubSubChannelTestImpl(t) +} + // TestCloudStorageSource tests we can knock down a target from a CloudStorageSource. func TestCloudStorageSource(t *testing.T) { cancel := logstream.Start(t) diff --git a/test/e2e/lib/storage.go b/test/e2e/lib/storage.go new file mode 100644 index 0000000000..c221f6d826 --- /dev/null +++ b/test/e2e/lib/storage.go @@ -0,0 +1,118 @@ +/* +Copyright 2020 Google LLC + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package lib + +import ( + "context" + "fmt" + "testing" + + "cloud.google.com/go/storage" + kngcptesting "github.com/google/knative-gcp/pkg/reconciler/testing" + "github.com/google/knative-gcp/test/e2e/lib/resources" + "google.golang.org/api/iterator" + v1 "k8s.io/api/core/v1" +) + +func MakeStorageOrDie(client *Client, + bucketName, storageName, targetName string, + so ...kngcptesting.CloudStorageSourceOption, +) { + so = append(so, kngcptesting.WithCloudStorageSourceBucket(bucketName)) + so = append(so, kngcptesting.WithCloudStorageSourceSink(ServiceGVK, targetName)) + eventsStorage := kngcptesting.NewCloudStorageSource(storageName, client.Namespace, so...) + client.CreateStorageOrFail(eventsStorage) + + client.Core.WaitForResourceReadyOrFail(storageName, CloudStorageSourceTypeMeta) +} + +func MakeStorageJobOrDie(client *Client, fileName, targetName string) { + job := resources.StorageTargetJob(targetName, []v1.EnvVar{{ + Name: "SUBJECT", + Value: fileName, + }, { + Name: "TIME", + Value: "120", + }}) + client.CreateJobOrFail(job, WithServiceForJob(targetName)) +} + +func AddRandomFile(ctx context.Context, t *testing.T, bucketName, fileName, project string) { + // Add a random name file in the bucket + bucketHandle := getBucketHandle(ctx, t, bucketName, project) + wc := bucketHandle.Object(fileName).NewWriter(ctx) + // Write some text to object + if _, err := fmt.Fprintf(wc, "e2e test for storage importer.\n"); err != nil { + t.Error(err) + } + if err := wc.Close(); err != nil { + t.Error(err) + } + + // Delete test file deferred + defer func() { + o := bucketHandle.Object(fileName) + if err := o.Delete(ctx); err != nil { + t.Error(err) + } + }() +} + +// MakeBucket retrieves the bucket name for the test. If it does not exist, it will create it. +func MakeBucket(ctx context.Context, t *testing.T, project string) string { + if project == "" { + t.Fatalf("failed to find %q in envvars", ProwProjectKey) + } + client, err := storage.NewClient(ctx) + if err != nil { + t.Fatalf("failed to create storage client, %s", err.Error()) + } + it := client.Buckets(ctx, project) + bucketName := "storage-e2e-test-" + project + // Iterate buckets to check if there has a bucket for e2e test + for { + bucketAttrs, err := it.Next() + if err == iterator.Done { + // Create a new bucket if there is no existing e2e test bucket + bucket := client.Bucket(bucketName) + if e := bucket.Create(ctx, project, &storage.BucketAttrs{}); e != nil { + t.Fatalf("failed to create bucket, %s", e.Error()) + } + break + } + if err != nil { + t.Fatalf("failed to list buckets, %s", err.Error()) + } + // Break iteration if there has a bucket for e2e test + if bucketAttrs.Name == bucketName { + break + } + } + return bucketName +} + +func getBucketHandle(ctx context.Context, t *testing.T, bucketName, project string) *storage.BucketHandle { + // Prow sticks the project in this key + if project == "" { + t.Fatalf("failed to find %q in envvars", ProwProjectKey) + } + client, err := storage.NewClient(ctx) + if err != nil { + t.Fatalf("failed to create pubsub client, %s", err.Error()) + } + return client.Bucket(bucketName) +} diff --git a/test/e2e/test_broker_pubsub.go b/test/e2e/test_broker_pubsub.go index fdb777f392..2ef5b8f744 100644 --- a/test/e2e/test_broker_pubsub.go +++ b/test/e2e/test_broker_pubsub.go @@ -20,6 +20,7 @@ import ( "context" "encoding/json" "net/url" + "os" "testing" "time" @@ -56,24 +57,16 @@ Note: the number denotes the sequence of the event that flows in this test case. */ func BrokerWithPubSubChannelTestImpl(t *testing.T) { - brokerName := helpers.AppendRandomString("pubsub") - dummyTriggerName := "dummy-broker-" + brokerName - respTriggerName := "resp-broker-" + brokerName - kserviceName := helpers.AppendRandomString("kservice") senderName := helpers.AppendRandomString("sender") targetName := helpers.AppendRandomString("target") client := lib.Setup(t, true) defer lib.TearDown(client) - u := createBrokerWithPubSubChannel(t, - client, - brokerName, - dummyTriggerName, - kserviceName, - respTriggerName, - targetName, - ) + // Create a target Job to receive the events. + makeTargetJobOrDie(client, targetName) + + u := createBrokerWithPubSubChannel(t, client, targetName) // Just to make sure all resources are ready. time.Sleep(5 * time.Second) @@ -101,24 +94,17 @@ func PubSubSourceBrokerWithPubSubChannelTestImpl(t *testing.T) { topicName, deleteTopic := lib.MakeTopicOrDie(t) defer deleteTopic() - brokerName := helpers.AppendRandomString("pubsub") - dummyTriggerName := "dummy-broker-" + brokerName psName := helpers.AppendRandomString(topicName + "-pubsub") - respTriggerName := "resp-broker-" + brokerName - kserviceName := helpers.AppendRandomString("kservice") targetName := helpers.AppendRandomString(topicName + "-target") client := lib.Setup(t, true) defer lib.TearDown(client) - u := createBrokerWithPubSubChannel(t, - client, - brokerName, - dummyTriggerName, - kserviceName, - respTriggerName, - targetName, - ) + // Create a target Job to receive the events. + makeTargetJobOrDie(client, targetName) + + u := createBrokerWithPubSubChannel(t, client, targetName) + var url apis.URL = apis.URL(u) // Just to make sure all resources are ready. time.Sleep(5 * time.Second) @@ -155,10 +141,51 @@ func PubSubSourceBrokerWithPubSubChannelTestImpl(t *testing.T) { // TODO(nlopezgi): assert StackDriver metrics after https://github.com/google/knative-gcp/issues/317 is resolved } -func createBrokerWithPubSubChannel(t *testing.T, - client *lib.Client, - brokerName, dummyTriggerName, kserviceName, respTriggerName, targetName string, -) url.URL { +func StorageSourceBrokerWithPubSubChannelTestImpl(t *testing.T) { + ctx := context.Background() + project := os.Getenv(lib.ProwProjectKey) + + bucketName := lib.MakeBucket(ctx, t, project) + storageName := helpers.AppendRandomString(bucketName + "-storage") + targetName := helpers.AppendRandomString(bucketName + "-target") + fileName := helpers.AppendRandomString("test-file-for-storage") + + client := lib.Setup(t, true) + defer lib.TearDown(client) + + // Create a target StorageJob to receive the events. + lib.MakeStorageJobOrDie(client, fileName, targetName) + + u := createBrokerWithPubSubChannel(t, client, targetName) + + var url apis.URL = apis.URL(u) + // Just to make sure all resources are ready. + time.Sleep(5 * time.Second) + + // Create the Storage source. + lib.MakeStorageOrDie( + client, + bucketName, + storageName, + targetName, + kngcptesting.WithCloudStorageSourceSinkURI(&url), + ) + + // Add a random name file in the bucket + lib.AddRandomFile(ctx, t, bucketName, fileName, project) + + // Check if resp CloudEvent hits the target Service. + if done := jobDone(client, targetName, t); !done { + t.Error("resp event didn't hit the target pod") + t.Failed() + } +} + +func createBrokerWithPubSubChannel(t *testing.T, client *lib.Client, targetName string) url.URL { + brokerName := helpers.AppendRandomString("pubsub") + dummyTriggerName := "dummy-broker-" + brokerName + respTriggerName := "resp-broker-" + brokerName + kserviceName := helpers.AppendRandomString("kservice") // Create a new Broker. // TODO(chizhg): maybe we don't need to create these RBAC resources as they will now be automatically created? client.Core.CreateRBACResourcesForBrokers() @@ -179,13 +206,6 @@ func createBrokerWithPubSubChannel(t *testing.T, eventingtestresources.WithSubscriberKServiceRefForTrigger(kserviceName), ) - // Create a target Job to receive the events. - job := resources.TargetJob(targetName, []v1.EnvVar{{ - Name: "TARGET", - Value: "falldown", - }}) - client.CreateJobOrFail(job, lib.WithServiceForJob(targetName)) - // Create a Trigger with the target Service subscriber. client.Core.CreateTriggerOrFail( respTriggerName, @@ -210,6 +230,14 @@ func createBrokerWithPubSubChannel(t *testing.T, return u } +func makeTargetJobOrDie(client *lib.Client, targetName string) { + job := resources.TargetJob(targetName, []v1.EnvVar{{ + Name: "TARGET", + Value: "falldown", + }}) + client.CreateJobOrFail(job, lib.WithServiceForJob(targetName)) +} + func jobDone(client *lib.Client, podName string, t *testing.T) bool { msg, err := client.WaitUntilJobDone(client.Namespace, podName) if err != nil { diff --git a/test/e2e/test_storage.go b/test/e2e/test_storage.go index 28e5bdbc81..61fb6751a3 100644 --- a/test/e2e/test_storage.go +++ b/test/e2e/test_storage.go @@ -19,78 +19,27 @@ package e2e import ( "context" "encoding/json" - "fmt" "net/http" "os" "testing" "time" - "cloud.google.com/go/storage" - "google.golang.org/api/iterator" - v1 "k8s.io/api/core/v1" pkgmetrics "knative.dev/pkg/metrics" "knative.dev/pkg/test/helpers" "github.com/google/knative-gcp/pkg/apis/events/v1alpha1" - kngcptesting "github.com/google/knative-gcp/pkg/reconciler/testing" "github.com/google/knative-gcp/test/e2e/lib" "github.com/google/knative-gcp/test/e2e/lib/metrics" - "github.com/google/knative-gcp/test/e2e/lib/resources" // The following line to load the gcp plugin (only required to authenticate against GKE clusters). _ "k8s.io/client-go/plugin/pkg/client/auth/gcp" ) -// makeBucket retrieves the bucket name for the test. If it does not exist, it will create it. -func makeBucket(ctx context.Context, t *testing.T, project string) string { - if project == "" { - t.Fatalf("failed to find %q in envvars", lib.ProwProjectKey) - } - client, err := storage.NewClient(ctx) - if err != nil { - t.Fatalf("failed to create storage client, %s", err.Error()) - } - it := client.Buckets(ctx, project) - bucketName := "storage-e2e-test-" + project - // Iterate buckets to check if there has a bucket for e2e test - for { - bucketAttrs, err := it.Next() - if err == iterator.Done { - // Create a new bucket if there is no existing e2e test bucket - bucket := client.Bucket(bucketName) - if e := bucket.Create(ctx, project, &storage.BucketAttrs{}); e != nil { - t.Fatalf("failed to create bucket, %s", e.Error()) - } - break - } - if err != nil { - t.Fatalf("failed to list buckets, %s", err.Error()) - } - // Break iteration if there has a bucket for e2e test - if bucketAttrs.Name == bucketName { - break - } - } - return bucketName -} - -func getBucketHandle(ctx context.Context, t *testing.T, bucketName string, project string) *storage.BucketHandle { - // Prow sticks the project in this key - if project == "" { - t.Fatalf("failed to find %q in envvars", lib.ProwProjectKey) - } - client, err := storage.NewClient(ctx) - if err != nil { - t.Fatalf("failed to create pubsub client, %s", err.Error()) - } - return client.Bucket(bucketName) -} - func CloudStorageSourceWithTestImpl(t *testing.T, assertMetrics bool) { ctx := context.Background() project := os.Getenv(lib.ProwProjectKey) - bucketName := makeBucket(ctx, t, project) + bucketName := lib.MakeBucket(ctx, t, project) storageName := helpers.AppendRandomString(bucketName + "-storage") targetName := helpers.AppendRandomString(bucketName + "-target") @@ -103,41 +52,13 @@ func CloudStorageSourceWithTestImpl(t *testing.T, assertMetrics bool) { fileName := helpers.AppendRandomString("test-file-for-storage") // Create a storage_target Job to receive the events. - job := resources.StorageTargetJob(targetName, []v1.EnvVar{{ - Name: "SUBJECT", - Value: fileName, - }, { - Name: "TIME", - Value: "120", - }}) - client.CreateJobOrFail(job, lib.WithServiceForJob(targetName)) + lib.MakeStorageJobOrDie(client, fileName, targetName) // Create the Storage source. - eventsStorage := kngcptesting.NewCloudStorageSource(storageName, client.Namespace, - kngcptesting.WithCloudStorageSourceBucket(bucketName), - kngcptesting.WithCloudStorageSourceSink(lib.ServiceGVK, targetName)) - client.CreateStorageOrFail(eventsStorage) - - client.Core.WaitForResourceReadyOrFail(storageName, lib.CloudStorageSourceTypeMeta) + lib.MakeStorageOrDie(client, bucketName, storageName, targetName) // Add a random name file in the bucket - bucketHandle := getBucketHandle(ctx, t, bucketName, project) - wc := bucketHandle.Object(fileName).NewWriter(ctx) - // Write some text to object - if _, err := fmt.Fprintf(wc, "e2e test for storage importer.\n"); err != nil { - t.Error(err) - } - if err := wc.Close(); err != nil { - t.Error(err) - } - - // Delete test file deferred - defer func() { - o := bucketHandle.Object(fileName) - if err := o.Delete(ctx); err != nil { - t.Error(err) - } - }() + lib.AddRandomFile(ctx, t, bucketName, fileName, project) msg, err := client.WaitUntilJobDone(client.Namespace, targetName) if err != nil {