Skip to content
This repository has been archived by the owner on Jun 19, 2022. It is now read-only.

Commit

Permalink
Move ConvertToPush method to pubsub.go (#512) (#520)
Browse files Browse the repository at this point in the history
* move convert to push to pubsub.go

* adding new test

* collapse convertToPush in convertPubSub

* increase testing coverage
  • Loading branch information
Nicolas Lopez authored and knative-prow-robot committed Jan 24, 2020
1 parent 8c317a7 commit b4c323d
Show file tree
Hide file tree
Showing 5 changed files with 135 additions and 233 deletions.
8 changes: 2 additions & 6 deletions pkg/pubsub/adapter/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,12 @@ import (
"context"
"fmt"

nethttp "net/http"

"go.opencensus.io/plugin/ochttp"
"go.opencensus.io/plugin/ochttp/propagation/b3"
"go.opencensus.io/trace"
"go.uber.org/zap"
nethttp "net/http"

cloudevents "github.com/cloudevents/sdk-go"
"github.com/cloudevents/sdk-go/pkg/cloudevents/transport"
Expand Down Expand Up @@ -213,11 +214,6 @@ func (a *Adapter) receive(ctx context.Context, event cloudevents.Event, resp *cl
ctx = trace.NewContext(ctx, trace.FromContext(transformedCTX))
}

// If send mode is Push, convert to Pub/Sub Push payload style.
if a.SendMode == converters.Push {
event = ConvertToPush(ctx, event)
}

// Apply CloudEvent override extensions to the outbound event.
for k, v := range a.extensions {
event.SetExtension(k, v)
Expand Down
65 changes: 63 additions & 2 deletions pkg/pubsub/adapter/converters/pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,15 @@ package converters

import (
"context"
"encoding/json"
"time"

cloudevents "github.com/cloudevents/sdk-go"
. "github.com/cloudevents/sdk-go/pkg/cloudevents"
cepubsub "github.com/cloudevents/sdk-go/pkg/cloudevents/transport/pubsub"
pubsubcontext "github.com/cloudevents/sdk-go/pkg/cloudevents/transport/pubsub/context"
"go.uber.org/zap"
"knative.dev/pkg/logging"

"github.com/google/knative-gcp/pkg/apis/events/v1alpha1"
)
Expand All @@ -47,8 +51,36 @@ func convertPubSub(ctx context.Context, msg *cepubsub.Message, sendMode ModeType
event.SetExtension("knativecemode", string(sendMode))
event.Data = msg.Data
event.DataEncoded = true
// Attributes are extensions.
if msg.Attributes != nil && len(msg.Attributes) > 0 {

// If send mode is Push, convert to Pub/Sub Push payload style.
if sendMode == Push {
logger := logging.FromContext(ctx).With(zap.Any("event.id", event.ID()))
// set the content type to something that can be handled by codec.go
event.SetDataContentType(cloudevents.ApplicationJSON)
msg := &pubSubMessage{
ID: event.ID(),
Attributes: msg.Attributes,
PublishTime: event.Time(),
}

var raw json.RawMessage
if err := event.DataAs(&raw); err != nil {
logger.Debugw("Failed to get data as raw json, using as is.", zap.Error(err))
// Use data as a byte slice.
msg.Data = event.Data
} else {
// Use data as a raw message.
msg.Data = raw
}

if err := event.SetData(&pushMessage{
Subscription: tx.Subscription,
Message: msg,
}); err != nil {
logger.Warnw("Failed to set data.", zap.Error(err))
}
} else if msg.Attributes != nil && len(msg.Attributes) > 0 {
// Attributes are promoted to extensions if send mode is not Push.
for k, v := range msg.Attributes {
// CloudEvents v1.0 attributes MUST consist of lower-case letters ('a' to 'z') or digits ('0' to '9') as per
// the spec. It's not even possible for a conformant transport to allow non-base36 characters.
Expand All @@ -57,6 +89,35 @@ func convertPubSub(ctx context.Context, msg *cepubsub.Message, sendMode ModeType
event.SetExtension(k, v)
}
}

}
return &event, nil
}

// pushMessage represents the format Pub/Sub uses to push events.
type pushMessage struct {
// Subscription is the subscription ID that received this Message.
Subscription string `json:"subscription"`
// Message holds the Pub/Sub message contents.
Message *pubSubMessage `json:"message,omitempty"`
}

// PubSubMessage matches the inner message format used by Push Subscriptions.
type pubSubMessage struct {
// ID identifies this message. This ID is assigned by the server and is
// populated for Messages obtained from a subscription.
// This field is read-only.
ID string `json:"id,omitempty"`

// Data is the actual data in the message.
Data interface{} `json:"data,omitempty"`

// Attributes represents the key-value pairs the current message
// is labelled with.
Attributes map[string]string `json:"attributes,omitempty"`

// The time at which the message was published. This is populated by the
// server for Messages obtained from a subscription.
// This field is read-only.
PublishTime time.Time `json:"publish_time,omitempty"`
}
75 changes: 70 additions & 5 deletions pkg/pubsub/adapter/converters/pubsub_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ package converters

import (
"context"
"encoding/json"
"fmt"
"testing"

"cloud.google.com/go/pubsub"
Expand Down Expand Up @@ -50,7 +52,7 @@ func TestConvertCloudPubSub(t *testing.T) {
return pubSubCloudEvent(map[string]string{
"attribute1": "value1",
"attribute2": "value2",
})
}, "")
},
}, {
name: "upper case attributes",
Expand All @@ -66,7 +68,7 @@ func TestConvertCloudPubSub(t *testing.T) {
return pubSubCloudEvent(map[string]string{
"attribute1": "value1",
"attribute2": "value2",
})
}, "")
},
}, {
name: "only setting valid alphanumeric attribute",
Expand All @@ -81,7 +83,50 @@ func TestConvertCloudPubSub(t *testing.T) {
wantEventFn: func() *cloudevents.Event {
return pubSubCloudEvent(map[string]string{
"attribute1": "value1",
})
}, "")
},
}, {
name: "schema as attribute",
message: &cepubsub.Message{
Data: []byte("test data"),
Attributes: map[string]string{
"attribute1": "value1",
"attribute2": "value2",
"schema": "schema_val",
},
},
sendMode: Binary,
wantEventFn: func() *cloudevents.Event {
return pubSubCloudEvent(map[string]string{
"attribute1": "value1",
"attribute2": "value2",
}, "schema_val")
},
}, {
name: "Push mode with non valid alphanumeric attribute",
message: &cepubsub.Message{
Data: []byte("\"test data\""), // Data passed in quotes for it to be marshalled properly
Attributes: map[string]string{
"attribute1": "value1",
"Invalid-Attrib#$^": "value2",
},
},
sendMode: Push,
wantEventFn: func() *cloudevents.Event {
return pushCloudEvent(map[string]string{
"attribute1": "value1",
"Invalid-Attrib#$^": "value2",
}, "\"test data\"")
},
}, {
name: "Push mode with no attributes",
message: &cepubsub.Message{
Data: []byte("\"test data\""), // Data passed in quotes for it to be marshalled properly
Attributes: map[string]string{},
},
sendMode: Push,
wantEventFn: func() *cloudevents.Event {
return pushCloudEvent(nil, "\"test data\"")
},
}}

