From 361a39304159996cfe6ebd75e8d25a3cba18b239 Mon Sep 17 00:00:00 2001 From: Nicolas Lopez Date: Wed, 22 Jan 2020 15:19:49 -0500 Subject: [PATCH 1/4] move convert to push to pubsub.go --- pkg/pubsub/adapter/adapter.go | 8 +- pkg/pubsub/adapter/converters/pubsub.go | 75 ++++++++++++ pkg/pubsub/adapter/converters/pubsub_test.go | 91 ++++++++++++++ pkg/pubsub/adapter/push.go | 102 ---------------- pkg/pubsub/adapter/push_test.go | 118 ------------------- 5 files changed, 168 insertions(+), 226 deletions(-) delete mode 100644 pkg/pubsub/adapter/push.go delete mode 100644 pkg/pubsub/adapter/push_test.go diff --git a/pkg/pubsub/adapter/adapter.go b/pkg/pubsub/adapter/adapter.go index fa8b50beac..364494d539 100644 --- a/pkg/pubsub/adapter/adapter.go +++ b/pkg/pubsub/adapter/adapter.go @@ -20,11 +20,12 @@ import ( "context" "fmt" + nethttp "net/http" + "go.opencensus.io/plugin/ochttp" "go.opencensus.io/plugin/ochttp/propagation/b3" "go.opencensus.io/trace" "go.uber.org/zap" - nethttp "net/http" cloudevents "github.com/cloudevents/sdk-go" "github.com/cloudevents/sdk-go/pkg/cloudevents/transport" @@ -213,11 +214,6 @@ func (a *Adapter) receive(ctx context.Context, event cloudevents.Event, resp *cl ctx = trace.NewContext(ctx, trace.FromContext(transformedCTX)) } - // If send mode is Push, convert to Pub/Sub Push payload style. - if a.SendMode == converters.Push { - event = ConvertToPush(ctx, event) - } - // Apply CloudEvent override extensions to the outbound event. for k, v := range a.extensions { event.SetExtension(k, v) diff --git a/pkg/pubsub/adapter/converters/pubsub.go b/pkg/pubsub/adapter/converters/pubsub.go index bcf0b80bdf..b4f1c90c65 100644 --- a/pkg/pubsub/adapter/converters/pubsub.go +++ b/pkg/pubsub/adapter/converters/pubsub.go @@ -18,11 +18,15 @@ package converters import ( "context" + "encoding/json" + "time" cloudevents "github.com/cloudevents/sdk-go" . "github.com/cloudevents/sdk-go/pkg/cloudevents" cepubsub "github.com/cloudevents/sdk-go/pkg/cloudevents/transport/pubsub" pubsubcontext "github.com/cloudevents/sdk-go/pkg/cloudevents/transport/pubsub/context" + "go.uber.org/zap" + "knative.dev/pkg/logging" "github.com/google/knative-gcp/pkg/apis/events/v1alpha1" ) @@ -48,6 +52,8 @@ func convertPubSub(ctx context.Context, msg *cepubsub.Message, sendMode ModeType event.Data = msg.Data event.DataEncoded = true // Attributes are extensions. + // Also grab all extensions as a string, which we will set as attributes payload if send mode is Push. + attrs := make(map[string]string, 0) if msg.Attributes != nil && len(msg.Attributes) > 0 { for k, v := range msg.Attributes { // CloudEvents v1.0 attributes MUST consist of lower-case letters ('a' to 'z') or digits ('0' to '9') as per @@ -56,7 +62,76 @@ func convertPubSub(ctx context.Context, msg *cepubsub.Message, sendMode ModeType if IsAlphaNumeric(k) { event.SetExtension(k, v) } + // All atributes (whether valid as extensions) are stored in attrs to be set in the payload if mode is Push. + attrs[k] = v } } + // If send mode is Push, convert to Pub/Sub Push payload style. + if sendMode == Push { + event = convertToPush(ctx, event, attrs) + } return &event, nil } + +// PushMessage represents the format Pub/Sub uses to push events. +type PushMessage struct { + // Subscription is the subscription ID that received this Message. + Subscription string `json:"subscription"` + // Message holds the Pub/Sub message contents. + Message *PubSubMessage `json:"message,omitempty"` +} + +// PubSubMessage matches the inner message format used by Push Subscriptions. +type PubSubMessage struct { + // ID identifies this message. This ID is assigned by the server and is + // populated for Messages obtained from a subscription. + // This field is read-only. + ID string `json:"id,omitempty"` + + // Data is the actual data in the message. + Data interface{} `json:"data,omitempty"` + + // Attributes represents the key-value pairs the current message + // is labelled with. + Attributes map[string]string `json:"attributes,omitempty"` + + // The time at which the message was published. This is populated by the + // server for Messages obtained from a subscription. + // This field is read-only. + PublishTime time.Time `json:"publish_time,omitempty"` +} + +// convertToPush convert an event to a Pub/Sub style Push payload. +func convertToPush(ctx context.Context, event cloudevents.Event, attrs map[string]string) cloudevents.Event { + logger := logging.FromContext(ctx).With(zap.Any("event.id", event.ID())) + + tx := pubsubcontext.TransportContextFrom(ctx) + + push := cloudevents.NewEvent(event.SpecVersion()) + push.Context = event.Context.Clone() + + msg := &PubSubMessage{ + ID: event.ID(), + Attributes: attrs, + PublishTime: event.Time(), + } + + var raw json.RawMessage + if err := event.DataAs(&raw); err != nil { + logger.Debugw("Failed to get data as raw json, using as is.", zap.Error(err)) + // Use data as a byte slice. + msg.Data = event.Data + } else { + // Use data as a raw message. + msg.Data = raw + } + + if err := push.SetData(&PushMessage{ + Subscription: tx.Subscription, + Message: msg, + }); err != nil { + logger.Warnw("Failed to set data.", zap.Error(err)) + } + + return push +} diff --git a/pkg/pubsub/adapter/converters/pubsub_test.go b/pkg/pubsub/adapter/converters/pubsub_test.go index 13a4fe9baf..4465fe25df 100644 --- a/pkg/pubsub/adapter/converters/pubsub_test.go +++ b/pkg/pubsub/adapter/converters/pubsub_test.go @@ -126,3 +126,94 @@ func pubSubCloudEvent(extensions map[string]string) *cloudevents.Event { } return &e } + +func TestConvertToPush_noattrs(t *testing.T) { + want := `Validation: valid +Context Attributes, + specversion: 0.2 + type: unit.testing + source: source + id: abc-123 + contenttype: application/json +Data, + { + "subscription": "sub", + "message": { + "id": "abc-123", + "data": "testing", + "publish_time": "0001-01-01T00:00:00Z" + } + } +` + + event := cloudevents.NewEvent() + event.SetSource("source") + event.SetType("unit.testing") + event.SetID("abc-123") + event.SetDataContentType("application/json") + _ = event.SetData("testing") + attrs := make(map[string]string, 0) + ctx := pubsubcontext.WithTransportContext(context.TODO(), pubsubcontext.NewTransportContext( + "proj", + "top", + "sub", + "test", + &pubsub.Message{}, + )) + + got := convertToPush(ctx, event, attrs) + + if diff := cmp.Diff(want, got.String()); diff != "" { + t.Logf("%s", got.String()) + t.Errorf("failed to get expected (-want, +got) = %v", diff) + } +} + +func TestConvertToPush_attrs(t *testing.T) { + want := `Validation: valid +Context Attributes, + specversion: 0.2 + type: unit.testing + source: source + id: abc-123 + contenttype: application/json +Extensions, + foo: bar +Data, + { + "subscription": "sub", + "message": { + "id": "abc-123", + "data": "testing", + "attributes": { + "foo": "bar" + }, + "publish_time": "0001-01-01T00:00:00Z" + } + } +` + + event := cloudevents.NewEvent() + event.SetSource("source") + event.SetType("unit.testing") + event.SetID("abc-123") + event.SetExtension("foo", "bar") + event.SetDataContentType("application/json") + _ = event.SetData("testing") + attrs := make(map[string]string, 0) + attrs["foo"] = "bar" + ctx := pubsubcontext.WithTransportContext(context.TODO(), pubsubcontext.NewTransportContext( + "proj", + "top", + "sub", + "test", + &pubsub.Message{}, + )) + + got := convertToPush(ctx, event, attrs) + + if diff := cmp.Diff(want, got.String()); diff != "" { + t.Logf("%s", got.String()) + t.Errorf("failed to get expected (-want, +got) = %v", diff) + } +} diff --git a/pkg/pubsub/adapter/push.go b/pkg/pubsub/adapter/push.go deleted file mode 100644 index feec708bcc..0000000000 --- a/pkg/pubsub/adapter/push.go +++ /dev/null @@ -1,102 +0,0 @@ -/* -Copyright 2019 Google LLC - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package adapter - -import ( - "context" - "encoding/json" - "time" - - cloudevents "github.com/cloudevents/sdk-go" - pubsubcontext "github.com/cloudevents/sdk-go/pkg/cloudevents/transport/pubsub/context" - "go.uber.org/zap" - "knative.dev/pkg/logging" -) - -// PushMessage represents the format Pub/Sub uses to push events. -type PushMessage struct { - // Subscription is the subscription ID that received this Message. - Subscription string `json:"subscription"` - // Message holds the Pub/Sub message contents. - Message *PubSubMessage `json:"message,omitempty"` -} - -// PubSubMessage matches the inner message format used by Push Subscriptions. -type PubSubMessage struct { - // ID identifies this message. This ID is assigned by the server and is - // populated for Messages obtained from a subscription. - // This field is read-only. - ID string `json:"id,omitempty"` - - // Data is the actual data in the message. - Data interface{} `json:"data,omitempty"` - - // Attributes represents the key-value pairs the current message - // is labelled with. - Attributes map[string]string `json:"attributes,omitempty"` - - // The time at which the message was published. This is populated by the - // server for Messages obtained from a subscription. - // This field is read-only. - PublishTime time.Time `json:"publish_time,omitempty"` -} - -// ConvertToPush convert an event to a Pub/Sub style Push payload. -func ConvertToPush(ctx context.Context, event cloudevents.Event) cloudevents.Event { - logger := logging.FromContext(ctx).With(zap.Any("event.id", event.ID())) - - tx := pubsubcontext.TransportContextFrom(ctx) - - push := cloudevents.NewEvent(event.SpecVersion()) - push.Context = event.Context.Clone() - - // Grab all extensions as a string, set them as attributes payload. - attrs := make(map[string]string, 0) - for k := range event.Extensions() { - var v string - if err := event.ExtensionAs(k, &v); err != nil { - logger.Warnw("Not using extension as attribute for push payload.", zap.String("key", k)) - } else { - attrs[k] = v - } - } - - msg := &PubSubMessage{ - ID: event.ID(), - Attributes: attrs, - PublishTime: event.Time(), - } - - var raw json.RawMessage - if err := event.DataAs(&raw); err != nil { - logger.Debugw("Failed to get data as raw json, using as is.", zap.Error(err)) - // Use data as a byte slice. - msg.Data = event.Data - } else { - // Use data as a raw message. - msg.Data = raw - } - - if err := push.SetData(&PushMessage{ - Subscription: tx.Subscription, - Message: msg, - }); err != nil { - logger.Warnw("Failed to set data.", zap.Error(err)) - } - - return push -} diff --git a/pkg/pubsub/adapter/push_test.go b/pkg/pubsub/adapter/push_test.go deleted file mode 100644 index 111058df26..0000000000 --- a/pkg/pubsub/adapter/push_test.go +++ /dev/null @@ -1,118 +0,0 @@ -/* -Copyright 2019 Google LLC - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package adapter - -import ( - "context" - "testing" - - "cloud.google.com/go/pubsub" - "github.com/google/go-cmp/cmp" - - cloudevents "github.com/cloudevents/sdk-go" - pubsubcontext "github.com/cloudevents/sdk-go/pkg/cloudevents/transport/pubsub/context" -) - -func TestConvertToPush_noattrs(t *testing.T) { - want := `Validation: valid -Context Attributes, - specversion: 0.2 - type: unit.testing - source: source - id: abc-123 - contenttype: application/json -Data, - { - "subscription": "sub", - "message": { - "id": "abc-123", - "data": "testing", - "publish_time": "0001-01-01T00:00:00Z" - } - } -` - - event := cloudevents.NewEvent() - event.SetSource("source") - event.SetType("unit.testing") - event.SetID("abc-123") - event.SetDataContentType("application/json") - _ = event.SetData("testing") - - ctx := pubsubcontext.WithTransportContext(context.TODO(), pubsubcontext.NewTransportContext( - "proj", - "top", - "sub", - "test", - &pubsub.Message{}, - )) - - got := ConvertToPush(ctx, event) - - if diff := cmp.Diff(want, got.String()); diff != "" { - t.Logf("%s", got.String()) - t.Errorf("failed to get expected (-want, +got) = %v", diff) - } -} - -func TestConvertToPush_attrs(t *testing.T) { - want := `Validation: valid -Context Attributes, - specversion: 0.2 - type: unit.testing - source: source - id: abc-123 - contenttype: application/json -Extensions, - foo: bar -Data, - { - "subscription": "sub", - "message": { - "id": "abc-123", - "data": "testing", - "attributes": { - "foo": "bar" - }, - "publish_time": "0001-01-01T00:00:00Z" - } - } -` - - event := cloudevents.NewEvent() - event.SetSource("source") - event.SetType("unit.testing") - event.SetID("abc-123") - event.SetExtension("foo", "bar") - event.SetDataContentType("application/json") - _ = event.SetData("testing") - - ctx := pubsubcontext.WithTransportContext(context.TODO(), pubsubcontext.NewTransportContext( - "proj", - "top", - "sub", - "test", - &pubsub.Message{}, - )) - - got := ConvertToPush(ctx, event) - - if diff := cmp.Diff(want, got.String()); diff != "" { - t.Logf("%s", got.String()) - t.Errorf("failed to get expected (-want, +got) = %v", diff) - } -} From fb4570e938dff2d79a5c032856c49c3882619fa2 Mon Sep 17 00:00:00 2001 From: Nicolas Lopez Date: Thu, 23 Jan 2020 12:49:20 -0500 Subject: [PATCH 2/4] adding new test --- pkg/pubsub/adapter/converters/pubsub.go | 8 ++--- pkg/pubsub/adapter/converters/pubsub_test.go | 38 +++++++++++++++++++- 2 files changed, 40 insertions(+), 6 deletions(-) diff --git a/pkg/pubsub/adapter/converters/pubsub.go b/pkg/pubsub/adapter/converters/pubsub.go index b4f1c90c65..0b18339a8b 100644 --- a/pkg/pubsub/adapter/converters/pubsub.go +++ b/pkg/pubsub/adapter/converters/pubsub.go @@ -52,8 +52,6 @@ func convertPubSub(ctx context.Context, msg *cepubsub.Message, sendMode ModeType event.Data = msg.Data event.DataEncoded = true // Attributes are extensions. - // Also grab all extensions as a string, which we will set as attributes payload if send mode is Push. - attrs := make(map[string]string, 0) if msg.Attributes != nil && len(msg.Attributes) > 0 { for k, v := range msg.Attributes { // CloudEvents v1.0 attributes MUST consist of lower-case letters ('a' to 'z') or digits ('0' to '9') as per @@ -62,13 +60,13 @@ func convertPubSub(ctx context.Context, msg *cepubsub.Message, sendMode ModeType if IsAlphaNumeric(k) { event.SetExtension(k, v) } - // All atributes (whether valid as extensions) are stored in attrs to be set in the payload if mode is Push. - attrs[k] = v } } // If send mode is Push, convert to Pub/Sub Push payload style. if sendMode == Push { - event = convertToPush(ctx, event, attrs) + // set the content type to something that can be handled by codec.go + event.SetDataContentType("application/json") + event = convertToPush(ctx, event, msg.Attributes) } return &event, nil } diff --git a/pkg/pubsub/adapter/converters/pubsub_test.go b/pkg/pubsub/adapter/converters/pubsub_test.go index 4465fe25df..f81f00c203 100644 --- a/pkg/pubsub/adapter/converters/pubsub_test.go +++ b/pkg/pubsub/adapter/converters/pubsub_test.go @@ -18,6 +18,8 @@ package converters import ( "context" + "encoding/json" + "fmt" "testing" "cloud.google.com/go/pubsub" @@ -83,6 +85,24 @@ func TestConvertCloudPubSub(t *testing.T) { "attribute1": "value1", }) }, + }, { + name: "Push mode with non valid alphanumeric attribute", + message: &cepubsub.Message{ + Data: []byte("\"test data\""), // Data passed in quotes for it to be marshalled properly + Attributes: map[string]string{ + "attribute1": "value1", + "Invalid-Attrib#$^": "value2", + }, + }, + sendMode: Push, + wantEventFn: func() *cloudevents.Event { + return pushCloudEvent(map[string]string{ + "attribute1": "value1", + }, map[string]string{ + "attribute1": "value1", + "Invalid-Attrib#$^": "value2", + }, "\"test data\"") + }, }} for _, test := range tests { @@ -98,7 +118,6 @@ func TestConvertCloudPubSub(t *testing.T) { )) gotEvent, err := Convert(ctx, test.message, test.sendMode, "") - if err != nil { if !test.wantErr { t.Errorf("converters.convertPubsub got error %v want error=%v", err, test.wantErr) @@ -127,6 +146,23 @@ func pubSubCloudEvent(extensions map[string]string) *cloudevents.Event { return &e } +func pushCloudEvent(extensions, allExtensions map[string]string, data string) *cloudevents.Event { + e := cloudevents.NewEvent(cloudevents.VersionV1) + e.SetID("id") + e.SetSource(v1alpha1.CloudPubSubSourceEventSource("testproject", "testtopic")) + e.SetDataContentType("application/json") + e.SetType(v1alpha1.CloudPubSubSourcePublish) + e.SetExtension("knativecemode", string(Push)) + ex, _ := json.Marshal(allExtensions) + s := fmt.Sprintf(`{"subscription":"testsubscription","message":{"id":"id","data":%s,"attributes":%s,"publish_time":"0001-01-01T00:00:00Z"}}`, data, ex) + e.Data = []byte(s) + e.DataEncoded = true + for k, v := range extensions { + e.SetExtension(k, v) + } + return &e +} + func TestConvertToPush_noattrs(t *testing.T) { want := `Validation: valid Context Attributes, From e40884d3c06da5da4262c64bff9ddb5cf824713d Mon Sep 17 00:00:00 2001 From: Nicolas Lopez Date: Fri, 24 Jan 2020 11:10:00 -0500 Subject: [PATCH 3/4] collapse convertToPush in convertPubSub --- pkg/pubsub/adapter/converters/pubsub.go | 74 +++++------- pkg/pubsub/adapter/converters/pubsub_test.go | 118 +++---------------- 2 files changed, 49 insertions(+), 143 deletions(-) diff --git a/pkg/pubsub/adapter/converters/pubsub.go b/pkg/pubsub/adapter/converters/pubsub.go index 0b18339a8b..c3da456042 100644 --- a/pkg/pubsub/adapter/converters/pubsub.go +++ b/pkg/pubsub/adapter/converters/pubsub.go @@ -51,8 +51,36 @@ func convertPubSub(ctx context.Context, msg *cepubsub.Message, sendMode ModeType event.SetExtension("knativecemode", string(sendMode)) event.Data = msg.Data event.DataEncoded = true - // Attributes are extensions. - if msg.Attributes != nil && len(msg.Attributes) > 0 { + + // If send mode is Push, convert to Pub/Sub Push payload style. + if sendMode == Push { + logger := logging.FromContext(ctx).With(zap.Any("event.id", event.ID())) + // set the content type to something that can be handled by codec.go + event.SetDataContentType(cloudevents.ApplicationJSON) + msg := &PubSubMessage{ + ID: event.ID(), + Attributes: msg.Attributes, + PublishTime: event.Time(), + } + + var raw json.RawMessage + if err := event.DataAs(&raw); err != nil { + logger.Debugw("Failed to get data as raw json, using as is.", zap.Error(err)) + // Use data as a byte slice. + msg.Data = event.Data + } else { + // Use data as a raw message. + msg.Data = raw + } + + if err := event.SetData(&PushMessage{ + Subscription: tx.Subscription, + Message: msg, + }); err != nil { + logger.Warnw("Failed to set data.", zap.Error(err)) + } + } else if msg.Attributes != nil && len(msg.Attributes) > 0 { + // Attributes are promoted to extensions if send mode is not Push. for k, v := range msg.Attributes { // CloudEvents v1.0 attributes MUST consist of lower-case letters ('a' to 'z') or digits ('0' to '9') as per // the spec. It's not even possible for a conformant transport to allow non-base36 characters. @@ -61,12 +89,7 @@ func convertPubSub(ctx context.Context, msg *cepubsub.Message, sendMode ModeType event.SetExtension(k, v) } } - } - // If send mode is Push, convert to Pub/Sub Push payload style. - if sendMode == Push { - // set the content type to something that can be handled by codec.go - event.SetDataContentType("application/json") - event = convertToPush(ctx, event, msg.Attributes) + } return &event, nil } @@ -98,38 +121,3 @@ type PubSubMessage struct { // This field is read-only. PublishTime time.Time `json:"publish_time,omitempty"` } - -// convertToPush convert an event to a Pub/Sub style Push payload. -func convertToPush(ctx context.Context, event cloudevents.Event, attrs map[string]string) cloudevents.Event { - logger := logging.FromContext(ctx).With(zap.Any("event.id", event.ID())) - - tx := pubsubcontext.TransportContextFrom(ctx) - - push := cloudevents.NewEvent(event.SpecVersion()) - push.Context = event.Context.Clone() - - msg := &PubSubMessage{ - ID: event.ID(), - Attributes: attrs, - PublishTime: event.Time(), - } - - var raw json.RawMessage - if err := event.DataAs(&raw); err != nil { - logger.Debugw("Failed to get data as raw json, using as is.", zap.Error(err)) - // Use data as a byte slice. - msg.Data = event.Data - } else { - // Use data as a raw message. - msg.Data = raw - } - - if err := push.SetData(&PushMessage{ - Subscription: tx.Subscription, - Message: msg, - }); err != nil { - logger.Warnw("Failed to set data.", zap.Error(err)) - } - - return push -} diff --git a/pkg/pubsub/adapter/converters/pubsub_test.go b/pkg/pubsub/adapter/converters/pubsub_test.go index f81f00c203..d0cf0e8c5c 100644 --- a/pkg/pubsub/adapter/converters/pubsub_test.go +++ b/pkg/pubsub/adapter/converters/pubsub_test.go @@ -97,12 +97,20 @@ func TestConvertCloudPubSub(t *testing.T) { sendMode: Push, wantEventFn: func() *cloudevents.Event { return pushCloudEvent(map[string]string{ - "attribute1": "value1", - }, map[string]string{ "attribute1": "value1", "Invalid-Attrib#$^": "value2", }, "\"test data\"") }, + }, { + name: "Push mode with no attributes", + message: &cepubsub.Message{ + Data: []byte("\"test data\""), // Data passed in quotes for it to be marshalled properly + Attributes: map[string]string{}, + }, + sendMode: Push, + wantEventFn: func() *cloudevents.Event { + return pushCloudEvent(nil, "\"test data\"") + }, }} for _, test := range tests { @@ -146,110 +154,20 @@ func pubSubCloudEvent(extensions map[string]string) *cloudevents.Event { return &e } -func pushCloudEvent(extensions, allExtensions map[string]string, data string) *cloudevents.Event { +func pushCloudEvent(attributes map[string]string, data string) *cloudevents.Event { e := cloudevents.NewEvent(cloudevents.VersionV1) e.SetID("id") e.SetSource(v1alpha1.CloudPubSubSourceEventSource("testproject", "testtopic")) - e.SetDataContentType("application/json") + e.SetDataContentType(cloudevents.ApplicationJSON) e.SetType(v1alpha1.CloudPubSubSourcePublish) e.SetExtension("knativecemode", string(Push)) - ex, _ := json.Marshal(allExtensions) - s := fmt.Sprintf(`{"subscription":"testsubscription","message":{"id":"id","data":%s,"attributes":%s,"publish_time":"0001-01-01T00:00:00Z"}}`, data, ex) + at := "" + if attributes != nil { + ex, _ := json.Marshal(attributes) + at = fmt.Sprintf(`"attributes":%s,`, ex) + } + s := fmt.Sprintf(`{"subscription":"testsubscription","message":{"id":"id","data":%s,%s"publish_time":"0001-01-01T00:00:00Z"}}`, data, at) e.Data = []byte(s) e.DataEncoded = true - for k, v := range extensions { - e.SetExtension(k, v) - } return &e } - -func TestConvertToPush_noattrs(t *testing.T) { - want := `Validation: valid -Context Attributes, - specversion: 0.2 - type: unit.testing - source: source - id: abc-123 - contenttype: application/json -Data, - { - "subscription": "sub", - "message": { - "id": "abc-123", - "data": "testing", - "publish_time": "0001-01-01T00:00:00Z" - } - } -` - - event := cloudevents.NewEvent() - event.SetSource("source") - event.SetType("unit.testing") - event.SetID("abc-123") - event.SetDataContentType("application/json") - _ = event.SetData("testing") - attrs := make(map[string]string, 0) - ctx := pubsubcontext.WithTransportContext(context.TODO(), pubsubcontext.NewTransportContext( - "proj", - "top", - "sub", - "test", - &pubsub.Message{}, - )) - - got := convertToPush(ctx, event, attrs) - - if diff := cmp.Diff(want, got.String()); diff != "" { - t.Logf("%s", got.String()) - t.Errorf("failed to get expected (-want, +got) = %v", diff) - } -} - -func TestConvertToPush_attrs(t *testing.T) { - want := `Validation: valid -Context Attributes, - specversion: 0.2 - type: unit.testing - source: source - id: abc-123 - contenttype: application/json -Extensions, - foo: bar -Data, - { - "subscription": "sub", - "message": { - "id": "abc-123", - "data": "testing", - "attributes": { - "foo": "bar" - }, - "publish_time": "0001-01-01T00:00:00Z" - } - } -` - - event := cloudevents.NewEvent() - event.SetSource("source") - event.SetType("unit.testing") - event.SetID("abc-123") - event.SetExtension("foo", "bar") - event.SetDataContentType("application/json") - _ = event.SetData("testing") - attrs := make(map[string]string, 0) - attrs["foo"] = "bar" - ctx := pubsubcontext.WithTransportContext(context.TODO(), pubsubcontext.NewTransportContext( - "proj", - "top", - "sub", - "test", - &pubsub.Message{}, - )) - - got := convertToPush(ctx, event, attrs) - - if diff := cmp.Diff(want, got.String()); diff != "" { - t.Logf("%s", got.String()) - t.Errorf("failed to get expected (-want, +got) = %v", diff) - } -} From abf2bb368a81592cad320d26e076bb3ca2c1e274 Mon Sep 17 00:00:00 2001 From: Nicolas Lopez Date: Fri, 24 Jan 2020 11:39:19 -0500 Subject: [PATCH 4/4] increase testing coverage --- pkg/pubsub/adapter/converters/pubsub.go | 12 ++++----- pkg/pubsub/adapter/converters/pubsub_test.go | 28 +++++++++++++++++--- 2 files changed, 30 insertions(+), 10 deletions(-) diff --git a/pkg/pubsub/adapter/converters/pubsub.go b/pkg/pubsub/adapter/converters/pubsub.go index c3da456042..4bc815794b 100644 --- a/pkg/pubsub/adapter/converters/pubsub.go +++ b/pkg/pubsub/adapter/converters/pubsub.go @@ -57,7 +57,7 @@ func convertPubSub(ctx context.Context, msg *cepubsub.Message, sendMode ModeType logger := logging.FromContext(ctx).With(zap.Any("event.id", event.ID())) // set the content type to something that can be handled by codec.go event.SetDataContentType(cloudevents.ApplicationJSON) - msg := &PubSubMessage{ + msg := &pubSubMessage{ ID: event.ID(), Attributes: msg.Attributes, PublishTime: event.Time(), @@ -73,7 +73,7 @@ func convertPubSub(ctx context.Context, msg *cepubsub.Message, sendMode ModeType msg.Data = raw } - if err := event.SetData(&PushMessage{ + if err := event.SetData(&pushMessage{ Subscription: tx.Subscription, Message: msg, }); err != nil { @@ -94,16 +94,16 @@ func convertPubSub(ctx context.Context, msg *cepubsub.Message, sendMode ModeType return &event, nil } -// PushMessage represents the format Pub/Sub uses to push events. -type PushMessage struct { +// pushMessage represents the format Pub/Sub uses to push events. +type pushMessage struct { // Subscription is the subscription ID that received this Message. Subscription string `json:"subscription"` // Message holds the Pub/Sub message contents. - Message *PubSubMessage `json:"message,omitempty"` + Message *pubSubMessage `json:"message,omitempty"` } // PubSubMessage matches the inner message format used by Push Subscriptions. -type PubSubMessage struct { +type pubSubMessage struct { // ID identifies this message. This ID is assigned by the server and is // populated for Messages obtained from a subscription. // This field is read-only. diff --git a/pkg/pubsub/adapter/converters/pubsub_test.go b/pkg/pubsub/adapter/converters/pubsub_test.go index d0cf0e8c5c..7305e1a389 100644 --- a/pkg/pubsub/adapter/converters/pubsub_test.go +++ b/pkg/pubsub/adapter/converters/pubsub_test.go @@ -52,7 +52,7 @@ func TestConvertCloudPubSub(t *testing.T) { return pubSubCloudEvent(map[string]string{ "attribute1": "value1", "attribute2": "value2", - }) + }, "") }, }, { name: "upper case attributes", @@ -68,7 +68,7 @@ func TestConvertCloudPubSub(t *testing.T) { return pubSubCloudEvent(map[string]string{ "attribute1": "value1", "attribute2": "value2", - }) + }, "") }, }, { name: "only setting valid alphanumeric attribute", @@ -83,7 +83,24 @@ func TestConvertCloudPubSub(t *testing.T) { wantEventFn: func() *cloudevents.Event { return pubSubCloudEvent(map[string]string{ "attribute1": "value1", - }) + }, "") + }, + }, { + name: "schema as attribute", + message: &cepubsub.Message{ + Data: []byte("test data"), + Attributes: map[string]string{ + "attribute1": "value1", + "attribute2": "value2", + "schema": "schema_val", + }, + }, + sendMode: Binary, + wantEventFn: func() *cloudevents.Event { + return pubSubCloudEvent(map[string]string{ + "attribute1": "value1", + "attribute2": "value2", + }, "schema_val") }, }, { name: "Push mode with non valid alphanumeric attribute", @@ -139,7 +156,7 @@ func TestConvertCloudPubSub(t *testing.T) { } } -func pubSubCloudEvent(extensions map[string]string) *cloudevents.Event { +func pubSubCloudEvent(extensions map[string]string, schema string) *cloudevents.Event { e := cloudevents.NewEvent(cloudevents.VersionV1) e.SetID("id") e.SetSource(v1alpha1.CloudPubSubSourceEventSource("testproject", "testtopic")) @@ -151,6 +168,9 @@ func pubSubCloudEvent(extensions map[string]string) *cloudevents.Event { for k, v := range extensions { e.SetExtension(k, v) } + if schema != "" { + e.SetDataSchema(schema) + } return &e }