Skip to content
This repository has been archived by the owner on Jun 19, 2022. It is now read-only.

Move ConvertToPush method to pubsub.go #512

Merged
merged 4 commits into from
Jan 24, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 2 additions & 6 deletions pkg/pubsub/adapter/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,12 @@ import (
"context"
"fmt"

nethttp "net/http"

"go.opencensus.io/plugin/ochttp"
"go.opencensus.io/plugin/ochttp/propagation/b3"
"go.opencensus.io/trace"
"go.uber.org/zap"
nethttp "net/http"

cloudevents "github.com/cloudevents/sdk-go"
"github.com/cloudevents/sdk-go/pkg/cloudevents/transport"
Expand Down Expand Up @@ -213,11 +214,6 @@ func (a *Adapter) receive(ctx context.Context, event cloudevents.Event, resp *cl
ctx = trace.NewContext(ctx, trace.FromContext(transformedCTX))
}

// If send mode is Push, convert to Pub/Sub Push payload style.
if a.SendMode == converters.Push {
event = ConvertToPush(ctx, event)
}

// Apply CloudEvent override extensions to the outbound event.
for k, v := range a.extensions {
event.SetExtension(k, v)
Expand Down
65 changes: 63 additions & 2 deletions pkg/pubsub/adapter/converters/pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,15 @@ package converters

import (
"context"
"encoding/json"
"time"

cloudevents "github.com/cloudevents/sdk-go"
. "github.com/cloudevents/sdk-go/pkg/cloudevents"
cepubsub "github.com/cloudevents/sdk-go/pkg/cloudevents/transport/pubsub"
pubsubcontext "github.com/cloudevents/sdk-go/pkg/cloudevents/transport/pubsub/context"
"go.uber.org/zap"
"knative.dev/pkg/logging"

"github.com/google/knative-gcp/pkg/apis/events/v1alpha1"
)
Expand All @@ -47,8 +51,36 @@ func convertPubSub(ctx context.Context, msg *cepubsub.Message, sendMode ModeType
event.SetExtension("knativecemode", string(sendMode))
event.Data = msg.Data
event.DataEncoded = true
// Attributes are extensions.
if msg.Attributes != nil && len(msg.Attributes) > 0 {

// If send mode is Push, convert to Pub/Sub Push payload style.
if sendMode == Push {
logger := logging.FromContext(ctx).With(zap.Any("event.id", event.ID()))
// set the content type to something that can be handled by codec.go
event.SetDataContentType(cloudevents.ApplicationJSON)
msg := &pubSubMessage{
ID: event.ID(),
Attributes: msg.Attributes,
PublishTime: event.Time(),
}

var raw json.RawMessage
if err := event.DataAs(&raw); err != nil {
logger.Debugw("Failed to get data as raw json, using as is.", zap.Error(err))
// Use data as a byte slice.
msg.Data = event.Data
} else {
// Use data as a raw message.
msg.Data = raw
}

if err := event.SetData(&pushMessage{
Subscription: tx.Subscription,
Message: msg,
}); err != nil {
logger.Warnw("Failed to set data.", zap.Error(err))
}
} else if msg.Attributes != nil && len(msg.Attributes) > 0 {
// Attributes are promoted to extensions if send mode is not Push.
for k, v := range msg.Attributes {
// CloudEvents v1.0 attributes MUST consist of lower-case letters ('a' to 'z') or digits ('0' to '9') as per
// the spec. It's not even possible for a conformant transport to allow non-base36 characters.
Expand All @@ -57,6 +89,35 @@ func convertPubSub(ctx context.Context, msg *cepubsub.Message, sendMode ModeType
event.SetExtension(k, v)
}
}

}
return &event, nil
}

// pushMessage represents the format Pub/Sub uses to push events.
type pushMessage struct {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thanks for making it private!

// Subscription is the subscription ID that received this Message.
Subscription string `json:"subscription"`
// Message holds the Pub/Sub message contents.
Message *pubSubMessage `json:"message,omitempty"`
}

// PubSubMessage matches the inner message format used by Push Subscriptions.
type pubSubMessage struct {
// ID identifies this message. This ID is assigned by the server and is
// populated for Messages obtained from a subscription.
// This field is read-only.
ID string `json:"id,omitempty"`

// Data is the actual data in the message.
Data interface{} `json:"data,omitempty"`

// Attributes represents the key-value pairs the current message
// is labelled with.
Attributes map[string]string `json:"attributes,omitempty"`

// The time at which the message was published. This is populated by the
// server for Messages obtained from a subscription.
// This field is read-only.
PublishTime time.Time `json:"publish_time,omitempty"`
}
75 changes: 70 additions & 5 deletions pkg/pubsub/adapter/converters/pubsub_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ package converters

import (
"context"
"encoding/json"
"fmt"
"testing"

"cloud.google.com/go/pubsub"
Expand Down Expand Up @@ -50,7 +52,7 @@ func TestConvertCloudPubSub(t *testing.T) {
return pubSubCloudEvent(map[string]string{
"attribute1": "value1",
"attribute2": "value2",
})
}, "")
},
}, {
name: "upper case attributes",
Expand All @@ -66,7 +68,7 @@ func TestConvertCloudPubSub(t *testing.T) {
return pubSubCloudEvent(map[string]string{
"attribute1": "value1",
"attribute2": "value2",
})
}, "")
},
}, {
name: "only setting valid alphanumeric attribute",
Expand All @@ -81,7 +83,50 @@ func TestConvertCloudPubSub(t *testing.T) {
wantEventFn: func() *cloudevents.Event {
return pubSubCloudEvent(map[string]string{
"attribute1": "value1",
})
}, "")
},
}, {
name: "schema as attribute",
message: &cepubsub.Message{
Data: []byte("test data"),
Attributes: map[string]string{
"attribute1": "value1",
"attribute2": "value2",
"schema": "schema_val",
},
},
sendMode: Binary,
wantEventFn: func() *cloudevents.Event {
return pubSubCloudEvent(map[string]string{
"attribute1": "value1",
"attribute2": "value2",
}, "schema_val")
},
}, {
name: "Push mode with non valid alphanumeric attribute",
message: &cepubsub.Message{
Data: []byte("\"test data\""), // Data passed in quotes for it to be marshalled properly
Attributes: map[string]string{
"attribute1": "value1",
"Invalid-Attrib#$^": "value2",
},
},
sendMode: Push,
wantEventFn: func() *cloudevents.Event {
return pushCloudEvent(map[string]string{
"attribute1": "value1",
"Invalid-Attrib#$^": "value2",
}, "\"test data\"")
},
}, {
name: "Push mode with no attributes",
message: &cepubsub.Message{
Data: []byte("\"test data\""), // Data passed in quotes for it to be marshalled properly
Attributes: map[string]string{},
},
sendMode: Push,
wantEventFn: func() *cloudevents.Event {
return pushCloudEvent(nil, "\"test data\"")
},
}}

