diff --git a/godeps.txt b/godeps.txt index 7a180436c..cfeddd38d 100644 --- a/godeps.txt +++ b/godeps.txt @@ -332,6 +332,7 @@ github.com/ServiceWeaver/weaver/internal/net/call errors fmt github.com/ServiceWeaver/weaver/internal/traceio + github.com/ServiceWeaver/weaver/metadata github.com/ServiceWeaver/weaver/runtime/codegen github.com/ServiceWeaver/weaver/runtime/logging github.com/ServiceWeaver/weaver/runtime/retry @@ -695,6 +696,9 @@ github.com/ServiceWeaver/weaver/internal/weaver sync/atomic syscall time +github.com/ServiceWeaver/weaver/metadata + context + maps github.com/ServiceWeaver/weaver/metrics github.com/ServiceWeaver/weaver/runtime/metrics github.com/ServiceWeaver/weaver/runtime/protos @@ -1063,9 +1067,11 @@ github.com/ServiceWeaver/weaver/weavertest/internal/simple errors fmt github.com/ServiceWeaver/weaver + github.com/ServiceWeaver/weaver/metadata github.com/ServiceWeaver/weaver/runtime/codegen go.opentelemetry.io/otel/codes go.opentelemetry.io/otel/trace + maps net/http os reflect diff --git a/internal/net/call/call.go b/internal/net/call/call.go index 7e2a4bf62..c5068ae10 100644 --- a/internal/net/call/call.go +++ b/internal/net/call/call.go @@ -78,17 +78,13 @@ import ( "sync/atomic" "time" + "github.com/ServiceWeaver/weaver/runtime/codegen" "github.com/ServiceWeaver/weaver/runtime/logging" "github.com/ServiceWeaver/weaver/runtime/retry" "go.opentelemetry.io/otel/codes" "go.opentelemetry.io/otel/trace" ) -const ( - // Size of the header included in each message. - msgHeaderSize = 16 + 8 + traceHeaderLen // handler_key + deadline + trace_context -) - // Connection allows a client to send RPCs. type Connection interface { // Call makes an RPC over a Connection. @@ -385,25 +381,29 @@ func (rc *reconnectingConnection) Call(ctx context.Context, h MethodKey, arg []b } func (rc *reconnectingConnection) callOnce(ctx context.Context, h MethodKey, arg []byte, opts CallOptions) ([]byte, error) { - var hdr [msgHeaderSize]byte - copy(hdr[0:], h[:]) + var micros int64 deadline, haveDeadline := ctx.Deadline() if haveDeadline { // Send the deadline in the header. We use the relative time instead // of absolute in case there is significant clock skew. This does mean // that we will not count transmission delay against the deadline. - micros := time.Until(deadline).Microseconds() + micros = time.Until(deadline).Microseconds() if micros <= 0 { // Fail immediately without attempting to send a zero or negative // deadline to the server which will be misinterpreted. <-ctx.Done() return nil, ctx.Err() } - binary.LittleEndian.PutUint64(hdr[16:], uint64(micros)) } - // Send trace information in the header. - writeTraceContext(ctx, hdr[24:]) + // Encode the header. + hdr := encodeHeader(ctx, h, micros) + + // Note that we send the header and the payload as follows: + // [header_length][encoded_header][payload] + var hdrLen [hdrLenLen]byte + binary.LittleEndian.PutUint32(hdrLen[:], uint32(len(hdr))) + hdrSlice := append(hdrLen[:], hdr...) rpc := &call{} rpc.doneSignal = make(chan struct{}) @@ -413,7 +413,7 @@ func (rc *reconnectingConnection) callOnce(ctx context.Context, h MethodKey, arg if err != nil { return nil, err } - if err := writeMessage(nc, &conn.wlock, requestMessage, rpc.id, hdr[:], arg, rc.opts.WriteFlattenLimit); err != nil { + if err := writeMessage(nc, &conn.wlock, requestMessage, rpc.id, hdrSlice, arg, rc.opts.WriteFlattenLimit); err != nil { conn.shutdown("client send request", err) conn.endCall(rpc) return nil, fmt.Errorf("%w: %s", CommunicationError, err) @@ -942,17 +942,24 @@ func (c *serverConnection) readRequests(ctx context.Context, hmap *HandlerMap, o // runHandler runs an application specified RPC handler at the server side. // The result (or error) from the handler is sent back to the client over c. func (c *serverConnection) runHandler(hmap *HandlerMap, id uint64, msg []byte) { - // Extract request header from front of payload. - if len(msg) < msgHeaderSize { + msgLen := uint32(len(msg)) + if msgLen < hdrLenLen { + c.shutdown("server handler", fmt.Errorf("missing request header length")) + return + } + + // Get the header length. + hdrLen := binary.LittleEndian.Uint32(msg[:hdrLenLen]) + hdrEndOffset := hdrLenLen + hdrLen + if msgLen < hdrEndOffset { c.shutdown("server handler", fmt.Errorf("missing request header")) return } - // Extract handler key. - var hkey MethodKey - copy(hkey[:], msg) + // Extracts header information. + ctx, hkey, micros, sc := decodeHeader(msg[hdrLenLen:hdrEndOffset]) - // Extract the method name + // Extracts the method name. methodName := hmap.names[hkey] if methodName == "" { methodName = "handler" @@ -960,17 +967,6 @@ func (c *serverConnection) runHandler(hmap *HandlerMap, id uint64, msg []byte) { methodName = logging.ShortenComponent(methodName) } - // Extract trace context and create a new child span to trace the method - // call on the server. - ctx := context.Background() - span := trace.SpanFromContext(ctx) // noop span - if sc := readTraceContext(msg[24:]); sc.IsValid() { - ctx, span = c.opts.Tracer.Start(trace.ContextWithSpanContext(ctx, sc), methodName, trace.WithSpanKind(trace.SpanKindServer)) - defer span.End() - } - - // Add deadline information from the header to the context. - micros := binary.LittleEndian.Uint64(msg[16:]) var cancelFunc func() if micros != 0 { deadline := time.Now().Add(time.Microsecond * time.Duration(micros)) @@ -984,8 +980,19 @@ func (c *serverConnection) runHandler(hmap *HandlerMap, id uint64, msg []byte) { } }() + // Create a new child span to trace the method call on the server. + span := trace.SpanFromContext(ctx) // noop span + if sc != nil { + if !sc.IsValid() { + c.shutdown("server handler", fmt.Errorf("invalid span context")) + return + } + ctx, span = c.opts.Tracer.Start(trace.ContextWithSpanContext(ctx, *sc), methodName, trace.WithSpanKind(trace.SpanKindServer)) + defer span.End() + } + // Call the handler passing it the payload. - payload := msg[msgHeaderSize:] + payload := msg[hdrEndOffset:] var err error var result []byte fn, ok := hmap.handlers[hkey] @@ -1051,6 +1058,40 @@ func (c *serverConnection) shutdown(details string, err error) { } } +// encodeHeader encodes the header information that is propagated by each message. +func encodeHeader(ctx context.Context, h MethodKey, micros int64) []byte { + enc := codegen.NewEncoder() + copy(enc.Grow(len(h)), h[:]) + enc.Int64(micros) + + // Send trace information in the header. + writeTraceContext(ctx, enc) + + // Send context metadata in the header. + writeContextMetadata(ctx, enc) + + return enc.Data() +} + +// decodeHeader extracts the encoded header information. +func decodeHeader(hdr []byte) (context.Context, MethodKey, int64, *trace.SpanContext) { + dec := codegen.NewDecoder(hdr) + + // Extract handler key. + var hkey MethodKey + copy(hkey[:], dec.Read(len(hkey))) + + // Extract deadline information. + micros := dec.Int64() + + // Extract trace context information. + sc := readTraceContext(dec) + + // Extract metadata context information if any. + ctx := readContextMetadata(context.Background(), dec) + return ctx, hkey, micros, sc +} + func logError(logger *slog.Logger, details string, err error) { if errors.Is(err, context.Canceled) || errors.Is(err, io.EOF) || diff --git a/internal/net/call/metadata.go b/internal/net/call/metadata.go new file mode 100644 index 000000000..afe5c4dd9 --- /dev/null +++ b/internal/net/call/metadata.go @@ -0,0 +1,54 @@ +// Copyright 2024 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package call + +import ( + "context" + + "github.com/ServiceWeaver/weaver/metadata" + "github.com/ServiceWeaver/weaver/runtime/codegen" +) + +// writeContextMetadata serializes the context metadata (if any) into enc. +func writeContextMetadata(ctx context.Context, enc *codegen.Encoder) { + m, found := metadata.FromContext(ctx) + if !found { + enc.Bool(false) + return + } + enc.Bool(true) + enc.Len(len(m)) + for k, v := range m { + enc.String(k) + enc.String(v) + } +} + +// readContextMetadata returns the context metadata (if any) stored in dec. +func readContextMetadata(ctx context.Context, dec *codegen.Decoder) context.Context { + hasMeta := dec.Bool() + if !hasMeta { + return ctx + } + n := dec.Len() + res := make(map[string]string, n) + var k, v string + for i := 0; i < n; i++ { + k = dec.String() + v = dec.String() + res[k] = v + } + return metadata.NewContext(ctx, res) +} diff --git a/internal/net/call/msg.go b/internal/net/call/msg.go index 1dabe07a8..c7303a735 100644 --- a/internal/net/call/msg.go +++ b/internal/net/call/msg.go @@ -46,6 +46,8 @@ const ( const currentVersion = initialVersion +const hdrLenLen = uint32(4) // size of the header length included in each message + // # Message formats // // All messages have the following format: @@ -60,10 +62,19 @@ const currentVersion = initialVersion // version [4]byte // // requestMessage: -// headerKey [16]byte -- fingerprint of method name -// deadline [8]byte -- zero, or deadline in microseconds -// traceContext [25]byte -- zero, or trace context -// remainder -- call argument serialization +// headerLen [4]byte -- length of the encoded header +// header [headerLen]byte -- encoded header information +// payload -- call argument serialization +// +// The header is encoded using Service Weaver's encoding format for a type that +// looks like: +// +// struct header { +// MethodKey [16]byte +// Deadline int64 +// TraceContext [25]byte +// MetadataContext map[string]string +// } // // responseMessage: // payload holds call result serialization diff --git a/internal/net/call/trace.go b/internal/net/call/trace.go index a9a14900c..6362f2f9d 100644 --- a/internal/net/call/trace.go +++ b/internal/net/call/trace.go @@ -17,38 +17,46 @@ package call import ( "context" + "github.com/ServiceWeaver/weaver/runtime/codegen" "go.opentelemetry.io/otel/trace" ) -const traceHeaderLen = 25 - // writeTraceContext serializes the trace context (if any) contained in ctx -// into b. -// REQUIRES: len(b) >= traceHeaderLen -func writeTraceContext(ctx context.Context, b []byte) { +// into enc. +func writeTraceContext(ctx context.Context, enc *codegen.Encoder) { sc := trace.SpanContextFromContext(ctx) if !sc.IsValid() { + enc.Bool(false) return } + enc.Bool(true) // Send trace information in the header. // TODO(spetrovic): Confirm that we don't need to bother with TraceState, // which seems to be used for storing vendor-specific information. traceID := sc.TraceID() spanID := sc.SpanID() - copy(b, traceID[:]) - copy(b[16:], spanID[:]) - b[24] = byte(sc.TraceFlags()) + copy(enc.Grow(len(traceID)), traceID[:]) + copy(enc.Grow(len(spanID)), spanID[:]) + enc.Byte(byte(sc.TraceFlags())) } -// readTraceContext returns a span context with tracing information stored in b. -// REQUIRES: len(b) >= traceHeaderLen -func readTraceContext(b []byte) trace.SpanContext { +// readTraceContext returns a span context with tracing information stored in dec. +func readTraceContext(dec *codegen.Decoder) *trace.SpanContext { + hasTrace := dec.Bool() + if !hasTrace { + return nil + } + var traceID trace.TraceID + var spanID trace.SpanID + traceID = *(*trace.TraceID)(dec.Read(len(traceID))) + spanID = *(*trace.SpanID)(dec.Read(len(spanID))) cfg := trace.SpanContextConfig{ - TraceID: *(*trace.TraceID)(b[:16]), - SpanID: *(*trace.SpanID)(b[16:24]), - TraceFlags: trace.TraceFlags(b[24]), + TraceID: traceID, + SpanID: spanID, + TraceFlags: trace.TraceFlags(dec.Byte()), Remote: true, } - return trace.NewSpanContext(cfg) + trace := trace.NewSpanContext(cfg) + return &trace } diff --git a/internal/net/call/trace_test.go b/internal/net/call/trace_test.go index f257cf562..16d0133bd 100644 --- a/internal/net/call/trace_test.go +++ b/internal/net/call/trace_test.go @@ -19,6 +19,7 @@ import ( "encoding/json" "testing" + "github.com/ServiceWeaver/weaver/runtime/codegen" "github.com/google/uuid" "go.opentelemetry.io/otel/trace" ) @@ -36,14 +37,14 @@ func TestTraceSerialization(t *testing.T) { }) // Serialize the trace context. - var b [25]byte - writeTraceContext( - trace.ContextWithSpanContext(context.Background(), span), b[:]) + enc := codegen.NewEncoder() + writeTraceContext(trace.ContextWithSpanContext(context.Background(), span), enc) // Deserialize the trace context. - actual := readTraceContext(b[:]) + dec := codegen.NewDecoder(enc.Data()) + actual := readTraceContext(dec) expect := span.WithRemote(true) - if !expect.Equal(actual) { + if !expect.Equal(*actual) { want, _ := json.Marshal(expect) got, _ := json.Marshal(actual) t.Errorf("span context diff, want %q, got %q", want, got) diff --git a/metadata/metadata.go b/metadata/metadata.go new file mode 100644 index 000000000..dd84d2361 --- /dev/null +++ b/metadata/metadata.go @@ -0,0 +1,58 @@ +// Copyright 2024 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package metadata provides support for the propagation of metadata information +// from a component method caller to the callee. The metadata is propagated to +// the callee even if the caller and callee are not colocated in the same process. +// +// The metadata is a map from string to string stored in context.Context. The map +// can be added to a context by calling NewContext. +// +// Example: +// +// To attach metadata with key "foo" and value "bar" to the context: +// +// ctx := context.Background() +// ctx = metadata.NewContext(ctx, map[string]string{"foo": "bar"}) +// +// To read the metadata value associated with a key "foo" in the context: +// +// meta, found := metadata.FromContext(ctx) +// if found { +// value := meta["foo"] +// } +package metadata + +import ( + "context" + "maps" +) + +// metaKey is an unexported type for the key that stores the metadata. +type metaKey struct{} + +// NewContext returns a new context that carries metadata meta. +func NewContext(ctx context.Context, meta map[string]string) context.Context { + return context.WithValue(ctx, metaKey{}, meta) +} + +// FromContext returns the metadata value stored in ctx, if any. +func FromContext(ctx context.Context) (map[string]string, bool) { + meta, ok := ctx.Value(metaKey{}).(map[string]string) + if !ok { + return nil, false + } + out := maps.Clone(meta) + return out, true +} diff --git a/metadata/metadata_test.go b/metadata/metadata_test.go new file mode 100644 index 000000000..cadfa0a20 --- /dev/null +++ b/metadata/metadata_test.go @@ -0,0 +1,94 @@ +// Copyright 2024 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package metadata + +import ( + "context" + "reflect" + "testing" +) + +func TestContextMetadata(t *testing.T) { + type testCase struct { + name string + meta map[string]string + isMetaExpected bool + want map[string]string + } + for _, test := range []testCase{ + { + name: "no metadata", + }, + { + name: "with empty metadata", + meta: map[string]string{}, + isMetaExpected: false, + }, + { + name: "with valid metadata", + meta: map[string]string{ + "foo": "bar", + "baz": "waldo", + }, + isMetaExpected: true, + want: map[string]string{ + "foo": "bar", + "baz": "waldo", + }, + }, + { + name: "with valid metadata and uppercase keys", + meta: map[string]string{ + "Foo": "bar", + "Baz": "waldo", + }, + isMetaExpected: true, + want: map[string]string{ + "Foo": "bar", + "Baz": "waldo", + }, + }, + { + name: "with valid metadata and uppercase values", + meta: map[string]string{ + "Foo": "Bar", + "Baz": "Waldo", + }, + isMetaExpected: true, + want: map[string]string{ + "Foo": "Bar", + "Baz": "Waldo", + }, + }, + } { + t.Run(test.name, func(t *testing.T) { + ctx := context.Background() + if len(test.meta) > 0 { + ctx = NewContext(ctx, test.meta) + } + + got, found := FromContext(ctx) + if !reflect.DeepEqual(found, test.isMetaExpected) { + t.Errorf("ExtractMetadata: expecting %v, got %v", test.isMetaExpected, found) + } + if !found { + return + } + if !reflect.DeepEqual(test.want, got) { + t.Errorf("ExtractMetadata: expecting %v, got %v", test.want, got) + } + }) + } +} diff --git a/weavertest/internal/simple/simple.go b/weavertest/internal/simple/simple.go index 47884e804..682dab0c0 100644 --- a/weavertest/internal/simple/simple.go +++ b/weavertest/internal/simple/simple.go @@ -22,12 +22,14 @@ import ( "context" "errors" "fmt" + "maps" "net/http" "os" "strings" "sync" "github.com/ServiceWeaver/weaver" + "github.com/ServiceWeaver/weaver/metadata" ) //go:generate ../../../cmd/weaver/weaver generate @@ -50,6 +52,8 @@ type Destination interface { Record(_ context.Context, file, msg string) error GetAll(_ context.Context, file string) ([]string, error) RoutedRecord(_ context.Context, file, msg string) error + UpdateMetadata(_ context.Context) error + GetMetadata(_ context.Context) (map[string]string, error) } var ( @@ -67,7 +71,8 @@ func (r destRouter) RoutedRecord(_ context.Context, file, msg string) string { type destination struct { weaver.Implements[Destination] weaver.WithRouter[destRouter] - mu sync.Mutex + mu sync.Mutex + metadata map[string]string } var pid = os.Getpid() @@ -77,7 +82,7 @@ func (d *destination) Init(ctx context.Context) error { return nil } -func (d *destination) Getpid(_ context.Context) (int, error) { +func (d *destination) Getpid(context.Context) (int, error) { return pid, nil } @@ -113,6 +118,21 @@ func (d *destination) GetAll(_ context.Context, file string) ([]string, error) { return strings.Split(str, "\n"), nil } +func (d *destination) UpdateMetadata(ctx context.Context) error { + d.mu.Lock() + defer d.mu.Unlock() + if meta, found := metadata.FromContext(ctx); found { + d.metadata = maps.Clone(meta) + } + return nil +} + +func (d *destination) GetMetadata(_ context.Context) (map[string]string, error) { + d.mu.Lock() + defer d.mu.Unlock() + return d.metadata, nil +} + // Server is a component used to test Service Weaver listener handling. // An HTTP server is started when this component is initialized. // simple_test.go checks the functionality of the HTTP server by fetching diff --git a/weavertest/internal/simple/simple_test.go b/weavertest/internal/simple/simple_test.go index 53724b61b..5be0f5602 100644 --- a/weavertest/internal/simple/simple_test.go +++ b/weavertest/internal/simple/simple_test.go @@ -27,6 +27,7 @@ import ( "time" "github.com/ServiceWeaver/weaver/internal/traceio" + "github.com/ServiceWeaver/weaver/metadata" "github.com/ServiceWeaver/weaver/weavertest" "github.com/ServiceWeaver/weaver/weavertest/internal/simple" "github.com/google/uuid" @@ -53,11 +54,68 @@ func TestOneComponent(t *testing.T) { } } +func TestContextWithMetadata(t *testing.T) { + updateMetadata := func(ctx context.Context, dst simple.Destination, isMultiDeployer bool) error { + numCallsUpdateMeta := 1 + if isMultiDeployer { + // Note that the multi deployer will create two replicas for the dst + // component. Because the UpdateMetadata and GetMetadata methods are not + // routed, we can end up updating the metadata at replica 0, and reading + // the metadata from replica 1. To avoid this scenario we call the + // UpdateMetadata method 10 times, to make sure each replica has the metadata. + numCallsUpdateMeta = 10 + } + for i := 0; i < numCallsUpdateMeta; i++ { + if err := dst.UpdateMetadata(ctx); err != nil { + return err + } + } + return nil + } + for _, runner := range weavertest.AllRunners() { + runner.Test(t, func(t *testing.T, dst simple.Destination) { + ctx := context.Background() + + // Do not propagate any metadata. Verify that the returned metadata is empty. + if err := updateMetadata(ctx, dst, runner.Name == weavertest.Multi.Name); err != nil { + t.Fatal(err) + } + got, err := dst.GetMetadata(ctx) + if err != nil { + t.Fatal(err) + } + var want map[string]string + if !reflect.DeepEqual(want, got) { + t.Errorf("unexpected metadata : expecting %v, got %v", want, got) + } + + // Propagate valid metadata. Verify that the returned metadata is as expected. + want = map[string]string{ + "foo": "bar", + "baz": "waldo", + } + ctx = metadata.NewContext(ctx, want) + if err := updateMetadata(ctx, dst, runner.Name == weavertest.Multi.Name); err != nil { + t.Fatal(err) + } + got, err = dst.GetMetadata(ctx) + if err != nil { + t.Fatal(err) + } + if !reflect.DeepEqual(want, got) { + t.Errorf("unexpected metadata : expecting %v, got %v", want, got) + } + }) + } +} + type fakeDest struct{ file, msg string } -func (f *fakeDest) Getpid(context.Context) (int, error) { return 100, nil } -func (f *fakeDest) GetAll(context.Context, string) ([]string, error) { return nil, nil } -func (f *fakeDest) RoutedRecord(context.Context, string, string) error { return nil } +func (f *fakeDest) Getpid(context.Context) (int, error) { return 100, nil } +func (f *fakeDest) GetAll(context.Context, string) ([]string, error) { return nil, nil } +func (f *fakeDest) RoutedRecord(context.Context, string, string) error { return nil } +func (f *fakeDest) UpdateMetadata(context.Context) error { return nil } +func (f *fakeDest) GetMetadata(context.Context) (map[string]string, error) { return nil, nil } func (f *fakeDest) Record(ctx context.Context, file, msg string) error { f.file = file f.msg = msg diff --git a/weavertest/internal/simple/weaver_gen.go b/weavertest/internal/simple/weaver_gen.go index b474d1089..58c29d0d9 100644 --- a/weavertest/internal/simple/weaver_gen.go +++ b/weavertest/internal/simple/weaver_gen.go @@ -19,12 +19,12 @@ func init() { Iface: reflect.TypeOf((*Destination)(nil)).Elem(), Impl: reflect.TypeOf(destination{}), Routed: true, - NoRetry: []int{2, 3}, + NoRetry: []int{3, 4}, LocalStubFn: func(impl any, caller string, tracer trace.Tracer) any { - return destination_local_stub{impl: impl.(Destination), tracer: tracer, getAllMetrics: codegen.MethodMetricsFor(codegen.MethodLabels{Caller: caller, Component: "github.com/ServiceWeaver/weaver/weavertest/internal/simple/Destination", Method: "GetAll", Remote: false, Generated: true}), getpidMetrics: codegen.MethodMetricsFor(codegen.MethodLabels{Caller: caller, Component: "github.com/ServiceWeaver/weaver/weavertest/internal/simple/Destination", Method: "Getpid", Remote: false, Generated: true}), recordMetrics: codegen.MethodMetricsFor(codegen.MethodLabels{Caller: caller, Component: "github.com/ServiceWeaver/weaver/weavertest/internal/simple/Destination", Method: "Record", Remote: false, Generated: true}), routedRecordMetrics: codegen.MethodMetricsFor(codegen.MethodLabels{Caller: caller, Component: "github.com/ServiceWeaver/weaver/weavertest/internal/simple/Destination", Method: "RoutedRecord", Remote: false, Generated: true})} + return destination_local_stub{impl: impl.(Destination), tracer: tracer, getAllMetrics: codegen.MethodMetricsFor(codegen.MethodLabels{Caller: caller, Component: "github.com/ServiceWeaver/weaver/weavertest/internal/simple/Destination", Method: "GetAll", Remote: false, Generated: true}), getMetadataMetrics: codegen.MethodMetricsFor(codegen.MethodLabels{Caller: caller, Component: "github.com/ServiceWeaver/weaver/weavertest/internal/simple/Destination", Method: "GetMetadata", Remote: false, Generated: true}), getpidMetrics: codegen.MethodMetricsFor(codegen.MethodLabels{Caller: caller, Component: "github.com/ServiceWeaver/weaver/weavertest/internal/simple/Destination", Method: "Getpid", Remote: false, Generated: true}), recordMetrics: codegen.MethodMetricsFor(codegen.MethodLabels{Caller: caller, Component: "github.com/ServiceWeaver/weaver/weavertest/internal/simple/Destination", Method: "Record", Remote: false, Generated: true}), routedRecordMetrics: codegen.MethodMetricsFor(codegen.MethodLabels{Caller: caller, Component: "github.com/ServiceWeaver/weaver/weavertest/internal/simple/Destination", Method: "RoutedRecord", Remote: false, Generated: true}), updateMetadataMetrics: codegen.MethodMetricsFor(codegen.MethodLabels{Caller: caller, Component: "github.com/ServiceWeaver/weaver/weavertest/internal/simple/Destination", Method: "UpdateMetadata", Remote: false, Generated: true})} }, ClientStubFn: func(stub codegen.Stub, caller string) any { - return destination_client_stub{stub: stub, getAllMetrics: codegen.MethodMetricsFor(codegen.MethodLabels{Caller: caller, Component: "github.com/ServiceWeaver/weaver/weavertest/internal/simple/Destination", Method: "GetAll", Remote: true, Generated: true}), getpidMetrics: codegen.MethodMetricsFor(codegen.MethodLabels{Caller: caller, Component: "github.com/ServiceWeaver/weaver/weavertest/internal/simple/Destination", Method: "Getpid", Remote: true, Generated: true}), recordMetrics: codegen.MethodMetricsFor(codegen.MethodLabels{Caller: caller, Component: "github.com/ServiceWeaver/weaver/weavertest/internal/simple/Destination", Method: "Record", Remote: true, Generated: true}), routedRecordMetrics: codegen.MethodMetricsFor(codegen.MethodLabels{Caller: caller, Component: "github.com/ServiceWeaver/weaver/weavertest/internal/simple/Destination", Method: "RoutedRecord", Remote: true, Generated: true})} + return destination_client_stub{stub: stub, getAllMetrics: codegen.MethodMetricsFor(codegen.MethodLabels{Caller: caller, Component: "github.com/ServiceWeaver/weaver/weavertest/internal/simple/Destination", Method: "GetAll", Remote: true, Generated: true}), getMetadataMetrics: codegen.MethodMetricsFor(codegen.MethodLabels{Caller: caller, Component: "github.com/ServiceWeaver/weaver/weavertest/internal/simple/Destination", Method: "GetMetadata", Remote: true, Generated: true}), getpidMetrics: codegen.MethodMetricsFor(codegen.MethodLabels{Caller: caller, Component: "github.com/ServiceWeaver/weaver/weavertest/internal/simple/Destination", Method: "Getpid", Remote: true, Generated: true}), recordMetrics: codegen.MethodMetricsFor(codegen.MethodLabels{Caller: caller, Component: "github.com/ServiceWeaver/weaver/weavertest/internal/simple/Destination", Method: "Record", Remote: true, Generated: true}), routedRecordMetrics: codegen.MethodMetricsFor(codegen.MethodLabels{Caller: caller, Component: "github.com/ServiceWeaver/weaver/weavertest/internal/simple/Destination", Method: "RoutedRecord", Remote: true, Generated: true}), updateMetadataMetrics: codegen.MethodMetricsFor(codegen.MethodLabels{Caller: caller, Component: "github.com/ServiceWeaver/weaver/weavertest/internal/simple/Destination", Method: "UpdateMetadata", Remote: true, Generated: true})} }, ServerStubFn: func(impl any, addLoad func(uint64, float64)) codegen.Server { return destination_server_stub{impl: impl.(Destination), addLoad: addLoad} @@ -92,24 +92,30 @@ type __destination_destRouter_if_youre_seeing_this_you_probably_forgot_to_run_we type __destination_destRouter_embedding struct{} -func (__destination_destRouter_embedding) GetAll() {} -func (__destination_destRouter_embedding) Getpid() {} -func (__destination_destRouter_embedding) Record() {} +func (__destination_destRouter_embedding) GetAll() {} +func (__destination_destRouter_embedding) GetMetadata() {} +func (__destination_destRouter_embedding) Getpid() {} +func (__destination_destRouter_embedding) Record() {} +func (__destination_destRouter_embedding) UpdateMetadata() {} -var _ func(_ context.Context, file string, msg string) string = (&destRouter{}).RoutedRecord // routed -var _ = (&__destination_destRouter_if_youre_seeing_this_you_probably_forgot_to_run_weaver_generate{}).GetAll // unrouted -var _ = (&__destination_destRouter_if_youre_seeing_this_you_probably_forgot_to_run_weaver_generate{}).Getpid // unrouted -var _ = (&__destination_destRouter_if_youre_seeing_this_you_probably_forgot_to_run_weaver_generate{}).Record // unrouted +var _ func(_ context.Context, file string, msg string) string = (&destRouter{}).RoutedRecord // routed +var _ = (&__destination_destRouter_if_youre_seeing_this_you_probably_forgot_to_run_weaver_generate{}).GetAll // unrouted +var _ = (&__destination_destRouter_if_youre_seeing_this_you_probably_forgot_to_run_weaver_generate{}).GetMetadata // unrouted +var _ = (&__destination_destRouter_if_youre_seeing_this_you_probably_forgot_to_run_weaver_generate{}).Getpid // unrouted +var _ = (&__destination_destRouter_if_youre_seeing_this_you_probably_forgot_to_run_weaver_generate{}).Record // unrouted +var _ = (&__destination_destRouter_if_youre_seeing_this_you_probably_forgot_to_run_weaver_generate{}).UpdateMetadata // unrouted // Local stub implementations. type destination_local_stub struct { - impl Destination - tracer trace.Tracer - getAllMetrics *codegen.MethodMetrics - getpidMetrics *codegen.MethodMetrics - recordMetrics *codegen.MethodMetrics - routedRecordMetrics *codegen.MethodMetrics + impl Destination + tracer trace.Tracer + getAllMetrics *codegen.MethodMetrics + getMetadataMetrics *codegen.MethodMetrics + getpidMetrics *codegen.MethodMetrics + recordMetrics *codegen.MethodMetrics + routedRecordMetrics *codegen.MethodMetrics + updateMetadataMetrics *codegen.MethodMetrics } // Check that destination_local_stub implements the Destination interface. @@ -135,6 +141,26 @@ func (s destination_local_stub) GetAll(ctx context.Context, a0 string) (r0 []str return s.impl.GetAll(ctx, a0) } +func (s destination_local_stub) GetMetadata(ctx context.Context) (r0 map[string]string, err error) { + // Update metrics. + begin := s.getMetadataMetrics.Begin() + defer func() { s.getMetadataMetrics.End(begin, err != nil, 0, 0) }() + span := trace.SpanFromContext(ctx) + if span.SpanContext().IsValid() { + // Create a child span for this method. + ctx, span = s.tracer.Start(ctx, "simple.Destination.GetMetadata", trace.WithSpanKind(trace.SpanKindInternal)) + defer func() { + if err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) + } + span.End() + }() + } + + return s.impl.GetMetadata(ctx) +} + func (s destination_local_stub) Getpid(ctx context.Context) (r0 int, err error) { // Update metrics. begin := s.getpidMetrics.Begin() @@ -195,6 +221,26 @@ func (s destination_local_stub) RoutedRecord(ctx context.Context, a0 string, a1 return s.impl.RoutedRecord(ctx, a0, a1) } +func (s destination_local_stub) UpdateMetadata(ctx context.Context) (err error) { + // Update metrics. + begin := s.updateMetadataMetrics.Begin() + defer func() { s.updateMetadataMetrics.End(begin, err != nil, 0, 0) }() + span := trace.SpanFromContext(ctx) + if span.SpanContext().IsValid() { + // Create a child span for this method. + ctx, span = s.tracer.Start(ctx, "simple.Destination.UpdateMetadata", trace.WithSpanKind(trace.SpanKindInternal)) + defer func() { + if err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) + } + span.End() + }() + } + + return s.impl.UpdateMetadata(ctx) +} + type server_local_stub struct { impl Server tracer trace.Tracer @@ -298,11 +344,13 @@ func (s source_local_stub) Emit(ctx context.Context, a0 string, a1 string) (err // Client stub implementations. type destination_client_stub struct { - stub codegen.Stub - getAllMetrics *codegen.MethodMetrics - getpidMetrics *codegen.MethodMetrics - recordMetrics *codegen.MethodMetrics - routedRecordMetrics *codegen.MethodMetrics + stub codegen.Stub + getAllMetrics *codegen.MethodMetrics + getMetadataMetrics *codegen.MethodMetrics + getpidMetrics *codegen.MethodMetrics + recordMetrics *codegen.MethodMetrics + routedRecordMetrics *codegen.MethodMetrics + updateMetadataMetrics *codegen.MethodMetrics } // Check that destination_client_stub implements the Destination interface. @@ -364,6 +412,53 @@ func (s destination_client_stub) GetAll(ctx context.Context, a0 string) (r0 []st return } +func (s destination_client_stub) GetMetadata(ctx context.Context) (r0 map[string]string, err error) { + // Update metrics. + var requestBytes, replyBytes int + begin := s.getMetadataMetrics.Begin() + defer func() { s.getMetadataMetrics.End(begin, err != nil, requestBytes, replyBytes) }() + + span := trace.SpanFromContext(ctx) + if span.SpanContext().IsValid() { + // Create a child span for this method. + ctx, span = s.stub.Tracer().Start(ctx, "simple.Destination.GetMetadata", trace.WithSpanKind(trace.SpanKindClient)) + } + + defer func() { + // Catch and return any panics detected during encoding/decoding/rpc. + if err == nil { + err = codegen.CatchPanics(recover()) + if err != nil { + err = errors.Join(weaver.RemoteCallError, err) + } + } + + if err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) + } + span.End() + + }() + + var shardKey uint64 + + // Call the remote method. + var results []byte + results, err = s.stub.Run(ctx, 1, nil, shardKey) + replyBytes = len(results) + if err != nil { + err = errors.Join(weaver.RemoteCallError, err) + return + } + + // Decode the results. + dec := codegen.NewDecoder(results) + r0 = serviceweaver_dec_map_string_string_219dd46d(dec) + err = dec.Error() + return +} + func (s destination_client_stub) Getpid(ctx context.Context) (r0 int, err error) { // Update metrics. var requestBytes, replyBytes int @@ -397,7 +492,7 @@ func (s destination_client_stub) Getpid(ctx context.Context) (r0 int, err error) // Call the remote method. var results []byte - results, err = s.stub.Run(ctx, 1, nil, shardKey) + results, err = s.stub.Run(ctx, 2, nil, shardKey) replyBytes = len(results) if err != nil { err = errors.Join(weaver.RemoteCallError, err) @@ -455,7 +550,7 @@ func (s destination_client_stub) Record(ctx context.Context, a0 string, a1 strin // Call the remote method. requestBytes = len(enc.Data()) var results []byte - results, err = s.stub.Run(ctx, 2, enc.Data(), shardKey) + results, err = s.stub.Run(ctx, 3, enc.Data(), shardKey) replyBytes = len(results) if err != nil { err = errors.Join(weaver.RemoteCallError, err) @@ -515,7 +610,53 @@ func (s destination_client_stub) RoutedRecord(ctx context.Context, a0 string, a1 // Call the remote method. requestBytes = len(enc.Data()) var results []byte - results, err = s.stub.Run(ctx, 3, enc.Data(), shardKey) + results, err = s.stub.Run(ctx, 4, enc.Data(), shardKey) + replyBytes = len(results) + if err != nil { + err = errors.Join(weaver.RemoteCallError, err) + return + } + + // Decode the results. + dec := codegen.NewDecoder(results) + err = dec.Error() + return +} + +func (s destination_client_stub) UpdateMetadata(ctx context.Context) (err error) { + // Update metrics. + var requestBytes, replyBytes int + begin := s.updateMetadataMetrics.Begin() + defer func() { s.updateMetadataMetrics.End(begin, err != nil, requestBytes, replyBytes) }() + + span := trace.SpanFromContext(ctx) + if span.SpanContext().IsValid() { + // Create a child span for this method. + ctx, span = s.stub.Tracer().Start(ctx, "simple.Destination.UpdateMetadata", trace.WithSpanKind(trace.SpanKindClient)) + } + + defer func() { + // Catch and return any panics detected during encoding/decoding/rpc. + if err == nil { + err = codegen.CatchPanics(recover()) + if err != nil { + err = errors.Join(weaver.RemoteCallError, err) + } + } + + if err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) + } + span.End() + + }() + + var shardKey uint64 + + // Call the remote method. + var results []byte + results, err = s.stub.Run(ctx, 5, nil, shardKey) replyBytes = len(results) if err != nil { err = errors.Join(weaver.RemoteCallError, err) @@ -781,12 +922,16 @@ func (s destination_server_stub) GetStubFn(method string) func(ctx context.Conte switch method { case "GetAll": return s.getAll + case "GetMetadata": + return s.getMetadata case "Getpid": return s.getpid case "Record": return s.record case "RoutedRecord": return s.routedRecord + case "UpdateMetadata": + return s.updateMetadata default: return nil } @@ -817,6 +962,26 @@ func (s destination_server_stub) getAll(ctx context.Context, args []byte) (res [ return enc.Data(), nil } +func (s destination_server_stub) getMetadata(ctx context.Context, args []byte) (res []byte, err error) { + // Catch and return any panics detected during encoding/decoding/rpc. + defer func() { + if err == nil { + err = codegen.CatchPanics(recover()) + } + }() + + // TODO(rgrandl): The deferred function above will recover from panics in the + // user code: fix this. + // Call the local method. + r0, appErr := s.impl.GetMetadata(ctx) + + // Encode the results. + enc := codegen.NewEncoder() + serviceweaver_enc_map_string_string_219dd46d(enc, r0) + enc.Error(appErr) + return enc.Data(), nil +} + func (s destination_server_stub) getpid(ctx context.Context, args []byte) (res []byte, err error) { // Catch and return any panics detected during encoding/decoding/rpc. defer func() { @@ -891,6 +1056,25 @@ func (s destination_server_stub) routedRecord(ctx context.Context, args []byte) return enc.Data(), nil } +func (s destination_server_stub) updateMetadata(ctx context.Context, args []byte) (res []byte, err error) { + // Catch and return any panics detected during encoding/decoding/rpc. + defer func() { + if err == nil { + err = codegen.CatchPanics(recover()) + } + }() + + // TODO(rgrandl): The deferred function above will recover from panics in the + // user code: fix this. + // Call the local method. + appErr := s.impl.UpdateMetadata(ctx) + + // Encode the results. + enc := codegen.NewEncoder() + enc.Error(appErr) + return enc.Data(), nil +} + type server_server_stub struct { impl Server addLoad func(key uint64, load float64) @@ -1030,6 +1214,11 @@ func (s destination_reflect_stub) GetAll(ctx context.Context, a0 string) (r0 []s return } +func (s destination_reflect_stub) GetMetadata(ctx context.Context) (r0 map[string]string, err error) { + err = s.caller("GetMetadata", ctx, []any{}, []any{&r0}) + return +} + func (s destination_reflect_stub) Getpid(ctx context.Context) (r0 int, err error) { err = s.caller("Getpid", ctx, []any{}, []any{&r0}) return @@ -1045,6 +1234,11 @@ func (s destination_reflect_stub) RoutedRecord(ctx context.Context, a0 string, a return } +func (s destination_reflect_stub) UpdateMetadata(ctx context.Context) (err error) { + err = s.caller("UpdateMetadata", ctx, []any{}, []any{}) + return +} + type server_reflect_stub struct { caller func(string, context.Context, []any, []any) error } @@ -1119,3 +1313,31 @@ func serviceweaver_dec_slice_string_4af10117(dec *codegen.Decoder) []string { } return res } + +func serviceweaver_enc_map_string_string_219dd46d(enc *codegen.Encoder, arg map[string]string) { + if arg == nil { + enc.Len(-1) + return + } + enc.Len(len(arg)) + for k, v := range arg { + enc.String(k) + enc.String(v) + } +} + +func serviceweaver_dec_map_string_string_219dd46d(dec *codegen.Decoder) map[string]string { + n := dec.Len() + if n == -1 { + return nil + } + res := make(map[string]string, n) + var k string + var v string + for i := 0; i < n; i++ { + k = dec.String() + v = dec.String() + res[k] = v + } + return res +}