Skip to content

Commit

Permalink
Add protobuf datacodec (#688) (#704)
Browse files Browse the repository at this point in the history
* Add protobuf datacodec

This adds the ability to send and receive protobuf encoded data within
the envelope.

Signed-off-by: Kevin Conway <kevinconway@invisionapp.com>

* Move pb.Any handling to protobuf format

This refactors the original protobuf datacodec so that it is generally
useful as a codec within other formats. Previously, the coded wrapped
and unwrapped every value in an Any container because this is required
when the protobuf codec and format are used together. Now, the format
handles wrapping the protobuf encoded binary data in the Any type.

Signed-off-by: Kevin Conway <kevinconway@invisionapp.com>

* Add init to register protobuf codec

Signed-off-by: Kevin Conway <kevinconway@invisionapp.com>

Co-authored-by: kconwayinvision <58523435+kconwayinvision@users.noreply.github.com>
  • Loading branch information
Scott Nichols and kconwayinvision authored Aug 13, 2021
1 parent 44de529 commit 79226a9
Show file tree
Hide file tree
Showing 3 changed files with 106 additions and 22 deletions.
50 changes: 50 additions & 0 deletions binding/format/protobuf/v2/datacodec.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package format

import (
"context"
"fmt"

"google.golang.org/protobuf/proto"

"github.com/cloudevents/sdk-go/v2/event/datacodec"
)

const (
// ContentTypeProtobuf indicates that the data attribute is a protobuf
// message.
ContentTypeProtobuf = "application/protobuf"
)

func init() {
datacodec.AddDecoder(ContentTypeProtobuf, DecodeData)
datacodec.AddEncoder(ContentTypeProtobuf, EncodeData)
}

// DecodeData converts an encoded protobuf message back into the message (out).
// The message must be a type compatible with whatever was given to EncodeData.
func DecodeData(ctx context.Context, in []byte, out interface{}) error {
outmsg, ok := out.(proto.Message)
if !ok {
return fmt.Errorf("can only decode protobuf into proto.Message. got %T", out)
}
if err := proto.Unmarshal(in, outmsg); err != nil {
return fmt.Errorf("failed to unmarshal message: %s", err)
}
return nil
}

// EncodeData a protobuf message to bytes.
//
// Like the official datacodec implementations, this one returns the given value
// as-is if it is already a byte slice.
func EncodeData(ctx context.Context, in interface{}) ([]byte, error) {
if b, ok := in.([]byte); ok {
return b, nil
}
var pbmsg proto.Message
var ok bool
if pbmsg, ok = in.(proto.Message); !ok {
return nil, fmt.Errorf("protobuf encoding only works with protobuf messages. got %T", in)
}
return proto.Marshal(pbmsg)
}
34 changes: 12 additions & 22 deletions binding/format/protobuf/v2/protobuf.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
stdtime "time"

"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/anypb"
"google.golang.org/protobuf/types/known/timestamppb"

"github.com/cloudevents/sdk-go/v2/binding/format"
Expand All @@ -30,9 +31,6 @@ var (

const (
ApplicationCloudEventsProtobuf = "application/cloudevents+protobuf"
// applicationProtobuf embedded here but unexported temporarily until it
// is moved to the datacodec implementation.
applicationProtobuf = "application/protobuf"
)

// StringOfApplicationCloudEventsProtobuf returns a string pointer to
Expand Down Expand Up @@ -103,22 +101,15 @@ func sdkToProto(e *event.Event) (*pb.CloudEvent, error) {
container.Data = &pb.CloudEvent_BinaryData{
BinaryData: e.Data(),
}
// NOTE: This is commented out until we add the data codec in a later patch
// or PR. Embedded here for illustration of how we will implement the
// requirement to use ProtoData IFF the data content type is protobuf.
// if e.DataContentType() == event.ApplicationProtobuf {
// anymsg := &anypb.Any{}
// if err := proto.Unmarshal(e.Data(), anymsg); err != nil {
// if e.DataSchema() == "" {
// return nil, fmt.Errorf("cannot encode direct protobuf message without dataschema. set dataschema to the appropriate protobuf type like type.googleapis.com/packge.v1.Type or make sure you are using the appropriate data content type %s", event.ApplicationProtobuf)
// }
// anymsg.TypeUrl = e.DataSchema()
// anymsg.Value = e.Data()
// }
// container.Data = &pb.CloudEvent_ProtoData{
// ProtoData: anymsg,
// }
// }
if e.DataContentType() == ContentTypeProtobuf {
anymsg := &anypb.Any{
TypeUrl: e.DataSchema(),
Value: e.Data(),
}
container.Data = &pb.CloudEvent_ProtoData{
ProtoData: anymsg,
}
}
return container, nil
}

Expand Down Expand Up @@ -242,9 +233,8 @@ func protoToSDK(container *pb.CloudEvent) (*event.Event, error) {
return nil, fmt.Errorf("failed to convert text type (%s) data: %s", contentType, err)
}
case *pb.CloudEvent_ProtoData:
if err := e.SetData(applicationProtobuf, dt.ProtoData); err != nil {
return nil, fmt.Errorf("failed to convert protobuf type (%s) data: %s", contentType, err)
}
e.SetDataContentType(ContentTypeProtobuf)
e.DataEncoded = dt.ProtoData.Value
}
for name, value := range container.Attributes {
v, err := valueFrom(value)
Expand Down
44 changes: 44 additions & 0 deletions binding/format/protobuf/v2/protobuf_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/cloudevents/sdk-go/v2/types"

format "github.com/cloudevents/sdk-go/binding/format/protobuf/v2"
pb "github.com/cloudevents/sdk-go/binding/format/protobuf/v2/internal/pb"
)

func TestProtobufFormatWithoutProtobufCodec(t *testing.T) {
Expand Down Expand Up @@ -42,3 +43,46 @@ func TestProtobufFormatWithoutProtobufCodec(t *testing.T) {
require.NoError(format.Protobuf.Unmarshal(b, &e2))
require.Equal(e, e2)
}

func TestProtobufFormatWithProtobufCodec(t *testing.T) {
require := require.New(t)
const test = "test"
e := event.New()
e.SetID(test)
e.SetTime(stdtime.Date(2021, 1, 1, 1, 1, 1, 1, stdtime.UTC))
e.SetExtension(test, test)
e.SetExtension("int", 1)
e.SetExtension("bool", true)
e.SetExtension("URI", &url.URL{
Host: "test-uri",
})
e.SetExtension("URIRef", types.URIRef{URL: url.URL{
Host: "test-uriref",
}})
e.SetExtension("bytes", []byte(test))
e.SetExtension("timestamp", stdtime.Date(2021, 2, 1, 1, 1, 1, 1, stdtime.UTC))
e.SetSubject(test)
e.SetSource(test)
e.SetType(test)
e.SetDataSchema(test)

// Using the CloudEventAttributeValue because it is convenient and is an
// independent protobuf message. Any protobuf message would work but this
// one is already generated and included in the source.
payload := &pb.CloudEventAttributeValue{
Attr: &pb.CloudEventAttributeValue_CeBoolean{
CeBoolean: true,
},
}
require.NoError(e.SetData(format.ContentTypeProtobuf, payload))

b, err := format.Protobuf.Marshal(&e)
require.NoError(err)
var e2 event.Event
require.NoError(format.Protobuf.Unmarshal(b, &e2))
require.Equal(e, e2)

payload2 := &pb.CloudEventAttributeValue{}
require.NoError(e2.DataAs(payload2))
require.True(payload2.GetCeBoolean())
}

0 comments on commit 79226a9

Please sign in to comment.