Expand All @@ -98,7 +143,6 @@ func TestConvertCloudPubSub(t *testing.T) {
))

gotEvent, err := Convert(ctx, test.message, test.sendMode, "")

if err != nil {
if !test.wantErr {
t.Errorf("converters.convertPubsub got error %v want error=%v", err, test.wantErr)
Expand All @@ -112,7 +156,7 @@ func TestConvertCloudPubSub(t *testing.T) {
}
}

func pubSubCloudEvent(extensions map[string]string) *cloudevents.Event {
func pubSubCloudEvent(extensions map[string]string, schema string) *cloudevents.Event {
e := cloudevents.NewEvent(cloudevents.VersionV1)
e.SetID("id")
e.SetSource(v1alpha1.CloudPubSubSourceEventSource("testproject", "testtopic"))
Expand All @@ -124,5 +168,26 @@ func pubSubCloudEvent(extensions map[string]string) *cloudevents.Event {
for k, v := range extensions {
e.SetExtension(k, v)
}
if schema != "" {
e.SetDataSchema(schema)
}
return &e
}

func pushCloudEvent(attributes map[string]string, data string) *cloudevents.Event {
e := cloudevents.NewEvent(cloudevents.VersionV1)
e.SetID("id")
e.SetSource(v1alpha1.CloudPubSubSourceEventSource("testproject", "testtopic"))
e.SetDataContentType(cloudevents.ApplicationJSON)
e.SetType(v1alpha1.CloudPubSubSourcePublish)
e.SetExtension("knativecemode", string(Push))
at := ""
if attributes != nil {
ex, _ := json.Marshal(attributes)
at = fmt.Sprintf(`"attributes":%s,`, ex)
}
s := fmt.Sprintf(`{"subscription":"testsubscription","message":{"id":"id","data":%s,%s"publish_time":"0001-01-01T00:00:00Z"}}`, data, at)
e.Data = []byte(s)
e.DataEncoded = true
return &e
}
102 changes: 0 additions & 102 deletions pkg/pubsub/adapter/push.go

This file was deleted.

Loading