From 1be2577385561d2645fa9a7474a7ff95c630d7bf Mon Sep 17 00:00:00 2001 From: Matheus Pimenta Date: Tue, 6 Jun 2023 16:40:39 +0100 Subject: [PATCH] Add Provider for Google Pub/Sub Topic Signed-off-by: Matheus Pimenta Co-authored-by: Somtochi Onyekwere Co-authored-by: Max Jonas Werner Co-authored-by: Sunny --- api/v1beta2/provider_types.go | 3 +- ...ification.toolkit.fluxcd.io_providers.yaml | 1 + docs/spec/v1beta2/providers.md | 64 +++++- go.mod | 16 ++ go.sum | 34 ++++ internal/notifier/factory.go | 2 + internal/notifier/google_pubsub.go | 132 ++++++++++++ internal/notifier/google_pubsub_test.go | 191 ++++++++++++++++++ 8 files changed, 440 insertions(+), 3 deletions(-) create mode 100644 internal/notifier/google_pubsub.go create mode 100644 internal/notifier/google_pubsub_test.go diff --git a/api/v1beta2/provider_types.go b/api/v1beta2/provider_types.go index ba08b1e39..b8bee7515 100644 --- a/api/v1beta2/provider_types.go +++ b/api/v1beta2/provider_types.go @@ -39,6 +39,7 @@ const ( BitbucketProvider string = "bitbucket" AzureDevOpsProvider string = "azuredevops" GoogleChatProvider string = "googlechat" + GooglePubSubProvider string = "googlepubsub" WebexProvider string = "webex" SentryProvider string = "sentry" AzureEventHubProvider string = "azureeventhub" @@ -52,7 +53,7 @@ const ( // ProviderSpec defines the desired state of the Provider. type ProviderSpec struct { // Type specifies which Provider implementation to use. - // +kubebuilder:validation:Enum=slack;discord;msteams;rocket;generic;generic-hmac;github;gitlab;gitea;bitbucket;azuredevops;googlechat;webex;sentry;azureeventhub;telegram;lark;matrix;opsgenie;alertmanager;grafana;githubdispatch; + // +kubebuilder:validation:Enum=slack;discord;msteams;rocket;generic;generic-hmac;github;gitlab;gitea;bitbucket;azuredevops;googlechat;googlepubsub;webex;sentry;azureeventhub;telegram;lark;matrix;opsgenie;alertmanager;grafana;githubdispatch; // +required Type string `json:"type"` diff --git a/config/crd/bases/notification.toolkit.fluxcd.io_providers.yaml b/config/crd/bases/notification.toolkit.fluxcd.io_providers.yaml index 580aff1d4..31d6314ec 100644 --- a/config/crd/bases/notification.toolkit.fluxcd.io_providers.yaml +++ b/config/crd/bases/notification.toolkit.fluxcd.io_providers.yaml @@ -291,6 +291,7 @@ spec: - bitbucket - azuredevops - googlechat + - googlepubsub - webex - sentry - azureeventhub diff --git a/docs/spec/v1beta2/providers.md b/docs/spec/v1beta2/providers.md index 0b0f052ab..2b33f079c 100644 --- a/docs/spec/v1beta2/providers.md +++ b/docs/spec/v1beta2/providers.md @@ -112,6 +112,7 @@ The supported alerting providers are: | [Discord](#discord) | `discord` | | [GitHub dispatch](#github-dispatch) | `githubdispatch` | | [Google Chat](#google-chat) | `googlechat` | +| [Google Pub/Sub](#google-pubsub) | `googlepubsub` | | [Grafana](#grafana) | `grafana` | | [Lark](#lark) | `lark` | | [Matrix](#matrix) | `matrix` | @@ -671,6 +672,65 @@ stringData: address: https://chat.googleapis.com/v1/spaces/... ``` +##### Google Pub/Sub + +When `.spec.type` is set to `googlepubsub`, the controller will publish the payload of +an [Event](events.md#event-structure) on the Google Pub/Sub Topic ID provided in the +[Channel](#channel) field, which must exist in the GCP Project ID provided in the +[Address](#address) field. + +This Provider type can optionally use the [Secret reference](#secret-reference) to +authenticate on the Google Pub/Sub API by using [JSON credentials](https://cloud.google.com/iam/docs/service-account-creds#key-types). +The credentials must be specified on [the `token`](#token-example) field of the Secret. + +If no JSON credentials are specified, then the automatic authentication methods of +the Google libraries will take place, and therefore methods like Workload Identity +will be automatically attempted. + +The Google identity effectively used for publishing messages must have +[the required permissions](https://cloud.google.com/iam/docs/understanding-roles#pubsub.publisher) +on the Pub/Sub Topic. + +You can optionally add [attributes](https://cloud.google.com/pubsub/docs/samples/pubsub-publish-custom-attributes#pubsub_publish_custom_attributes-go) +to the Pub/Sub message using a [`headers` key in the referenced Secret](#http-headers-example). + +This Provider type does not support the configuration of a [proxy URL](#https-proxy) +or [TLS certificates](#tls-certificates). + +###### Google Pub/Sub with JSON Credentials and Custom Headers Example + +To configure a Provider for Google Pub/Sub authenticating with JSON credentials and +custom headers, create a Secret with [the `token`](#token-example) set to the +necessary JSON credentials, [the `headers`](#http-headers-example) field set to a +YAML string-to-string dictionary, and a `googlepubsub` Provider with the associated +[Secret reference](#secret-reference). + +```yaml +--- +apiVersion: notification.toolkit.fluxcd.io/v1beta2 +kind: Provider +metadata: + name: googlepubsub-provider + namespace: desired-namespace +spec: + type: googlepubsub + address: + channel: + secretRef: + name: googlepubsub-provider-creds +--- +apiVersion: v1 +kind: Secret +metadata: + name: googlepubsub-provider-creds + namespace: desired-namespace +stringData: + token: + headers: | + attr1-name: attr1-value + attr2-name: attr2-value +``` + ##### Opsgenie When `.spec.type` is set to `opsgenie`, the controller will send a payload for @@ -892,8 +952,8 @@ metadata: namespace: default stringData: headers: | - Authorization: my-api-token - X-Forwarded-Proto: https + Authorization: my-api-token + X-Forwarded-Proto: https ``` #### Proxy auth example diff --git a/go.mod b/go.mod index 488587f3d..8b43866a3 100644 --- a/go.mod +++ b/go.mod @@ -5,6 +5,7 @@ go 1.18 replace github.com/fluxcd/notification-controller/api => ./api require ( + cloud.google.com/go/pubsub v1.30.0 code.gitea.io/sdk/gitea v0.15.1 github.com/AdaLogics/go-fuzz-headers v0.0.0-20230106234847-43070de90fa1 github.com/Azure/azure-amqp-common-go/v4 v4.1.0 @@ -31,6 +32,7 @@ require ( github.com/whilp/git-urls v1.0.0 github.com/xanzy/go-gitlab v0.83.0 golang.org/x/oauth2 v0.8.0 + google.golang.org/api v0.125.0 k8s.io/api v0.27.2 k8s.io/apimachinery v0.27.2 k8s.io/client-go v0.27.2 @@ -44,6 +46,11 @@ require ( replace gopkg.in/yaml.v3 => gopkg.in/yaml.v3 v3.0.1 require ( + cloud.google.com/go v0.110.0 // indirect + cloud.google.com/go/compute v1.19.3 // indirect + cloud.google.com/go/compute/metadata v0.2.3 // indirect + cloud.google.com/go/iam v1.0.1 // indirect + cloud.google.com/go/kms v1.11.0 // indirect github.com/Azure/azure-sdk-for-go v68.0.0+incompatible // indirect github.com/Azure/go-amqp v1.0.0 // indirect github.com/Azure/go-ansiterm v0.0.0-20230124172434-306776ec8161 // indirect @@ -85,8 +92,11 @@ require ( github.com/google/go-cmp v0.5.9 // indirect github.com/google/go-querystring v1.1.0 // indirect github.com/google/gofuzz v1.2.0 // indirect + github.com/google/s2a-go v0.1.4 // indirect github.com/google/shlex v0.0.0-20191202100458-e7afc7fbc510 // indirect github.com/google/uuid v1.3.0 // indirect + github.com/googleapis/enterprise-certificate-proxy v0.2.3 // indirect + github.com/googleapis/gax-go/v2 v2.10.0 // indirect github.com/gregjones/httpcache v0.0.0-20190611155906-901d90724c79 // indirect github.com/hashicorp/go-cleanhttp v0.5.2 // indirect github.com/hashicorp/go-version v1.6.0 // indirect @@ -118,18 +128,24 @@ require ( github.com/russross/blackfriday/v2 v2.1.0 // indirect github.com/spf13/cobra v1.7.0 // indirect github.com/xlab/treeprint v1.2.0 // indirect + go.opencensus.io v0.24.0 // indirect go.starlark.net v0.0.0-20230302034142-4b1e35fe2254 // indirect go.uber.org/atomic v1.11.0 // indirect go.uber.org/multierr v1.11.0 // indirect go.uber.org/zap v1.24.0 // indirect golang.org/x/crypto v0.9.0 // indirect golang.org/x/net v0.10.0 // indirect + golang.org/x/sync v0.2.0 // indirect golang.org/x/sys v0.8.0 // indirect golang.org/x/term v0.8.0 // indirect golang.org/x/text v0.9.0 // indirect golang.org/x/time v0.3.0 // indirect gomodules.xyz/jsonpatch/v2 v2.3.0 // indirect google.golang.org/appengine v1.6.7 // indirect + google.golang.org/genproto v0.0.0-20230530153820-e85fd2cbaebc // indirect + google.golang.org/genproto/googleapis/api v0.0.0-20230530153820-e85fd2cbaebc // indirect + google.golang.org/genproto/googleapis/rpc v0.0.0-20230530153820-e85fd2cbaebc // indirect + google.golang.org/grpc v1.55.0 // indirect google.golang.org/protobuf v1.30.0 // indirect gopkg.in/inf.v0 v0.9.1 // indirect gopkg.in/yaml.v2 v2.4.0 // indirect diff --git a/go.sum b/go.sum index f78d6add2..1ad0ecf1e 100644 --- a/go.sum +++ b/go.sum @@ -32,6 +32,8 @@ cloud.google.com/go v0.100.2/go.mod h1:4Xra9TjzAeYHrl5+oeLlzbM2k3mjVhZh4UqTZ//w9 cloud.google.com/go v0.102.0/go.mod h1:oWcCzKlqJ5zgHQt9YsaeTY9KzIvjyy0ArmiBUgpQ+nc= cloud.google.com/go v0.102.1/go.mod h1:XZ77E9qnTEnrgEOvr4xzfdX5TRo7fB4T2F4O6+34hIU= cloud.google.com/go v0.104.0/go.mod h1:OO6xxXdJyvuJPcEPBLN9BJPD+jep5G1+2U5B5gkRYtA= +cloud.google.com/go v0.110.0 h1:Zc8gqp3+a9/Eyph2KDmcGaPtbKRIoqq4YTlL4NMD0Ys= +cloud.google.com/go v0.110.0/go.mod h1:SJnCLqQ0FCFGSZMUNUf84MV3Aia54kn7pi8st7tMzaY= cloud.google.com/go/aiplatform v1.22.0/go.mod h1:ig5Nct50bZlzV6NvKaTwmplLLddFx0YReh9WfTO5jKw= cloud.google.com/go/aiplatform v1.24.0/go.mod h1:67UUvRBKG6GTayHKV8DBv2RtR1t93YRu5B1P3x99mYY= cloud.google.com/go/analytics v0.11.0/go.mod h1:DjEWCu41bVbYcKyvlws9Er60YE4a//bK6mnhWvQeFNI= @@ -70,9 +72,13 @@ cloud.google.com/go/compute v1.7.0/go.mod h1:435lt8av5oL9P3fv1OEzSbSUe+ybHXGMPQH cloud.google.com/go/compute v1.10.0/go.mod h1:ER5CLbMxl90o2jtNbGSbtfOpQKR0t15FOtRsugnLrlU= cloud.google.com/go/compute v1.12.0/go.mod h1:e8yNOBcBONZU1vJKCvCoDw/4JQsA0dpM4x/6PIIOocU= cloud.google.com/go/compute v1.12.1/go.mod h1:e8yNOBcBONZU1vJKCvCoDw/4JQsA0dpM4x/6PIIOocU= +cloud.google.com/go/compute v1.19.3 h1:DcTwsFgGev/wV5+q8o2fzgcHOaac+DKGC91ZlvpsQds= +cloud.google.com/go/compute v1.19.3/go.mod h1:qxvISKp/gYnXkSAD1ppcSOveRAmzxicEv/JlizULFrI= cloud.google.com/go/compute/metadata v0.1.0/go.mod h1:Z1VN+bulIf6bt4P/C37K4DyZYZEXYonfTBHHFPO/4UU= cloud.google.com/go/compute/metadata v0.2.0/go.mod h1:zFmK7XCadkQkj6TtorcaGlCW1hT1fIilQDwofLpJ20k= cloud.google.com/go/compute/metadata v0.2.1/go.mod h1:jgHgmJd2RKBGzXqF5LR2EZMGxBkeanZ9wwa75XHJgOM= +cloud.google.com/go/compute/metadata v0.2.3 h1:mg4jlk7mCAj6xXp9UJ4fjI9VUI5rubuGBW5aJ7UnBMY= +cloud.google.com/go/compute/metadata v0.2.3/go.mod h1:VAV5nSsACxMJvgaAuX6Pk2AawlZn8kiOGuCv6gTkwuA= cloud.google.com/go/containeranalysis v0.5.1/go.mod h1:1D92jd8gRR/c0fGMlymRgxWD3Qw9C1ff6/T7mLgVL8I= cloud.google.com/go/containeranalysis v0.6.0/go.mod h1:HEJoiEIu+lEXM+k7+qLCci0h33lX3ZqoYFdmPcoO7s4= cloud.google.com/go/datacatalog v1.3.0/go.mod h1:g9svFY6tuR+j+hrTw3J2dNcmI0dzmSiyOzm8kpLq0a0= @@ -110,6 +116,10 @@ cloud.google.com/go/gkehub v0.9.0/go.mod h1:WYHN6WG8w9bXU0hqNxt8rm5uxnk8IH+lPY9J cloud.google.com/go/gkehub v0.10.0/go.mod h1:UIPwxI0DsrpsVoWpLB0stwKCP+WFVG9+y977wO+hBH0= cloud.google.com/go/grafeas v0.2.0/go.mod h1:KhxgtF2hb0P191HlY5besjYm6MqTSTj3LSI+M+ByZHc= cloud.google.com/go/iam v0.3.0/go.mod h1:XzJPvDayI+9zsASAFO68Hk07u3z+f+JrT2xXNdp4bnY= +cloud.google.com/go/iam v1.0.1 h1:lyeCAU6jpnVNrE9zGQkTl3WgNgK/X+uWwaw0kynZJMU= +cloud.google.com/go/iam v1.0.1/go.mod h1:yR3tmSL8BcZB4bxByRv2jkSIahVmCtfKZwLYGBalRE8= +cloud.google.com/go/kms v1.11.0 h1:0LPJPKamw3xsVpkel1bDtK0vVJec3EyqdQOLitiD030= +cloud.google.com/go/kms v1.11.0/go.mod h1:hwdiYC0xjnWsKQQCQQmIQnS9asjYVSK6jtXm+zFqXLM= cloud.google.com/go/language v1.4.0/go.mod h1:F9dRpNFQmJbkaop6g0JhSBXCNlO90e1KWx5iDdxbWic= cloud.google.com/go/language v1.6.0/go.mod h1:6dJ8t3B+lUYfStgls25GusK04NLh3eDLQnWM3mdEbhI= cloud.google.com/go/lifesciences v0.5.0/go.mod h1:3oIKy8ycWGPUyZDR/8RNnTOYevhaMLqh5vLUXs9zvT8= @@ -138,6 +148,8 @@ cloud.google.com/go/pubsub v1.0.1/go.mod h1:R0Gpsv3s54REJCy4fxDixWD93lHJMoZTyQ2k cloud.google.com/go/pubsub v1.1.0/go.mod h1:EwwdRX2sKPjnvnqCa270oGRyludottCI76h+R3AArQw= cloud.google.com/go/pubsub v1.2.0/go.mod h1:jhfEVHT8odbXTkndysNHCcx0awwzvfOlguIAii9o8iA= cloud.google.com/go/pubsub v1.3.1/go.mod h1:i+ucay31+CNRpDW4Lu78I4xXG+O1r/MAHgjpRVR+TSU= +cloud.google.com/go/pubsub v1.30.0 h1:vCge8m7aUKBJYOgrZp7EsNDf6QMd2CAlXZqWTn3yq6s= +cloud.google.com/go/pubsub v1.30.0/go.mod h1:qWi1OPS0B+b5L+Sg6Gmc9zD1Y+HaM0MdUr7LsupY1P4= cloud.google.com/go/recaptchaenterprise v1.3.1/go.mod h1:OdD+q+y4XGeAlxRaMn1Y7/GveP6zmq76byL6tjPE7d4= cloud.google.com/go/recaptchaenterprise/v2 v2.1.0/go.mod h1:w9yVqajwroDNTfGuhmOjPDN//rZGySaf6PtFVcSCa7o= cloud.google.com/go/recaptchaenterprise/v2 v2.2.0/go.mod h1:/Zu5jisWGeERrd5HnlS3EUGb/D335f9k51B/FVil0jk= @@ -457,6 +469,8 @@ github.com/google/pprof v0.0.0-20210609004039-a478d1d731e9/go.mod h1:kpwsk12EmLe github.com/google/pprof v0.0.0-20210720184732-4bb14d4b1be1 h1:K6RDEckDVWvDI9JAJYCmNdQXq6neHJOYx3V6jnqNEec= github.com/google/pprof v0.0.0-20210720184732-4bb14d4b1be1/go.mod h1:kpwsk12EmLew5upagYY7GY0pfYCcupk39gWOCRROcvE= github.com/google/renameio v0.1.0/go.mod h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm40UhjYkI= +github.com/google/s2a-go v0.1.4 h1:1kZ/sQM3srePvKs3tXAvQzo66XfcReoqFpIpIccE7Oc= +github.com/google/s2a-go v0.1.4/go.mod h1:Ej+mSEMGRnqRzjc7VtF+jdBwYG5fuJfiZ8ELkjEwM0A= github.com/google/shlex v0.0.0-20191202100458-e7afc7fbc510 h1:El6M4kTTCOh6aBiKaUGG7oYTSPP8MxqL4YI3kZKwcP4= github.com/google/shlex v0.0.0-20191202100458-e7afc7fbc510/go.mod h1:pupxD2MaaD3pAXIBCelhxNneeOaAeabZDe5s4K6zSpQ= github.com/google/uuid v1.1.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= @@ -466,6 +480,8 @@ github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+ github.com/googleapis/enterprise-certificate-proxy v0.0.0-20220520183353-fd19c99a87aa/go.mod h1:17drOmN3MwGY7t0e+Ei9b45FFGA3fBs3x36SsCg1hq8= github.com/googleapis/enterprise-certificate-proxy v0.1.0/go.mod h1:17drOmN3MwGY7t0e+Ei9b45FFGA3fBs3x36SsCg1hq8= github.com/googleapis/enterprise-certificate-proxy v0.2.0/go.mod h1:8C0jb7/mgJe/9KK8Lm7X9ctZC2t60YyIpYEI16jx0Qg= +github.com/googleapis/enterprise-certificate-proxy v0.2.3 h1:yk9/cqRKtT9wXZSsRH9aurXEpJX+U6FLtpYTdC3R06k= +github.com/googleapis/enterprise-certificate-proxy v0.2.3/go.mod h1:AwSRAtLfXpU5Nm3pW+v7rGDHp09LsPtGY9MduiEsR9k= github.com/googleapis/gax-go/v2 v2.0.4/go.mod h1:0Wqv26UfaUD9n4G6kQubkQ+KchISgw+vpHVxEJEs9eg= github.com/googleapis/gax-go/v2 v2.0.5/go.mod h1:DWXyrwAJ9X0FpwwEdw+IPEYBICEFu5mhpdKc/us6bOk= github.com/googleapis/gax-go/v2 v2.1.0/go.mod h1:Q3nei7sK6ybPYH7twZdmQpAd1MKb7pfu6SK+H1/DsU0= @@ -475,6 +491,8 @@ github.com/googleapis/gax-go/v2 v2.3.0/go.mod h1:b8LNqSzNabLiUpXKkY7HAR5jr6bIT99 github.com/googleapis/gax-go/v2 v2.4.0/go.mod h1:XOTVJ59hdnfJLIP/dh8n5CGryZR2LxK9wbMD5+iXC6c= github.com/googleapis/gax-go/v2 v2.5.1/go.mod h1:h6B0KMMFNtI2ddbGJn3T3ZbwkeT6yqEF02fYlzkUCyo= github.com/googleapis/gax-go/v2 v2.6.0/go.mod h1:1mjbznJAPHFpesgE5ucqfYEscaz5kMdcIDwU/6+DDoY= +github.com/googleapis/gax-go/v2 v2.10.0 h1:ebSgKfMxynOdxw8QQuFOKMgomqeLGPqNLQox2bo42zg= +github.com/googleapis/gax-go/v2 v2.10.0/go.mod h1:4UOEnMCrxsSqQ940WnTiD6qJ63le2ev3xfyagutxiPw= github.com/googleapis/go-type-adapters v1.0.0/go.mod h1:zHW75FOG2aur7gAO2B+MLby+cLsWGBF62rFAi7WjWO4= github.com/googleapis/google-cloud-go-testing v0.0.0-20200911160855-bcd43fbb19e8/go.mod h1:dvDLG8qkwmyD9a/MJJN3XJcT3xFxOKAvTZGvuZmac9g= github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= @@ -767,6 +785,8 @@ go.opencensus.io v0.22.3/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw= go.opencensus.io v0.22.4/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw= go.opencensus.io v0.22.5/go.mod h1:5pWMHQbX5EPX2/62yrJeAkowc+lfs/XD7Uxpq3pI6kk= go.opencensus.io v0.23.0/go.mod h1:XItmlyltB5F7CS4xOC1DcqMoFqwtC6OG2xF7mCv7P7E= +go.opencensus.io v0.24.0 h1:y73uSU6J157QMP2kn2r30vwW1A2W2WFwSCGnAVxeaD0= +go.opencensus.io v0.24.0/go.mod h1:vNK8G9p7aAivkbmorf4v+7Hgx+Zs0yY+0fOtgBfjQKo= go.opentelemetry.io/proto/otlp v0.7.0/go.mod h1:PqfVotwruBrMGOCsRd/89rSnXhoiJIqeYNgFYFoEGnI= go.starlark.net v0.0.0-20230302034142-4b1e35fe2254 h1:Ss6D3hLXTM0KobyBYEAygXzFfGcjnmfEJOBgSbemCtg= go.starlark.net v0.0.0-20230302034142-4b1e35fe2254/go.mod h1:jxU+3+j+71eXOW14274+SmmuW82qJzl6iZSeqEtTGds= @@ -794,6 +814,7 @@ golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPh golang.org/x/crypto v0.0.0-20210421170649-83a5a9bb288b/go.mod h1:T9bdIzuCu7OtxOm1hfPfRQxPLYneinmdGuTeoZ9dtd4= golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= golang.org/x/crypto v0.0.0-20211108221036-ceb1ce70b4fa/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= +golang.org/x/crypto v0.0.0-20220314234659-1baeb1ce4c0b/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= golang.org/x/crypto v0.0.0-20220525230936-793ad666bf5e/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= golang.org/x/crypto v0.0.0-20220722155217-630584e8d5aa/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= golang.org/x/crypto v0.1.0/go.mod h1:RecgLatLF4+eUMCP1PoPZQb+cVrJcOPbHkTkbkB9sbw= @@ -944,6 +965,8 @@ golang.org/x/sync v0.0.0-20220601150217-0de741cfad7f/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20220929204114-8fcdb60fdcc0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.2.0 h1:PUR+T4wwASmuSTYdKjYHI5TD22Wy5ogLU5qZCOLxBrI= +golang.org/x/sync v0.2.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.0.0-20180823144017-11551d06cbcc/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= @@ -1060,6 +1083,7 @@ golang.org/x/text v0.3.4/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.5/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= +golang.org/x/text v0.3.8/go.mod h1:E6s5w1FMmriuDzIBO73fBruAKo1PCIq6d2Q6DHfQ8WQ= golang.org/x/text v0.4.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= golang.org/x/text v0.8.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8= @@ -1192,6 +1216,8 @@ google.golang.org/api v0.97.0/go.mod h1:w7wJQLTM+wvQpNf5JyEcBoxK0RH7EDrh/L4qfsuJ google.golang.org/api v0.98.0/go.mod h1:w7wJQLTM+wvQpNf5JyEcBoxK0RH7EDrh/L4qfsuJ13s= google.golang.org/api v0.100.0/go.mod h1:ZE3Z2+ZOr87Rx7dqFsdRQkRBk36kDtp/h+QpHbB7a70= google.golang.org/api v0.102.0/go.mod h1:3VFl6/fzoA+qNuS1N1/VfXY4LjoXN/wzeIp7TweWwGo= +google.golang.org/api v0.125.0 h1:7xGvEY4fyWbhWMHf3R2/4w7L4fXyfpRGE9g6lp8+DCk= +google.golang.org/api v0.125.0/go.mod h1:mBwVAtz+87bEN6CbA1GtZPDOqY2R5ONPqJeIlvyo4Aw= google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM= google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4= google.golang.org/appengine v1.5.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4= @@ -1304,6 +1330,12 @@ google.golang.org/genproto v0.0.0-20221010155953-15ba04fc1c0e/go.mod h1:3526vdqw google.golang.org/genproto v0.0.0-20221014173430-6e2ab493f96b/go.mod h1:1vXfmgAz9N9Jx0QA82PqRVauvCz1SGSz739p0f183jM= google.golang.org/genproto v0.0.0-20221014213838-99cd37c6964a/go.mod h1:1vXfmgAz9N9Jx0QA82PqRVauvCz1SGSz739p0f183jM= google.golang.org/genproto v0.0.0-20221024183307-1bc688fe9f3e/go.mod h1:9qHF0xnpdSfF6knlcsnpzUu5y+rpwgbvsyGAZPBMg4s= +google.golang.org/genproto v0.0.0-20230530153820-e85fd2cbaebc h1:8DyZCyvI8mE1IdLy/60bS+52xfymkE72wv1asokgtao= +google.golang.org/genproto v0.0.0-20230530153820-e85fd2cbaebc/go.mod h1:xZnkP7mREFX5MORlOPEzLMr+90PPZQ2QWzrVTWfAq64= +google.golang.org/genproto/googleapis/api v0.0.0-20230530153820-e85fd2cbaebc h1:kVKPf/IiYSBWEWtkIn6wZXwWGCnLKcC8oWfZvXjsGnM= +google.golang.org/genproto/googleapis/api v0.0.0-20230530153820-e85fd2cbaebc/go.mod h1:vHYtlOoi6TsQ3Uk2yxR7NI5z8uoV+3pZtR4jmHIkRig= +google.golang.org/genproto/googleapis/rpc v0.0.0-20230530153820-e85fd2cbaebc h1:XSJ8Vk1SWuNr8S18z1NZSziL0CPIXLCCMDOEFtHBOFc= +google.golang.org/genproto/googleapis/rpc v0.0.0-20230530153820-e85fd2cbaebc/go.mod h1:66JfowdXAEgad5O9NnYcsNPLCPZJD++2L9X0PCMODrA= google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c= google.golang.org/grpc v1.20.1/go.mod h1:10oTOabMzJvdu6/UiuZezV6QK5dSlG84ov/aaiqXj38= google.golang.org/grpc v1.21.1/go.mod h1:oYelfM1adQP15Ek0mdvEgi9Df8B9CZIaU1084ijfRaM= @@ -1340,6 +1372,8 @@ google.golang.org/grpc v1.48.0/go.mod h1:vN9eftEi1UMyUsIF80+uQXhHjbXYbm0uXoFCACu google.golang.org/grpc v1.49.0/go.mod h1:ZgQEeidpAuNRZ8iRrlBKXZQP1ghovWIVhdJRyCDK+GI= google.golang.org/grpc v1.50.0/go.mod h1:ZgQEeidpAuNRZ8iRrlBKXZQP1ghovWIVhdJRyCDK+GI= google.golang.org/grpc v1.50.1/go.mod h1:ZgQEeidpAuNRZ8iRrlBKXZQP1ghovWIVhdJRyCDK+GI= +google.golang.org/grpc v1.55.0 h1:3Oj82/tFSCeUrRTg/5E/7d/W5A1tj6Ky1ABAuZuv5ag= +google.golang.org/grpc v1.55.0/go.mod h1:iYEXKGkEBhg1PjZQvoYEVPTDkHo1/bjTnfwTeGONTY8= google.golang.org/grpc/cmd/protoc-gen-go-grpc v1.1.0/go.mod h1:6Kw0yEErY5E/yWrBtf03jp27GLLJujG4z/JK95pnjjw= google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8= google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0= diff --git a/internal/notifier/factory.go b/internal/notifier/factory.go index ec0fdd834..b12c6af6b 100644 --- a/internal/notifier/factory.go +++ b/internal/notifier/factory.go @@ -91,6 +91,8 @@ func (f Factory) Notifier(provider string) (Interface, error) { n, err = NewAzureDevOps(f.ProviderUID, f.URL, f.Token, f.CertPool) case apiv1.GoogleChatProvider: n, err = NewGoogleChat(f.URL, f.ProxyURL) + case apiv1.GooglePubSubProvider: + n, err = NewGooglePubSub(f.URL, f.Channel, f.Token, f.Headers) case apiv1.WebexProvider: n, err = NewWebex(f.URL, f.ProxyURL, f.CertPool, f.Channel, f.Token) case apiv1.SentryProvider: diff --git a/internal/notifier/google_pubsub.go b/internal/notifier/google_pubsub.go new file mode 100644 index 000000000..5250e11a9 --- /dev/null +++ b/internal/notifier/google_pubsub.go @@ -0,0 +1,132 @@ +/* +Copyright 2023 The Flux authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package notifier + +import ( + "context" + "encoding/json" + "errors" + "fmt" + + "cloud.google.com/go/pubsub" + "google.golang.org/api/option" + kerrors "k8s.io/apimachinery/pkg/util/errors" + "sigs.k8s.io/controller-runtime/pkg/log" + + eventv1 "github.com/fluxcd/pkg/apis/event/v1beta1" +) + +type ( + // GooglePubSub holds a Google Pub/Sub client and target topic. + GooglePubSub struct { + topicID string + attrs map[string]string + topicName string + + client interface { + publish(ctx context.Context, topicID string, eventPayload []byte, attrs map[string]string) (serverID string, err error) + } + } + + googlePubSubClient struct { + projectID string + jsonCreds []byte + } +) + +// ensure *GooglePubSub implements Interface. +var _ Interface = &GooglePubSub{} + +// NewGooglePubSub creates a Google Pub/Sub client tied to a specific +// project and topic. +// +// The jsonCreds parameter is optional, and if len(jsonCreds) == 0 then the +// automatic authentication methods of the Google libraries will take place, +// and therefore methods like Workload Identity will be automatically attempted. +// +// The attrs paramter is optional, and if len(attrs) == 0 then no attributes will +// be added to the Pub/Sub message. +func NewGooglePubSub(projectID, topicID, jsonCreds string, attrs map[string]string) (*GooglePubSub, error) { + if projectID == "" { + return nil, errors.New("GCP project ID cannot be empty") + } + if topicID == "" { + return nil, errors.New("GCP Pub/Sub topic ID cannot be empty") + } + if len(attrs) == 0 { + attrs = nil + } + return &GooglePubSub{ + topicID: topicID, + attrs: attrs, + topicName: fmt.Sprintf("projects/%s/topics/%s", projectID, topicID), + client: &googlePubSubClient{ + projectID: projectID, + jsonCreds: []byte(jsonCreds), + }, + }, nil +} + +// Post posts Flux events to a Google Pub/Sub topic. +func (g *GooglePubSub) Post(ctx context.Context, event eventv1.Event) error { + // Skip Git commit status update event. + if event.HasMetadata(eventv1.MetaCommitStatusKey, eventv1.MetaCommitStatusUpdateValue) { + return nil + } + + eventPayload, err := json.Marshal(event) + if err != nil { + return fmt.Errorf("error json-marshaling event: %w", err) + } + + serverID, err := g.client.publish(ctx, g.topicID, eventPayload, g.attrs) + if err != nil { + return fmt.Errorf("error publishing event to topic %s: %w", g.topicName, err) + } + + // debug log + log.FromContext(ctx).V(1).Info("Event published to GCP Pub/Sub topic", + "topic", g.topicName, + "server message id", serverID) + + return nil +} + +func (g *googlePubSubClient) publish(ctx context.Context, topicID string, eventPayload []byte, attrs map[string]string) (serverID string, err error) { + var opts []option.ClientOption + if len(g.jsonCreds) > 0 { + opts = append(opts, option.WithCredentialsJSON(g.jsonCreds)) + } + var client *pubsub.Client + client, err = pubsub.NewClient(ctx, g.projectID, opts...) + if err != nil { + return + } + defer func() { + if closeErr := client.Close(); closeErr != nil { + err = kerrors.NewAggregate([]error{err, closeErr}) + } + }() + serverID, err = client. + Topic(topicID). + Publish(ctx, &pubsub.Message{ + Data: eventPayload, + Attributes: attrs, + }). + Get(ctx) + return +} diff --git a/internal/notifier/google_pubsub_test.go b/internal/notifier/google_pubsub_test.go new file mode 100644 index 000000000..81aff21b2 --- /dev/null +++ b/internal/notifier/google_pubsub_test.go @@ -0,0 +1,191 @@ +/* +Copyright 2023 The Flux authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package notifier + +import ( + "context" + "errors" + "fmt" + "testing" + + . "github.com/onsi/gomega" + + eventv1 "github.com/fluxcd/pkg/apis/event/v1beta1" +) + +func TestNewGooglePubSub(t *testing.T) { + tests := []struct { + name string + projectID string + topicID string + jsonCreds string + attrs map[string]string + expectedErr error + expectedTopicName string + expectedJSONCreds []byte + expectedAttrs map[string]string + }{ + { + name: "empty project ID is not allowed", + projectID: "", + expectedErr: errors.New("GCP project ID cannot be empty"), + }, + { + name: "empty topic ID is not allowed", + projectID: "project-id", + topicID: "", + expectedErr: errors.New("GCP Pub/Sub topic ID cannot be empty"), + }, + { + name: "topic name is stored properly", + projectID: "project-id", + topicID: "topic-id", + expectedTopicName: "projects/project-id/topics/topic-id", + }, + { + name: "json creds are stored properly", + projectID: "project-id", + topicID: "topic-id", + expectedTopicName: "projects/project-id/topics/topic-id", + jsonCreds: "json credentials", + expectedJSONCreds: []byte("json credentials"), + }, + { + name: "non-empty attributes are stored properly", + projectID: "project-id", + topicID: "topic-id", + expectedTopicName: "projects/project-id/topics/topic-id", + attrs: map[string]string{"foo": "bar"}, + expectedAttrs: map[string]string{"foo": "bar"}, + }, + { + name: "empty attributes are stored properly", + projectID: "project-id", + topicID: "topic-id", + expectedTopicName: "projects/project-id/topics/topic-id", + attrs: map[string]string{}, + expectedAttrs: nil, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + g := NewWithT(t) + + provider, err := NewGooglePubSub(tt.projectID, tt.topicID, tt.jsonCreds, tt.attrs) + if tt.expectedErr != nil { + g.Expect(err).To(Equal(tt.expectedErr)) + g.Expect(provider).To(BeNil()) + } else { + g.Expect(err).To(BeNil()) + g.Expect(provider).NotTo(BeNil()) + + g.Expect(provider.topicID).To(Equal(tt.topicID)) + g.Expect(provider.attrs).To(Equal(tt.expectedAttrs)) + g.Expect(provider.topicName).To(Equal(tt.expectedTopicName)) + + g.Expect(provider.client).NotTo(BeNil()) + client := provider.client.(*googlePubSubClient) + g.Expect(client).NotTo(BeNil()) + + g.Expect(client.projectID).To(Equal(tt.projectID)) + g.Expect(client.jsonCreds).To(Equal(tt.expectedJSONCreds)) + } + }) + } +} + +type googlePubSubPostTestCase struct { + name string + topicID string + attrs map[string]string + topicName string + event eventv1.Event + expectedEventPayload string + publishErr error + expectedErr error + publishShouldExecute bool + publishExecuted bool + + g *WithT +} + +func (tt *googlePubSubPostTestCase) publish(ctx context.Context, topicID string, eventPayload []byte, attrs map[string]string) (serverID string, err error) { + tt.publishExecuted = true + tt.g.Expect(topicID).To(Equal(tt.topicID)) + tt.g.Expect(string(eventPayload)).To(Equal(tt.expectedEventPayload)) + tt.g.Expect(attrs).To(Equal(tt.attrs)) + // serverID is only used in a debug log for now, there's no way to assert it + return "", tt.publishErr +} + +func TestGooglePubSubPost(t *testing.T) { + tests := []*googlePubSubPostTestCase{ + { + name: "events are properly marshaled", + event: eventv1.Event{ + Metadata: map[string]string{"foo": "bar"}, + }, + expectedEventPayload: `{"involvedObject":{},"severity":"","timestamp":null,"message":"","reason":"","metadata":{"foo":"bar"},"reportingController":""}`, + publishShouldExecute: true, + }, + { + name: "commit status updates are dropped", + event: eventv1.Event{ + Metadata: map[string]string{"commit_status": "update"}, + }, + publishShouldExecute: false, + }, + { + name: "publish error is wrapped and relayed", + expectedEventPayload: `{"involvedObject":{},"severity":"","timestamp":null,"message":"","reason":"","reportingController":""}`, + topicName: "projects/projectID/topics/topicID", + publishErr: errors.New("publish error"), + expectedErr: fmt.Errorf("error publishing event to topic projects/projectID/topics/topicID: %w", errors.New("publish error")), + publishShouldExecute: true, + }, + { + name: "topic and attributes are relayed to the internal client", + topicID: "topicID", + attrs: map[string]string{"foo": "bar"}, + expectedEventPayload: `{"involvedObject":{},"severity":"","timestamp":null,"message":"","reason":"","reportingController":""}`, + publishShouldExecute: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + g := NewWithT(t) + tt.g = g + + topic := &GooglePubSub{ + client: tt, + topicID: tt.topicID, + attrs: tt.attrs, + topicName: tt.topicName, + } + + err := topic.Post(context.Background(), tt.event) + if tt.expectedErr == nil { + g.Expect(err).To(BeNil()) + } else { + g.Expect(err).To(Equal(tt.expectedErr)) + } + g.Expect(tt.publishExecuted).To(Equal(tt.publishShouldExecute)) + }) + } +}