Skip to content
This repository has been archived by the owner on Jun 19, 2022. It is now read-only.

Update sources event CE types, sources, subjects #1340

Merged
merged 9 commits into from
Jun 26, 2020
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 16 additions & 3 deletions pkg/apis/events/v1beta1/cloudauditlogssource_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,12 +68,20 @@ var auditLogsSourceCondSet = apis.NewLivingConditionSet(
)

const (
CloudAuditLogsSourceEvent = "com.google.cloud.auditlog.event"
// Ref: https://github.com/googleapis/google-cloudevents/blob/master/proto/google/events/cloud/audit/v1/events.proto#L27
// TODO: somehow reference the proto value directly.
CloudAuditLogsSourceLogWrittenEventType = "google.cloud.audit.log.v1.written"
CloudAuditLogsSourceEventDataSchema = "https://github.com/googleapis/google-cloudevents/blob/master/proto/google/events/cloud/audit/v1/events.proto"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we put all of these constants in register.go maybe? outside v1beta1... When we do v1, we will be copy/pasting them otherwise. Similar thing happened with the current values... They are both in v1alpha1 and v1beta1

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's a good point!

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I actually moved this out of the apis folder. I feel it makes more sense as they eventually should be replaced by the code generated from the protos.

)

// CloudAuditLogsSourceEventSource returns the Cloud Audit Logs CloudEvent source value.
func CloudAuditLogsSourceEventSource(serviceName, parentResource string) string {
return fmt.Sprintf("//%s/%s", serviceName, parentResource)
// Format e.g. //cloudaudit.googleapis.com/projects/project-id/[activity|data_access]
func CloudAuditLogsSourceEventSource(parentResource, activity string) string {
src := fmt.Sprintf("//cloudaudit.googleapis.com/%s", parentResource)
if activity != "" {
src = src + "/" + activity
}
return src
}

// CloudAuditLogsSourceEventID returns the Cloud Audit Logs CloudEvent id value.
Expand All @@ -82,6 +90,11 @@ func CloudAuditLogsSourceEventID(id, logName, timestamp string) string {
return fmt.Sprintf("%x", md5.Sum([]byte(id+logName+timestamp)))
}

// CloudAuditLogsSourceEventSubject returns the Cloud Audit Logs CloudEvent subject value.
func CloudAuditLogsSourceEventSubject(serviceName, resourceName string) string {
return fmt.Sprintf("%s/%s", serviceName, resourceName)
}

type CloudAuditLogsSourceSpec struct {
// This brings in the PubSub based Source Specs. Includes:
duckv1beta1.PubSubSpec `json:",inline"`
Expand Down
12 changes: 10 additions & 2 deletions pkg/apis/events/v1beta1/cloudauditlogssource_types_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,15 +69,23 @@ func TestAuditLogsConditionSet(t *testing.T) {
}

func TestCloudAuditLogsSourceEventSource(t *testing.T) {
want := "//pubsub.googleapis.com/projects/PROJECT"
want := "//cloudaudit.googleapis.com/projects/PROJECT/activity"

got := CloudAuditLogsSourceEventSource("pubsub.googleapis.com", "projects/PROJECT")
got := CloudAuditLogsSourceEventSource("projects/PROJECT", "activity")

if diff := cmp.Diff(want, got); diff != "" {
t.Errorf("failed to get expected (-want, +got) = %v", diff)
}
}

func TestCloudAuditLogsSourceEventSubject(t *testing.T) {
want := "pubsub.googleapis.com/projects/PROJECT/topics/TOPIC"
got := CloudAuditLogsSourceEventSubject("pubsub.googleapis.com", "projects/PROJECT/topics/TOPIC")
if got != want {
t.Errorf("CloudAuditLogsSourceEventSubject got=%s, want=%s", got, want)
}
}

func TestCloudAuditLogsSourceEventID(t *testing.T) {
want := "efdb9bf7d6fdfc922352530c1ba51242"

Expand Down
6 changes: 4 additions & 2 deletions pkg/apis/events/v1beta1/cloudpubsubsource_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,8 +116,10 @@ func CloudPubSubSourceEventSource(googleCloudProject, topic string) string {
}

const (
// CloudPubSubSource CloudEvent type
CloudPubSubSourcePublish = "com.google.cloud.pubsub.topic.publish"
// ref: https://github.com/googleapis/google-cloudevents/blob/master/proto/google/events/cloud/pubsub/v1/events.proto#L26
// TODO: somehow reference the proto value directly.
CloudPubSubSourceMessagePublishedEventType = "google.cloud.pubsub.topic.v1.messagePublished"
CloudPubSubSourceEventDataSchema = "https://github.com/googleapis/google-cloudevents/blob/master/proto/google/events/cloud/pubsub/v1/events.proto"
yolocs marked this conversation as resolved.
Show resolved Hide resolved
)

const (
Expand Down
9 changes: 6 additions & 3 deletions pkg/apis/events/v1beta1/cloudschedulersource_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,15 +58,18 @@ var (

const (
// CloudEvent types used by CloudSchedulerSource.
CloudSchedulerSourceExecute = "com.google.cloud.scheduler.job.execute"
// ref: https://github.com/googleapis/google-cloudevents/blob/master/proto/google/events/cloud/scheduler/v1/events.proto#L26
// TODO: somehow reference the proto values directly.
CloudSchedulerSourceJobExecutedEventType = "google.cloud.scheduler.job.v1.executed"
CloudSchedulerSourceEventDataSchema = "https://github.com/googleapis/google-cloudevents/blob/master/proto/google/events/cloud/scheduler/v1/events.proto"
// CloudSchedulerSourceJobName is the Pub/Sub message attribute key with the CloudSchedulerSource's job name.
CloudSchedulerSourceJobName = "jobName"
// CloudSchedulerSourceName is the Pub/Sub message attribute key with the CloudSchedulerSource's name.
CloudSchedulerSourceName = "schedulerName"
)

func CloudSchedulerSourceEventSource(parent, scheduler string) string {
return fmt.Sprintf("//cloudscheduler.googleapis.com/%s/schedulers/%s", parent, scheduler)
func CloudSchedulerSourceEventSource(jobName string) string {
return fmt.Sprintf("//cloudscheduler.googleapis.com/%s", jobName)
}

// CloudSchedulerSourceSpec is the spec for a CloudSchedulerSource resource
Expand Down
4 changes: 2 additions & 2 deletions pkg/apis/events/v1beta1/cloudschedulersource_types_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,9 @@ func TestCloudSchedulerSourceGetGroupVersionKind(t *testing.T) {
}

func TestCloudSchedulerSourceEventSource(t *testing.T) {
want := "//cloudscheduler.googleapis.com/PARENT/schedulers/SCHEDULER"
want := "//cloudscheduler.googleapis.com/JOB_NAME"

got := CloudSchedulerSourceEventSource("PARENT", "SCHEDULER")
got := CloudSchedulerSourceEventSource("JOB_NAME")

if diff := cmp.Diff(want, got); diff != "" {
t.Errorf("failed to get expected (-want, +got) = %v", diff)
Expand Down
2 changes: 1 addition & 1 deletion pkg/apis/events/v1beta1/cloudstoragesource_defaults.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import (
duckv1beta1 "github.com/google/knative-gcp/pkg/apis/duck/v1beta1"
)

var allEventTypes = []string{CloudStorageSourceFinalize, CloudStorageSourceDelete, CloudStorageSourceArchive, CloudStorageSourceMetadataUpdate}
var allEventTypes = []string{CloudStorageSourceObjectFinalizedEventType, CloudStorageSourceObjectDeletedEventType, CloudStorageSourceObjectArchivedEventType, CloudStorageSourceObjectMetadataUpdatedEventType}

func (s *CloudStorageSource) SetDefaults(ctx context.Context) {
ctx = apis.WithinParent(ctx, s.ObjectMeta)
Expand Down
4 changes: 2 additions & 2 deletions pkg/apis/events/v1beta1/cloudstoragesource_defaults_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func TestCloudStorageSource_SetDefaults(t *testing.T) {
},
"defaults present": {
orig: &CloudStorageSourceSpec{
EventTypes: []string{CloudStorageSourceFinalize, CloudStorageSourceDelete},
EventTypes: []string{CloudStorageSourceObjectFinalizedEventType, CloudStorageSourceObjectDeletedEventType},
PubSubSpec: duckv1beta1.PubSubSpec{
Secret: &corev1.SecretKeySelector{
LocalObjectReference: corev1.LocalObjectReference{
Expand All @@ -58,7 +58,7 @@ func TestCloudStorageSource_SetDefaults(t *testing.T) {
},
},
expected: &CloudStorageSourceSpec{
EventTypes: []string{CloudStorageSourceFinalize, CloudStorageSourceDelete},
EventTypes: []string{CloudStorageSourceObjectFinalizedEventType, CloudStorageSourceObjectDeletedEventType},
PubSubSpec: duckv1beta1.PubSubSpec{
Secret: &corev1.SecretKeySelector{
LocalObjectReference: corev1.LocalObjectReference{
Expand Down
10 changes: 6 additions & 4 deletions pkg/apis/events/v1beta1/cloudstoragesource_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,10 +82,12 @@ type CloudStorageSourceSpec struct {

const (
// CloudEvent types used by CloudStorageSource.
CloudStorageSourceFinalize = "com.google.cloud.storage.object.finalize"
CloudStorageSourceArchive = "com.google.cloud.storage.object.archive"
CloudStorageSourceDelete = "com.google.cloud.storage.object.delete"
CloudStorageSourceMetadataUpdate = "com.google.cloud.storage.object.metadataUpdate"
// TODO: somehow reference the proto values directly.
CloudStorageSourceObjectFinalizedEventType = "google.cloud.storage.object.v1.finalized"
CloudStorageSourceObjectArchivedEventType = "google.cloud.storage.object.v1.archived"
CloudStorageSourceObjectDeletedEventType = "google.cloud.storage.object.v1.deleted"
CloudStorageSourceObjectMetadataUpdatedEventType = "google.cloud.storage.object.v1.metadataUpdated"
CloudStorageSourceEventDataSchema = "https://github.com/googleapis/google-cloudevents/blob/master/proto/google/events/cloud/storage/v1/events.proto"

// CloudEvent source prefix.
storageSourcePrefix = "//storage.googleapis.com/buckets"
Expand Down
4 changes: 2 additions & 2 deletions pkg/apis/events/v1beta1/cloudstoragesource_validation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ var (
// Bucket, Sink, Secret, Event Type and Project, ObjectNamePrefix and PayloadFormat
storageSourceSpec = CloudStorageSourceSpec{
Bucket: "my-test-bucket",
EventTypes: []string{CloudStorageSourceFinalize, CloudStorageSourceDelete},
EventTypes: []string{CloudStorageSourceObjectFinalizedEventType, CloudStorageSourceObjectDeletedEventType},
ObjectNamePrefix: "test-prefix",
PayloadFormat: cloudevents.ApplicationJSON,
PubSubSpec: duckv1beta1.PubSubSpec{
Expand Down Expand Up @@ -366,7 +366,7 @@ func TestCheckImmutableFields(t *testing.T) {
orig: &storageSourceSpec,
updated: CloudStorageSourceSpec{
Bucket: storageSourceSpec.Bucket,
EventTypes: []string{CloudStorageSourceMetadataUpdate},
EventTypes: []string{CloudStorageSourceObjectMetadataUpdatedEventType},
ObjectNamePrefix: storageSourceSpec.ObjectNamePrefix,
PayloadFormat: storageSourceSpec.PayloadFormat,
PubSubSpec: storageSourceSpec.PubSubSpec,
Expand Down
22 changes: 16 additions & 6 deletions pkg/pubsub/adapter/converters/auditlogs.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,6 @@ import (
)

const (
logEntrySchema = "type.googleapis.com/google.logging.v2.LogEntry"

parentResourcePattern = `^(:?projects|organizations|billingAccounts|folders)/[^/]+`

serviceNameExtension = "servicename"
Expand Down Expand Up @@ -101,6 +99,16 @@ func resolveAnyUnknowns(typeURL string) (proto.Message, error) {
return reflect.New(mt.Elem()).Interface().(proto.Message), nil
}

// Log name, e.g. "organizations/1234567890/logs/cloudresourcemanager.googleapis.com%2Factivity"
func logActivity(logName string) string {
parts := strings.Split(logName, "%2F")
if len(parts) < 2 {
return ""
}
// Could be "activity" or "data_access"
return parts[1]
}

func convertCloudAuditLogs(ctx context.Context, msg *pubsub.Message) (*cev2.Event, error) {
entry := logpb.LogEntry{}
if err := jsonpbUnmarshaller.Unmarshal(bytes.NewReader(msg.Data), &entry); err != nil {
Expand All @@ -111,6 +119,7 @@ func convertCloudAuditLogs(ctx context.Context, msg *pubsub.Message) (*cev2.Even
if parentResource == "" {
return nil, fmt.Errorf("invalid LogName: %q", entry.LogName)
}
logActivity := logActivity(entry.LogName)

// Make a new event and convert the message payload.
event := cev2.NewEvent(cev2.VersionV1)
Expand All @@ -120,8 +129,10 @@ func convertCloudAuditLogs(ctx context.Context, msg *pubsub.Message) (*cev2.Even
} else {
event.SetTime(timestamp)
}
event.SetType(v1beta1.CloudAuditLogsSourceLogWrittenEventType)
event.SetSource(v1beta1.CloudAuditLogsSourceEventSource(parentResource, logActivity))
event.SetDataSchema(v1beta1.CloudAuditLogsSourceEventDataSchema)
event.SetData(cev2.ApplicationJSON, msg.Data)
event.SetDataSchema(logEntrySchema)

switch payload := entry.Payload.(type) {
case *logpb.LogEntry_ProtoPayload:
Expand All @@ -131,9 +142,8 @@ func convertCloudAuditLogs(ctx context.Context, msg *pubsub.Message) (*cev2.Even
}
switch proto := unpacked.Message.(type) {
case *auditpb.AuditLog:
event.SetType(v1beta1.CloudAuditLogsSourceEvent)
event.SetSource(v1beta1.CloudAuditLogsSourceEventSource(proto.ServiceName, parentResource))
event.SetSubject(proto.ResourceName)
event.SetSubject(v1beta1.CloudAuditLogsSourceEventSubject(proto.ServiceName, proto.ResourceName))
// TODO: figure out if we want to keep these extensions.
event.SetExtension(serviceNameExtension, proto.ServiceName)
event.SetExtension(methodNameExtension, proto.MethodName)
event.SetExtension(resourceNameExtension, proto.ResourceName)
Expand Down
21 changes: 12 additions & 9 deletions pkg/pubsub/adapter/converters/auditlogs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,15 +35,15 @@ import (

const (
insertID = "test-insert-id"
logName = "projects/test-project/test-log-name"
logName = "projects/test-project/pubsub.googleapis.com%2Factivity"
testTs = "2006-01-02T15:04:05Z"
)

func TestConvertAuditLog(t *testing.T) {
auditLog := auditpb.AuditLog{
ServiceName: "test-service-name",
ServiceName: "pubsub.googleapis.com",
MethodName: "test-method-name",
ResourceName: "test-resource-name",
ResourceName: "projects/test-project/topics/test-topic",
}
payload, err := ptypes.MarshalAny(&auditLog)
if err != nil {
Expand Down Expand Up @@ -87,15 +87,18 @@ func TestConvertAuditLog(t *testing.T) {
if !e.Time().Equal(testTime) {
t.Errorf("Time '%v' != '%v'", e.Time(), testTime)
}
if want := v1beta1.CloudAuditLogsSourceEventSource("test-service-name", "projects/test-project"); e.Source() != want {
if want := v1beta1.CloudAuditLogsSourceEventSource("projects/test-project", "activity"); e.Source() != want {
t.Errorf("Source %q != %q", e.Source(), want)
}
if e.Type() != "com.google.cloud.auditlog.event" {
t.Errorf(`Type %q != "com.google.cloud.auditlog.event"`, e.Type())
if e.Type() != "google.cloud.audit.log.v1.written" {
t.Errorf(`Type %q != "google.cloud.audit.log.v1.written"`, e.Type())
}
if want := "test-resource-name"; e.Subject() != want {
if want := v1beta1.CloudAuditLogsSourceEventSubject("pubsub.googleapis.com", "projects/test-project/topics/test-topic"); e.Subject() != want {
t.Errorf("Subject %q != %q", e.Subject(), want)
}
if e.DataSchema() != v1beta1.CloudAuditLogsSourceEventDataSchema {
t.Errorf("DataSchema got=%s, want=%s", e.DataSchema(), v1beta1.CloudAuditLogsSourceEventDataSchema)
}

var actualLogEntry logpb.LogEntry
if err = jsonpb.Unmarshal(bytes.NewReader(e.Data()), &actualLogEntry); err != nil {
Expand All @@ -107,9 +110,9 @@ func TestConvertAuditLog(t *testing.T) {
}

wantExtensions := map[string]interface{}{
"servicename": "test-service-name",
"servicename": "pubsub.googleapis.com",
"methodname": "test-method-name",
"resourcename": "test-resource-name",
"resourcename": "projects/test-project/topics/test-topic",
}
if diff := cmp.Diff(wantExtensions, e.Extensions()); diff != "" {
t.Errorf("unexpected (-want, +got) = %v", diff)
Expand Down
3 changes: 2 additions & 1 deletion pkg/pubsub/adapter/converters/pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,13 +41,14 @@ func convertCloudPubSub(ctx context.Context, msg *pubsub.Message) (*cev2.Event,
}

event.SetSource(v1beta1.CloudPubSubSourceEventSource(project, topic))
event.SetType(v1beta1.CloudPubSubSourcePublish)
event.SetType(v1beta1.CloudPubSubSourceMessagePublishedEventType)

subscription, err := GetSubscriptionKey(ctx)
if err != nil {
return nil, err
}

// TODO: use struct generated from proto: https://github.com/googleapis/google-cloudevents/blob/master/proto/google/events/cloud/pubsub/v1/events.proto#L33
pushMessage := &PushMessage{
Subscription: subscription,
Message: &PubSubMessage{
Expand Down
5 changes: 3 additions & 2 deletions pkg/pubsub/adapter/converters/pubsub_pull.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,10 @@ limitations under the License.
package converters

import (
"cloud.google.com/go/pubsub"
"context"
"fmt"

"cloud.google.com/go/pubsub"
cev2 "github.com/cloudevents/sdk-go/v2"
. "github.com/cloudevents/sdk-go/v2/event"
"github.com/google/knative-gcp/pkg/apis/events/v1beta1"
Expand All @@ -41,7 +42,7 @@ func convertPubSubPull(ctx context.Context, msg *pubsub.Message) (*cev2.Event, e
}

event.SetSource(v1beta1.CloudPubSubSourceEventSource(project, topic))
event.SetType(v1beta1.CloudPubSubSourcePublish)
event.SetType(v1beta1.CloudPubSubSourceMessagePublishedEventType)

// We promote attributes to extensions. If there is at least one attribute that cannot be promoted, we fail.
if msg.Attributes != nil && len(msg.Attributes) > 0 {
Expand Down
11 changes: 6 additions & 5 deletions pkg/pubsub/adapter/converters/pubsub_pull_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,14 @@ package converters

import (
"context"
"github.com/cloudevents/sdk-go"
"github.com/google/knative-gcp/pkg/apis/events/v1alpha1"
"testing"

cloudevents "github.com/cloudevents/sdk-go"

"cloud.google.com/go/pubsub"
cev2 "github.com/cloudevents/sdk-go/v2"
"github.com/google/go-cmp/cmp"
"github.com/google/knative-gcp/pkg/apis/events/v1beta1"
. "github.com/google/knative-gcp/pkg/pubsub/adapter/context"
)

Expand Down Expand Up @@ -54,7 +55,7 @@ func TestConvertPubSubPull(t *testing.T) {
}, {
name: "upper case attributes",
message: &pubsub.Message{
ID: "id",
ID: "id",
Data: []byte("test data"),
Attributes: map[string]string{
"AttriBUte1": "value1",
Expand Down Expand Up @@ -102,8 +103,8 @@ func TestConvertPubSubPull(t *testing.T) {
func pubSubPull(extensions map[string]string) *cev2.Event {
e := cev2.NewEvent(cloudevents.VersionV1)
e.SetID("id")
e.SetSource(v1alpha1.CloudPubSubSourceEventSource("testproject", "testtopic"))
e.SetType(v1alpha1.CloudPubSubSourcePublish)
e.SetSource(v1beta1.CloudPubSubSourceEventSource("testproject", "testtopic"))
e.SetType(v1beta1.CloudPubSubSourceMessagePublishedEventType)
e.SetData("application/octet-stream", []byte("test data"))
for k, v := range extensions {
e.SetExtension(k, v)
Expand Down
Loading