diff --git a/test/e2e/e2e_test.go b/test/e2e/e2e_test.go index f1d6e62f7c..39c6b40aaa 100644 --- a/test/e2e/e2e_test.go +++ b/test/e2e/e2e_test.go @@ -209,6 +209,13 @@ func TestCloudPubSubSourceBrokerWithPubSubChannel(t *testing.T) { PubSubSourceBrokerWithPubSubChannelTestImpl(t) } +// TestCloudStorageSourceBrokerWithPubSubChannel tests we can knock a Knative Service from a broker with PubSub Channel from a CloudPubSubSource. +func TestCloudStorageSourceBrokerWithPubSubChannel(t *testing.T) { + cancel := logstream.Start(t) + defer cancel() + StorageSourceBrokerWithPubSubChannelTestImpl(t) +} + // TestCloudStorageSource tests we can knock down a target from a CloudStorageSource. func TestCloudStorageSource(t *testing.T) { cancel := logstream.Start(t) diff --git a/test/e2e/lib/storage.go b/test/e2e/lib/storage.go new file mode 100644 index 0000000000..5558cd7eb5 --- /dev/null +++ b/test/e2e/lib/storage.go @@ -0,0 +1,105 @@ +/* +Copyright 2020 Google LLC + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package lib + +import ( + "context" + "fmt" + "testing" + + "cloud.google.com/go/storage" + kngcptesting "github.com/google/knative-gcp/pkg/reconciler/testing" + "google.golang.org/api/iterator" +) + +func MakeStorageOrDie(client *Client, + bucketName, storageName, targetName string, + so ...kngcptesting.CloudStorageSourceOption, +) { + so = append(so, kngcptesting.WithCloudStorageSourceBucket(bucketName)) + so = append(so, kngcptesting.WithCloudStorageSourceSink(ServiceGVK, targetName)) + eventsStorage := kngcptesting.NewCloudStorageSource(storageName, client.Namespace, so...) + client.CreateStorageOrFail(eventsStorage) + + client.Core.WaitForResourceReadyOrFail(storageName, CloudStorageSourceTypeMeta) +} + +func AddRandomFile(ctx context.Context, t *testing.T, bucketName, fileName, project string) { + // Add a random name file in the bucket + bucketHandle := getBucketHandle(ctx, t, bucketName, project) + wc := bucketHandle.Object(fileName).NewWriter(ctx) + // Write some text to object + if _, err := fmt.Fprintf(wc, "e2e test for storage importer.\n"); err != nil { + t.Error(err) + } + if err := wc.Close(); err != nil { + t.Error(err) + } + + // Delete test file deferred + defer func() { + o := bucketHandle.Object(fileName) + if err := o.Delete(ctx); err != nil { + t.Error(err) + } + }() +} + +// MakeBucket retrieves the bucket name for the test. If it does not exist, it will create it. +func MakeBucket(ctx context.Context, t *testing.T, project string) string { + if project == "" { + t.Fatalf("failed to find %q in envvars", ProwProjectKey) + } + client, err := storage.NewClient(ctx) + if err != nil { + t.Fatalf("failed to create storage client, %s", err.Error()) + } + it := client.Buckets(ctx, project) + bucketName := "storage-e2e-test-" + project + // Iterate buckets to check if there has a bucket for e2e test + for { + bucketAttrs, err := it.Next() + if err == iterator.Done { + // Create a new bucket if there is no existing e2e test bucket + bucket := client.Bucket(bucketName) + if e := bucket.Create(ctx, project, &storage.BucketAttrs{}); e != nil { + t.Fatalf("failed to create bucket, %s", e.Error()) + } + break + } + if err != nil { + t.Fatalf("failed to list buckets, %s", err.Error()) + } + // Break iteration if there has a bucket for e2e test + if bucketAttrs.Name == bucketName { + break + } + } + return bucketName +} + +func getBucketHandle(ctx context.Context, t *testing.T, bucketName, project string) *storage.BucketHandle { + // Prow sticks the project in this key + if project == "" { + t.Fatalf("failed to find %q in envvars", ProwProjectKey) + } + client, err := storage.NewClient(ctx) + if err != nil { + t.Fatalf("failed to create pubsub client, %s", err.Error()) + } + return client.Bucket(bucketName) +} diff --git a/test/e2e/test_broker_pubsub.go b/test/e2e/test_broker_pubsub.go index fdb777f392..3c3ad0e3fb 100644 --- a/test/e2e/test_broker_pubsub.go +++ b/test/e2e/test_broker_pubsub.go @@ -20,6 +20,7 @@ import ( "context" "encoding/json" "net/url" + "os" "testing" "time" @@ -68,6 +69,10 @@ func BrokerWithPubSubChannelTestImpl(t *testing.T) { u := createBrokerWithPubSubChannel(t, client, + []v1.EnvVar{{ + Name: "TARGET", + Value: "falldown", + }}, brokerName, dummyTriggerName, kserviceName, @@ -113,6 +118,10 @@ func PubSubSourceBrokerWithPubSubChannelTestImpl(t *testing.T) { u := createBrokerWithPubSubChannel(t, client, + []v1.EnvVar{{ + Name: "TARGET", + Value: "falldown", + }}, brokerName, dummyTriggerName, kserviceName, @@ -155,8 +164,65 @@ func PubSubSourceBrokerWithPubSubChannelTestImpl(t *testing.T) { // TODO(nlopezgi): assert StackDriver metrics after https://github.com/google/knative-gcp/issues/317 is resolved } +func StorageSourceBrokerWithPubSubChannelTestImpl(t *testing.T) { + ctx := context.Background() + project := os.Getenv(lib.ProwProjectKey) + + bucketName := lib.MakeBucket(ctx, t, project) + storageName := helpers.AppendRandomString(bucketName + "-storage") + targetName := helpers.AppendRandomString(bucketName + "-target") + fileName := helpers.AppendRandomString("test-file-for-storage") + + brokerName := helpers.AppendRandomString("pubsub") + dummyTriggerName := "dummy-broker-" + brokerName + respTriggerName := "resp-broker-" + brokerName + kserviceName := helpers.AppendRandomString("kservice") + + client := lib.Setup(t, true) + defer lib.TearDown(client) + + u := createBrokerWithPubSubChannel(t, + client, + []v1.EnvVar{{ + Name: "SUBJECT", + Value: fileName, + }, { + Name: "TIME", + Value: "120", + }}, + brokerName, + dummyTriggerName, + kserviceName, + respTriggerName, + targetName, + ) + t.Logf("Created broker") + var url apis.URL = apis.URL(u) + // Just to make sure all resources are ready. + time.Sleep(5 * time.Second) + + // Create the Storage source. + lib.MakeStorageOrDie( + client, + bucketName, + storageName, + targetName, + kngcptesting.WithCloudStorageSourceSinkURI(&url), + ) + + // Add a random name file in the bucket + lib.AddRandomFile(ctx, t, bucketName, fileName, project) + + // Check if resp CloudEvent hits the target Service. + if done := jobDone(client, targetName, t); !done { + t.Error("resp event didn't hit the target pod") + t.Failed() + } +} + func createBrokerWithPubSubChannel(t *testing.T, client *lib.Client, + targetJobEnvVars []v1.EnvVar, brokerName, dummyTriggerName, kserviceName, respTriggerName, targetName string, ) url.URL { // Create a new Broker. @@ -180,10 +246,7 @@ func createBrokerWithPubSubChannel(t *testing.T, ) // Create a target Job to receive the events. - job := resources.TargetJob(targetName, []v1.EnvVar{{ - Name: "TARGET", - Value: "falldown", - }}) + job := resources.StorageTargetJob(targetName, targetJobEnvVars) client.CreateJobOrFail(job, lib.WithServiceForJob(targetName)) // Create a Trigger with the target Service subscriber. diff --git a/test/e2e/test_storage.go b/test/e2e/test_storage.go index 28e5bdbc81..f054c9fee9 100644 --- a/test/e2e/test_storage.go +++ b/test/e2e/test_storage.go @@ -19,20 +19,16 @@ package e2e import ( "context" "encoding/json" - "fmt" "net/http" "os" "testing" "time" - "cloud.google.com/go/storage" - "google.golang.org/api/iterator" v1 "k8s.io/api/core/v1" pkgmetrics "knative.dev/pkg/metrics" "knative.dev/pkg/test/helpers" "github.com/google/knative-gcp/pkg/apis/events/v1alpha1" - kngcptesting "github.com/google/knative-gcp/pkg/reconciler/testing" "github.com/google/knative-gcp/test/e2e/lib" "github.com/google/knative-gcp/test/e2e/lib/metrics" "github.com/google/knative-gcp/test/e2e/lib/resources" @@ -41,56 +37,11 @@ import ( _ "k8s.io/client-go/plugin/pkg/client/auth/gcp" ) -// makeBucket retrieves the bucket name for the test. If it does not exist, it will create it. -func makeBucket(ctx context.Context, t *testing.T, project string) string { - if project == "" { - t.Fatalf("failed to find %q in envvars", lib.ProwProjectKey) - } - client, err := storage.NewClient(ctx) - if err != nil { - t.Fatalf("failed to create storage client, %s", err.Error()) - } - it := client.Buckets(ctx, project) - bucketName := "storage-e2e-test-" + project - // Iterate buckets to check if there has a bucket for e2e test - for { - bucketAttrs, err := it.Next() - if err == iterator.Done { - // Create a new bucket if there is no existing e2e test bucket - bucket := client.Bucket(bucketName) - if e := bucket.Create(ctx, project, &storage.BucketAttrs{}); e != nil { - t.Fatalf("failed to create bucket, %s", e.Error()) - } - break - } - if err != nil { - t.Fatalf("failed to list buckets, %s", err.Error()) - } - // Break iteration if there has a bucket for e2e test - if bucketAttrs.Name == bucketName { - break - } - } - return bucketName -} - -func getBucketHandle(ctx context.Context, t *testing.T, bucketName string, project string) *storage.BucketHandle { - // Prow sticks the project in this key - if project == "" { - t.Fatalf("failed to find %q in envvars", lib.ProwProjectKey) - } - client, err := storage.NewClient(ctx) - if err != nil { - t.Fatalf("failed to create pubsub client, %s", err.Error()) - } - return client.Bucket(bucketName) -} - func CloudStorageSourceWithTestImpl(t *testing.T, assertMetrics bool) { ctx := context.Background() project := os.Getenv(lib.ProwProjectKey) - bucketName := makeBucket(ctx, t, project) + bucketName := lib.MakeBucket(ctx, t, project) storageName := helpers.AppendRandomString(bucketName + "-storage") targetName := helpers.AppendRandomString(bucketName + "-target") @@ -113,31 +64,10 @@ func CloudStorageSourceWithTestImpl(t *testing.T, assertMetrics bool) { client.CreateJobOrFail(job, lib.WithServiceForJob(targetName)) // Create the Storage source. - eventsStorage := kngcptesting.NewCloudStorageSource(storageName, client.Namespace, - kngcptesting.WithCloudStorageSourceBucket(bucketName), - kngcptesting.WithCloudStorageSourceSink(lib.ServiceGVK, targetName)) - client.CreateStorageOrFail(eventsStorage) - - client.Core.WaitForResourceReadyOrFail(storageName, lib.CloudStorageSourceTypeMeta) + lib.MakeStorageOrDie(client, bucketName, storageName, targetName) // Add a random name file in the bucket - bucketHandle := getBucketHandle(ctx, t, bucketName, project) - wc := bucketHandle.Object(fileName).NewWriter(ctx) - // Write some text to object - if _, err := fmt.Fprintf(wc, "e2e test for storage importer.\n"); err != nil { - t.Error(err) - } - if err := wc.Close(); err != nil { - t.Error(err) - } - - // Delete test file deferred - defer func() { - o := bucketHandle.Object(fileName) - if err := o.Delete(ctx); err != nil { - t.Error(err) - } - }() + lib.AddRandomFile(ctx, t, bucketName, fileName, project) msg, err := client.WaitUntilJobDone(client.Namespace, targetName) if err != nil {