Skip to content
This repository has been archived by the owner on Jun 19, 2022. It is now read-only.

Commit

Permalink
CloudPubSubSource to Push mode (#521)
Browse files Browse the repository at this point in the history
* Setting CloudPubSubSource to be push-based.
Adding a PullSubscriptionArgs struct

* updating docs

* making it public for e2e tests.
attempt to fix test

* rollback the change to boilerplate

* properly unmarshalling the message
  • Loading branch information
nachocano authored and knative-prow-robot committed Jan 24, 2020
1 parent 2b3a523 commit 9c6c2a6
Show file tree
Hide file tree
Showing 11 changed files with 151 additions and 59 deletions.
4 changes: 2 additions & 2 deletions docs/examples/cloudauditlogssource/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -79,8 +79,8 @@ Context Attributes,
specversion: 1.0
type: com.google.cloud.auditlog.event
source: pubsub.googleapis.com/projects/test
subject: pubsub.googleapis.com/projects/test/topics/test-auditlogs-source
id: 8c2iznd54odprojects/test/logs/cloudaudit.googleapis.com%2Factivity2020-01-07T20:56:30.516179081Z
subject: projects/test/topics/test-auditlogs-source
id: efdb9bf7d6fdfc922352530c1ba51242
time: 2020-01-07T20:56:30.516179081Z
dataschema: type.googleapis.com/google.logging.v2.LogEntry
datacontenttype: application/json
Expand Down
28 changes: 21 additions & 7 deletions docs/examples/cloudpubsubsource/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@

## Overview

This sample shows how to configure the CloudPubSubSource. CloudPubSubSource fires a new event each time a message is published on a [Google Cloud Platform PubSub topic](https://cloud.google.com/pubsub/).
This sample shows how to configure `CloudPubSubSources`.
The `CloudPubSubSource` fires a new event each time a message is published on a [Cloud Pub/Sub topic](https://cloud.google.com/pubsub/).
This source sends events using a Push-compatible format.

## Prerequisites

Expand Down Expand Up @@ -68,18 +70,30 @@ Validation: valid
Context Attributes,
specversion: 1.0
type: com.google.cloud.pubsub.topic.publish
source: //pubsub.googleapis.com/projects/xiyue-knative-gcp/topics/testing
id: 946366448650699
time: 2020-01-21T22:12:06.742Z
datacontenttype: application/octet-stream
source: //pubsub.googleapis.com/projects/PROJECT_ID/topics/TOPIC_NAME
id: 951049449503068
time: 2020-01-24T18:29:36.874Z
datacontenttype: application/json
Extensions,
knativecemode: binary
knativearrivaltime: 2020-01-24T18:29:37.212883996Z
knativecemode: push
traceparent: 00-7e7fb503ae694cc0f1cbf84ea63354be-f8c4848c9c11e073-00
Data,
{"Hello": "world"}
{
"subscription": "cre-pull-7b35a745-877f-4f1f-9434-74062631a958",
"message": {
"id": "951049449503068",
"data": {
"Hello": "world"
},
"publish_time": "2020-01-24T18:29:36.874Z"
}
}
```

## What's Next

1. For more details on Cloud Pub/Sub formats refer to the [Subscriber overview guide](https://cloud.google.com/pubsub/docs/subscriber).
1. For integrating with Cloud Storage see the [Storage example](../../examples/cloudstoragesource/README.md).
1. For integrating with Cloud Scheduler see the [Scheduler example](../../examples/cloudschedulersource/README.md).
1. For integrating with Cloud Audit Logs see the [Cloud Audit Logs example](../../examples/cloudauditlogssource/README.md).
Expand Down
8 changes: 5 additions & 3 deletions docs/examples/pullsubscription/README.md
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
# PullSubscription Example

This sample shows how to configure `PullSubscriptions`. This
can be considered an implementation detail of the [CloudPubSubSource](../../examples/cloudpubsubsource/README.md), and users should rather
use the latter if they want to bridge events from other Google Cloud services using Pub/Sub into their clusters.
can be considered an implementation detail of the [CloudPubSubSource](../../examples/cloudpubsubsource/README.md),
and users should rather use the latter if they want to bridge events from Pub/Sub into their clusters. As opposed to the
`CloudPubSubSource`, which sends events using the Push-compatible format, this does so using a Pull format.

## Prerequisites

Expand Down Expand Up @@ -117,7 +118,8 @@ For more information about the format of the `Data` see the `data` field of

## What's next

1. For a higher-level construct to interact with Cloud Pub/Sub, see the [PubSub example](../../examples/cloudpubsubsource/README.md).
1. For more details on Cloud Pub/Sub formats refer to the [Subscriber overview guide](https://cloud.google.com/pubsub/docs/subscriber).
1. For a higher-level construct to interact with Cloud Pub/Sub that sends Push-compatible format events, see the [PubSub example](../../examples/cloudpubsubsource/README.md).
1. For integrating with Cloud Storage see the [Storage example](../../examples/cloudstoragesource/README.md).
1. For integrating with Cloud Scheduler see the [Scheduler example](../../examples/cloudschedulersource/README.md).
1. For integrating with Cloud Audit Logs see the [Cloud Audit Logs example](../../examples/cloudauditlogssource/README.md).
Expand Down
49 changes: 27 additions & 22 deletions pkg/pubsub/adapter/converters/pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,6 @@ func convertPubSub(ctx context.Context, msg *cepubsub.Message, sendMode ModeType
event.SetID(tx.ID)
event.SetTime(tx.PublishTime)
event.SetSource(v1alpha1.CloudPubSubSourceEventSource(tx.Project, tx.Topic))
// We do not know the content type and we do not want to inspect the payload,
// thus we set this generic one.
event.SetDataContentType("application/octet-stream")
event.SetType(v1alpha1.CloudPubSubSourcePublish)
// Set the schema if it comes as an attribute.
if val, ok := msg.Attributes["schema"]; ok {
Expand All @@ -49,61 +46,69 @@ func convertPubSub(ctx context.Context, msg *cepubsub.Message, sendMode ModeType
}
// Set the mode to be an extension attribute.
event.SetExtension("knativecemode", string(sendMode))
// Setting the event Data for Pull format. If it's Push, it will be overwritten below.
// Setting it here to be able to leverage event.DataAs call below.
event.Data = msg.Data
event.DataEncoded = true

logger := logging.FromContext(ctx).With(zap.Any("event.id", event.ID()))
// If send mode is Push, convert to Pub/Sub Push payload style.
if sendMode == Push {
logger := logging.FromContext(ctx).With(zap.Any("event.id", event.ID()))
// set the content type to something that can be handled by codec.go
// Set the content type to something that can be handled by codec.go.
event.SetDataContentType(cloudevents.ApplicationJSON)
msg := &pubSubMessage{
msg := &PubSubMessage{
ID: event.ID(),
Attributes: msg.Attributes,
PublishTime: event.Time(),
}

var raw json.RawMessage
if err := event.DataAs(&raw); err != nil {
logger.Debugw("Failed to get data as raw json, using as is.", zap.Error(err))
logger.Desugar().Debug("Failed to get data as raw json, using as is.", zap.Error(err))
// Use data as a byte slice.
msg.Data = event.Data
} else {
// Use data as a raw message.
msg.Data = raw
}

if err := event.SetData(&pushMessage{
if err := event.SetData(&PushMessage{
Subscription: tx.Subscription,
Message: msg,
}); err != nil {
logger.Warnw("Failed to set data.", zap.Error(err))
logger.Desugar().Warn("Failed to set data.", zap.Error(err))
}
} else if msg.Attributes != nil && len(msg.Attributes) > 0 {
// Attributes are promoted to extensions if send mode is not Push.
for k, v := range msg.Attributes {
// CloudEvents v1.0 attributes MUST consist of lower-case letters ('a' to 'z') or digits ('0' to '9') as per
// the spec. It's not even possible for a conformant transport to allow non-base36 characters.
// Note `SetExtension` will make it lowercase so only `IsAlphaNumeric` needs to be checked here.
if IsAlphaNumeric(k) {
event.SetExtension(k, v)
} else {
// non-Push mode, attributes should be promoted to extensions.
// We do not know the content type and we do not want to inspect the payload,
// thus we set this generic one.
event.SetDataContentType("application/octet-stream")
if msg.Attributes != nil && len(msg.Attributes) > 0 {
for k, v := range msg.Attributes {
// CloudEvents v1.0 attributes MUST consist of lower-case letters ('a' to 'z') or digits ('0' to '9') as per
// the spec. It's not even possible for a conformant transport to allow non-base36 characters.
// Note `SetExtension` will make it lowercase so only `IsAlphaNumeric` needs to be checked here.
if IsAlphaNumeric(k) {
event.SetExtension(k, v)
} else {
logger.Desugar().Warn("Skipping attribute that is not a valid extension", zap.String(k, v))
}
}
}

}
return &event, nil
}

// pushMessage represents the format Pub/Sub uses to push events.
type pushMessage struct {
// PushMessage represents the format Pub/Sub uses to push events.
type PushMessage struct {
// Subscription is the subscription ID that received this Message.
Subscription string `json:"subscription"`
// Message holds the Pub/Sub message contents.
Message *pubSubMessage `json:"message,omitempty"`
Message *PubSubMessage `json:"message,omitempty"`
}

// PubSubMessage matches the inner message format used by Push Subscriptions.
type pubSubMessage struct {
type PubSubMessage struct {
// ID identifies this message. This ID is assigned by the server and is
// populated for Messages obtained from a subscription.
// This field is read-only.
Expand Down
12 changes: 11 additions & 1 deletion pkg/reconciler/events/pubsub/pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,17 @@ func (r *Reconciler) reconcilePullSubscription(ctx context.Context, source *v1al
logging.FromContext(ctx).Desugar().Error("Failed to get PullSubscription", zap.Error(err))
return nil, fmt.Errorf("failed to get PullSubscription: %w", err)
}
newPS := resources.MakePullSubscription(source.Namespace, source.Name, &source.Spec.PubSubSpec, source, source.Spec.Topic, r.receiveAdapterName, "", resourceGroup)
args := &resources.PullSubscriptionArgs{
Namespace: source.Namespace,
Name: source.Name,
Spec: &source.Spec.PubSubSpec,
Owner: source,
Topic: source.Spec.Topic,
ReceiveAdapter: r.receiveAdapterName,
ResourceGroup: resourceGroup,
Mode: pubsubv1alpha1.ModePushCompatible,
}
newPS := resources.MakePullSubscription(args)
logging.FromContext(ctx).Desugar().Debug("Creating PullSubscription", zap.Any("ps", newPS))
ps, err = r.RunClientSet.PubsubV1alpha1().PullSubscriptions(newPS.Namespace).Create(newPS)
if err != nil {
Expand Down
1 change: 1 addition & 0 deletions pkg/reconciler/events/pubsub/pubsub_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,7 @@ func TestAllCases(t *testing.T) {
Secret: &secret,
}),
WithPullSubscriptionSink(sinkGVK, sinkName),
WithPullSubscriptionMode(pubsubv1alpha1.ModePushCompatible),
WithPullSubscriptionLabels(map[string]string{
"receive-adapter": receiveAdapterName,
}),
Expand Down
12 changes: 11 additions & 1 deletion pkg/reconciler/pubsub/reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,17 @@ func (psb *PubSubBase) ReconcilePubSub(ctx context.Context, pubsubable duck.PubS
logging.FromContext(ctx).Desugar().Error("Failed to get PullSubscription", zap.Error(err))
return t, nil, fmt.Errorf("failed to get Pullsubscription: %w", err)
}
newPS := resources.MakePullSubscription(namespace, name, spec, pubsubable, topic, psb.receiveAdapterName, psb.adapterType, resourceGroup)
args := &resources.PullSubscriptionArgs{
Namespace: namespace,
Name: name,
Spec: spec,
Owner: pubsubable,
Topic: topic,
ReceiveAdapter: psb.receiveAdapterName,
AdapterType: psb.adapterType,
ResourceGroup: resourceGroup,
}
newPS := resources.MakePullSubscription(args)
ps, err = pullSubscriptions.Create(newPS)
if err != nil {
logging.FromContext(ctx).Desugar().Error("Failed to create PullSubscription", zap.Any("ps", newPS), zap.Error(err))
Expand Down
43 changes: 28 additions & 15 deletions pkg/reconciler/pubsub/resources/pullsubscription.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,43 +25,56 @@ import (
pubsubv1alpha1 "github.com/google/knative-gcp/pkg/apis/pubsub/v1alpha1"
)

type PullSubscriptionArgs struct {
Namespace string
Name string
Spec *duckv1alpha1.PubSubSpec
Owner kmeta.OwnerRefable
Topic string
ReceiveAdapter string
AdapterType string
ResourceGroup string
Mode pubsubv1alpha1.ModeType
}

// MakePullSubscription creates the spec for, but does not create, a GCP PullSubscription
// for a given GCS.
func MakePullSubscription(namespace, name string, spec *duckv1alpha1.PubSubSpec, owner kmeta.OwnerRefable, topic, receiveAdapterName, adapterType, resourceGroup string) *pubsubv1alpha1.PullSubscription {
func MakePullSubscription(args *PullSubscriptionArgs) *pubsubv1alpha1.PullSubscription {
labels := map[string]string{
"receive-adapter": receiveAdapterName,
"receive-adapter": args.ReceiveAdapter,
}

annotations := map[string]string{
"metrics-resource-group": resourceGroup,
"metrics-resource-group": args.ResourceGroup,
}

pubsubSecret := spec.Secret
if spec.PubSubSecret != nil {
pubsubSecret = spec.PubSubSecret
pubsubSecret := args.Spec.Secret
if args.Spec.PubSubSecret != nil {
pubsubSecret = args.Spec.PubSubSecret
}

ps := &pubsubv1alpha1.PullSubscription{
ObjectMeta: metav1.ObjectMeta{
Name: name,
Namespace: namespace,
Name: args.Name,
Namespace: args.Namespace,
Labels: labels,
Annotations: annotations,
OwnerReferences: []metav1.OwnerReference{*kmeta.NewControllerRef(owner)},
OwnerReferences: []metav1.OwnerReference{*kmeta.NewControllerRef(args.Owner)},
},
Spec: pubsubv1alpha1.PullSubscriptionSpec{
Secret: pubsubSecret,
Project: spec.Project,
Topic: topic,
AdapterType: adapterType,
Project: args.Spec.Project,
Topic: args.Topic,
AdapterType: args.AdapterType,
Mode: args.Mode,
SourceSpec: duckv1.SourceSpec{
Sink: spec.Sink,
Sink: args.Spec.Sink,
},
},
}
if spec.CloudEventOverrides != nil && spec.CloudEventOverrides.Extensions != nil {
if args.Spec.CloudEventOverrides != nil && args.Spec.CloudEventOverrides.Extensions != nil {
ps.Spec.SourceSpec.CloudEventOverrides = &duckv1.CloudEventOverrides{
Extensions: spec.CloudEventOverrides.Extensions,
Extensions: args.Spec.CloudEventOverrides.Extensions,
}
}
return ps
Expand Down
12 changes: 11 additions & 1 deletion pkg/reconciler/pubsub/resources/pullsubscription_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,17 @@ func TestMakePullSubscription(t *testing.T) {
},
}

got := MakePullSubscription(source.Namespace, source.Name, &source.Spec.PubSubSpec, source, "topic-abc", "storage.events.cloud.google.com", "google.storage", "storages.events.cloud.google.com")
args := &PullSubscriptionArgs{
Namespace: source.Namespace,
Name: source.Name,
Spec: &source.Spec.PubSubSpec,
Owner: source,
Topic: "topic-abc",
ReceiveAdapter: "storage.events.cloud.google.com",
AdapterType: "google.storage",
ResourceGroup: "storages.events.cloud.google.com",
}
got := MakePullSubscription(args)

yes := true
want := &pubsubv1alpha1.PullSubscription{
Expand Down
6 changes: 6 additions & 0 deletions pkg/reconciler/testing/pullsubscription.go
Original file line number Diff line number Diff line change
Expand Up @@ -273,3 +273,9 @@ func WithPullSubscriptionReadyStatus(status corev1.ConditionStatus, reason, mess
}}
}
}

func WithPullSubscriptionMode(mode v1alpha1.ModeType) PullSubscriptionOption {
return func(s *v1alpha1.PullSubscription) {
s.Spec.Mode = mode
}
}
35 changes: 28 additions & 7 deletions test/test_images/target/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@ import (
"os"
"strings"

cloudevents "github.com/cloudevents/sdk-go"
"github.com/cloudevents/sdk-go"
"github.com/google/knative-gcp/pkg/pubsub/adapter/converters"
"github.com/kelseyhightower/envconfig"
)

Expand Down Expand Up @@ -37,16 +38,36 @@ type Receiver struct {

func (r *Receiver) Receive(event cloudevents.Event) {
var target string
if err := event.ExtensionAs("target", &target); err != nil {
fmt.Println("failed to get target from extensions:", err)
return

// Try Pull first used by the PullSubscription.
err := event.ExtensionAs("target", &target)
if err != nil {
// If it fails, try Push format used by the CloudPubSubSource.
data, err := event.DataBytes()
if err != nil {
fmt.Println("failed to get data from event", err)
return
}
push := converters.PushMessage{}
if err := json.Unmarshal(data, &push); err != nil {
fmt.Println("failed to unmarshall PubMessage", err)
return
}

if tt, ok := push.Message.Attributes["target"]; !ok {
fmt.Println("failed to get target from attributes:", err)
return
} else {
target = tt
}
}

var success bool
if strings.HasPrefix(r.Target, target) {
fmt.Printf("Target prefix matched, %q.\n", r.Target)
if strings.Contains(r.Target, target) {
fmt.Printf("Target found, %q.\n", r.Target)
success = true
} else {
fmt.Printf("Target prefix did not match, %q != %q.\n", target, r.Target)
fmt.Printf("Target not found, got:%q, want:%q.\n", target, r.Target)
success = false
}
// Write the termination message.
Expand Down

0 comments on commit 9c6c2a6

Please sign in to comment.