diff --git a/cmd/controller/main.go b/cmd/controller/main.go index 6399ec6993..3d9308ebdd 100644 --- a/cmd/controller/main.go +++ b/cmd/controller/main.go @@ -29,10 +29,13 @@ import ( "github.com/google/knative-gcp/pkg/reconciler/events/pubsub" "github.com/google/knative-gcp/pkg/reconciler/events/scheduler" "github.com/google/knative-gcp/pkg/reconciler/events/storage" + kedapullsubscription "github.com/google/knative-gcp/pkg/reconciler/intevents/pullsubscription/keda" + staticpullsubscription "github.com/google/knative-gcp/pkg/reconciler/intevents/pullsubscription/static" + "github.com/google/knative-gcp/pkg/reconciler/intevents/topic" "github.com/google/knative-gcp/pkg/reconciler/messaging/channel" - kedapullsubscription "github.com/google/knative-gcp/pkg/reconciler/pubsub/pullsubscription/keda" - staticpullsubscription "github.com/google/knative-gcp/pkg/reconciler/pubsub/pullsubscription/static" - "github.com/google/knative-gcp/pkg/reconciler/pubsub/topic" + pubsubkedapullsubscription "github.com/google/knative-gcp/pkg/reconciler/pubsub/pullsubscription/keda" + pubsubstaticpullsubscription "github.com/google/knative-gcp/pkg/reconciler/pubsub/pullsubscription/static" + pubsubtopic "github.com/google/knative-gcp/pkg/reconciler/pubsub/topic" "github.com/google/knative-gcp/pkg/utils/appcredentials" "knative.dev/pkg/injection/sharedmain" ) @@ -48,6 +51,9 @@ func main() { staticpullsubscription.NewController, kedapullsubscription.NewController, topic.NewController, + pubsubstaticpullsubscription.NewController, + pubsubkedapullsubscription.NewController, + pubsubtopic.NewController, channel.NewController, deployment.NewController, broker.NewController, diff --git a/config/300-old_pullsubscription.yaml b/config/300-old_pullsubscription.yaml new file mode 120000 index 0000000000..105390277f --- /dev/null +++ b/config/300-old_pullsubscription.yaml @@ -0,0 +1 @@ +core/resources/old_pullsubscription.yaml \ No newline at end of file diff --git a/config/300-old_topic.yaml b/config/300-old_topic.yaml new file mode 120000 index 0000000000..dbaec6f0a4 --- /dev/null +++ b/config/300-old_topic.yaml @@ -0,0 +1 @@ +core/resources/old_topic.yaml \ No newline at end of file diff --git a/config/core/resources/old_pullsubscription.yaml b/config/core/resources/old_pullsubscription.yaml new file mode 100644 index 0000000000..2034199c97 --- /dev/null +++ b/config/core/resources/old_pullsubscription.yaml @@ -0,0 +1,194 @@ +# Copyright 2019 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +apiVersion: apiextensions.k8s.io/v1beta1 +kind: CustomResourceDefinition +metadata: + labels: + events.cloud.google.com/release: devel + events.cloud.google.com/crd-install: "true" + name: pullsubscriptions.pubsub.cloud.google.com +spec: + group: pubsub.cloud.google.com + names: + categories: + - all + - knative + - pubsub + kind: PullSubscription + plural: pullsubscriptions + scope: Namespaced + subresources: + status: {} + preserveUnknownFields: false + conversion: + strategy: Webhook + webhookClientConfig: + service: + name: webhook + namespace: cloud-run-events + additionalPrinterColumns: + - name: Ready + type: string + JSONPath: ".status.conditions[?(@.type==\"Ready\")].status" + - name: Reason + type: string + JSONPath: ".status.conditions[?(@.type==\"Ready\")].reason" + - name: Age + type: date + JSONPath: .metadata.creationTimestamp + versions: + - name: v1alpha1 + served: true + storage: true + - name: v1beta1 + served: true + storage: false + # All versions happen to have the same schema today. They will likely diverge in the future. + validation: + openAPIV3Schema: + type: object + properties: + spec: + # TODO: update the OpenAPI to be much more robust. + type: object + required: + - sink + - topic + properties: + googleServiceAccount: + type: string + description: "GCP service account used to poll the Cloud Pub/Sub Subscription. The value of the service account must be a valid Google service account (see https://cloud.google.com/iam/docs/service-accounts)." + secret: + type: object + description: "Credential used to poll the Cloud Pub/Sub Subscription. It is not used to create or delete the Subscription, only to poll it. The value of the secret entry must be a service account key in the JSON format (see https://cloud.google.com/iam/docs/creating-managing-service-account-keys). Defaults to secret.name of 'google-cloud-key' and secret.key of 'key.json'." + properties: + name: + type: string + key: + type: string + optional: + type: boolean + project: + type: string + description: "ID of the Google Cloud Project that the Pub/Sub Topic exists in. E.g. 'my-project-1234' rather than its display name, 'My Project' or its number '1234567890'. If omitted uses the Project ID from the GKE cluster metadata service." + sink: + type: object + description: "Reference to an object that will resolve to a domain name to use as the sink." + properties: + uri: + type: string + minLength: 1 + ref: + type: object + required: + - apiVersion + - kind + - name + properties: + apiVersion: + type: string + minLength: 1 + kind: + type: string + minLength: 1 + name: + type: string + minLength: 1 + anyOf: + - properties: + uri: + minLength: 1 + - properties: + ref: {} + transformer: + type: object + description: "Reference to an object that will resolve to a domain name to use as the transformer." + x-kubernetes-preserve-unknown-fields: true + ceOverrides: + type: object + description: "Defines overrides to control modifications of the event sent to the sink." + properties: + extensions: + type: object + description: "Extensions specify what attribute are added or overridden on the outbound event. Each `Extensions` key-value pair are set on the event as an attribute extension independently." + x-kubernetes-preserve-unknown-fields: true + mode: + type: string + enum: [CloudEventsBinary, CloudEventsStructured, PushCompatible] + description: "Mode defines the encoding and structure of the payload of when this PullSubscription invokes the sink. Default is CloudEventsBinary." + topic: + type: string + description: "ID of the Cloud Pub/Sub Topic to Subscribe to. It must be in the form of the unique identifier within the project, not the entire name. E.g. it must be 'laconia', not 'projects/my-gcp-project/topics/laconia'." + ackDeadline: + type: string + description: "The default maximum time after a subscriber receives a message before the subscriber should acknowledge the message. Defaults to `30s`. Valid time units are `s`, `m`, `h`. The minimum deadline you can specify is 0 seconds. The maximum deadline you can specify is 600 seconds (10 minutes)." + retainAckedMessages: + type: boolean + description: "Whether to retain acknowledged messages. If true, acknowledged messages will not be expunged until they fall out of the RetentionDuration window." + retentionDuration: + type: string + description: "How long to retain messages in backlog, from the time of publish. If retainAckedMessages is true, this duration affects the retention of acknowledged messages, otherwise only unacknowledged messages are retained. Defaults to 7 days (`168h`). Cannot be longer than 7 days or shorter than 10 minutes. Valid time units are `s`, `m`, `h`." + adapterType: + type: string + description: "AdapterType determines the type of receive adapter that a PullSubscription uses." + status: + type: object + properties: + observedGeneration: + type: integer + format: int64 + conditions: + items: + properties: + lastTransitionTime: + # we use a string in the stored object but a wrapper object + # at runtime. + type: string + message: + type: string + reason: + type: string + severity: + type: string + status: + type: string + type: + type: string + required: + - type + - status + type: object + type: array + serviceAccountName: + type: string + sinkUri: + type: string + ceAttributes: + type: array + items: + type: object + properties: + type: + type: string + source: + type: string + projectId: + type: string + topicId: + type: string + subscriptionId: + type: string + transformerUri: + type: string diff --git a/config/core/resources/old_topic.yaml b/config/core/resources/old_topic.yaml new file mode 100644 index 0000000000..29371408eb --- /dev/null +++ b/config/core/resources/old_topic.yaml @@ -0,0 +1,134 @@ +# Copyright 2019 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +apiVersion: apiextensions.k8s.io/v1beta1 +kind: CustomResourceDefinition +metadata: + name: topics.pubsub.cloud.google.com + labels: + events.cloud.google.com/release: devel + events.cloud.google.com/crd-install: "true" + duck.knative.dev/addressable: "true" +spec: + group: pubsub.cloud.google.com + names: + kind: Topic + plural: topics + singular: topic + categories: + - all + - knative + - pubsub + scope: Namespaced + subresources: + status: {} + preserveUnknownFields: false + conversion: + strategy: Webhook + webhookClientConfig: + service: + name: webhook + namespace: cloud-run-events + additionalPrinterColumns: + - name: Ready + type: string + JSONPath: ".status.conditions[?(@.type==\"Ready\")].status" + - name: Reason + type: string + JSONPath: ".status.conditions[?(@.type==\"Ready\")].reason" + - name: Address + type: string + JSONPath: .status.address.url + - name: Age + type: date + JSONPath: .metadata.creationTimestamp + versions: + - name: v1alpha1 + served: true + storage: true + - name: v1beta1 + served: true + storage: false + # All versions happen to have the same schema today. They will likely diverge in the future. + validation: + openAPIV3Schema: + type: object + properties: + spec: + type: object + required: + - topic + properties: + googleServiceAccount: + type: string + description: "GCP service account used to poll the Cloud Pub/Sub Subscription. The value of the service account must be a valid Google service account (see https://cloud.google.com/iam/docs/service-accounts)." + secret: + type: object + description: "Credential used to poll the Cloud Pub/Sub Subscription. It is not used to create or delete the Subscription, only to poll it. The value of the secret entry must be a service account key in the JSON format (see https://cloud.google.com/iam/docs/creating-managing-service-account-keys). Defaults to secret.name of 'google-cloud-key' and secret.key of 'key.json'." + properties: + name: + type: string + key: + type: string + optional: + type: boolean + project: + type: string + description: "ID of the Google Cloud Project to own the Pub/Sub credentials. E.g. 'my-project-1234' rather than its display name, 'My Project' or its number '1234567890'. If omitted uses the Project ID from the GKE cluster metadata service." + topic: + type: string + description: "ID of the Cloud Pub/Sub Topic to create. It must be in the form of the unique identifier within the project, not the entire name. E.g. it must be 'laconia', not 'projects/my-gcp-project/topics/laconia'." + propagationPolicy: + type: string + enum: [CreateDelete, CreateNoDelete, NoCreateNoDelete] + description: "Propagation policy defines how Topic controls the Cloud Pub/Sub topic for lifecycle changes. Default is CreateNoDelete." + status: + type: object + properties: + observedGeneration: + type: integer + format: int64 + conditions: + type: array + items: + type: object + properties: + lastTransitionTime: + # we use a string in the stored object but a wrapper object + # at runtime. + type: string + message: + type: string + reason: + type: string + severity: + type: string + status: + type: string + type: + type: string + required: + - type + - status + serviceAccountName: + type: string + projectId: + type: string + topicId: + type: string + address: + type: object + properties: + url: + type: string + diff --git a/config/core/resources/pullsubscription.yaml b/config/core/resources/pullsubscription.yaml index 2034199c97..fb239e26f9 100644 --- a/config/core/resources/pullsubscription.yaml +++ b/config/core/resources/pullsubscription.yaml @@ -18,9 +18,9 @@ metadata: labels: events.cloud.google.com/release: devel events.cloud.google.com/crd-install: "true" - name: pullsubscriptions.pubsub.cloud.google.com + name: pullsubscriptions.internal.events.cloud.google.com spec: - group: pubsub.cloud.google.com + group: internal.events.cloud.google.com names: categories: - all diff --git a/config/core/resources/topic.yaml b/config/core/resources/topic.yaml index 29371408eb..10bebc6342 100644 --- a/config/core/resources/topic.yaml +++ b/config/core/resources/topic.yaml @@ -14,13 +14,13 @@ apiVersion: apiextensions.k8s.io/v1beta1 kind: CustomResourceDefinition metadata: - name: topics.pubsub.cloud.google.com + name: topics.internal.events.cloud.google.com labels: events.cloud.google.com/release: devel events.cloud.google.com/crd-install: "true" duck.knative.dev/addressable: "true" spec: - group: pubsub.cloud.google.com + group: internal.events.cloud.google.com names: kind: Topic plural: topics diff --git a/config/core/roles/clusterrole.yaml b/config/core/roles/clusterrole.yaml index 612fc9ff4a..9160fc3296 100644 --- a/config/core/roles/clusterrole.yaml +++ b/config/core/roles/clusterrole.yaml @@ -21,7 +21,7 @@ metadata: rules: - apiGroups: - - pubsub.cloud.google.com + - internal.events.cloud.google.com resources: - pullsubscriptions - topics @@ -34,6 +34,23 @@ rules: - patch - delete +- apiGroups: + - internal.events.cloud.google.com + resources: + - pullsubscriptions/status + - topics/status + verbs: + - get + - update + - patch + +- apiGroups: + - pubsub.cloud.google.com + resources: + - pullsubscriptions + - topics + verbs: *everything + - apiGroups: - pubsub.cloud.google.com resources: diff --git a/pkg/apis/events/v1alpha1/resources/pullsubscription_status.go b/pkg/apis/events/v1alpha1/resources/pullsubscription_status.go deleted file mode 100644 index 8c3a43e41a..0000000000 --- a/pkg/apis/events/v1alpha1/resources/pullsubscription_status.go +++ /dev/null @@ -1,43 +0,0 @@ -/* -Copyright 2020 Google LLC. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ -package resources - -import ( - pubsubv1alpha1 "github.com/google/knative-gcp/pkg/apis/pubsub/v1alpha1" - "knative.dev/pkg/apis" -) - -func ReadyPullSubscriptionStatus() *pubsubv1alpha1.PullSubscriptionStatus { - pss := &pubsubv1alpha1.PullSubscriptionStatus{} - pss.InitializeConditions() - pss.MarkSink(apis.HTTP("test.mynamespace.svc.cluster.local")) - pss.MarkDeployed() - pss.MarkSubscribed("subID") - return pss -} - -func FalsePullSubscriptionStatus() *pubsubv1alpha1.PullSubscriptionStatus { - pss := &pubsubv1alpha1.PullSubscriptionStatus{} - pss.InitializeConditions() - pss.MarkNotDeployed("not deployed", "not deployed") - return pss -} - -func UnknownPullSubscriptionStatus() *pubsubv1alpha1.PullSubscriptionStatus { - pss := &pubsubv1alpha1.PullSubscriptionStatus{} - pss.InitializeConditions() - return pss -} diff --git a/pkg/apis/intevents/v1alpha1/pullsubscription_conversion.go b/pkg/apis/intevents/v1alpha1/pullsubscription_conversion.go index 71c64f0b11..646a943526 100644 --- a/pkg/apis/intevents/v1alpha1/pullsubscription_conversion.go +++ b/pkg/apis/intevents/v1alpha1/pullsubscription_conversion.go @@ -21,7 +21,7 @@ import ( "fmt" convert "github.com/google/knative-gcp/pkg/apis/convert" - "github.com/google/knative-gcp/pkg/apis/pubsub/v1beta1" + "github.com/google/knative-gcp/pkg/apis/intevents/v1beta1" "knative.dev/pkg/apis" ) diff --git a/pkg/apis/intevents/v1alpha1/pullsubscription_conversion_test.go b/pkg/apis/intevents/v1alpha1/pullsubscription_conversion_test.go index d033ff7b30..3456a77da7 100644 --- a/pkg/apis/intevents/v1alpha1/pullsubscription_conversion_test.go +++ b/pkg/apis/intevents/v1alpha1/pullsubscription_conversion_test.go @@ -26,7 +26,7 @@ import ( "github.com/google/go-cmp/cmp" duckv1alpha1 "github.com/google/knative-gcp/pkg/apis/duck/v1alpha1" - "github.com/google/knative-gcp/pkg/apis/pubsub/v1beta1" + "github.com/google/knative-gcp/pkg/apis/intevents/v1beta1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "knative.dev/pkg/apis" duckv1 "knative.dev/pkg/apis/duck/v1" diff --git a/pkg/apis/intevents/v1alpha1/topic_conversion.go b/pkg/apis/intevents/v1alpha1/topic_conversion.go index 16f348f67c..ed5f727eb0 100644 --- a/pkg/apis/intevents/v1alpha1/topic_conversion.go +++ b/pkg/apis/intevents/v1alpha1/topic_conversion.go @@ -21,7 +21,7 @@ import ( "fmt" convert "github.com/google/knative-gcp/pkg/apis/convert" - "github.com/google/knative-gcp/pkg/apis/pubsub/v1beta1" + "github.com/google/knative-gcp/pkg/apis/intevents/v1beta1" "knative.dev/pkg/apis" ) diff --git a/pkg/apis/intevents/v1alpha1/topic_conversion_test.go b/pkg/apis/intevents/v1alpha1/topic_conversion_test.go index dce9a546eb..313d5a71c1 100644 --- a/pkg/apis/intevents/v1alpha1/topic_conversion_test.go +++ b/pkg/apis/intevents/v1alpha1/topic_conversion_test.go @@ -25,7 +25,7 @@ import ( duckv1beta1 "knative.dev/pkg/apis/duck/v1beta1" "github.com/google/go-cmp/cmp" - "github.com/google/knative-gcp/pkg/apis/pubsub/v1beta1" + "github.com/google/knative-gcp/pkg/apis/intevents/v1beta1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "knative.dev/pkg/apis" ) diff --git a/pkg/apis/messaging/v1alpha1/channel_conversion_test.go b/pkg/apis/messaging/v1alpha1/channel_conversion_test.go index 9410128276..ece5f32b6c 100644 --- a/pkg/apis/messaging/v1alpha1/channel_conversion_test.go +++ b/pkg/apis/messaging/v1alpha1/channel_conversion_test.go @@ -24,8 +24,8 @@ import ( "github.com/google/go-cmp/cmp" duckv1alpha1 "github.com/google/knative-gcp/pkg/apis/duck/v1alpha1" + "github.com/google/knative-gcp/pkg/apis/intevents/v1alpha1" "github.com/google/knative-gcp/pkg/apis/messaging/v1beta1" - "github.com/google/knative-gcp/pkg/apis/pubsub/v1alpha1" v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" eventingduckv1alpha1 "knative.dev/eventing/pkg/apis/duck/v1alpha1" diff --git a/pkg/apis/messaging/v1alpha1/channel_lifecycle.go b/pkg/apis/messaging/v1alpha1/channel_lifecycle.go index 963c9f0527..07e4478087 100644 --- a/pkg/apis/messaging/v1alpha1/channel_lifecycle.go +++ b/pkg/apis/messaging/v1alpha1/channel_lifecycle.go @@ -17,7 +17,7 @@ package v1alpha1 import ( - "github.com/google/knative-gcp/pkg/apis/pubsub/v1alpha1" + "github.com/google/knative-gcp/pkg/apis/intevents/v1alpha1" corev1 "k8s.io/api/core/v1" "knative.dev/pkg/apis" duckv1 "knative.dev/pkg/apis/duck/v1" diff --git a/pkg/apis/messaging/v1alpha1/channel_lifecycle_test.go b/pkg/apis/messaging/v1alpha1/channel_lifecycle_test.go index 418c4e2a3e..c627aff1dc 100644 --- a/pkg/apis/messaging/v1alpha1/channel_lifecycle_test.go +++ b/pkg/apis/messaging/v1alpha1/channel_lifecycle_test.go @@ -23,7 +23,7 @@ import ( "github.com/google/go-cmp/cmp" "github.com/google/go-cmp/cmp/cmpopts" duckv1alpha1 "github.com/google/knative-gcp/pkg/apis/duck/v1alpha1" - "github.com/google/knative-gcp/pkg/apis/pubsub/v1alpha1" + "github.com/google/knative-gcp/pkg/apis/intevents/v1alpha1" corev1 "k8s.io/api/core/v1" "knative.dev/pkg/apis" duckv1 "knative.dev/pkg/apis/duck/v1" diff --git a/pkg/apis/messaging/v1beta1/channel_lifecycle.go b/pkg/apis/messaging/v1beta1/channel_lifecycle.go index 6817080a5c..c598658213 100644 --- a/pkg/apis/messaging/v1beta1/channel_lifecycle.go +++ b/pkg/apis/messaging/v1beta1/channel_lifecycle.go @@ -17,7 +17,7 @@ package v1beta1 import ( - "github.com/google/knative-gcp/pkg/apis/pubsub/v1beta1" + "github.com/google/knative-gcp/pkg/apis/intevents/v1beta1" corev1 "k8s.io/api/core/v1" "knative.dev/pkg/apis" duckv1 "knative.dev/pkg/apis/duck/v1" diff --git a/pkg/apis/messaging/v1beta1/channel_lifecycle_test.go b/pkg/apis/messaging/v1beta1/channel_lifecycle_test.go index 710d6fe5d1..2e47006477 100644 --- a/pkg/apis/messaging/v1beta1/channel_lifecycle_test.go +++ b/pkg/apis/messaging/v1beta1/channel_lifecycle_test.go @@ -23,7 +23,7 @@ import ( "github.com/google/go-cmp/cmp" "github.com/google/go-cmp/cmp/cmpopts" duckv1beta1 "github.com/google/knative-gcp/pkg/apis/duck/v1beta1" - "github.com/google/knative-gcp/pkg/apis/pubsub/v1beta1" + "github.com/google/knative-gcp/pkg/apis/intevents/v1beta1" corev1 "k8s.io/api/core/v1" "knative.dev/pkg/apis" duckv1 "knative.dev/pkg/apis/duck/v1" diff --git a/pkg/reconciler/events/auditlogs/auditlogs.go b/pkg/reconciler/events/auditlogs/auditlogs.go index 5e9cafd325..a590de0030 100644 --- a/pkg/reconciler/events/auditlogs/auditlogs.go +++ b/pkg/reconciler/events/auditlogs/auditlogs.go @@ -36,7 +36,7 @@ import ( gpubsub "github.com/google/knative-gcp/pkg/gclient/pubsub" "github.com/google/knative-gcp/pkg/reconciler/events/auditlogs/resources" "github.com/google/knative-gcp/pkg/reconciler/identity" - pubsubreconciler "github.com/google/knative-gcp/pkg/reconciler/pubsub" + "github.com/google/knative-gcp/pkg/reconciler/intevents" ) const ( @@ -53,7 +53,7 @@ const ( ) type Reconciler struct { - *pubsubreconciler.PubSubBase + *intevents.PubSubBase // identity reconciler for reconciling workload identity. *identity.Identity auditLogsSourceLister listers.CloudAuditLogsSourceLister diff --git a/pkg/reconciler/events/auditlogs/auditlogs_test.go b/pkg/reconciler/events/auditlogs/auditlogs_test.go index 5a60406ea3..943d667bb2 100644 --- a/pkg/reconciler/events/auditlogs/auditlogs_test.go +++ b/pkg/reconciler/events/auditlogs/auditlogs_test.go @@ -42,7 +42,7 @@ import ( "github.com/google/go-cmp/cmp/cmpopts" duckv1alpha1 "github.com/google/knative-gcp/pkg/apis/duck/v1alpha1" - pubsubv1alpha1 "github.com/google/knative-gcp/pkg/apis/pubsub/v1alpha1" + inteventsv1alpha1 "github.com/google/knative-gcp/pkg/apis/intevents/v1alpha1" "github.com/google/knative-gcp/pkg/client/injection/reconciler/events/v1alpha1/cloudauditlogssource" testiam "github.com/google/knative-gcp/pkg/gclient/iam/testing" glogadmin "github.com/google/knative-gcp/pkg/gclient/logging/logadmin" @@ -51,7 +51,7 @@ import ( gpubsub "github.com/google/knative-gcp/pkg/gclient/pubsub/testing" "github.com/google/knative-gcp/pkg/pubsub/adapter/converters" "github.com/google/knative-gcp/pkg/reconciler/identity" - "github.com/google/knative-gcp/pkg/reconciler/pubsub" + "github.com/google/knative-gcp/pkg/reconciler/intevents" . "github.com/google/knative-gcp/pkg/reconciler/testing" ) @@ -163,19 +163,20 @@ func TestAllCases(t *testing.T) { })), }}, WantCreates: []runtime.Object{ - NewPubSubTopic(sourceName, testNS, - WithPubSubTopicSpec(pubsubv1alpha1.TopicSpec{ + NewTopic(sourceName, testNS, + WithTopicSpec(inteventsv1alpha1.TopicSpec{ Topic: "cloudauditlogssource-" + sourceUID, PropagationPolicy: "CreateDelete", }), - WithPubSubTopicLabels(map[string]string{ + WithTopicLabels(map[string]string{ "receive-adapter": receiveAdapterName, "events.cloud.google.com/source-name": sourceName, }), - WithPubSubTopicOwnerReferences([]metav1.OwnerReference{sourceOwnerRef(sourceName, sourceUID)}), - WithPubSubTopicAnnotations(map[string]string{ + WithTopicOwnerReferences([]metav1.OwnerReference{sourceOwnerRef(sourceName, sourceUID)}), + WithTopicAnnotations(map[string]string{ duckv1alpha1.ClusterNameAnnotation: testingMetadataClient.FakeClusterName, - })), + }), + ), }, WantPatches: []clientgotesting.PatchActionImpl{ patchFinalizers(testNS, sourceName, true), @@ -195,8 +196,8 @@ func TestAllCases(t *testing.T) { duckv1alpha1.ClusterNameAnnotation: testingMetadataClient.FakeClusterName, }), ), - NewPubSubTopic(sourceName, testNS, - WithPubSubTopicTopicID(testTopicID), + NewTopic(sourceName, testNS, + WithTopicTopicID(testTopicID), ), }, Key: testNS + "/" + sourceName, @@ -225,9 +226,9 @@ func TestAllCases(t *testing.T) { WithCloudAuditLogsSourceMethodName(testMethodName), WithCloudAuditLogsSourceServiceName(testServiceName), ), - NewPubSubTopic(sourceName, testNS, - WithPubSubTopicReady(testTopicID), - WithPubSubTopicAddress(testTopicURI), + NewTopic(sourceName, testNS, + WithTopicReady(testTopicID), + WithTopicAddress(testTopicURI), ), }, Key: testNS + "/" + sourceName, @@ -255,10 +256,10 @@ func TestAllCases(t *testing.T) { WithCloudAuditLogsSourceMethodName(testMethodName), WithCloudAuditLogsSourceServiceName(testServiceName), ), - NewPubSubTopic(sourceName, testNS, - WithPubSubTopicReady(""), - WithPubSubTopicProjectID(testProject), - WithPubSubTopicAddress(testTopicURI), + NewTopic(sourceName, testNS, + WithTopicReady(""), + WithTopicProjectID(testProject), + WithTopicAddress(testTopicURI), ), }, Key: testNS + "/" + sourceName, @@ -286,10 +287,10 @@ func TestAllCases(t *testing.T) { WithCloudAuditLogsSourceMethodName(testMethodName), WithCloudAuditLogsSourceServiceName(testServiceName), ), - NewPubSubTopic(sourceName, testNS, - WithPubSubTopicReady("garbaaaaage"), - WithPubSubTopicProjectID(testProject), - WithPubSubTopicAddress(testTopicURI), + NewTopic(sourceName, testNS, + WithTopicReady("garbaaaaage"), + WithTopicProjectID(testProject), + WithTopicAddress(testTopicURI), ), }, Key: testNS + "/" + sourceName, @@ -317,9 +318,9 @@ func TestAllCases(t *testing.T) { WithCloudAuditLogsSourceMethodName(testMethodName), WithCloudAuditLogsSourceServiceName(testServiceName), ), - NewPubSubTopic(sourceName, testNS, - WithPubSubTopicFailed(), - WithPubSubTopicTopicID(testTopicID), + NewTopic(sourceName, testNS, + WithTopicFailed(), + WithTopicTopicID(testTopicID), ), }, Key: testNS + "/" + sourceName, @@ -346,9 +347,9 @@ func TestAllCases(t *testing.T) { WithCloudAuditLogsSourceMethodName(testMethodName), WithCloudAuditLogsSourceServiceName(testServiceName), ), - NewPubSubTopic(sourceName, testNS, - WithPubSubTopicUnknown(), - WithPubSubTopicTopicID(testTopicID), + NewTopic(sourceName, testNS, + WithTopicUnknown(), + WithTopicTopicID(testTopicID), ), }, Key: testNS + "/" + sourceName, @@ -378,10 +379,10 @@ func TestAllCases(t *testing.T) { duckv1alpha1.ClusterNameAnnotation: testingMetadataClient.FakeClusterName, }), ), - NewPubSubTopic(sourceName, testNS, - WithPubSubTopicReady(testTopicID), - WithPubSubTopicAddress(testTopicURI), - WithPubSubTopicProjectID(testProject), + NewTopic(sourceName, testNS, + WithTopicReady(testTopicID), + WithTopicAddress(testTopicURI), + WithTopicProjectID(testProject), ), }, Key: testNS + "/" + sourceName, @@ -400,24 +401,24 @@ func TestAllCases(t *testing.T) { ), }}, WantCreates: []runtime.Object{ - NewPubSubPullSubscriptionWithNoDefaults(sourceName, testNS, - WithPubSubPullSubscriptionSpecWithNoDefaults(pubsubv1alpha1.PullSubscriptionSpec{ + NewPullSubscriptionWithNoDefaults(sourceName, testNS, + WithPullSubscriptionSpecWithNoDefaults(inteventsv1alpha1.PullSubscriptionSpec{ Topic: testTopicID, PubSubSpec: duckv1alpha1.PubSubSpec{ Secret: &secret, }, AdapterType: converters.CloudAuditLogsConverter, }), - WithPubSubPullSubscriptionSink(sinkGVK, sinkName), - WithPubSubPullSubscriptionLabels(map[string]string{ + WithPullSubscriptionSink(sinkGVK, sinkName), + WithPullSubscriptionLabels(map[string]string{ "receive-adapter": receiveAdapterName, "events.cloud.google.com/source-name": sourceName, }), - WithPubSubPullSubscriptionAnnotations(map[string]string{ + WithPullSubscriptionAnnotations(map[string]string{ "metrics-resource-group": resourceGroup, duckv1alpha1.ClusterNameAnnotation: testingMetadataClient.FakeClusterName, }), - WithPubSubPullSubscriptionOwnerReferences([]metav1.OwnerReference{sourceOwnerRef(sourceName, sourceUID)}), + WithPullSubscriptionOwnerReferences([]metav1.OwnerReference{sourceOwnerRef(sourceName, sourceUID)}), ), }, WantPatches: []clientgotesting.PatchActionImpl{ @@ -435,12 +436,12 @@ func TestAllCases(t *testing.T) { WithCloudAuditLogsSourceServiceName(testServiceName), WithCloudAuditLogsSourceSink(sinkGVK, sinkName), ), - NewPubSubTopic(sourceName, testNS, - WithPubSubTopicReady(testTopicID), - WithPubSubTopicAddress(testTopicURI), - WithPubSubTopicProjectID(testProject), + NewTopic(sourceName, testNS, + WithTopicReady(testTopicID), + WithTopicAddress(testTopicURI), + WithTopicProjectID(testProject), ), - NewPubSubPullSubscriptionWithNoDefaults(sourceName, testNS), + NewPullSubscriptionWithNoDefaults(sourceName, testNS), }, Key: testNS + "/" + sourceName, WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ @@ -469,12 +470,12 @@ func TestAllCases(t *testing.T) { WithCloudAuditLogsSourceServiceName(testServiceName), WithCloudAuditLogsSourceSink(sinkGVK, sinkName), ), - NewPubSubTopic(sourceName, testNS, - WithPubSubTopicReady(testTopicID), - WithPubSubTopicAddress(testTopicURI), - WithPubSubTopicProjectID(testProject), + NewTopic(sourceName, testNS, + WithTopicReady(testTopicID), + WithTopicAddress(testTopicURI), + WithTopicProjectID(testProject), ), - NewPubSubPullSubscriptionWithNoDefaults(sourceName, testNS, WithPubSubPullSubscriptionFailed()), + NewPullSubscriptionWithNoDefaults(sourceName, testNS, WithPullSubscriptionFailed()), }, Key: testNS + "/" + sourceName, WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ @@ -503,12 +504,12 @@ func TestAllCases(t *testing.T) { WithCloudAuditLogsSourceServiceName(testServiceName), WithCloudAuditLogsSourceSink(sinkGVK, sinkName), ), - NewPubSubTopic(sourceName, testNS, - WithPubSubTopicReady(testTopicID), - WithPubSubTopicAddress(testTopicURI), - WithPubSubTopicProjectID(testProject), + NewTopic(sourceName, testNS, + WithTopicReady(testTopicID), + WithTopicAddress(testTopicURI), + WithTopicProjectID(testProject), ), - NewPubSubPullSubscriptionWithNoDefaults(sourceName, testNS, WithPubSubPullSubscriptionUnknown()), + NewPullSubscriptionWithNoDefaults(sourceName, testNS, WithPullSubscriptionUnknown()), }, Key: testNS + "/" + sourceName, WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ @@ -537,13 +538,13 @@ func TestAllCases(t *testing.T) { WithCloudAuditLogsSourceServiceName(testServiceName), WithCloudAuditLogsSourceSink(sinkGVK, sinkName), ), - NewPubSubTopic(sourceName, testNS, - WithPubSubTopicReady(testTopicID), - WithPubSubTopicAddress(testTopicURI), - WithPubSubTopicProjectID(testProject), + NewTopic(sourceName, testNS, + WithTopicReady(testTopicID), + WithTopicAddress(testTopicURI), + WithTopicProjectID(testProject), ), - NewPubSubPullSubscriptionWithNoDefaults(sourceName, testNS, - WithPubSubPullSubscriptionReady(sinkURI), + NewPullSubscriptionWithNoDefaults(sourceName, testNS, + WithPullSubscriptionReady(sinkURI), ), }, Key: testNS + "/" + sourceName, @@ -580,13 +581,13 @@ func TestAllCases(t *testing.T) { WithCloudAuditLogsSourceServiceName(testServiceName), WithCloudAuditLogsSourceSink(sinkGVK, sinkName), ), - NewPubSubTopic(sourceName, testNS, - WithPubSubTopicReady(testTopicID), - WithPubSubTopicAddress(testTopicURI), - WithPubSubTopicProjectID(testProject), + NewTopic(sourceName, testNS, + WithTopicReady(testTopicID), + WithTopicAddress(testTopicURI), + WithTopicProjectID(testProject), ), - NewPubSubPullSubscriptionWithNoDefaults(sourceName, testNS, - WithPubSubPullSubscriptionReady(sinkURI), + NewPullSubscriptionWithNoDefaults(sourceName, testNS, + WithPullSubscriptionReady(sinkURI), ), }, Key: testNS + "/" + sourceName, @@ -623,13 +624,13 @@ func TestAllCases(t *testing.T) { WithCloudAuditLogsSourceServiceName(testServiceName), WithCloudAuditLogsSourceSink(sinkGVK, sinkName), ), - NewPubSubTopic(sourceName, testNS, - WithPubSubTopicReady(testTopicID), - WithPubSubTopicAddress(testTopicURI), - WithPubSubTopicProjectID(testProject), + NewTopic(sourceName, testNS, + WithTopicReady(testTopicID), + WithTopicAddress(testTopicURI), + WithTopicProjectID(testProject), ), - NewPubSubPullSubscriptionWithNoDefaults(sourceName, testNS, - WithPubSubPullSubscriptionReady(sinkURI), + NewPullSubscriptionWithNoDefaults(sourceName, testNS, + WithPullSubscriptionReady(sinkURI), ), }, Key: testNS + "/" + sourceName, @@ -666,13 +667,13 @@ func TestAllCases(t *testing.T) { WithCloudAuditLogsSourceServiceName(testServiceName), WithCloudAuditLogsSourceSink(sinkGVK, sinkName), ), - NewPubSubTopic(sourceName, testNS, - WithPubSubTopicReady(testTopicID), - WithPubSubTopicAddress(testTopicURI), - WithPubSubTopicProjectID(testProject), + NewTopic(sourceName, testNS, + WithTopicReady(testTopicID), + WithTopicAddress(testTopicURI), + WithTopicProjectID(testProject), ), - NewPubSubPullSubscriptionWithNoDefaults(sourceName, testNS, - WithPubSubPullSubscriptionReady(sinkURI), + NewPullSubscriptionWithNoDefaults(sourceName, testNS, + WithPullSubscriptionReady(sinkURI), ), }, Key: testNS + "/" + sourceName, @@ -710,13 +711,13 @@ func TestAllCases(t *testing.T) { WithCloudAuditLogsSourceSink(sinkGVK, sinkName), WithCloudAuditLogsSourceServiceName(testServiceName), WithCloudAuditLogsSourceMethodName(testMethodName)), - NewPubSubTopic(sourceName, testNS, - WithPubSubTopicReady(testTopicID), - WithPubSubTopicAddress(testTopicURI), - WithPubSubTopicProjectID(testProject), + NewTopic(sourceName, testNS, + WithTopicReady(testTopicID), + WithTopicAddress(testTopicURI), + WithTopicProjectID(testProject), ), - NewPubSubPullSubscriptionWithNoDefaults(sourceName, testNS, - WithPubSubPullSubscriptionReady(sinkURI), + NewPullSubscriptionWithNoDefaults(sourceName, testNS, + WithPullSubscriptionReady(sinkURI), ), }, Key: testNS + "/" + sourceName, @@ -764,13 +765,13 @@ func TestAllCases(t *testing.T) { WithCloudAuditLogsSourceSink(sinkGVK, sinkName), WithCloudAuditLogsSourceServiceName(testServiceName), WithCloudAuditLogsSourceMethodName(testMethodName)), - NewPubSubTopic(sourceName, testNS, - WithPubSubTopicReady(testTopicID), - WithPubSubTopicAddress(testTopicURI), - WithPubSubTopicProjectID(testProject), + NewTopic(sourceName, testNS, + WithTopicReady(testTopicID), + WithTopicAddress(testTopicURI), + WithTopicProjectID(testProject), ), - NewPubSubPullSubscriptionWithNoDefaults(sourceName, testNS, - WithPubSubPullSubscriptionReady(sinkURI), + NewPullSubscriptionWithNoDefaults(sourceName, testNS, + WithPullSubscriptionReady(sinkURI), ), }, Key: testNS + "/" + sourceName, @@ -818,13 +819,13 @@ func TestAllCases(t *testing.T) { WithCloudAuditLogsSourceSink(sinkGVK, sinkName), WithCloudAuditLogsSourceServiceName(testServiceName), WithCloudAuditLogsSourceMethodName(testMethodName)), - NewPubSubTopic(sourceName, testNS, - WithPubSubTopicReady(testTopicID), - WithPubSubTopicAddress(testTopicURI), - WithPubSubTopicProjectID(testProject), + NewTopic(sourceName, testNS, + WithTopicReady(testTopicID), + WithTopicAddress(testTopicURI), + WithTopicProjectID(testProject), ), - NewPubSubPullSubscriptionWithNoDefaults(sourceName, testNS, - WithPubSubPullSubscriptionReady(sinkURI), + NewPullSubscriptionWithNoDefaults(sourceName, testNS, + WithPullSubscriptionReady(sinkURI), ), }, Key: testNS + "/" + sourceName, @@ -867,13 +868,13 @@ func TestAllCases(t *testing.T) { WithCloudAuditLogsSourceServiceName(testServiceName), WithCloudAuditLogsSourceSink(sinkGVK, sinkName), ), - NewPubSubTopic(sourceName, testNS, - WithPubSubTopicReady(testTopicID), - WithPubSubTopicAddress(testTopicURI), - WithPubSubTopicProjectID(testProject), + NewTopic(sourceName, testNS, + WithTopicReady(testTopicID), + WithTopicAddress(testTopicURI), + WithTopicProjectID(testProject), ), - NewPubSubPullSubscriptionWithNoDefaults(sourceName, testNS, - WithPubSubPullSubscriptionReady(sinkURI), + NewPullSubscriptionWithNoDefaults(sourceName, testNS, + WithPullSubscriptionReady(sinkURI), ), }, Key: testNS + "/" + sourceName, @@ -927,13 +928,13 @@ func TestAllCases(t *testing.T) { WithCloudAuditLogsSourceSinkID(testSinkID), WithCloudAuditLogsSourceDeletionTimestamp, ), - NewPubSubTopic(sourceName, testNS, - WithPubSubTopicReady(testTopicID), - WithPubSubTopicAddress(testTopicURI), - WithPubSubTopicProjectID(testProject), + NewTopic(sourceName, testNS, + WithTopicReady(testTopicID), + WithTopicAddress(testTopicURI), + WithTopicProjectID(testProject), ), - NewPubSubPullSubscriptionWithNoDefaults(sourceName, testNS, - WithPubSubPullSubscriptionReady(sinkURI), + NewPullSubscriptionWithNoDefaults(sourceName, testNS, + WithPullSubscriptionReady(sinkURI), ), }, Key: testNS + "/" + sourceName, @@ -973,13 +974,13 @@ func TestAllCases(t *testing.T) { WithCloudAuditLogsSourceSinkID(testSinkID), WithCloudAuditLogsSourceDeletionTimestamp, ), - NewPubSubTopic(sourceName, testNS, - WithPubSubTopicReady(testTopicID), - WithPubSubTopicAddress(testTopicURI), - WithPubSubTopicProjectID(testProject), + NewTopic(sourceName, testNS, + WithTopicReady(testTopicID), + WithTopicAddress(testTopicURI), + WithTopicProjectID(testProject), ), - NewPubSubPullSubscriptionWithNoDefaults(sourceName, testNS, - WithPubSubPullSubscriptionReady(sinkURI), + NewPullSubscriptionWithNoDefaults(sourceName, testNS, + WithPullSubscriptionReady(sinkURI), ), }, Key: testNS + "/" + sourceName, @@ -1007,11 +1008,11 @@ func TestAllCases(t *testing.T) { }}, WantDeletes: []clientgotesting.DeleteActionImpl{ {ActionImpl: clientgotesting.ActionImpl{ - Namespace: testNS, Verb: "delete", Resource: schema.GroupVersionResource{Group: "pubsub.cloud.google.com", Version: "v1alpha1", Resource: "topics"}}, + Namespace: testNS, Verb: "delete", Resource: schema.GroupVersionResource{Group: "internal.events.cloud.google.com", Version: "v1alpha1", Resource: "topics"}}, Name: sourceName, }, {ActionImpl: clientgotesting.ActionImpl{ - Namespace: testNS, Verb: "delete", Resource: schema.GroupVersionResource{Group: "pubsub.cloud.google.com", Version: "v1alpha1", Resource: "pullsubscriptions"}}, + Namespace: testNS, Verb: "delete", Resource: schema.GroupVersionResource{Group: "internal.events.cloud.google.com", Version: "v1alpha1", Resource: "pullsubscriptions"}}, Name: sourceName, }, }, @@ -1031,13 +1032,13 @@ func TestAllCases(t *testing.T) { WithCloudAuditLogsSourceSinkID(testSinkID), WithCloudAuditLogsSourceDeletionTimestamp, ), - NewPubSubTopic(sourceName, testNS, - WithPubSubTopicReady(testTopicID), - WithPubSubTopicAddress(testTopicURI), - WithPubSubTopicProjectID(testProject), + NewTopic(sourceName, testNS, + WithTopicReady(testTopicID), + WithTopicAddress(testTopicURI), + WithTopicProjectID(testProject), ), - NewPubSubPullSubscriptionWithNoDefaults(sourceName, testNS, - WithPubSubPullSubscriptionReady(sinkURI), + NewPullSubscriptionWithNoDefaults(sourceName, testNS, + WithPullSubscriptionReady(sinkURI), ), }, Key: testNS + "/" + sourceName, @@ -1060,11 +1061,11 @@ func TestAllCases(t *testing.T) { }}, WantDeletes: []clientgotesting.DeleteActionImpl{ {ActionImpl: clientgotesting.ActionImpl{ - Namespace: testNS, Verb: "delete", Resource: schema.GroupVersionResource{Group: "pubsub.cloud.google.com", Version: "v1alpha1", Resource: "topics"}}, + Namespace: testNS, Verb: "delete", Resource: schema.GroupVersionResource{Group: "internal.events.cloud.google.com", Version: "v1alpha1", Resource: "topics"}}, Name: sourceName, }, {ActionImpl: clientgotesting.ActionImpl{ - Namespace: testNS, Verb: "delete", Resource: schema.GroupVersionResource{Group: "pubsub.cloud.google.com", Version: "v1alpha1", Resource: "pullsubscriptions"}}, + Namespace: testNS, Verb: "delete", Resource: schema.GroupVersionResource{Group: "internal.events.cloud.google.com", Version: "v1alpha1", Resource: "pullsubscriptions"}}, Name: sourceName, }, }, @@ -1115,7 +1116,7 @@ func TestAllCases(t *testing.T) { tt.Test(t, MakeFactory( func(ctx context.Context, listers *Listers, cmw configmap.Watcher, testData map[string]interface{}) controller.Reconciler { r := &Reconciler{ - PubSubBase: pubsub.NewPubSubBaseWithAdapter(ctx, controllerAgentName, receiveAdapterName, converters.CloudAuditLogsConverter, cmw), + PubSubBase: intevents.NewPubSubBaseWithAdapter(ctx, controllerAgentName, receiveAdapterName, converters.CloudAuditLogsConverter, cmw), Identity: identity.NewIdentity(ctx, NoopIAMPolicyManager), auditLogsSourceLister: listers.GetCloudAuditLogsSourceLister(), logadminClientProvider: logadminClientProvider, diff --git a/pkg/reconciler/events/auditlogs/controller.go b/pkg/reconciler/events/auditlogs/controller.go index 3247db485a..6207aa74b9 100644 --- a/pkg/reconciler/events/auditlogs/controller.go +++ b/pkg/reconciler/events/auditlogs/controller.go @@ -25,15 +25,15 @@ import ( "github.com/google/knative-gcp/pkg/reconciler" "github.com/google/knative-gcp/pkg/reconciler/identity" "github.com/google/knative-gcp/pkg/reconciler/identity/iam" - "github.com/google/knative-gcp/pkg/reconciler/pubsub" + "github.com/google/knative-gcp/pkg/reconciler/intevents" "k8s.io/client-go/tools/cache" serviceaccountinformers "knative.dev/pkg/client/injection/kube/informers/core/v1/serviceaccount" "knative.dev/pkg/configmap" "knative.dev/pkg/controller" cloudauditlogssourceinformers "github.com/google/knative-gcp/pkg/client/injection/informers/events/v1alpha1/cloudauditlogssource" - pullsubscriptioninformers "github.com/google/knative-gcp/pkg/client/injection/informers/pubsub/v1alpha1/pullsubscription" - topicinformers "github.com/google/knative-gcp/pkg/client/injection/informers/pubsub/v1alpha1/topic" + pullsubscriptioninformers "github.com/google/knative-gcp/pkg/client/injection/informers/intevents/v1alpha1/pullsubscription" + topicinformers "github.com/google/knative-gcp/pkg/client/injection/informers/intevents/v1alpha1/topic" cloudauditlogssourcereconciler "github.com/google/knative-gcp/pkg/client/injection/reconciler/events/v1alpha1/cloudauditlogssource" glogadmin "github.com/google/knative-gcp/pkg/gclient/logging/logadmin" gpubsub "github.com/google/knative-gcp/pkg/gclient/pubsub" @@ -74,7 +74,7 @@ func newControllerWithIAMPolicyManager( serviceAccountInformer := serviceaccountinformers.Get(ctx) r := &Reconciler{ - PubSubBase: pubsub.NewPubSubBaseWithAdapter(ctx, controllerAgentName, receiveAdapterName, converters.CloudAuditLogsConverter, cmw), + PubSubBase: intevents.NewPubSubBaseWithAdapter(ctx, controllerAgentName, receiveAdapterName, converters.CloudAuditLogsConverter, cmw), Identity: identity.NewIdentity(ctx, ipm), auditLogsSourceLister: cloudauditlogssourceInformer.Lister(), logadminClientProvider: glogadmin.NewClient, diff --git a/pkg/reconciler/events/auditlogs/controller_test.go b/pkg/reconciler/events/auditlogs/controller_test.go index d87f113b71..7869754b37 100644 --- a/pkg/reconciler/events/auditlogs/controller_test.go +++ b/pkg/reconciler/events/auditlogs/controller_test.go @@ -25,11 +25,11 @@ import ( . "knative.dev/pkg/reconciler/testing" // Fake injection informers - _ "github.com/google/knative-gcp/pkg/client/clientset/versioned/typed/pubsub/v1alpha1/fake" + _ "github.com/google/knative-gcp/pkg/client/clientset/versioned/typed/intevents/v1alpha1/fake" _ "github.com/google/knative-gcp/pkg/client/injection/client/fake" _ "github.com/google/knative-gcp/pkg/client/injection/informers/events/v1alpha1/cloudauditlogssource/fake" - _ "github.com/google/knative-gcp/pkg/client/injection/informers/pubsub/v1alpha1/pullsubscription/fake" - _ "github.com/google/knative-gcp/pkg/client/injection/informers/pubsub/v1alpha1/topic/fake" + _ "github.com/google/knative-gcp/pkg/client/injection/informers/intevents/v1alpha1/pullsubscription/fake" + _ "github.com/google/knative-gcp/pkg/client/injection/informers/intevents/v1alpha1/topic/fake" _ "github.com/google/knative-gcp/pkg/reconciler/testing" _ "knative.dev/pkg/client/injection/kube/informers/core/v1/serviceaccount/fake" ) diff --git a/pkg/reconciler/events/build/build.go b/pkg/reconciler/events/build/build.go index b31106d2c9..bd92c002e1 100644 --- a/pkg/reconciler/events/build/build.go +++ b/pkg/reconciler/events/build/build.go @@ -30,9 +30,8 @@ import ( "github.com/google/knative-gcp/pkg/apis/events/v1alpha1" cloudbuildsourcereconciler "github.com/google/knative-gcp/pkg/client/injection/reconciler/events/v1alpha1/cloudbuildsource" listers "github.com/google/knative-gcp/pkg/client/listers/events/v1alpha1" - pubsublisters "github.com/google/knative-gcp/pkg/client/listers/pubsub/v1alpha1" "github.com/google/knative-gcp/pkg/reconciler/identity" - "github.com/google/knative-gcp/pkg/reconciler/pubsub" + "github.com/google/knative-gcp/pkg/reconciler/intevents" ) const ( @@ -48,14 +47,12 @@ const ( // Reconciler is the controller implementation for the CloudBuildSource source. type Reconciler struct { - *pubsub.PubSubBase + *intevents.PubSubBase // identity reconciler for reconciling workload identity. *identity.Identity // buildLister for reading cloudbuildsources. buildLister listers.CloudBuildSourceLister - // pullsubscriptionLister for reading pullsubscriptions. - pullsubscriptionLister pubsublisters.PullSubscriptionLister // serviceAccountLister for reading serviceAccounts. serviceAccountLister corev1listers.ServiceAccountLister } diff --git a/pkg/reconciler/events/build/build_test.go b/pkg/reconciler/events/build/build_test.go index 2b550612e3..74d6022649 100644 --- a/pkg/reconciler/events/build/build_test.go +++ b/pkg/reconciler/events/build/build_test.go @@ -39,12 +39,11 @@ import ( duckv1alpha1 "github.com/google/knative-gcp/pkg/apis/duck/v1alpha1" "github.com/google/knative-gcp/pkg/apis/events/v1alpha1" - pubsubv1alpha1 "github.com/google/knative-gcp/pkg/apis/pubsub/v1alpha1" + inteventsv1alpha1 "github.com/google/knative-gcp/pkg/apis/intevents/v1alpha1" "github.com/google/knative-gcp/pkg/client/injection/reconciler/events/v1alpha1/cloudbuildsource" testingMetadataClient "github.com/google/knative-gcp/pkg/gclient/metadata/testing" "github.com/google/knative-gcp/pkg/reconciler/identity" - "github.com/google/knative-gcp/pkg/reconciler/pubsub" - reconcilerpubsub "github.com/google/knative-gcp/pkg/reconciler/pubsub" + "github.com/google/knative-gcp/pkg/reconciler/intevents" . "github.com/google/knative-gcp/pkg/reconciler/testing" ) @@ -174,23 +173,23 @@ func TestAllCases(t *testing.T) { ), }}, WantCreates: []runtime.Object{ - NewPubSubPullSubscriptionWithNoDefaults(buildName, testNS, - WithPubSubPullSubscriptionSpecWithNoDefaults(pubsubv1alpha1.PullSubscriptionSpec{ + NewPullSubscriptionWithNoDefaults(buildName, testNS, + WithPullSubscriptionSpecWithNoDefaults(inteventsv1alpha1.PullSubscriptionSpec{ Topic: testTopicID, PubSubSpec: duckv1alpha1.PubSubSpec{ Secret: &secret, }, }), - WithPubSubPullSubscriptionSink(sinkGVK, sinkName), - WithPubSubPullSubscriptionLabels(map[string]string{ + WithPullSubscriptionSink(sinkGVK, sinkName), + WithPullSubscriptionLabels(map[string]string{ "receive-adapter": receiveAdapterName, "events.cloud.google.com/source-name": buildName, }), - WithPubSubPullSubscriptionAnnotations(map[string]string{ + WithPullSubscriptionAnnotations(map[string]string{ "metrics-resource-group": resourceGroup, duckv1alpha1.ClusterNameAnnotation: testingMetadataClient.FakeClusterName, }), - WithPubSubPullSubscriptionOwnerReferences([]metav1.OwnerReference{ownerRef()}), + WithPullSubscriptionOwnerReferences([]metav1.OwnerReference{ownerRef()}), ), }, WantPatches: []clientgotesting.PatchActionImpl{ @@ -198,7 +197,7 @@ func TestAllCases(t *testing.T) { }, WantEvents: []string{ Eventf(corev1.EventTypeNormal, "FinalizerUpdate", "Updated %q finalizers", buildName), - Eventf(corev1.EventTypeWarning, reconcilerpubsub.PullSubscriptionStatusPropagateFailedReason, "%s: PullSubscription %q has not yet been reconciled", failedToPropagatePullSubscriptionStatusMsg, buildName), + Eventf(corev1.EventTypeWarning, intevents.PullSubscriptionStatusPropagateFailedReason, "%s: PullSubscription %q has not yet been reconciled", failedToPropagatePullSubscriptionStatusMsg, buildName), }, }, { Name: "pullsubscription exists and the status is false", @@ -208,8 +207,8 @@ func TestAllCases(t *testing.T) { WithCloudBuildSourceTopic(testTopicID), WithCloudBuildSourceSink(sinkGVK, sinkName), ), - NewPubSubPullSubscriptionWithNoDefaults(buildName, testNS, - WithPubSubPullSubscriptionReadyStatus(corev1.ConditionFalse, "PullSubscriptionFalse", "status false test message")), + NewPullSubscriptionWithNoDefaults(buildName, testNS, + WithPullSubscriptionReadyStatus(corev1.ConditionFalse, "PullSubscriptionFalse", "status false test message")), newSink(), }, Key: testNS + "/" + buildName, @@ -229,7 +228,7 @@ func TestAllCases(t *testing.T) { }, WantEvents: []string{ Eventf(corev1.EventTypeNormal, "FinalizerUpdate", "Updated %q finalizers", buildName), - Eventf(corev1.EventTypeWarning, reconcilerpubsub.PullSubscriptionStatusPropagateFailedReason, "%s: the status of PullSubscription %q is False", failedToPropagatePullSubscriptionStatusMsg, buildName), + Eventf(corev1.EventTypeWarning, intevents.PullSubscriptionStatusPropagateFailedReason, "%s: the status of PullSubscription %q is False", failedToPropagatePullSubscriptionStatusMsg, buildName), }, }, { Name: "pullsubscription exists and the status is unknown", @@ -239,8 +238,8 @@ func TestAllCases(t *testing.T) { WithCloudBuildSourceTopic(testTopicID), WithCloudBuildSourceSink(sinkGVK, sinkName), ), - NewPubSubPullSubscriptionWithNoDefaults(buildName, testNS, - WithPubSubPullSubscriptionReadyStatus(corev1.ConditionUnknown, "PullSubscriptionUnknown", "status unknown test message")), + NewPullSubscriptionWithNoDefaults(buildName, testNS, + WithPullSubscriptionReadyStatus(corev1.ConditionUnknown, "PullSubscriptionUnknown", "status unknown test message")), newSink(), }, Key: testNS + "/" + buildName, @@ -260,7 +259,7 @@ func TestAllCases(t *testing.T) { }, WantEvents: []string{ Eventf(corev1.EventTypeNormal, "FinalizerUpdate", "Updated %q finalizers", buildName), - Eventf(corev1.EventTypeWarning, reconcilerpubsub.PullSubscriptionStatusPropagateFailedReason, "%s: the status of PullSubscription %q is Unknown", failedToPropagatePullSubscriptionStatusMsg, buildName), + Eventf(corev1.EventTypeWarning, intevents.PullSubscriptionStatusPropagateFailedReason, "%s: the status of PullSubscription %q is Unknown", failedToPropagatePullSubscriptionStatusMsg, buildName), }, }, { Name: "pullsubscription exists and ready, with retry", @@ -270,9 +269,9 @@ func TestAllCases(t *testing.T) { WithCloudBuildSourceTopic(testTopicID), WithCloudBuildSourceSink(sinkGVK, sinkName), ), - NewPubSubPullSubscriptionWithNoDefaults(buildName, testNS, - WithPubSubPullSubscriptionReady(sinkURI), - WithPubSubPullSubscriptionReadyStatus(corev1.ConditionTrue, "PullSubscriptionNoReady", ""), + NewPullSubscriptionWithNoDefaults(buildName, testNS, + WithPullSubscriptionReady(sinkURI), + WithPullSubscriptionReadyStatus(corev1.ConditionTrue, "PullSubscriptionNoReady", ""), ), newSink(), }, @@ -354,11 +353,10 @@ func TestAllCases(t *testing.T) { defer logtesting.ClearAll() table.Test(t, MakeFactory(func(ctx context.Context, listers *Listers, cmw configmap.Watcher, _ map[string]interface{}) controller.Reconciler { r := &Reconciler{ - PubSubBase: pubsub.NewPubSubBase(ctx, controllerAgentName, receiveAdapterName, cmw), - Identity: identity.NewIdentity(ctx, NoopIAMPolicyManager), - buildLister: listers.GetCloudBuildSourceLister(), - pullsubscriptionLister: listers.GetPubSubPullSubscriptionLister(), - serviceAccountLister: listers.GetServiceAccountLister(), + PubSubBase: intevents.NewPubSubBase(ctx, controllerAgentName, receiveAdapterName, cmw), + Identity: identity.NewIdentity(ctx, NoopIAMPolicyManager), + buildLister: listers.GetCloudBuildSourceLister(), + serviceAccountLister: listers.GetServiceAccountLister(), } return cloudbuildsource.NewReconciler(ctx, r.Logger, r.RunClientSet, listers.GetCloudBuildSourceLister(), r.Recorder, r) })) diff --git a/pkg/reconciler/events/build/controller.go b/pkg/reconciler/events/build/controller.go index 79da305d1d..1b9cff7ba5 100644 --- a/pkg/reconciler/events/build/controller.go +++ b/pkg/reconciler/events/build/controller.go @@ -23,13 +23,13 @@ import ( "github.com/google/knative-gcp/pkg/apis/events/v1alpha1" cloudbuildsourceinformers "github.com/google/knative-gcp/pkg/client/injection/informers/events/v1alpha1/cloudbuildsource" - pullsubscriptioninformers "github.com/google/knative-gcp/pkg/client/injection/informers/pubsub/v1alpha1/pullsubscription" + pullsubscriptioninformers "github.com/google/knative-gcp/pkg/client/injection/informers/intevents/v1alpha1/pullsubscription" cloudbuildsourcereconciler "github.com/google/knative-gcp/pkg/client/injection/reconciler/events/v1alpha1/cloudbuildsource" "github.com/google/knative-gcp/pkg/pubsub/adapter/converters" "github.com/google/knative-gcp/pkg/reconciler" "github.com/google/knative-gcp/pkg/reconciler/identity" "github.com/google/knative-gcp/pkg/reconciler/identity/iam" - "github.com/google/knative-gcp/pkg/reconciler/pubsub" + "github.com/google/knative-gcp/pkg/reconciler/intevents" ) const ( @@ -66,11 +66,10 @@ func newControllerWithIAMPolicyManager( serviceAccountInformer := serviceaccountinformers.Get(ctx) r := &Reconciler{ - PubSubBase: pubsub.NewPubSubBaseWithAdapter(ctx, controllerAgentName, receiveAdapterName, converters.CloudBuildConverter, cmw), - Identity: identity.NewIdentity(ctx, ipm), - buildLister: cloudbuildsourceInformer.Lister(), - serviceAccountLister: serviceAccountInformer.Lister(), - pullsubscriptionLister: pullsubscriptionInformer.Lister(), + PubSubBase: intevents.NewPubSubBaseWithAdapter(ctx, controllerAgentName, receiveAdapterName, converters.CloudBuildConverter, cmw), + Identity: identity.NewIdentity(ctx, ipm), + buildLister: cloudbuildsourceInformer.Lister(), + serviceAccountLister: serviceAccountInformer.Lister(), } impl := cloudbuildsourcereconciler.NewImpl(ctx, r) diff --git a/pkg/reconciler/events/build/controller_test.go b/pkg/reconciler/events/build/controller_test.go index e7ad210b0b..f30cbe946a 100644 --- a/pkg/reconciler/events/build/controller_test.go +++ b/pkg/reconciler/events/build/controller_test.go @@ -25,10 +25,10 @@ import ( . "knative.dev/pkg/reconciler/testing" // Fake injection informers - _ "github.com/google/knative-gcp/pkg/client/clientset/versioned/typed/pubsub/v1alpha1/fake" + _ "github.com/google/knative-gcp/pkg/client/clientset/versioned/typed/intevents/v1alpha1/fake" _ "github.com/google/knative-gcp/pkg/client/injection/client/fake" _ "github.com/google/knative-gcp/pkg/client/injection/informers/events/v1alpha1/cloudbuildsource/fake" - _ "github.com/google/knative-gcp/pkg/client/injection/informers/pubsub/v1alpha1/pullsubscription/fake" + _ "github.com/google/knative-gcp/pkg/client/injection/informers/intevents/v1alpha1/pullsubscription/fake" _ "github.com/google/knative-gcp/pkg/reconciler/testing" _ "knative.dev/pkg/client/injection/kube/informers/batch/v1/job/fake" _ "knative.dev/pkg/client/injection/kube/informers/core/v1/serviceaccount/fake" diff --git a/pkg/reconciler/events/pubsub/controller.go b/pkg/reconciler/events/pubsub/controller.go index b3e155a0e0..a3760aeb04 100644 --- a/pkg/reconciler/events/pubsub/controller.go +++ b/pkg/reconciler/events/pubsub/controller.go @@ -26,12 +26,12 @@ import ( "github.com/google/knative-gcp/pkg/apis/events/v1alpha1" cloudpubsubsourceinformers "github.com/google/knative-gcp/pkg/client/injection/informers/events/v1alpha1/cloudpubsubsource" - pullsubscriptioninformers "github.com/google/knative-gcp/pkg/client/injection/informers/pubsub/v1alpha1/pullsubscription" + pullsubscriptioninformers "github.com/google/knative-gcp/pkg/client/injection/informers/intevents/v1alpha1/pullsubscription" cloudpubsubsourcereconciler "github.com/google/knative-gcp/pkg/client/injection/reconciler/events/v1alpha1/cloudpubsubsource" "github.com/google/knative-gcp/pkg/reconciler" "github.com/google/knative-gcp/pkg/reconciler/identity" "github.com/google/knative-gcp/pkg/reconciler/identity/iam" - "github.com/google/knative-gcp/pkg/reconciler/pubsub" + "github.com/google/knative-gcp/pkg/reconciler/intevents" ) const ( @@ -68,11 +68,9 @@ func newControllerWithIAMPolicyManager( serviceAccountInformer := serviceaccountinformers.Get(ctx) r := &Reconciler{ - PubSubBase: pubsub.NewPubSubBase(ctx, controllerAgentName, receiveAdapterName, cmw), - Identity: identity.NewIdentity(ctx, ipm), - pubsubLister: cloudpubsubsourceInformer.Lister(), - pullsubscriptionLister: pullsubscriptionInformer.Lister(), - serviceAccountLister: serviceAccountInformer.Lister(), + PubSubBase: intevents.NewPubSubBase(ctx, controllerAgentName, receiveAdapterName, cmw), + Identity: identity.NewIdentity(ctx, ipm), + pubsubLister: cloudpubsubsourceInformer.Lister(), } impl := cloudpubsubsourcereconciler.NewImpl(ctx, r) diff --git a/pkg/reconciler/events/pubsub/controller_test.go b/pkg/reconciler/events/pubsub/controller_test.go index 7a83a3b131..49c3ffa290 100644 --- a/pkg/reconciler/events/pubsub/controller_test.go +++ b/pkg/reconciler/events/pubsub/controller_test.go @@ -25,10 +25,10 @@ import ( . "knative.dev/pkg/reconciler/testing" // Fake injection informers - _ "github.com/google/knative-gcp/pkg/client/clientset/versioned/typed/pubsub/v1alpha1/fake" + _ "github.com/google/knative-gcp/pkg/client/clientset/versioned/typed/intevents/v1alpha1/fake" _ "github.com/google/knative-gcp/pkg/client/injection/client/fake" _ "github.com/google/knative-gcp/pkg/client/injection/informers/events/v1alpha1/cloudpubsubsource/fake" - _ "github.com/google/knative-gcp/pkg/client/injection/informers/pubsub/v1alpha1/pullsubscription/fake" + _ "github.com/google/knative-gcp/pkg/client/injection/informers/intevents/v1alpha1/pullsubscription/fake" _ "github.com/google/knative-gcp/pkg/reconciler/testing" _ "knative.dev/pkg/client/injection/kube/informers/batch/v1/job/fake" _ "knative.dev/pkg/client/injection/kube/informers/core/v1/serviceaccount/fake" diff --git a/pkg/reconciler/events/pubsub/pubsub.go b/pkg/reconciler/events/pubsub/pubsub.go index 11f1d86578..ca6bd2b9f3 100644 --- a/pkg/reconciler/events/pubsub/pubsub.go +++ b/pkg/reconciler/events/pubsub/pubsub.go @@ -21,17 +21,14 @@ import ( "go.uber.org/zap" corev1 "k8s.io/api/core/v1" - corev1listers "k8s.io/client-go/listers/core/v1" - "knative.dev/pkg/logging" pkgreconciler "knative.dev/pkg/reconciler" "github.com/google/knative-gcp/pkg/apis/events/v1alpha1" cloudpubsubsourcereconciler "github.com/google/knative-gcp/pkg/client/injection/reconciler/events/v1alpha1/cloudpubsubsource" listers "github.com/google/knative-gcp/pkg/client/listers/events/v1alpha1" - pubsublisters "github.com/google/knative-gcp/pkg/client/listers/pubsub/v1alpha1" "github.com/google/knative-gcp/pkg/reconciler/identity" - "github.com/google/knative-gcp/pkg/reconciler/pubsub" + "github.com/google/knative-gcp/pkg/reconciler/intevents" ) const ( @@ -44,15 +41,11 @@ const ( // Reconciler is the controller implementation for the CloudPubSubSource source. type Reconciler struct { - *pubsub.PubSubBase + *intevents.PubSubBase // identity reconciler for reconciling workload identity. *identity.Identity // pubsubLister for reading cloudpubsubsources. pubsubLister listers.CloudPubSubSourceLister - // pullsubscriptionLister for reading pullsubscriptions. - pullsubscriptionLister pubsublisters.PullSubscriptionLister - // serviceAccountLister for reading serviceAccounts. - serviceAccountLister corev1listers.ServiceAccountLister } // Check that our Reconciler implements Interface. diff --git a/pkg/reconciler/events/pubsub/pubsub_test.go b/pkg/reconciler/events/pubsub/pubsub_test.go index 975057da87..85f63138df 100644 --- a/pkg/reconciler/events/pubsub/pubsub_test.go +++ b/pkg/reconciler/events/pubsub/pubsub_test.go @@ -39,12 +39,11 @@ import ( duckv1alpha1 "github.com/google/knative-gcp/pkg/apis/duck/v1alpha1" "github.com/google/knative-gcp/pkg/apis/events/v1alpha1" - pubsubv1alpha1 "github.com/google/knative-gcp/pkg/apis/pubsub/v1alpha1" + inteventsv1alpha1 "github.com/google/knative-gcp/pkg/apis/intevents/v1alpha1" "github.com/google/knative-gcp/pkg/client/injection/reconciler/events/v1alpha1/cloudpubsubsource" testingMetadataClient "github.com/google/knative-gcp/pkg/gclient/metadata/testing" "github.com/google/knative-gcp/pkg/reconciler/identity" - "github.com/google/knative-gcp/pkg/reconciler/pubsub" - reconcilerpubsub "github.com/google/knative-gcp/pkg/reconciler/pubsub" + "github.com/google/knative-gcp/pkg/reconciler/intevents" . "github.com/google/knative-gcp/pkg/reconciler/testing" ) @@ -172,24 +171,24 @@ func TestAllCases(t *testing.T) { ), }}, WantCreates: []runtime.Object{ - NewPubSubPullSubscriptionWithNoDefaults(pubsubName, testNS, - WithPubSubPullSubscriptionSpecWithNoDefaults(pubsubv1alpha1.PullSubscriptionSpec{ + NewPullSubscriptionWithNoDefaults(pubsubName, testNS, + WithPullSubscriptionSpecWithNoDefaults(inteventsv1alpha1.PullSubscriptionSpec{ Topic: testTopicID, PubSubSpec: duckv1alpha1.PubSubSpec{ Secret: &secret, }, }), - WithPubSubPullSubscriptionSink(sinkGVK, sinkName), - WithPubSubPullSubscriptionMode(pubsubv1alpha1.ModePushCompatible), - WithPubSubPullSubscriptionLabels(map[string]string{ + WithPullSubscriptionSink(sinkGVK, sinkName), + WithPullSubscriptionMode(inteventsv1alpha1.ModePushCompatible), + WithPullSubscriptionLabels(map[string]string{ "receive-adapter": receiveAdapterName, "events.cloud.google.com/source-name": pubsubName, }), - WithPubSubPullSubscriptionAnnotations(map[string]string{ + WithPullSubscriptionAnnotations(map[string]string{ "metrics-resource-group": resourceGroup, duckv1alpha1.ClusterNameAnnotation: testingMetadataClient.FakeClusterName, }), - WithPubSubPullSubscriptionOwnerReferences([]metav1.OwnerReference{ownerRef()}), + WithPullSubscriptionOwnerReferences([]metav1.OwnerReference{ownerRef()}), ), }, WantPatches: []clientgotesting.PatchActionImpl{ @@ -197,7 +196,7 @@ func TestAllCases(t *testing.T) { }, WantEvents: []string{ Eventf(corev1.EventTypeNormal, "FinalizerUpdate", "Updated %q finalizers", pubsubName), - Eventf(corev1.EventTypeWarning, reconcilerpubsub.PullSubscriptionStatusPropagateFailedReason, "%s: PullSubscription %q has not yet been reconciled", failedToPropagatePullSubscriptionStatusMsg, pubsubName), + Eventf(corev1.EventTypeWarning, intevents.PullSubscriptionStatusPropagateFailedReason, "%s: PullSubscription %q has not yet been reconciled", failedToPropagatePullSubscriptionStatusMsg, pubsubName), }, }, { Name: "pullsubscription exists and the status is false", @@ -207,8 +206,8 @@ func TestAllCases(t *testing.T) { WithCloudPubSubSourceTopic(testTopicID), WithCloudPubSubSourceSink(sinkGVK, sinkName), ), - NewPubSubPullSubscriptionWithNoDefaults(pubsubName, testNS, - WithPubSubPullSubscriptionReadyStatus(corev1.ConditionFalse, "PullSubscriptionFalse", "status false test message")), + NewPullSubscriptionWithNoDefaults(pubsubName, testNS, + WithPullSubscriptionReadyStatus(corev1.ConditionFalse, "PullSubscriptionFalse", "status false test message")), newSink(), }, Key: testNS + "/" + pubsubName, @@ -228,7 +227,7 @@ func TestAllCases(t *testing.T) { }, WantEvents: []string{ Eventf(corev1.EventTypeNormal, "FinalizerUpdate", "Updated %q finalizers", pubsubName), - Eventf(corev1.EventTypeWarning, reconcilerpubsub.PullSubscriptionStatusPropagateFailedReason, "%s: the status of PullSubscription %q is False", failedToPropagatePullSubscriptionStatusMsg, pubsubName), + Eventf(corev1.EventTypeWarning, intevents.PullSubscriptionStatusPropagateFailedReason, "%s: the status of PullSubscription %q is False", failedToPropagatePullSubscriptionStatusMsg, pubsubName), }, }, { Name: "pullsubscription exists and the status is unknown", @@ -238,8 +237,8 @@ func TestAllCases(t *testing.T) { WithCloudPubSubSourceTopic(testTopicID), WithCloudPubSubSourceSink(sinkGVK, sinkName), ), - NewPubSubPullSubscriptionWithNoDefaults(pubsubName, testNS, - WithPubSubPullSubscriptionReadyStatus(corev1.ConditionUnknown, "PullSubscriptionUnknown", "status unknown test message")), + NewPullSubscriptionWithNoDefaults(pubsubName, testNS, + WithPullSubscriptionReadyStatus(corev1.ConditionUnknown, "PullSubscriptionUnknown", "status unknown test message")), newSink(), }, Key: testNS + "/" + pubsubName, @@ -259,7 +258,7 @@ func TestAllCases(t *testing.T) { }, WantEvents: []string{ Eventf(corev1.EventTypeNormal, "FinalizerUpdate", "Updated %q finalizers", pubsubName), - Eventf(corev1.EventTypeWarning, reconcilerpubsub.PullSubscriptionStatusPropagateFailedReason, "%s: the status of PullSubscription %q is Unknown", failedToPropagatePullSubscriptionStatusMsg, pubsubName), + Eventf(corev1.EventTypeWarning, intevents.PullSubscriptionStatusPropagateFailedReason, "%s: the status of PullSubscription %q is Unknown", failedToPropagatePullSubscriptionStatusMsg, pubsubName), }, }, { Name: "pullsubscription exists and ready, with retry", @@ -269,9 +268,9 @@ func TestAllCases(t *testing.T) { WithCloudPubSubSourceTopic(testTopicID), WithCloudPubSubSourceSink(sinkGVK, sinkName), ), - NewPubSubPullSubscriptionWithNoDefaults(pubsubName, testNS, - WithPubSubPullSubscriptionReady(sinkURI), - WithPubSubPullSubscriptionReadyStatus(corev1.ConditionTrue, "PullSubscriptionNoReady", ""), + NewPullSubscriptionWithNoDefaults(pubsubName, testNS, + WithPullSubscriptionReady(sinkURI), + WithPullSubscriptionReadyStatus(corev1.ConditionTrue, "PullSubscriptionNoReady", ""), ), newSink(), }, @@ -353,11 +352,9 @@ func TestAllCases(t *testing.T) { defer logtesting.ClearAll() table.Test(t, MakeFactory(func(ctx context.Context, listers *Listers, cmw configmap.Watcher, _ map[string]interface{}) controller.Reconciler { r := &Reconciler{ - PubSubBase: pubsub.NewPubSubBase(ctx, controllerAgentName, receiveAdapterName, cmw), - Identity: identity.NewIdentity(ctx, NoopIAMPolicyManager), - pubsubLister: listers.GetCloudPubSubSourceLister(), - pullsubscriptionLister: listers.GetPubSubPullSubscriptionLister(), - serviceAccountLister: listers.GetServiceAccountLister(), + PubSubBase: intevents.NewPubSubBase(ctx, controllerAgentName, receiveAdapterName, cmw), + Identity: identity.NewIdentity(ctx, NoopIAMPolicyManager), + pubsubLister: listers.GetCloudPubSubSourceLister(), } return cloudpubsubsource.NewReconciler(ctx, r.Logger, r.RunClientSet, listers.GetCloudPubSubSourceLister(), r.Recorder, r) })) diff --git a/pkg/reconciler/events/scheduler/controller.go b/pkg/reconciler/events/scheduler/controller.go index 784d436319..ae3ac18d44 100644 --- a/pkg/reconciler/events/scheduler/controller.go +++ b/pkg/reconciler/events/scheduler/controller.go @@ -23,15 +23,15 @@ import ( "github.com/google/knative-gcp/pkg/reconciler" "github.com/google/knative-gcp/pkg/reconciler/identity" "github.com/google/knative-gcp/pkg/reconciler/identity/iam" - "github.com/google/knative-gcp/pkg/reconciler/pubsub" + "github.com/google/knative-gcp/pkg/reconciler/intevents" "k8s.io/client-go/tools/cache" serviceaccountinformers "knative.dev/pkg/client/injection/kube/informers/core/v1/serviceaccount" "knative.dev/pkg/configmap" "knative.dev/pkg/controller" cloudschedulersourceinformers "github.com/google/knative-gcp/pkg/client/injection/informers/events/v1alpha1/cloudschedulersource" - pullsubscriptioninformers "github.com/google/knative-gcp/pkg/client/injection/informers/pubsub/v1alpha1/pullsubscription" - topicinformers "github.com/google/knative-gcp/pkg/client/injection/informers/pubsub/v1alpha1/topic" + pullsubscriptioninformers "github.com/google/knative-gcp/pkg/client/injection/informers/intevents/v1alpha1/pullsubscription" + topicinformers "github.com/google/knative-gcp/pkg/client/injection/informers/intevents/v1alpha1/topic" cloudschedulersourcereconciler "github.com/google/knative-gcp/pkg/client/injection/reconciler/events/v1alpha1/cloudschedulersource" gscheduler "github.com/google/knative-gcp/pkg/gclient/scheduler" ) @@ -71,11 +71,10 @@ func newControllerWithIAMPolicyManager( serviceAccountInformer := serviceaccountinformers.Get(ctx) c := &Reconciler{ - PubSubBase: pubsub.NewPubSubBase(ctx, controllerAgentName, receiveAdapterName, cmw), - Identity: identity.NewIdentity(ctx, ipm), - schedulerLister: cloudschedulersourceInformer.Lister(), - createClientFn: gscheduler.NewClient, - serviceAccountLister: serviceAccountInformer.Lister(), + PubSubBase: intevents.NewPubSubBase(ctx, controllerAgentName, receiveAdapterName, cmw), + Identity: identity.NewIdentity(ctx, ipm), + schedulerLister: cloudschedulersourceInformer.Lister(), + createClientFn: gscheduler.NewClient, } impl := cloudschedulersourcereconciler.NewImpl(ctx, c) diff --git a/pkg/reconciler/events/scheduler/controller_test.go b/pkg/reconciler/events/scheduler/controller_test.go index 8aacdf8722..67c0d5473b 100644 --- a/pkg/reconciler/events/scheduler/controller_test.go +++ b/pkg/reconciler/events/scheduler/controller_test.go @@ -25,11 +25,11 @@ import ( . "knative.dev/pkg/reconciler/testing" // Fake injection informers - _ "github.com/google/knative-gcp/pkg/client/clientset/versioned/typed/pubsub/v1alpha1/fake" + _ "github.com/google/knative-gcp/pkg/client/clientset/versioned/typed/intevents/v1alpha1/fake" _ "github.com/google/knative-gcp/pkg/client/injection/client/fake" _ "github.com/google/knative-gcp/pkg/client/injection/informers/events/v1alpha1/cloudschedulersource/fake" - _ "github.com/google/knative-gcp/pkg/client/injection/informers/pubsub/v1alpha1/pullsubscription/fake" - _ "github.com/google/knative-gcp/pkg/client/injection/informers/pubsub/v1alpha1/topic/fake" + _ "github.com/google/knative-gcp/pkg/client/injection/informers/intevents/v1alpha1/pullsubscription/fake" + _ "github.com/google/knative-gcp/pkg/client/injection/informers/intevents/v1alpha1/topic/fake" _ "github.com/google/knative-gcp/pkg/reconciler/testing" _ "knative.dev/pkg/client/injection/kube/informers/batch/v1/job/fake" _ "knative.dev/pkg/client/injection/kube/informers/core/v1/serviceaccount/fake" diff --git a/pkg/reconciler/events/scheduler/scheduler.go b/pkg/reconciler/events/scheduler/scheduler.go index f43e71ef94..3199f03168 100644 --- a/pkg/reconciler/events/scheduler/scheduler.go +++ b/pkg/reconciler/events/scheduler/scheduler.go @@ -21,8 +21,6 @@ import ( "go.uber.org/zap" corev1 "k8s.io/api/core/v1" - corev1listers "k8s.io/client-go/listers/core/v1" - "knative.dev/pkg/logging" "knative.dev/pkg/reconciler" @@ -38,7 +36,7 @@ import ( "github.com/google/knative-gcp/pkg/pubsub/adapter/converters" "github.com/google/knative-gcp/pkg/reconciler/events/scheduler/resources" "github.com/google/knative-gcp/pkg/reconciler/identity" - "github.com/google/knative-gcp/pkg/reconciler/pubsub" + "github.com/google/knative-gcp/pkg/reconciler/intevents" "github.com/google/knative-gcp/pkg/utils" ) @@ -56,16 +54,13 @@ const ( // Reconciler is the controller implementation for Google Cloud Scheduler Jobs. type Reconciler struct { - *pubsub.PubSubBase + *intevents.PubSubBase // identity reconciler for reconciling workload identity. *identity.Identity // schedulerLister for reading schedulers. schedulerLister listers.CloudSchedulerSourceLister createClientFn gscheduler.CreateFn - - // serviceAccountLister for reading serviceAccounts. - serviceAccountLister corev1listers.ServiceAccountLister } // Check that our Reconciler implements Interface. diff --git a/pkg/reconciler/events/scheduler/scheduler_test.go b/pkg/reconciler/events/scheduler/scheduler_test.go index e184bc6b51..76f2f11e8d 100644 --- a/pkg/reconciler/events/scheduler/scheduler_test.go +++ b/pkg/reconciler/events/scheduler/scheduler_test.go @@ -41,12 +41,12 @@ import ( duckv1alpha1 "github.com/google/knative-gcp/pkg/apis/duck/v1alpha1" schedulerv1alpha1 "github.com/google/knative-gcp/pkg/apis/events/v1alpha1" - pubsubv1alpha1 "github.com/google/knative-gcp/pkg/apis/pubsub/v1alpha1" + inteventsv1alpha1 "github.com/google/knative-gcp/pkg/apis/intevents/v1alpha1" "github.com/google/knative-gcp/pkg/client/injection/reconciler/events/v1alpha1/cloudschedulersource" testingMetadataClient "github.com/google/knative-gcp/pkg/gclient/metadata/testing" gscheduler "github.com/google/knative-gcp/pkg/gclient/scheduler/testing" "github.com/google/knative-gcp/pkg/reconciler/identity" - "github.com/google/knative-gcp/pkg/reconciler/pubsub" + "github.com/google/knative-gcp/pkg/reconciler/intevents" . "github.com/google/knative-gcp/pkg/reconciler/testing" ) @@ -186,19 +186,19 @@ func TestAllCases(t *testing.T) { ), }}, WantCreates: []runtime.Object{ - NewPubSubTopic(schedulerName, testNS, - WithPubSubTopicSpec(pubsubv1alpha1.TopicSpec{ + NewTopic(schedulerName, testNS, + WithTopicSpec(inteventsv1alpha1.TopicSpec{ Topic: testTopicID, PropagationPolicy: "CreateDelete", }), - WithPubSubTopicLabels(map[string]string{ + WithTopicLabels(map[string]string{ "receive-adapter": receiveAdapterName, "events.cloud.google.com/source-name": schedulerName, }), - WithPubSubTopicAnnotations(map[string]string{ + WithTopicAnnotations(map[string]string{ duckv1alpha1.ClusterNameAnnotation: testingMetadataClient.FakeClusterName, }), - WithPubSubTopicOwnerReferences([]metav1.OwnerReference{ownerRef()}), + WithTopicOwnerReferences([]metav1.OwnerReference{ownerRef()}), ), }, WantPatches: []clientgotesting.PatchActionImpl{ @@ -217,8 +217,8 @@ func TestAllCases(t *testing.T) { WithCloudSchedulerSourceData(testData), WithCloudSchedulerSourceSchedule(onceAMinuteSchedule), ), - NewPubSubTopic(schedulerName, testNS, - WithPubSubTopicTopicID(testTopicID), + NewTopic(schedulerName, testNS, + WithTopicTopicID(testTopicID), ), newSink(), }, @@ -248,9 +248,9 @@ func TestAllCases(t *testing.T) { WithCloudSchedulerSourceData(testData), WithCloudSchedulerSourceSchedule(onceAMinuteSchedule), ), - NewPubSubTopic(schedulerName, testNS, - WithPubSubTopicReady(testTopicID), - WithPubSubTopicAddress(testTopicURI), + NewTopic(schedulerName, testNS, + WithTopicReady(testTopicID), + WithTopicAddress(testTopicURI), ), newSink(), }, @@ -281,10 +281,10 @@ func TestAllCases(t *testing.T) { WithCloudSchedulerSourceData(testData), WithCloudSchedulerSourceSchedule(onceAMinuteSchedule), ), - NewPubSubTopic(schedulerName, testNS, - WithPubSubTopicReady(""), - WithPubSubTopicProjectID(testProject), - WithPubSubTopicAddress(testTopicURI), + NewTopic(schedulerName, testNS, + WithTopicReady(""), + WithTopicProjectID(testProject), + WithTopicAddress(testTopicURI), ), newSink(), }, @@ -315,10 +315,10 @@ func TestAllCases(t *testing.T) { WithCloudSchedulerSourceData(testData), WithCloudSchedulerSourceSchedule(onceAMinuteSchedule), ), - NewPubSubTopic(schedulerName, testNS, - WithPubSubTopicReady("garbaaaaage"), - WithPubSubTopicProjectID(testProject), - WithPubSubTopicAddress(testTopicURI), + NewTopic(schedulerName, testNS, + WithTopicReady("garbaaaaage"), + WithTopicProjectID(testProject), + WithTopicAddress(testTopicURI), ), newSink(), }, @@ -349,9 +349,9 @@ func TestAllCases(t *testing.T) { WithCloudSchedulerSourceData(testData), WithCloudSchedulerSourceSchedule(onceAMinuteSchedule), ), - NewPubSubTopic(schedulerName, testNS, - WithPubSubTopicFailed(), - WithPubSubTopicProjectID(testProject), + NewTopic(schedulerName, testNS, + WithTopicFailed(), + WithTopicProjectID(testProject), ), newSink(), }, @@ -382,9 +382,9 @@ func TestAllCases(t *testing.T) { WithCloudSchedulerSourceData(testData), WithCloudSchedulerSourceSchedule(onceAMinuteSchedule), ), - NewPubSubTopic(schedulerName, testNS, - WithPubSubTopicUnknown(), - WithPubSubTopicProjectID(testProject), + NewTopic(schedulerName, testNS, + WithTopicUnknown(), + WithTopicProjectID(testProject), ), newSink(), }, @@ -419,10 +419,10 @@ func TestAllCases(t *testing.T) { duckv1alpha1.ClusterNameAnnotation: testingMetadataClient.FakeClusterName, }), ), - NewPubSubTopic(schedulerName, testNS, - WithPubSubTopicReady(testTopicID), - WithPubSubTopicAddress(testTopicURI), - WithPubSubTopicProjectID(testProject), + NewTopic(schedulerName, testNS, + WithTopicReady(testTopicID), + WithTopicAddress(testTopicURI), + WithTopicProjectID(testProject), ), newSink(), }, @@ -442,22 +442,22 @@ func TestAllCases(t *testing.T) { ), }}, WantCreates: []runtime.Object{ - NewPubSubPullSubscriptionWithNoDefaults(schedulerName, testNS, - WithPubSubPullSubscriptionSpecWithNoDefaults(pubsubv1alpha1.PullSubscriptionSpec{ + NewPullSubscriptionWithNoDefaults(schedulerName, testNS, + WithPullSubscriptionSpecWithNoDefaults(inteventsv1alpha1.PullSubscriptionSpec{ Topic: testTopicID, PubSubSpec: duckv1alpha1.PubSubSpec{ Secret: &secret, }, }), - WithPubSubPullSubscriptionSink(sinkGVK, sinkName), - WithPubSubPullSubscriptionLabels(map[string]string{ + WithPullSubscriptionSink(sinkGVK, sinkName), + WithPullSubscriptionLabels(map[string]string{ "receive-adapter": receiveAdapterName, "events.cloud.google.com/source-name": schedulerName}), - WithPubSubPullSubscriptionAnnotations(map[string]string{ + WithPullSubscriptionAnnotations(map[string]string{ "metrics-resource-group": resourceGroup, duckv1alpha1.ClusterNameAnnotation: testingMetadataClient.FakeClusterName, }), - WithPubSubPullSubscriptionOwnerReferences([]metav1.OwnerReference{ownerRef()}), + WithPullSubscriptionOwnerReferences([]metav1.OwnerReference{ownerRef()}), ), }, WantPatches: []clientgotesting.PatchActionImpl{ @@ -476,12 +476,12 @@ func TestAllCases(t *testing.T) { WithCloudSchedulerSourceData(testData), WithCloudSchedulerSourceSchedule(onceAMinuteSchedule), ), - NewPubSubTopic(schedulerName, testNS, - WithPubSubTopicReady(testTopicID), - WithPubSubTopicAddress(testTopicURI), - WithPubSubTopicProjectID(testProject), + NewTopic(schedulerName, testNS, + WithTopicReady(testTopicID), + WithTopicAddress(testTopicURI), + WithTopicProjectID(testProject), ), - NewPubSubPullSubscriptionWithNoDefaults(schedulerName, testNS), + NewPullSubscriptionWithNoDefaults(schedulerName, testNS), newSink(), }, Key: testNS + "/" + schedulerName, @@ -512,12 +512,12 @@ func TestAllCases(t *testing.T) { WithCloudSchedulerSourceData(testData), WithCloudSchedulerSourceSchedule(onceAMinuteSchedule), ), - NewPubSubTopic(schedulerName, testNS, - WithPubSubTopicReady(testTopicID), - WithPubSubTopicAddress(testTopicURI), - WithPubSubTopicProjectID(testProject), + NewTopic(schedulerName, testNS, + WithTopicReady(testTopicID), + WithTopicAddress(testTopicURI), + WithTopicProjectID(testProject), ), - NewPubSubPullSubscriptionWithNoDefaults(schedulerName, testNS, WithPubSubPullSubscriptionFailed()), + NewPullSubscriptionWithNoDefaults(schedulerName, testNS, WithPullSubscriptionFailed()), newSink(), }, Key: testNS + "/" + schedulerName, @@ -548,12 +548,12 @@ func TestAllCases(t *testing.T) { WithCloudSchedulerSourceData(testData), WithCloudSchedulerSourceSchedule(onceAMinuteSchedule), ), - NewPubSubTopic(schedulerName, testNS, - WithPubSubTopicReady(testTopicID), - WithPubSubTopicAddress(testTopicURI), - WithPubSubTopicProjectID(testProject), + NewTopic(schedulerName, testNS, + WithTopicReady(testTopicID), + WithTopicAddress(testTopicURI), + WithTopicProjectID(testProject), ), - NewPubSubPullSubscriptionWithNoDefaults(schedulerName, testNS, WithPubSubPullSubscriptionUnknown()), + NewPullSubscriptionWithNoDefaults(schedulerName, testNS, WithPullSubscriptionUnknown()), newSink(), }, Key: testNS + "/" + schedulerName, @@ -585,13 +585,13 @@ func TestAllCases(t *testing.T) { WithCloudSchedulerSourceData(testData), WithCloudSchedulerSourceSchedule(onceAMinuteSchedule), ), - NewPubSubTopic(schedulerName, testNS, - WithPubSubTopicReady(testTopicID), - WithPubSubTopicAddress(testTopicURI), - WithPubSubTopicProjectID(testProject), + NewTopic(schedulerName, testNS, + WithTopicReady(testTopicID), + WithTopicAddress(testTopicURI), + WithTopicProjectID(testProject), ), - NewPubSubPullSubscriptionWithNoDefaults(schedulerName, testNS, - WithPubSubPullSubscriptionReady(sinkURI), + NewPullSubscriptionWithNoDefaults(schedulerName, testNS, + WithPullSubscriptionReady(sinkURI), ), newSink(), }, @@ -631,13 +631,13 @@ func TestAllCases(t *testing.T) { WithCloudSchedulerSourceData(testData), WithCloudSchedulerSourceSchedule(onceAMinuteSchedule), ), - NewPubSubTopic(schedulerName, testNS, - WithPubSubTopicReady(testTopicID), - WithPubSubTopicAddress(testTopicURI), - WithPubSubTopicProjectID(testProject), + NewTopic(schedulerName, testNS, + WithTopicReady(testTopicID), + WithTopicAddress(testTopicURI), + WithTopicProjectID(testProject), ), - NewPubSubPullSubscriptionWithNoDefaults(schedulerName, testNS, - WithPubSubPullSubscriptionReady(sinkURI), + NewPullSubscriptionWithNoDefaults(schedulerName, testNS, + WithPullSubscriptionReady(sinkURI), ), newSink(), }, @@ -677,13 +677,13 @@ func TestAllCases(t *testing.T) { WithCloudSchedulerSourceData(testData), WithCloudSchedulerSourceSchedule(onceAMinuteSchedule), ), - NewPubSubTopic(schedulerName, testNS, - WithPubSubTopicReady(testTopicID), - WithPubSubTopicAddress(testTopicURI), - WithPubSubTopicProjectID(testProject), + NewTopic(schedulerName, testNS, + WithTopicReady(testTopicID), + WithTopicAddress(testTopicURI), + WithTopicProjectID(testProject), ), - NewPubSubPullSubscriptionWithNoDefaults(schedulerName, testNS, - WithPubSubPullSubscriptionReady(sinkURI), + NewPullSubscriptionWithNoDefaults(schedulerName, testNS, + WithPullSubscriptionReady(sinkURI), ), newSink(), }, @@ -723,13 +723,13 @@ func TestAllCases(t *testing.T) { WithCloudSchedulerSourceData(testData), WithCloudSchedulerSourceSchedule(onceAMinuteSchedule), ), - NewPubSubTopic(schedulerName, testNS, - WithPubSubTopicReady(testTopicID), - WithPubSubTopicAddress(testTopicURI), - WithPubSubTopicProjectID(testProject), + NewTopic(schedulerName, testNS, + WithTopicReady(testTopicID), + WithTopicAddress(testTopicURI), + WithTopicProjectID(testProject), ), - NewPubSubPullSubscriptionWithNoDefaults(schedulerName, testNS, - WithPubSubPullSubscriptionReady(sinkURI), + NewPullSubscriptionWithNoDefaults(schedulerName, testNS, + WithPullSubscriptionReady(sinkURI), ), newSink(), }, @@ -770,13 +770,13 @@ func TestAllCases(t *testing.T) { WithCloudSchedulerSourceData(testData), WithCloudSchedulerSourceSchedule(onceAMinuteSchedule), ), - NewPubSubTopic(schedulerName, testNS, - WithPubSubTopicReady(testTopicID), - WithPubSubTopicAddress(testTopicURI), - WithPubSubTopicProjectID(testProject), + NewTopic(schedulerName, testNS, + WithTopicReady(testTopicID), + WithTopicAddress(testTopicURI), + WithTopicProjectID(testProject), ), - NewPubSubPullSubscriptionWithNoDefaults(schedulerName, testNS, - WithPubSubPullSubscriptionReady(sinkURI), + NewPullSubscriptionWithNoDefaults(schedulerName, testNS, + WithPullSubscriptionReady(sinkURI), ), newSink(), }, @@ -816,13 +816,13 @@ func TestAllCases(t *testing.T) { WithCloudSchedulerSourceData(testData), WithCloudSchedulerSourceSchedule(onceAMinuteSchedule), ), - NewPubSubTopic(schedulerName, testNS, - WithPubSubTopicReady(testTopicID), - WithPubSubTopicAddress(testTopicURI), - WithPubSubTopicProjectID(testProject), + NewTopic(schedulerName, testNS, + WithTopicReady(testTopicID), + WithTopicAddress(testTopicURI), + WithTopicProjectID(testProject), ), - NewPubSubPullSubscriptionWithNoDefaults(schedulerName, testNS, - WithPubSubPullSubscriptionReady(sinkURI), + NewPullSubscriptionWithNoDefaults(schedulerName, testNS, + WithPullSubscriptionReady(sinkURI), ), newSink(), }, @@ -863,13 +863,13 @@ func TestAllCases(t *testing.T) { WithCloudSchedulerSourceSinkURI(schedulerSinkURL), WithCloudSchedulerSourceDeletionTimestamp, ), - NewPubSubTopic(schedulerName, testNS, - WithPubSubTopicReady(testTopicID), - WithPubSubTopicAddress(testTopicURI), - WithPubSubTopicProjectID(testProject), + NewTopic(schedulerName, testNS, + WithTopicReady(testTopicID), + WithTopicAddress(testTopicURI), + WithTopicProjectID(testProject), ), - NewPubSubPullSubscriptionWithNoDefaults(schedulerName, testNS, - WithPubSubPullSubscriptionReady(sinkURI), + NewPullSubscriptionWithNoDefaults(schedulerName, testNS, + WithPullSubscriptionReady(sinkURI), ), newSink(), }, @@ -899,13 +899,13 @@ func TestAllCases(t *testing.T) { WithCloudSchedulerSourceSinkURI(schedulerSinkURL), WithCloudSchedulerSourceDeletionTimestamp, ), - NewPubSubTopic(schedulerName, testNS, - WithPubSubTopicReady(testTopicID), - WithPubSubTopicAddress(testTopicURI), - WithPubSubTopicProjectID(testProject), + NewTopic(schedulerName, testNS, + WithTopicReady(testTopicID), + WithTopicAddress(testTopicURI), + WithTopicProjectID(testProject), ), - NewPubSubPullSubscriptionWithNoDefaults(schedulerName, testNS, - WithPubSubPullSubscriptionReady(sinkURI), + NewPullSubscriptionWithNoDefaults(schedulerName, testNS, + WithPullSubscriptionReady(sinkURI), ), newSink(), }, @@ -935,13 +935,13 @@ func TestAllCases(t *testing.T) { WithCloudSchedulerSourceSinkURI(schedulerSinkURL), WithCloudSchedulerSourceDeletionTimestamp, ), - NewPubSubTopic(schedulerName, testNS, - WithPubSubTopicReady(testTopicID), - WithPubSubTopicAddress(testTopicURI), - WithPubSubTopicProjectID(testProject), + NewTopic(schedulerName, testNS, + WithTopicReady(testTopicID), + WithTopicAddress(testTopicURI), + WithTopicProjectID(testProject), ), - NewPubSubPullSubscriptionWithNoDefaults(schedulerName, testNS, - WithPubSubPullSubscriptionReady(sinkURI), + NewPullSubscriptionWithNoDefaults(schedulerName, testNS, + WithPullSubscriptionReady(sinkURI), ), newSink(), }, @@ -962,11 +962,11 @@ func TestAllCases(t *testing.T) { }}, WantDeletes: []clientgotesting.DeleteActionImpl{ {ActionImpl: clientgotesting.ActionImpl{ - Namespace: testNS, Verb: "delete", Resource: schema.GroupVersionResource{Group: "pubsub.cloud.google.com", Version: "v1alpha1", Resource: "topics"}}, + Namespace: testNS, Verb: "delete", Resource: schema.GroupVersionResource{Group: "internal.events.cloud.google.com", Version: "v1alpha1", Resource: "topics"}}, Name: schedulerName, }, {ActionImpl: clientgotesting.ActionImpl{ - Namespace: testNS, Verb: "delete", Resource: schema.GroupVersionResource{Group: "pubsub.cloud.google.com", Version: "v1alpha1", Resource: "pullsubscriptions"}}, + Namespace: testNS, Verb: "delete", Resource: schema.GroupVersionResource{Group: "internal.events.cloud.google.com", Version: "v1alpha1", Resource: "pullsubscriptions"}}, Name: schedulerName, }, }, @@ -1022,11 +1022,10 @@ func TestAllCases(t *testing.T) { defer logtesting.ClearAll() table.Test(t, MakeFactory(func(ctx context.Context, listers *Listers, cmw configmap.Watcher, testData map[string]interface{}) controller.Reconciler { r := &Reconciler{ - PubSubBase: pubsub.NewPubSubBase(ctx, controllerAgentName, receiveAdapterName, cmw), - Identity: identity.NewIdentity(ctx, NoopIAMPolicyManager), - schedulerLister: listers.GetCloudSchedulerSourceLister(), - createClientFn: gscheduler.TestClientCreator(testData["scheduler"]), - serviceAccountLister: listers.GetServiceAccountLister(), + PubSubBase: intevents.NewPubSubBase(ctx, controllerAgentName, receiveAdapterName, cmw), + Identity: identity.NewIdentity(ctx, NoopIAMPolicyManager), + schedulerLister: listers.GetCloudSchedulerSourceLister(), + createClientFn: gscheduler.TestClientCreator(testData["scheduler"]), } return cloudschedulersource.NewReconciler(ctx, r.Logger, r.RunClientSet, listers.GetCloudSchedulerSourceLister(), r.Recorder, r) })) diff --git a/pkg/reconciler/events/storage/controller.go b/pkg/reconciler/events/storage/controller.go index 1cc75ece8b..e56f1db808 100644 --- a/pkg/reconciler/events/storage/controller.go +++ b/pkg/reconciler/events/storage/controller.go @@ -26,14 +26,14 @@ import ( "github.com/google/knative-gcp/pkg/apis/events/v1alpha1" cloudstoragesourceinformers "github.com/google/knative-gcp/pkg/client/injection/informers/events/v1alpha1/cloudstoragesource" - pullsubscriptioninformers "github.com/google/knative-gcp/pkg/client/injection/informers/pubsub/v1alpha1/pullsubscription" - topicinformers "github.com/google/knative-gcp/pkg/client/injection/informers/pubsub/v1alpha1/topic" + pullsubscriptioninformers "github.com/google/knative-gcp/pkg/client/injection/informers/intevents/v1alpha1/pullsubscription" + topicinformers "github.com/google/knative-gcp/pkg/client/injection/informers/intevents/v1alpha1/topic" cloudstoragesourcereconciler "github.com/google/knative-gcp/pkg/client/injection/reconciler/events/v1alpha1/cloudstoragesource" gstorage "github.com/google/knative-gcp/pkg/gclient/storage" "github.com/google/knative-gcp/pkg/reconciler" "github.com/google/knative-gcp/pkg/reconciler/identity" "github.com/google/knative-gcp/pkg/reconciler/identity/iam" - "github.com/google/knative-gcp/pkg/reconciler/pubsub" + "github.com/google/knative-gcp/pkg/reconciler/intevents" ) const ( @@ -71,11 +71,10 @@ func newControllerWithIAMPolicyManager( serviceAccountInformer := serviceaccountinformers.Get(ctx) r := &Reconciler{ - PubSubBase: pubsub.NewPubSubBase(ctx, controllerAgentName, receiveAdapterName, cmw), - Identity: identity.NewIdentity(ctx, ipm), - storageLister: cloudstoragesourceInformer.Lister(), - createClientFn: gstorage.NewClient, - serviceAccountLister: serviceAccountInformer.Lister(), + PubSubBase: intevents.NewPubSubBase(ctx, controllerAgentName, receiveAdapterName, cmw), + Identity: identity.NewIdentity(ctx, ipm), + storageLister: cloudstoragesourceInformer.Lister(), + createClientFn: gstorage.NewClient, } impl := cloudstoragesourcereconciler.NewImpl(ctx, r) diff --git a/pkg/reconciler/events/storage/controller_test.go b/pkg/reconciler/events/storage/controller_test.go index 826d6f39b4..d2825cc191 100644 --- a/pkg/reconciler/events/storage/controller_test.go +++ b/pkg/reconciler/events/storage/controller_test.go @@ -25,11 +25,11 @@ import ( . "knative.dev/pkg/reconciler/testing" // Fake injection informers - _ "github.com/google/knative-gcp/pkg/client/clientset/versioned/typed/pubsub/v1alpha1/fake" + _ "github.com/google/knative-gcp/pkg/client/clientset/versioned/typed/intevents/v1alpha1/fake" _ "github.com/google/knative-gcp/pkg/client/injection/client/fake" _ "github.com/google/knative-gcp/pkg/client/injection/informers/events/v1alpha1/cloudstoragesource/fake" - _ "github.com/google/knative-gcp/pkg/client/injection/informers/pubsub/v1alpha1/pullsubscription/fake" - _ "github.com/google/knative-gcp/pkg/client/injection/informers/pubsub/v1alpha1/topic/fake" + _ "github.com/google/knative-gcp/pkg/client/injection/informers/intevents/v1alpha1/pullsubscription/fake" + _ "github.com/google/knative-gcp/pkg/client/injection/informers/intevents/v1alpha1/topic/fake" _ "github.com/google/knative-gcp/pkg/reconciler/testing" _ "knative.dev/pkg/client/injection/kube/informers/batch/v1/job/fake" _ "knative.dev/pkg/client/injection/kube/informers/core/v1/serviceaccount/fake" diff --git a/pkg/reconciler/events/storage/storage.go b/pkg/reconciler/events/storage/storage.go index f59d760e4a..338385be6d 100644 --- a/pkg/reconciler/events/storage/storage.go +++ b/pkg/reconciler/events/storage/storage.go @@ -24,7 +24,6 @@ import ( "google.golang.org/grpc/codes" gstatus "google.golang.org/grpc/status" corev1 "k8s.io/api/core/v1" - corev1listers "k8s.io/client-go/listers/core/v1" "knative.dev/pkg/logging" "knative.dev/pkg/reconciler" @@ -38,7 +37,7 @@ import ( "github.com/google/knative-gcp/pkg/pubsub/adapter/converters" "github.com/google/knative-gcp/pkg/reconciler/events/storage/resources" "github.com/google/knative-gcp/pkg/reconciler/identity" - "github.com/google/knative-gcp/pkg/reconciler/pubsub" + "github.com/google/knative-gcp/pkg/reconciler/intevents" "github.com/google/knative-gcp/pkg/utils" ) @@ -67,7 +66,7 @@ var ( // Reconciler is the controller implementation for Google Cloud Storage (GCS) event // notifications. type Reconciler struct { - *pubsub.PubSubBase + *intevents.PubSubBase // identity reconciler for reconciling workload identity. *identity.Identity // storageLister for reading storages. @@ -76,9 +75,6 @@ type Reconciler struct { // createClientFn is the function used to create the Storage client that interacts with GCS. // This is needed so that we can inject a mock client for UTs purposes. createClientFn gstorage.CreateFn - - // serviceAccountLister for reading serviceAccounts. - serviceAccountLister corev1listers.ServiceAccountLister } // Check that our Reconciler implements Interface. diff --git a/pkg/reconciler/events/storage/storage_test.go b/pkg/reconciler/events/storage/storage_test.go index 0cc63b4aeb..1d278dbfec 100644 --- a/pkg/reconciler/events/storage/storage_test.go +++ b/pkg/reconciler/events/storage/storage_test.go @@ -42,12 +42,12 @@ import ( duckv1alpha1 "github.com/google/knative-gcp/pkg/apis/duck/v1alpha1" storagev1alpha1 "github.com/google/knative-gcp/pkg/apis/events/v1alpha1" - pubsubv1alpha1 "github.com/google/knative-gcp/pkg/apis/pubsub/v1alpha1" + inteventsv1alpha1 "github.com/google/knative-gcp/pkg/apis/intevents/v1alpha1" "github.com/google/knative-gcp/pkg/client/injection/reconciler/events/v1alpha1/cloudstoragesource" testingMetadataClient "github.com/google/knative-gcp/pkg/gclient/metadata/testing" gstorage "github.com/google/knative-gcp/pkg/gclient/storage/testing" "github.com/google/knative-gcp/pkg/reconciler/identity" - "github.com/google/knative-gcp/pkg/reconciler/pubsub" + "github.com/google/knative-gcp/pkg/reconciler/intevents" . "github.com/google/knative-gcp/pkg/reconciler/testing" ) @@ -184,19 +184,19 @@ func TestAllCases(t *testing.T) { ), }}, WantCreates: []runtime.Object{ - NewPubSubTopic(storageName, testNS, - WithPubSubTopicSpec(pubsubv1alpha1.TopicSpec{ + NewTopic(storageName, testNS, + WithTopicSpec(inteventsv1alpha1.TopicSpec{ Topic: testTopicID, PropagationPolicy: "CreateDelete", }), - WithPubSubTopicLabels(map[string]string{ + WithTopicLabels(map[string]string{ "receive-adapter": receiveAdapterName, "events.cloud.google.com/source-name": storageName, }), - WithPubSubTopicAnnotations(map[string]string{ + WithTopicAnnotations(map[string]string{ duckv1alpha1.ClusterNameAnnotation: testingMetadataClient.FakeClusterName, }), - WithPubSubTopicOwnerReferences([]metav1.OwnerReference{ownerRef()}), + WithTopicOwnerReferences([]metav1.OwnerReference{ownerRef()}), ), }, WantPatches: []clientgotesting.PatchActionImpl{ @@ -214,8 +214,8 @@ func TestAllCases(t *testing.T) { WithCloudStorageSourceBucket(bucket), WithCloudStorageSourceSink(sinkGVK, sinkName), ), - NewPubSubTopic(storageName, testNS, - WithPubSubTopicTopicID(testTopicID), + NewTopic(storageName, testNS, + WithTopicTopicID(testTopicID), ), newSink(), }, @@ -244,9 +244,9 @@ func TestAllCases(t *testing.T) { WithCloudStorageSourceBucket(bucket), WithCloudStorageSourceSink(sinkGVK, sinkName), ), - NewPubSubTopic(storageName, testNS, - WithPubSubTopicReady(testTopicID), - WithPubSubTopicAddress(testTopicURI), + NewTopic(storageName, testNS, + WithTopicReady(testTopicID), + WithTopicAddress(testTopicURI), ), newSink(), }, @@ -276,10 +276,10 @@ func TestAllCases(t *testing.T) { WithCloudStorageSourceBucket(bucket), WithCloudStorageSourceSink(sinkGVK, sinkName), ), - NewPubSubTopic(storageName, testNS, - WithPubSubTopicReady(""), - WithPubSubTopicProjectID(testProject), - WithPubSubTopicAddress(testTopicURI), + NewTopic(storageName, testNS, + WithTopicReady(""), + WithTopicProjectID(testProject), + WithTopicAddress(testTopicURI), ), newSink(), }, @@ -309,10 +309,10 @@ func TestAllCases(t *testing.T) { WithCloudStorageSourceBucket(bucket), WithCloudStorageSourceSink(sinkGVK, sinkName), ), - NewPubSubTopic(storageName, testNS, - WithPubSubTopicReady("garbaaaaage"), - WithPubSubTopicProjectID(testProject), - WithPubSubTopicAddress(testTopicURI), + NewTopic(storageName, testNS, + WithTopicReady("garbaaaaage"), + WithTopicProjectID(testProject), + WithTopicAddress(testTopicURI), ), newSink(), }, @@ -342,9 +342,9 @@ func TestAllCases(t *testing.T) { WithCloudStorageSourceBucket(bucket), WithCloudStorageSourceSink(sinkGVK, sinkName), ), - NewPubSubTopic(storageName, testNS, - WithPubSubTopicFailed(), - WithPubSubTopicProjectID(testProject), + NewTopic(storageName, testNS, + WithTopicFailed(), + WithTopicProjectID(testProject), ), newSink(), }, @@ -374,9 +374,9 @@ func TestAllCases(t *testing.T) { WithCloudStorageSourceBucket(bucket), WithCloudStorageSourceSink(sinkGVK, sinkName), ), - NewPubSubTopic(storageName, testNS, - WithPubSubTopicUnknown(), - WithPubSubTopicProjectID(testProject), + NewTopic(storageName, testNS, + WithTopicUnknown(), + WithTopicProjectID(testProject), ), newSink(), }, @@ -409,10 +409,10 @@ func TestAllCases(t *testing.T) { duckv1alpha1.ClusterNameAnnotation: testingMetadataClient.FakeClusterName, }), ), - NewPubSubTopic(storageName, testNS, - WithPubSubTopicReady(testTopicID), - WithPubSubTopicAddress(testTopicURI), - WithPubSubTopicProjectID(testProject), + NewTopic(storageName, testNS, + WithTopicReady(testTopicID), + WithTopicAddress(testTopicURI), + WithTopicProjectID(testProject), ), newSink(), }, @@ -433,23 +433,23 @@ func TestAllCases(t *testing.T) { ), }}, WantCreates: []runtime.Object{ - NewPubSubPullSubscriptionWithNoDefaults(storageName, testNS, - WithPubSubPullSubscriptionSpecWithNoDefaults(pubsubv1alpha1.PullSubscriptionSpec{ + NewPullSubscriptionWithNoDefaults(storageName, testNS, + WithPullSubscriptionSpecWithNoDefaults(inteventsv1alpha1.PullSubscriptionSpec{ Topic: testTopicID, PubSubSpec: duckv1alpha1.PubSubSpec{ Secret: &secret, }, }), - WithPubSubPullSubscriptionSink(sinkGVK, sinkName), - WithPubSubPullSubscriptionLabels(map[string]string{ + WithPullSubscriptionSink(sinkGVK, sinkName), + WithPullSubscriptionLabels(map[string]string{ "receive-adapter": receiveAdapterName, "events.cloud.google.com/source-name": storageName, }), - WithPubSubPullSubscriptionAnnotations(map[string]string{ + WithPullSubscriptionAnnotations(map[string]string{ "metrics-resource-group": resourceGroup, duckv1alpha1.ClusterNameAnnotation: testingMetadataClient.FakeClusterName, }), - WithPubSubPullSubscriptionOwnerReferences([]metav1.OwnerReference{ownerRef()}), + WithPullSubscriptionOwnerReferences([]metav1.OwnerReference{ownerRef()}), ), }, WantPatches: []clientgotesting.PatchActionImpl{ @@ -468,12 +468,12 @@ func TestAllCases(t *testing.T) { WithCloudStorageSourceBucket(bucket), WithCloudStorageSourceSink(sinkGVK, sinkName), ), - NewPubSubTopic(storageName, testNS, - WithPubSubTopicReady(testTopicID), - WithPubSubTopicAddress(testTopicURI), - WithPubSubTopicProjectID(testProject), + NewTopic(storageName, testNS, + WithTopicReady(testTopicID), + WithTopicAddress(testTopicURI), + WithTopicProjectID(testProject), ), - NewPubSubPullSubscriptionWithNoDefaults(storageName, testNS), + NewPullSubscriptionWithNoDefaults(storageName, testNS), newSink(), }, Key: testNS + "/" + storageName, @@ -504,12 +504,12 @@ func TestAllCases(t *testing.T) { WithCloudStorageSourceBucket(bucket), WithCloudStorageSourceSink(sinkGVK, sinkName), ), - NewPubSubTopic(storageName, testNS, - WithPubSubTopicReady(testTopicID), - WithPubSubTopicAddress(testTopicURI), - WithPubSubTopicProjectID(testProject), + NewTopic(storageName, testNS, + WithTopicReady(testTopicID), + WithTopicAddress(testTopicURI), + WithTopicProjectID(testProject), ), - NewPubSubPullSubscriptionWithNoDefaults(storageName, testNS, WithPubSubPullSubscriptionFailed()), + NewPullSubscriptionWithNoDefaults(storageName, testNS, WithPullSubscriptionFailed()), }, Key: testNS + "/" + storageName, WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ @@ -539,12 +539,12 @@ func TestAllCases(t *testing.T) { WithCloudStorageSourceBucket(bucket), WithCloudStorageSourceSink(sinkGVK, sinkName), ), - NewPubSubTopic(storageName, testNS, - WithPubSubTopicReady(testTopicID), - WithPubSubTopicAddress(testTopicURI), - WithPubSubTopicProjectID(testProject), + NewTopic(storageName, testNS, + WithTopicReady(testTopicID), + WithTopicAddress(testTopicURI), + WithTopicProjectID(testProject), ), - NewPubSubPullSubscriptionWithNoDefaults(storageName, testNS, WithPubSubPullSubscriptionUnknown()), + NewPullSubscriptionWithNoDefaults(storageName, testNS, WithPullSubscriptionUnknown()), }, Key: testNS + "/" + storageName, WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ @@ -577,13 +577,13 @@ func TestAllCases(t *testing.T) { WithCloudStorageSourceSink(sinkGVK, sinkName), WithCloudStorageSourceEventTypes([]string{storagev1alpha1.CloudStorageSourceFinalize}), ), - NewPubSubTopic(storageName, testNS, - WithPubSubTopicReady(testTopicID), - WithPubSubTopicAddress(testTopicURI), - WithPubSubTopicProjectID(testProject), + NewTopic(storageName, testNS, + WithTopicReady(testTopicID), + WithTopicAddress(testTopicURI), + WithTopicProjectID(testProject), ), - NewPubSubPullSubscriptionWithNoDefaults(storageName, testNS, - WithPubSubPullSubscriptionReady(sinkURI), + NewPullSubscriptionWithNoDefaults(storageName, testNS, + WithPullSubscriptionReady(sinkURI), ), newSink(), }, @@ -627,13 +627,13 @@ func TestAllCases(t *testing.T) { WithCloudStorageSourceSink(sinkGVK, sinkName), WithCloudStorageSourceEventTypes([]string{storagev1alpha1.CloudStorageSourceFinalize}), ), - NewPubSubTopic(storageName, testNS, - WithPubSubTopicReady(testTopicID), - WithPubSubTopicAddress(testTopicURI), - WithPubSubTopicProjectID(testProject), + NewTopic(storageName, testNS, + WithTopicReady(testTopicID), + WithTopicAddress(testTopicURI), + WithTopicProjectID(testProject), ), - NewPubSubPullSubscriptionWithNoDefaults(storageName, testNS, - WithPubSubPullSubscriptionReady(sinkURI), + NewPullSubscriptionWithNoDefaults(storageName, testNS, + WithPullSubscriptionReady(sinkURI), ), newSink(), }, @@ -679,13 +679,13 @@ func TestAllCases(t *testing.T) { WithCloudStorageSourceSink(sinkGVK, sinkName), WithCloudStorageSourceEventTypes([]string{storagev1alpha1.CloudStorageSourceFinalize}), ), - NewPubSubTopic(storageName, testNS, - WithPubSubTopicReady(testTopicID), - WithPubSubTopicAddress(testTopicURI), - WithPubSubTopicProjectID(testProject), + NewTopic(storageName, testNS, + WithTopicReady(testTopicID), + WithTopicAddress(testTopicURI), + WithTopicProjectID(testProject), ), - NewPubSubPullSubscriptionWithNoDefaults(storageName, testNS, - WithPubSubPullSubscriptionReady(sinkURI), + NewPullSubscriptionWithNoDefaults(storageName, testNS, + WithPullSubscriptionReady(sinkURI), ), newSink(), }, @@ -731,13 +731,13 @@ func TestAllCases(t *testing.T) { WithCloudStorageSourceSink(sinkGVK, sinkName), WithCloudStorageSourceEventTypes([]string{storagev1alpha1.CloudStorageSourceFinalize}), ), - NewPubSubTopic(storageName, testNS, - WithPubSubTopicReady(testTopicID), - WithPubSubTopicAddress(testTopicURI), - WithPubSubTopicProjectID(testProject), + NewTopic(storageName, testNS, + WithTopicReady(testTopicID), + WithTopicAddress(testTopicURI), + WithTopicProjectID(testProject), ), - NewPubSubPullSubscriptionWithNoDefaults(storageName, testNS, - WithPubSubPullSubscriptionReady(sinkURI), + NewPullSubscriptionWithNoDefaults(storageName, testNS, + WithPullSubscriptionReady(sinkURI), ), newSink(), }, @@ -786,13 +786,13 @@ func TestAllCases(t *testing.T) { WithCloudStorageSourceTopicReady(testTopicID), WithDeletionTimestamp(), ), - NewPubSubTopic(storageName, testNS, - WithPubSubTopicReady(testTopicID), - WithPubSubTopicAddress(testTopicURI), - WithPubSubTopicProjectID(testProject), + NewTopic(storageName, testNS, + WithTopicReady(testTopicID), + WithTopicAddress(testTopicURI), + WithTopicProjectID(testProject), ), - NewPubSubPullSubscriptionWithNoDefaults(storageName, testNS, - WithPubSubPullSubscriptionReady(sinkURI), + NewPullSubscriptionWithNoDefaults(storageName, testNS, + WithPullSubscriptionReady(sinkURI), ), newSink(), }, @@ -827,13 +827,13 @@ func TestAllCases(t *testing.T) { WithCloudStorageSourceTopicReady(testTopicID), WithDeletionTimestamp(), ), - NewPubSubTopic(storageName, testNS, - WithPubSubTopicReady(testTopicID), - WithPubSubTopicAddress(testTopicURI), - WithPubSubTopicProjectID(testProject), + NewTopic(storageName, testNS, + WithTopicReady(testTopicID), + WithTopicAddress(testTopicURI), + WithTopicProjectID(testProject), ), - NewPubSubPullSubscriptionWithNoDefaults(storageName, testNS, - WithPubSubPullSubscriptionReady(sinkURI), + NewPullSubscriptionWithNoDefaults(storageName, testNS, + WithPullSubscriptionReady(sinkURI), ), newSink(), }, @@ -907,13 +907,13 @@ func TestAllCases(t *testing.T) { WithCloudStorageSourceTopicReady(testTopicID), WithDeletionTimestamp(), ), - NewPubSubTopic(storageName, testNS, - WithPubSubTopicReady(testTopicID), - WithPubSubTopicAddress(testTopicURI), - WithPubSubTopicProjectID(testProject), + NewTopic(storageName, testNS, + WithTopicReady(testTopicID), + WithTopicAddress(testTopicURI), + WithTopicProjectID(testProject), ), - NewPubSubPullSubscriptionWithNoDefaults(storageName, testNS, - WithPubSubPullSubscriptionReady(sinkURI), + NewPullSubscriptionWithNoDefaults(storageName, testNS, + WithPullSubscriptionReady(sinkURI), ), newSink(), }, @@ -931,11 +931,11 @@ func TestAllCases(t *testing.T) { }, WantDeletes: []clientgotesting.DeleteActionImpl{ {ActionImpl: clientgotesting.ActionImpl{ - Namespace: testNS, Verb: "delete", Resource: schema.GroupVersionResource{Group: "pubsub.cloud.google.com", Version: "v1alpha1", Resource: "topics"}}, + Namespace: testNS, Verb: "delete", Resource: schema.GroupVersionResource{Group: "internal.events.cloud.google.com", Version: "v1alpha1", Resource: "topics"}}, Name: storageName, }, {ActionImpl: clientgotesting.ActionImpl{ - Namespace: testNS, Verb: "delete", Resource: schema.GroupVersionResource{Group: "pubsub.cloud.google.com", Version: "v1alpha1", Resource: "pullsubscriptions"}}, + Namespace: testNS, Verb: "delete", Resource: schema.GroupVersionResource{Group: "internal.events.cloud.google.com", Version: "v1alpha1", Resource: "pullsubscriptions"}}, Name: storageName, }, }, @@ -956,11 +956,10 @@ func TestAllCases(t *testing.T) { defer logtesting.ClearAll() table.Test(t, MakeFactory(func(ctx context.Context, listers *Listers, cmw configmap.Watcher, testData map[string]interface{}) controller.Reconciler { r := &Reconciler{ - PubSubBase: pubsub.NewPubSubBase(ctx, controllerAgentName, receiveAdapterName, cmw), - Identity: identity.NewIdentity(ctx, NoopIAMPolicyManager), - storageLister: listers.GetCloudStorageSourceLister(), - createClientFn: gstorage.TestClientCreator(testData["storage"]), - serviceAccountLister: listers.GetServiceAccountLister(), + PubSubBase: intevents.NewPubSubBase(ctx, controllerAgentName, receiveAdapterName, cmw), + Identity: identity.NewIdentity(ctx, NoopIAMPolicyManager), + storageLister: listers.GetCloudStorageSourceLister(), + createClientFn: gstorage.TestClientCreator(testData["storage"]), } return cloudstoragesource.NewReconciler(ctx, r.Logger, r.RunClientSet, listers.GetCloudStorageSourceLister(), r.Recorder, r) })) diff --git a/pkg/reconciler/messaging/channel/channel.go b/pkg/reconciler/messaging/channel/channel.go index bd743c4440..3f42e14b9b 100644 --- a/pkg/reconciler/messaging/channel/channel.go +++ b/pkg/reconciler/messaging/channel/channel.go @@ -27,19 +27,17 @@ import ( apierrs "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" - corev1listers "k8s.io/client-go/listers/core/v1" - eventingduck "knative.dev/eventing/pkg/apis/duck/v1alpha1" eventingduckv1beta1 "knative.dev/eventing/pkg/apis/duck/v1beta1" "knative.dev/pkg/logging" pkgreconciler "knative.dev/pkg/reconciler" duckv1alpha1 "github.com/google/knative-gcp/pkg/apis/duck/v1alpha1" + inteventsv1alpha1 "github.com/google/knative-gcp/pkg/apis/intevents/v1alpha1" "github.com/google/knative-gcp/pkg/apis/messaging/v1alpha1" - pubsubv1alpha1 "github.com/google/knative-gcp/pkg/apis/pubsub/v1alpha1" channelreconciler "github.com/google/knative-gcp/pkg/client/injection/reconciler/messaging/v1alpha1/channel" + inteventslisters "github.com/google/knative-gcp/pkg/client/listers/intevents/v1alpha1" listers "github.com/google/knative-gcp/pkg/client/listers/messaging/v1alpha1" - pubsublisters "github.com/google/knative-gcp/pkg/client/listers/pubsub/v1alpha1" "github.com/google/knative-gcp/pkg/reconciler" "github.com/google/knative-gcp/pkg/reconciler/identity" "github.com/google/knative-gcp/pkg/reconciler/messaging/channel/resources" @@ -62,11 +60,8 @@ type Reconciler struct { // identity reconciler for reconciling workload identity. *identity.Identity // listers index properties about resources - channelLister listers.ChannelLister - topicLister pubsublisters.TopicLister - pullSubscriptionLister pubsublisters.PullSubscriptionLister - // serviceAccountLister for reading serviceAccounts. - serviceAccountLister corev1listers.ServiceAccountLister + channelLister listers.ChannelLister + topicLister inteventslisters.TopicLister } // Check that our Reconciler implements Interface. @@ -123,7 +118,7 @@ func (r *Reconciler) syncSubscribers(ctx context.Context, channel *v1alpha1.Chan subDeletes := []eventingduckv1beta1.SubscriberStatus(nil) // Make a map of name to PullSubscription for lookup. - pullsubs := make(map[string]pubsubv1alpha1.PullSubscription) + pullsubs := make(map[string]inteventsv1alpha1.PullSubscription) if subs, err := r.getPullSubscriptions(ctx, channel); err != nil { logging.FromContext(ctx).Desugar().Error("Failed to list PullSubscriptions", zap.Error(err)) } else { @@ -174,7 +169,7 @@ func (r *Reconciler) syncSubscribers(ctx context.Context, channel *v1alpha1.Chan Annotations: resources.GetPullSubscriptionAnnotations(channel.Name, clusterName), Subscriber: s, }) - ps, err := r.RunClientSet.PubsubV1alpha1().PullSubscriptions(channel.Namespace).Create(ps) + ps, err := r.RunClientSet.InternalV1alpha1().PullSubscriptions(channel.Namespace).Create(ps) if apierrs.IsAlreadyExists(err) { // If the pullsub already exists and is owned by the current channel, mark it for update. if _, found := pullsubs[genName]; found { @@ -213,7 +208,7 @@ func (r *Reconciler) syncSubscribers(ctx context.Context, channel *v1alpha1.Chan existingPs, found := pullsubs[genName] if !found { // PullSubscription does not exist, that's ok, create it now. - ps, err := r.RunClientSet.PubsubV1alpha1().PullSubscriptions(channel.Namespace).Create(ps) + ps, err := r.RunClientSet.InternalV1alpha1().PullSubscriptions(channel.Namespace).Create(ps) if apierrs.IsAlreadyExists(err) { // If the pullsub is not owned by the current channel, this is an error. r.Recorder.Eventf(channel, corev1.EventTypeWarning, "SubscriberNotOwned", "Subscriber %q is not owned by this channel", genName) @@ -227,7 +222,7 @@ func (r *Reconciler) syncSubscribers(ctx context.Context, channel *v1alpha1.Chan // Don't modify the informers copy. desired := existingPs.DeepCopy() desired.Spec = ps.Spec - ps, err := r.RunClientSet.PubsubV1alpha1().PullSubscriptions(channel.Namespace).Update(desired) + ps, err := r.RunClientSet.InternalV1alpha1().PullSubscriptions(channel.Namespace).Update(desired) if err != nil { r.Recorder.Eventf(channel, corev1.EventTypeWarning, "SubscriberUpdateFailed", "Updating Subscriber %q failed", genName) return err @@ -245,7 +240,7 @@ func (r *Reconciler) syncSubscribers(ctx context.Context, channel *v1alpha1.Chan for _, s := range subDeletes { genName := resources.GenerateSubscriptionName(s.UID) // TODO: we need to handle the case of a already deleted pull subscription. Perhaps move to ensure deleted method. - if err := r.RunClientSet.PubsubV1alpha1().PullSubscriptions(channel.Namespace).Delete(genName, &metav1.DeleteOptions{}); err != nil { + if err := r.RunClientSet.InternalV1alpha1().PullSubscriptions(channel.Namespace).Delete(genName, &metav1.DeleteOptions{}); err != nil { logging.FromContext(ctx).Desugar().Error("unable to delete PullSubscription for Channel", zap.String("ps", genName), zap.String("channel", channel.Name), zap.Error(err)) r.Recorder.Eventf(channel, corev1.EventTypeWarning, "SubscriberDeleteFailed", "Deleting Subscriber %q failed", genName) return err @@ -276,7 +271,7 @@ func (r *Reconciler) syncSubscribersStatus(ctx context.Context, channel *v1alpha } // Make a map of subscriber name to PullSubscription for lookup. - pullsubs := make(map[string]pubsubv1alpha1.PullSubscription) + pullsubs := make(map[string]inteventsv1alpha1.PullSubscription) if subs, err := r.getPullSubscriptions(ctx, channel); err != nil { logging.FromContext(ctx).Desugar().Error("Failed to list PullSubscriptions", zap.Error(err)) } else { @@ -298,7 +293,7 @@ func (r *Reconciler) syncSubscribersStatus(ctx context.Context, channel *v1alpha return nil } -func (r *Reconciler) reconcileTopic(ctx context.Context, channel *v1alpha1.Channel) (*pubsubv1alpha1.Topic, error) { +func (r *Reconciler) reconcileTopic(ctx context.Context, channel *v1alpha1.Channel) (*inteventsv1alpha1.Topic, error) { topic, err := r.getTopic(ctx, channel) if err != nil && !apierrors.IsNotFound(err) { logging.FromContext(ctx).Desugar().Error("Unable to get a Topic", zap.Error(err)) @@ -324,7 +319,7 @@ func (r *Reconciler) reconcileTopic(ctx context.Context, channel *v1alpha1.Chann Annotations: resources.GetTopicAnnotations(clusterName), }) - topic, err = r.RunClientSet.PubsubV1alpha1().Topics(channel.Namespace).Create(t) + topic, err = r.RunClientSet.InternalV1alpha1().Topics(channel.Namespace).Create(t) if err != nil { logging.FromContext(ctx).Desugar().Error("Failed to create Topic", zap.Error(err)) r.Recorder.Eventf(channel, corev1.EventTypeWarning, "TopicCreateFailed", "Failed to created Topic %q: %s", topic.Name, err.Error()) @@ -334,7 +329,7 @@ func (r *Reconciler) reconcileTopic(ctx context.Context, channel *v1alpha1.Chann return topic, err } -func (r *Reconciler) getTopic(ctx context.Context, channel *v1alpha1.Channel) (*pubsubv1alpha1.Topic, error) { +func (r *Reconciler) getTopic(_ context.Context, channel *v1alpha1.Channel) (*inteventsv1alpha1.Topic, error) { name := resources.GeneratePublisherName(channel) topic, err := r.topicLister.Topics(channel.Namespace).Get(name) if err != nil { @@ -347,8 +342,8 @@ func (r *Reconciler) getTopic(ctx context.Context, channel *v1alpha1.Channel) (* return topic, nil } -func (r *Reconciler) getPullSubscriptions(ctx context.Context, channel *v1alpha1.Channel) ([]pubsubv1alpha1.PullSubscription, error) { - sl, err := r.RunClientSet.PubsubV1alpha1().PullSubscriptions(channel.Namespace).List(metav1.ListOptions{ +func (r *Reconciler) getPullSubscriptions(ctx context.Context, channel *v1alpha1.Channel) ([]inteventsv1alpha1.PullSubscription, error) { + sl, err := r.RunClientSet.InternalV1alpha1().PullSubscriptions(channel.Namespace).List(metav1.ListOptions{ // Use GetLabelSelector to select all PullSubscriptions related to this channel. LabelSelector: resources.GetLabelSelector(controllerAgentName, channel.Name, string(channel.UID)).String(), TypeMeta: metav1.TypeMeta{ @@ -361,7 +356,7 @@ func (r *Reconciler) getPullSubscriptions(ctx context.Context, channel *v1alpha1 logging.FromContext(ctx).Desugar().Error("Failed to list PullSubscriptions", zap.Error(err)) return nil, err } - subs := []pubsubv1alpha1.PullSubscription(nil) + subs := []inteventsv1alpha1.PullSubscription(nil) for _, subscription := range sl.Items { if metav1.IsControlledBy(&subscription, channel) { subs = append(subs, subscription) @@ -370,7 +365,7 @@ func (r *Reconciler) getPullSubscriptions(ctx context.Context, channel *v1alpha1 return subs, nil } -func (r *Reconciler) getPullSubscriptionStatus(ps *pubsubv1alpha1.PullSubscription) (corev1.ConditionStatus, string) { +func (r *Reconciler) getPullSubscriptionStatus(ps *inteventsv1alpha1.PullSubscription) (corev1.ConditionStatus, string) { ready := corev1.ConditionTrue message := "" if !ps.Status.IsReady() { diff --git a/pkg/reconciler/messaging/channel/channel_test.go b/pkg/reconciler/messaging/channel/channel_test.go index 4e30d38f7e..7f2ae0d2be 100644 --- a/pkg/reconciler/messaging/channel/channel_test.go +++ b/pkg/reconciler/messaging/channel/channel_test.go @@ -39,8 +39,8 @@ import ( . "knative.dev/pkg/reconciler/testing" duckv1alpha1 "github.com/google/knative-gcp/pkg/apis/duck/v1alpha1" + inteventsv1alpha1 "github.com/google/knative-gcp/pkg/apis/intevents/v1alpha1" "github.com/google/knative-gcp/pkg/apis/messaging/v1alpha1" - pubsubv1alpha1 "github.com/google/knative-gcp/pkg/apis/pubsub/v1alpha1" "github.com/google/knative-gcp/pkg/client/injection/reconciler/messaging/v1alpha1/channel" testingMetadataClient "github.com/google/knative-gcp/pkg/gclient/metadata/testing" "github.com/google/knative-gcp/pkg/reconciler" @@ -89,7 +89,7 @@ var ( func init() { // Add types to scheme - _ = pubsubv1alpha1.AddToScheme(scheme.Scheme) + _ = inteventsv1alpha1.AddToScheme(scheme.Scheme) } func patchFinalizers(namespace, name string, add bool) clientgotesting.PatchActionImpl { @@ -496,7 +496,7 @@ func TestAllCases(t *testing.T) { }}, WantDeletes: []clientgotesting.DeleteActionImpl{ {ActionImpl: clientgotesting.ActionImpl{ - Namespace: "testnamespace", Verb: "delete", Resource: schema.GroupVersionResource{Group: "pubsub.cloud.google.com", Version: "v1alpha1", Resource: "pullsubscriptions"}}, + Namespace: "testnamespace", Verb: "delete", Resource: schema.GroupVersionResource{Group: "internal.events.cloud.google.com", Version: "v1alpha1", Resource: "pullsubscriptions"}}, Name: "cre-sub-testsubscription-abc-123", }, }, @@ -547,19 +547,17 @@ func TestAllCases(t *testing.T) { defer logtesting.ClearAll() table.Test(t, MakeFactory(func(ctx context.Context, listers *Listers, cmw configmap.Watcher, _ map[string]interface{}) controller.Reconciler { r := &Reconciler{ - Base: reconciler.NewBase(ctx, controllerAgentName, cmw), - Identity: identity.NewIdentity(ctx, NoopIAMPolicyManager), - channelLister: listers.GetChannelLister(), - topicLister: listers.GetPubSubTopicLister(), - pullSubscriptionLister: listers.GetPubSubPullSubscriptionLister(), - serviceAccountLister: listers.GetServiceAccountLister(), + Base: reconciler.NewBase(ctx, controllerAgentName, cmw), + Identity: identity.NewIdentity(ctx, NoopIAMPolicyManager), + channelLister: listers.GetChannelLister(), + topicLister: listers.GetTopicLister(), } return channel.NewReconciler(ctx, r.Logger, r.RunClientSet, listers.GetChannelLister(), r.Recorder, r) })) } -func newTopic() *pubsubv1alpha1.Topic { +func newTopic() *inteventsv1alpha1.Topic { channel := NewChannel(channelName, testNS, WithChannelUID(channelUID), WithChannelSpec(v1alpha1.ChannelSpec{ @@ -582,7 +580,7 @@ func newTopic() *pubsubv1alpha1.Topic { }) } -func newReadyTopic() *pubsubv1alpha1.Topic { +func newReadyTopic() *inteventsv1alpha1.Topic { topic := newTopic() url, _ := apis.ParseURL(topicURI) topic.Status.SetAddress(url) @@ -591,7 +589,7 @@ func newReadyTopic() *pubsubv1alpha1.Topic { return topic } -func newFalseTopic() *pubsubv1alpha1.Topic { +func newFalseTopic() *inteventsv1alpha1.Topic { topic := newTopic() url, _ := apis.ParseURL(topicURI) topic.Status.SetAddress(url) @@ -599,7 +597,7 @@ func newFalseTopic() *pubsubv1alpha1.Topic { return topic } -func newPullSubscription(subscriber eventingduck.SubscriberSpec) *pubsubv1alpha1.PullSubscription { +func newPullSubscription(subscriber eventingduck.SubscriberSpec) *inteventsv1alpha1.PullSubscription { channel := NewChannel(channelName, testNS, WithChannelUID(channelUID), WithChannelSpec(v1alpha1.ChannelSpec{ @@ -612,7 +610,7 @@ func newPullSubscription(subscriber eventingduck.SubscriberSpec) *pubsubv1alpha1 return newPullSubscriptionWithOwner(subscriber, channel) } -func newPullSubscriptionWithOwner(subscriber eventingduck.SubscriberSpec, channel *v1alpha1.Channel) *pubsubv1alpha1.PullSubscription { +func newPullSubscriptionWithOwner(subscriber eventingduck.SubscriberSpec, channel *v1alpha1.Channel) *inteventsv1alpha1.PullSubscription { return resources.MakePullSubscription(&resources.PullSubscriptionArgs{ Owner: channel, Name: resources.GenerateSubscriptionName(subscriber.UID), diff --git a/pkg/reconciler/messaging/channel/controller.go b/pkg/reconciler/messaging/channel/controller.go index 5e0c2ae1a5..2619baf79b 100644 --- a/pkg/reconciler/messaging/channel/controller.go +++ b/pkg/reconciler/messaging/channel/controller.go @@ -24,9 +24,9 @@ import ( "knative.dev/pkg/controller" "github.com/google/knative-gcp/pkg/apis/messaging/v1alpha1" + pullsubscriptioninformer "github.com/google/knative-gcp/pkg/client/injection/informers/intevents/v1alpha1/pullsubscription" + topicinformer "github.com/google/knative-gcp/pkg/client/injection/informers/intevents/v1alpha1/topic" channelinformer "github.com/google/knative-gcp/pkg/client/injection/informers/messaging/v1alpha1/channel" - pullsubscriptioninformer "github.com/google/knative-gcp/pkg/client/injection/informers/pubsub/v1alpha1/pullsubscription" - topicinformer "github.com/google/knative-gcp/pkg/client/injection/informers/pubsub/v1alpha1/topic" channelreconciler "github.com/google/knative-gcp/pkg/client/injection/reconciler/messaging/v1alpha1/channel" "github.com/google/knative-gcp/pkg/reconciler" "github.com/google/knative-gcp/pkg/reconciler/identity" @@ -67,12 +67,10 @@ func newControllerWithIAMPolicyManager( serviceAccountInformer := serviceaccountinformers.Get(ctx) r := &Reconciler{ - Base: reconciler.NewBase(ctx, controllerAgentName, cmw), - Identity: identity.NewIdentity(ctx, ipm), - channelLister: channelInformer.Lister(), - topicLister: topicInformer.Lister(), - pullSubscriptionLister: pullSubscriptionInformer.Lister(), - serviceAccountLister: serviceAccountInformer.Lister(), + Base: reconciler.NewBase(ctx, controllerAgentName, cmw), + Identity: identity.NewIdentity(ctx, ipm), + channelLister: channelInformer.Lister(), + topicLister: topicInformer.Lister(), } impl := channelreconciler.NewImpl(ctx, r) diff --git a/pkg/reconciler/messaging/channel/controller_test.go b/pkg/reconciler/messaging/channel/controller_test.go index b8a45386f5..26f7acd1e1 100644 --- a/pkg/reconciler/messaging/channel/controller_test.go +++ b/pkg/reconciler/messaging/channel/controller_test.go @@ -26,9 +26,9 @@ import ( // Fake injection informers + _ "github.com/google/knative-gcp/pkg/client/injection/informers/intevents/v1alpha1/pullsubscription/fake" + _ "github.com/google/knative-gcp/pkg/client/injection/informers/intevents/v1alpha1/topic/fake" _ "github.com/google/knative-gcp/pkg/client/injection/informers/messaging/v1alpha1/channel/fake" - _ "github.com/google/knative-gcp/pkg/client/injection/informers/pubsub/v1alpha1/pullsubscription/fake" - _ "github.com/google/knative-gcp/pkg/client/injection/informers/pubsub/v1alpha1/topic/fake" _ "knative.dev/pkg/client/injection/kube/informers/core/v1/serviceaccount/fake" ) diff --git a/pkg/reconciler/messaging/channel/resources/pullsubscription.go b/pkg/reconciler/messaging/channel/resources/pullsubscription.go index 1e4f5c0b46..e7adbee268 100644 --- a/pkg/reconciler/messaging/channel/resources/pullsubscription.go +++ b/pkg/reconciler/messaging/channel/resources/pullsubscription.go @@ -24,7 +24,7 @@ import ( "knative.dev/pkg/kmeta" gcpduckv1alpha1 "github.com/google/knative-gcp/pkg/apis/duck/v1alpha1" - "github.com/google/knative-gcp/pkg/apis/pubsub/v1alpha1" + "github.com/google/knative-gcp/pkg/apis/intevents/v1alpha1" ) // PullSubscriptionArgs are the arguments needed to create a Channel Subscriber. diff --git a/pkg/reconciler/messaging/channel/resources/pullsubscription_test.go b/pkg/reconciler/messaging/channel/resources/pullsubscription_test.go index ed0cb49fb9..63156b3fb9 100644 --- a/pkg/reconciler/messaging/channel/resources/pullsubscription_test.go +++ b/pkg/reconciler/messaging/channel/resources/pullsubscription_test.go @@ -20,9 +20,9 @@ import ( "testing" "github.com/google/go-cmp/cmp" - duckpubsubv1alpha1 "github.com/google/knative-gcp/pkg/apis/duck/v1alpha1" + duckinteventsv1alpha1 "github.com/google/knative-gcp/pkg/apis/duck/v1alpha1" + inteventsv1alpha1 "github.com/google/knative-gcp/pkg/apis/intevents/v1alpha1" "github.com/google/knative-gcp/pkg/apis/messaging/v1alpha1" - pubsubv1alpha1 "github.com/google/knative-gcp/pkg/apis/pubsub/v1alpha1" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" duckv1alpha1 "knative.dev/eventing/pkg/apis/duck/v1alpha1" @@ -77,7 +77,7 @@ func TestMakePullSubscription(t *testing.T) { }) yes := true - want := &pubsubv1alpha1.PullSubscription{ + want := &inteventsv1alpha1.PullSubscription{ ObjectMeta: metav1.ObjectMeta{ Namespace: "channel-namespace", Name: "cre-sub-subscriber-uid", @@ -94,8 +94,8 @@ func TestMakePullSubscription(t *testing.T) { BlockOwnerDeletion: &yes, }}, }, - Spec: pubsubv1alpha1.PullSubscriptionSpec{ - PubSubSpec: duckpubsubv1alpha1.PubSubSpec{ + Spec: inteventsv1alpha1.PullSubscriptionSpec{ + PubSubSpec: duckinteventsv1alpha1.PubSubSpec{ Secret: &corev1.SecretKeySelector{ LocalObjectReference: corev1.LocalObjectReference{ Name: "eventing-secret-name", @@ -163,7 +163,7 @@ func TestMakePullSubscription_JustSubscriber(t *testing.T) { }) yes := true - want := &pubsubv1alpha1.PullSubscription{ + want := &inteventsv1alpha1.PullSubscription{ ObjectMeta: metav1.ObjectMeta{ Namespace: "channel-namespace", Name: "cre-sub-subscriber-uid", @@ -180,8 +180,8 @@ func TestMakePullSubscription_JustSubscriber(t *testing.T) { BlockOwnerDeletion: &yes, }}, }, - Spec: pubsubv1alpha1.PullSubscriptionSpec{ - PubSubSpec: duckpubsubv1alpha1.PubSubSpec{ + Spec: inteventsv1alpha1.PullSubscriptionSpec{ + PubSubSpec: duckinteventsv1alpha1.PubSubSpec{ Secret: &corev1.SecretKeySelector{ LocalObjectReference: corev1.LocalObjectReference{ Name: "eventing-secret-name", @@ -246,7 +246,7 @@ func TestMakePullSubscription_JustReply(t *testing.T) { }) yes := true - want := &pubsubv1alpha1.PullSubscription{ + want := &inteventsv1alpha1.PullSubscription{ ObjectMeta: metav1.ObjectMeta{ Namespace: "channel-namespace", Name: "cre-sub-subscriber-uid", @@ -263,8 +263,8 @@ func TestMakePullSubscription_JustReply(t *testing.T) { BlockOwnerDeletion: &yes, }}, }, - Spec: pubsubv1alpha1.PullSubscriptionSpec{ - PubSubSpec: duckpubsubv1alpha1.PubSubSpec{ + Spec: inteventsv1alpha1.PullSubscriptionSpec{ + PubSubSpec: duckinteventsv1alpha1.PubSubSpec{ Secret: &corev1.SecretKeySelector{ LocalObjectReference: corev1.LocalObjectReference{ Name: "eventing-secret-name", diff --git a/pkg/reconciler/messaging/channel/resources/topic.go b/pkg/reconciler/messaging/channel/resources/topic.go index a9c5b2ec30..d0d326970d 100644 --- a/pkg/reconciler/messaging/channel/resources/topic.go +++ b/pkg/reconciler/messaging/channel/resources/topic.go @@ -23,7 +23,7 @@ import ( "knative.dev/pkg/kmeta" duckv1alpha1 "github.com/google/knative-gcp/pkg/apis/duck/v1alpha1" - "github.com/google/knative-gcp/pkg/apis/pubsub/v1alpha1" + "github.com/google/knative-gcp/pkg/apis/intevents/v1alpha1" ) // TopicArgs are the arguments needed to create a Channel Topic. diff --git a/pkg/reconciler/messaging/channel/resources/topic_test.go b/pkg/reconciler/messaging/channel/resources/topic_test.go index 30616ee760..209a33b189 100644 --- a/pkg/reconciler/messaging/channel/resources/topic_test.go +++ b/pkg/reconciler/messaging/channel/resources/topic_test.go @@ -23,8 +23,8 @@ import ( corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + inteventsv1alpha1 "github.com/google/knative-gcp/pkg/apis/intevents/v1alpha1" "github.com/google/knative-gcp/pkg/apis/messaging/v1alpha1" - pubsubv1alpha1 "github.com/google/knative-gcp/pkg/apis/pubsub/v1alpha1" ) func TestMakeTopic(t *testing.T) { @@ -61,7 +61,7 @@ func TestMakeTopic(t *testing.T) { }) yes := true - want := &pubsubv1alpha1.Topic{ + want := &inteventsv1alpha1.Topic{ ObjectMeta: metav1.ObjectMeta{ Namespace: "channel-namespace", Name: "cre-channel-name-chan", @@ -77,7 +77,7 @@ func TestMakeTopic(t *testing.T) { BlockOwnerDeletion: &yes, }}, }, - Spec: pubsubv1alpha1.TopicSpec{ + Spec: inteventsv1alpha1.TopicSpec{ Secret: &corev1.SecretKeySelector{ LocalObjectReference: corev1.LocalObjectReference{ Name: "eventing-secret-name", @@ -86,7 +86,7 @@ func TestMakeTopic(t *testing.T) { }, Project: "project-123", Topic: "topic-abc", - PropagationPolicy: pubsubv1alpha1.TopicPolicyCreateDelete, + PropagationPolicy: inteventsv1alpha1.TopicPolicyCreateDelete, }, } diff --git a/test/e2e/lib/creation.go b/test/e2e/lib/creation.go index 2bea4ab521..ab2432e947 100644 --- a/test/e2e/lib/creation.go +++ b/test/e2e/lib/creation.go @@ -24,8 +24,8 @@ import ( "knative.dev/eventing/test/lib/resources" eventsv1alpha1 "github.com/google/knative-gcp/pkg/apis/events/v1alpha1" + inteventsv1alpha1 "github.com/google/knative-gcp/pkg/apis/intevents/v1alpha1" messagingv1alpha1 "github.com/google/knative-gcp/pkg/apis/messaging/v1alpha1" - pubsubv1alpha1 "github.com/google/knative-gcp/pkg/apis/pubsub/v1alpha1" ) func (c *Client) CreateUnstructuredObjOrFail(spec *unstructured.Unstructured) { @@ -83,9 +83,9 @@ func (c *Client) CreateStorageOrFail(storage *eventsv1alpha1.CloudStorageSource) c.Tracker.AddObj(storage) } -func (c *Client) CreatePullSubscriptionOrFail(pullsubscription *pubsubv1alpha1.PullSubscription) { +func (c *Client) CreatePullSubscriptionOrFail(pullsubscription *inteventsv1alpha1.PullSubscription) { c.T.Helper() - pullsubscriptions := c.KnativeGCP.PubsubV1alpha1().PullSubscriptions(c.Namespace) + pullsubscriptions := c.KnativeGCP.InternalV1alpha1().PullSubscriptions(c.Namespace) _, err := pullsubscriptions.Create(pullsubscription) if err != nil { c.T.Fatalf("Failed to create pullsubscription %s/%s: %v", c.Namespace, pullsubscription.Name, err) diff --git a/test/e2e/lib/resources/constants.go b/test/e2e/lib/resources/constants.go index 2ce3fa5d45..28a5b097da 100644 --- a/test/e2e/lib/resources/constants.go +++ b/test/e2e/lib/resources/constants.go @@ -22,7 +22,7 @@ const ( MessagingAPIVersion = "messaging.cloud.google.com/v1alpha1" MessagingV1beta1APIVersion = "messaging.cloud.google.com/v1beta1" EventsAPIVersion = "events.cloud.google.com/v1alpha1" - PubSubAPIVersion = "pubsub.cloud.google.com/v1alpha1" + IntEventsAPIVersion = "internal.events.cloud.google.com/v1alpha1" ServingAPIVersion = "serving.knative.dev/v1" ) diff --git a/test/e2e/lib/typemetas.go b/test/e2e/lib/typemetas.go index a43c2aa81e..fc95542329 100644 --- a/test/e2e/lib/typemetas.go +++ b/test/e2e/lib/typemetas.go @@ -64,11 +64,11 @@ func eventsTypeMeta(kind string) *metav1.TypeMeta { } } -var PullSubscriptionTypeMeta = pubsubTypeMeta(resources.PullSubscriptionKind) +var PullSubscriptionTypeMeta = inteventsTypeMeta(resources.PullSubscriptionKind) -func pubsubTypeMeta(kind string) *metav1.TypeMeta { +func inteventsTypeMeta(kind string) *metav1.TypeMeta { return &metav1.TypeMeta{ Kind: kind, - APIVersion: resources.PubSubAPIVersion, + APIVersion: resources.IntEventsAPIVersion, } } diff --git a/test/e2e/test_pullsubscription.go b/test/e2e/test_pullsubscription.go index 7e609d6324..cd27e8425b 100644 --- a/test/e2e/test_pullsubscription.go +++ b/test/e2e/test_pullsubscription.go @@ -28,7 +28,7 @@ import ( _ "k8s.io/client-go/plugin/pkg/client/auth/gcp" duckv1alpha1 "github.com/google/knative-gcp/pkg/apis/duck/v1alpha1" - "github.com/google/knative-gcp/pkg/apis/pubsub/v1alpha1" + "github.com/google/knative-gcp/pkg/apis/intevents/v1alpha1" kngcptesting "github.com/google/knative-gcp/pkg/reconciler/testing" "github.com/google/knative-gcp/test/e2e/lib" "github.com/google/knative-gcp/test/e2e/lib/resources" @@ -47,8 +47,8 @@ func SmokePullSubscriptionTestImpl(t *testing.T, authConfig lib.AuthConfig) { defer lib.TearDown(client) // Create PullSubscription. - pullsubscription := kngcptesting.NewPubSubPullSubscription(psName, client.Namespace, - kngcptesting.WithPubSubPullSubscriptionSpec(v1alpha1.PullSubscriptionSpec{ + pullsubscription := kngcptesting.NewPullSubscription(psName, client.Namespace, + kngcptesting.WithPullSubscriptionSpec(v1alpha1.PullSubscriptionSpec{ Topic: topic, PubSubSpec: duckv1alpha1.PubSubSpec{ IdentitySpec: duckv1alpha1.IdentitySpec{ @@ -56,7 +56,7 @@ func SmokePullSubscriptionTestImpl(t *testing.T, authConfig lib.AuthConfig) { }, }, }), - kngcptesting.WithPubSubPullSubscriptionSink(lib.ServiceGVK, svcName)) + kngcptesting.WithPullSubscriptionSink(lib.ServiceGVK, svcName)) client.CreatePullSubscriptionOrFail(pullsubscription) client.Core.WaitForResourceReadyOrFail(psName, lib.PullSubscriptionTypeMeta) @@ -82,15 +82,15 @@ func PullSubscriptionWithTargetTestImpl(t *testing.T, authConfig lib.AuthConfig) client.CreateJobOrFail(job, lib.WithServiceForJob(targetName)) // Create PullSubscription. - pullsubscription := kngcptesting.NewPubSubPullSubscription(psName, client.Namespace, - kngcptesting.WithPubSubPullSubscriptionSpec(v1alpha1.PullSubscriptionSpec{ + pullsubscription := kngcptesting.NewPullSubscription(psName, client.Namespace, + kngcptesting.WithPullSubscriptionSpec(v1alpha1.PullSubscriptionSpec{ Topic: topicName, PubSubSpec: duckv1alpha1.PubSubSpec{ IdentitySpec: duckv1alpha1.IdentitySpec{ authConfig.PubsubServiceAccount, }, }, - }), kngcptesting.WithPubSubPullSubscriptionSink(lib.ServiceGVK, targetName)) + }), kngcptesting.WithPullSubscriptionSink(lib.ServiceGVK, targetName)) client.CreatePullSubscriptionOrFail(pullsubscription) client.Core.WaitForResourceReadyOrFail(psName, lib.PullSubscriptionTypeMeta)