Expand All @@ -98,7 +143,6 @@ func TestConvertCloudPubSub(t *testing.T) {
))

gotEvent, err := Convert(ctx, test.message, test.sendMode, "")

if err != nil {
if !test.wantErr {
t.Errorf("converters.convertPubsub got error %v want error=%v", err, test.wantErr)
Expand All @@ -112,7 +156,7 @@ func TestConvertCloudPubSub(t *testing.T) {
}
}

func pubSubCloudEvent(extensions map[string]string) *cloudevents.Event {
func pubSubCloudEvent(extensions map[string]string, schema string) *cloudevents.Event {
e := cloudevents.NewEvent(cloudevents.VersionV1)
e.SetID("id")
e.SetSource(v1alpha1.CloudPubSubSourceEventSource("testproject", "testtopic"))
Expand All @@ -124,5 +168,26 @@ func pubSubCloudEvent(extensions map[string]string) *cloudevents.Event {
for k, v := range extensions {
e.SetExtension(k, v)
}
if schema != "" {
e.SetDataSchema(schema)
}
return &e
}

func pushCloudEvent(attributes map[string]string, data string) *cloudevents.Event {
e := cloudevents.NewEvent(cloudevents.VersionV1)
e.SetID("id")
e.SetSource(v1alpha1.CloudPubSubSourceEventSource("testproject", "testtopic"))
e.SetDataContentType(cloudevents.ApplicationJSON)
e.SetType(v1alpha1.CloudPubSubSourcePublish)
e.SetExtension("knativecemode", string(Push))
at := ""
if attributes != nil {
ex, _ := json.Marshal(attributes)
at = fmt.Sprintf(`"attributes":%s,`, ex)
}
s := fmt.Sprintf(`{"subscription":"testsubscription","message":{"id":"id","data":%s,%s"publish_time":"0001-01-01T00:00:00Z"}}`, data, at)
e.Data = []byte(s)
e.DataEncoded = true
return &e
}
102 changes: 0 additions & 102 deletions pkg/pubsub/adapter/push.go

This file was deleted.

Loading

0 comments on commit b4c323d

Please sign in to comment.