diff --git a/grpcgcp/gcp_multiendpoint.go b/grpcgcp/gcp_multiendpoint.go new file mode 100644 index 0000000..457fa1d --- /dev/null +++ b/grpcgcp/gcp_multiendpoint.go @@ -0,0 +1,358 @@ +/* + * + * Copyright 2023 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package grpcgcp + +import ( + "context" + "fmt" + "sync" + + "github.com/GoogleCloudPlatform/grpc-gcp-go/grpcgcp/multiendpoint" + "google.golang.org/grpc" + "google.golang.org/grpc/connectivity" + "google.golang.org/grpc/grpclog" + "google.golang.org/protobuf/encoding/protojson" + + pb "github.com/GoogleCloudPlatform/grpc-gcp-go/grpcgcp/grpc_gcp" +) + +var ( + // To be redefined in tests. + grpcDial = func(target string, opts ...grpc.DialOption) (*grpc.ClientConn, error) { + return grpc.Dial(target, opts...) + } +) + +var log = grpclog.Component("GCPMultiEndpoint") + +type contextMEKey int + +var meKey contextMEKey + +// NewMEContext returns a new Context that carries the MultiEndpoint name. +func NewMEContext(ctx context.Context, name string) context.Context { + return context.WithValue(ctx, meKey, name) +} + +// FromMEContext returns the MultiEndpoint name stored in ctx, if any. +func FromMEContext(ctx context.Context) (string, bool) { + name, ok := ctx.Value(meKey).(string) + return name, ok +} + +// GCPMultiEndpoint holds the state of a MultiEndpoints-enabled gRPC client connection. +// +// The purposes of GCPMultiEndpoint are: +// +// - Fall back to an alternative endpoint (host:port) of a gRPC service when the original +// endpoint is completely unavailable. +// - Route an RPC call to a specific group of endpoints. +// - Reconfigure endpoints at runtime. +// +// A group of endpoints is called a [multiendpoint.MultiEndpoint] and is essentially a list of endpoints +// where priority is defined by the position in the list, with the first endpoint having top +// priority. A MultiEndpoint tracks endpoints' availability. When a MultiEndpoint is picked for an +// RPC call, it picks the top-priority endpoint that is currently available. See +// [multiendpoint.MultiEndpoint] for more information. +// +// GCPMultiEndpoint can have one or more MultiEndpoints, each identified by its name -- an arbitrary +// string provided in the [GCPMultiEndpointOptions] when configuring MultiEndpoints. This name +// can be used to route an RPC call to this MultiEndpoint by using [NewMEContext]. +// +// GCPMultiEndpoint uses [GCPMultiEndpointOptions] for initial configuration. +// An updated configuration can be provided at any time later using [UpdateMultiEndpoints].
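+//
+// For instance, endpoints can be reordered later without recreating the connection
+// (a sketch only; the addresses are illustrative, and UpdateMultiEndpoints reads only
+// the MultiEndpoints and Default fields of the options):
+//
+//	err := gme.UpdateMultiEndpoints(&GCPMultiEndpointOptions{
+//		MultiEndpoints: map[string]*multiendpoint.MultiEndpointOptions{
+//			"default": {Endpoints: []string{
+//				"service-fallback.example.com:443",
+//				"service.example.com:443",
+//			}},
+//		},
+//		Default: "default",
+//	})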
+// +// Example: +// +// Let's assume we have a service with read and write operations and the following backends: +// +// - service.example.com -- the main set of backends supporting all operations +// - service-fallback.example.com -- read-write replica supporting all operations +// - ro-service.example.com -- read-only replica supporting only read operations +// +// Example configuration: +// +// - MultiEndpoint named "default" with endpoints: +// +// 1. service.example.com:443 +// +// 2. service-fallback.example.com:443 +// +// - MultiEndpoint named "read" with endpoints: +// +// 1. ro-service.example.com:443 +// +// 2. service-fallback.example.com:443 +// +// 3. service.example.com:443 +// +// With the configuration above, GCPMultiEndpoint will use the "default" MultiEndpoint by +// default. This means that by default RPC calls will use the main endpoint and, if it is not +// available, the read-write replica. +// +// To offload some read calls to the read-only replica, we can specify the "read" MultiEndpoint in +// the context. These calls will then use the read-only replica endpoint; if it is not available, +// the read-write replica; and if that is also not available, the main endpoint. +// +// GCPMultiEndpoint creates a [grpcgcp] connection pool for every unique +// endpoint. For the example above, three connection pools will be created. +// +// [GCPMultiEndpoint] implements [grpc.ClientConnInterface] and can be used +// as a [grpc.ClientConn] when creating gRPC clients. +type GCPMultiEndpoint struct { + sync.Mutex + + defaultName string + mes map[string]multiendpoint.MultiEndpoint + pools map[string]*monitoredConn + opts []grpc.DialOption + gcpConfig *pb.ApiConfig +} + +// Make sure GCPMultiEndpoint implements grpc.ClientConnInterface. +var _ grpc.ClientConnInterface = &GCPMultiEndpoint{} + +func (gme *GCPMultiEndpoint) Invoke(ctx context.Context, method string, args interface{}, reply interface{}, opts ...grpc.CallOption) error { + return gme.pickConn(ctx).Invoke(ctx, method, args, reply, opts...) +} + +func (gme *GCPMultiEndpoint) NewStream(ctx context.Context, desc *grpc.StreamDesc, method string, opts ...grpc.CallOption) (grpc.ClientStream, error) { + return gme.pickConn(ctx).NewStream(ctx, desc, method, opts...) +} + +// pickConn returns the connection pool of the current endpoint of the MultiEndpoint named in +// ctx, falling back to the default MultiEndpoint. The lock guards the maps against concurrent +// UpdateMultiEndpoints. +func (gme *GCPMultiEndpoint) pickConn(ctx context.Context) *grpc.ClientConn { + gme.Lock() + defer gme.Unlock() + name, ok := FromMEContext(ctx) + me, found := gme.mes[name] + if !ok || !found { + me = gme.mes[gme.defaultName] + } + return gme.pools[me.Current()].conn +} + +func (gme *GCPMultiEndpoint) Close() error { + gme.Lock() + defer gme.Unlock() + var errs multiError + for _, mc := range gme.pools { + mc.stopMonitoring() + if err := mc.conn.Close(); err != nil { + errs = append(errs, err) + } + } + return errs.Combine() +} + +// GCPMultiEndpointOptions holds options to construct a MultiEndpoints-enabled gRPC client +// connection. +type GCPMultiEndpointOptions struct { + // Regular gRPC-GCP configuration to be applied to every endpoint. + GRPCgcpConfig *pb.ApiConfig + // Map of MultiEndpoints where key is the MultiEndpoint name. + MultiEndpoints map[string]*multiendpoint.MultiEndpointOptions + // Name of the default MultiEndpoint. + Default string +} + +// NewGcpMultiEndpoint creates a new [GCPMultiEndpoint] -- a MultiEndpoints-enabled gRPC client +// connection. +// +// [GCPMultiEndpoint] implements [grpc.ClientConnInterface] and can be used +// as a [grpc.ClientConn] when creating gRPC clients.
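+//
+// A minimal construction sketch (the addresses, pool sizes, and credentials are
+// placeholders, not prescriptions):
+//
+//	gme, err := grpcgcp.NewGcpMultiEndpoint(
+//		&grpcgcp.GCPMultiEndpointOptions{
+//			GRPCgcpConfig: &pb.ApiConfig{
+//				ChannelPool: &pb.ChannelPoolConfig{MinSize: 3, MaxSize: 3},
+//			},
+//			MultiEndpoints: map[string]*multiendpoint.MultiEndpointOptions{
+//				"default": {Endpoints: []string{"service.example.com:443"}},
+//			},
+//			Default: "default",
+//		},
+//		grpc.WithTransportCredentials(creds),
+//	)
+//
+// RPCs can then be routed through a specific MultiEndpoint by name (any generated
+// gRPC client works here):
+//
+//	ctx := grpcgcp.NewMEContext(context.Background(), "default")
+//	_, err = client.SayHello(ctx, req)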
+func NewGcpMultiEndpoint(meOpts *GCPMultiEndpointOptions, opts ...grpc.DialOption) (*GCPMultiEndpoint, error) { + // Read the config, create MultiEndpoints and pools. + o, err := makeOpts(meOpts, opts) + if err != nil { + return nil, err + } + gme := &GCPMultiEndpoint{ + mes: make(map[string]multiendpoint.MultiEndpoint), + pools: make(map[string]*monitoredConn), + defaultName: meOpts.Default, + opts: o, + gcpConfig: meOpts.GRPCgcpConfig, + } + if err := gme.UpdateMultiEndpoints(meOpts); err != nil { + return nil, err + } + return gme, nil +} + +func makeOpts(meOpts *GCPMultiEndpointOptions, opts []grpc.DialOption) ([]grpc.DialOption, error) { + grpcGCPjsonConfig, err := protojson.Marshal(meOpts.GRPCgcpConfig) + if err != nil { + return nil, err + } + o := append([]grpc.DialOption{}, opts...) + o = append(o, []grpc.DialOption{ + grpc.WithDisableServiceConfig(), + grpc.WithDefaultServiceConfig(fmt.Sprintf(`{"loadBalancingConfig": [{"%s":%s}]}`, Name, string(grpcGCPjsonConfig))), + grpc.WithChainUnaryInterceptor(GCPUnaryClientInterceptor), + grpc.WithChainStreamInterceptor(GCPStreamClientInterceptor), + }...) + + return o, nil +} + +type monitoredConn struct { + endpoint string + conn *grpc.ClientConn + gme *GCPMultiEndpoint + cancel context.CancelFunc +} + +// monitor relays this connection's state changes to all MultiEndpoints until ctx is canceled. +func (sm *monitoredConn) monitor(ctx context.Context) { + currentState := sm.conn.GetState() + for sm.conn.WaitForStateChange(ctx, currentState) { + currentState = sm.conn.GetState() + // Inform all MultiEndpoints. The lock guards the mes map against concurrent updates. + sm.gme.Lock() + for _, me := range sm.gme.mes { + me.SetEndpointAvailability(sm.endpoint, currentState == connectivity.Ready) + } + sm.gme.Unlock() + } +} + +func (sm *monitoredConn) stopMonitoring() { + sm.cancel() +} + +// UpdateMultiEndpoints reconfigures MultiEndpoints. +// +// MultiEndpoints are matched with the current ones by name. +// +// - If a current MultiEndpoint is missing in the updated list, the MultiEndpoint will be +// removed. +// - A new MultiEndpoint will be created for every new name in the list. +// - For an existing MultiEndpoint only its endpoints will be updated (no recovery timeout +// change). +// +// Endpoints are matched by the endpoint address (usually in the form of address:port). +// +// - If an existing endpoint is not used by any MultiEndpoint in the updated list, then the +// connection pool for this endpoint will be shut down. +// - A connection pool will be created for every new endpoint. +// - For an existing endpoint nothing will change (the connection pool will not be re-created, +// so neither connection credentials nor connection configuration will change). +func (gme *GCPMultiEndpoint) UpdateMultiEndpoints(meOpts *GCPMultiEndpointOptions) error { + gme.Lock() + defer gme.Unlock() + if _, ok := meOpts.MultiEndpoints[meOpts.Default]; !ok { + return fmt.Errorf("default MultiEndpoint %q missing options", meOpts.Default) + } + + validPools := make(map[string]bool) + for _, meo := range meOpts.MultiEndpoints { + for _, e := range meo.Endpoints { + validPools[e] = true + } + } + + // Add missing pools. + for e := range validPools { + if _, ok := gme.pools[e]; !ok { + // This creates a ClientConn with the gRPC-GCP balancer managing the connection pool. + conn, err := grpcDial(e, gme.opts...) + if err != nil { + return err + } + mc := &monitoredConn{ + endpoint: e, + conn: conn, + gme: gme, + } + // Set up the cancelable context before starting the goroutine so that + // stopMonitoring is safe to call even if the monitor has not run yet. + var ctx context.Context + ctx, mc.cancel = context.WithCancel(context.Background()) + gme.pools[e] = mc + go mc.monitor(ctx) + } + } + + // Add new MultiEndpoints and update existing ones.
+ for name, meo := range meOpts.MultiEndpoints { + if me, ok := gme.mes[name]; ok { + // Update the existing MultiEndpoint. + me.SetEndpoints(meo.Endpoints) + continue + } + + // Add a new MultiEndpoint. + me, err := multiendpoint.NewMultiEndpoint(meo) + if err != nil { + return err + } + gme.mes[name] = me + } + gme.defaultName = meOpts.Default + + // Remove obsolete MultiEndpoints. + for name := range gme.mes { + if _, ok := meOpts.MultiEndpoints[name]; !ok { + delete(gme.mes, name) + } + } + + // Remove obsolete pools. + for e, mc := range gme.pools { + if _, ok := validPools[e]; !ok { + mc.stopMonitoring() + if err := mc.conn.Close(); err != nil { + log.Errorf("error while closing the connection pool for %q: %v", e, err) + } + delete(gme.pools, e) + } + } + + // Trigger status update. + for e, mc := range gme.pools { + s := mc.conn.GetState() + for _, me := range gme.mes { + me.SetEndpointAvailability(e, s == connectivity.Ready) + } + } + return nil +} + +type multiError []error + +func (m multiError) Error() string { + s, n := "", 0 + for _, e := range m { + if e != nil { + if n == 0 { + s = e.Error() + } + n++ + } + } + switch n { + case 0: + return "(0 errors)" + case 1: + return s + case 2: + return s + " (and 1 other error)" + } + return fmt.Sprintf("%s (and %d other errors)", s, n-1) +} + +func (m multiError) Combine() error { + if len(m) == 0 { + return nil + } + + return m +} diff --git a/grpcgcp/multiendpoint/endpoint.go b/grpcgcp/multiendpoint/endpoint.go new file mode 100644 index 0000000..2a9c52e --- /dev/null +++ b/grpcgcp/multiendpoint/endpoint.go @@ -0,0 +1,54 @@ +/* + * + * Copyright 2023 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package multiendpoint + +import ( + "fmt" + "time" +) + +type status int + +// Status of an endpoint. +const ( + unavailable status = iota + available + recovering +) + +func (s status) String() string { + switch s { + case unavailable: + return "Unavailable" + case available: + return "Available" + case recovering: + return "Recovering" + default: + return fmt.Sprintf("%d", s) + } +} + +type endpoint struct { + id string + priority int + status status + lastChange time.Time + futureChange timerAlike +} diff --git a/grpcgcp/multiendpoint/multiendpoint.go b/grpcgcp/multiendpoint/multiendpoint.go new file mode 100644 index 0000000..757ac9b --- /dev/null +++ b/grpcgcp/multiendpoint/multiendpoint.go @@ -0,0 +1,304 @@ +/* + * + * Copyright 2023 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ * + */ + +// Package multiendpoint implements the MultiEndpoint feature. See [MultiEndpoint]. +package multiendpoint + +import ( + "errors" + "fmt" + "sync" + "time" +) + +type timerAlike interface { + Reset(time.Duration) bool + Stop() bool +} + +// To be redefined in tests. +var ( + timeNow = func() time.Time { + return time.Now() + } + timeAfterFunc = func(d time.Duration, f func()) timerAlike { + return time.AfterFunc(d, f) + } +) + +// MultiEndpoint holds a list of endpoints, tracks their availability and defines the current +// endpoint. An endpoint has a priority defined by its position in the list (the first item has +// top priority). +// +// The current endpoint is the highest-priority available endpoint in the list. If no endpoint is +// available, MultiEndpoint sticks to the previously current endpoint. +// +// Sometimes switching between endpoints can be costly, and it is worth waiting for some time +// after the current endpoint becomes unavailable. For this case, use +// [MultiEndpointOptions.RecoveryTimeout] to set the recovery timeout. MultiEndpoint will keep the +// current endpoint for up to the recovery timeout after it becomes unavailable, to give it some +// time to recover. +// +// The list of endpoints can be changed at any time with the [MultiEndpoint.SetEndpoints] function. +// MultiEndpoint will: +// - remove obsolete endpoints; +// - preserve remaining endpoints and their states; +// - add new endpoints; +// - update all endpoints' priorities according to the new order; +// - change the current endpoint if necessary. +// +// After updating the list of endpoints, MultiEndpoint will switch the current endpoint to the +// highest-priority available endpoint in the list. If you have many processes using MultiEndpoint, +// this may lead to an immediate shift of all traffic, which may be undesired. To smooth this +// transfer, use [MultiEndpointOptions.SwitchingDelay] with a randomized value to introduce jitter. +// Each MultiEndpoint will delay switching from an available endpoint to another endpoint for this +// amount of time. This delay is only applicable when switching from a lower-priority available +// endpoint to a higher-priority available endpoint. +type MultiEndpoint interface { + // Current returns the current endpoint. + // + // Note that the read is not synchronized and in case of a race condition there is a chance of + // getting an outdated current endpoint. + Current() string + + // SetEndpointAvailability informs MultiEndpoint when an endpoint becomes available or unavailable. + // This may change the current endpoint. + SetEndpointAvailability(e string, avail bool) + + // SetEndpoints updates the list of endpoints: + // - remove obsolete endpoints + // - preserve remaining endpoints and their states + // - add new endpoints + // - update all endpoints' priorities according to the new order + // This may change the current endpoint. + SetEndpoints(endpoints []string) error +} + +// MultiEndpointOptions is used for configuring [MultiEndpoint]. +type MultiEndpointOptions struct { + // A list of endpoints ordered by priority (the first endpoint has top priority). + Endpoints []string + // RecoveryTimeout sets the amount of time MultiEndpoint keeps an endpoint as current after it + // becomes unavailable. + RecoveryTimeout time.Duration + // When switching from a lower-priority available endpoint to a higher-priority available + // endpoint, the MultiEndpoint will delay the switch for this duration. + SwitchingDelay time.Duration +} + +// NewMultiEndpoint validates options and creates a new [MultiEndpoint].
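+//
+// A minimal sketch (the endpoint names and durations are illustrative):
+//
+//	me, err := multiendpoint.NewMultiEndpoint(&multiendpoint.MultiEndpointOptions{
+//		Endpoints:       []string{"primary:443", "fallback:443"},
+//		RecoveryTimeout: time.Second,
+//		SwitchingDelay:  100 * time.Millisecond,
+//	})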
+func NewMultiEndpoint(b *MultiEndpointOptions) (MultiEndpoint, error) { + if len(b.Endpoints) == 0 { + return nil, errors.New("endpoints list cannot be empty") + } + + me := &multiEndpoint{ + recoveryTimeout: b.RecoveryTimeout, + switchingDelay: b.SwitchingDelay, + current: b.Endpoints[0], + } + eMap := make(map[string]*endpoint) + for i, e := range b.Endpoints { + eMap[e] = me.newEndpoint(e, i) + } + me.endpoints = eMap + return me, nil +} + +type multiEndpoint struct { + sync.Mutex + + endpoints map[string]*endpoint + recoveryTimeout time.Duration + switchingDelay time.Duration + current string + future string +} + +// Current returns the current endpoint. +func (me *multiEndpoint) Current() string { + return me.current +} + +// SetEndpoints updates the endpoints list: +// - remove obsolete endpoints; +// - preserve remaining endpoints and their states; +// - add new endpoints; +// - update all endpoints' priorities according to the new order; +// - change the current endpoint if necessary. +func (me *multiEndpoint) SetEndpoints(endpoints []string) error { + me.Lock() + defer me.Unlock() + if len(endpoints) == 0 { + return errors.New("endpoints list cannot be empty") + } + newEndpoints := make(map[string]struct{}) + for _, v := range endpoints { + newEndpoints[v] = struct{}{} + } + // Remove obsolete endpoints. + for e := range me.endpoints { + if _, ok := newEndpoints[e]; !ok { + delete(me.endpoints, e) + } + } + // Add new endpoints and update priorities. + for i, e := range endpoints { + if _, ok := me.endpoints[e]; !ok { + me.endpoints[e] = me.newEndpoint(e, i) + } else { + me.endpoints[e].priority = i + } + } + + me.maybeUpdateCurrent() + return nil +} + +// Updates current to the top-priority available endpoint unless the current endpoint is +// recovering. +// +// Must be run under me.Lock. +func (me *multiEndpoint) maybeUpdateCurrent() { + c, exists := me.endpoints[me.current] + var topA *endpoint + var top *endpoint + for _, e := range me.endpoints { + if e.status == available && (topA == nil || topA.priority > e.priority) { + topA = e + } + if top == nil || top.priority > e.priority { + top = e + } + } + + if exists && c.status == recovering && (topA == nil || topA.priority > c.priority) { + // Let the current endpoint recover while no higher-priority endpoint is available. + return + } + + // Always prefer the top available endpoint. + if topA != nil { + me.switchFromTo(c, topA) + return + } + + // If no current endpoint exists, resort to the top-priority endpoint immediately. + if !exists { + me.current = top.id + } +} + +func (me *multiEndpoint) newEndpoint(id string, priority int) *endpoint { + s := unavailable + if me.recoveryTimeout > 0 { + s = recovering + } + e := &endpoint{ + id: id, + priority: priority, + status: s, + } + if e.status == recovering { + me.scheduleUnavailable(e) + } + return e +} + +// Changes or schedules a change of current to the endpoint t. +// +// Must be run under me.Lock. +func (me *multiEndpoint) switchFromTo(f, t *endpoint) { + if me.current == t.id { + return + } + + if me.switchingDelay == 0 || f == nil || f.status == unavailable { + // Switch immediately if there is no delay, no current endpoint, or the current + // endpoint is unavailable. + me.current = t.id + return + } + + me.future = t.id + timeAfterFunc(me.switchingDelay, func() { + me.Lock() + defer me.Unlock() + if e, ok := me.endpoints[me.future]; ok && e.status == available { + me.current = e.id + } + }) +} + +// SetEndpointAvailability updates the state of an endpoint.
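+//
+// A typical caller feeds gRPC connectivity changes into it, e.g. (illustrative;
+// "primary:443" and the state variable are assumptions, not part of this API):
+//
+//	me.SetEndpointAvailability("primary:443", state == connectivity.Ready)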
+func (me *multiEndpoint) SetEndpointAvailability(e string, avail bool) { + me.Lock() + defer me.Unlock() + me.setEndpointAvailability(e, avail) + me.maybeUpdateCurrent() +} + +// Must be run under me.Lock. +func (me *multiEndpoint) setEndpointAvailability(e string, avail bool) { + ee, ok := me.endpoints[e] + if !ok { + return + } + + if avail { + setState(ee, available) + return + } + + if ee.status != available { + return + } + + if me.recoveryTimeout == 0 { + setState(ee, unavailable) + return + } + + setState(ee, recovering) + me.scheduleUnavailable(ee) +} + +// Change the state of endpoint e to state s. +// +// Must be run under me.Lock. +func setState(e *endpoint, s status) { + if e.futureChange != nil { + e.futureChange.Stop() + } + e.status = s + e.lastChange = timeNow() +} + +// Schedule endpoint e to become unavailable after recoveryTimeout. +func (me *multiEndpoint) scheduleUnavailable(e *endpoint) { + stateChange := e.lastChange + e.futureChange = timeAfterFunc(me.recoveryTimeout, func() { + me.Lock() + defer me.Unlock() + if e.lastChange != stateChange { + // This timer is outdated. + return + } + setState(e, unavailable) + me.maybeUpdateCurrent() + }) +} diff --git a/grpcgcp/multiendpoint/multiendpoint_test.go b/grpcgcp/multiendpoint/multiendpoint_test.go new file mode 100644 index 0000000..b29daab --- /dev/null +++ b/grpcgcp/multiendpoint/multiendpoint_test.go @@ -0,0 +1,723 @@ +/* + * + * Copyright 2023 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * + */ + +package multiendpoint + +import ( + "math/rand" + "sort" + "sync" + "sync/atomic" + "testing" + "time" +) + +const ( + recoveryTO = time.Millisecond * 20 + switchDelay = time.Millisecond * 40 +) + +var ( + threeEndpoints = []string{"first", "second", "third"} + fourEndpoints = []string{"fourth", "first", "third", "second"} + now = time.Now() + fInd uint32 + pendingFns = make(map[time.Time][]uint32) + fns = make(map[uint32]func()) +) + +type FakeTimer struct { + fnId uint32 +} + +func (ft *FakeTimer) Stop() bool { + delete(fns, ft.fnId) + return true +} + +func (ft *FakeTimer) Reset(d time.Duration) bool { + return true +} + +func init() { + timeNow = func() time.Time { + return now + } + timeAfterFunc = func(d time.Duration, f func()) timerAlike { + t := now.Add(d) + id := atomic.AddUint32(&fInd, 1) + // Appending to a nil slice allocates it, so no existence check is needed. + pendingFns[t] = append(pendingFns[t], id) + fns[id] = f + return &FakeTimer{fnId: id} + } +} + +func advanceTime(t *testing.T, d time.Duration) { + t.Helper() + now = now.Add(d) + + times := make([]time.Time, 0, len(pendingFns)) + for t2 := range pendingFns { + times = append(times, t2) + } + + sort.Slice(times, func(i, j int) bool { return times[i].Before(times[j]) }) + + for _, t2 := range times { + if t2.After(now) { + break + } + for _, fid := range pendingFns[t2] { + if fn, ok := fns[fid]; ok { + fn() + delete(fns, fid) + } + } + delete(pendingFns, t2) + } +} + +func advanceTimeConcurring(t *testing.T, d time.Duration, cfns []func()) { + t.Helper() + now = now.Add(d) + + times := make([]time.Time, 0, len(pendingFns)) + for t2 := range pendingFns { + times = append(times, t2) + } + + sort.Slice(times, func(i, j int) bool { return times[i].Before(times[j]) }) + + for _, t2 := range times { + if t2.After(now) { + break + } + for _, fid := range pendingFns[t2] { + if fn, ok := fns[fid]; ok { + cfn := func() {} + if len(cfns) > 0 { + cfn = cfns[0] + cfns = cfns[1:] + } + wg := &sync.WaitGroup{} + wg.Add(2) + r := rand.Int() + go func() { + if r%2 == 0 { + cfn() + } else { + fn() + } + wg.Done() + }() + go func() { + if r%2 == 0 { + fn() + } else { + cfn() + } + wg.Done() + }() + wg.Wait() + delete(fns, fid) + } + } + delete(pendingFns, t2) + } +} + +func initPlain(t *testing.T, es []string) MultiEndpoint { + t.Helper() + me, err := NewMultiEndpoint(&MultiEndpointOptions{ + Endpoints: es, + }) + if err != nil { + t.Fatalf("NewMultiEndpoint() returned unexpected error: %v", err) + } + return me +} + +func initWithDelays(t *testing.T, es []string, r time.Duration, d time.Duration) MultiEndpoint { + t.Helper() + me, err := NewMultiEndpoint(&MultiEndpointOptions{ + Endpoints: es, + RecoveryTimeout: r, + SwitchingDelay: d, + }) + if err != nil { + t.Fatalf("NewMultiEndpoint() returned unexpected error: %v", err) + } + return me +} + +func TestRestrictEmptyEndpoints(t *testing.T) { + b := &MultiEndpointOptions{ + RecoveryTimeout: 1, + SwitchingDelay: 2, + } + expectedErr := "endpoints list cannot be empty" + if _, err := NewMultiEndpoint(b); err == nil || err.Error() != expectedErr { + t.Errorf("NewMultiEndpoint() returned wrong error: %v, want: %v", err, expectedErr) + } +} + +func TestCurrentIsFirstAfterInit(t *testing.T) { + me, err := NewMultiEndpoint(&MultiEndpointOptions{ + Endpoints: threeEndpoints, + }) + if err != nil { + t.Fatalf("NewMultiEndpoint() returned unexpected error: %v", err) + } + if c := me.Current(); c != threeEndpoints[0] { + t.Errorf("Current() returns %q, want: %q", c, threeEndpoints[0]) + } +} + +func TestReturnsTopPriorityAvailableEndpointWithoutRecovery(t *testing.T) { + me := initPlain(t, threeEndpoints) + + // Returns first after creation. + if c, want := me.Current(), threeEndpoints[0]; c != want { + t.Fatalf("Current() returns %q, want: %q", c, want) + } + + // Second becomes available. + me.SetEndpointAvailability(threeEndpoints[1], true) + + // Second is the current as the only available endpoint. + if c, want := me.Current(), threeEndpoints[1]; c != want { + t.Fatalf("Current() returns %q, want: %q", c, want) + } + + // Third becomes available. + me.SetEndpointAvailability(threeEndpoints[2], true) + + // Second is still the current because it has higher priority. + if c, want := me.Current(), threeEndpoints[1]; c != want { + t.Fatalf("Current() returns %q, want: %q", c, want) + } + + // First becomes available. + me.SetEndpointAvailability(threeEndpoints[0], true) + + // First becomes the current because it has higher priority. + if c, want := me.Current(), threeEndpoints[0]; c != want { + t.Fatalf("Current() returns %q, want: %q", c, want) + } + + // Second becomes unavailable. + me.SetEndpointAvailability(threeEndpoints[1], false) + + // Second becoming unavailable should not affect the current first. + if c, want := me.Current(), threeEndpoints[0]; c != want { + t.Fatalf("Current() returns %q, want: %q", c, want) + } + + // First becomes unavailable. + me.SetEndpointAvailability(threeEndpoints[0], false) + + // Third becomes the current as the only remaining available endpoint. + if c, want := me.Current(), threeEndpoints[2]; c != want { + t.Fatalf("Current() returns %q, want: %q", c, want) + } + + // Third becomes unavailable. + me.SetEndpointAvailability(threeEndpoints[2], false) + + // After all endpoints became unavailable, the multiEndpoint sticks to the last used endpoint. + if c, want := me.Current(), threeEndpoints[2]; c != want { + t.Fatalf("Current() returns %q, want: %q", c, want) + } +} + +func TestCurrentReturnsTopPriorityAvailableEndpointWithRecovery(t *testing.T) { + me := initWithDelays(t, threeEndpoints, recoveryTO, 0) + + // Returns first after creation. + if c, want := me.Current(), threeEndpoints[0]; c != want { + t.Fatalf("Current() returns %q, want: %q", c, want) + } + + // Second becomes available. + me.SetEndpointAvailability(threeEndpoints[1], true) + + // First is still the current to allow it to become available within the recovery timeout. + if c, want := me.Current(), threeEndpoints[0]; c != want { + t.Fatalf("Current() returns %q, want: %q", c, want) + } + + // After recovery timeout has passed. + advanceTime(t, recoveryTO) + + // Second becomes current as an available endpoint with top priority. + if c, want := me.Current(), threeEndpoints[1]; c != want { + t.Fatalf("Current() returns %q, want: %q", c, want) + } + + // Third becomes available. + me.SetEndpointAvailability(threeEndpoints[2], true) + + // Second is still the current because it has higher priority. + if c, want := me.Current(), threeEndpoints[1]; c != want { + t.Fatalf("Current() returns %q, want: %q", c, want) + } + + // Second becomes unavailable. + me.SetEndpointAvailability(threeEndpoints[1], false) + + // Second is still current, allowing up to recoveryTimeout to recover. + if c, want := me.Current(), threeEndpoints[1]; c != want { + t.Fatalf("Current() returns %q, want: %q", c, want) + } + + // Halfway through the recovery timeout the second recovers. + advanceTime(t, recoveryTO/2) + me.SetEndpointAvailability(threeEndpoints[1], true) + + // Second is the current. + if c, want := me.Current(), threeEndpoints[1]; c != want { + t.Fatalf("Current() returns %q, want: %q", c, want) + } + + // After the initial recovery timeout, the second is still current. + advanceTime(t, recoveryTO/2) + if c, want := me.Current(), threeEndpoints[1]; c != want { + t.Fatalf("Current() returns %q, want: %q", c, want) + } + + // Second becomes unavailable. + me.SetEndpointAvailability(threeEndpoints[1], false) + + // After recovery timeout has passed. + advanceTime(t, recoveryTO) + + // Changes to an available endpoint -- third. + if c, want := me.Current(), threeEndpoints[2]; c != want { + t.Fatalf("Current() returns %q, want: %q", c, want) + } + + // First becomes available. + me.SetEndpointAvailability(threeEndpoints[0], true) + + // First becomes current immediately. + if c, want := me.Current(), threeEndpoints[0]; c != want { + t.Fatalf("Current() returns %q, want: %q", c, want) + } + + // First becomes unavailable. + me.SetEndpointAvailability(threeEndpoints[0], false) + + // First is still current, allowing up to recoveryTimeout to recover. + if c, want := me.Current(), threeEndpoints[0]; c != want { + t.Fatalf("Current() returns %q, want: %q", c, want) + } + + // After recovery timeout has passed. + advanceTime(t, recoveryTO) + + // Changes to an available endpoint -- third. + if c, want := me.Current(), threeEndpoints[2]; c != want { + t.Fatalf("Current() returns %q, want: %q", c, want) + } + + // Third becomes unavailable. + me.SetEndpointAvailability(threeEndpoints[2], false) + + // Third is still current, allowing up to recoveryTimeout to recover. + if c, want := me.Current(), threeEndpoints[2]; c != want { + t.Fatalf("Current() returns %q, want: %q", c, want) + } + + // Halfway through the recovery timeout the second becomes available. + advanceTime(t, recoveryTO/2) + me.SetEndpointAvailability(threeEndpoints[1], true) + + // Second becomes current immediately. + if c, want := me.Current(), threeEndpoints[1]; c != want { + t.Fatalf("Current() returns %q, want: %q", c, want) + } + + // Second becomes unavailable. + me.SetEndpointAvailability(threeEndpoints[1], false) + + // Second is still current, allowing up to recoveryTimeout to recover. + if c, want := me.Current(), threeEndpoints[1]; c != want { + t.Fatalf("Current() returns %q, want: %q", c, want) + } + + // After recovery timeout has passed. + advanceTime(t, recoveryTO) + + // After all endpoints became unavailable, the multiEndpoint sticks to the last used endpoint. + if c, want := me.Current(), threeEndpoints[1]; c != want { + t.Fatalf("Current() returns %q, want: %q", c, want) + } +} + +func TestSetEndpointsErrorWhenEmptyEndpoints(t *testing.T) { + me := initPlain(t, threeEndpoints) + expectedErr := "endpoints list cannot be empty" + if err := me.SetEndpoints([]string{}); err == nil || err.Error() != expectedErr { + t.Errorf("SetEndpoints() returned wrong error: %v, want: %v", err, expectedErr) + } +} + +func TestSetEndpointsUpdatesEndpoints(t *testing.T) { + me := initPlain(t, threeEndpoints) + me.SetEndpoints(fourEndpoints) + + // "first", which is now at index 1, is still current because no other endpoint is available. + if c, want := me.Current(), fourEndpoints[1]; c != want { + t.Fatalf("Current() returns %q, want: %q", c, want) + } +} + +func TestSetEndpointsUpdatesEndpointsWithRecovery(t *testing.T) { + me := initWithDelays(t, threeEndpoints, recoveryTO, 0) + me.SetEndpoints(fourEndpoints) + + // "first", which is now at index 1, is still current because no other endpoint is available. + if c, want := me.Current(), fourEndpoints[1]; c != want { + t.Fatalf("Current() returns %q, want: %q", c, want) + } +} + +func TestSetEndpointsUpdatesEndpointsPreservingStates(t *testing.T) { + me := initPlain(t, threeEndpoints) + + // Second is available. + me.SetEndpointAvailability(threeEndpoints[1], true) + me.SetEndpoints(fourEndpoints) + + // "second", which is now at index 3, must still be available. + if c, want := me.Current(), fourEndpoints[3]; c != want { + t.Fatalf("Current() returns %q, want: %q", c, want) + } +} + +func TestSetEndpointsUpdatesEndpointsSwitchToTopPriorityAvailable(t *testing.T) { + me := initPlain(t, threeEndpoints) + + // Second and third are available. + me.SetEndpointAvailability(threeEndpoints[1], true) + me.SetEndpointAvailability(threeEndpoints[2], true) + + me.SetEndpoints(fourEndpoints) + + // "third", which is now at index 2, must become current because "second" has lower priority. + if c, want := me.Current(), fourEndpoints[2]; c != want { + t.Fatalf("Current() returns %q, want: %q", c, want) + } +} + +func TestSetEndpointsUpdatesEndpointsSwitchToTopPriorityAvailableWithRecovery(t *testing.T) { + me := initWithDelays(t, threeEndpoints, recoveryTO, 0) + + // After recovery timeout has passed. + advanceTime(t, recoveryTO) + + // Second and third are available. + me.SetEndpointAvailability(threeEndpoints[1], true) + me.SetEndpointAvailability(threeEndpoints[2], true) + + me.SetEndpoints(fourEndpoints) + + // "third", which is now at index 2, must become current because "second" has lower priority. + if c, want := me.Current(), fourEndpoints[2]; c != want { + t.Fatalf("Current() returns %q, want: %q", c, want) + } +} + +func TestSetEndpointsUpdatesEndpointsRemovesOnlyActiveEndpoint(t *testing.T) { + extraEndpoints := append(threeEndpoints, "extra") + me := initPlain(t, extraEndpoints) + + // Extra is available. + me.SetEndpointAvailability("extra", true) + + // Extra is current. + if c, want := me.Current(), extraEndpoints[3]; c != want { + t.Fatalf("Current() returns %q, want: %q", c, want) + } + + // Extra is removed. + me.SetEndpoints(fourEndpoints) + + // "fourth", which is at index 0, must become current because no endpoints are available. + if c, want := me.Current(), fourEndpoints[0]; c != want { + t.Fatalf("Current() returns %q, want: %q", c, want) + } +} + +func TestSetEndpointsUpdatesEndpointsRemovesOnlyActiveEndpointWithRecovery(t *testing.T) { + extraEndpoints := append(threeEndpoints, "extra") + me := initWithDelays(t, extraEndpoints, recoveryTO, 0) + + // After recovery timeout has passed. + advanceTime(t, recoveryTO) + + // Extra is available. + me.SetEndpointAvailability("extra", true) + + // Extra is removed. + me.SetEndpoints(fourEndpoints) + + // "fourth", which is at index 0, must become current because no endpoints are available. + if c, want := me.Current(), fourEndpoints[0]; c != want { + t.Fatalf("Current() returns %q, want: %q", c, want) + } +} + +func TestSetEndpointsRecoveringEndpointGetsRemoved(t *testing.T) { + extraEndpoints := append(threeEndpoints, "extra") + me := initWithDelays(t, extraEndpoints, recoveryTO, 0) + + // After recovery timeout has passed. + advanceTime(t, recoveryTO) + + // Extra is available. + me.SetEndpointAvailability("extra", true) + + // Extra is recovering. + me.SetEndpointAvailability("extra", false) + + // Extra is removed. + me.SetEndpoints(fourEndpoints) + + // "fourth", which is at index 0, must become current because no endpoints are available. + if c, want := me.Current(), fourEndpoints[0]; c != want { + t.Fatalf("Current() returns %q, want: %q", c, want) + } + + // After recovery timeout has passed. + advanceTime(t, recoveryTO) + + // "fourth" is still current. + if c, want := me.Current(), fourEndpoints[0]; c != want { + t.Fatalf("Current() returns %q, want: %q", c, want) + } +} + +func TestSetEndpointAvailableSubsequentUnavailableShouldNotExtendRecoveryTimeout(t *testing.T) { + // All endpoints are recovering. + me := initWithDelays(t, threeEndpoints, recoveryTO, 0) + + // Before the recovery timeout, repeat the unavailable signal. + advanceTime(t, recoveryTO/2) + me.SetEndpointAvailability(threeEndpoints[0], false) + + // After the initial timeout it must become unavailable. + advanceTime(t, recoveryTO/2) + if c, want := me.(*multiEndpoint).endpoints[threeEndpoints[0]], unavailable; c.status != want { + t.Fatalf("%q endpoint state is %q, want: %q", threeEndpoints[0], c.status, want) + } +} + +func TestSetEndpointAvailableRecoveringUnavailableRace(t *testing.T) { + // All endpoints are recovering. + me := initWithDelays(t, threeEndpoints, recoveryTO, 0) + + // Set "second" available to have something to fall back to. + me.SetEndpointAvailability(threeEndpoints[1], true) + + for i := 0; i < 100; i++ { + // Right at the recovery timeout we enable the "first". This should race with the "first" + // becoming unavailable from its recovery timer. If this race condition is not covered then + // the test will most likely fail or at least be flaky. + advanceTimeConcurring(t, recoveryTO, []func(){ + func() { me.SetEndpointAvailability(threeEndpoints[0], true) }, + }) + + // It is expected that the scheduled Recovering->Unavailable state change (from the recovery + // timeout) and Recovering->Available from the above SetEndpointAvailability are guarded by + // the mutex and cannot run in parallel. Moreover, if Recovering->Unavailable runs after + // Recovering->Available, then Recovering->Unavailable has no effect because it was planned + // to move the endpoint to Unavailable after the recovery timeout but the endpoint became + // Available a moment earlier. + // Thus in any case the "first" endpoint must be current at this moment. + if c, want := me.Current(), threeEndpoints[0]; c != want { + t.Fatalf("Current() returns %q, want: %q", c, want) + } + + advanceTime(t, recoveryTO) + // Make sure the "first" endpoint is still current after another recovery timeout. + if c, want := me.Current(), threeEndpoints[0]; c != want { + t.Fatalf("Current() returns %q, want: %q", c, want) + } + + // Send it back to the recovering state and start the recovery timer. + me.SetEndpointAvailability(threeEndpoints[0], false) + } +} + +func TestSetEndpointAvailableDoNotSwitchToUnavailableFromAvailable(t *testing.T) { + me := initWithDelays(t, threeEndpoints, recoveryTO, switchDelay) + // Second and third endpoints are available. + me.SetEndpointAvailability(threeEndpoints[1], true) + me.SetEndpointAvailability(threeEndpoints[2], true) + + advanceTime(t, recoveryTO) + // Second is current after the recovery timeout. + if c, want := me.Current(), threeEndpoints[1]; c != want { + t.Fatalf("Current() returns %q, want: %q", c, want) + } + + // First becomes available. + me.SetEndpointAvailability(threeEndpoints[0], true) + + // Switching to "first" is planned after the switching delay. "second" is still current. + if c, want := me.Current(), threeEndpoints[1]; c != want { + t.Fatalf("Current() returns %q, want: %q", c, want) + } + + // Just before the switching delay elapses, the "first" endpoint becomes unavailable again. + advanceTime(t, switchDelay-(switchDelay/10)) + me.SetEndpointAvailability(threeEndpoints[0], false) + + // After the switching delay the current must be "second". No switching to the recovering + // "first" should occur. + advanceTime(t, switchDelay/5) + if c, want := me.Current(), threeEndpoints[1]; c != want { + t.Fatalf("Current() returns %q, want: %q", c, want) + } +} + +func TestSetEndpointAvailableDoNotSwitchPreemptively(t *testing.T) { + me := initWithDelays(t, threeEndpoints, recoveryTO, switchDelay) + + // All unavailable after the recovery timeout. + advanceTime(t, recoveryTO) + + // Only the second endpoint is available. + me.SetEndpointAvailability(threeEndpoints[1], true) + + // After the switching delay the second should be current. + advanceTime(t, switchDelay) + if c, want := me.Current(), threeEndpoints[1]; c != want { + t.Fatalf("Current() returns %q, want: %q", c, want) + } + + // Third becomes available. This shouldn't schedule a switch as second is still + // the most preferred. + me.SetEndpointAvailability(threeEndpoints[2], true) + + advanceTime(t, switchDelay/2) + // Halfway to the switching delay the first endpoint becomes available. + me.SetEndpointAvailability(threeEndpoints[0], true) + + advanceTime(t, switchDelay/2) + // A full switching delay after third became available, the second should still be + // current because we didn't schedule a switch when third became available. + if c, want := me.Current(), threeEndpoints[1]; c != want { + t.Fatalf("Current() returns %q, want: %q", c, want) + } + + advanceTime(t, switchDelay/2) + // But after the switching delay has passed since first became available, it should become current. + if c, want := me.Current(), threeEndpoints[0]; c != want { + t.Fatalf("Current() returns %q, want: %q", c, want) + } +} + +func TestSetEndpointsSwitchingDelayed(t *testing.T) { + me := initWithDelays(t, threeEndpoints, recoveryTO, switchDelay) + // All endpoints are available. + for _, e := range threeEndpoints { + me.SetEndpointAvailability(e, true) + } + + // First is current. + if c, want := me.Current(), threeEndpoints[0]; c != want { + t.Fatalf("Current() returns %q, want: %q", c, want) + } + + // Prepend a new endpoint and make it available. + extraEndpoints := []string{"extra"} + extraEndpoints = append(extraEndpoints, threeEndpoints...) + + me.SetEndpoints(extraEndpoints) + me.SetEndpointAvailability(extraEndpoints[0], true) + + // The current endpoint should not change instantly. + if c, want := me.Current(), threeEndpoints[0]; c != want { + t.Fatalf("Current() returns %q, want: %q", c, want) + } + + // But after the switching delay it should. + advanceTime(t, switchDelay) + if c, want := me.Current(), extraEndpoints[0]; c != want { + t.Fatalf("Current() returns %q, want: %q", c, want) + } + + // Make the current endpoint unavailable. + me.SetEndpointAvailability(extraEndpoints[0], false) + + // Should wait for the recovery timeout. + if c, want := me.Current(), extraEndpoints[0]; c != want { + t.Fatalf("Current() returns %q, want: %q", c, want) + } + + // Should switch to a healthy endpoint after the recovery timeout and not the switching delay.
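+	// (switchFromTo applies the switching delay only when the current endpoint is still
+	// available; once "extra" is marked unavailable after the recovery timeout, the switch
+	// to the next available endpoint happens immediately.)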
+ advanceTime(t, recoveryTO) + if c, want := me.Current(), extraEndpoints[1]; c != want { + t.Fatalf("Current() returns %q, want: %q", c, want) + } + + // Prepend another endpoint. + updatedEndpoints := []string{"extra2"} + updatedEndpoints = append(updatedEndpoints, extraEndpoints...) + + me.SetEndpoints(updatedEndpoints) + // Now the endpoints are: + // 0 extra2 UNAVAILABLE + // 1 extra UNAVAILABLE + // 2 first AVAILABLE <-- current + // 3 second AVAILABLE + // 4 third AVAILABLE + + // Make "extra" endpoint available. + me.SetEndpointAvailability("extra", true) + + // Should wait for the switching delay. + // Halfway it should be still "first" endpoint. + advanceTime(t, switchDelay/2) + if c, want := me.Current(), updatedEndpoints[2]; c != want { + t.Fatalf("Current() returns %q, want: %q", c, want) + } + + // Now another higher priority endpoint becomes available. + me.SetEndpointAvailability("extra2", true) + + // Still "first" endpoint is current because switching delay has not passed. + if c, want := me.Current(), updatedEndpoints[2]; c != want { + t.Fatalf("Current() returns %q, want: %q", c, want) + } + + // After another half of the switching delay has passed it should switch to the "extra2" because + // it is a top priority available endpoint at the moment. + advanceTime(t, switchDelay/2) + if c, want := me.Current(), updatedEndpoints[0]; c != want { + t.Fatalf("Current() returns %q, want: %q", c, want) + } +} diff --git a/grpcgcp/test_grpc/gcp_multiendpoint_test.go b/grpcgcp/test_grpc/gcp_multiendpoint_test.go new file mode 100644 index 0000000..6db6d30 --- /dev/null +++ b/grpcgcp/test_grpc/gcp_multiendpoint_test.go @@ -0,0 +1,666 @@ +/* + * + * Copyright 2023 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * + */ + +package test_grpc + +import ( + "context" + "net" + "testing" + "time" + + "github.com/GoogleCloudPlatform/grpc-gcp-go/grpcgcp" + "github.com/GoogleCloudPlatform/grpc-gcp-go/grpcgcp/multiendpoint" + "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/metadata" + "google.golang.org/grpc/status" + + configpb "github.com/GoogleCloudPlatform/grpc-gcp-go/grpcgcp/grpc_gcp" + pb "github.com/GoogleCloudPlatform/grpc-gcp-go/grpcgcp/test_grpc/helloworld/helloworld" +) + +type tempIOError struct{} + +func (e *tempIOError) Error() string { return "simulated temporary IO error" } +func (e *tempIOError) Timeout() bool { return false } +func (e *tempIOError) Temporary() bool { return true } + +var ( + tempErr = &tempIOError{} + callTO = time.Second + waitTO = time.Second * 3 +) + +// faultyConn wraps a net.Conn and breaks it whenever its endpoint is marked as faulty. +type faultyConn struct { + es endpointStats + endpoint string + conn net.Conn +} + +func (f *faultyConn) faulty() bool { + return f.es.faulty(f.endpoint) +} + +func (f *faultyConn) Read(b []byte) (n int, err error) { + if f.faulty() { + // Close the underlying connection so the read fails. + f.conn.Close() + } + return f.conn.Read(b) +} + +func (f *faultyConn) Write(b []byte) (n int, err error) { + if f.faulty() { + // Close the underlying connection so the write fails. + f.conn.Close() + } + return f.conn.Write(b) +} + +func (f *faultyConn) Close() error { + return f.conn.Close() +} + +func (f *faultyConn) LocalAddr() net.Addr { + return f.conn.LocalAddr() +} + +func (f *faultyConn) RemoteAddr() net.Addr { + return f.conn.RemoteAddr() +} + +func (f *faultyConn) SetDeadline(t time.Time) error { + return f.conn.SetDeadline(t) +} + +func (f *faultyConn) SetReadDeadline(t time.Time) error { + return f.conn.SetReadDeadline(t) +} + +func (f *faultyConn) SetWriteDeadline(t time.Time) error { + return f.conn.SetWriteDeadline(t) +} + +// endpointStats maps an endpoint address to its reachability. +type endpointStats map[string]bool + +func (es endpointStats) faulty(e string) bool { + return !es[e] +} + +func (es endpointStats) dialer(ctx context.Context, s string) (net.Conn, error) { + if es.faulty(s) { + return nil, tempErr + } + + fConn := &faultyConn{ + endpoint: s, + es: es, + } + var err error + fConn.conn, err = net.Dial("tcp", s) + return fConn, err +} + +// Verifies that a SayHello call succeeds and goes through `expectedEndpoint`. +func (tc *testingClient) SayHelloWorks(ctx context.Context, expectedEndpoint string) { + tc.t.Helper() + ctx, cancel := context.WithTimeout(ctx, callTO) + defer cancel() + var header metadata.MD + if _, err := tc.c.SayHello(ctx, &pb.HelloRequest{Name: "world"}, grpc.Header(&header)); err != nil { + tc.t.Fatalf("could not greet: %v", err) + } + if got, want := header["authority-was"][0], expectedEndpoint; got != want { + tc.t.Fatalf("endpoint wanted %q, got %q", want, got) + } +} + +// Verifies that a SayHello call fails with one of the `codes`. +func (tc *testingClient) SayHelloFails(ctx context.Context, codes ...codes.Code) { + tc.t.Helper() + ctx, cancel := context.WithTimeout(ctx, callTO) + defer cancel() + var header metadata.MD + _, err := tc.c.SayHello(ctx, &pb.HelloRequest{Name: "world"}, grpc.Header(&header)) + for _, c := range codes { + if status.Code(err) == c { + return + } + } + tc.t.Fatalf("SayHello() want error with codes: %v, got code: %v, got error: %v", codes, status.Code(err), err) +} + +// Verifies that a SayHello call either fails with one of the `codes` or succeeds via +// `expectedEndpoint`. +// Returns the error if SayHello fails with one of the `codes`, and nil if SayHello succeeds. +func (tc *testingClient) SayHelloWorksOrFailsWith(ctx context.Context, expectedEndpoint string, codes ...codes.Code) error { + tc.t.Helper() + ctx, cancel := context.WithTimeout(ctx, callTO) + defer cancel() + var header metadata.MD + _, err := tc.c.SayHello(ctx, &pb.HelloRequest{Name: "world"}, grpc.Header(&header)) + if err != nil { + for _, c := range codes { + if status.Code(err) == c { + return err + } + } + tc.t.Fatalf("SayHello() want error with codes: %v, got code: %v, got error: %v", codes, status.Code(err), err) + } + if got, want := header["authority-was"][0], expectedEndpoint; got != want { + tc.t.Fatalf("endpoint wanted %q, got %q", want, got) + } + return nil +} + +// Calls SayHello until it succeeds via `expectedEndpoint` or times out after `to`. +// Finally, verifies that SayHello succeeds via `expectedEndpoint`. +func (tc *testingClient) SayHelloWorksWithin(ctx context.Context, expectedEndpoint string, to time.Duration) { + tc.t.Helper() + toCTX, cancel := context.WithTimeout(context.Background(), to) + defer cancel() + + type sayHelloResult struct { + err error + endpoint string + } + + c := make(chan *sayHelloResult) + + go func() { + for toCTX.Err() == nil { + var header metadata.MD + callCtx, callCancel := context.WithTimeout(ctx, callTO) + _, err := tc.c.SayHello(callCtx, &pb.HelloRequest{Name: "world"}, grpc.Header(&header)) + callCancel() + endpoint := "" + if vals := header.Get("authority-was"); len(vals) > 0 { + endpoint = vals[0] + } + select { + case c <- &sayHelloResult{err: err, endpoint: endpoint}: + case <-toCTX.Done(): + return + } + time.Sleep(time.Millisecond * 50) + } + }() + +loop: + for { + select { + case <-toCTX.Done(): + break loop + case r := <-c: + if r.err == nil && r.endpoint == expectedEndpoint { + break loop + } + } + } + + tc.SayHelloWorks(ctx, expectedEndpoint) +} + +// Calls SayHello as long as it fails with one of the `codes`, until it succeeds, but no longer +// than `to`. Verifies that the successful call went via `expectedEndpoint`. +func (tc *testingClient) SayHelloFailsThenWorks(ctx context.Context, expectedEndpoint string, to time.Duration, codes ...codes.Code) { + tc.t.Helper() + toCTX, cancel := context.WithTimeout(context.Background(), to) + defer cancel() + failedTimes := 0 + for toCTX.Err() == nil { + time.Sleep(time.Millisecond * 20) + callCtx, callCancel := context.WithTimeout(ctx, callTO) + err := tc.SayHelloWorksOrFailsWith(callCtx, expectedEndpoint, codes...) + callCancel() + if err == nil { + // Worked via expectedEndpoint. + if failedTimes == 0 { + tc.t.Fatalf("SayHello didn't fail. Expected SayHello to fail at least one time with codes %v", codes) + } + return + } + failedTimes++ + } + + tc.SayHelloWorks(ctx, expectedEndpoint) +} + +type testingClient struct { + c pb.GreeterClient + t *testing.T +} + +func TestGcpMultiEndpoint(t *testing.T) { + + lEndpoint, fEndpoint := "localhost:50051", "127.0.0.3:50051" + newE, newE2 := "127.0.0.1:50051", "127.0.0.2:50051" + + defaultME, followerME := "default", "follower" + + // We start with the leader unavailable.
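+	// eStats maps endpoint address -> reachable. The faulty dialer above consults it on
+	// every dial and Read/Write, so flipping a flag simulates an outage or recovery.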
+ eStats := endpointStats{ + lEndpoint: false, + fEndpoint: true, + newE: true, + newE2: true, + } + + apiCfg := &configpb.ApiConfig{ + ChannelPool: &configpb.ChannelPoolConfig{ + MinSize: 3, + MaxSize: 3, + }, + } + + conn, err := grpcgcp.NewGcpMultiEndpoint( + &grpcgcp.GCPMultiEndpointOptions{ + GRPCgcpConfig: apiCfg, + MultiEndpoints: map[string]*multiendpoint.MultiEndpointOptions{ + defaultME: { + Endpoints: []string{lEndpoint, fEndpoint}, + }, + followerME: { + Endpoints: []string{fEndpoint, lEndpoint}, + }, + }, + Default: defaultME, + }, + grpc.WithInsecure(), + grpc.WithContextDialer(eStats.dialer), + ) + + if err != nil { + t.Fatalf("NewMultiEndpointConn returns unexpected error: %v", err) + } + + defer conn.Close() + c := pb.NewGreeterClient(conn) + tc := &testingClient{ + c: c, + t: t, + } + + // First call to the default endpoint will fail because the leader endpoint is unavailable. + tc.SayHelloFails(context.Background(), codes.Unavailable) + + // But follower-first ME works from the beginning. + fCtx := grpcgcp.NewMEContext(context.Background(), followerME) + tc.SayHelloWorks(fCtx, fEndpoint) + + // Make sure default switched to follower in a few moments. + tc.SayHelloWorks(context.Background(), fEndpoint) + + // Enable the leader endpoint. + eStats[lEndpoint] = true + // Give some time to connect. Should work through leader endpoint. + tc.SayHelloWorksWithin(context.Background(), lEndpoint, waitTO) + + // make sure follower still uses follower endpoint. + tc.SayHelloWorks(fCtx, fEndpoint) + + // Disable follower endpoint. + eStats[fEndpoint] = false + // Expect first calls (by number of channels) to follower will fail after that. + // Give some time to detect breakage. Make sure follower switched to leader endpoint. + tc.SayHelloFailsThenWorks(fCtx, lEndpoint, waitTO, codes.Unavailable) + + // Enable follower endpoint. + eStats[fEndpoint] = true + // Give some time to connect/switch. Make sure follower switched back to follower endpoint. + tc.SayHelloWorksWithin(fCtx, fEndpoint, waitTO) + + // Add new endpoint newE to the follower ME. + newMEs := &grpcgcp.GCPMultiEndpointOptions{ + GRPCgcpConfig: apiCfg, + MultiEndpoints: map[string]*multiendpoint.MultiEndpointOptions{ + defaultME: { + Endpoints: []string{lEndpoint, fEndpoint}, + }, + followerME: { + Endpoints: []string{newE, fEndpoint, lEndpoint}, + }, + }, + Default: defaultME, + } + conn.UpdateMultiEndpoints(newMEs) + + // Give some time to connect. Make sure follower uses new endpoint in a few moments. + tc.SayHelloWorksWithin(fCtx, newE, waitTO) + + // Add the same endpoint to the default ME. + newMEs = &grpcgcp.GCPMultiEndpointOptions{ + GRPCgcpConfig: apiCfg, + MultiEndpoints: map[string]*multiendpoint.MultiEndpointOptions{ + defaultME: { + Endpoints: []string{newE, lEndpoint, fEndpoint}, + }, + followerME: { + Endpoints: []string{newE, fEndpoint, lEndpoint}, + }, + }, + Default: defaultME, + } + conn.UpdateMultiEndpoints(newMEs) + // Make sure the new endpoint is used immediately by the default ME + // (because it should know it is ready already). + tc.SayHelloWorks(context.Background(), newE) + + // Rearrange endpoints in the default ME, make sure new top endpoint is used immediately. 
+ newMEs = &grpcgcp.GCPMultiEndpointOptions{ + GRPCgcpConfig: apiCfg, + MultiEndpoints: map[string]*multiendpoint.MultiEndpointOptions{ + defaultME: { + Endpoints: []string{fEndpoint, newE, lEndpoint}, + }, + followerME: { + Endpoints: []string{newE, fEndpoint, lEndpoint}, + }, + }, + Default: defaultME, + } + conn.UpdateMultiEndpoints(newMEs) + tc.SayHelloWorks(context.Background(), fEndpoint) + + // Renaming follower ME. + followerME2 := "follower2" + newMEs = &grpcgcp.GCPMultiEndpointOptions{ + GRPCgcpConfig: apiCfg, + MultiEndpoints: map[string]*multiendpoint.MultiEndpointOptions{ + defaultME: { + Endpoints: []string{fEndpoint, newE, lEndpoint}, + }, + followerME2: { + Endpoints: []string{newE, fEndpoint, lEndpoint}, + }, + }, + Default: defaultME, + } + conn.UpdateMultiEndpoints(newMEs) + f2Ctx := grpcgcp.NewMEContext(context.Background(), followerME2) + tc.SayHelloWorks(f2Ctx, newE) + + // Replace follower endpoint with a new endpoint (not connected). + newMEs = &grpcgcp.GCPMultiEndpointOptions{ + GRPCgcpConfig: apiCfg, + MultiEndpoints: map[string]*multiendpoint.MultiEndpointOptions{ + defaultME: { + Endpoints: []string{newE2, newE, lEndpoint}, + }, + followerME2: { + Endpoints: []string{newE, newE2, lEndpoint}, + }, + }, + Default: defaultME, + } + conn.UpdateMultiEndpoints(newMEs) + // Give some time to connect. newE2 must be used. + tc.SayHelloWorksWithin(context.Background(), newE2, waitTO) + + // Let the follower endpoint shutdown. + time.Sleep(time.Second) + + // Replace new endpoints with the follower endpoint. + newMEs = &grpcgcp.GCPMultiEndpointOptions{ + GRPCgcpConfig: apiCfg, + MultiEndpoints: map[string]*multiendpoint.MultiEndpointOptions{ + defaultME: { + Endpoints: []string{fEndpoint, lEndpoint}, + }, + followerME2: { + Endpoints: []string{fEndpoint, lEndpoint}, + }, + }, + Default: defaultME, + } + conn.UpdateMultiEndpoints(newMEs) + // leader should be used as follower was shutdown previously. + tc.SayHelloWorks(context.Background(), lEndpoint) + // Give some time to connect. Follower must be used. + tc.SayHelloWorksWithin(context.Background(), fEndpoint, waitTO) +} + +func TestGcpMultiEndpointWithDelays(t *testing.T) { + + recoveryTimeout := time.Millisecond * 500 + switchingDelay := time.Millisecond * 700 + margin := time.Millisecond * 50 + + lEndpoint, fEndpoint := "localhost:50051", "127.0.0.3:50051" + newE, newE2 := "127.0.0.1:50051", "127.0.0.2:50051" + + defaultME, followerME := "default", "follower" + + // We start with leader unavailable. 
+	eStats := endpointStats{
+		lEndpoint: false,
+		fEndpoint: true,
+		newE:      true,
+		newE2:     true,
+	}
+
+	apiCfg := &configpb.ApiConfig{
+		ChannelPool: &configpb.ChannelPoolConfig{
+			MinSize: 3,
+			MaxSize: 3,
+		},
+	}
+
+	conn, err := grpcgcp.NewGcpMultiEndpoint(
+		&grpcgcp.GCPMultiEndpointOptions{
+			GRPCgcpConfig: apiCfg,
+			MultiEndpoints: map[string]*multiendpoint.MultiEndpointOptions{
+				defaultME: {
+					Endpoints:       []string{lEndpoint, fEndpoint},
+					RecoveryTimeout: recoveryTimeout,
+					SwitchingDelay:  switchingDelay,
+				},
+				followerME: {
+					Endpoints:       []string{fEndpoint, lEndpoint},
+					RecoveryTimeout: recoveryTimeout,
+					SwitchingDelay:  switchingDelay,
+				},
+			},
+			Default: defaultME,
+		},
+		grpc.WithInsecure(),
+		grpc.WithContextDialer(eStats.dialer),
+	)
+
+	if err != nil {
+		t.Fatalf("NewGcpMultiEndpoint returned unexpected error: %v", err)
+	}
+
+	defer conn.Close()
+	c := pb.NewGreeterClient(conn)
+	tc := &testingClient{
+		c: c,
+		t: t,
+	}
+
+	start := time.Now()
+
+	// The first call via the default ME fails because the leader endpoint is unavailable.
+	tc.SayHelloFails(context.Background(), codes.Unavailable)
+
+	// But the follower-first ME works from the beginning.
+	fCtx := grpcgcp.NewMEContext(context.Background(), followerME)
+	tc.SayHelloWorks(fCtx, fEndpoint)
+
+	// Make sure the default ME has not switched to the follower before the recovery timeout.
+	time.Sleep(recoveryTimeout - time.Since(start) - margin)
+	tc.SayHelloWorksOrFailsWith(context.Background(), fEndpoint, codes.Unavailable)
+
+	// Make sure the default ME has switched to the follower after the recovery timeout.
+	time.Sleep(recoveryTimeout - time.Since(start) + margin)
+	tc.SayHelloWorks(context.Background(), fEndpoint)
+
+	// Enable the leader endpoint.
+	eStats[lEndpoint] = true
+	// Give some time to connect. Calls should go through the leader endpoint.
+	tc.SayHelloWorksWithin(context.Background(), lEndpoint, 2*time.Second+switchingDelay)
+
+	// Make sure the follower ME still uses the follower endpoint.
+	tc.SayHelloWorks(fCtx, fEndpoint)
+
+	// Disable the follower endpoint.
+	eStats[fEndpoint] = false
+	// Expect the first calls (one per channel) to the follower to fail after that.
+	// Give some time to detect the breakage. Make sure the follower ME switched to the leader endpoint.
+	tc.SayHelloFailsThenWorks(fCtx, lEndpoint, waitTO, codes.Unavailable)
+
+	// Enable the follower endpoint.
+	eStats[fEndpoint] = true
+	// Give some time to connect/switch. Make sure the follower ME switched back to the follower endpoint.
+	tc.SayHelloWorksWithin(fCtx, fEndpoint, waitTO+recoveryTimeout)
+
+	// Add a new endpoint, newE, to the follower ME as its top-priority endpoint.
+	newMEs := &grpcgcp.GCPMultiEndpointOptions{
+		GRPCgcpConfig: apiCfg,
+		MultiEndpoints: map[string]*multiendpoint.MultiEndpointOptions{
+			defaultME: {
+				Endpoints: []string{lEndpoint, fEndpoint},
+			},
+			followerME: {
+				Endpoints: []string{newE, fEndpoint, lEndpoint},
+			},
+		},
+		Default: defaultME,
+	}
+	conn.UpdateMultiEndpoints(newMEs)
+
+	start = time.Now()
+
+	// Make sure the follower ME has not switched to the new endpoint before the switching delay.
+	time.Sleep(switchingDelay - margin)
+	tc.SayHelloWorks(fCtx, fEndpoint)
+
+	// Make sure the follower ME has switched to the new endpoint after the switching delay.
+	time.Sleep(switchingDelay - time.Since(start) + margin)
+	tc.SayHelloWorks(fCtx, newE)
+
+	// Add the same endpoint to the default ME.
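+	// (newE is already connected and serving the follower ME; the step below
+	// checks that the default ME's switching delay applies even so.)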
+	newMEs = &grpcgcp.GCPMultiEndpointOptions{
+		GRPCgcpConfig: apiCfg,
+		MultiEndpoints: map[string]*multiendpoint.MultiEndpointOptions{
+			defaultME: {
+				Endpoints: []string{newE, lEndpoint, fEndpoint},
+			},
+			followerME: {
+				Endpoints: []string{newE, fEndpoint, lEndpoint},
+			},
+		},
+		Default: defaultME,
+	}
+	conn.UpdateMultiEndpoints(newMEs)
+
+	// Even though the new endpoint is ready, the switching delay must kick in here.
+	start = time.Now()
+
+	// Make sure the default ME has not switched to the new endpoint before the switching delay.
+	time.Sleep(switchingDelay - margin)
+	tc.SayHelloWorks(context.Background(), lEndpoint)
+
+	// Make sure the default ME has switched to the new endpoint after the switching delay.
+	time.Sleep(switchingDelay - time.Since(start) + margin)
+	tc.SayHelloWorks(context.Background(), newE)
+
+	// Rearrange endpoints in the default ME, make sure the new top endpoint is used after the switching delay.
+	newMEs = &grpcgcp.GCPMultiEndpointOptions{
+		GRPCgcpConfig: apiCfg,
+		MultiEndpoints: map[string]*multiendpoint.MultiEndpointOptions{
+			defaultME: {
+				Endpoints: []string{fEndpoint, newE, lEndpoint},
+			},
+			followerME: {
+				Endpoints: []string{newE, fEndpoint, lEndpoint},
+			},
+		},
+		Default: defaultME,
+	}
+	conn.UpdateMultiEndpoints(newMEs)
+
+	start = time.Now()
+
+	// Make sure the default ME has not switched to the new top endpoint (fEndpoint) before the switching delay.
+	time.Sleep(switchingDelay - margin)
+	tc.SayHelloWorks(context.Background(), newE)
+
+	// Make sure the default ME has switched to the new top endpoint after the switching delay.
+	time.Sleep(switchingDelay - time.Since(start) + margin)
+	tc.SayHelloWorks(context.Background(), fEndpoint)
+
+	// Rename the follower ME.
+	followerME2 := "follower2"
+	newMEs = &grpcgcp.GCPMultiEndpointOptions{
+		GRPCgcpConfig: apiCfg,
+		MultiEndpoints: map[string]*multiendpoint.MultiEndpointOptions{
+			defaultME: {
+				Endpoints: []string{fEndpoint, newE, lEndpoint},
+			},
+			followerME2: {
+				Endpoints: []string{newE, fEndpoint, lEndpoint},
+			},
+		},
+		Default: defaultME,
+	}
+	conn.UpdateMultiEndpoints(newMEs)
+	f2Ctx := grpcgcp.NewMEContext(context.Background(), followerME2)
+	tc.SayHelloWorks(f2Ctx, newE)
+
+	// Replace the follower endpoint with a new endpoint, newE2 (not connected yet).
+	newMEs = &grpcgcp.GCPMultiEndpointOptions{
+		GRPCgcpConfig: apiCfg,
+		MultiEndpoints: map[string]*multiendpoint.MultiEndpointOptions{
+			defaultME: {
+				Endpoints: []string{newE2, newE, lEndpoint},
+			},
+			followerME2: {
+				Endpoints: []string{newE, newE2, lEndpoint},
+			},
+		},
+		Default: defaultME,
+	}
+	conn.UpdateMultiEndpoints(newMEs)
+	// newE is still used, as newE2 is not connected yet.
+	tc.SayHelloWorks(context.Background(), newE)
+	// Give some time to connect. newE2 must be used.
+	tc.SayHelloWorksWithin(context.Background(), newE2, waitTO)
+
+	// Let the follower endpoint's connection shut down.
+	time.Sleep(time.Second)
+
+	// Replace the new endpoints with the follower endpoint.
+	newMEs = &grpcgcp.GCPMultiEndpointOptions{
+		GRPCgcpConfig: apiCfg,
+		MultiEndpoints: map[string]*multiendpoint.MultiEndpointOptions{
+			defaultME: {
+				Endpoints: []string{fEndpoint, lEndpoint},
+			},
+			followerME2: {
+				Endpoints: []string{fEndpoint, lEndpoint},
+			},
+		},
+		Default: defaultME,
+	}
+	conn.UpdateMultiEndpoints(newMEs)
+	// The leader should be used, as the follower connection was shut down previously.
+	tc.SayHelloWorks(context.Background(), lEndpoint)
+	// Give some time to connect plus the switching delay. The follower must be used.
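+	// (The timeout below budgets waitTO for reconnecting to fEndpoint plus
+	// switchingDelay for the ME to switch over.)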
+	tc.SayHelloWorksWithin(context.Background(), fEndpoint, waitTO+switchingDelay)
+}
diff --git a/grpcgcp/test_grpc/main_test.go b/grpcgcp/test_grpc/main_test.go
index 6fcf7f9..484403f 100644
--- a/grpcgcp/test_grpc/main_test.go
+++ b/grpcgcp/test_grpc/main_test.go
@@ -1,3 +1,21 @@
+/*
+ *
+ * Copyright 2023 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
 package test_grpc
 
 import (
@@ -11,6 +29,7 @@ import (
 
 	"github.com/GoogleCloudPlatform/grpc-gcp-go/grpcgcp"
 	"google.golang.org/grpc"
+	"google.golang.org/grpc/metadata"
 	"google.golang.org/protobuf/encoding/protojson"
 
 	configpb "github.com/GoogleCloudPlatform/grpc-gcp-go/grpcgcp/grpc_gcp"
@@ -93,6 +112,10 @@ type server struct {
 }
 
 func (s *server) SayHello(ctx context.Context, in *pb.HelloRequest) (*pb.HelloReply, error) {
+	// Echo the :authority header back to the client so tests can tell which endpoint served the call.
+	m, _ := metadata.FromIncomingContext(ctx)
+	header := metadata.Pairs("authority-was", m[":authority"][0])
+	grpc.SendHeader(ctx, header)
 	return &pb.HelloReply{Message: "Hello " + in.GetName()}, nil
 }