Skip to content
This repository has been archived by the owner on Jun 19, 2022. It is now read-only.

Commit

Permalink
add e2e test for AuditLogs>Broker with PubSub Channel
Browse files Browse the repository at this point in the history
  • Loading branch information
Nicolas Lopez committed Feb 21, 2020
1 parent c14c445 commit f4dede1
Show file tree
Hide file tree
Showing 6 changed files with 156 additions and 43 deletions.
9 changes: 8 additions & 1 deletion test/e2e/e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,11 +209,18 @@ func TestCloudPubSubSourceBrokerWithPubSubChannel(t *testing.T) {
PubSubSourceBrokerWithPubSubChannelTestImpl(t)
}

// TestCloudStorageSourceBrokerWithPubSubChannel tests we can knock a Knative Service from a broker with PubSub Channel from a CloudPubSubSource.

// TestCloudStorageSourceBrokerWithPubSubChannel tests we can knock a Knative Service from a broker with PubSub Channel from a CloudStorageSource.
func TestCloudStorageSourceBrokerWithPubSubChannel(t *testing.T) {
cancel := logstream.Start(t)
defer cancel()
StorageSourceBrokerWithPubSubChannelTestImpl(t)

// TestCloudAuditLogsSourceBrokerWithPubSubChannel tests we can knock a Knative Service from a broker with PubSub Channel from a CloudAuditLogsSource.
func TestCloudAuditLogsSourceBrokerWithPubSubChannel(t *testing.T) {
cancel := logstream.Start(t)
defer cancel()
AuditLogsSourceBrokerWithPubSubChannelTestImpl(t)
}

// TestCloudStorageSource tests we can knock down a target from a CloudStorageSource.
Expand Down
67 changes: 67 additions & 0 deletions test/e2e/lib/auditlogs.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
/*
Copyright 2020 Google LLC
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package lib

import (
"fmt"

"github.com/google/knative-gcp/pkg/apis/events/v1alpha1"
kngcptesting "github.com/google/knative-gcp/pkg/reconciler/testing"
"github.com/google/knative-gcp/test/e2e/lib/resources"
v1 "k8s.io/api/core/v1"
)

func MakeAuditLogsOrDie(client *Client,
auditlogsName, methodName, project, resourceName, serviceName, targetName string,
so ...kngcptesting.CloudAuditLogsSourceOption,
) {
so = append(so, kngcptesting.WithCloudAuditLogsSourceServiceName(serviceName))
so = append(so, kngcptesting.WithCloudAuditLogsSourceMethodName(methodName))
so = append(so, kngcptesting.WithCloudAuditLogsSourceProject(project))
so = append(so, kngcptesting.WithCloudAuditLogsSourceResourceName(resourceName))
so = append(so, kngcptesting.WithCloudAuditLogsSourceSink(ServiceGVK, targetName))
eventsAuditLogs := kngcptesting.NewCloudAuditLogsSource(auditlogsName, client.Namespace, so...)
client.CreateAuditLogsOrFail(eventsAuditLogs)

client.Core.WaitForResourceReadyOrFail(auditlogsName, CloudAuditLogsSourceTypeMeta)
}

func MakeAuditLogsJobOrDie(client *Client, methodName, project, resourceName, serviceName, targetName string) {
job := resources.AuditLogsTargetJob(targetName, []v1.EnvVar{{
Name: "SERVICENAME",
Value: serviceName,
}, {
Name: "METHODNAME",
Value: methodName,
}, {
Name: "RESOURCENAME",
Value: resourceName,
}, {
Name: "TYPE",
Value: v1alpha1.CloudAuditLogsSourceEvent,
}, {
Name: "SOURCE",
Value: v1alpha1.CloudAuditLogsSourceEventSource(serviceName, fmt.Sprintf("projects/%s", project)),
}, {
Name: "SUBJECT",
Value: resourceName,
}, {
Name: "TIME",
Value: "360",
}})
client.CreateJobOrFail(job, WithServiceForJob(targetName))
}
3 changes: 1 addition & 2 deletions test/e2e/lib/pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,7 @@ import (
pkgmetrics "knative.dev/pkg/metrics"
)

func MakePubSubOrDie(t *testing.T,
client *Client,
func MakePubSubOrDie(client *Client,
gvk metav1.GroupVersionKind,
psName, targetName, topicName string,
so ...kngcptesting.CloudPubSubSourceOption,
Expand Down
44 changes: 9 additions & 35 deletions test/e2e/test_auditlogs.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,7 @@ import (
"testing"
"time"

v1 "k8s.io/api/core/v1"

"github.com/google/knative-gcp/pkg/apis/events/v1alpha1"
kngcptesting "github.com/google/knative-gcp/pkg/reconciler/testing"
"github.com/google/knative-gcp/test/e2e/lib"
"github.com/google/knative-gcp/test/e2e/lib/resources"

"knative.dev/pkg/test/helpers"

Expand All @@ -53,38 +48,17 @@ func CloudAuditLogsSourceWithTestImpl(t *testing.T) {
defer lib.TearDown(client)

// Create a target Job to receive the events.
job := resources.AuditLogsTargetJob(targetName, []v1.EnvVar{{
Name: "SERVICENAME",
Value: serviceName,
}, {
Name: "METHODNAME",
Value: methodName,
}, {
Name: "RESOURCENAME",
Value: resourceName,
}, {
Name: "TYPE",
Value: v1alpha1.CloudAuditLogsSourceEvent,
}, {
Name: "SOURCE",
Value: v1alpha1.CloudAuditLogsSourceEventSource(serviceName, fmt.Sprintf("projects/%s", project)),
}, {
Name: "SUBJECT",
Value: resourceName,
}, {
Name: "TIME",
Value: "360",
}})
client.CreateJobOrFail(job, lib.WithServiceForJob(targetName))
lib.MakeAuditLogsJobOrDie(client, methodName, project, resourceName, serviceName, targetName)

// Create the CloudAuditLogsSource.
eventsAuditLogs := kngcptesting.NewCloudAuditLogsSource(auditlogsName, client.Namespace,
kngcptesting.WithCloudAuditLogsSourceServiceName(serviceName),
kngcptesting.WithCloudAuditLogsSourceMethodName(methodName),
kngcptesting.WithCloudAuditLogsSourceProject(project),
kngcptesting.WithCloudAuditLogsSourceResourceName(resourceName),
kngcptesting.WithCloudAuditLogsSourceSink(lib.ServiceGVK, targetName))
client.CreateAuditLogsOrFail(eventsAuditLogs)
lib.MakeAuditLogsOrDie(client,
auditlogsName,
methodName,
project,
resourceName,
serviceName,
targetName,
)

client.Core.WaitForResourceReadyOrFail(auditlogsName, lib.CloudAuditLogsSourceTypeMeta)

Expand Down
72 changes: 69 additions & 3 deletions test/e2e/test_broker_pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package e2e
import (
"context"
"encoding/json"
"fmt"
"net/url"
"os"
"testing"
Expand Down Expand Up @@ -104,14 +105,12 @@ func PubSubSourceBrokerWithPubSubChannelTestImpl(t *testing.T) {
makeTargetJobOrDie(client, targetName)

u := createBrokerWithPubSubChannel(t, client, targetName)

var url apis.URL = apis.URL(u)
// Just to make sure all resources are ready.
time.Sleep(5 * time.Second)

// Create the PubSub source.
lib.MakePubSubOrDie(t,
client,
lib.MakePubSubOrDie(client,
lib.ServiceGVK,
psName,
targetName,
Expand Down Expand Up @@ -181,6 +180,73 @@ func StorageSourceBrokerWithPubSubChannelTestImpl(t *testing.T) {
}
}

func AuditLogsSourceBrokerWithPubSubChannelTestImpl(t *testing.T) {
project := os.Getenv(lib.ProwProjectKey)

auditlogsName := helpers.AppendRandomString("auditlogs-e2e-test")
targetName := helpers.AppendRandomString(auditlogsName + "-target")
topicName := helpers.AppendRandomString(auditlogsName + "-topic")
resourceName := fmt.Sprintf("projects/%s/topics/%s", project, topicName)

client := lib.Setup(t, true)
defer lib.TearDown(client)

// Create a target Job to receive the events.
lib.MakeAuditLogsJobOrDie(client, methodName, project, resourceName, serviceName, targetName)

u := createBrokerWithPubSubChannel(t, client, targetName)

var url apis.URL = apis.URL(u)
// Just to make sure all resources are ready.
time.Sleep(5 * time.Second)

// Create the CloudAuditLogsSource.
lib.MakeAuditLogsOrDie(client,
auditlogsName,
methodName,
project,
resourceName,
serviceName,
targetName,
kngcptesting.WithCloudAuditLogsSourceSinkURI(&url),
)

// Audit logs source misses the topic which gets created shortly after the source becomes ready. Need to wait for a few seconds.
// Tried with 45 seconds but the test has been quite flaky.
time.Sleep(90 * time.Second)
topicName, deleteTopic := lib.MakeTopicWithNameOrDie(t, topicName)
defer deleteTopic()

msg, err := client.WaitUntilJobDone(client.Namespace, targetName)
if err != nil {
t.Error(err)
}

t.Logf("Last term message => %s", msg)

if msg != "" {
out := &lib.TargetOutput{}
if err := json.Unmarshal([]byte(msg), out); err != nil {
t.Error(err)
}
if !out.Success {
// Log the output cloudauditlogssource pods.
if logs, err := client.LogsFor(client.Namespace, auditlogsName, lib.CloudAuditLogsSourceTypeMeta); err != nil {
t.Error(err)
} else {
t.Logf("cloudauditlogssource: %+v", logs)
}
// Log the output of the target job pods.
if logs, err := client.LogsFor(client.Namespace, targetName, lib.JobTypeMeta); err != nil {
t.Error(err)
} else {
t.Logf("job: %s\n", logs)
}
t.Fail()
}
}
}

func createBrokerWithPubSubChannel(t *testing.T, client *lib.Client, targetName string) url.URL {
brokerName := helpers.AppendRandomString("pubsub")
dummyTriggerName := "dummy-broker-" + brokerName
Expand Down
4 changes: 2 additions & 2 deletions test/e2e/test_pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func SmokeCloudPubSubSourceTestImpl(t *testing.T) {
defer lib.TearDown(client)

// Create the PubSub source.
lib.MakePubSubOrDie(t, client, metav1.GroupVersionKind{
lib.MakePubSubOrDie(client, metav1.GroupVersionKind{
Version: "v1",
Kind: "Service"}, psName, svcName, topic)

Expand Down Expand Up @@ -76,7 +76,7 @@ func CloudPubSubSourceWithTargetTestImpl(t *testing.T, assertMetrics bool) {
client.CreateJobOrFail(job, lib.WithServiceForJob(targetName))

// Create the PubSub source.
lib.MakePubSubOrDie(t, client, lib.ServiceGVK, psName, targetName, topicName)
lib.MakePubSubOrDie(client, lib.ServiceGVK, psName, targetName, topicName)

topic := lib.GetTopic(t, topicName)

Expand Down

0 comments on commit f4dede1

Please sign in to comment.