From df60349ee9a350fc07486167ca8fbdb896cdcf6d Mon Sep 17 00:00:00 2001 From: Chen Shou Date: Tue, 24 Mar 2020 20:53:51 +0000 Subject: [PATCH 1/6] Add event wrapper to label/delabel events --- Gopkg.lock | 7 + pkg/broker/config/targets.pb.go | 69 +++--- pkg/broker/config/targets.proto | 3 + pkg/broker/eventutil/label.go | 74 ++++++ pkg/broker/eventutil/label_test.go | 48 ++++ .../github.com/cloudevents/sdk-go/v2/alias.go | 145 ++++++++++++ .../cloudevents/sdk-go/v2/client/client.go | 213 ++++++++++++++++++ .../sdk-go/v2/client/client_default.go | 26 +++ .../sdk-go/v2/client/client_observed.go | 99 ++++++++ .../sdk-go/v2/client/defaulters.go | 52 +++++ .../cloudevents/sdk-go/v2/client/doc.go | 6 + .../sdk-go/v2/client/http_receiver.go | 39 ++++ .../cloudevents/sdk-go/v2/client/invoker.go | 82 +++++++ .../sdk-go/v2/client/observability.go | 94 ++++++++ .../cloudevents/sdk-go/v2/client/options.go | 73 ++++++ .../cloudevents/sdk-go/v2/client/receiver.go | 189 ++++++++++++++++ .../distributed_tracing_extension.go | 126 +++++++++++ 17 files changed, 1315 insertions(+), 30 deletions(-) create mode 100644 pkg/broker/eventutil/label.go create mode 100644 pkg/broker/eventutil/label_test.go create mode 100644 vendor/github.com/cloudevents/sdk-go/v2/alias.go create mode 100644 vendor/github.com/cloudevents/sdk-go/v2/client/client.go create mode 100644 vendor/github.com/cloudevents/sdk-go/v2/client/client_default.go create mode 100644 vendor/github.com/cloudevents/sdk-go/v2/client/client_observed.go create mode 100644 vendor/github.com/cloudevents/sdk-go/v2/client/defaulters.go create mode 100644 vendor/github.com/cloudevents/sdk-go/v2/client/doc.go create mode 100644 vendor/github.com/cloudevents/sdk-go/v2/client/http_receiver.go create mode 100644 vendor/github.com/cloudevents/sdk-go/v2/client/invoker.go create mode 100644 vendor/github.com/cloudevents/sdk-go/v2/client/observability.go create mode 100644 vendor/github.com/cloudevents/sdk-go/v2/client/options.go create mode 100644 vendor/github.com/cloudevents/sdk-go/v2/client/receiver.go create mode 100644 vendor/github.com/cloudevents/sdk-go/v2/extensions/distributed_tracing_extension.go diff --git a/Gopkg.lock b/Gopkg.lock index d540fb33e5..b5422a55d0 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -163,16 +163,19 @@ "v1/cloudevents/transport/pubsub/context", "v1/cloudevents/transport/pubsub/internal", "v1/cloudevents/types", + "v2", "v2/binding", "v2/binding/format", "v2/binding/spec", "v2/binding/transformer", + "v2/client", "v2/context", "v2/event", "v2/event/datacodec", "v2/event/datacodec/json", "v2/event/datacodec/text", "v2/event/datacodec/xml", + "v2/extensions", "v2/observability", "v2/protocol", "v2/protocol/http", @@ -1707,7 +1710,11 @@ "github.com/cloudevents/sdk-go/v1/cloudevents/transport/http", "github.com/cloudevents/sdk-go/v1/cloudevents/transport/pubsub", "github.com/cloudevents/sdk-go/v1/cloudevents/transport/pubsub/context", +<<<<<<< HEAD "github.com/cloudevents/sdk-go/v2/protocol/http", +======= + "github.com/cloudevents/sdk-go/v2", +>>>>>>> Add event wrapper to label/delabel events "github.com/fsnotify/fsnotify", "github.com/golang/protobuf/jsonpb", "github.com/golang/protobuf/proto", diff --git a/pkg/broker/config/targets.pb.go b/pkg/broker/config/targets.pb.go index 20917bc918..ad38cc7c23 100644 --- a/pkg/broker/config/targets.pb.go +++ b/pkg/broker/config/targets.pb.go @@ -5,9 +5,8 @@ package config import ( fmt "fmt" - math "math" - proto "github.com/golang/protobuf/proto" + math "math" ) // Reference imports to suppress errors if they are not otherwise used. @@ -64,7 +63,9 @@ type Target struct { // The Pubsub topic name for retrying the events. RetryTopic string `protobuf:"bytes,7,opt,name=retry_topic,json=retryTopic,proto3" json:"retry_topic,omitempty"` // The Pubsub subscription name for retrying the events. - RetrySubscription string `protobuf:"bytes,8,opt,name=retry_subscription,json=retrySubscription,proto3" json:"retry_subscription,omitempty"` + RetrySubscription string `protobuf:"bytes,8,opt,name=retry_subscription,json=retrySubscription,proto3" json:"retry_subscription,omitempty"` + // The broker name that the trigger is referencing. + Broker string `protobuf:"bytes,9,opt,name=broker,proto3" json:"broker,omitempty"` XXX_NoUnkeyedLiteral struct{} `json:"-"` XXX_unrecognized []byte `json:"-"` XXX_sizecache int32 `json:"-"` @@ -151,6 +152,13 @@ func (m *Target) GetRetrySubscription() string { return "" } +func (m *Target) GetBroker() string { + if m != nil { + return m.Broker + } + return "" +} + // NamespacedTargets is the collection of targets grouped by namespaces. type NamespacedTargets struct { Names map[string]*Target `protobuf:"bytes,1,rep,name=names,proto3" json:"names,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"` @@ -246,31 +254,32 @@ func init() { } var fileDescriptor_4009e2e15debba2c = []byte{ - // 403 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x74, 0x93, 0x51, 0x8b, 0x9b, 0x40, - 0x14, 0x85, 0x3b, 0xba, 0xba, 0xcd, 0x95, 0x58, 0x73, 0xd9, 0xc2, 0x34, 0x14, 0x36, 0x84, 0x5d, - 0x08, 0x85, 0x5a, 0x48, 0x5f, 0xca, 0xbe, 0x2d, 0xdb, 0x94, 0x42, 0xc1, 0x52, 0x93, 0xd0, 0xf6, - 0x29, 0xa8, 0x99, 0x84, 0xa1, 0xa9, 0xca, 0x38, 0x16, 0xf2, 0x53, 0xfa, 0x07, 0xfa, 0xdc, 0x9f, - 0x58, 0x9c, 0x31, 0x51, 0xd3, 0xec, 0x93, 0xe3, 0x39, 0xe7, 0x5e, 0x3f, 0x0f, 0x0a, 0x7d, 0x19, - 0x89, 0x2d, 0x93, 0x85, 0x9f, 0x8b, 0x4c, 0x66, 0x68, 0x27, 0x59, 0xba, 0xe1, 0xdb, 0xf1, 0x1f, - 0x13, 0xec, 0x85, 0x72, 0xd0, 0x05, 0x83, 0xaf, 0x29, 0x19, 0x91, 0x49, 0x2f, 0x34, 0xf8, 0x1a, - 0x11, 0x2e, 0xd2, 0xe8, 0x27, 0xa3, 0x86, 0x52, 0xd4, 0x19, 0x5f, 0x42, 0xaf, 0xba, 0x16, 0x79, - 0x94, 0x30, 0x6a, 0x2a, 0xa3, 0x11, 0xf0, 0x16, 0xdc, 0xa2, 0x8c, 0x8b, 0x44, 0xf0, 0x98, 0x89, - 0x55, 0x29, 0x38, 0xbd, 0x50, 0x91, 0x7e, 0xa3, 0x2e, 0x05, 0xc7, 0x2f, 0x30, 0xd8, 0xf0, 0x9d, - 0x64, 0x62, 0x15, 0x49, 0x29, 0x78, 0x5c, 0x4a, 0x56, 0x50, 0x6b, 0x64, 0x4e, 0x9c, 0xe9, 0x8d, - 0xaf, 0xb9, 0x7c, 0xcd, 0xe4, 0x7f, 0x50, 0xb9, 0xfb, 0x63, 0x6c, 0x96, 0x4a, 0xb1, 0x0f, 0xbd, - 0xcd, 0x89, 0x8c, 0xaf, 0xc0, 0x2a, 0x64, 0x24, 0x19, 0xb5, 0x47, 0x64, 0xe2, 0x4e, 0xaf, 0x4e, - 0xd6, 0xcc, 0x2b, 0x2f, 0xd4, 0x11, 0xbc, 0x06, 0x47, 0x30, 0x29, 0xf6, 0x2b, 0x99, 0xe5, 0x3c, - 0xa1, 0x97, 0x0a, 0x11, 0x94, 0xb4, 0xa8, 0x14, 0x7c, 0x0d, 0xa8, 0x03, 0x35, 0x76, 0x2e, 0x79, - 0x96, 0xd2, 0xa7, 0x2a, 0x37, 0x50, 0xce, 0xbc, 0x65, 0x0c, 0x1f, 0xe0, 0xf9, 0x59, 0x4c, 0xf4, - 0xc0, 0xfc, 0xc1, 0xf6, 0x75, 0xa3, 0xd5, 0x11, 0xaf, 0xc0, 0xfa, 0x15, 0xed, 0xca, 0x43, 0xa7, - 0xfa, 0xe6, 0xce, 0x78, 0x47, 0xc6, 0xd7, 0x60, 0x29, 0x48, 0x74, 0xe0, 0x72, 0x19, 0x7c, 0x0a, - 0x3e, 0x7f, 0x0d, 0xbc, 0x27, 0xd8, 0x03, 0x2b, 0x9c, 0xdd, 0xbf, 0xff, 0xee, 0x91, 0xf1, 0x6f, - 0x02, 0x83, 0xe0, 0xd0, 0xf4, 0x5a, 0xbf, 0x57, 0x81, 0x77, 0x60, 0xa9, 0xfa, 0x29, 0xe9, 0xd6, - 0xf7, 0x5f, 0x52, 0x2b, 0xba, 0x3e, 0x3d, 0x32, 0xfc, 0x08, 0xd0, 0x88, 0x67, 0x60, 0x6f, 0xda, - 0xb0, 0xce, 0xd4, 0xed, 0x76, 0xda, 0x86, 0xff, 0x4b, 0xa0, 0x5f, 0x3f, 0xe7, 0x41, 0x65, 0x70, - 0x06, 0x70, 0xfc, 0x2c, 0x0e, 0x70, 0xb7, 0xdd, 0x05, 0x75, 0xb4, 0x41, 0xad, 0xe9, 0x5a, 0x83, - 0xc3, 0x6f, 0xf0, 0xec, 0xc4, 0x3e, 0xc3, 0xf9, 0xa6, 0xcb, 0xf9, 0xe2, 0xd1, 0x0e, 0x5a, 0xc8, - 0xb1, 0xad, 0x7e, 0x83, 0xb7, 0xff, 0x02, 0x00, 0x00, 0xff, 0xff, 0xe3, 0xda, 0x5a, 0xad, 0x17, - 0x03, 0x00, 0x00, + // 417 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x74, 0x93, 0xd1, 0x8a, 0xd3, 0x40, + 0x18, 0x85, 0x9d, 0x64, 0x93, 0x35, 0x7f, 0x68, 0x4c, 0x7f, 0x56, 0x19, 0x8b, 0xb0, 0xa5, 0xec, + 0x42, 0x11, 0x8c, 0x50, 0x6f, 0x64, 0xef, 0x96, 0xb5, 0x22, 0x08, 0x11, 0xb3, 0xbb, 0xa8, 0x57, + 0x25, 0x49, 0xa7, 0xcb, 0xd0, 0x9a, 0x84, 0xc9, 0x44, 0xe8, 0xa3, 0xf8, 0x16, 0x3e, 0x81, 0xcf, + 0x26, 0x99, 0x49, 0x9b, 0xa4, 0x76, 0xaf, 0x32, 0x73, 0xce, 0x99, 0xc9, 0x97, 0x33, 0x19, 0x18, + 0xc8, 0x58, 0x3c, 0x30, 0x59, 0x06, 0x85, 0xc8, 0x65, 0x8e, 0x76, 0x9a, 0x67, 0x2b, 0xfe, 0x30, + 0xf9, 0x6b, 0x82, 0x7d, 0xa7, 0x1c, 0xf4, 0xc0, 0xe0, 0x4b, 0x4a, 0xc6, 0x64, 0xea, 0x44, 0x06, + 0x5f, 0x22, 0xc2, 0x49, 0x16, 0xff, 0x64, 0xd4, 0x50, 0x8a, 0x1a, 0xe3, 0x2b, 0x70, 0xea, 0x67, + 0x59, 0xc4, 0x29, 0xa3, 0xa6, 0x32, 0x5a, 0x01, 0x2f, 0xc1, 0x2b, 0xab, 0xa4, 0x4c, 0x05, 0x4f, + 0x98, 0x58, 0x54, 0x82, 0xd3, 0x13, 0x15, 0x19, 0xb4, 0xea, 0xbd, 0xe0, 0xf8, 0x15, 0x86, 0x2b, + 0xbe, 0x91, 0x4c, 0x2c, 0x62, 0x29, 0x05, 0x4f, 0x2a, 0xc9, 0x4a, 0x6a, 0x8d, 0xcd, 0xa9, 0x3b, + 0xbb, 0x08, 0x34, 0x57, 0xa0, 0x99, 0x82, 0x8f, 0x2a, 0x77, 0xbd, 0x8f, 0xcd, 0x33, 0x29, 0xb6, + 0x91, 0xbf, 0x3a, 0x90, 0xf1, 0x35, 0x58, 0xa5, 0x8c, 0x25, 0xa3, 0xf6, 0x98, 0x4c, 0xbd, 0xd9, + 0xd9, 0xc1, 0x36, 0xb7, 0xb5, 0x17, 0xe9, 0x08, 0x9e, 0x83, 0x2b, 0x98, 0x14, 0xdb, 0x85, 0xcc, + 0x0b, 0x9e, 0xd2, 0x53, 0x85, 0x08, 0x4a, 0xba, 0xab, 0x15, 0x7c, 0x03, 0xa8, 0x03, 0x0d, 0x76, + 0x21, 0x79, 0x9e, 0xd1, 0xa7, 0x2a, 0x37, 0x54, 0xce, 0x6d, 0xc7, 0xc0, 0x17, 0x60, 0x27, 0x22, + 0x5f, 0x33, 0x41, 0x1d, 0x15, 0x69, 0x66, 0xa3, 0x1b, 0x78, 0x7e, 0x14, 0x1f, 0x7d, 0x30, 0xd7, + 0x6c, 0xdb, 0x34, 0x5d, 0x0f, 0xf1, 0x0c, 0xac, 0x5f, 0xf1, 0xa6, 0xda, 0x75, 0xad, 0x27, 0x57, + 0xc6, 0x7b, 0x32, 0x39, 0x07, 0x4b, 0xc1, 0xa3, 0x0b, 0xa7, 0xf7, 0xe1, 0xe7, 0xf0, 0xcb, 0xb7, + 0xd0, 0x7f, 0x82, 0x0e, 0x58, 0xd1, 0xfc, 0xfa, 0xc3, 0x0f, 0x9f, 0x4c, 0x7e, 0x13, 0x18, 0x86, + 0xbb, 0x13, 0x58, 0xea, 0xef, 0x2d, 0xf1, 0x0a, 0x2c, 0x75, 0x2c, 0x94, 0xf4, 0x6b, 0xfd, 0x2f, + 0xa9, 0x15, 0x5d, 0xab, 0x5e, 0x32, 0xfa, 0x04, 0xd0, 0x8a, 0x47, 0x60, 0x2f, 0xba, 0xb0, 0xee, + 0xcc, 0xeb, 0x77, 0xdd, 0x85, 0xff, 0x43, 0x60, 0xd0, 0xbc, 0xe7, 0x46, 0x65, 0x70, 0x0e, 0xb0, + 0xff, 0x5d, 0x76, 0x70, 0x97, 0xfd, 0x0d, 0x9a, 0x68, 0x8b, 0xda, 0xd0, 0x75, 0x16, 0x8e, 0xbe, + 0xc3, 0xb3, 0x03, 0xfb, 0x08, 0xe7, 0xdb, 0x3e, 0xe7, 0xcb, 0x47, 0x3b, 0xe8, 0x20, 0x27, 0xb6, + 0xba, 0x1e, 0xef, 0xfe, 0x05, 0x00, 0x00, 0xff, 0xff, 0x43, 0x33, 0x03, 0x5d, 0x2f, 0x03, 0x00, + 0x00, } diff --git a/pkg/broker/config/targets.proto b/pkg/broker/config/targets.proto index 1f54903302..25d22a5e93 100644 --- a/pkg/broker/config/targets.proto +++ b/pkg/broker/config/targets.proto @@ -47,6 +47,9 @@ message Target { // The Pubsub subscription name for retrying the events. string retry_subscription = 8; + + // The broker name that the trigger is referencing. + string broker = 9; } // NamespacedTargets is the collection of targets grouped by namespaces. diff --git a/pkg/broker/eventutil/label.go b/pkg/broker/eventutil/label.go new file mode 100644 index 0000000000..58b7d398af --- /dev/null +++ b/pkg/broker/eventutil/label.go @@ -0,0 +1,74 @@ +/* +Copyright 2020 Google LLC + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package eventutil + +import ( + "strings" + + cloudevents "github.com/cloudevents/sdk-go/v2" +) + +const ( + // LabelPrefix is the prefix for label keys. + LabelPrefix = "kgcp" +) + +// LabeledEvent is a wrapper of a cloudevent +// that allows labeling the event. +type LabeledEvent struct { + event *cloudevents.Event +} + +// NewLabeledEvent creates a new LabeledEvent. +func NewLabeledEvent(e *cloudevents.Event) *LabeledEvent { + return &LabeledEvent{event: e} +} + +// WithLabel attaches a label to the event as an extension. +func (le *LabeledEvent) WithLabel(key, value string) *LabeledEvent { + le.event.SetExtension(LabelPrefix+key, value) + return le +} + +// GetLabels gets all the labels as a map. +func (le *LabeledEvent) GetLabels() map[string]string { + m := make(map[string]string) + exts := le.event.Extensions() + for k, v := range exts { + if strings.HasPrefix(strings.ToLower(k), LabelPrefix) { + m[k] = v.(string) + } + } + return m +} + +// Event returns the LabeledEvent as a cloudevent. +func (le *LabeledEvent) Event() *cloudevents.Event { + return le.event +} + +// Delabeled returns the cloudevent without labels. +func (le *LabeledEvent) Delabeled() *cloudevents.Event { + e := le.event.Clone() + for k := range e.Extensions() { + if strings.HasPrefix(strings.ToLower(k), LabelPrefix) { + // delete(e.Extensions(), k) + e.SetExtension(k, nil) + } + } + return &e +} diff --git a/pkg/broker/eventutil/label_test.go b/pkg/broker/eventutil/label_test.go new file mode 100644 index 0000000000..b6650e19e7 --- /dev/null +++ b/pkg/broker/eventutil/label_test.go @@ -0,0 +1,48 @@ +/* +Copyright 2020 Google LLC + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package eventutil + +import ( + "testing" + + cloudevents "github.com/cloudevents/sdk-go/v2" + "github.com/google/go-cmp/cmp" +) + +func TestLabeledEevent(t *testing.T) { + e := cloudevents.NewEvent() + e.SetSource("example/uri") + e.SetType("example.type") + e.SetData(cloudevents.ApplicationJSON, map[string]string{"hello": "world"}) + e.SetExtension("custom", "foo") + + wantDelabeled := e.Clone() + wantLabeled := e.Clone() + wantLabeled.SetExtension("kgcplabel1", "val1") + wantLabeled.SetExtension("kgcplabel2", "val2") + + le := NewLabeledEvent(&e).WithLabel("label1", "val1").WithLabel("label2", "val2") + + gotDelabeled := le.Delabeled() + if diff := cmp.Diff(&wantDelabeled, gotDelabeled); diff != "" { + t.Errorf("LabeledEvent.Delabeled() (-want,+got): %v", diff) + } + gotLabeled := le.Event() + if diff := cmp.Diff(&wantLabeled, gotLabeled); diff != "" { + t.Errorf("LabeledEvent.Event() (-want,+got): %v", diff) + } +} diff --git a/vendor/github.com/cloudevents/sdk-go/v2/alias.go b/vendor/github.com/cloudevents/sdk-go/v2/alias.go new file mode 100644 index 0000000000..6d15c4e1ab --- /dev/null +++ b/vendor/github.com/cloudevents/sdk-go/v2/alias.go @@ -0,0 +1,145 @@ +package v2 + +// Package cloudevents alias' common functions and types to improve discoverability and reduce +// the number of imports for simple HTTP clients. + +import ( + "github.com/cloudevents/sdk-go/v2/binding" + "github.com/cloudevents/sdk-go/v2/client" + "github.com/cloudevents/sdk-go/v2/context" + "github.com/cloudevents/sdk-go/v2/event" + "github.com/cloudevents/sdk-go/v2/observability" + "github.com/cloudevents/sdk-go/v2/protocol" + "github.com/cloudevents/sdk-go/v2/protocol/http" + "github.com/cloudevents/sdk-go/v2/types" +) + +// Client + +type ClientOption client.Option +type Client = client.Client + +// Event + +type Event = event.Event + +// Context + +type EventContext = event.EventContext +type EventContextV1 = event.EventContextV1 +type EventContextV03 = event.EventContextV03 + +// Custom Types + +type Timestamp = types.Timestamp +type URIRef = types.URIRef + +// HTTP Protocol + +type HTTPOption http.Option + +type HTTPProtocol = http.Protocol + +// Encoding + +type Encoding = binding.Encoding + +// Message + +type Message = binding.Message + +const ( + // ReadEncoding + + ApplicationXML = event.ApplicationXML + ApplicationJSON = event.ApplicationJSON + TextPlain = event.TextPlain + ApplicationCloudEventsJSON = event.ApplicationCloudEventsJSON + ApplicationCloudEventsBatchJSON = event.ApplicationCloudEventsBatchJSON + Base64 = event.Base64 + + // Event Versions + + VersionV1 = event.CloudEventsVersionV1 + VersionV03 = event.CloudEventsVersionV03 + + // Encoding + + EncodingBinary = binding.EncodingBinary + EncodingStructured = binding.EncodingStructured +) + +var ( + + // ContentType Helpers + + StringOfApplicationJSON = event.StringOfApplicationJSON + StringOfApplicationXML = event.StringOfApplicationXML + StringOfTextPlain = event.StringOfTextPlain + StringOfApplicationCloudEventsJSON = event.StringOfApplicationCloudEventsJSON + StringOfApplicationCloudEventsBatchJSON = event.StringOfApplicationCloudEventsBatchJSON + StringOfBase64 = event.StringOfBase64 + + // Client Creation + + NewClient = client.New + NewClientObserved = client.NewObserved + NewDefaultClient = client.NewDefault + NewHTTPReceiveHandler = client.NewHTTPReceiveHandler + + // Client Options + + WithEventDefaulter = client.WithEventDefaulter + WithUUIDs = client.WithUUIDs + WithTimeNow = client.WithTimeNow + WithTracePropagation = client.WithTracePropagation() + + // Event Creation + + NewEvent = event.New + NewResult = protocol.NewResult + + NewHTTPResult = http.NewResult + + // Message Creation + + ToMessage = binding.ToMessage + + // HTTP Messages + + WriteHTTPRequest = http.WriteRequest + + // Tracing + + EnableTracing = observability.EnableTracing + + // Context + + ContextWithTarget = context.WithTarget + TargetFromContext = context.TargetFrom + WithEncodingBinary = binding.WithForceBinary + WithEncodingStructured = binding.WithForceStructured + + // Custom Types + + ParseTimestamp = types.ParseTimestamp + ParseURIRef = types.ParseURIRef + ParseURI = types.ParseURI + + // HTTP Protocol + + NewHTTP = http.New + + // HTTP Protocol Options + + WithTarget = http.WithTarget + WithHeader = http.WithHeader + WithShutdownTimeout = http.WithShutdownTimeout + //WithEncoding = http.WithEncoding + //WithStructuredEncoding = http.WithStructuredEncoding // TODO: expose new way + WithPort = http.WithPort + WithPath = http.WithPath + WithMiddleware = http.WithMiddleware + WithListener = http.WithListener + WithRoundTripper = http.WithRoundTripper +) diff --git a/vendor/github.com/cloudevents/sdk-go/v2/client/client.go b/vendor/github.com/cloudevents/sdk-go/v2/client/client.go new file mode 100644 index 0000000000..8b0a9d1527 --- /dev/null +++ b/vendor/github.com/cloudevents/sdk-go/v2/client/client.go @@ -0,0 +1,213 @@ +package client + +import ( + "context" + "errors" + "fmt" + "io" + "sync" + + "go.uber.org/zap" + + "github.com/cloudevents/sdk-go/v2/binding" + cecontext "github.com/cloudevents/sdk-go/v2/context" + "github.com/cloudevents/sdk-go/v2/event" + "github.com/cloudevents/sdk-go/v2/protocol" +) + +// Client interface defines the runtime contract the CloudEvents client supports. +type Client interface { + // Send will transmit the given event over the client's configured transport. + Send(ctx context.Context, event event.Event) error + + // Request will transmit the given event over the client's configured + // transport and return any response event. + Request(ctx context.Context, event event.Event) (*event.Event, error) + + // StartReceiver will register the provided function for callback on receipt + // of a cloudevent. It will also start the underlying protocol as it has + // been configured. + // This call is blocking. + // Valid fn signatures are: + // * func() + // * func() error + // * func(context.Context) + // * func(context.Context) protocol.Result + // * func(event.Event) + // * func(event.Event) protocol.Result + // * func(context.Context, event.Event) + // * func(context.Context, event.Event) protocol.Result + // * func(event.Event) *event.Event + // * func(event.Event) (*event.Event, protocol.Result) + // * func(context.Context, event.Event) *event.Event + // * func(context.Context, event.Event) (*event.Event, protocol.Result) + StartReceiver(ctx context.Context, fn interface{}) error +} + +// New produces a new client with the provided transport object and applied +// client options. +func New(obj interface{}, opts ...Option) (Client, error) { + c := &ceClient{} + + if p, ok := obj.(protocol.Sender); ok { + c.sender = p + } + if p, ok := obj.(protocol.Requester); ok { + c.requester = p + } + if p, ok := obj.(protocol.Responder); ok { + c.responder = p + } + if p, ok := obj.(protocol.Receiver); ok { + c.receiver = p + } + if p, ok := obj.(protocol.Opener); ok { + c.opener = p + } + + if err := c.applyOptions(opts...); err != nil { + return nil, err + } + return c, nil +} + +type ceClient struct { + sender protocol.Sender + requester protocol.Requester + receiver protocol.Receiver + responder protocol.Responder + // Optional. + opener protocol.Opener + + outboundContextDecorators []func(context.Context) context.Context + invoker Invoker + receiverMu sync.Mutex + eventDefaulterFns []EventDefaulter +} + +func (c *ceClient) applyOptions(opts ...Option) error { + for _, fn := range opts { + if err := fn(c); err != nil { + return err + } + } + return nil +} + +func (c *ceClient) Send(ctx context.Context, e event.Event) error { + if c.sender == nil { + return errors.New("sender not set") + } + + for _, f := range c.outboundContextDecorators { + ctx = f(ctx) + } + + if len(c.eventDefaulterFns) > 0 { + for _, fn := range c.eventDefaulterFns { + e = fn(ctx, e) + } + } + + if err := e.Validate(); err != nil { + return err + } + + return c.sender.Send(ctx, (*binding.EventMessage)(&e)) +} + +func (c *ceClient) Request(ctx context.Context, e event.Event) (*event.Event, error) { + if c.requester == nil { + return nil, errors.New("requester not set") + } + for _, f := range c.outboundContextDecorators { + ctx = f(ctx) + } + + if len(c.eventDefaulterFns) > 0 { + for _, fn := range c.eventDefaulterFns { + e = fn(ctx, e) + } + } + + if err := e.Validate(); err != nil { + return nil, err + } + + // If provided a requester, use it to do request/response. + var resp *event.Event + msg, err := c.requester.Request(ctx, (*binding.EventMessage)(&e)) + defer func() { + if err := msg.Finish(err); err != nil { + cecontext.LoggerFrom(ctx).Warnw("failed calling message.Finish", zap.Error(err)) + } + }() + if err == nil { + fmt.Printf("%#v", msg) + if rs, err := binding.ToEvent(ctx, msg); err != nil { + cecontext.LoggerFrom(ctx).Infow("failed calling ToEvent", zap.Error(err), zap.Any("resp", msg)) + } else { + resp = rs + } + } + return resp, err +} + +// StartReceiver sets up the given fn to handle Receive. +// See Client.StartReceiver for details. This is a blocking call. +func (c *ceClient) StartReceiver(ctx context.Context, fn interface{}) error { + c.receiverMu.Lock() + defer c.receiverMu.Unlock() + + if c.invoker != nil { + return fmt.Errorf("client already has a receiver") + } + + invoker, err := newReceiveInvoker(fn, c.eventDefaulterFns...) // TODO: this will have to pick between a observed invoker or not. + if err != nil { + return err + } + if invoker.IsReceiver() && c.receiver == nil { + return fmt.Errorf("mismatched receiver callback without protocol.Receiver supported by protocol") + } + if invoker.IsResponder() && c.responder == nil { + return fmt.Errorf("mismatched receiver callback without protocol.Responder supported by protocol") + } + c.invoker = invoker + + defer func() { + c.invoker = nil + }() + + // Start the opener, if set. + if c.opener != nil { + go func() { + // TODO: handle error correctly here. + if err := c.opener.OpenInbound(ctx); err != nil { + panic(err) + } + }() + } + + var msg binding.Message + var respFn protocol.ResponseFn + // Start Polling. + for { + if c.responder != nil { + msg, respFn, err = c.responder.Respond(ctx) + } else if c.receiver != nil { + msg, err = c.receiver.Receive(ctx) + } else { + return errors.New("responder and receiver not set") + } + + if err == io.EOF { // Normal close + return nil + } else if err != nil { + return err + } + if err := c.invoker.Invoke(ctx, msg, respFn); err != nil { + return err + } + } +} diff --git a/vendor/github.com/cloudevents/sdk-go/v2/client/client_default.go b/vendor/github.com/cloudevents/sdk-go/v2/client/client_default.go new file mode 100644 index 0000000000..82877d6791 --- /dev/null +++ b/vendor/github.com/cloudevents/sdk-go/v2/client/client_default.go @@ -0,0 +1,26 @@ +package client + +import ( + "github.com/cloudevents/sdk-go/v2/protocol/http" +) + +// NewDefault provides the good defaults for the common case using an HTTP +// Protocol client. The http transport has had WithBinaryEncoding http +// transport option applied to it. The client will always send Binary +// encoding but will inspect the outbound event context and match the version. +// The WithTimeNow, and WithUUIDs client options are also applied to the +// client, all outbound events will have a time and id set if not already +// present. +func NewDefault() (Client, error) { + p, err := http.New() + if err != nil { + return nil, err + } + + c, err := NewObserved(p, WithTimeNow(), WithUUIDs()) + if err != nil { + return nil, err + } + + return c, nil +} diff --git a/vendor/github.com/cloudevents/sdk-go/v2/client/client_observed.go b/vendor/github.com/cloudevents/sdk-go/v2/client/client_observed.go new file mode 100644 index 0000000000..583b9dc2b2 --- /dev/null +++ b/vendor/github.com/cloudevents/sdk-go/v2/client/client_observed.go @@ -0,0 +1,99 @@ +package client + +import ( + "context" + "github.com/cloudevents/sdk-go/v2/event" + "github.com/cloudevents/sdk-go/v2/extensions" + "github.com/cloudevents/sdk-go/v2/observability" + "go.opencensus.io/trace" +) + +// New produces a new client with the provided transport object and applied +// client options. +func NewObserved(protocol interface{}, opts ...Option) (Client, error) { + client, err := New(protocol, opts...) + if err != nil { + return nil, err + } + + c := &obsClient{client: client} + + if err := c.applyOptions(opts...); err != nil { + return nil, err + } + return c, nil +} + +type obsClient struct { + client Client + + addTracing bool +} + +func (c *obsClient) applyOptions(opts ...Option) error { + for _, fn := range opts { + if err := fn(c); err != nil { + return err + } + } + return nil +} + +// Send transmits the provided event on a preconfigured Protocol. Send returns +// an error if there was an an issue validating the outbound event or the +// transport returns an error. +func (c *obsClient) Send(ctx context.Context, e event.Event) error { + ctx, r := observability.NewReporter(ctx, reportSend) + ctx, span := trace.StartSpan(ctx, observability.ClientSpanName, trace.WithSpanKind(trace.SpanKindClient)) + defer span.End() + if span.IsRecordingEvents() { + span.AddAttributes(EventTraceAttributes(&e)...) + } + + if c.addTracing { + e.Context = e.Context.Clone() + extensions.FromSpanContext(span.SpanContext()).AddTracingAttributes(&e) + } + + err := c.client.Send(ctx, e) + + if err != nil { + r.Error() + } else { + r.OK() + } + return err +} + +func (c *obsClient) Request(ctx context.Context, e event.Event) (*event.Event, error) { + ctx, r := observability.NewReporter(ctx, reportRequest) + ctx, span := trace.StartSpan(ctx, observability.ClientSpanName, trace.WithSpanKind(trace.SpanKindClient)) + defer span.End() + if span.IsRecordingEvents() { + span.AddAttributes(EventTraceAttributes(&e)...) + } + + resp, err := c.client.Request(ctx, e) + + if err != nil { + r.Error() + } else { + r.OK() + } + return resp, err +} + +// StartReceiver sets up the given fn to handle Receive. +// See Client.StartReceiver for details. This is a blocking call. +func (c *obsClient) StartReceiver(ctx context.Context, fn interface{}) error { + ctx, r := observability.NewReporter(ctx, reportStartReceiver) + + err := c.client.StartReceiver(ctx, fn) + + if err != nil { + r.Error() + } else { + r.OK() + } + return err +} diff --git a/vendor/github.com/cloudevents/sdk-go/v2/client/defaulters.go b/vendor/github.com/cloudevents/sdk-go/v2/client/defaulters.go new file mode 100644 index 0000000000..5d0d7bc941 --- /dev/null +++ b/vendor/github.com/cloudevents/sdk-go/v2/client/defaulters.go @@ -0,0 +1,52 @@ +package client + +import ( + "context" + "time" + + "github.com/cloudevents/sdk-go/v2/event" + + "github.com/google/uuid" +) + +// EventDefaulter is the function signature for extensions that are able +// to perform event defaulting. +type EventDefaulter func(ctx context.Context, event event.Event) event.Event + +// DefaultIDToUUIDIfNotSet will inspect the provided event and assign a UUID to +// context.ID if it is found to be empty. +func DefaultIDToUUIDIfNotSet(ctx context.Context, event event.Event) event.Event { + if event.Context != nil { + if event.ID() == "" { + event.Context = event.Context.Clone() + event.SetID(uuid.New().String()) + } + } + return event +} + +// DefaultTimeToNowIfNotSet will inspect the provided event and assign a new +// Timestamp to context.Time if it is found to be nil or zero. +func DefaultTimeToNowIfNotSet(ctx context.Context, event event.Event) event.Event { + if event.Context != nil { + if event.Time().IsZero() { + event.Context = event.Context.Clone() + event.SetTime(time.Now()) + } + } + return event +} + +// NewDefaultDataContentTypeIfNotSet returns a defaulter that will inspect the +// provided event and set the provided content type if content type is found +// to be empty. +func NewDefaultDataContentTypeIfNotSet(contentType string) EventDefaulter { + return func(ctx context.Context, event event.Event) event.Event { + if event.Context != nil { + if event.DataContentType() == "" { + event.SetDataContentType(contentType) + } + } + return event + } +} diff --git a/vendor/github.com/cloudevents/sdk-go/v2/client/doc.go b/vendor/github.com/cloudevents/sdk-go/v2/client/doc.go new file mode 100644 index 0000000000..a6a602bb41 --- /dev/null +++ b/vendor/github.com/cloudevents/sdk-go/v2/client/doc.go @@ -0,0 +1,6 @@ +/* +Package client holds the recommended entry points for interacting with the CloudEvents Golang SDK. The client wraps +a selected transport. The client adds validation and defaulting for sending events, and flexible receiver method +registration. For full details, read the `client.Client` documentation. +*/ +package client diff --git a/vendor/github.com/cloudevents/sdk-go/v2/client/http_receiver.go b/vendor/github.com/cloudevents/sdk-go/v2/client/http_receiver.go new file mode 100644 index 0000000000..416e971e69 --- /dev/null +++ b/vendor/github.com/cloudevents/sdk-go/v2/client/http_receiver.go @@ -0,0 +1,39 @@ +package client + +import ( + "context" + "net/http" + + thttp "github.com/cloudevents/sdk-go/v2/protocol/http" +) + +func NewHTTPReceiveHandler(ctx context.Context, p *thttp.Protocol, fn interface{}) (*EventReceiver, error) { + invoker, err := newReceiveInvoker(fn) + if err != nil { + return nil, err + } + + return &EventReceiver{ + p: p, + invoker: invoker, + }, nil +} + +type EventReceiver struct { + p *thttp.Protocol + invoker Invoker +} + +func (r *EventReceiver) ServeHTTP(rw http.ResponseWriter, req *http.Request) { + go func() { + r.p.ServeHTTP(rw, req) + }() + + ctx := context.Background() + msg, respFn, err := r.p.Respond(ctx) + if err != nil { + // TODO + } else if err := r.invoker.Invoke(ctx, msg, respFn); err != nil { + // TODO + } +} diff --git a/vendor/github.com/cloudevents/sdk-go/v2/client/invoker.go b/vendor/github.com/cloudevents/sdk-go/v2/client/invoker.go new file mode 100644 index 0000000000..e7c064021d --- /dev/null +++ b/vendor/github.com/cloudevents/sdk-go/v2/client/invoker.go @@ -0,0 +1,82 @@ +package client + +import ( + "context" + "fmt" + + "github.com/cloudevents/sdk-go/v2/binding" + "github.com/cloudevents/sdk-go/v2/protocol" +) + +type Invoker interface { + Invoke(context.Context, binding.Message, protocol.ResponseFn) error + IsReceiver() bool + IsResponder() bool +} + +var _ Invoker = (*receiveInvoker)(nil) + +func newReceiveInvoker(fn interface{}, fns ...EventDefaulter) (Invoker, error) { + r := &receiveInvoker{ + eventDefaulterFns: fns, + } + + if fn, err := receiver(fn); err != nil { + return nil, err + } else { + r.fn = fn + } + + return r, nil +} + +type receiveInvoker struct { + fn *receiverFn + eventDefaulterFns []EventDefaulter +} + +func (r *receiveInvoker) Invoke(ctx context.Context, m binding.Message, respFn protocol.ResponseFn) (err error) { + defer func() { + if err2 := m.Finish(err); err2 == nil { + err = err2 + } + }() + + e, err := binding.ToEvent(ctx, m) + if err != nil { + return err + } + + if e != nil && r.fn != nil { + resp, result := r.fn.invoke(ctx, *e) + + // Apply the defaulter chain to the outgoing event. + if resp != nil && len(r.eventDefaulterFns) > 0 { + for _, fn := range r.eventDefaulterFns { + *resp = fn(ctx, *resp) + } + // Validate the event conforms to the CloudEvents Spec. + if verr := resp.Validate(); verr != nil { + return fmt.Errorf("cloudevent validation failed on response event: %v, %w", verr, err) + } + } + if respFn != nil { + var rm binding.Message + if resp != nil { + rm = (*binding.EventMessage)(resp) + } + + return respFn(ctx, rm, result) // TODO: there is a chance this never gets called. Is that ok? + } + } + + return nil +} + +func (r *receiveInvoker) IsReceiver() bool { + return !r.fn.hasEventOut +} + +func (r *receiveInvoker) IsResponder() bool { + return r.fn.hasEventOut +} diff --git a/vendor/github.com/cloudevents/sdk-go/v2/client/observability.go b/vendor/github.com/cloudevents/sdk-go/v2/client/observability.go new file mode 100644 index 0000000000..8e8add28e7 --- /dev/null +++ b/vendor/github.com/cloudevents/sdk-go/v2/client/observability.go @@ -0,0 +1,94 @@ +package client + +import ( + "context" + "github.com/cloudevents/sdk-go/v2/event" + "github.com/cloudevents/sdk-go/v2/extensions" + "github.com/cloudevents/sdk-go/v2/observability" + "go.opencensus.io/stats" + "go.opencensus.io/stats/view" + "go.opencensus.io/trace" +) + +var ( + // LatencyMs measures the latency in milliseconds for the CloudEvents + // client methods. + LatencyMs = stats.Float64("cloudevents.io/sdk-go/client/latency", "The latency in milliseconds for the CloudEvents client methods.", "ms") +) + +var ( + // LatencyView is an OpenCensus view that shows client method latency. + LatencyView = &view.View{ + Name: "client/latency", + Measure: LatencyMs, + Description: "The distribution of latency inside of client for CloudEvents.", + Aggregation: view.Distribution(0, .01, .1, 1, 10, 100, 1000, 10000), + TagKeys: observability.LatencyTags(), + } +) + +type observed int32 + +// Adheres to Observable +var _ observability.Observable = observed(0) + +const ( + specversionAttr = "cloudevents.specversion" + typeAttr = "cloudevents.type" + sourceAttr = "cloudevents.source" + subjectAttr = "cloudevents.subject" + datacontenttypeAttr = "cloudevents.datacontenttype" + + reportSend observed = iota + reportRequest + reportStartReceiver +) + +// MethodName implements Observable.MethodName +func (o observed) MethodName() string { + switch o { + case reportSend: + return "send" + case reportRequest: + return "request" + case reportStartReceiver: + return "start_receiver" + default: + return "unknown" + } +} + +// LatencyMs implements Observable.LatencyMs +func (o observed) LatencyMs() *stats.Float64Measure { + return LatencyMs +} + +func EventTraceAttributes(e event.EventReader) []trace.Attribute { + as := []trace.Attribute{ + trace.StringAttribute(specversionAttr, e.SpecVersion()), + trace.StringAttribute(typeAttr, e.Type()), + trace.StringAttribute(sourceAttr, e.Source()), + } + if sub := e.Subject(); sub != "" { + as = append(as, trace.StringAttribute(subjectAttr, sub)) + } + if dct := e.DataContentType(); dct != "" { + as = append(as, trace.StringAttribute(datacontenttypeAttr, dct)) + } + return as +} + +// TraceSpan returns context and trace.Span based on event. Caller must call span.End() +func TraceSpan(ctx context.Context, e event.Event) (context.Context, *trace.Span) { + var span *trace.Span + if ext, ok := extensions.GetDistributedTracingExtension(e); ok { + ctx, span = ext.StartChildSpan(ctx, observability.ClientSpanName, trace.WithSpanKind(trace.SpanKindServer)) + } + if span == nil { + ctx, span = trace.StartSpan(ctx, observability.ClientSpanName, trace.WithSpanKind(trace.SpanKindServer)) + } + if span.IsRecordingEvents() { + span.AddAttributes(EventTraceAttributes(&e)...) + } + return ctx, span +} diff --git a/vendor/github.com/cloudevents/sdk-go/v2/client/options.go b/vendor/github.com/cloudevents/sdk-go/v2/client/options.go new file mode 100644 index 0000000000..3a1b40fe9e --- /dev/null +++ b/vendor/github.com/cloudevents/sdk-go/v2/client/options.go @@ -0,0 +1,73 @@ +package client + +import ( + "fmt" + "github.com/cloudevents/sdk-go/v2/binding" +) + +// Option is the function signature required to be considered an client.Option. +type Option func(interface{}) error + +// WithEventDefaulter adds an event defaulter to the end of the defaulter chain. +func WithEventDefaulter(fn EventDefaulter) Option { + return func(i interface{}) error { + if c, ok := i.(*ceClient); ok { + if fn == nil { + return fmt.Errorf("client option was given an nil event defaulter") + } + c.eventDefaulterFns = append(c.eventDefaulterFns, fn) + } + return nil + } +} + +func WithForceBinary() Option { + return func(i interface{}) error { + if c, ok := i.(*ceClient); ok { + c.outboundContextDecorators = append(c.outboundContextDecorators, binding.WithForceBinary) + } + return nil + } +} + +func WithForceStructured() Option { + return func(i interface{}) error { + if c, ok := i.(*ceClient); ok { + c.outboundContextDecorators = append(c.outboundContextDecorators, binding.WithForceStructured) + } + return nil + } +} + +// WithUUIDs adds DefaultIDToUUIDIfNotSet event defaulter to the end of the +// defaulter chain. +func WithUUIDs() Option { + return func(i interface{}) error { + if c, ok := i.(*ceClient); ok { + c.eventDefaulterFns = append(c.eventDefaulterFns, DefaultIDToUUIDIfNotSet) + } + return nil + } +} + +// WithTimeNow adds DefaultTimeToNowIfNotSet event defaulter to the end of the +// defaulter chain. +func WithTimeNow() Option { + return func(i interface{}) error { + if c, ok := i.(*ceClient); ok { + c.eventDefaulterFns = append(c.eventDefaulterFns, DefaultTimeToNowIfNotSet) + } + return nil + } +} + +// WithTracePropagation enables trace propagation via the distributed tracing +// extension. +func WithTracePropagation() Option { + return func(i interface{}) error { + if c, ok := i.(*obsClient); ok { + c.addTracing = true + } + return nil + } +} diff --git a/vendor/github.com/cloudevents/sdk-go/v2/client/receiver.go b/vendor/github.com/cloudevents/sdk-go/v2/client/receiver.go new file mode 100644 index 0000000000..e75f9abb98 --- /dev/null +++ b/vendor/github.com/cloudevents/sdk-go/v2/client/receiver.go @@ -0,0 +1,189 @@ +package client + +import ( + "context" + "errors" + "fmt" + "reflect" + + "github.com/cloudevents/sdk-go/v2/event" + "github.com/cloudevents/sdk-go/v2/protocol" +) + +// Receive is the signature of a fn to be invoked for incoming cloudevents. +type ReceiveFull func(context.Context, event.Event) protocol.Result + +type receiverFn struct { + numIn int + numOut int + fnValue reflect.Value + + hasContextIn bool + hasEventIn bool + + hasEventOut bool + hasResultOut bool +} + +const ( + inParamUsage = "expected a function taking either no parameters, one or more of (context.Context, event.Event) ordered" + outParamUsage = "expected a function returning one or mode of (*event.Event, protocol.Result) ordered" +) + +var ( + contextType = reflect.TypeOf((*context.Context)(nil)).Elem() + eventType = reflect.TypeOf((*event.Event)(nil)).Elem() + eventPtrType = reflect.TypeOf((*event.Event)(nil)) // want the ptr type + resultType = reflect.TypeOf((*protocol.Result)(nil)).Elem() +) + +// receiver creates a receiverFn wrapper class that is used by the client to +// validate and invoke the provided function. +// Valid fn signatures are: +// * func() +// * func() error +// * func(context.Context) +// * func(context.Context) transport.Result +// * func(event.Event) +// * func(event.Event) transport.Result +// * func(context.Context, event.Event) +// * func(context.Context, event.Event) transport.Result +// * func(event.Event) *event.Event +// * func(event.Event) (*event.Event, transport.Result) +// * func(context.Context, event.Event, *event.Event +// * func(context.Context, event.Event) (*event.Event, transport.Result) +// +func receiver(fn interface{}) (*receiverFn, error) { + fnType := reflect.TypeOf(fn) + if fnType.Kind() != reflect.Func { + return nil, errors.New("must pass a function to handle events") + } + + r := &receiverFn{ + fnValue: reflect.ValueOf(fn), + numIn: fnType.NumIn(), + numOut: fnType.NumOut(), + } + + if err := r.validate(fnType); err != nil { + return nil, err + } + + return r, nil +} + +func (r *receiverFn) invoke(ctx context.Context, e event.Event) (*event.Event, protocol.Result) { + args := make([]reflect.Value, 0, r.numIn) + + if r.numIn > 0 { + if r.hasContextIn { + args = append(args, reflect.ValueOf(ctx)) + } + if r.hasEventIn { + args = append(args, reflect.ValueOf(e)) + } + } + v := r.fnValue.Call(args) + var respOut protocol.Result + var eOut *event.Event + if r.numOut > 0 { + i := 0 + if r.hasEventOut { + if eo, ok := v[i].Interface().(*event.Event); ok { + eOut = eo + } + i++ // <-- note, need to inc i. + } + if r.hasResultOut { + if resp, ok := v[i].Interface().(protocol.Result); ok { + respOut = resp + } + } + } + return eOut, respOut +} + +// Verifies that the inputs to a function have a valid signature +// Valid input is to be [0, all] of +// context.Context, event.Event in this order. +func (r *receiverFn) validateInParamSignature(fnType reflect.Type) error { + r.hasContextIn = false + r.hasEventIn = false + + switch fnType.NumIn() { + case 2: + // has to be (context.Context, event.Event) + if !fnType.In(1).ConvertibleTo(eventType) { + return fmt.Errorf("%s; cannot convert parameter 2 from %s to event.Event", inParamUsage, fnType.In(1)) + } else { + r.hasEventIn = true + } + fallthrough + case 1: + if !fnType.In(0).ConvertibleTo(contextType) { + if !fnType.In(0).ConvertibleTo(eventType) { + return fmt.Errorf("%s; cannot convert parameter 1 from %s to context.Context or event.Event", inParamUsage, fnType.In(0)) + } else if r.hasEventIn { + return fmt.Errorf("%s; duplicate parameter of type event.Event", inParamUsage) + } else { + r.hasEventIn = true + } + } else { + r.hasContextIn = true + } + fallthrough + case 0: + return nil + + default: + return fmt.Errorf("%s; function has too many parameters (%d)", inParamUsage, fnType.NumIn()) + } +} + +// Verifies that the outputs of a function have a valid signature +// Valid output signatures to be [0, all] of +// *event.Event, transport.Result in this order +func (r *receiverFn) validateOutParamSignature(fnType reflect.Type) error { + r.hasEventOut = false + r.hasResultOut = false + + switch fnType.NumOut() { + case 2: + // has to be (*event.Event, transport.Result) + if !fnType.Out(1).ConvertibleTo(resultType) { + return fmt.Errorf("%s; cannot convert parameter 2 from %s to event.Response", outParamUsage, fnType.Out(1)) + } else { + r.hasResultOut = true + } + fallthrough + case 1: + if !fnType.Out(0).ConvertibleTo(resultType) { + if !fnType.Out(0).ConvertibleTo(eventPtrType) { + return fmt.Errorf("%s; cannot convert parameter 1 from %s to *event.Event or transport.Result", outParamUsage, fnType.Out(0)) + } else { + r.hasEventOut = true + } + } else if r.hasResultOut { + return fmt.Errorf("%s; duplicate parameter of type event.Response", outParamUsage) + } else { + r.hasResultOut = true + } + fallthrough + case 0: + return nil + default: + return fmt.Errorf("%s; function has too many return types (%d)", outParamUsage, fnType.NumOut()) + } +} + +// validateReceiverFn validates that a function has the right number of in and +// out params and that they are of allowed types. +func (r *receiverFn) validate(fnType reflect.Type) error { + if err := r.validateInParamSignature(fnType); err != nil { + return err + } + if err := r.validateOutParamSignature(fnType); err != nil { + return err + } + return nil +} diff --git a/vendor/github.com/cloudevents/sdk-go/v2/extensions/distributed_tracing_extension.go b/vendor/github.com/cloudevents/sdk-go/v2/extensions/distributed_tracing_extension.go new file mode 100644 index 0000000000..f0206f33c1 --- /dev/null +++ b/vendor/github.com/cloudevents/sdk-go/v2/extensions/distributed_tracing_extension.go @@ -0,0 +1,126 @@ +package extensions + +import ( + "context" + "reflect" + "strings" + + "github.com/cloudevents/sdk-go/v2/event" + + "github.com/cloudevents/sdk-go/v2/types" + "github.com/lightstep/tracecontext.go/traceparent" + "github.com/lightstep/tracecontext.go/tracestate" + "go.opencensus.io/trace" + octs "go.opencensus.io/trace/tracestate" +) + +const ( + TraceParentExtension = "traceparent" + TraceStateExtension = "tracestate" +) + +// DistributedTracingExtension represents the extension for cloudevents context +type DistributedTracingExtension struct { + TraceParent string `json:"traceparent"` + TraceState string `json:"tracestate"` +} + +// AddTracingAttributes adds the tracing attributes traceparent and tracestate to the cloudevents context +func (d DistributedTracingExtension) AddTracingAttributes(e event.EventWriter) { + if d.TraceParent != "" { + value := reflect.ValueOf(d) + typeOf := value.Type() + + for i := 0; i < value.NumField(); i++ { + k := strings.ToLower(typeOf.Field(i).Name) + v := value.Field(i).Interface() + if k == TraceStateExtension && v == "" { + continue + } + e.SetExtension(k, v) + } + } +} + +func GetDistributedTracingExtension(event event.Event) (DistributedTracingExtension, bool) { + if tp, ok := event.Extensions()[TraceParentExtension]; ok { + if tpStr, err := types.ToString(tp); err == nil { + var tsStr string + if ts, ok := event.Extensions()[TraceStateExtension]; ok { + tsStr, _ = types.ToString(ts) + } + return DistributedTracingExtension{TraceParent: tpStr, TraceState: tsStr}, true + } + } + return DistributedTracingExtension{}, false +} + +// FromSpanContext populates DistributedTracingExtension from a SpanContext. +func FromSpanContext(sc trace.SpanContext) DistributedTracingExtension { + tp := traceparent.TraceParent{ + TraceID: sc.TraceID, + SpanID: sc.SpanID, + Flags: traceparent.Flags{ + Recorded: sc.IsSampled(), + }, + } + + entries := make([]string, 0, len(sc.Tracestate.Entries())) + for _, entry := range sc.Tracestate.Entries() { + entries = append(entries, strings.Join([]string{entry.Key, entry.Value}, "=")) + } + + return DistributedTracingExtension{ + TraceParent: tp.String(), + TraceState: strings.Join(entries, ","), + } +} + +// ToSpanContext creates a SpanContext from a DistributedTracingExtension instance. +func (d DistributedTracingExtension) ToSpanContext() (trace.SpanContext, error) { + tp, err := traceparent.ParseString(d.TraceParent) + if err != nil { + return trace.SpanContext{}, err + } + sc := trace.SpanContext{ + TraceID: tp.TraceID, + SpanID: tp.SpanID, + } + if tp.Flags.Recorded { + sc.TraceOptions &= 1 + } + + if ts, err := tracestate.ParseString(d.TraceState); err == nil { + entries := make([]octs.Entry, 0, len(ts)) + for _, member := range ts { + var key string + if member.Vendor != "" { + key = member.Tenant + "@" + member.Vendor + } else { + key = member.Tenant + } + entries = append(entries, octs.Entry{Key: key, Value: member.Value}) + } + sc.Tracestate, _ = octs.New(nil, entries...) + } + + return sc, nil +} + +func (d DistributedTracingExtension) StartChildSpan(ctx context.Context, name string, opts ...trace.StartOption) (context.Context, *trace.Span) { + if sc, err := d.ToSpanContext(); err == nil { + tSpan := trace.FromContext(ctx) + ctx, span := trace.StartSpanWithRemoteParent(ctx, name, sc, opts...) + if tSpan != nil { + // Add link to the previous in-process trace. + tsc := tSpan.SpanContext() + span.AddLink(trace.Link{ + TraceID: tsc.TraceID, + SpanID: tsc.SpanID, + Type: trace.LinkTypeParent, + }) + } + return ctx, span + } + return ctx, nil +} From 33f43306e05f567752734c4cd23b5bb49693f2bb Mon Sep 17 00:00:00 2001 From: Chen Shou Date: Tue, 24 Mar 2020 20:55:04 +0000 Subject: [PATCH 2/6] remove a comment --- pkg/broker/eventutil/label.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/broker/eventutil/label.go b/pkg/broker/eventutil/label.go index 58b7d398af..230f315a85 100644 --- a/pkg/broker/eventutil/label.go +++ b/pkg/broker/eventutil/label.go @@ -66,7 +66,7 @@ func (le *LabeledEvent) Delabeled() *cloudevents.Event { e := le.event.Clone() for k := range e.Extensions() { if strings.HasPrefix(strings.ToLower(k), LabelPrefix) { - // delete(e.Extensions(), k) + // Set to nil to delete that extension. e.SetExtension(k, nil) } } From 21b06638ba37400ddae3c98d6c997a1ede9390d5 Mon Sep 17 00:00:00 2001 From: Chen Shou Date: Tue, 24 Mar 2020 21:43:58 +0000 Subject: [PATCH 3/6] address comment --- pkg/broker/eventutil/label.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/pkg/broker/eventutil/label.go b/pkg/broker/eventutil/label.go index 230f315a85..5898f6f8c0 100644 --- a/pkg/broker/eventutil/label.go +++ b/pkg/broker/eventutil/label.go @@ -23,8 +23,8 @@ import ( ) const ( - // LabelPrefix is the prefix for label keys. - LabelPrefix = "kgcp" + // labelPrefix is the prefix for label keys. + labelPrefix = "kgcp" ) // LabeledEvent is a wrapper of a cloudevent @@ -40,7 +40,7 @@ func NewLabeledEvent(e *cloudevents.Event) *LabeledEvent { // WithLabel attaches a label to the event as an extension. func (le *LabeledEvent) WithLabel(key, value string) *LabeledEvent { - le.event.SetExtension(LabelPrefix+key, value) + le.event.SetExtension(labelPrefix+key, value) return le } @@ -49,7 +49,7 @@ func (le *LabeledEvent) GetLabels() map[string]string { m := make(map[string]string) exts := le.event.Extensions() for k, v := range exts { - if strings.HasPrefix(strings.ToLower(k), LabelPrefix) { + if strings.HasPrefix(strings.ToLower(k), labelPrefix) { m[k] = v.(string) } } @@ -65,7 +65,7 @@ func (le *LabeledEvent) Event() *cloudevents.Event { func (le *LabeledEvent) Delabeled() *cloudevents.Event { e := le.event.Clone() for k := range e.Extensions() { - if strings.HasPrefix(strings.ToLower(k), LabelPrefix) { + if strings.HasPrefix(strings.ToLower(k), labelPrefix) { // Set to nil to delete that extension. e.SetExtension(k, nil) } From 8953366882c21dbf6b27d11b97c13629f3af081a Mon Sep 17 00:00:00 2001 From: Chen Shou Date: Tue, 24 Mar 2020 22:02:44 +0000 Subject: [PATCH 4/6] fix bug --- pkg/broker/eventutil/label.go | 8 ++++++-- pkg/broker/eventutil/label_test.go | 14 ++++++++++++++ 2 files changed, 20 insertions(+), 2 deletions(-) diff --git a/pkg/broker/eventutil/label.go b/pkg/broker/eventutil/label.go index 5898f6f8c0..5d0a903dd9 100644 --- a/pkg/broker/eventutil/label.go +++ b/pkg/broker/eventutil/label.go @@ -40,7 +40,11 @@ func NewLabeledEvent(e *cloudevents.Event) *LabeledEvent { // WithLabel attaches a label to the event as an extension. func (le *LabeledEvent) WithLabel(key, value string) *LabeledEvent { - le.event.SetExtension(labelPrefix+key, value) + if value == "" { + le.event.SetExtension(labelPrefix+key, nil) + } else { + le.event.SetExtension(labelPrefix+key, value) + } return le } @@ -50,7 +54,7 @@ func (le *LabeledEvent) GetLabels() map[string]string { exts := le.event.Extensions() for k, v := range exts { if strings.HasPrefix(strings.ToLower(k), labelPrefix) { - m[k] = v.(string) + m[strings.TrimPrefix(k, labelPrefix)] = v.(string) } } return m diff --git a/pkg/broker/eventutil/label_test.go b/pkg/broker/eventutil/label_test.go index b6650e19e7..9afd787009 100644 --- a/pkg/broker/eventutil/label_test.go +++ b/pkg/broker/eventutil/label_test.go @@ -37,6 +37,15 @@ func TestLabeledEevent(t *testing.T) { le := NewLabeledEvent(&e).WithLabel("label1", "val1").WithLabel("label2", "val2") + wantLabels := map[string]string{ + "label1": "val1", + "label2": "val2", + } + gotLabels := le.GetLabels() + if diff := cmp.Diff(wantLabels, gotLabels); diff != "" { + t.Errorf("LabeledEvent.GetLabels() (-want,+got): %v", diff) + } + gotDelabeled := le.Delabeled() if diff := cmp.Diff(&wantDelabeled, gotDelabeled); diff != "" { t.Errorf("LabeledEvent.Delabeled() (-want,+got): %v", diff) @@ -45,4 +54,9 @@ func TestLabeledEevent(t *testing.T) { if diff := cmp.Diff(&wantLabeled, gotLabeled); diff != "" { t.Errorf("LabeledEvent.Event() (-want,+got): %v", diff) } + + le.WithLabel("label1", "").WithLabel("label2", "") + if gotCnt := len(le.GetLabels()); gotCnt != 0 { + t.Errorf("Labels count after removing labels got=%d want=0", gotCnt) + } } From b6d8911b9df4b46b7f2f6e41fd1e734a980ed9f6 Mon Sep 17 00:00:00 2001 From: Chen Shou Date: Wed, 25 Mar 2020 17:59:02 +0000 Subject: [PATCH 5/6] merge conflicts --- Gopkg.lock | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/Gopkg.lock b/Gopkg.lock index b5422a55d0..c6af90ad16 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -144,7 +144,7 @@ version = "v0.2.1" [[projects]] - digest = "1:f1c479b9da374300702b9589dff5e7985dfc4ab29eca8703045342519e42bc20" + digest = "1:4b4b77eef38610d614ea54aceca55d9a4279858e52d62a46d317745eff452246" name = "github.com/cloudevents/sdk-go" packages = [ "v1", @@ -1710,11 +1710,8 @@ "github.com/cloudevents/sdk-go/v1/cloudevents/transport/http", "github.com/cloudevents/sdk-go/v1/cloudevents/transport/pubsub", "github.com/cloudevents/sdk-go/v1/cloudevents/transport/pubsub/context", -<<<<<<< HEAD - "github.com/cloudevents/sdk-go/v2/protocol/http", -======= "github.com/cloudevents/sdk-go/v2", ->>>>>>> Add event wrapper to label/delabel events + "github.com/cloudevents/sdk-go/v2/protocol/http", "github.com/fsnotify/fsnotify", "github.com/golang/protobuf/jsonpb", "github.com/golang/protobuf/proto", From 4d53dcdb5e3c45fa5e320c13a0f6a9f10aebf2a5 Mon Sep 17 00:00:00 2001 From: Chen Shou Date: Wed, 25 Mar 2020 18:14:08 +0000 Subject: [PATCH 6/6] comment --- pkg/broker/eventutil/label.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/broker/eventutil/label.go b/pkg/broker/eventutil/label.go index 5d0a903dd9..2ce27adc3c 100644 --- a/pkg/broker/eventutil/label.go +++ b/pkg/broker/eventutil/label.go @@ -54,7 +54,7 @@ func (le *LabeledEvent) GetLabels() map[string]string { exts := le.event.Extensions() for k, v := range exts { if strings.HasPrefix(strings.ToLower(k), labelPrefix) { - m[strings.TrimPrefix(k, labelPrefix)] = v.(string) + m[strings.TrimPrefix(strings.ToLower(k), labelPrefix)] = v.(string) } } return m