diff --git a/binding/format/protobuf/v2/datacodec.go b/binding/format/protobuf/v2/datacodec.go new file mode 100644 index 000000000..1a72c66ba --- /dev/null +++ b/binding/format/protobuf/v2/datacodec.go @@ -0,0 +1,50 @@ +package format + +import ( + "context" + "fmt" + + "google.golang.org/protobuf/proto" + + "github.com/cloudevents/sdk-go/v2/event/datacodec" +) + +const ( + // ContentTypeProtobuf indicates that the data attribute is a protobuf + // message. + ContentTypeProtobuf = "application/protobuf" +) + +func init() { + datacodec.AddDecoder(ContentTypeProtobuf, DecodeData) + datacodec.AddEncoder(ContentTypeProtobuf, EncodeData) +} + +// DecodeData converts an encoded protobuf message back into the message (out). +// The message must be a type compatible with whatever was given to EncodeData. +func DecodeData(ctx context.Context, in []byte, out interface{}) error { + outmsg, ok := out.(proto.Message) + if !ok { + return fmt.Errorf("can only decode protobuf into proto.Message. got %T", out) + } + if err := proto.Unmarshal(in, outmsg); err != nil { + return fmt.Errorf("failed to unmarshal message: %s", err) + } + return nil +} + +// EncodeData a protobuf message to bytes. +// +// Like the official datacodec implementations, this one returns the given value +// as-is if it is already a byte slice. +func EncodeData(ctx context.Context, in interface{}) ([]byte, error) { + if b, ok := in.([]byte); ok { + return b, nil + } + var pbmsg proto.Message + var ok bool + if pbmsg, ok = in.(proto.Message); !ok { + return nil, fmt.Errorf("protobuf encoding only works with protobuf messages. got %T", in) + } + return proto.Marshal(pbmsg) +} diff --git a/binding/format/protobuf/v2/protobuf.go b/binding/format/protobuf/v2/protobuf.go index b43549286..4a0a0b29e 100644 --- a/binding/format/protobuf/v2/protobuf.go +++ b/binding/format/protobuf/v2/protobuf.go @@ -6,6 +6,7 @@ import ( stdtime "time" "google.golang.org/protobuf/proto" + "google.golang.org/protobuf/types/known/anypb" "google.golang.org/protobuf/types/known/timestamppb" "github.com/cloudevents/sdk-go/v2/binding/format" @@ -30,9 +31,6 @@ var ( const ( ApplicationCloudEventsProtobuf = "application/cloudevents+protobuf" - // applicationProtobuf embedded here but unexported temporarily until it - // is moved to the datacodec implementation. - applicationProtobuf = "application/protobuf" ) // StringOfApplicationCloudEventsProtobuf returns a string pointer to @@ -103,22 +101,15 @@ func sdkToProto(e *event.Event) (*pb.CloudEvent, error) { container.Data = &pb.CloudEvent_BinaryData{ BinaryData: e.Data(), } - // NOTE: This is commented out until we add the data codec in a later patch - // or PR. Embedded here for illustration of how we will implement the - // requirement to use ProtoData IFF the data content type is protobuf. - // if e.DataContentType() == event.ApplicationProtobuf { - // anymsg := &anypb.Any{} - // if err := proto.Unmarshal(e.Data(), anymsg); err != nil { - // if e.DataSchema() == "" { - // return nil, fmt.Errorf("cannot encode direct protobuf message without dataschema. set dataschema to the appropriate protobuf type like type.googleapis.com/packge.v1.Type or make sure you are using the appropriate data content type %s", event.ApplicationProtobuf) - // } - // anymsg.TypeUrl = e.DataSchema() - // anymsg.Value = e.Data() - // } - // container.Data = &pb.CloudEvent_ProtoData{ - // ProtoData: anymsg, - // } - // } + if e.DataContentType() == ContentTypeProtobuf { + anymsg := &anypb.Any{ + TypeUrl: e.DataSchema(), + Value: e.Data(), + } + container.Data = &pb.CloudEvent_ProtoData{ + ProtoData: anymsg, + } + } return container, nil } @@ -242,9 +233,8 @@ func protoToSDK(container *pb.CloudEvent) (*event.Event, error) { return nil, fmt.Errorf("failed to convert text type (%s) data: %s", contentType, err) } case *pb.CloudEvent_ProtoData: - if err := e.SetData(applicationProtobuf, dt.ProtoData); err != nil { - return nil, fmt.Errorf("failed to convert protobuf type (%s) data: %s", contentType, err) - } + e.SetDataContentType(ContentTypeProtobuf) + e.DataEncoded = dt.ProtoData.Value } for name, value := range container.Attributes { v, err := valueFrom(value) diff --git a/binding/format/protobuf/v2/protobuf_test.go b/binding/format/protobuf/v2/protobuf_test.go index 4db60a75a..3ae319aeb 100644 --- a/binding/format/protobuf/v2/protobuf_test.go +++ b/binding/format/protobuf/v2/protobuf_test.go @@ -11,6 +11,7 @@ import ( "github.com/cloudevents/sdk-go/v2/types" format "github.com/cloudevents/sdk-go/binding/format/protobuf/v2" + pb "github.com/cloudevents/sdk-go/binding/format/protobuf/v2/internal/pb" ) func TestProtobufFormatWithoutProtobufCodec(t *testing.T) { @@ -42,3 +43,46 @@ func TestProtobufFormatWithoutProtobufCodec(t *testing.T) { require.NoError(format.Protobuf.Unmarshal(b, &e2)) require.Equal(e, e2) } + +func TestProtobufFormatWithProtobufCodec(t *testing.T) { + require := require.New(t) + const test = "test" + e := event.New() + e.SetID(test) + e.SetTime(stdtime.Date(2021, 1, 1, 1, 1, 1, 1, stdtime.UTC)) + e.SetExtension(test, test) + e.SetExtension("int", 1) + e.SetExtension("bool", true) + e.SetExtension("URI", &url.URL{ + Host: "test-uri", + }) + e.SetExtension("URIRef", types.URIRef{URL: url.URL{ + Host: "test-uriref", + }}) + e.SetExtension("bytes", []byte(test)) + e.SetExtension("timestamp", stdtime.Date(2021, 2, 1, 1, 1, 1, 1, stdtime.UTC)) + e.SetSubject(test) + e.SetSource(test) + e.SetType(test) + e.SetDataSchema(test) + + // Using the CloudEventAttributeValue because it is convenient and is an + // independent protobuf message. Any protobuf message would work but this + // one is already generated and included in the source. + payload := &pb.CloudEventAttributeValue{ + Attr: &pb.CloudEventAttributeValue_CeBoolean{ + CeBoolean: true, + }, + } + require.NoError(e.SetData(format.ContentTypeProtobuf, payload)) + + b, err := format.Protobuf.Marshal(&e) + require.NoError(err) + var e2 event.Event + require.NoError(format.Protobuf.Unmarshal(b, &e2)) + require.Equal(e, e2) + + payload2 := &pb.CloudEventAttributeValue{} + require.NoError(e2.DataAs(payload2)) + require.True(payload2.GetCeBoolean()) +}