diff --git a/pkg/reconciler/events/auditlogs/auditlogs_test.go b/pkg/reconciler/events/auditlogs/auditlogs_test.go index e53a13a11f..2eb926c9ae 100644 --- a/pkg/reconciler/events/auditlogs/auditlogs_test.go +++ b/pkg/reconciler/events/auditlogs/auditlogs_test.go @@ -661,6 +661,7 @@ func TestAllCases(t *testing.T) { WithCloudAuditLogsSourceServiceName(testServiceName), WithCloudAuditLogsSourceSink(sinkGVK, sinkName), WithCloudAuditLogsSourceProjectID(testProject), + WithCloudAuditLogsSourceSubscriptionID(SubscriptionID), WithInitCloudAuditLogsSourceConditions, WithCloudAuditLogsSourceTopicReady(testTopicID), WithCloudAuditLogsSourcePullSubscriptionReady(), @@ -717,6 +718,7 @@ func TestAllCases(t *testing.T) { WithCloudAuditLogsSourceServiceName(testServiceName), WithCloudAuditLogsSourceSink(sinkGVK, sinkName), WithCloudAuditLogsSourceProjectID(testProject), + WithCloudAuditLogsSourceSubscriptionID(SubscriptionID), WithInitCloudAuditLogsSourceConditions, WithCloudAuditLogsSourceTopicReady(testTopicID), WithCloudAuditLogsSourcePullSubscriptionReady(), @@ -773,6 +775,7 @@ func TestAllCases(t *testing.T) { WithCloudAuditLogsSourceServiceName(testServiceName), WithCloudAuditLogsSourceSink(sinkGVK, sinkName), WithCloudAuditLogsSourceProjectID(testProject), + WithCloudAuditLogsSourceSubscriptionID(SubscriptionID), WithInitCloudAuditLogsSourceConditions, WithCloudAuditLogsSourceTopicReady(testTopicID), WithCloudAuditLogsSourcePullSubscriptionReady(), @@ -829,6 +832,7 @@ func TestAllCases(t *testing.T) { WithCloudAuditLogsSourceServiceName(testServiceName), WithCloudAuditLogsSourceSink(sinkGVK, sinkName), WithCloudAuditLogsSourceProjectID(testProject), + WithCloudAuditLogsSourceSubscriptionID(SubscriptionID), WithInitCloudAuditLogsSourceConditions, WithCloudAuditLogsSourceTopicReady(testTopicID), WithCloudAuditLogsSourcePullSubscriptionReady(), @@ -896,6 +900,7 @@ func TestAllCases(t *testing.T) { WithCloudAuditLogsSourceServiceName(testServiceName), WithCloudAuditLogsSourceMethodName(testMethodName), WithCloudAuditLogsSourceProjectID(testProject), + WithCloudAuditLogsSourceSubscriptionID(SubscriptionID), WithInitCloudAuditLogsSourceConditions, WithCloudAuditLogsSourceTopicReady(testTopicID), WithCloudAuditLogsSourcePullSubscriptionReady(), @@ -963,6 +968,7 @@ func TestAllCases(t *testing.T) { WithCloudAuditLogsSourceServiceName(testServiceName), WithCloudAuditLogsSourceMethodName(testMethodName), WithCloudAuditLogsSourceProjectID(testProject), + WithCloudAuditLogsSourceSubscriptionID(SubscriptionID), WithInitCloudAuditLogsSourceConditions, WithCloudAuditLogsSourceTopicReady(testTopicID), WithCloudAuditLogsSourcePullSubscriptionReady(), @@ -1025,6 +1031,7 @@ func TestAllCases(t *testing.T) { WithCloudAuditLogsSourceServiceName(testServiceName), WithCloudAuditLogsSourceMethodName(testMethodName), WithCloudAuditLogsSourceProjectID(testProject), + WithCloudAuditLogsSourceSubscriptionID(SubscriptionID), WithInitCloudAuditLogsSourceConditions, WithCloudAuditLogsSourceTopicReady(testTopicID), WithCloudAuditLogsSourcePullSubscriptionReady(), @@ -1090,6 +1097,7 @@ func TestAllCases(t *testing.T) { WithCloudAuditLogsSourceServiceName(testServiceName), WithCloudAuditLogsSourceSink(sinkGVK, sinkName), WithCloudAuditLogsSourceProjectID(testProject), + WithCloudAuditLogsSourceSubscriptionID(SubscriptionID), WithInitCloudAuditLogsSourceConditions, WithCloudAuditLogsSourceTopicReady(testTopicID), WithCloudAuditLogsSourcePullSubscriptionReady(), diff --git a/pkg/reconciler/events/build/build_test.go b/pkg/reconciler/events/build/build_test.go index 2308b6609f..6ace3288fe 100644 --- a/pkg/reconciler/events/build/build_test.go +++ b/pkg/reconciler/events/build/build_test.go @@ -336,6 +336,7 @@ func TestAllCases(t *testing.T) { WithInitCloudBuildSourceConditions, WithCloudBuildSourcePullSubscriptionReady(), WithCloudBuildSourceSinkURI(pubsubSinkURL), + WithCloudBuildSourceSubscriptionID(SubscriptionID), ), }, { Object: NewCloudBuildSource(buildName, testNS, @@ -346,6 +347,7 @@ func TestAllCases(t *testing.T) { WithInitCloudBuildSourceConditions, WithCloudBuildSourcePullSubscriptionReady(), WithCloudBuildSourceSinkURI(pubsubSinkURL), + WithCloudBuildSourceSubscriptionID(SubscriptionID), WithCloudBuildSourceFinalizers("cloudbuildsources.events.cloud.google.com"), ), }}, diff --git a/pkg/reconciler/events/pubsub/pubsub_test.go b/pkg/reconciler/events/pubsub/pubsub_test.go index 9111b10a40..4d33e9b980 100644 --- a/pkg/reconciler/events/pubsub/pubsub_test.go +++ b/pkg/reconciler/events/pubsub/pubsub_test.go @@ -335,6 +335,7 @@ func TestAllCases(t *testing.T) { WithInitCloudPubSubSourceConditions, WithCloudPubSubSourcePullSubscriptionReady(), WithCloudPubSubSourceSinkURI(pubsubSinkURL), + WithCloudPubSubSourceSubscriptionID(SubscriptionID), ), }, { Object: NewCloudPubSubSource(pubsubName, testNS, @@ -345,6 +346,7 @@ func TestAllCases(t *testing.T) { WithInitCloudPubSubSourceConditions, WithCloudPubSubSourcePullSubscriptionReady(), WithCloudPubSubSourceSinkURI(pubsubSinkURL), + WithCloudPubSubSourceSubscriptionID(SubscriptionID), WithCloudPubSubSourceFinalizers("cloudpubsubsources.events.cloud.google.com"), ), }}, diff --git a/pkg/reconciler/events/scheduler/scheduler_test.go b/pkg/reconciler/events/scheduler/scheduler_test.go index 2327fa27df..4d831c0bd9 100644 --- a/pkg/reconciler/events/scheduler/scheduler_test.go +++ b/pkg/reconciler/events/scheduler/scheduler_test.go @@ -705,6 +705,7 @@ func TestAllCases(t *testing.T) { WithInitCloudSchedulerSourceConditions, WithCloudSchedulerSourceTopicReady(testTopicID, testProject), WithCloudSchedulerSourcePullSubscriptionReady(), + WithCloudSchedulerSourceSubscriptionID(SubscriptionID), WithCloudSchedulerSourceJobNotReady(reconciledFailedReason, fmt.Sprintf("%s: %s", failedToReconcileJobMsg, "create-client-induced-error")), WithCloudSchedulerSourceSinkURI(schedulerSinkURL)), }}, @@ -766,6 +767,7 @@ func TestAllCases(t *testing.T) { WithInitCloudSchedulerSourceConditions, WithCloudSchedulerSourceTopicReady(testTopicID, testProject), WithCloudSchedulerSourcePullSubscriptionReady(), + WithCloudSchedulerSourceSubscriptionID(SubscriptionID), WithCloudSchedulerSourceJobNotReady(reconciledFailedReason, fmt.Sprintf("%s: %s", failedToReconcileJobMsg, "get-job-induced-error")), WithCloudSchedulerSourceSinkURI(schedulerSinkURL)), }}, @@ -827,6 +829,7 @@ func TestAllCases(t *testing.T) { WithInitCloudSchedulerSourceConditions, WithCloudSchedulerSourceTopicReady(testTopicID, testProject), WithCloudSchedulerSourcePullSubscriptionReady(), + WithCloudSchedulerSourceSubscriptionID(SubscriptionID), WithCloudSchedulerSourceJobNotReady(reconciledFailedReason, fmt.Sprintf("%s: rpc error: code = %s desc = %s", failedToReconcileJobMsg, codes.Unknown, "get-job-induced-error")), WithCloudSchedulerSourceSinkURI(schedulerSinkURL)), }}, @@ -889,6 +892,7 @@ func TestAllCases(t *testing.T) { WithInitCloudSchedulerSourceConditions, WithCloudSchedulerSourceTopicReady(testTopicID, testProject), WithCloudSchedulerSourcePullSubscriptionReady(), + WithCloudSchedulerSourceSubscriptionID(SubscriptionID), WithCloudSchedulerSourceJobNotReady(reconciledFailedReason, fmt.Sprintf("%s: %s", failedToReconcileJobMsg, "create-job-induced-error")), WithCloudSchedulerSourceSinkURI(schedulerSinkURL)), }}, @@ -950,6 +954,7 @@ func TestAllCases(t *testing.T) { WithInitCloudSchedulerSourceConditions, WithCloudSchedulerSourceTopicReady(testTopicID, testProject), WithCloudSchedulerSourcePullSubscriptionReady(), + WithCloudSchedulerSourceSubscriptionID(SubscriptionID), WithCloudSchedulerSourceJobReady(jobName), WithCloudSchedulerSourceSinkURI(schedulerSinkURL)), }}, @@ -1006,6 +1011,7 @@ func TestAllCases(t *testing.T) { WithInitCloudSchedulerSourceConditions, WithCloudSchedulerSourceTopicReady(testTopicID, testProject), WithCloudSchedulerSourcePullSubscriptionReady(), + WithCloudSchedulerSourceSubscriptionID(SubscriptionID), WithCloudSchedulerSourceJobReady(jobName), WithCloudSchedulerSourceSinkURI(schedulerSinkURL)), }}, @@ -1028,6 +1034,7 @@ func TestAllCases(t *testing.T) { WithInitCloudSchedulerSourceConditions, WithCloudSchedulerSourceTopicReady(testTopicID, testProject), WithCloudSchedulerSourcePullSubscriptionReady(), + WithCloudSchedulerSourceSubscriptionID(SubscriptionID), WithCloudSchedulerSourceJobReady(jobName), WithCloudSchedulerSourceSinkURI(schedulerSinkURL), WithCloudSchedulerSourceDeletionTimestamp, diff --git a/pkg/reconciler/events/storage/storage_test.go b/pkg/reconciler/events/storage/storage_test.go index afdd5d5235..2c5f538807 100644 --- a/pkg/reconciler/events/storage/storage_test.go +++ b/pkg/reconciler/events/storage/storage_test.go @@ -705,6 +705,7 @@ func TestAllCases(t *testing.T) { WithCloudStorageSourceTopicReady(testTopicID), WithCloudStorageSourceProjectID(testProject), WithCloudStorageSourcePullSubscriptionReady(), + WithCloudStorageSourceSubscriptionID(SubscriptionID), WithCloudStorageSourceSinkURI(storageSinkURL), WithCloudStorageSourceNotificationNotReady(reconciledNotificationFailed, fmt.Sprintf("%s: %s", failedToReconcileNotificationMsg, "create-client-induced-error")), ), @@ -773,6 +774,7 @@ func TestAllCases(t *testing.T) { WithCloudStorageSourceTopicReady(testTopicID), WithCloudStorageSourceProjectID(testProject), WithCloudStorageSourcePullSubscriptionReady(), + WithCloudStorageSourceSubscriptionID(SubscriptionID), WithCloudStorageSourceSinkURI(storageSinkURL), WithCloudStorageSourceNotificationNotReady(reconciledNotificationFailed, fmt.Sprintf("%s: %s", failedToReconcileNotificationMsg, "bucket-notifications-induced-error")), ), @@ -840,6 +842,7 @@ func TestAllCases(t *testing.T) { WithCloudStorageSourceTopicReady(testTopicID), WithCloudStorageSourceProjectID(testProject), WithCloudStorageSourcePullSubscriptionReady(), + WithCloudStorageSourceSubscriptionID(SubscriptionID), WithCloudStorageSourceSinkURI(storageSinkURL), WithCloudStorageSourceNotificationNotReady(reconciledNotificationFailed, fmt.Sprintf("%s: %s", failedToReconcileNotificationMsg, "bucket-add-notification-induced-error")), ), @@ -907,6 +910,7 @@ func TestAllCases(t *testing.T) { WithCloudStorageSourceTopicReady(testTopicID), WithCloudStorageSourceProjectID(testProject), WithCloudStorageSourcePullSubscriptionReady(), + WithCloudStorageSourceSubscriptionID(SubscriptionID), WithCloudStorageSourceSinkURI(storageSinkURL), WithCloudStorageSourceNotificationReady(notificationId)), }}, diff --git a/pkg/reconciler/intevents/reconciler.go b/pkg/reconciler/intevents/reconciler.go index 5328ffd511..6e0857f964 100644 --- a/pkg/reconciler/intevents/reconciler.go +++ b/pkg/reconciler/intevents/reconciler.go @@ -184,6 +184,7 @@ func (psb *PubSubBase) ReconcilePullSubscription(ctx context.Context, pubsubable return ps, pkgreconciler.NewEvent(corev1.EventTypeWarning, PullSubscriptionStatusPropagateFailedReason, "Failed to propagate PullSubscription status: %s", err.Error()) } + status.SubscriptionID = ps.Status.SubscriptionID status.SinkURI = ps.Status.SinkURI return ps, nil } diff --git a/pkg/reconciler/testing/auditlogs.go b/pkg/reconciler/testing/auditlogs.go index 651b82e6b1..4d41fa86a2 100644 --- a/pkg/reconciler/testing/auditlogs.go +++ b/pkg/reconciler/testing/auditlogs.go @@ -133,6 +133,12 @@ func WithCloudAuditLogsSourceProjectID(projectID string) CloudAuditLogsSourceOpt } } +func WithCloudAuditLogsSourceSubscriptionID(subscriptionID string) CloudAuditLogsSourceOption { + return func(s *v1alpha1.CloudAuditLogsSource) { + s.Status.SubscriptionID = subscriptionID + } +} + func WithCloudAuditLogsSourceSinkID(sinkID string) CloudAuditLogsSourceOption { return func(s *v1alpha1.CloudAuditLogsSource) { s.Status.StackdriverSink = sinkID diff --git a/pkg/reconciler/testing/build.go b/pkg/reconciler/testing/build.go index b9ebba452c..4840036b57 100644 --- a/pkg/reconciler/testing/build.go +++ b/pkg/reconciler/testing/build.go @@ -133,6 +133,12 @@ func WithCloudBuildSourceSinkURI(url *apis.URL) CloudBuildSourceOption { } } +func WithCloudBuildSourceSubscriptionID(subscriptionID string) CloudBuildSourceOption { + return func(bs *v1alpha1.CloudBuildSource) { + bs.Status.SubscriptionID = subscriptionID + } +} + func WithCloudBuildSourceFinalizers(finalizers ...string) CloudBuildSourceOption { return func(bs *v1alpha1.CloudBuildSource) { bs.Finalizers = finalizers diff --git a/pkg/reconciler/testing/pubsub.go b/pkg/reconciler/testing/pubsub.go index dca022b870..0c07aebbc5 100644 --- a/pkg/reconciler/testing/pubsub.go +++ b/pkg/reconciler/testing/pubsub.go @@ -131,6 +131,12 @@ func WithCloudPubSubSourceSinkURI(url *apis.URL) CloudPubSubSourceOption { } } +func WithCloudPubSubSourceSubscriptionID(subscriptionID string) CloudPubSubSourceOption { + return func(ps *v1alpha1.CloudPubSubSource) { + ps.Status.SubscriptionID = subscriptionID + } +} + func WithCloudPubSubSourceFinalizers(finalizers ...string) CloudPubSubSourceOption { return func(ps *v1alpha1.CloudPubSubSource) { ps.Finalizers = finalizers diff --git a/pkg/reconciler/testing/pullsubscription.go b/pkg/reconciler/testing/pullsubscription.go index 2bc983a954..7802cdd286 100644 --- a/pkg/reconciler/testing/pullsubscription.go +++ b/pkg/reconciler/testing/pullsubscription.go @@ -33,6 +33,9 @@ import ( // PullSubscriptionOption enables further configuration of a PullSubscription. type PullSubscriptionOption func(*v1alpha1.PullSubscription) +const ( + SubscriptionID = "subID" +) // NewPullSubscription creates a PullSubscription with PullSubscriptionOptions func NewPullSubscription(name, namespace string, so ...PullSubscriptionOption) *v1alpha1.PullSubscription { s := &v1alpha1.PullSubscription{ @@ -189,7 +192,7 @@ func WithPullSubscriptionReady(sink *apis.URL) PullSubscriptionOption { s.Status.InitializeConditions() s.Status.MarkSink(sink) s.Status.MarkDeployed() - s.Status.MarkSubscribed("subID") + s.Status.MarkSubscribed(SubscriptionID) } } diff --git a/pkg/reconciler/testing/scheduler.go b/pkg/reconciler/testing/scheduler.go index 27a0af02fe..a0fa2784f5 100644 --- a/pkg/reconciler/testing/scheduler.go +++ b/pkg/reconciler/testing/scheduler.go @@ -185,6 +185,12 @@ func WithCloudSchedulerSourceSinkURI(url *apis.URL) CloudSchedulerSourceOption { } } +func WithCloudSchedulerSourceSubscriptionID(subscriptionID string) CloudSchedulerSourceOption { + return func(s *v1alpha1.CloudSchedulerSource) { + s.Status.SubscriptionID = subscriptionID + } +} + // WithCloudSchedulerSourceJobName sets the status for job Name func WithCloudSchedulerSourceJobName(jobName string) CloudSchedulerSourceOption { return func(s *v1alpha1.CloudSchedulerSource) { diff --git a/pkg/reconciler/testing/storage.go b/pkg/reconciler/testing/storage.go index 318b40eb42..3e970ad5f7 100644 --- a/pkg/reconciler/testing/storage.go +++ b/pkg/reconciler/testing/storage.go @@ -204,6 +204,12 @@ func WithCloudStorageSourceProjectID(projectID string) CloudStorageSourceOption } } +func WithCloudStorageSourceSubscriptionID(subscriptionID string) CloudStorageSourceOption { + return func(s *v1alpha1.CloudStorageSource) { + s.Status.SubscriptionID = subscriptionID + } +} + func WithCloudStorageSourceStatusObservedGeneration(generation int64) CloudStorageSourceOption { return func(s *v1alpha1.CloudStorageSource) { s.Status.Status.ObservedGeneration = generation diff --git a/test/e2e/e2e_test.go b/test/e2e/e2e_test.go index 57114b3d09..b8e6bb7ed4 100644 --- a/test/e2e/e2e_test.go +++ b/test/e2e/e2e_test.go @@ -174,7 +174,7 @@ func TestPullSubscriptionWithTarget(t *testing.T) { PullSubscriptionWithTargetTestImpl(t, authConfig) } -// TestSmokeCloudPubSubSource makes sure we can run tests on the CloudPubSubSource. +// TestSmokeCloudPubSubSource we can create a CloudPubSubSource to ready state and we can delete a CloudPubSubSource and its underlying resources. func TestSmokeCloudPubSubSource(t *testing.T) { cancel := logstream.Start(t) defer cancel() @@ -262,6 +262,13 @@ func TestCloudSchedulerSourceBrokerWithPubSubChannel(t *testing.T) { SchedulerSourceBrokerWithPubSubChannelTestImpl(t, authConfig) } +// TestSmokeCloudStorageSource tests if we can create a CloudStorageSource to ready state and delete a CloudStorageSource and its underlying resources. +func TestSmokeCloudStorageSource(t *testing.T) { + cancel := logstream.Start(t) + defer cancel() + SmokeCloudStorageSourceTestImpl(t, authConfig) +} + // TestCloudStorageSourceWithTarget tests we can knock down a target from a CloudStorageSource. func TestCloudStorageSourceWithTarget(t *testing.T) { cancel := logstream.Start(t) @@ -277,6 +284,13 @@ func TestCloudStorageSourceStackDriverMetrics(t *testing.T) { CloudStorageSourceWithTargetTestImpl(t, true /*assertMetrics */, authConfig) } +// TestSmokeCloudAuditLogsSource tests if we can create a CloudAuditLogsSource to ready state and delete a CloudAuditLogsSource and its underlying resources. +func TestSmokeCloudAuditLogsSource(t *testing.T) { + cancel := logstream.Start(t) + defer cancel() + SmokeCloudAuditLogsSourceTestImpl(t, authConfig) +} + // TestCloudAuditLogsSource tests we can knock down a target from an CloudAuditLogsSource. func TestCloudAuditLogsSourceWithTarget(t *testing.T) { cancel := logstream.Start(t) @@ -284,11 +298,11 @@ func TestCloudAuditLogsSourceWithTarget(t *testing.T) { CloudAuditLogsSourceWithTargetTestImpl(t, authConfig) } -// TestSmokeCloudSchedulerSourceSetup tests if we can create a CloudSchedulerSource resource and get it to a ready state. -func TestSmokeCloudSchedulerSourceSetup(t *testing.T) { +// TestSmokeCloudSchedulerSource tests if we can create a CloudSchedulerSource to ready state and delete a CloudSchedulerSource and its underlying resources. +func TestSmokeCloudSchedulerSource(t *testing.T) { cancel := logstream.Start(t) defer cancel() - SmokeCloudSchedulerSourceSetup(t, authConfig) + SmokeCloudSchedulerSourceTestImpl(t, authConfig) } // TestCloudSchedulerSourceWithTargetTestImpl tests if we can receive an event on a bespoke sink from a CloudSchedulerSource source. @@ -298,6 +312,14 @@ func TestCloudSchedulerSourceWithTargetTestImpl(t *testing.T) { CloudSchedulerSourceWithTargetTestImpl(t, authConfig) } +// TestSmokeGCPBroker tests if we can create a GCPBroker to ready state and delete a GCPBroker and its underlying resources. +func TestSmokeGCPBroker(t *testing.T) { + cancel := logstream.Start(t) + defer cancel() + SmokeGCPBrokerTestImpl(t, authConfig) +} + + // TestGCPBroker tests we can knock a Knative Service from a gcp broker. func TestGCPBroker(t *testing.T) { cancel := logstream.Start(t) diff --git a/test/e2e/lib/auditlogs.go b/test/e2e/lib/auditlogs.go index 6f4e1acc55..9cb1252589 100644 --- a/test/e2e/lib/auditlogs.go +++ b/test/e2e/lib/auditlogs.go @@ -17,11 +17,17 @@ limitations under the License. package lib import ( + "context" "fmt" + "google.golang.org/grpc/status" + "os" + "testing" + "cloud.google.com/go/logging/logadmin" "github.com/google/knative-gcp/pkg/apis/events/v1alpha1" kngcptesting "github.com/google/knative-gcp/pkg/reconciler/testing" "github.com/google/knative-gcp/test/e2e/lib/resources" + "google.golang.org/grpc/codes" v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) @@ -75,3 +81,24 @@ func MakeAuditLogsJobOrDie(client *Client, methodName, project, resourceName, se }}) client.CreateJobOrFail(job, WithServiceForJob(targetName)) } + +func StackdriverSinkExists(t *testing.T, sinkID string) bool { + t.Helper() + ctx := context.Background() + project := os.Getenv(ProwProjectKey) + client, err := logadmin.NewClient(ctx, project) + if err != nil { + t.Fatalf("failed to create LogAdmin client, %s", err.Error()) + } + defer client.Close() + + _, err = client.Sink(ctx, sinkID) + if err != nil { + if status.Code(err) == codes.NotFound { + return false + } + + t.Fatalf("Failed from LogAdmin client while retrieving StackdriverSink %s with error %s", sinkID, err.Error()) + } + return true +} diff --git a/test/e2e/lib/creation.go b/test/e2e/lib/creation.go index ab2432e947..bcb6976512 100644 --- a/test/e2e/lib/creation.go +++ b/test/e2e/lib/creation.go @@ -17,6 +17,8 @@ limitations under the License. package lib import ( + "github.com/google/knative-gcp/pkg/apis/broker/v1beta1" + knativegcptestresources "github.com/google/knative-gcp/test/e2e/lib/resources" batchv1 "k8s.io/api/batch/v1" "k8s.io/apimachinery/pkg/api/meta" v1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -105,6 +107,21 @@ func (c *Client) CreateSchedulerOrFail(scheduler *eventsv1alpha1.CloudSchedulerS c.Tracker.AddObj(scheduler) } +// CreateGCPBrokerV1Beta1OrFail will create a GCP Broker or fail the test if there is an error. +func (c *Client) CreateGCPBrokerV1Beta1OrFail(name string, options ...knativegcptestresources.BrokerV1Beta1Option) *v1beta1.Broker { + namespace := c.Namespace + broker := knativegcptestresources.BrokerV1Beta1(name, options...) + brokers := c.KnativeGCP.EventingV1beta1().Brokers(namespace) + c.T.Logf("Creating broker %s", name) + // update broker with the new reference + broker, err := brokers.Create(broker) + if err != nil { + c.T.Fatalf("Failed to create broker %q: %v", name, err) + } + c.Tracker.AddObj(broker) + return broker +} + // WithServiceForJob returns an option that creates a Service binded with the given job. func WithServiceForJob(name string) func(*batchv1.Job, *Client) error { return func(job *batchv1.Job, client *Client) error { diff --git a/test/e2e/lib/deletion.go b/test/e2e/lib/deletion.go new file mode 100644 index 0000000000..702995253f --- /dev/null +++ b/test/e2e/lib/deletion.go @@ -0,0 +1,67 @@ +/* +Copyright 2020 Google LLC + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package lib + +import ( + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +func (c *Client) DeletePubSubOrFail(name string) { + c.T.Helper() + pubsubs := c.KnativeGCP.EventsV1alpha1().CloudPubSubSources(c.Namespace) + err := pubsubs.Delete(name, &metav1.DeleteOptions{}) + if err != nil { + c.T.Fatalf("Failed to delete pubsub %s/%s: %v", c.Namespace, name, err) + } +} + +func (c *Client) DeleteSchedulerOrFail(name string) { + c.T.Helper() + schedulers := c.KnativeGCP.EventsV1alpha1().CloudSchedulerSources(c.Namespace) + err := schedulers.Delete(name, &metav1.DeleteOptions{}) + if err != nil { + c.T.Fatalf("Failed to delete scheduler %s/%s: %v", c.Namespace, name, err) + } +} + +func (c *Client) DeleteStorageOrFail(name string) { + c.T.Helper() + storages := c.KnativeGCP.EventsV1alpha1().CloudStorageSources(c.Namespace) + err := storages.Delete(name, &metav1.DeleteOptions{}) + if err != nil { + c.T.Fatalf("Failed to delete storage %s/%s: %v", c.Namespace, name, err) + } +} + +func (c *Client) DeleteAuditLogsOrFail(name string) { + c.T.Helper() + auditLogs := c.KnativeGCP.EventsV1alpha1().CloudAuditLogsSources(c.Namespace) + err := auditLogs.Delete(name, &metav1.DeleteOptions{}) + if err != nil { + c.T.Fatalf("Failed to delete storage %s/%s: %v", c.Namespace, name, err) + } +} + + +func (c *Client) DeleteGCPBrokerOrFail(name string) { + c.T.Helper() + brokers := c.KnativeGCP.EventingV1beta1().Brokers(c.Namespace) + err := brokers.Delete(name, &metav1.DeleteOptions{}) + if err != nil { + c.T.Fatalf("Failed to delete gcp broker %s/%s: %v", c.Namespace, name, err) + } +} diff --git a/test/e2e/lib/get.go b/test/e2e/lib/get.go new file mode 100644 index 0000000000..39de894d0d --- /dev/null +++ b/test/e2e/lib/get.go @@ -0,0 +1,64 @@ +/* +Copyright 2020 Google LLC + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package lib + +import ( + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + + eventsv1alpha1 "github.com/google/knative-gcp/pkg/apis/events/v1alpha1" +) + +func (c *Client) GetPubSubOrFail(name string) *eventsv1alpha1.CloudPubSubSource { + c.T.Helper() + pubsubs := c.KnativeGCP.EventsV1alpha1().CloudPubSubSources(c.Namespace) + existing, err := pubsubs.Get(name, metav1.GetOptions{}) + if err != nil { + c.T.Fatalf("Failed to get pubsub %s/%s: %v", c.Namespace, name, err) + } + return existing +} + +func (c *Client) GetSchedulerOrFail(name string) *eventsv1alpha1.CloudSchedulerSource { + c.T.Helper() + schedulers:= c.KnativeGCP.EventsV1alpha1().CloudSchedulerSources(c.Namespace) + existing, err := schedulers.Get(name, metav1.GetOptions{}) + if err != nil { + c.T.Fatalf("Failed to get scheduler %s/%s: %v", c.Namespace, name, err) + } + return existing +} + +func (c *Client) GetStorageOrFail(name string) *eventsv1alpha1.CloudStorageSource{ + c.T.Helper() + storages:= c.KnativeGCP.EventsV1alpha1().CloudStorageSources(c.Namespace) + existing, err := storages.Get(name, metav1.GetOptions{}) + if err != nil { + c.T.Fatalf("Failed to get storages %s/%s: %v", c.Namespace, name, err) + } + return existing +} + + +func (c *Client) GetAuditLogsOrFail(name string) *eventsv1alpha1.CloudAuditLogsSource{ + c.T.Helper() + auditLogs:= c.KnativeGCP.EventsV1alpha1().CloudAuditLogsSources(c.Namespace) + existing, err := auditLogs.Get(name, metav1.GetOptions{}) + if err != nil { + c.T.Fatalf("Failed to get auditlogs %s/%s: %v", c.Namespace, name, err) + } + return existing +} diff --git a/test/e2e/lib/pubsub.go b/test/e2e/lib/pubsub.go index c9a5d966d5..35450c2f63 100644 --- a/test/e2e/lib/pubsub.go +++ b/test/e2e/lib/pubsub.go @@ -22,12 +22,12 @@ import ( "testing" "time" - "github.com/google/knative-gcp/test/e2e/lib/resources" v1 "k8s.io/api/core/v1" "github.com/google/knative-gcp/pkg/apis/events/v1alpha1" kngcptesting "github.com/google/knative-gcp/pkg/reconciler/testing" "github.com/google/knative-gcp/test/e2e/lib/metrics" + "github.com/google/knative-gcp/test/e2e/lib/resources" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" pkgmetrics "knative.dev/pkg/metrics" ) @@ -41,8 +41,8 @@ func MakePubSubOrDie(client *Client, so = append(so, kngcptesting.WithCloudPubSubSourceSink(sinkGVK, sinkName)) so = append(so, kngcptesting.WithCloudPubSubSourceTopic(topicName)) so = append(so, kngcptesting.WithCloudPubSubSourceGCPServiceAccount(pubsubServiceAccount)) - eventsPubsub := kngcptesting.NewCloudPubSubSource(psName, client.Namespace, so...) - client.CreatePubSubOrFail(eventsPubsub) + eventsPubSub := kngcptesting.NewCloudPubSubSource(psName, client.Namespace, so...) + client.CreatePubSubOrFail(eventsPubSub) client.Core.WaitForResourceReadyOrFail(psName, CloudPubSubSourceTypeMeta) } diff --git a/test/e2e/lib/resources/constants.go b/test/e2e/lib/resources/constants.go index 28a5b097da..6fe63bd36a 100644 --- a/test/e2e/lib/resources/constants.go +++ b/test/e2e/lib/resources/constants.go @@ -16,6 +16,8 @@ limitations under the License. package resources +import "time" + // API versions for the resources. const ( BatchAPIVersion = "batch/v1" @@ -53,3 +55,9 @@ const ( const ( KServiceKind string = "Service" ) + +// WaitDeletionTime for deleting resources +const ( + WaitDeletionTime = 20 * time.Second +) + diff --git a/test/e2e/lib/resources/knative_gcp.go b/test/e2e/lib/resources/knative_gcp.go new file mode 100644 index 0000000000..9fcc9794f4 --- /dev/null +++ b/test/e2e/lib/resources/knative_gcp.go @@ -0,0 +1,58 @@ +/* +Copyright 2020 Google LLC + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + + +package resources + +// This file contains functions that construct knative-gcp resources. + +import ( + "github.com/google/knative-gcp/pkg/apis/broker/v1beta1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + +) + +// BrokerV1Beta1Option enables further configuration of a Broker. +type BrokerV1Beta1Option func(*v1beta1.Broker) + + +// WithBrokerClassForBrokerV1Beta1 returns a function that adds a brokerClass +// annotation to the given Broker. +func WithBrokerClassForBrokerV1Beta1(brokerClass string) BrokerV1Beta1Option { + return func(b *v1beta1.Broker) { + annotations := b.GetAnnotations() + if annotations == nil { + annotations = make(map[string]string, 1) + } + annotations["eventing.knative.dev/broker.class"] = brokerClass + b.SetAnnotations(annotations) + } +} + + +// Broker returns a Broker. +func BrokerV1Beta1(name string, options ...BrokerV1Beta1Option) *v1beta1.Broker { + broker := &v1beta1.Broker{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + }, + } + for _, option := range options { + option(broker) + } + return broker +} + diff --git a/test/e2e/lib/scheduler.go b/test/e2e/lib/scheduler.go index f175ff28c5..3b81afa05a 100644 --- a/test/e2e/lib/scheduler.go +++ b/test/e2e/lib/scheduler.go @@ -17,9 +17,16 @@ limitations under the License. package lib import ( + "context" + "testing" + + "github.com/google/knative-gcp/pkg/gclient/scheduler" kngcpresources "github.com/google/knative-gcp/pkg/reconciler/events/scheduler/resources" kngcptesting "github.com/google/knative-gcp/pkg/reconciler/testing" "github.com/google/knative-gcp/test/e2e/lib/resources" + schedulerpb "google.golang.org/genproto/googleapis/cloud/scheduler/v1" + "google.golang.org/grpc/codes" + gstatus "google.golang.org/grpc/status" v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) @@ -62,3 +69,27 @@ func MakeSchedulerJobOrDie(client *Client, data, targetName, eventType string) { }) client.CreateJobOrFail(job, WithServiceForJob(targetName)) } + +func SchedulerJobExists(t *testing.T, jobName string) bool { + t.Helper() + ctx := context.Background() + client, err := scheduler.NewClient(ctx) + if err != nil { + t.Fatalf("failed to create scheduler client, %s", err.Error()) + } + defer client.Close() + + _, err = client.GetJob(ctx, &schedulerpb.GetJobRequest{Name: jobName}) + if err != nil { + st, ok := gstatus.FromError(err) + if !ok { + t.Fatalf("Failed from CloudSchedulerSource client while retrieving CloudSchedulerSource job %s with error %s", jobName, err.Error()) + } + if st.Code() == codes.NotFound { + return false + } + + t.Fatalf("Failed from CloudSchedulerSource client while retrieving CloudSchedulerSource job %s with error %s with status code %s", jobName, err.Error(), st.Code()) + } + return true +} diff --git a/test/e2e/lib/storage.go b/test/e2e/lib/storage.go index a74ccc2601..a9c52a1082 100644 --- a/test/e2e/lib/storage.go +++ b/test/e2e/lib/storage.go @@ -132,3 +132,27 @@ func getBucketHandle(ctx context.Context, t *testing.T, bucketName, project stri } return client.Bucket(bucketName) } + +func NotificationExists(t *testing.T, bucketName, notificationID string) bool { + t.Helper() + ctx := context.Background() + client, err := storage.NewClient(ctx) + if err != nil { + t.Fatalf("failed to create storage client, %s", err.Error()) + } + defer client.Close() + + client.Bucket(bucketName) + bucket := client.Bucket(bucketName) + + notifications, err := bucket.Notifications(ctx) + if err != nil { + t.Fatalf("Failed to fetch existing notifications %s", err.Error()) + } + + if _, ok := notifications[notificationID];ok { + return true + } + return false + +} diff --git a/test/e2e/lib/subscription.go b/test/e2e/lib/subscription.go new file mode 100644 index 0000000000..19ce812199 --- /dev/null +++ b/test/e2e/lib/subscription.go @@ -0,0 +1,50 @@ +/* +Copyright 2020 Google LLC + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package lib + +import ( + "context" + "os" + "testing" + + "cloud.google.com/go/pubsub" + + // The following line to load the gcp plugin (only required to authenticate against GKE clusters). + _ "k8s.io/client-go/plugin/pkg/client/auth/gcp" +) + + +func SubscriptionExists(t *testing.T, subID string) bool { + t.Helper() + ctx := context.Background() + // Prow sticks the project in this key + project := os.Getenv(ProwProjectKey) + if project == "" { + t.Fatalf("failed to find %q in envvars", ProwProjectKey) + } + client, err := pubsub.NewClient(ctx, project) + if err != nil { + t.Fatalf("failed to create pubsub client, %s", err.Error()) + } + defer client.Close() + sub:=client.Subscription(subID) + exists, err := sub.Exists(ctx) + if err != nil { + t.Fatalf("failed to verify whether Pub/Sub subscription exists, %s", err.Error()) + } + return exists +} diff --git a/test/e2e/lib/topics.go b/test/e2e/lib/topics.go index 7f5a7113b8..d898d0c1d9 100644 --- a/test/e2e/lib/topics.go +++ b/test/e2e/lib/topics.go @@ -121,3 +121,24 @@ func DeleteTopicOrDie(t *testing.T, topicName string) { } } } + +func TopicExists(t *testing.T, topicID string) bool { + t.Helper() + ctx := context.Background() + // Prow sticks the project in this key + project := os.Getenv(ProwProjectKey) + if project == "" { + t.Fatalf("failed to find %q in envvars", ProwProjectKey) + } + client, err := pubsub.NewClient(ctx, project) + if err != nil { + t.Fatalf("failed to create pubsub client, %s", err.Error()) + } + defer client.Close() + topic:=client.Topic(topicID) + exists, err := topic.Exists(ctx) + if err != nil { + t.Fatalf("failed to verify whether Pub/Sub topic exists, %s", err.Error()) + } + return exists +} diff --git a/test/e2e/test_auditlogs.go b/test/e2e/test_auditlogs.go index 11736a8870..448ae98a7b 100644 --- a/test/e2e/test_auditlogs.go +++ b/test/e2e/test_auditlogs.go @@ -25,6 +25,7 @@ import ( "github.com/google/knative-gcp/pkg/apis/events/v1beta1" "github.com/google/knative-gcp/test/e2e/lib" + "github.com/google/knative-gcp/test/e2e/lib/resources" "knative.dev/pkg/test/helpers" @@ -32,6 +33,71 @@ import ( _ "k8s.io/client-go/plugin/pkg/client/auth/gcp" ) +// SmokeCloudAuditLogsSourceTestImpl tests if a CloudAuditLogsSource object can be created to ready state and delete a CloudAuditLogsSource resource and its underlying resources.. +func SmokeCloudAuditLogsSourceTestImpl(t *testing.T, authConfig lib.AuthConfig) { + t.Helper() + client := lib.Setup(t, true, authConfig.WorkloadIdentity) + defer lib.TearDown(client) + + project := os.Getenv(lib.ProwProjectKey) + + auditlogsName := helpers.AppendRandomString("auditlogs-e2e-test") + svcName := helpers.AppendRandomString(auditlogsName + "-event-display") + topicName := helpers.AppendRandomString(auditlogsName + "-topic") + resourceName := fmt.Sprintf("projects/%s/topics/%s", project, topicName) + + lib.MakeAuditLogsOrDie(client, + lib.ServiceGVK, + auditlogsName, + lib.PubSubCreateTopicMethodName, + project, + resourceName, + lib.PubSubServiceName, + svcName, + authConfig.PubsubServiceAccount, + ) + + createdAuditLogs := client.GetAuditLogsOrFail(auditlogsName) + + topicID := createdAuditLogs.Status.TopicID + subID := createdAuditLogs.Status.SubscriptionID + sinkID := createdAuditLogs.Status.StackdriverSink + + createdSinkExists := lib.StackdriverSinkExists(t, sinkID) + if !createdSinkExists { + t.Errorf("Expected StackdriverSink%q to exist", sinkID) + } + + createdTopicExists := lib.TopicExists(t, topicID) + if !createdTopicExists { + t.Errorf("Expected topic%q to exist", topicID) + } + + createdSubExists := lib.SubscriptionExists(t, subID) + if !createdSubExists { + t.Errorf("Expected subscription %q to exist", subID) + } + client.DeleteAuditLogsOrFail(auditlogsName) + //Wait for 20 seconds for topic, subscription and notification to get deleted in gcp + time.Sleep(resources.WaitDeletionTime) + + + deletedSinkExists := lib.StackdriverSinkExists(t, sinkID) + if deletedSinkExists { + t.Errorf("Expected s%q StackdriverSink to get deleted", sinkID) + } + + deletedTopicExists := lib.TopicExists(t, topicID) + if deletedTopicExists { + t.Errorf("Expected topic %q to get deleted", topicID) + } + + deletedSubExists := lib.SubscriptionExists(t, subID) + if deletedSubExists { + t.Errorf("Expected subscription %q to get deleted", subID) + } +} + func CloudAuditLogsSourceWithTargetTestImpl(t *testing.T, authConfig lib.AuthConfig) { project := os.Getenv(lib.ProwProjectKey) diff --git a/test/e2e/test_gcp_broker.go b/test/e2e/test_gcp_broker.go index 359c36e8b0..b1f5923b5f 100644 --- a/test/e2e/test_gcp_broker.go +++ b/test/e2e/test_gcp_broker.go @@ -19,7 +19,11 @@ package e2e import ( "net/url" "testing" + "time" + "github.com/google/knative-gcp/pkg/apis/broker/v1beta1" + brokerresources "github.com/google/knative-gcp/pkg/reconciler/broker/resources" + knativegcptestresources "github.com/google/knative-gcp/test/e2e/lib/resources" eventingtestlib "knative.dev/eventing/test/lib" "knative.dev/eventing/test/lib/duck" eventingtestresources "knative.dev/eventing/test/lib/resources" @@ -94,10 +98,54 @@ func SchedulerSourceWithGCPBrokerTestImpl(t *testing.T, authConfig lib.AuthConfi } +// SmokeGCPBrokerTestImpl tests we can create a GCPBroker to ready state and we can delete a GCPBroker and its underlying resources. +func SmokeGCPBrokerTestImpl(t *testing.T, authConfig lib.AuthConfig) { + client := lib.Setup(t, true, authConfig.WorkloadIdentity) + defer lib.TearDown(client) + + brokerName := helpers.AppendRandomString("gcp") + // Create a new GCP Broker. + gcpBroker := client.CreateGCPBrokerV1Beta1OrFail(brokerName, knativegcptestresources.WithBrokerClassForBrokerV1Beta1(v1beta1.BrokerClass)) + + // Wait for broker ready. + client.Core.WaitForResourceReadyOrFail(brokerName, eventingtestlib.BrokerTypeMeta) + + brokerresources.GenerateDecouplingTopicName(gcpBroker) + + topicID := brokerresources.GenerateDecouplingTopicName(gcpBroker) + subID := brokerresources.GenerateDecouplingSubscriptionName(gcpBroker) + + createdTopicExists := lib.TopicExists(t, topicID) + if !createdTopicExists { + t.Errorf("Expected topic%q to exist", topicID) + } + + createdSubExists := lib.SubscriptionExists(t, subID) + if !createdSubExists { + t.Errorf("Expected subscription %q to exist", subID) + } + + client.DeleteGCPBrokerOrFail(brokerName) + //Wait for 20 seconds for subscription to get deleted in gcp + time.Sleep(knativegcptestresources.WaitDeletionTime) + + deletedTopicExists := lib.TopicExists(t, topicID) + if deletedTopicExists { + t.Errorf("Expected topic %q to get deleted", topicID) + } + deletedSubExists := lib.SubscriptionExists(t, subID) + if deletedSubExists { + t.Errorf("Expected subscription %q to get deleted", subID) + } + t.Logf("topic id is: %v /n, sub id is: %v /n", topicID, subID) + t.Logf("createdSubExists id is: %t /n, deletedSubExists is: %t /n", createdSubExists, deletedSubExists) + t.Logf("createdTopicExists id is: %t /n, deletedTopicExists is: %t /n", createdTopicExists, deletedTopicExists) +} + func createGCPBroker(client *lib.Client) (url.URL, string) { brokerName := helpers.AppendRandomString("gcp") // Create a new GCP Broker. - client.Core.CreateBrokerV1Beta1OrFail(brokerName, eventingtestresources.WithBrokerClassForBrokerV1Beta1("googlecloud")) + client.CreateGCPBrokerV1Beta1OrFail(brokerName, knativegcptestresources.WithBrokerClassForBrokerV1Beta1(v1beta1.BrokerClass)) // Wait for broker ready. client.Core.WaitForResourceReadyOrFail(brokerName, eventingtestlib.BrokerTypeMeta) diff --git a/test/e2e/test_pubsub.go b/test/e2e/test_pubsub.go index 0aa88551dd..00ec81d698 100644 --- a/test/e2e/test_pubsub.go +++ b/test/e2e/test_pubsub.go @@ -22,6 +22,7 @@ import ( "fmt" "os" "testing" + "time" "cloud.google.com/go/pubsub" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -29,12 +30,13 @@ import ( "github.com/google/knative-gcp/pkg/apis/events/v1beta1" "github.com/google/knative-gcp/test/e2e/lib" + "github.com/google/knative-gcp/test/e2e/lib/resources" // The following line to load the gcp plugin (only required to authenticate against GKE clusters). _ "k8s.io/client-go/plugin/pkg/client/auth/gcp" ) -// SmokeCloudPubSubSourceTestImpl tests we can create a CloudPubSubSource to ready state. +// SmokeCloudPubSubSourceTestImpl tests we can create a CloudPubSubSource to ready state and we can delete a CloudPubSubSource and its underlying resources. func SmokeCloudPubSubSourceTestImpl(t *testing.T, authConfig lib.AuthConfig) { t.Helper() topic, deleteTopic := lib.MakeTopicOrDie(t) @@ -51,8 +53,21 @@ func SmokeCloudPubSubSourceTestImpl(t *testing.T, authConfig lib.AuthConfig) { Version: "v1", Kind: "Service"}, psName, svcName, topic, authConfig.PubsubServiceAccount) - client.Core.WaitForResourceReadyOrFail(psName, lib.CloudPubSubSourceTypeMeta) + createdPubSub := client.GetPubSubOrFail(psName) + subID := createdPubSub.Status.SubscriptionID + createdSubExists := lib.SubscriptionExists(t, subID) + if !createdSubExists { + t.Errorf("Expected subscription %q to exist", subID) + } + + client.DeletePubSubOrFail(psName) + //Wait for 20 seconds for subscription to get deleted in gcp + time.Sleep(resources.WaitDeletionTime) + deletedSubExists := lib.SubscriptionExists(t, subID) + if deletedSubExists { + t.Errorf("Expected subscription %q to get deleted", subID) + } } // CloudPubSubSourceWithTargetTestImpl tests we can receive an event from a CloudPubSubSource. diff --git a/test/e2e/test_scheduler.go b/test/e2e/test_scheduler.go index efb9bfe098..15594e9a04 100644 --- a/test/e2e/test_scheduler.go +++ b/test/e2e/test_scheduler.go @@ -19,32 +19,66 @@ package e2e import ( "encoding/json" "testing" + "time" "knative.dev/pkg/test/helpers" "github.com/google/knative-gcp/pkg/apis/events/v1beta1" - kngcptesting "github.com/google/knative-gcp/pkg/reconciler/testing" "github.com/google/knative-gcp/test/e2e/lib" + "github.com/google/knative-gcp/test/e2e/lib/resources" ) -// SmokeCloudSchedulerSourceSetup tests if a CloudSchedulerSource object can be created and be made ready. -func SmokeCloudSchedulerSourceSetup(t *testing.T, authConfig lib.AuthConfig) { +// SmokeCloudSchedulerSourceTestImpl tests if a CloudSchedulerSource object can be created to ready state and delete a CloudSchedulerSource resource and its underlying resources.. +func SmokeCloudSchedulerSourceTestImpl(t *testing.T, authConfig lib.AuthConfig) { t.Helper() client := lib.Setup(t, true, authConfig.WorkloadIdentity) defer lib.TearDown(client) - sName := "scheduler-test" + // Create an Addressable to receive scheduler events + data := helpers.AppendRandomString("smoke-scheduler-source") + // Create the target and scheduler + schedulerName := helpers.AppendRandomString("scheduler") + svcName := "event-display" + lib.MakeSchedulerOrDie(client, lib.ServiceGVK, schedulerName, data, svcName, authConfig.PubsubServiceAccount) + + createdScheduler := client.GetSchedulerOrFail(schedulerName) - scheduler := kngcptesting.NewCloudSchedulerSource(sName, client.Namespace, - kngcptesting.WithCloudSchedulerSourceLocation("us-central1"), - kngcptesting.WithCloudSchedulerSourceData("my test data"), - kngcptesting.WithCloudSchedulerSourceSchedule("* * * * *"), - kngcptesting.WithCloudSchedulerSourceSink(lib.ServiceGVK, "event-display"), - kngcptesting.WithCloudSchedulerSourceGCPServiceAccount(authConfig.PubsubServiceAccount), - ) + topicID := createdScheduler.Status.TopicID + subID := createdScheduler.Status.SubscriptionID + jobName := createdScheduler.Status.JobName + + createdJobExists := lib.SchedulerJobExists(t, jobName) + if !createdJobExists { + t.Errorf("Expected job%q to exist", jobName) + } + + createdTopicExists := lib.TopicExists(t, topicID) + if !createdTopicExists { + t.Errorf("Expected topic%q to exist", topicID) + } - client.CreateSchedulerOrFail(scheduler) - client.Core.WaitForResourceReadyOrFail(sName, lib.CloudSchedulerSourceTypeMeta) + createdSubExists := lib.SubscriptionExists(t, subID) + if !createdSubExists { + t.Errorf("Expected subscription %q to exist", subID) + } + client.DeleteSchedulerOrFail(schedulerName) + //Wait for 20 seconds for topic, subscription and job to get deleted in gcp + time.Sleep(resources.WaitDeletionTime) + + deletedJobExists := lib.SchedulerJobExists(t, jobName) + if deletedJobExists { + t.Errorf("Expected job%q to get deleted", jobName) + } + + deletedTopicExists := lib.TopicExists(t, topicID) + if deletedTopicExists { + t.Errorf("Expected topic %q to get deleted", topicID) + } + + deletedSubExists := lib.SubscriptionExists(t, subID) + if deletedSubExists { + t.Errorf("Expected subscription %q to get deleted", subID) + } } // CloudSchedulerSourceWithTargetTestImpl injects a scheduler event and checks if it is in the diff --git a/test/e2e/test_storage.go b/test/e2e/test_storage.go index f633c05cad..b2b479d363 100644 --- a/test/e2e/test_storage.go +++ b/test/e2e/test_storage.go @@ -32,11 +32,68 @@ import ( "github.com/google/knative-gcp/pkg/apis/events/v1beta1" "github.com/google/knative-gcp/test/e2e/lib" "github.com/google/knative-gcp/test/e2e/lib/metrics" + "github.com/google/knative-gcp/test/e2e/lib/resources" // The following line to load the gcp plugin (only required to authenticate against GKE clusters). _ "k8s.io/client-go/plugin/pkg/client/auth/gcp" ) +// SmokeCloudStorageSourceTestImpl tests if a CloudStorageSource object can be created to ready state and delete a CloudStorageSource resource and its underlying resources.. +func SmokeCloudStorageSourceTestImpl(t *testing.T, authConfig lib.AuthConfig) { + t.Helper() + client := lib.Setup(t, true, authConfig.WorkloadIdentity) + defer lib.TearDown(client) + + ctx := context.Background() + project := os.Getenv(lib.ProwProjectKey) + + bucketName := lib.MakeBucket(ctx, t, project) + storageName := helpers.AppendRandomString(bucketName + "-storage") + svcName := helpers.AppendRandomString(bucketName + "-event-display") + + //make the storage source + lib.MakeStorageOrDie(client, lib.ServiceGVK, bucketName, storageName, svcName, authConfig.PubsubServiceAccount) + + createdStorage := client.GetStorageOrFail(storageName) + + topicID := createdStorage.Status.TopicID + subID := createdStorage.Status.SubscriptionID + notificationID := createdStorage.Status.NotificationID + + createdNotificationExists := lib.NotificationExists(t, bucketName, notificationID) + if !createdNotificationExists { + t.Errorf("Expected notification%q to exist", topicID) + } + + createdTopicExists := lib.TopicExists(t, topicID) + if !createdTopicExists { + t.Errorf("Expected topic%q to exist", topicID) + } + + createdSubExists := lib.SubscriptionExists(t, subID) + if !createdSubExists { + t.Errorf("Expected subscription %q to exist", subID) + } + client.DeleteStorageOrFail(storageName) + //Wait for 20 seconds for topic, subscription and notification to get deleted in gcp + time.Sleep(resources.WaitDeletionTime) + + deletedNotificationExists := lib.NotificationExists(t, bucketName, notificationID) + if deletedNotificationExists { + t.Errorf("Expected notification%q tto get deleted", notificationID) + } + + deletedTopicExists := lib.TopicExists(t, topicID) + if deletedTopicExists { + t.Errorf("Expected topic %q to get deleted", topicID) + } + + deletedSubExists := lib.SubscriptionExists(t, subID) + if deletedSubExists { + t.Errorf("Expected subscription %q to get deleted", subID) + } +} + func CloudStorageSourceWithTargetTestImpl(t *testing.T, assertMetrics bool, authConfig lib.AuthConfig) { t.Helper() ctx := context.Background()