diff --git a/config/core/resources/cloudauditlogssource.yaml b/config/core/resources/cloudauditlogssource.yaml index a31a5447fa..a1c3c0b27f 100644 --- a/config/core/resources/cloudauditlogssource.yaml +++ b/config/core/resources/cloudauditlogssource.yaml @@ -22,7 +22,7 @@ metadata: annotations: registry.knative.dev/eventTypes: | [ - {"type": "com.google.cloud.auditlog.event", "schema": "type.googleapis.com/google.logging.v2.LogEntry", "description": "Common audit log event type for all Google Cloud Platform API operations." } + {"type": "google.cloud.audit.log.v1.written", "schema": "https://raw.githubusercontent.com/googleapis/google-cloudevents/master/proto/google/events/cloud/audit/v1/data.proto", "description": "Common audit log event type for all Google Cloud Platform API operations." } ] name: cloudauditlogssources.events.cloud.google.com spec: diff --git a/config/core/resources/cloudpubsubsource.yaml b/config/core/resources/cloudpubsubsource.yaml index 94d22ec8fb..ecee6ab2b4 100644 --- a/config/core/resources/cloudpubsubsource.yaml +++ b/config/core/resources/cloudpubsubsource.yaml @@ -22,7 +22,7 @@ metadata: annotations: registry.knative.dev/eventTypes: | [ - { "type": "com.google.cloud.pubsub.topic.publish", "description": "This event is sent when a message is published to a Cloud Pub/Sub topic."} + { "type": "google.cloud.pubsub.topic.v1.messagePublished", "schema": "https://raw.githubusercontent.com/googleapis/google-cloudevents/master/proto/google/events/cloud/pubsub/v1/data.proto", "description": "This event is sent when a message is published to a Cloud Pub/Sub topic."} ] name: cloudpubsubsources.events.cloud.google.com spec: diff --git a/config/core/resources/cloudschedulersource.yaml b/config/core/resources/cloudschedulersource.yaml index 994b337013..829f3b3a18 100644 --- a/config/core/resources/cloudschedulersource.yaml +++ b/config/core/resources/cloudschedulersource.yaml @@ -22,7 +22,7 @@ metadata: annotations: registry.knative.dev/eventTypes: | [ - { "type": "com.google.cloud.scheduler.job.execute", "description": "This event is sent when a job is executed in Cloud Scheduler."} + { "type": "google.cloud.scheduler.job.v1.executed", "schema":"https://raw.githubusercontent.com/googleapis/google-cloudevents/master/proto/google/events/cloud/scheduler/v1/data.proto", "description": "This event is sent when a job is executed in Cloud Scheduler."} ] name: cloudschedulersources.events.cloud.google.com spec: diff --git a/config/core/resources/cloudstoragesource.yaml b/config/core/resources/cloudstoragesource.yaml index a9031d274e..6b40c4c9b3 100644 --- a/config/core/resources/cloudstoragesource.yaml +++ b/config/core/resources/cloudstoragesource.yaml @@ -22,10 +22,10 @@ metadata: annotations: registry.knative.dev/eventTypes: | [ - { "type": "com.google.cloud.storage.object.finalize", "schema": "https://raw.githubusercontent.com/google/knative-gcp/master/schemas/storage/schema.json", "description": "Sent when a new object (or a new generation of an existing object) is successfully created in the bucket. This includes copying or rewriting an existing object. A failed upload does not trigger this event." }, - { "type": "com.google.cloud.storage.object.delete", "schema": "https://raw.githubusercontent.com/google/knative-gcp/master/schemas/storage/schema.json", "description": "Sent when an object has been permanently deleted. This includes objects that are overwritten or are deleted as part of the bucket's lifecycle configuration. For buckets with object versioning enabled, this is not sent when an object is archived."}, - { "type": "com.google.cloud.storage.object.archive", "schema": "https://raw.githubusercontent.com/google/knative-gcp/master/schemas/storage/schema.json", "description": "Only sent when a bucket has enabled object versioning. This event indicates that the live version of an object has become an archived version, either because it was archived or because it was overwritten by the upload of an object of the same name."}, - { "type": "com.google.cloud.storage.object.metadataUpdate", "schema": "https://raw.githubusercontent.com/google/knative-gcp/master/schemas/storage/schema.json", "description": "Sent when the metadata of an existing object changes." } + { "type": "google.cloud.storage.object.v1.finalized", "schema": "https://raw.githubusercontent.com/googleapis/google-cloudevents/master/proto/google/events/cloud/storage/v1/data.proto", "description": "Sent when a new object (or a new generation of an existing object) is successfully created in the bucket. This includes copying or rewriting an existing object. A failed upload does not trigger this event." }, + { "type": "google.cloud.storage.object.v1.deleted", "schema": "https://raw.githubusercontent.com/googleapis/google-cloudevents/master/proto/google/events/cloud/storage/v1/data.proto", "description": "Sent when an object has been permanently deleted. This includes objects that are overwritten or are deleted as part of the bucket's lifecycle configuration. For buckets with object versioning enabled, this is not sent when an object is archived."}, + { "type": "google.cloud.storage.object.v1.archived", "schema": "https://raw.githubusercontent.com/googleapis/google-cloudevents/master/proto/google/events/cloud/storage/v1/data.proto", "description": "Only sent when a bucket has enabled object versioning. This event indicates that the live version of an object has become an archived version, either because it was archived or because it was overwritten by the upload of an object of the same name."}, + { "type": "google.cloud.storage.object.v1.metadataUpdated", "schema": "https://raw.githubusercontent.com/googleapis/google-cloudevents/master/proto/google/events/cloud/storage/v1/data.proto", "description": "Sent when the metadata of an existing object changes." } ] name: cloudstoragesources.events.cloud.google.com spec: @@ -154,10 +154,10 @@ spec: items: type: string enum: - - com.google.cloud.storage.object.finalize - - com.google.cloud.storage.object.delete - - com.google.cloud.storage.object.archive - - com.google.cloud.storage.object.metadataUpdate + - google.cloud.storage.object.v1.finalized + - google.cloud.storage.object.v1.deleted + - google.cloud.storage.object.v1.archived + - google.cloud.storage.object.v1.metadataUpdated status: type: object properties: diff --git a/pkg/apis/events/v1alpha1/cloudauditlogssource_types.go b/pkg/apis/events/v1alpha1/cloudauditlogssource_types.go index 97e3f5c2df..1a0fa9276e 100644 --- a/pkg/apis/events/v1alpha1/cloudauditlogssource_types.go +++ b/pkg/apis/events/v1alpha1/cloudauditlogssource_types.go @@ -17,9 +17,6 @@ limitations under the License. package v1alpha1 import ( - "crypto/md5" - "fmt" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" @@ -65,21 +62,6 @@ var auditLogsSourceCondSet = apis.NewLivingConditionSet( SinkReady, ) -const ( - CloudAuditLogsSourceEvent = "com.google.cloud.auditlog.event" -) - -// CloudAuditLogsSourceEventSource returns the Cloud Audit Logs CloudEvent source value. -func CloudAuditLogsSourceEventSource(serviceName, parentResource string) string { - return fmt.Sprintf("//%s/%s", serviceName, parentResource) -} - -// CloudAuditLogsSourceEventID returns the Cloud Audit Logs CloudEvent id value. -func CloudAuditLogsSourceEventID(id, logName, timestamp string) string { - // Hash the concatenation of the three fields. - return fmt.Sprintf("%x", md5.Sum([]byte(id+logName+timestamp))) -} - type CloudAuditLogsSourceSpec struct { // This brings in the PubSub based Source Specs. Includes: duckv1alpha1.PubSubSpec `json:",inline"` diff --git a/pkg/apis/events/v1alpha1/cloudauditlogssource_types_test.go b/pkg/apis/events/v1alpha1/cloudauditlogssource_types_test.go index 4994e8cbc4..cb77ac4083 100644 --- a/pkg/apis/events/v1alpha1/cloudauditlogssource_types_test.go +++ b/pkg/apis/events/v1alpha1/cloudauditlogssource_types_test.go @@ -68,26 +68,6 @@ func TestAuditLogsConditionSet(t *testing.T) { } } -func TestCloudAuditLogsSourceEventSource(t *testing.T) { - want := "//pubsub.googleapis.com/projects/PROJECT" - - got := CloudAuditLogsSourceEventSource("pubsub.googleapis.com", "projects/PROJECT") - - if diff := cmp.Diff(want, got); diff != "" { - t.Errorf("failed to get expected (-want, +got) = %v", diff) - } -} - -func TestCloudAuditLogsSourceEventID(t *testing.T) { - want := "efdb9bf7d6fdfc922352530c1ba51242" - - got := CloudAuditLogsSourceEventID("pt9y76cxw5", "projects/knative-project-228222/logs/cloudaudit.googleapis.com%2Factivity", "2020-01-19T22:45:03.439395442Z") - - if diff := cmp.Diff(want, got); diff != "" { - t.Errorf("failed to get expected (-want, +got) = %v", diff) - } -} - func TestCloudAuditLogsSourceIdentitySpec(t *testing.T) { s := &CloudAuditLogsSource{ Spec: CloudAuditLogsSourceSpec{ diff --git a/pkg/apis/events/v1alpha1/cloudpubsubsource_types.go b/pkg/apis/events/v1alpha1/cloudpubsubsource_types.go index 3b50e20a81..16ca0b6f5c 100644 --- a/pkg/apis/events/v1alpha1/cloudpubsubsource_types.go +++ b/pkg/apis/events/v1alpha1/cloudpubsubsource_types.go @@ -17,7 +17,6 @@ limitations under the License. package v1alpha1 import ( - "fmt" "time" "k8s.io/apimachinery/pkg/runtime" @@ -109,16 +108,6 @@ func (ps CloudPubSubSourceSpec) GetRetentionDuration() time.Duration { return defaultRetentionDuration } -// CloudPubSubSourceEventSource returns the Cloud Pub/Sub CloudEvent source value. -func CloudPubSubSourceEventSource(googleCloudProject, topic string) string { - return fmt.Sprintf("//pubsub.googleapis.com/projects/%s/topics/%s", googleCloudProject, topic) -} - -const ( - // CloudPubSubSource CloudEvent type - CloudPubSubSourcePublish = "com.google.cloud.pubsub.topic.publish" -) - const ( // CloudPubSubSourceConditionReady has status True when the CloudPubSubSource is // ready to send events. diff --git a/pkg/apis/events/v1alpha1/cloudpubsubsource_types_test.go b/pkg/apis/events/v1alpha1/cloudpubsubsource_types_test.go index 94113e9825..63fe7d278a 100644 --- a/pkg/apis/events/v1alpha1/cloudpubsubsource_types_test.go +++ b/pkg/apis/events/v1alpha1/cloudpubsubsource_types_test.go @@ -30,16 +30,6 @@ import ( "k8s.io/apimachinery/pkg/runtime/schema" ) -func TestCloudPubSubSourceEventSource(t *testing.T) { - want := "//pubsub.googleapis.com/projects/PROJECT/topics/TOPIC" - - got := CloudPubSubSourceEventSource("PROJECT", "TOPIC") - - if diff := cmp.Diff(want, got); diff != "" { - t.Errorf("failed to get expected (-want, +got) = %v", diff) - } -} - func TestCloudPubSubSourceGetGroupVersionKind(t *testing.T) { want := schema.GroupVersionKind{ Group: "events.cloud.google.com", diff --git a/pkg/apis/events/v1alpha1/cloudschedulersource_types.go b/pkg/apis/events/v1alpha1/cloudschedulersource_types.go index c1d4d57177..f7624de3ac 100644 --- a/pkg/apis/events/v1alpha1/cloudschedulersource_types.go +++ b/pkg/apis/events/v1alpha1/cloudschedulersource_types.go @@ -17,8 +17,6 @@ limitations under the License. package v1alpha1 import ( - "fmt" - "k8s.io/apimachinery/pkg/runtime" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -55,19 +53,6 @@ var ( _ kngcpduck.PubSubable = (*CloudSchedulerSource)(nil) ) -const ( - // CloudEvent types used by CloudSchedulerSource. - CloudSchedulerSourceExecute = "com.google.cloud.scheduler.job.execute" - // CloudSchedulerSourceJobName is the Pub/Sub message attribute key with the CloudSchedulerSource's job name. - CloudSchedulerSourceJobName = "jobName" - // CloudSchedulerSourceName is the Pub/Sub message attribute key with the CloudSchedulerSource's name. - CloudSchedulerSourceName = "schedulerName" -) - -func CloudSchedulerSourceEventSource(parent, scheduler string) string { - return fmt.Sprintf("//cloudscheduler.googleapis.com/%s/schedulers/%s", parent, scheduler) -} - // CloudSchedulerSourceSpec is the spec for a CloudSchedulerSource resource type CloudSchedulerSourceSpec struct { // This brings in the PubSub based Source Specs. Includes: diff --git a/pkg/apis/events/v1alpha1/cloudschedulersource_types_test.go b/pkg/apis/events/v1alpha1/cloudschedulersource_types_test.go index d3358fbdca..c5f57d5de5 100644 --- a/pkg/apis/events/v1alpha1/cloudschedulersource_types_test.go +++ b/pkg/apis/events/v1alpha1/cloudschedulersource_types_test.go @@ -42,16 +42,6 @@ func TestCloudSchedulerSourceGetGroupVersionKind(t *testing.T) { } } -func TestCloudSchedulerSourceEventSource(t *testing.T) { - want := "//cloudscheduler.googleapis.com/PARENT/schedulers/SCHEDULER" - - got := CloudSchedulerSourceEventSource("PARENT", "SCHEDULER") - - if diff := cmp.Diff(want, got); diff != "" { - t.Errorf("failed to get expected (-want, +got) = %v", diff) - } -} - func TestCloudSchedulerSourceConditionSet(t *testing.T) { want := []apis.Condition{{ Type: JobReady, diff --git a/pkg/apis/events/v1alpha1/cloudstoragesource_defaults.go b/pkg/apis/events/v1alpha1/cloudstoragesource_defaults.go index d24432bde2..6e851eb2f3 100644 --- a/pkg/apis/events/v1alpha1/cloudstoragesource_defaults.go +++ b/pkg/apis/events/v1alpha1/cloudstoragesource_defaults.go @@ -23,9 +23,15 @@ import ( duckv1alpha1 "github.com/google/knative-gcp/pkg/apis/duck/v1alpha1" metadataClient "github.com/google/knative-gcp/pkg/gclient/metadata" + schemasv1 "github.com/google/knative-gcp/pkg/schemas/v1" ) -var allEventTypes = []string{CloudStorageSourceFinalize, CloudStorageSourceDelete, CloudStorageSourceArchive, CloudStorageSourceMetadataUpdate} +var allEventTypes = []string{ + schemasv1.CloudStorageObjectFinalizedEventType, + schemasv1.CloudStorageObjectDeletedEventType, + schemasv1.CloudStorageObjectArchivedEventType, + schemasv1.CloudStorageObjectMetadataUpdatedEventType, +} func (s *CloudStorageSource) SetDefaults(ctx context.Context) { ctx = apis.WithinParent(ctx, s.ObjectMeta) diff --git a/pkg/apis/events/v1alpha1/cloudstoragesource_defaults_test.go b/pkg/apis/events/v1alpha1/cloudstoragesource_defaults_test.go index 75d9e92fc1..6d9f25230a 100644 --- a/pkg/apis/events/v1alpha1/cloudstoragesource_defaults_test.go +++ b/pkg/apis/events/v1alpha1/cloudstoragesource_defaults_test.go @@ -25,6 +25,7 @@ import ( duckv1alpha1 "github.com/google/knative-gcp/pkg/apis/duck/v1alpha1" testingMetadataClient "github.com/google/knative-gcp/pkg/gclient/metadata/testing" + schemasv1 "github.com/google/knative-gcp/pkg/schemas/v1" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -51,7 +52,7 @@ func TestCloudStorageSourceSpec_SetDefaults(t *testing.T) { }, "defaults present": { orig: &CloudStorageSourceSpec{ - EventTypes: []string{CloudStorageSourceFinalize, CloudStorageSourceDelete}, + EventTypes: []string{schemasv1.CloudStorageObjectFinalizedEventType, schemasv1.CloudStorageObjectDeletedEventType}, PubSubSpec: duckv1alpha1.PubSubSpec{ Secret: &corev1.SecretKeySelector{ LocalObjectReference: corev1.LocalObjectReference{ @@ -62,7 +63,7 @@ func TestCloudStorageSourceSpec_SetDefaults(t *testing.T) { }, }, expected: &CloudStorageSourceSpec{ - EventTypes: []string{CloudStorageSourceFinalize, CloudStorageSourceDelete}, + EventTypes: []string{schemasv1.CloudStorageObjectFinalizedEventType, schemasv1.CloudStorageObjectDeletedEventType}, PubSubSpec: duckv1alpha1.PubSubSpec{ Secret: &corev1.SecretKeySelector{ LocalObjectReference: corev1.LocalObjectReference{ @@ -106,10 +107,10 @@ func TestCloudStorageSource_SetDefaults(t *testing.T) { }, Spec: CloudStorageSourceSpec{ EventTypes: []string{ - "com.google.cloud.storage.object.finalize", - "com.google.cloud.storage.object.delete", - "com.google.cloud.storage.object.archive", - "com.google.cloud.storage.object.metadataUpdate", + schemasv1.CloudStorageObjectFinalizedEventType, + schemasv1.CloudStorageObjectDeletedEventType, + schemasv1.CloudStorageObjectArchivedEventType, + schemasv1.CloudStorageObjectMetadataUpdatedEventType, }, PubSubSpec: duckv1alpha1.PubSubSpec{ Secret: &corev1.SecretKeySelector{ @@ -148,10 +149,10 @@ func TestCloudStorageSource_SetDefaults(t *testing.T) { }, Spec: CloudStorageSourceSpec{ EventTypes: []string{ - "com.google.cloud.storage.object.finalize", - "com.google.cloud.storage.object.delete", - "com.google.cloud.storage.object.archive", - "com.google.cloud.storage.object.metadataUpdate", + schemasv1.CloudStorageObjectFinalizedEventType, + schemasv1.CloudStorageObjectDeletedEventType, + schemasv1.CloudStorageObjectArchivedEventType, + schemasv1.CloudStorageObjectMetadataUpdatedEventType, }, PubSubSpec: duckv1alpha1.PubSubSpec{ Secret: &corev1.SecretKeySelector{ diff --git a/pkg/apis/events/v1alpha1/cloudstoragesource_types.go b/pkg/apis/events/v1alpha1/cloudstoragesource_types.go index 979f89c5d2..565cf15582 100644 --- a/pkg/apis/events/v1alpha1/cloudstoragesource_types.go +++ b/pkg/apis/events/v1alpha1/cloudstoragesource_types.go @@ -17,8 +17,6 @@ limitations under the License. package v1alpha1 import ( - "fmt" - "k8s.io/apimachinery/pkg/runtime" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -78,17 +76,6 @@ type CloudStorageSourceSpec struct { PayloadFormat string `json:"payloadFormat,omitempty"` } -const ( - // CloudEvent types used by CloudStorageSource. - CloudStorageSourceFinalize = "com.google.cloud.storage.object.finalize" - CloudStorageSourceArchive = "com.google.cloud.storage.object.archive" - CloudStorageSourceDelete = "com.google.cloud.storage.object.delete" - CloudStorageSourceMetadataUpdate = "com.google.cloud.storage.object.metadataUpdate" - - // CloudEvent source prefix. - storageSourcePrefix = "//storage.googleapis.com/buckets" -) - const ( // CloudStorageSourceConditionReady has status True when the CloudStorageSource is ready to send events. CloudStorageSourceConditionReady = apis.ConditionReady @@ -98,10 +85,6 @@ const ( NotificationReady apis.ConditionType = "NotificationReady" ) -func CloudStorageSourceEventSource(bucket string) string { - return fmt.Sprintf("%s/%s", storageSourcePrefix, bucket) -} - var storageCondSet = apis.NewLivingConditionSet( duckv1alpha1.PullSubscriptionReady, duckv1alpha1.TopicReady, diff --git a/pkg/apis/events/v1alpha1/cloudstoragesource_types_test.go b/pkg/apis/events/v1alpha1/cloudstoragesource_types_test.go index 47aeaad60e..d92bb2584c 100644 --- a/pkg/apis/events/v1alpha1/cloudstoragesource_types_test.go +++ b/pkg/apis/events/v1alpha1/cloudstoragesource_types_test.go @@ -41,15 +41,6 @@ func TestGetGroupVersionKind(t *testing.T) { } } -func TestCloudStorageSourceEventSource(t *testing.T) { - want := "//storage.googleapis.com/buckets/bucket" - got := CloudStorageSourceEventSource("bucket") - - if diff := cmp.Diff(want, got); diff != "" { - t.Errorf("failed to get expected (-want, +got) = %v", diff) - } -} - func TestCloudStorageSourceSourceConditionSet(t *testing.T) { want := []apis.Condition{{ Type: NotificationReady, diff --git a/pkg/apis/events/v1alpha1/cloudstoragesource_validation_test.go b/pkg/apis/events/v1alpha1/cloudstoragesource_validation_test.go index af129da7d6..6fd2acaa57 100644 --- a/pkg/apis/events/v1alpha1/cloudstoragesource_validation_test.go +++ b/pkg/apis/events/v1alpha1/cloudstoragesource_validation_test.go @@ -25,6 +25,7 @@ import ( duckv1alpha1 "github.com/google/knative-gcp/pkg/apis/duck/v1alpha1" metadatatesting "github.com/google/knative-gcp/pkg/gclient/metadata/testing" + schemasv1 "github.com/google/knative-gcp/pkg/schemas/v1" "knative.dev/pkg/apis" duckv1 "knative.dev/pkg/apis/duck/v1" @@ -77,7 +78,7 @@ var ( // Bucket, Sink, Secret, Event Type and Project, ObjectNamePrefix and PayloadFormat storageSourceSpec = CloudStorageSourceSpec{ Bucket: "my-test-bucket", - EventTypes: []string{CloudStorageSourceFinalize, CloudStorageSourceDelete}, + EventTypes: []string{schemasv1.CloudStorageObjectFinalizedEventType, schemasv1.CloudStorageObjectDeletedEventType}, ObjectNamePrefix: "test-prefix", PayloadFormat: cloudevents.ApplicationJSON, PubSubSpec: duckv1alpha1.PubSubSpec{ @@ -381,7 +382,7 @@ func TestCheckImmutableFields(t *testing.T) { orig: &storageSourceSpec, updated: CloudStorageSourceSpec{ Bucket: storageSourceSpec.Bucket, - EventTypes: []string{CloudStorageSourceMetadataUpdate}, + EventTypes: []string{schemasv1.CloudStorageObjectMetadataUpdatedEventType}, ObjectNamePrefix: storageSourceSpec.ObjectNamePrefix, PayloadFormat: storageSourceSpec.PayloadFormat, PubSubSpec: storageSourceSpec.PubSubSpec, diff --git a/pkg/apis/events/v1beta1/cloudauditlogssource_types.go b/pkg/apis/events/v1beta1/cloudauditlogssource_types.go index ff0787cb9e..42778d3046 100644 --- a/pkg/apis/events/v1beta1/cloudauditlogssource_types.go +++ b/pkg/apis/events/v1beta1/cloudauditlogssource_types.go @@ -17,9 +17,6 @@ limitations under the License. package v1beta1 import ( - "crypto/md5" - "fmt" - duckv1beta1 "github.com/google/knative-gcp/pkg/apis/duck/v1beta1" kngcpduck "github.com/google/knative-gcp/pkg/duck/v1beta1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -67,21 +64,6 @@ var auditLogsSourceCondSet = apis.NewLivingConditionSet( SinkReady, ) -const ( - CloudAuditLogsSourceEvent = "com.google.cloud.auditlog.event" -) - -// CloudAuditLogsSourceEventSource returns the Cloud Audit Logs CloudEvent source value. -func CloudAuditLogsSourceEventSource(serviceName, parentResource string) string { - return fmt.Sprintf("//%s/%s", serviceName, parentResource) -} - -// CloudAuditLogsSourceEventID returns the Cloud Audit Logs CloudEvent id value. -func CloudAuditLogsSourceEventID(id, logName, timestamp string) string { - // Hash the concatenation of the three fields. - return fmt.Sprintf("%x", md5.Sum([]byte(id+logName+timestamp))) -} - type CloudAuditLogsSourceSpec struct { // This brings in the PubSub based Source Specs. Includes: duckv1beta1.PubSubSpec `json:",inline"` diff --git a/pkg/apis/events/v1beta1/cloudauditlogssource_types_test.go b/pkg/apis/events/v1beta1/cloudauditlogssource_types_test.go index 6f5f128b82..aba7f67686 100644 --- a/pkg/apis/events/v1beta1/cloudauditlogssource_types_test.go +++ b/pkg/apis/events/v1beta1/cloudauditlogssource_types_test.go @@ -68,26 +68,6 @@ func TestAuditLogsConditionSet(t *testing.T) { } } -func TestCloudAuditLogsSourceEventSource(t *testing.T) { - want := "//pubsub.googleapis.com/projects/PROJECT" - - got := CloudAuditLogsSourceEventSource("pubsub.googleapis.com", "projects/PROJECT") - - if diff := cmp.Diff(want, got); diff != "" { - t.Errorf("failed to get expected (-want, +got) = %v", diff) - } -} - -func TestCloudAuditLogsSourceEventID(t *testing.T) { - want := "efdb9bf7d6fdfc922352530c1ba51242" - - got := CloudAuditLogsSourceEventID("pt9y76cxw5", "projects/knative-project-228222/logs/cloudaudit.googleapis.com%2Factivity", "2020-01-19T22:45:03.439395442Z") - - if diff := cmp.Diff(want, got); diff != "" { - t.Errorf("failed to get expected (-want, +got) = %v", diff) - } -} - func TestCloudAuditLogsSourceIdentitySpec(t *testing.T) { s := &CloudAuditLogsSource{ Spec: CloudAuditLogsSourceSpec{ diff --git a/pkg/apis/events/v1beta1/cloudpubsubsource_types.go b/pkg/apis/events/v1beta1/cloudpubsubsource_types.go index 0573f720d8..aa04b931f6 100644 --- a/pkg/apis/events/v1beta1/cloudpubsubsource_types.go +++ b/pkg/apis/events/v1beta1/cloudpubsubsource_types.go @@ -17,7 +17,6 @@ limitations under the License. package v1beta1 import ( - "fmt" "time" duckv1beta1 "github.com/google/knative-gcp/pkg/apis/duck/v1beta1" @@ -110,16 +109,6 @@ func (ps CloudPubSubSourceSpec) GetRetentionDuration() time.Duration { return defaultRetentionDuration } -// CloudPubSubSourceEventSource returns the Cloud Pub/Sub CloudEvent source value. -func CloudPubSubSourceEventSource(googleCloudProject, topic string) string { - return fmt.Sprintf("//pubsub.googleapis.com/projects/%s/topics/%s", googleCloudProject, topic) -} - -const ( - // CloudPubSubSource CloudEvent type - CloudPubSubSourcePublish = "com.google.cloud.pubsub.topic.publish" -) - const ( // CloudPubSubSourceConditionReady has status True when the CloudPubSubSource is // ready to send events. diff --git a/pkg/apis/events/v1beta1/cloudpubsubsource_types_test.go b/pkg/apis/events/v1beta1/cloudpubsubsource_types_test.go index d2b87caf7d..ba876e4f8b 100644 --- a/pkg/apis/events/v1beta1/cloudpubsubsource_types_test.go +++ b/pkg/apis/events/v1beta1/cloudpubsubsource_types_test.go @@ -30,16 +30,6 @@ import ( "k8s.io/apimachinery/pkg/runtime/schema" ) -func TestCloudPubSubSourceEventSource(t *testing.T) { - want := "//pubsub.googleapis.com/projects/PROJECT/topics/TOPIC" - - got := CloudPubSubSourceEventSource("PROJECT", "TOPIC") - - if diff := cmp.Diff(want, got); diff != "" { - t.Errorf("failed to get expected (-want, +got) = %v", diff) - } -} - func TestCloudPubSubSourceGetGroupVersionKind(t *testing.T) { want := schema.GroupVersionKind{ Group: "events.cloud.google.com", @@ -162,4 +152,4 @@ func TestCloudPubSubSource_GetStatus(t *testing.T) { if got, want := s.GetStatus(), &s.Status.Status; got != want { t.Errorf("GetStatus=%v, want=%v", got, want) } -} \ No newline at end of file +} diff --git a/pkg/apis/events/v1beta1/cloudschedulersource_types.go b/pkg/apis/events/v1beta1/cloudschedulersource_types.go index d30b449317..5260cdb447 100644 --- a/pkg/apis/events/v1beta1/cloudschedulersource_types.go +++ b/pkg/apis/events/v1beta1/cloudschedulersource_types.go @@ -17,8 +17,6 @@ limitations under the License. package v1beta1 import ( - "fmt" - duckv1beta1 "github.com/google/knative-gcp/pkg/apis/duck/v1beta1" kngcpduck "github.com/google/knative-gcp/pkg/duck/v1beta1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -57,18 +55,10 @@ var ( ) const ( - // CloudEvent types used by CloudSchedulerSource. - CloudSchedulerSourceExecute = "com.google.cloud.scheduler.job.execute" // CloudSchedulerSourceJobName is the Pub/Sub message attribute key with the CloudSchedulerSource's job name. CloudSchedulerSourceJobName = "jobName" - // CloudSchedulerSourceName is the Pub/Sub message attribute key with the CloudSchedulerSource's name. - CloudSchedulerSourceName = "schedulerName" ) -func CloudSchedulerSourceEventSource(parent, scheduler string) string { - return fmt.Sprintf("//cloudscheduler.googleapis.com/%s/schedulers/%s", parent, scheduler) -} - // CloudSchedulerSourceSpec is the spec for a CloudSchedulerSource resource type CloudSchedulerSourceSpec struct { // This brings in the PubSub based Source Specs. Includes: diff --git a/pkg/apis/events/v1beta1/cloudschedulersource_types_test.go b/pkg/apis/events/v1beta1/cloudschedulersource_types_test.go index c333a2adc7..96d1863358 100644 --- a/pkg/apis/events/v1beta1/cloudschedulersource_types_test.go +++ b/pkg/apis/events/v1beta1/cloudschedulersource_types_test.go @@ -42,16 +42,6 @@ func TestCloudSchedulerSourceGetGroupVersionKind(t *testing.T) { } } -func TestCloudSchedulerSourceEventSource(t *testing.T) { - want := "//cloudscheduler.googleapis.com/PARENT/schedulers/SCHEDULER" - - got := CloudSchedulerSourceEventSource("PARENT", "SCHEDULER") - - if diff := cmp.Diff(want, got); diff != "" { - t.Errorf("failed to get expected (-want, +got) = %v", diff) - } -} - func TestCloudSchedulerSourceConditionSet(t *testing.T) { want := []apis.Condition{{ Type: JobReady, diff --git a/pkg/apis/events/v1beta1/cloudstoragesource_defaults.go b/pkg/apis/events/v1beta1/cloudstoragesource_defaults.go index 69a150f0d2..3fb3bc62ab 100644 --- a/pkg/apis/events/v1beta1/cloudstoragesource_defaults.go +++ b/pkg/apis/events/v1beta1/cloudstoragesource_defaults.go @@ -22,9 +22,15 @@ import ( "knative.dev/pkg/apis" duckv1beta1 "github.com/google/knative-gcp/pkg/apis/duck/v1beta1" + schemasv1 "github.com/google/knative-gcp/pkg/schemas/v1" ) -var allEventTypes = []string{CloudStorageSourceFinalize, CloudStorageSourceDelete, CloudStorageSourceArchive, CloudStorageSourceMetadataUpdate} +var allEventTypes = []string{ + schemasv1.CloudStorageObjectFinalizedEventType, + schemasv1.CloudStorageObjectDeletedEventType, + schemasv1.CloudStorageObjectArchivedEventType, + schemasv1.CloudStorageObjectMetadataUpdatedEventType, +} func (s *CloudStorageSource) SetDefaults(ctx context.Context) { ctx = apis.WithinParent(ctx, s.ObjectMeta) diff --git a/pkg/apis/events/v1beta1/cloudstoragesource_defaults_test.go b/pkg/apis/events/v1beta1/cloudstoragesource_defaults_test.go index ba73575d65..5ce6f941ed 100644 --- a/pkg/apis/events/v1beta1/cloudstoragesource_defaults_test.go +++ b/pkg/apis/events/v1beta1/cloudstoragesource_defaults_test.go @@ -23,6 +23,7 @@ import ( "github.com/google/go-cmp/cmp" duckv1beta1 "github.com/google/knative-gcp/pkg/apis/duck/v1beta1" + schemasv1 "github.com/google/knative-gcp/pkg/schemas/v1" corev1 "k8s.io/api/core/v1" ) @@ -47,7 +48,7 @@ func TestCloudStorageSource_SetDefaults(t *testing.T) { }, "defaults present": { orig: &CloudStorageSourceSpec{ - EventTypes: []string{CloudStorageSourceFinalize, CloudStorageSourceDelete}, + EventTypes: []string{schemasv1.CloudStorageObjectFinalizedEventType, schemasv1.CloudStorageObjectDeletedEventType}, PubSubSpec: duckv1beta1.PubSubSpec{ Secret: &corev1.SecretKeySelector{ LocalObjectReference: corev1.LocalObjectReference{ @@ -58,7 +59,7 @@ func TestCloudStorageSource_SetDefaults(t *testing.T) { }, }, expected: &CloudStorageSourceSpec{ - EventTypes: []string{CloudStorageSourceFinalize, CloudStorageSourceDelete}, + EventTypes: []string{schemasv1.CloudStorageObjectFinalizedEventType, schemasv1.CloudStorageObjectDeletedEventType}, PubSubSpec: duckv1beta1.PubSubSpec{ Secret: &corev1.SecretKeySelector{ LocalObjectReference: corev1.LocalObjectReference{ diff --git a/pkg/apis/events/v1beta1/cloudstoragesource_types.go b/pkg/apis/events/v1beta1/cloudstoragesource_types.go index 4681a46c47..0ba51a573e 100644 --- a/pkg/apis/events/v1beta1/cloudstoragesource_types.go +++ b/pkg/apis/events/v1beta1/cloudstoragesource_types.go @@ -17,8 +17,6 @@ limitations under the License. package v1beta1 import ( - "fmt" - duckv1beta1 "github.com/google/knative-gcp/pkg/apis/duck/v1beta1" kngcpduck "github.com/google/knative-gcp/pkg/duck/v1beta1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -80,17 +78,6 @@ type CloudStorageSourceSpec struct { PayloadFormat string `json:"payloadFormat,omitempty"` } -const ( - // CloudEvent types used by CloudStorageSource. - CloudStorageSourceFinalize = "com.google.cloud.storage.object.finalize" - CloudStorageSourceArchive = "com.google.cloud.storage.object.archive" - CloudStorageSourceDelete = "com.google.cloud.storage.object.delete" - CloudStorageSourceMetadataUpdate = "com.google.cloud.storage.object.metadataUpdate" - - // CloudEvent source prefix. - storageSourcePrefix = "//storage.googleapis.com/buckets" -) - const ( // CloudStorageSourceConditionReady has status True when the CloudStorageSource is ready to send events. CloudStorageSourceConditionReady = apis.ConditionReady @@ -100,10 +87,6 @@ const ( NotificationReady apis.ConditionType = "NotificationReady" ) -func CloudStorageSourceEventSource(bucket string) string { - return fmt.Sprintf("%s/%s", storageSourcePrefix, bucket) -} - var storageCondSet = apis.NewLivingConditionSet( duckv1beta1.PullSubscriptionReady, duckv1beta1.TopicReady, diff --git a/pkg/apis/events/v1beta1/cloudstoragesource_types_test.go b/pkg/apis/events/v1beta1/cloudstoragesource_types_test.go index d4faef1bae..f859df1435 100644 --- a/pkg/apis/events/v1beta1/cloudstoragesource_types_test.go +++ b/pkg/apis/events/v1beta1/cloudstoragesource_types_test.go @@ -41,15 +41,6 @@ func TestGetGroupVersionKind(t *testing.T) { } } -func TestCloudStorageSourceEventSource(t *testing.T) { - want := "//storage.googleapis.com/buckets/bucket" - got := CloudStorageSourceEventSource("bucket") - - if diff := cmp.Diff(want, got); diff != "" { - t.Errorf("failed to get expected (-want, +got) = %v", diff) - } -} - func TestCloudStorageSourceSourceConditionSet(t *testing.T) { want := []apis.Condition{{ Type: NotificationReady, diff --git a/pkg/apis/events/v1beta1/cloudstoragesource_validation_test.go b/pkg/apis/events/v1beta1/cloudstoragesource_validation_test.go index 18e186df80..419d694ccd 100644 --- a/pkg/apis/events/v1beta1/cloudstoragesource_validation_test.go +++ b/pkg/apis/events/v1beta1/cloudstoragesource_validation_test.go @@ -22,6 +22,7 @@ import ( cloudevents "github.com/cloudevents/sdk-go" duckv1beta1 "github.com/google/knative-gcp/pkg/apis/duck/v1beta1" + schemasv1 "github.com/google/knative-gcp/pkg/schemas/v1" "knative.dev/pkg/apis" duckv1 "knative.dev/pkg/apis/duck/v1" @@ -73,7 +74,7 @@ var ( // Bucket, Sink, Secret, Event Type and Project, ObjectNamePrefix and PayloadFormat storageSourceSpec = CloudStorageSourceSpec{ Bucket: "my-test-bucket", - EventTypes: []string{CloudStorageSourceFinalize, CloudStorageSourceDelete}, + EventTypes: []string{schemasv1.CloudStorageObjectFinalizedEventType, schemasv1.CloudStorageObjectDeletedEventType}, ObjectNamePrefix: "test-prefix", PayloadFormat: cloudevents.ApplicationJSON, PubSubSpec: duckv1beta1.PubSubSpec{ @@ -366,7 +367,7 @@ func TestCheckImmutableFields(t *testing.T) { orig: &storageSourceSpec, updated: CloudStorageSourceSpec{ Bucket: storageSourceSpec.Bucket, - EventTypes: []string{CloudStorageSourceMetadataUpdate}, + EventTypes: []string{schemasv1.CloudStorageObjectMetadataUpdatedEventType}, ObjectNamePrefix: storageSourceSpec.ObjectNamePrefix, PayloadFormat: storageSourceSpec.PayloadFormat, PubSubSpec: storageSourceSpec.PubSubSpec, diff --git a/pkg/pubsub/adapter/converters/auditlogs.go b/pkg/pubsub/adapter/converters/auditlogs.go index 0bbdfb315d..4c43b250ba 100644 --- a/pkg/pubsub/adapter/converters/auditlogs.go +++ b/pkg/pubsub/adapter/converters/auditlogs.go @@ -35,17 +35,11 @@ import ( "cloud.google.com/go/pubsub" cev2 "github.com/cloudevents/sdk-go/v2" - "github.com/google/knative-gcp/pkg/apis/events/v1beta1" + schemasv1 "github.com/google/knative-gcp/pkg/schemas/v1" ) const ( - logEntrySchema = "type.googleapis.com/google.logging.v2.LogEntry" - parentResourcePattern = `^(:?projects|organizations|billingAccounts|folders)/[^/]+` - - serviceNameExtension = "servicename" - methodNameExtension = "methodname" - resourceNameExtension = "resourcename" ) var ( @@ -101,6 +95,16 @@ func resolveAnyUnknowns(typeURL string) (proto.Message, error) { return reflect.New(mt.Elem()).Interface().(proto.Message), nil } +// Log name ref: https://cloud.google.com/logging/docs/audit#viewing_audit_logs +func logActivity(logName string) string { + parts := strings.Split(logName, "%2F") + if len(parts) < 2 { + return "" + } + // Could be "activity" or "data_access" + return parts[1] +} + func convertCloudAuditLogs(ctx context.Context, msg *pubsub.Message) (*cev2.Event, error) { entry := logpb.LogEntry{} if err := jsonpbUnmarshaller.Unmarshal(bytes.NewReader(msg.Data), &entry); err != nil { @@ -111,17 +115,20 @@ func convertCloudAuditLogs(ctx context.Context, msg *pubsub.Message) (*cev2.Even if parentResource == "" { return nil, fmt.Errorf("invalid LogName: %q", entry.LogName) } + logActivity := logActivity(entry.LogName) // Make a new event and convert the message payload. event := cev2.NewEvent(cev2.VersionV1) - event.SetID(v1beta1.CloudAuditLogsSourceEventID(entry.InsertId, entry.LogName, ptypes.TimestampString(entry.Timestamp))) + event.SetID(schemasv1.CloudAuditLogsEventID(entry.InsertId, entry.LogName, ptypes.TimestampString(entry.Timestamp))) if timestamp, err := ptypes.Timestamp(entry.Timestamp); err != nil { return nil, fmt.Errorf("invalid LogEntry timestamp: %w", err) } else { event.SetTime(timestamp) } + event.SetType(schemasv1.CloudAuditLogsLogWrittenEventType) + event.SetSource(schemasv1.CloudAuditLogsEventSource(parentResource, logActivity)) + event.SetDataSchema(schemasv1.CloudAuditLogsEventDataSchema) event.SetData(cev2.ApplicationJSON, msg.Data) - event.SetDataSchema(logEntrySchema) switch payload := entry.Payload.(type) { case *logpb.LogEntry_ProtoPayload: @@ -131,12 +138,10 @@ func convertCloudAuditLogs(ctx context.Context, msg *pubsub.Message) (*cev2.Even } switch proto := unpacked.Message.(type) { case *auditpb.AuditLog: - event.SetType(v1beta1.CloudAuditLogsSourceEvent) - event.SetSource(v1beta1.CloudAuditLogsSourceEventSource(proto.ServiceName, parentResource)) - event.SetSubject(proto.ResourceName) - event.SetExtension(serviceNameExtension, proto.ServiceName) - event.SetExtension(methodNameExtension, proto.MethodName) - event.SetExtension(resourceNameExtension, proto.ResourceName) + event.SetSubject(schemasv1.CloudAuditLogsEventSubject(proto.ServiceName, proto.ResourceName)) + event.SetExtension(schemasv1.ServiceNameExtension, proto.ServiceName) + event.SetExtension(schemasv1.MethodNameExtension, proto.MethodName) + event.SetExtension(schemasv1.ResourceNameExtension, proto.ResourceName) default: return nil, fmt.Errorf("unhandled proto payload type: %T", proto) } diff --git a/pkg/pubsub/adapter/converters/auditlogs_test.go b/pkg/pubsub/adapter/converters/auditlogs_test.go index 0f79659b08..ecc3a7cf73 100644 --- a/pkg/pubsub/adapter/converters/auditlogs_test.go +++ b/pkg/pubsub/adapter/converters/auditlogs_test.go @@ -27,7 +27,7 @@ import ( "github.com/golang/protobuf/ptypes" "github.com/golang/protobuf/ptypes/timestamp" "github.com/google/go-cmp/cmp" - "github.com/google/knative-gcp/pkg/apis/events/v1beta1" + schemasv1 "github.com/google/knative-gcp/pkg/schemas/v1" auditpb "google.golang.org/genproto/googleapis/cloud/audit" logpb "google.golang.org/genproto/googleapis/logging/v2" "google.golang.org/protobuf/testing/protocmp" @@ -35,15 +35,15 @@ import ( const ( insertID = "test-insert-id" - logName = "projects/test-project/test-log-name" + logName = "projects/test-project/pubsub.googleapis.com%2Factivity" testTs = "2006-01-02T15:04:05Z" ) func TestConvertAuditLog(t *testing.T) { auditLog := auditpb.AuditLog{ - ServiceName: "test-service-name", + ServiceName: "pubsub.googleapis.com", MethodName: "test-method-name", - ResourceName: "test-resource-name", + ResourceName: "projects/test-project/topics/test-topic", } payload, err := ptypes.MarshalAny(&auditLog) if err != nil { @@ -81,21 +81,24 @@ func TestConvertAuditLog(t *testing.T) { if err != nil { t.Fatalf("conversion failed: %v", err) } - if id := v1beta1.CloudAuditLogsSourceEventID(insertID, logName, testTs); e.ID() != id { + if id := schemasv1.CloudAuditLogsEventID(insertID, logName, testTs); e.ID() != id { t.Errorf("ID '%s' != '%s'", e.ID(), id) } if !e.Time().Equal(testTime) { t.Errorf("Time '%v' != '%v'", e.Time(), testTime) } - if want := v1beta1.CloudAuditLogsSourceEventSource("test-service-name", "projects/test-project"); e.Source() != want { + if want := schemasv1.CloudAuditLogsEventSource("projects/test-project", "activity"); e.Source() != want { t.Errorf("Source %q != %q", e.Source(), want) } - if e.Type() != "com.google.cloud.auditlog.event" { - t.Errorf(`Type %q != "com.google.cloud.auditlog.event"`, e.Type()) + if e.Type() != "google.cloud.audit.log.v1.written" { + t.Errorf(`Type %q != "google.cloud.audit.log.v1.written"`, e.Type()) } - if want := "test-resource-name"; e.Subject() != want { + if want := schemasv1.CloudAuditLogsEventSubject("pubsub.googleapis.com", "projects/test-project/topics/test-topic"); e.Subject() != want { t.Errorf("Subject %q != %q", e.Subject(), want) } + if e.DataSchema() != schemasv1.CloudAuditLogsEventDataSchema { + t.Errorf("DataSchema got=%s, want=%s", e.DataSchema(), schemasv1.CloudAuditLogsEventDataSchema) + } var actualLogEntry logpb.LogEntry if err = jsonpb.Unmarshal(bytes.NewReader(e.Data()), &actualLogEntry); err != nil { @@ -107,9 +110,9 @@ func TestConvertAuditLog(t *testing.T) { } wantExtensions := map[string]interface{}{ - "servicename": "test-service-name", + "servicename": "pubsub.googleapis.com", "methodname": "test-method-name", - "resourcename": "test-resource-name", + "resourcename": "projects/test-project/topics/test-topic", } if diff := cmp.Diff(wantExtensions, e.Extensions()); diff != "" { t.Errorf("unexpected (-want, +got) = %v", diff) diff --git a/pkg/pubsub/adapter/converters/pubsub.go b/pkg/pubsub/adapter/converters/pubsub.go index 8fbf6b94f6..b697044e2c 100644 --- a/pkg/pubsub/adapter/converters/pubsub.go +++ b/pkg/pubsub/adapter/converters/pubsub.go @@ -18,12 +18,11 @@ package converters import ( "context" - "time" "cloud.google.com/go/pubsub" cev2 "github.com/cloudevents/sdk-go/v2" - "github.com/google/knative-gcp/pkg/apis/events/v1beta1" . "github.com/google/knative-gcp/pkg/pubsub/adapter/context" + schemasv1 "github.com/google/knative-gcp/pkg/schemas/v1" ) func convertCloudPubSub(ctx context.Context, msg *pubsub.Message) (*cev2.Event, error) { @@ -40,17 +39,17 @@ func convertCloudPubSub(ctx context.Context, msg *pubsub.Message) (*cev2.Event, return nil, err } - event.SetSource(v1beta1.CloudPubSubSourceEventSource(project, topic)) - event.SetType(v1beta1.CloudPubSubSourcePublish) + event.SetSource(schemasv1.CloudPubSubEventSource(project, topic)) + event.SetType(schemasv1.CloudPubSubMessagePublishedEventType) subscription, err := GetSubscriptionKey(ctx) if err != nil { return nil, err } - pushMessage := &PushMessage{ + pushMessage := &schemasv1.PushMessage{ Subscription: subscription, - Message: &PubSubMessage{ + Message: &schemasv1.PubSubMessage{ ID: msg.ID, Attributes: msg.Attributes, PublishTime: msg.PublishTime, @@ -63,31 +62,3 @@ func convertCloudPubSub(ctx context.Context, msg *pubsub.Message) (*cev2.Event, } return &event, nil } - -// PushMessage represents the format Pub/Sub uses to push events. -type PushMessage struct { - // Subscription is the subscription ID that received this Message. - Subscription string `json:"subscription"` - // Message holds the Pub/Sub message contents. - Message *PubSubMessage `json:"message,omitempty"` -} - -// PubSubMessage matches the inner message format used by Push Subscriptions. -type PubSubMessage struct { - // ID identifies this message. This ID is assigned by the server and is - // populated for Messages obtained from a subscription. - // This field is read-only. - ID string `json:"messageId,omitempty"` - - // Data is the actual data in the message. - Data interface{} `json:"data,omitempty"` - - // Attributes represents the key-value pairs the current message - // is labelled with. - Attributes map[string]string `json:"attributes,omitempty"` - - // The time at which the message was published. This is populated by the - // server for Messages obtained from a subscription. - // This field is read-only. - PublishTime time.Time `json:"publishTime,omitempty"` -} diff --git a/pkg/pubsub/adapter/converters/pubsub_pull.go b/pkg/pubsub/adapter/converters/pubsub_pull.go index 2bd552d265..ddf7d9d214 100644 --- a/pkg/pubsub/adapter/converters/pubsub_pull.go +++ b/pkg/pubsub/adapter/converters/pubsub_pull.go @@ -17,13 +17,14 @@ limitations under the License. package converters import ( - "cloud.google.com/go/pubsub" "context" "fmt" + + "cloud.google.com/go/pubsub" cev2 "github.com/cloudevents/sdk-go/v2" . "github.com/cloudevents/sdk-go/v2/event" - "github.com/google/knative-gcp/pkg/apis/events/v1beta1" . "github.com/google/knative-gcp/pkg/pubsub/adapter/context" + schemasv1 "github.com/google/knative-gcp/pkg/schemas/v1" ) func convertPubSubPull(ctx context.Context, msg *pubsub.Message) (*cev2.Event, error) { @@ -40,8 +41,8 @@ func convertPubSubPull(ctx context.Context, msg *pubsub.Message) (*cev2.Event, e return nil, err } - event.SetSource(v1beta1.CloudPubSubSourceEventSource(project, topic)) - event.SetType(v1beta1.CloudPubSubSourcePublish) + event.SetSource(schemasv1.CloudPubSubEventSource(project, topic)) + event.SetType(schemasv1.CloudPubSubMessagePublishedEventType) // We promote attributes to extensions. If there is at least one attribute that cannot be promoted, we fail. if msg.Attributes != nil && len(msg.Attributes) > 0 { diff --git a/pkg/pubsub/adapter/converters/pubsub_pull_test.go b/pkg/pubsub/adapter/converters/pubsub_pull_test.go index 0ae23656f0..7592651a0e 100644 --- a/pkg/pubsub/adapter/converters/pubsub_pull_test.go +++ b/pkg/pubsub/adapter/converters/pubsub_pull_test.go @@ -18,14 +18,15 @@ package converters import ( "context" - "github.com/cloudevents/sdk-go" - "github.com/google/knative-gcp/pkg/apis/events/v1alpha1" "testing" + cloudevents "github.com/cloudevents/sdk-go" + "cloud.google.com/go/pubsub" cev2 "github.com/cloudevents/sdk-go/v2" "github.com/google/go-cmp/cmp" . "github.com/google/knative-gcp/pkg/pubsub/adapter/context" + schemasv1 "github.com/google/knative-gcp/pkg/schemas/v1" ) func TestConvertPubSubPull(t *testing.T) { @@ -54,7 +55,7 @@ func TestConvertPubSubPull(t *testing.T) { }, { name: "upper case attributes", message: &pubsub.Message{ - ID: "id", + ID: "id", Data: []byte("test data"), Attributes: map[string]string{ "AttriBUte1": "value1", @@ -102,8 +103,8 @@ func TestConvertPubSubPull(t *testing.T) { func pubSubPull(extensions map[string]string) *cev2.Event { e := cev2.NewEvent(cloudevents.VersionV1) e.SetID("id") - e.SetSource(v1alpha1.CloudPubSubSourceEventSource("testproject", "testtopic")) - e.SetType(v1alpha1.CloudPubSubSourcePublish) + e.SetSource(schemasv1.CloudPubSubEventSource("testproject", "testtopic")) + e.SetType(schemasv1.CloudPubSubMessagePublishedEventType) e.SetData("application/octet-stream", []byte("test data")) for k, v := range extensions { e.SetExtension(k, v) diff --git a/pkg/pubsub/adapter/converters/pubsub_test.go b/pkg/pubsub/adapter/converters/pubsub_test.go index 253dcb3558..732b494b48 100644 --- a/pkg/pubsub/adapter/converters/pubsub_test.go +++ b/pkg/pubsub/adapter/converters/pubsub_test.go @@ -25,22 +25,22 @@ import ( "cloud.google.com/go/pubsub" cev2 "github.com/cloudevents/sdk-go/v2" "github.com/google/go-cmp/cmp" - "github.com/google/knative-gcp/pkg/apis/events/v1beta1" . "github.com/google/knative-gcp/pkg/pubsub/adapter/context" + schemasv1 "github.com/google/knative-gcp/pkg/schemas/v1" ) func TestConvertCloudPubSub(t *testing.T) { tests := []struct { - name string - message *pubsub.Message - wantEventFn func() *cev2.Event - wantErr bool + name string + message *pubsub.Message + wantEventFn func() *cev2.Event + wantErr bool wantInvalidContext bool }{{ name: "non alphanumeric attribute", message: &pubsub.Message{ - ID: "id", + ID: "id", Data: []byte("\"test data\""), // Data passed in quotes for it to be marshalled properly Attributes: map[string]string{ "attribute1": "value1", @@ -56,7 +56,7 @@ func TestConvertCloudPubSub(t *testing.T) { }, { name: "no attributes", message: &pubsub.Message{ - ID: "id", + ID: "id", Data: []byte("\"test data\""), // Data passed in quotes for it to be marshalled properly Attributes: map[string]string{}, }, @@ -66,12 +66,12 @@ func TestConvertCloudPubSub(t *testing.T) { }, { name: "invalid context", message: &pubsub.Message{ - ID: "id", + ID: "id", Data: []byte("\"test data\""), // Data passed in quotes for it to be marshalled properly Attributes: map[string]string{}, }, wantInvalidContext: true, - wantErr: true, + wantErr: true, }} for _, test := range tests { @@ -101,7 +101,7 @@ func TestConvertCloudPubSub(t *testing.T) { func pubSubCloudEvent(attributes map[string]string, data string) *cev2.Event { e := cev2.NewEvent(cev2.VersionV1) e.SetID("id") - e.SetSource(v1beta1.CloudPubSubSourceEventSource("testproject", "testtopic")) + e.SetSource(schemasv1.CloudPubSubEventSource("testproject", "testtopic")) at := "" if attributes != nil { ex, _ := json.Marshal(attributes) @@ -109,7 +109,7 @@ func pubSubCloudEvent(attributes map[string]string, data string) *cev2.Event { } s := fmt.Sprintf(`{"subscription":"testsubscription","message":{"messageId":"id","data":%s,%s"publishTime":"0001-01-01T00:00:00Z"}}`, data, at) e.SetData(cev2.ApplicationJSON, []byte(s)) - e.SetType(v1beta1.CloudPubSubSourcePublish) + e.SetType(schemasv1.CloudPubSubMessagePublishedEventType) e.DataBase64 = false return &e } diff --git a/pkg/pubsub/adapter/converters/scheduler.go b/pkg/pubsub/adapter/converters/scheduler.go index eb81c6c95f..61eda29d47 100644 --- a/pkg/pubsub/adapter/converters/scheduler.go +++ b/pkg/pubsub/adapter/converters/scheduler.go @@ -21,7 +21,7 @@ import ( "errors" "github.com/google/knative-gcp/pkg/apis/events/v1beta1" - "github.com/google/knative-gcp/pkg/reconciler/events/scheduler/resources" + schemasv1 "github.com/google/knative-gcp/pkg/schemas/v1" "cloud.google.com/go/pubsub" cev2 "github.com/cloudevents/sdk-go/v2" @@ -31,25 +31,16 @@ func convertCloudScheduler(ctx context.Context, msg *pubsub.Message) (*cev2.Even event := cev2.NewEvent(cev2.VersionV1) event.SetID(msg.ID) event.SetTime(msg.PublishTime) - event.SetType(v1beta1.CloudSchedulerSourceExecute) + event.SetType(schemasv1.CloudSchedulerJobExecutedEventType) + event.SetDataSchema(schemasv1.CloudSchedulerEventDataSchema) - // Set the source and subject if it comes as an attribute. - // We added this attributes so that we could identify the scheduler. - // We should remove them here. jobName, ok := msg.Attributes[v1beta1.CloudSchedulerSourceJobName] if !ok { return nil, errors.New("received event did not have jobName") } - schedulerName, ok := msg.Attributes[v1beta1.CloudSchedulerSourceName] - if !ok { - return nil, errors.New("received event did not have schedulerName") - } - - parentName := resources.ExtractParentName(jobName) - event.SetSource(v1beta1.CloudSchedulerSourceEventSource(parentName, schedulerName)) - event.SetSubject(resources.ExtractJobID(jobName)) + event.SetSource(schemasv1.CloudSchedulerEventSource(jobName)) - if err := event.SetData(cev2.ApplicationJSON, msg.Data); err != nil { + if err := event.SetData(cev2.ApplicationJSON, &schemasv1.SchedulerJobData{CustomData: msg.Data}); err != nil { return nil, err } return &event, nil diff --git a/pkg/pubsub/adapter/converters/scheduler_test.go b/pkg/pubsub/adapter/converters/scheduler_test.go index 7ec5a4fa90..73a14edeaa 100644 --- a/pkg/pubsub/adapter/converters/scheduler_test.go +++ b/pkg/pubsub/adapter/converters/scheduler_test.go @@ -23,7 +23,7 @@ import ( "cloud.google.com/go/pubsub" "github.com/google/go-cmp/cmp" - "github.com/google/knative-gcp/pkg/apis/events/v1beta1" + schemasv1 "github.com/google/knative-gcp/pkg/schemas/v1" cev2 "github.com/cloudevents/sdk-go/v2" ) @@ -50,9 +50,7 @@ func TestConvertCloudSchedulerSource(t *testing.T) { }, }, wantEventFn: func() *cev2.Event { - return schedulerCloudEvent( - "//cloudscheduler.googleapis.com/projects/knative-gcp-test/locations/us-east4/schedulers/scheduler-test", - "jobs/cre-scheduler-test") + return schedulerCloudEvent("//cloudscheduler.googleapis.com/projects/knative-gcp-test/locations/us-east4/jobs/cre-scheduler-test") }, }, { name: "missing jobName attribute", @@ -104,12 +102,12 @@ func TestConvertCloudSchedulerSource(t *testing.T) { } } -func schedulerCloudEvent(source, subject string) *cev2.Event { +func schedulerCloudEvent(source string) *cev2.Event { e := cev2.NewEvent(cev2.VersionV1) e.SetID("id") - e.SetData(cev2.ApplicationJSON, []byte("test data")) - e.SetType(v1beta1.CloudSchedulerSourceExecute) + e.SetData(cev2.ApplicationJSON, &schemasv1.SchedulerJobData{CustomData: []byte("test data")}) + e.SetType(schemasv1.CloudSchedulerJobExecutedEventType) + e.SetDataSchema(schemasv1.CloudSchedulerEventDataSchema) e.SetSource(source) - e.SetSubject(subject) return &e } diff --git a/pkg/pubsub/adapter/converters/storage.go b/pkg/pubsub/adapter/converters/storage.go index a8ea5c8a26..dfbae9e066 100644 --- a/pkg/pubsub/adapter/converters/storage.go +++ b/pkg/pubsub/adapter/converters/storage.go @@ -17,47 +17,43 @@ limitations under the License. package converters import ( - "cloud.google.com/go/pubsub" "context" "errors" "fmt" + + "cloud.google.com/go/pubsub" cev2 "github.com/cloudevents/sdk-go/v2" - "github.com/google/knative-gcp/pkg/apis/events/v1beta1" + schemasv1 "github.com/google/knative-gcp/pkg/schemas/v1" ) var ( // Mapping of GCS eventTypes to CloudEvent types. storageEventTypes = map[string]string{ - "OBJECT_FINALIZE": v1beta1.CloudStorageSourceFinalize, - "OBJECT_ARCHIVE": v1beta1.CloudStorageSourceArchive, - "OBJECT_DELETE": v1beta1.CloudStorageSourceDelete, - "OBJECT_METADATA_UPDATE": v1beta1.CloudStorageSourceMetadataUpdate, + "OBJECT_FINALIZE": schemasv1.CloudStorageObjectFinalizedEventType, + "OBJECT_ARCHIVE": schemasv1.CloudStorageObjectArchivedEventType, + "OBJECT_DELETE": schemasv1.CloudStorageObjectDeletedEventType, + "OBJECT_METADATA_UPDATE": schemasv1.CloudStorageObjectMetadataUpdatedEventType, } ) -const ( - // Schema extracted from https://raw.githubusercontent.com/googleapis/google-api-go-client/master/storage/v1/storage-api.json. - // TODO find the public google endpoint we should use to point to the schema and avoid hosting it ourselves. - // The link above is tied to the go-client, and it seems not to be a valid json schema. - storageSchemaUrl = "https://raw.githubusercontent.com/google/knative-gcp/master/schemas/storage/schema.json" -) - func convertCloudStorage(ctx context.Context, msg *pubsub.Message) (*cev2.Event, error) { event := cev2.NewEvent(cev2.VersionV1) event.SetID(msg.ID) event.SetTime(msg.PublishTime) - event.SetDataSchema(storageSchemaUrl) + event.SetDataSchema(schemasv1.CloudStorageEventDataSchema) + // TODO: figure out if we want to continue to add these as extensions. if val, ok := msg.Attributes["bucketId"]; ok { - event.SetSource(v1beta1.CloudStorageSourceEventSource(val)) + event.SetSource(schemasv1.CloudStorageEventSource(val)) } else { return nil, errors.New("received event did not have bucketId") } if val, ok := msg.Attributes["objectId"]; ok { - event.SetSubject(val) + event.SetSubject(schemasv1.CloudStorageEventSubject(val)) } else { return nil, errors.New("received event did not have objectId") } + if val, ok := msg.Attributes["eventType"]; ok { if eventType, ok := storageEventTypes[val]; ok { event.SetType(eventType) diff --git a/pkg/pubsub/adapter/converters/storage_test.go b/pkg/pubsub/adapter/converters/storage_test.go index 5ead70ac0e..ff3ba4320a 100644 --- a/pkg/pubsub/adapter/converters/storage_test.go +++ b/pkg/pubsub/adapter/converters/storage_test.go @@ -22,7 +22,7 @@ import ( "time" "cloud.google.com/go/pubsub" - "github.com/google/knative-gcp/pkg/apis/events/v1beta1" + schemasv1 "github.com/google/knative-gcp/pkg/schemas/v1" ) const ( @@ -118,17 +118,17 @@ func TestConvertCloudStorageSource(t *testing.T) { if !gotEvent.Time().Equal(storagePublishTime) { t.Errorf("Time '%v' != '%v'", gotEvent.Time(), storagePublishTime) } - if want := v1beta1.CloudStorageSourceEventSource("my-bucket"); gotEvent.Source() != want { + if want := schemasv1.CloudStorageEventSource("my-bucket"); gotEvent.Source() != want { t.Errorf("Source %q != %q", gotEvent.Source(), want) } - if gotEvent.Type() != v1beta1.CloudStorageSourceFinalize { - t.Errorf(`Type %q != %q`, gotEvent.Type(), v1beta1.CloudStorageSourceFinalize) + if gotEvent.Type() != schemasv1.CloudStorageObjectFinalizedEventType { + t.Errorf(`Type %q != %q`, gotEvent.Type(), schemasv1.CloudStorageObjectFinalizedEventType) } - if gotEvent.Subject() != objectId { + if want := schemasv1.CloudStorageEventSubject(objectId); gotEvent.Subject() != want { t.Errorf("Subject %q != %q", gotEvent.Subject(), objectId) } - if gotEvent.DataSchema() != storageSchemaUrl { - t.Errorf("DataSchema %q != %q", gotEvent.DataSchema(), storageSchemaUrl) + if gotEvent.DataSchema() != schemasv1.CloudStorageEventDataSchema { + t.Errorf("DataSchema %q != %q", gotEvent.DataSchema(), schemasv1.CloudStorageEventDataSchema) } } }) diff --git a/pkg/reconciler/events/scheduler/scheduler.go b/pkg/reconciler/events/scheduler/scheduler.go index 0193b46c46..858929e901 100644 --- a/pkg/reconciler/events/scheduler/scheduler.go +++ b/pkg/reconciler/events/scheduler/scheduler.go @@ -124,7 +124,6 @@ func (r *Reconciler) reconcileJob(ctx context.Context, scheduler *v1beta1.CloudS // Add our jobName, and schedulerName as customAttributes. customAttributes := map[string]string{ v1beta1.CloudSchedulerSourceJobName: jobName, - v1beta1.CloudSchedulerSourceName: scheduler.GetName(), } _, err = client.CreateJob(ctx, &schedulerpb.CreateJobRequest{ Parent: parent, diff --git a/pkg/reconciler/events/storage/storage.go b/pkg/reconciler/events/storage/storage.go index 33e07cca8c..12c9899e68 100644 --- a/pkg/reconciler/events/storage/storage.go +++ b/pkg/reconciler/events/storage/storage.go @@ -37,6 +37,7 @@ import ( "github.com/google/knative-gcp/pkg/reconciler/events/storage/resources" "github.com/google/knative-gcp/pkg/reconciler/identity" "github.com/google/knative-gcp/pkg/reconciler/intevents" + schemasv1 "github.com/google/knative-gcp/pkg/schemas/v1" "github.com/google/knative-gcp/pkg/utils" ) @@ -55,10 +56,10 @@ const ( var ( // Mapping of the storage source CloudEvent types to google storage types. storageEventTypes = map[string]string{ - v1beta1.CloudStorageSourceFinalize: "OBJECT_FINALIZE", - v1beta1.CloudStorageSourceArchive: "OBJECT_ARCHIVE", - v1beta1.CloudStorageSourceDelete: "OBJECT_DELETE", - v1beta1.CloudStorageSourceMetadataUpdate: "OBJECT_METADATA_UPDATE", + schemasv1.CloudStorageObjectFinalizedEventType: "OBJECT_FINALIZE", + schemasv1.CloudStorageObjectArchivedEventType: "OBJECT_ARCHIVE", + schemasv1.CloudStorageObjectDeletedEventType: "OBJECT_DELETE", + schemasv1.CloudStorageObjectMetadataUpdatedEventType: "OBJECT_METADATA_UPDATE", } ) diff --git a/pkg/reconciler/events/storage/storage_test.go b/pkg/reconciler/events/storage/storage_test.go index fb168daa75..67feb5362a 100644 --- a/pkg/reconciler/events/storage/storage_test.go +++ b/pkg/reconciler/events/storage/storage_test.go @@ -52,6 +52,7 @@ import ( "github.com/google/knative-gcp/pkg/reconciler/identity" "github.com/google/knative-gcp/pkg/reconciler/intevents" . "github.com/google/knative-gcp/pkg/reconciler/testing" + schemasv1 "github.com/google/knative-gcp/pkg/schemas/v1" ) const ( @@ -703,7 +704,7 @@ func TestAllCases(t *testing.T) { WithCloudStorageSourceObjectMetaGeneration(generation), WithCloudStorageSourceBucket(bucket), WithCloudStorageSourceSink(sinkGVK, sinkName), - WithCloudStorageSourceEventTypes([]string{storagev1beta1.CloudStorageSourceFinalize}), + WithCloudStorageSourceEventTypes([]string{schemasv1.CloudStorageObjectFinalizedEventType}), WithCloudStorageSourceSetDefaults, ), NewTopic(storageName, testNS, @@ -754,7 +755,7 @@ func TestAllCases(t *testing.T) { WithCloudStorageSourceStatusObservedGeneration(generation), WithCloudStorageSourceBucket(bucket), WithCloudStorageSourceSink(sinkGVK, sinkName), - WithCloudStorageSourceEventTypes([]string{storagev1beta1.CloudStorageSourceFinalize}), + WithCloudStorageSourceEventTypes([]string{schemasv1.CloudStorageObjectFinalizedEventType}), WithInitCloudStorageSourceConditions, WithCloudStorageSourceObjectMetaGeneration(generation), WithCloudStorageSourceTopicReady(testTopicID), @@ -775,7 +776,7 @@ func TestAllCases(t *testing.T) { WithCloudStorageSourceObjectMetaGeneration(generation), WithCloudStorageSourceBucket(bucket), WithCloudStorageSourceSink(sinkGVK, sinkName), - WithCloudStorageSourceEventTypes([]string{storagev1beta1.CloudStorageSourceFinalize}), + WithCloudStorageSourceEventTypes([]string{schemasv1.CloudStorageObjectFinalizedEventType}), WithCloudStorageSourceSetDefaults, ), NewTopic(storageName, testNS, @@ -828,7 +829,7 @@ func TestAllCases(t *testing.T) { WithCloudStorageSourceStatusObservedGeneration(generation), WithCloudStorageSourceBucket(bucket), WithCloudStorageSourceSink(sinkGVK, sinkName), - WithCloudStorageSourceEventTypes([]string{storagev1beta1.CloudStorageSourceFinalize}), + WithCloudStorageSourceEventTypes([]string{schemasv1.CloudStorageObjectFinalizedEventType}), WithInitCloudStorageSourceConditions, WithCloudStorageSourceObjectMetaGeneration(generation), WithCloudStorageSourceTopicReady(testTopicID), @@ -848,7 +849,7 @@ func TestAllCases(t *testing.T) { WithCloudStorageSourceObjectMetaGeneration(generation), WithCloudStorageSourceBucket(bucket), WithCloudStorageSourceSink(sinkGVK, sinkName), - WithCloudStorageSourceEventTypes([]string{storagev1beta1.CloudStorageSourceFinalize}), + WithCloudStorageSourceEventTypes([]string{schemasv1.CloudStorageObjectFinalizedEventType}), WithCloudStorageSourceSetDefaults, ), NewTopic(storageName, testNS, @@ -901,7 +902,7 @@ func TestAllCases(t *testing.T) { WithCloudStorageSourceStatusObservedGeneration(generation), WithCloudStorageSourceBucket(bucket), WithCloudStorageSourceSink(sinkGVK, sinkName), - WithCloudStorageSourceEventTypes([]string{storagev1beta1.CloudStorageSourceFinalize}), + WithCloudStorageSourceEventTypes([]string{schemasv1.CloudStorageObjectFinalizedEventType}), WithInitCloudStorageSourceConditions, WithCloudStorageSourceObjectMetaGeneration(generation), WithCloudStorageSourceTopicReady(testTopicID), @@ -921,7 +922,7 @@ func TestAllCases(t *testing.T) { WithCloudStorageSourceObjectMetaGeneration(generation), WithCloudStorageSourceBucket(bucket), WithCloudStorageSourceSink(sinkGVK, sinkName), - WithCloudStorageSourceEventTypes([]string{storagev1beta1.CloudStorageSourceFinalize}), + WithCloudStorageSourceEventTypes([]string{schemasv1.CloudStorageObjectFinalizedEventType}), WithCloudStorageSourceSetDefaults, ), NewTopic(storageName, testNS, @@ -974,7 +975,7 @@ func TestAllCases(t *testing.T) { WithCloudStorageSourceStatusObservedGeneration(generation), WithCloudStorageSourceBucket(bucket), WithCloudStorageSourceSink(sinkGVK, sinkName), - WithCloudStorageSourceEventTypes([]string{storagev1beta1.CloudStorageSourceFinalize}), + WithCloudStorageSourceEventTypes([]string{schemasv1.CloudStorageObjectFinalizedEventType}), WithInitCloudStorageSourceConditions, WithCloudStorageSourceObjectMetaGeneration(generation), WithCloudStorageSourceTopicReady(testTopicID), @@ -994,7 +995,7 @@ func TestAllCases(t *testing.T) { WithCloudStorageSourceObjectMetaGeneration(generation), WithCloudStorageSourceBucket(bucket), WithCloudStorageSourceSink(sinkGVK, sinkName), - WithCloudStorageSourceEventTypes([]string{storagev1beta1.CloudStorageSourceFinalize}), + WithCloudStorageSourceEventTypes([]string{schemasv1.CloudStorageObjectFinalizedEventType}), WithCloudStorageSourceSetDefaults, ), NewTopic(storageName, testNS, @@ -1047,7 +1048,7 @@ func TestAllCases(t *testing.T) { WithCloudStorageSourceStatusObservedGeneration(generation), WithCloudStorageSourceBucket(bucket), WithCloudStorageSourceSink(sinkGVK, sinkName), - WithCloudStorageSourceEventTypes([]string{storagev1beta1.CloudStorageSourceFinalize}), + WithCloudStorageSourceEventTypes([]string{schemasv1.CloudStorageObjectFinalizedEventType}), WithInitCloudStorageSourceConditions, WithCloudStorageSourceObjectMetaGeneration(generation), WithCloudStorageSourceTopicReady(testTopicID), @@ -1068,7 +1069,7 @@ func TestAllCases(t *testing.T) { WithCloudStorageSourceObjectMetaGeneration(generation), WithCloudStorageSourceBucket(bucket), WithCloudStorageSourceSink(sinkGVK, sinkName), - WithCloudStorageSourceEventTypes([]string{storagev1beta1.CloudStorageSourceFinalize}), + WithCloudStorageSourceEventTypes([]string{schemasv1.CloudStorageObjectFinalizedEventType}), WithCloudStorageSourceSinkURI(storageSinkURL), WithCloudStorageSourceNotificationReady(notificationId), WithCloudStorageSourceTopicReady(testTopicID), @@ -1111,7 +1112,7 @@ func TestAllCases(t *testing.T) { WithCloudStorageSourceObjectMetaGeneration(generation), WithCloudStorageSourceBucket(bucket), WithCloudStorageSourceSink(sinkGVK, sinkName), - WithCloudStorageSourceEventTypes([]string{storagev1beta1.CloudStorageSourceFinalize}), + WithCloudStorageSourceEventTypes([]string{schemasv1.CloudStorageObjectFinalizedEventType}), WithCloudStorageSourceSinkURI(storageSinkURL), WithCloudStorageSourceNotificationReady(notificationId), WithCloudStorageSourceTopicReady(testTopicID), @@ -1154,7 +1155,7 @@ func TestAllCases(t *testing.T) { WithCloudStorageSourceObjectMetaGeneration(generation), WithCloudStorageSourceBucket(bucket), WithCloudStorageSourceSink(sinkGVK, sinkName), - WithCloudStorageSourceEventTypes([]string{storagev1beta1.CloudStorageSourceFinalize}), + WithCloudStorageSourceEventTypes([]string{schemasv1.CloudStorageObjectFinalizedEventType}), WithCloudStorageSourceSinkURI(storageSinkURL), WithCloudStorageSourceTopicReady(testTopicID), WithDeletionTimestamp, @@ -1195,7 +1196,7 @@ func TestAllCases(t *testing.T) { WithCloudStorageSourceObjectMetaGeneration(generation), WithCloudStorageSourceBucket(bucket), WithCloudStorageSourceSink(sinkGVK, sinkName), - WithCloudStorageSourceEventTypes([]string{storagev1beta1.CloudStorageSourceFinalize}), + WithCloudStorageSourceEventTypes([]string{schemasv1.CloudStorageObjectFinalizedEventType}), WithCloudStorageSourceObjectMetaGeneration(generation), WithCloudStorageSourceTopicFailed("TopicDeleted", fmt.Sprintf("Successfully deleted Topic: %s", storageName)), WithCloudStorageSourcePullSubscriptionFailed("PullSubscriptionDeleted", fmt.Sprintf("Successfully deleted PullSubscription: %s", storageName)), @@ -1212,7 +1213,7 @@ func TestAllCases(t *testing.T) { WithCloudStorageSourceObjectMetaGeneration(generation), WithCloudStorageSourceBucket(bucket), WithCloudStorageSourceSink(sinkGVK, sinkName), - WithCloudStorageSourceEventTypes([]string{storagev1beta1.CloudStorageSourceFinalize}), + WithCloudStorageSourceEventTypes([]string{schemasv1.CloudStorageObjectFinalizedEventType}), WithCloudStorageSourceSinkURI(storageSinkURL), WithCloudStorageSourceTopicReady(testTopicID), WithDeletionTimestamp, @@ -1257,7 +1258,7 @@ func TestAllCases(t *testing.T) { WithCloudStorageSourceObjectMetaGeneration(generation), WithCloudStorageSourceBucket(bucket), WithCloudStorageSourceSink(sinkGVK, sinkName), - WithCloudStorageSourceEventTypes([]string{storagev1beta1.CloudStorageSourceFinalize}), + WithCloudStorageSourceEventTypes([]string{schemasv1.CloudStorageObjectFinalizedEventType}), WithCloudStorageSourceObjectMetaGeneration(generation), WithCloudStorageSourceTopicFailed("TopicDeleted", fmt.Sprintf("Successfully deleted Topic: %s", storageName)), WithCloudStorageSourcePullSubscriptionFailed("PullSubscriptionDeleted", fmt.Sprintf("Successfully deleted PullSubscription: %s", storageName)), diff --git a/pkg/schemas/v1/auditlogs.go b/pkg/schemas/v1/auditlogs.go new file mode 100644 index 0000000000..669df90d97 --- /dev/null +++ b/pkg/schemas/v1/auditlogs.go @@ -0,0 +1,52 @@ +/* +Copyright 2020 Google LLC + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package v1 + +import ( + "crypto/md5" + "fmt" +) + +const ( + CloudAuditLogsLogWrittenEventType = "google.cloud.audit.log.v1.written" + CloudAuditLogsEventDataSchema = "https://raw.githubusercontent.com/googleapis/google-cloudevents/master/proto/google/events/cloud/audit/v1/data.proto" + + ServiceNameExtension = "servicename" + MethodNameExtension = "methodname" + ResourceNameExtension = "resourcename" +) + +// CloudAuditLogsEventSource returns the Cloud Audit Logs CloudEvent source value. +// Format e.g. //cloudaudit.googleapis.com/projects/project-id/logs/[activity|data_access] +func CloudAuditLogsEventSource(parentResource, activity string) string { + src := fmt.Sprintf("//cloudaudit.googleapis.com/%s", parentResource) + if activity != "" { + src = src + "/logs/" + activity + } + return src +} + +// CloudAuditLogsEventID returns the Cloud Audit Logs CloudEvent id value. +func CloudAuditLogsEventID(id, logName, timestamp string) string { + // Hash the concatenation of the three fields. + return fmt.Sprintf("%x", md5.Sum([]byte(id+logName+timestamp))) +} + +// CloudAuditLogsEventSubject returns the Cloud Audit Logs CloudEvent subject value. +func CloudAuditLogsEventSubject(serviceName, resourceName string) string { + return fmt.Sprintf("%s/%s", serviceName, resourceName) +} diff --git a/pkg/schemas/v1/auditlogs_test.go b/pkg/schemas/v1/auditlogs_test.go new file mode 100644 index 0000000000..ffbdf7c9be --- /dev/null +++ b/pkg/schemas/v1/auditlogs_test.go @@ -0,0 +1,45 @@ +/* +Copyright 2020 Google LLC + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package v1 + +import ( + "testing" +) + +func TestCloudAuditLogsEventSource(t *testing.T) { + want := "//cloudaudit.googleapis.com/projects/PROJECT/logs/activity" + got := CloudAuditLogsEventSource("projects/PROJECT", "activity") + if got != want { + t.Errorf("CloudAuditLogsEventSource got=%s, want=%s", got, want) + } +} + +func TestCloudAuditLogsEventSubject(t *testing.T) { + want := "pubsub.googleapis.com/projects/PROJECT/topics/TOPIC" + got := CloudAuditLogsEventSubject("pubsub.googleapis.com", "projects/PROJECT/topics/TOPIC") + if got != want { + t.Errorf("CloudAuditLogsEventSubject got=%s, want=%s", got, want) + } +} + +func TestCloudAuditLogsventID(t *testing.T) { + want := "efdb9bf7d6fdfc922352530c1ba51242" + got := CloudAuditLogsEventID("pt9y76cxw5", "projects/knative-project-228222/logs/cloudaudit.googleapis.com%2Factivity", "2020-01-19T22:45:03.439395442Z") + if got != want { + t.Errorf("CloudAuditLogsEventID got=%s, want=%s", got, want) + } +} diff --git a/pkg/schemas/v1/doc.go b/pkg/schemas/v1/doc.go new file mode 100644 index 0000000000..57d683ce41 --- /dev/null +++ b/pkg/schemas/v1/doc.go @@ -0,0 +1,21 @@ +/* +Copyright 2020 Google LLC + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Package v1 defines constants and util functions for event v1 schemas. +// TODO: Later most of the constants should be available in the Go package +// generated from https://github.com/googleapis/google-cloudevents. We should +// use that instead. +package v1 diff --git a/pkg/schemas/v1/pubsub.go b/pkg/schemas/v1/pubsub.go new file mode 100644 index 0000000000..0afcbc1eca --- /dev/null +++ b/pkg/schemas/v1/pubsub.go @@ -0,0 +1,60 @@ +/* +Copyright 2020 Google LLC + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package v1 + +import ( + "fmt" + "time" +) + +const ( + CloudPubSubMessagePublishedEventType = "google.cloud.pubsub.topic.v1.messagePublished" + CloudPubSubEventDataSchema = "https://raw.githubusercontent.com/googleapis/google-cloudevents/master/proto/google/events/cloud/pubsub/v1/data.proto" +) + +// CloudPubSubEventSource returns the Cloud Pub/Sub CloudEvent source value. +func CloudPubSubEventSource(googleCloudProject, topic string) string { + return fmt.Sprintf("//pubsub.googleapis.com/projects/%s/topics/%s", googleCloudProject, topic) +} + +// PushMessage represents the format Pub/Sub uses to push events. +type PushMessage struct { + // Subscription is the subscription ID that received this Message. + Subscription string `json:"subscription"` + // Message holds the Pub/Sub message contents. + Message *PubSubMessage `json:"message,omitempty"` +} + +// PubSubMessage matches the inner message format used by Push Subscriptions. +type PubSubMessage struct { + // ID identifies this message. This ID is assigned by the server and is + // populated for Messages obtained from a subscription. + // This field is read-only. + ID string `json:"messageId,omitempty"` + + // Data is the actual data in the message. + Data interface{} `json:"data,omitempty"` + + // Attributes represents the key-value pairs the current message + // is labelled with. + Attributes map[string]string `json:"attributes,omitempty"` + + // The time at which the message was published. This is populated by the + // server for Messages obtained from a subscription. + // This field is read-only. + PublishTime time.Time `json:"publishTime,omitempty"` +} diff --git a/pkg/schemas/v1/pubsub_test.go b/pkg/schemas/v1/pubsub_test.go new file mode 100644 index 0000000000..952f9c59dc --- /dev/null +++ b/pkg/schemas/v1/pubsub_test.go @@ -0,0 +1,29 @@ +/* +Copyright 2020 Google LLC + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package v1 + +import ( + "testing" +) + +func TestCloudPubSubEventSource(t *testing.T) { + want := "//pubsub.googleapis.com/projects/PROJECT/topics/TOPIC" + got := CloudPubSubEventSource("PROJECT", "TOPIC") + if got != want { + t.Errorf("CloudPubSubEventSource got=%s, want=%s", got, want) + } +} diff --git a/pkg/schemas/v1/scheduler.go b/pkg/schemas/v1/scheduler.go new file mode 100644 index 0000000000..43be9a0759 --- /dev/null +++ b/pkg/schemas/v1/scheduler.go @@ -0,0 +1,32 @@ +/* +Copyright 2020 Google LLC + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package v1 + +import "fmt" + +const ( + CloudSchedulerJobExecutedEventType = "google.cloud.scheduler.job.v1.executed" + CloudSchedulerEventDataSchema = "https://raw.githubusercontent.com/googleapis/google-cloudevents/master/proto/google/events/cloud/scheduler/v1/data.proto" +) + +func CloudSchedulerEventSource(jobName string) string { + return fmt.Sprintf("//cloudscheduler.googleapis.com/%s", jobName) +} + +type SchedulerJobData struct { + CustomData []byte `json:"custom_data,omitempty"` +} diff --git a/pkg/schemas/v1/scheduler_test.go b/pkg/schemas/v1/scheduler_test.go new file mode 100644 index 0000000000..cf48ff2a95 --- /dev/null +++ b/pkg/schemas/v1/scheduler_test.go @@ -0,0 +1,29 @@ +/* +Copyright 2020 Google LLC + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package v1 + +import ( + "testing" +) + +func TestCloudSchedulerEventSource(t *testing.T) { + want := "//cloudscheduler.googleapis.com/JOB_NAME" + got := CloudSchedulerEventSource("JOB_NAME") + if got != want { + t.Errorf("CloudSchedulerEventSource got=%s, want=%s", got, want) + } +} diff --git a/pkg/schemas/v1/storage.go b/pkg/schemas/v1/storage.go new file mode 100644 index 0000000000..c556dc818e --- /dev/null +++ b/pkg/schemas/v1/storage.go @@ -0,0 +1,35 @@ +/* +Copyright 2020 Google LLC + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package v1 + +import "fmt" + +const ( + CloudStorageObjectFinalizedEventType = "google.cloud.storage.object.v1.finalized" + CloudStorageObjectArchivedEventType = "google.cloud.storage.object.v1.archived" + CloudStorageObjectDeletedEventType = "google.cloud.storage.object.v1.deleted" + CloudStorageObjectMetadataUpdatedEventType = "google.cloud.storage.object.v1.metadataUpdated" + CloudStorageEventDataSchema = "https://raw.githubusercontent.com/googleapis/google-cloudevents/master/proto/google/events/cloud/storage/v1/data.proto" +) + +func CloudStorageEventSource(bucket string) string { + return fmt.Sprintf("//storage.googleapis.com/projects/_/buckets/%s", bucket) +} + +func CloudStorageEventSubject(object string) string { + return fmt.Sprintf("objects/%s", object) +} diff --git a/pkg/schemas/v1/storage_test.go b/pkg/schemas/v1/storage_test.go new file mode 100644 index 0000000000..7bc4308fc0 --- /dev/null +++ b/pkg/schemas/v1/storage_test.go @@ -0,0 +1,37 @@ +/* +Copyright 2020 Google LLC + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package v1 + +import ( + "testing" +) + +func TestCloudStorageEventSource(t *testing.T) { + want := "//storage.googleapis.com/projects/_/buckets/bucket" + got := CloudStorageEventSource("bucket") + if got != want { + t.Errorf("CloudStorageEventSource got=%s, want=%s", got, want) + } +} + +func TestCloudStorageEventSubject(t *testing.T) { + want := "objects/obj" + got := CloudStorageEventSubject("obj") + if got != want { + t.Errorf("CloudStorageEventSubject got=%s, want=%s", got, want) + } +} diff --git a/schemas/storage/schema.json b/schemas/storage/schema.json deleted file mode 100644 index 4320195ea4..0000000000 --- a/schemas/storage/schema.json +++ /dev/null @@ -1,259 +0,0 @@ -{ - "definitions" : { - "ObjectAccessControl": { - "description": "An access-control entry.", - "$id": "#/definitions/ObjectAccessControl", - "properties": { - "bucket": { - "description": "The name of the bucket.", - "type": "string" - }, - "domain": { - "description": "The domain associated with the entity, if any.", - "type": "string" - }, - "email": { - "description": "The email address associated with the entity, if any.", - "type": "string" - }, - "entity": { - "annotations": { - "required": [ - "storage.defaultObjectAccessControls.insert", - "storage.objectAccessControls.insert" - ] - }, - "description": "The entity holding the permission, in one of the following forms: \n- user-userId \n- user-email \n- group-groupId \n- group-email \n- domain-domain \n- project-team-projectId \n- allUsers \n- allAuthenticatedUsers Examples: \n- The user liz@example.com would be user-liz@example.com. \n- The group example@googlegroups.com would be group-example@googlegroups.com. \n- To refer to all members of the Google Apps for Business domain example.com, the entity would be domain-example.com.", - "type": "string" - }, - "entityId": { - "description": "The ID for the entity, if any.", - "type": "string" - }, - "etag": { - "description": "HTTP 1.1 Entity tag for the access-control entry.", - "type": "string" - }, - "generation": { - "description": "The content generation of the object, if applied to an object.", - "format": "int64", - "type": "string" - }, - "id": { - "description": "The ID of the access-control entry.", - "type": "string" - }, - "kind": { - "default": "storage#objectAccessControl", - "description": "The kind of item this is. For object access control entries, this is always storage#objectAccessControl.", - "type": "string" - }, - "object": { - "description": "The name of the object, if applied to an object.", - "type": "string" - }, - "projectTeam": { - "description": "The project team associated with the entity, if any.", - "properties": { - "projectNumber": { - "description": "The project number.", - "type": "string" - }, - "team": { - "description": "The team.", - "type": "string" - } - }, - "type": "object" - }, - "role": { - "annotations": { - "required": [ - "storage.defaultObjectAccessControls.insert", - "storage.objectAccessControls.insert" - ] - }, - "description": "The access permission for the entity.", - "type": "string" - }, - "selfLink": { - "description": "The link to this access-control entry.", - "type": "string" - } - }, - "type": "object" - } - }, - "$schema": "http://json-schema.org/draft-07/schema#", - "$id": "https://raw.githubusercontent.com/google/knative-gcp/master/schemas/storage/schema.json", - "type": "object", - "title": "The Object Schema.", - "properties": { - "acl": { - "annotations": { - "required": [ - "storage.objects.update" - ] - }, - "description": "Access controls on the object.", - "items": { - "$ref": "#/definitions/ObjectAccessControl" - }, - "type": "array" - }, - "bucket": { - "description": "The name of the bucket containing this object.", - "type": "string" - }, - "cacheControl": { - "description": "Cache-Control directive for the object data. If omitted, and the object is accessible to all anonymous users, the default will be public, max-age=3600.", - "type": "string" - }, - "componentCount": { - "description": "Number of underlying components that make up this object. Components are accumulated by compose operations.", - "format": "int32", - "type": "integer" - }, - "contentDisposition": { - "description": "Content-Disposition of the object data.", - "type": "string" - }, - "contentEncoding": { - "description": "Content-Encoding of the object data.", - "type": "string" - }, - "contentLanguage": { - "description": "Content-Language of the object data.", - "type": "string" - }, - "contentType": { - "description": "Content-Type of the object data. If an object is stored without a Content-Type, it is served as application/octet-stream.", - "type": "string" - }, - "crc32c": { - "description": "CRC32c checksum, as described in RFC 4960, Appendix B; encoded using base64 in big-endian byte order. For more information about using the CRC32c checksum, see Hashes and ETags: Best Practices.", - "type": "string" - }, - "customerEncryption": { - "description": "Metadata of customer-supplied encryption key, if the object is encrypted by such a key.", - "properties": { - "encryptionAlgorithm": { - "description": "The encryption algorithm.", - "type": "string" - }, - "keySha256": { - "description": "SHA256 hash value of the encryption key.", - "type": "string" - } - }, - "type": "object" - }, - "etag": { - "description": "HTTP 1.1 Entity tag for the object.", - "type": "string" - }, - "eventBasedHold": { - "description": "Whether an object is under event-based hold. Event-based hold is a way to retain objects until an event occurs, which is signified by the hold's release (i.e. this value is set to false). After being released (set to false), such objects will be subject to bucket-level retention (if any). One sample use case of this flag is for banks to hold loan documents for at least 3 years after loan is paid in full. Here, bucket-level retention is 3 years and the event is the loan being paid in full. In this example, these objects will be held intact for any number of years until the event has occurred (event-based hold on the object is released) and then 3 more years after that. That means retention duration of the objects begins from the moment event-based hold transitioned from true to false.", - "type": "boolean" - }, - "generation": { - "description": "The content generation of this object. Used for object versioning.", - "format": "int64", - "type": "string" - }, - "id": { - "description": "The ID of the object, including the bucket name, object name, and generation number.", - "type": "string" - }, - "kind": { - "default": "storage#object", - "description": "The kind of item this is. For objects, this is always storage#object.", - "type": "string" - }, - "kmsKeyName": { - "description": "Cloud KMS Key used to encrypt this object, if the object is encrypted by such a key.", - "type": "string" - }, - "md5Hash": { - "description": "MD5 hash of the data; encoded using base64. For more information about using the MD5 hash, see Hashes and ETags: Best Practices.", - "type": "string" - }, - "mediaLink": { - "description": "Media download link.", - "type": "string" - }, - "metadata": { - "additionalProperties": { - "description": "An individual metadata entry.", - "type": "string" - }, - "description": "User-provided metadata, in key/value pairs.", - "type": "object" - }, - "metageneration": { - "description": "The version of the metadata for this object at this generation. Used for preconditions and for detecting changes in metadata. A metageneration number is only meaningful in the context of a particular generation of a particular object.", - "format": "int64", - "type": "string" - }, - "name": { - "description": "The name of the object. Required if not specified by URL parameter.", - "type": "string" - }, - "owner": { - "description": "The owner of the object. This will always be the uploader of the object.", - "properties": { - "entity": { - "description": "The entity, in the form user-userId.", - "type": "string" - }, - "entityId": { - "description": "The ID for the entity.", - "type": "string" - } - }, - "type": "object" - }, - "retentionExpirationTime": { - "description": "A server-determined value that specifies the earliest time that the object's retention period expires. This value is in RFC 3339 format. Note 1: This field is not provided for objects with an active event-based hold, since retention expiration is unknown until the hold is removed. Note 2: This value can be provided even when temporary hold is set (so that the user can reason about policy without having to first unset the temporary hold).", - "format": "date-time", - "type": "string" - }, - "selfLink": { - "description": "The link to this object.", - "type": "string" - }, - "size": { - "description": "Content-Length of the data in bytes.", - "format": "uint64", - "type": "string" - }, - "storageClass": { - "description": "Storage class of the object.", - "type": "string" - }, - "temporaryHold": { - "description": "Whether an object is under temporary hold. While this flag is set to true, the object is protected against deletion and overwrites. A common use case of this flag is regulatory investigations where objects need to be retained while the investigation is ongoing. Note that unlike event-based hold, temporary hold does not impact retention expiration time of an object.", - "type": "boolean" - }, - "timeCreated": { - "description": "The creation time of the object in RFC 3339 format.", - "format": "date-time", - "type": "string" - }, - "timeDeleted": { - "description": "The deletion time of the object in RFC 3339 format. Will be returned if and only if this version of the object has been deleted.", - "format": "date-time", - "type": "string" - }, - "timeStorageClassUpdated": { - "description": "The time at which the object's storage class was last changed. When the object is initially created, it will be set to timeCreated.", - "format": "date-time", - "type": "string" - }, - "updated": { - "description": "The modification time of the object metadata in RFC 3339 format.", - "format": "date-time", - "type": "string" - } - } -} diff --git a/test/e2e/lib/auditlogs.go b/test/e2e/lib/auditlogs.go index 95201fd5fd..6f9a0de2fd 100644 --- a/test/e2e/lib/auditlogs.go +++ b/test/e2e/lib/auditlogs.go @@ -25,8 +25,8 @@ import ( "google.golang.org/grpc/status" "cloud.google.com/go/logging/logadmin" - "github.com/google/knative-gcp/pkg/apis/events/v1alpha1" kngcptesting "github.com/google/knative-gcp/pkg/reconciler/testing" + schemasv1 "github.com/google/knative-gcp/pkg/schemas/v1" "github.com/google/knative-gcp/test/e2e/lib/resources" "google.golang.org/grpc/codes" v1 "k8s.io/api/core/v1" @@ -81,10 +81,10 @@ func MakeAuditLogsJobOrDie(client *Client, methodName, project, resourceName, se Value: eventType, }, { Name: "SOURCE", - Value: v1alpha1.CloudAuditLogsSourceEventSource(serviceName, fmt.Sprintf("projects/%s", project)), + Value: schemasv1.CloudAuditLogsEventSource(fmt.Sprintf("projects/%s", project), "activity"), }, { Name: "SUBJECT", - Value: resourceName, + Value: schemasv1.CloudAuditLogsEventSubject(serviceName, resourceName), }, { Name: "TIME", Value: "6m", diff --git a/test/e2e/lib/helpers/broker_event_transformation_test_with_source.go b/test/e2e/lib/helpers/broker_event_transformation_test_with_source.go index 219d073999..e5f1dbdde8 100644 --- a/test/e2e/lib/helpers/broker_event_transformation_test_with_source.go +++ b/test/e2e/lib/helpers/broker_event_transformation_test_with_source.go @@ -36,7 +36,7 @@ import ( // The following line to load the gcp plugin (only required to authenticate against GKE clusters). _ "k8s.io/client-go/plugin/pkg/client/auth/gcp" - "github.com/google/knative-gcp/pkg/apis/events/v1beta1" + schemasv1 "github.com/google/knative-gcp/pkg/schemas/v1" "github.com/google/knative-gcp/test/e2e/lib" "github.com/google/knative-gcp/test/e2e/lib/metrics" "github.com/google/knative-gcp/test/e2e/lib/resources" @@ -245,7 +245,7 @@ func BrokerEventTransformationTestWithPubSubSourceHelper(client *lib.Client, aut psName := helpers.AppendRandomString(topicName + "-pubsub") targetName := helpers.AppendRandomString(topicName + "-target") data := fmt.Sprintf(`{"topic":%s}`, topicName) - source := v1beta1.CloudPubSubSourceEventSource(project, topicName) + source := schemasv1.CloudPubSubEventSource(project, topicName) // Create a target PubSub Job to receive the events. lib.MakePubSubTargetJobOrDie(client, source, targetName, lib.E2EPubSubRespEventType) @@ -255,7 +255,7 @@ func BrokerEventTransformationTestWithPubSubSourceHelper(client *lib.Client, aut // Create a Trigger with the Knative Service subscriber. triggerFilter := eventingtestresources.WithAttributesTriggerFilterV1Beta1( eventingv1beta1.TriggerAnyFilter, - v1beta1.CloudPubSubSourcePublish, + schemasv1.CloudPubSubMessagePublishedEventType, map[string]interface{}{}) createTriggerWithKServiceSubscriber(client, brokerName, kserviceName, triggerFilter) @@ -309,17 +309,18 @@ func BrokerEventTransformationTestWithStorageSourceHelper(client *lib.Client, au defer lib.DeleteBucket(ctx, client.T, bucketName) storageName := helpers.AppendRandomString(bucketName + "-storage") targetName := helpers.AppendRandomString(bucketName + "-target") - source := v1beta1.CloudStorageSourceEventSource(bucketName) + source := schemasv1.CloudStorageEventSource(bucketName) fileName := helpers.AppendRandomString("test-file-for-storage") + subject := schemasv1.CloudStorageEventSubject(fileName) // Create a target StorageJob to receive the events. - lib.MakeStorageJobOrDie(client, source, fileName, targetName, lib.E2EStorageRespEventType) + lib.MakeStorageJobOrDie(client, source, subject, targetName, lib.E2EStorageRespEventType) // Create the Knative Service. kserviceName := CreateKService(client, "storage_receiver") // Create a Trigger with the Knative Service subscriber. triggerFilter := eventingtestresources.WithAttributesTriggerFilterV1Beta1( eventingv1beta1.TriggerAnyFilter, - v1beta1.CloudStorageSourceFinalize, + schemasv1.CloudStorageObjectFinalizedEventType, map[string]interface{}{}) createTriggerWithKServiceSubscriber(client, brokerName, kserviceName, triggerFilter) @@ -371,7 +372,7 @@ func BrokerEventTransformationTestWithAuditLogsSourceHelper(client *lib.Client, // Create a Trigger with the Knative Service subscriber. triggerFilter := eventingtestresources.WithAttributesTriggerFilterV1Beta1( eventingv1beta1.TriggerAnyFilter, - v1beta1.CloudAuditLogsSourceEvent, + schemasv1.CloudAuditLogsLogWrittenEventType, map[string]interface{}{}) createTriggerWithKServiceSubscriber(client, brokerName, kserviceName, triggerFilter) @@ -428,7 +429,7 @@ func BrokerEventTransformationTestWithSchedulerSourceHelper(client *lib.Client, // Create a Trigger with the Knative Service subscriber. triggerFilter := eventingtestresources.WithAttributesTriggerFilterV1Beta1( eventingv1beta1.TriggerAnyFilter, - v1beta1.CloudSchedulerSourceExecute, + schemasv1.CloudSchedulerJobExecutedEventType, map[string]interface{}{}) createTriggerWithKServiceSubscriber(client, brokerName, kserviceName, triggerFilter) diff --git a/test/e2e/lib/pubsub.go b/test/e2e/lib/pubsub.go index 98ba4ef7af..65453cc5aa 100644 --- a/test/e2e/lib/pubsub.go +++ b/test/e2e/lib/pubsub.go @@ -24,8 +24,8 @@ import ( v1 "k8s.io/api/core/v1" - "github.com/google/knative-gcp/pkg/apis/events/v1alpha1" kngcptesting "github.com/google/knative-gcp/pkg/reconciler/testing" + schemasv1 "github.com/google/knative-gcp/pkg/schemas/v1" "github.com/google/knative-gcp/test/e2e/lib/metrics" "github.com/google/knative-gcp/test/e2e/lib/resources" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -82,8 +82,8 @@ func AssertMetrics(t *testing.T, client *Client, topicName, psName string) { "metric.type": EventCountMetricType, "resource.type": GlobalMetricResourceType, "metric.label.resource_group": PubsubResourceGroup, - "metric.label.event_type": v1alpha1.CloudPubSubSourcePublish, - "metric.label.event_source": v1alpha1.CloudPubSubSourceEventSource(projectID, topicName), + "metric.label.event_type": schemasv1.CloudPubSubMessagePublishedEventType, + "metric.label.event_source": schemasv1.CloudPubSubEventSource(projectID, topicName), "metric.label.namespace_name": client.Namespace, "metric.label.name": psName, // We exit the target image before sending a response, thus check for 500. diff --git a/test/e2e/lib/scheduler.go b/test/e2e/lib/scheduler.go index 4af1964e5c..bf2606978f 100644 --- a/test/e2e/lib/scheduler.go +++ b/test/e2e/lib/scheduler.go @@ -18,11 +18,12 @@ package lib import ( "context" + "encoding/json" "testing" "github.com/google/knative-gcp/pkg/gclient/scheduler" - kngcpresources "github.com/google/knative-gcp/pkg/reconciler/events/scheduler/resources" kngcptesting "github.com/google/knative-gcp/pkg/reconciler/testing" + schemasv1 "github.com/google/knative-gcp/pkg/schemas/v1" "github.com/google/knative-gcp/test/e2e/lib/resources" schedulerpb "google.golang.org/genproto/googleapis/cloud/scheduler/v1" "google.golang.org/grpc/codes" @@ -61,13 +62,9 @@ func MakeSchedulerJobOrDie(client *Client, data, targetName, eventType string) { Name: "TIME", Value: "6m", }, - { - Name: "SUBJECT_PREFIX", - Value: kngcpresources.JobPrefix, - }, { Name: "DATA", - Value: data, + Value: schedulerEventPayload(data), }, { Name: "TYPE", @@ -77,6 +74,12 @@ func MakeSchedulerJobOrDie(client *Client, data, targetName, eventType string) { client.CreateJobOrFail(job, WithServiceForJob(targetName)) } +func schedulerEventPayload(customData string) string { + jd := &schemasv1.SchedulerJobData{CustomData: []byte(customData)} + b, _ := json.Marshal(jd) + return string(b) +} + func SchedulerJobExists(t *testing.T, jobName string) bool { t.Helper() ctx := context.Background() diff --git a/test/e2e/lib/storage.go b/test/e2e/lib/storage.go index 408f03c98d..9addd58cf0 100644 --- a/test/e2e/lib/storage.go +++ b/test/e2e/lib/storage.go @@ -52,7 +52,7 @@ func MakeStorageOrDie(client *Client, config StorageConfig) { client.Core.WaitForResourceReadyOrFail(config.StorageName, CloudStorageSourceTypeMeta) } -func MakeStorageJobOrDie(client *Client, source, fileName, targetName, eventType string) { +func MakeStorageJobOrDie(client *Client, source, subject, targetName, eventType string) { client.T.Helper() job := resources.StorageTargetJob(targetName, []v1.EnvVar{ { @@ -65,7 +65,7 @@ func MakeStorageJobOrDie(client *Client, source, fileName, targetName, eventType }, { Name: "SUBJECT", - Value: fileName, + Value: subject, }, { Name: "TIME", Value: "6m", diff --git a/test/e2e/test_auditlogs.go b/test/e2e/test_auditlogs.go index a43a1efff7..a4bd145594 100644 --- a/test/e2e/test_auditlogs.go +++ b/test/e2e/test_auditlogs.go @@ -23,7 +23,7 @@ import ( "testing" "time" - "github.com/google/knative-gcp/pkg/apis/events/v1beta1" + schemasv1 "github.com/google/knative-gcp/pkg/schemas/v1" "github.com/google/knative-gcp/test/e2e/lib" "github.com/google/knative-gcp/test/e2e/lib/resources" @@ -109,7 +109,7 @@ func CloudAuditLogsSourceWithTargetTestImpl(t *testing.T, authConfig lib.AuthCon defer lib.TearDown(client) // Create a target Job to receive the events. - lib.MakeAuditLogsJobOrDie(client, lib.PubSubCreateTopicMethodName, project, resourceName, lib.PubSubServiceName, targetName, v1beta1.CloudAuditLogsSourceEvent) + lib.MakeAuditLogsJobOrDie(client, lib.PubSubCreateTopicMethodName, project, resourceName, lib.PubSubServiceName, targetName, schemasv1.CloudAuditLogsLogWrittenEventType) // Create the CloudAuditLogsSource. lib.MakeAuditLogsOrDie(client, lib.AuditLogsConfig{ diff --git a/test/e2e/test_pubsub.go b/test/e2e/test_pubsub.go index bcc07a979a..b5d9de2e03 100644 --- a/test/e2e/test_pubsub.go +++ b/test/e2e/test_pubsub.go @@ -28,7 +28,7 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "knative.dev/pkg/test/helpers" - "github.com/google/knative-gcp/pkg/apis/events/v1beta1" + schemasv1 "github.com/google/knative-gcp/pkg/schemas/v1" "github.com/google/knative-gcp/test/e2e/lib" "github.com/google/knative-gcp/test/e2e/lib/resources" @@ -84,7 +84,7 @@ func CloudPubSubSourceWithTargetTestImpl(t *testing.T, assertMetrics bool, authC psName := helpers.AppendRandomString(topicName + "-pubsub") targetName := helpers.AppendRandomString(topicName + "-target") - source := v1beta1.CloudPubSubSourceEventSource(project, topicName) + source := schemasv1.CloudPubSubEventSource(project, topicName) data := fmt.Sprintf(`{"topic":%s}`, topicName) client := lib.Setup(t, true, authConfig.WorkloadIdentity) @@ -94,7 +94,7 @@ func CloudPubSubSourceWithTargetTestImpl(t *testing.T, assertMetrics bool, authC defer lib.TearDown(client) // Create a target Job to receive the events. - lib.MakePubSubTargetJobOrDie(client, source, targetName, v1beta1.CloudPubSubSourcePublish) + lib.MakePubSubTargetJobOrDie(client, source, targetName, schemasv1.CloudPubSubMessagePublishedEventType) // Create the PubSub source. lib.MakePubSubOrDie(client, lib.PubSubConfig{ diff --git a/test/e2e/test_pullsubscription.go b/test/e2e/test_pullsubscription.go index 46b90ccab5..c468c9e905 100644 --- a/test/e2e/test_pullsubscription.go +++ b/test/e2e/test_pullsubscription.go @@ -28,9 +28,9 @@ import ( _ "k8s.io/client-go/plugin/pkg/client/auth/gcp" duckv1beta1 "github.com/google/knative-gcp/pkg/apis/duck/v1beta1" - eventsv1beta1 "github.com/google/knative-gcp/pkg/apis/events/v1beta1" inteventsv1beta1 "github.com/google/knative-gcp/pkg/apis/intevents/v1beta1" kngcptesting "github.com/google/knative-gcp/pkg/reconciler/testing" + schemasv1 "github.com/google/knative-gcp/pkg/schemas/v1" "github.com/google/knative-gcp/test/e2e/lib" ) @@ -71,14 +71,14 @@ func PullSubscriptionWithTargetTestImpl(t *testing.T, authConfig lib.AuthConfig) psName := topicName + "-sub" targetName := topicName + "-target" - source := eventsv1beta1.CloudPubSubSourceEventSource(project, topicName) + source := schemasv1.CloudPubSubEventSource(project, topicName) data := fmt.Sprintf(`{"topic":%s}`, topicName) client := lib.Setup(t, true, authConfig.WorkloadIdentity) defer lib.TearDown(client) // Create a target Job to receive the events. - lib.MakePubSubTargetJobOrDie(client, source, targetName, eventsv1beta1.CloudPubSubSourcePublish) + lib.MakePubSubTargetJobOrDie(client, source, targetName, schemasv1.CloudPubSubMessagePublishedEventType) // Create PullSubscription. pullsubscription := kngcptesting.NewPullSubscription(psName, client.Namespace, diff --git a/test/e2e/test_scheduler.go b/test/e2e/test_scheduler.go index c57b5ca99a..3b31a13a30 100644 --- a/test/e2e/test_scheduler.go +++ b/test/e2e/test_scheduler.go @@ -23,7 +23,7 @@ import ( "knative.dev/pkg/test/helpers" - "github.com/google/knative-gcp/pkg/apis/events/v1beta1" + schemasv1 "github.com/google/knative-gcp/pkg/schemas/v1" "github.com/google/knative-gcp/test/e2e/lib" "github.com/google/knative-gcp/test/e2e/lib/resources" ) @@ -99,7 +99,7 @@ func CloudSchedulerSourceWithTargetTestImpl(t *testing.T, authConfig lib.AuthCon // Create the target and scheduler schedulerName := helpers.AppendRandomString("scheduler") targetName := helpers.AppendRandomString(schedulerName + "-target") - lib.MakeSchedulerJobOrDie(client, data, targetName, v1beta1.CloudSchedulerSourceExecute) + lib.MakeSchedulerJobOrDie(client, data, targetName, schemasv1.CloudSchedulerJobExecutedEventType) lib.MakeSchedulerOrDie(client, lib.SchedulerConfig{ SinkGVK: lib.ServiceGVK, SchedulerName: schedulerName, diff --git a/test/e2e/test_storage.go b/test/e2e/test_storage.go index 17a0866fae..f9233826a8 100644 --- a/test/e2e/test_storage.go +++ b/test/e2e/test_storage.go @@ -24,12 +24,10 @@ import ( "testing" "time" - "github.com/google/knative-gcp/pkg/apis/events/v1alpha1" - pkgmetrics "knative.dev/pkg/metrics" "knative.dev/pkg/test/helpers" - "github.com/google/knative-gcp/pkg/apis/events/v1beta1" + schemasv1 "github.com/google/knative-gcp/pkg/schemas/v1" "github.com/google/knative-gcp/test/e2e/lib" "github.com/google/knative-gcp/test/e2e/lib/metrics" "github.com/google/knative-gcp/test/e2e/lib/resources" @@ -118,10 +116,11 @@ func CloudStorageSourceWithTargetTestImpl(t *testing.T, assertMetrics bool, auth defer lib.TearDown(client) fileName := helpers.AppendRandomString("test-file-for-storage") - source := v1beta1.CloudStorageSourceEventSource(bucketName) + source := schemasv1.CloudStorageEventSource(bucketName) + subject := schemasv1.CloudStorageEventSubject(fileName) // Create a storage_target Job to receive the events. - lib.MakeStorageJobOrDie(client, source, fileName, targetName, v1beta1.CloudStorageSourceFinalize) + lib.MakeStorageJobOrDie(client, source, subject, targetName, schemasv1.CloudStorageObjectFinalizedEventType) // Create the Storage source. lib.MakeStorageOrDie(client, lib.StorageConfig{ @@ -175,8 +174,8 @@ func CloudStorageSourceWithTargetTestImpl(t *testing.T, assertMetrics bool, auth "metric.type": lib.EventCountMetricType, "resource.type": lib.GlobalMetricResourceType, "metric.label.resource_group": lib.StorageResourceGroup, - "metric.label.event_type": v1alpha1.CloudStorageSourceFinalize, - "metric.label.event_source": v1alpha1.CloudStorageSourceEventSource(bucketName), + "metric.label.event_type": schemasv1.CloudStorageObjectFinalizedEventType, + "metric.label.event_source": schemasv1.CloudStorageEventSource(bucketName), "metric.label.namespace_name": client.Namespace, "metric.label.name": storageName, // We exit the target image before sending a response, thus check for 500. diff --git a/test/test_images/auditlogs_receiver/main.go b/test/test_images/auditlogs_receiver/main.go index cb54e4e7bf..97d50c134f 100644 --- a/test/test_images/auditlogs_receiver/main.go +++ b/test/test_images/auditlogs_receiver/main.go @@ -23,7 +23,7 @@ import ( "net/http" cloudevents "github.com/cloudevents/sdk-go" - "github.com/google/knative-gcp/pkg/apis/events/v1beta1" + schemasv1 "github.com/google/knative-gcp/pkg/schemas/v1" "github.com/google/knative-gcp/test/e2e/lib" ) @@ -51,7 +51,7 @@ func (r *Receiver) Receive(ctx context.Context, event cloudevents.Event, resp *c fmt.Printf("auditlogs receiver received event\n") fmt.Printf("context of event is: %v\n", event.Context.String()) - if event.Type() == v1beta1.CloudAuditLogsSourceEvent { + if event.Type() == schemasv1.CloudAuditLogsLogWrittenEventType { resp.Status = http.StatusAccepted respEvent := cloudevents.NewEvent(cloudevents.VersionV1) respEvent.SetID(lib.E2EAuditLogsRespEventID) diff --git a/test/test_images/pubsub_receiver/main.go b/test/test_images/pubsub_receiver/main.go index e9eb8a1cac..d88ba3857a 100644 --- a/test/test_images/pubsub_receiver/main.go +++ b/test/test_images/pubsub_receiver/main.go @@ -22,7 +22,7 @@ import ( "log" "net/http" - "github.com/google/knative-gcp/pkg/apis/events/v1beta1" + schemasv1 "github.com/google/knative-gcp/pkg/schemas/v1" cloudevents "github.com/cloudevents/sdk-go" "github.com/google/knative-gcp/test/e2e/lib" @@ -52,7 +52,7 @@ func (r *Receiver) Receive(ctx context.Context, event cloudevents.Event, resp *c fmt.Printf("pubsub receiver received event\n") fmt.Printf("context of event is: %v\n", event.Context.String()) - if event.Type() == v1beta1.CloudPubSubSourcePublish { + if event.Type() == schemasv1.CloudPubSubMessagePublishedEventType { resp.Status = http.StatusAccepted respEvent := cloudevents.NewEvent(cloudevents.VersionV1) respEvent.SetID(lib.E2EPubSubRespEventID) diff --git a/test/test_images/scheduler_receiver/main.go b/test/test_images/scheduler_receiver/main.go index bce6bcab88..4983c7cda6 100644 --- a/test/test_images/scheduler_receiver/main.go +++ b/test/test_images/scheduler_receiver/main.go @@ -22,7 +22,7 @@ import ( "log" "net/http" - "github.com/google/knative-gcp/pkg/apis/events/v1beta1" + schemasv1 "github.com/google/knative-gcp/pkg/schemas/v1" cloudevents "github.com/cloudevents/sdk-go" "github.com/google/knative-gcp/test/e2e/lib" @@ -52,7 +52,7 @@ func (r *Receiver) Receive(ctx context.Context, event cloudevents.Event, resp *c fmt.Printf("scheduler receiver received event\n") fmt.Printf("context of event is: %v\n", event.Context.String()) - if event.Type() == v1beta1.CloudSchedulerSourceExecute { + if event.Type() == schemasv1.CloudSchedulerJobExecutedEventType { resp.Status = http.StatusAccepted respEvent := cloudevents.NewEvent(cloudevents.VersionV1) respEvent.SetID(lib.E2ESchedulerRespEventID) diff --git a/test/test_images/scheduler_target/main.go b/test/test_images/scheduler_target/main.go index ca13956238..c6ddacf02b 100644 --- a/test/test_images/scheduler_target/main.go +++ b/test/test_images/scheduler_target/main.go @@ -3,7 +3,6 @@ package main import ( "fmt" "os" - "strings" cloudevents "github.com/cloudevents/sdk-go" "github.com/google/knative-gcp/test/e2e/lib" @@ -20,7 +19,6 @@ func mainWithExitCode() int { if err := envconfig.Process("", r); err != nil { panic(err) } - fmt.Printf("SubjectPrefix to match: %q.\n", r.SubjectPrefix) fmt.Printf("Data to match: %q.\n", r.Data) fmt.Printf("Type to match: %q.\n", r.Type) @@ -30,9 +28,8 @@ func mainWithExitCode() int { type schedulerReceiver struct { knockdown.Config - SubjectPrefix string `envconfig:"SUBJECT_PREFIX" required:"true"` - Data string `envconfig:"DATA" required:"true"` - Type string `envconfig:"TYPE" required:"true"` + Data string `envconfig:"DATA" required:"true"` + Type string `envconfig:"TYPE" required:"true"` } func (r *schedulerReceiver) Knockdown(event cloudevents.Event) bool { @@ -42,12 +39,6 @@ func (r *schedulerReceiver) Knockdown(event cloudevents.Event) bool { incorrectAttributes := make(map[string]lib.PropPair) - // Check subject prefix - subject := event.Subject() - if !strings.HasPrefix(subject, r.SubjectPrefix) { - incorrectAttributes[lib.EventSubjectPrefix] = lib.PropPair{Expected: r.SubjectPrefix, Received: subject} - } - // Check type evType := event.Type() if evType != r.Type { @@ -55,6 +46,7 @@ func (r *schedulerReceiver) Knockdown(event cloudevents.Event) bool { } // Check data + // TODO fix! data := string(event.Data.([]uint8)) if data != r.Data { incorrectAttributes[lib.EventData] = lib.PropPair{Expected: r.Data, Received: data} diff --git a/test/test_images/storage_receiver/main.go b/test/test_images/storage_receiver/main.go index 44a06bd8ca..278581065f 100644 --- a/test/test_images/storage_receiver/main.go +++ b/test/test_images/storage_receiver/main.go @@ -23,7 +23,7 @@ import ( "net/http" cloudevents "github.com/cloudevents/sdk-go" - "github.com/google/knative-gcp/pkg/apis/events/v1beta1" + schemasv1 "github.com/google/knative-gcp/pkg/schemas/v1" "github.com/google/knative-gcp/test/e2e/lib" ) @@ -51,7 +51,7 @@ func (r *Receiver) Receive(ctx context.Context, event cloudevents.Event, resp *c fmt.Printf("storage receiver received event\n") fmt.Printf("context of event is: %v\n", event.Context.String()) - if event.Type() == v1beta1.CloudStorageSourceFinalize { + if event.Type() == schemasv1.CloudStorageObjectFinalizedEventType { resp.Status = http.StatusAccepted respEvent := cloudevents.NewEvent(cloudevents.VersionV1) respEvent.SetID(lib.E2EStorageRespEventID)