Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Multiendpoint #63

Merged
merged 7 commits into from
Nov 21, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
358 changes: 358 additions & 0 deletions grpcgcp/gcp_multiendpoint.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,358 @@
/*
*
* Copyright 2023 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package grpcgcp

import (
"context"
"fmt"
"sync"

"github.com/GoogleCloudPlatform/grpc-gcp-go/grpcgcp/multiendpoint"
"google.golang.org/grpc"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/grpclog"
"google.golang.org/protobuf/encoding/protojson"

pb "github.com/GoogleCloudPlatform/grpc-gcp-go/grpcgcp/grpc_gcp"
)

var (
// To be redefined in tests.
grpcDial = func(target string, opts ...grpc.DialOption) (*grpc.ClientConn, error) {
return grpc.Dial(target, opts...)
}
)

var log = grpclog.Component("GCPMultiEndpoint")

type contextMEKey int

var meKey contextMEKey
nimf marked this conversation as resolved.
Show resolved Hide resolved

// NewMEContext returns a new Context that carries Multiendpoint name.
func NewMEContext(ctx context.Context, name string) context.Context {
return context.WithValue(ctx, meKey, name)
}

// FromMEContext returns the MultiEndpoint name stored in ctx, if any.
func FromMEContext(ctx context.Context) (string, bool) {
name, ok := ctx.Value(meKey).(string)
return name, ok
}

// GCPMultiEndpoint holds the state of MultiEndpoints-enabled gRPC client connection.
//
// The purposes of GcpMultiEndpoint are:
//
// - Fallback to an alternative endpoint (host:port) of a gRPC service when the original
// endpoint is completely unavailable.
// - Be able to route an RPC call to a specific group of endpoints.
// - Be able to reconfigure endpoints in runtime.
//
// A group of endpoints is called a [multiendpoint.MultiEndpoint] and is essentially a list of endpoints
// where priority is defined by the position in the list with the first endpoint having top
// priority. A MultiEndpoint tracks endpoints' availability. When a MultiEndpoint is picked for an
// RPC call, it picks the top priority endpoint that is currently available. More information on the
// [multiendpoint.MultiEndpoint].
//
// GCPMultiEndpoint can have one or more MultiEndpoint identified by its name -- arbitrary
// string provided in the [GCPMultiEndpointOptions] when configuring MultiEndpoints. This name
// can be used to route an RPC call to this MultiEndpoint by using the [NewMEContext].
//
// GCPMultiEndpoint uses [GCPMultiEndpointOptions] for initial configuration.
// An updated configuration can be provided at any time later using [UpdateMultiEndpoints].
//
// Example:
//
// Let's assume we have a service with read and write operations and the following backends:
//
// - service.example.com -- the main set of backends supporting all operations
// - service-fallback.example.com -- read-write replica supporting all operations
// - ro-service.example.com -- read-only replica supporting only read operations
//
// Example configuration:
//
// - MultiEndpoint named "default" with endpoints:
//
// 1. service.example.com:443
//
// 2. service-fallback.example.com:443
//
// - MultiEndpoint named "read" with endpoints:
//
// 1. ro-service.example.com:443
//
// 2. service-fallback.example.com:443
//
// 3. service.example.com:443
//
// With the configuration above GCPMultiEndpoint will use the "default" MultiEndpoint by
// default. It means that RPC calls by default will use the main endpoint and if it is not available
// then the read-write replica.
//
// To offload some read calls to the read-only replica we can specify "read" MultiEndpoint in the
// context. Then these calls will use the read-only replica endpoint and if it is not available
// then the read-write replica and if it is also not available then the main endpoint.
//
// GCPMultiEndpoint creates a [grpcgcp] connection pool for every unique
// endpoint. For the example above three connection pools will be created.
//
// [GCPMultiEndpoint] implements [grpc.ClientConnInterface] and can be used
// as a [grpc.ClientConn] when creating gRPC clients.
type GCPMultiEndpoint struct {
sync.Mutex

defaultName string
mes map[string]multiendpoint.MultiEndpoint
pools map[string]*monitoredConn
opts []grpc.DialOption
gcpConfig *pb.ApiConfig

grpc.ClientConnInterface
}

// Make sure GcpMultiEndpoint implements grpc.ClientConnInterface.
var _ grpc.ClientConnInterface = &GCPMultiEndpoint{}

func (gme *GCPMultiEndpoint) Invoke(ctx context.Context, method string, args interface{}, reply interface{}, opts ...grpc.CallOption) error {
return gme.pickConn(ctx).Invoke(ctx, method, args, reply, opts...)
}

func (gme *GCPMultiEndpoint) NewStream(ctx context.Context, desc *grpc.StreamDesc, method string, opts ...grpc.CallOption) (grpc.ClientStream, error) {
return gme.pickConn(ctx).NewStream(ctx, desc, method, opts...)
}

func (gme *GCPMultiEndpoint) pickConn(ctx context.Context) *grpc.ClientConn {
lasradoVinod marked this conversation as resolved.
Show resolved Hide resolved
name, ok := FromMEContext(ctx)
me, ook := gme.mes[name]
if !ok || !ook {
me = gme.mes[gme.defaultName]
}
return gme.pools[me.Current()].conn
}

func (gme *GCPMultiEndpoint) Close() error {
var errs multiError
for _, mc := range gme.pools {
mc.stopMonitoring()
if err := mc.conn.Close(); err != nil {
errs = append(errs, err)
}
}
return errs.Combine()
}

// GCPMultiEndpointOptions holds options to construct a MultiEndpoints-enabled gRPC client
// connection.
type GCPMultiEndpointOptions struct {
// Regular gRPC-GCP configuration to be applied to every endpoint.
GRPCgcpConfig *pb.ApiConfig
// Map of MultiEndpoints where key is the MultiEndpoint name.
MultiEndpoints map[string]*multiendpoint.MultiEndpointOptions
// Name of the default MultiEndpoint.
Default string
}

// NewGcpMultiEndpoint creates new [GCPMultiEndpoint] -- MultiEndpoints-enabled gRPC client
// connection.
//
// [GCPMultiEndpoint] implements [grpc.ClientConnInterface] and can be used
// as a [grpc.ClientConn] when creating gRPC clients.
func NewGcpMultiEndpoint(meOpts *GCPMultiEndpointOptions, opts ...grpc.DialOption) (*GCPMultiEndpoint, error) {
// Read config, create multiendpoints and pools.
o, err := makeOpts(meOpts, opts)
if err != nil {
return nil, err
}
gme := &GCPMultiEndpoint{
mes: make(map[string]multiendpoint.MultiEndpoint),
pools: make(map[string]*monitoredConn),
defaultName: meOpts.Default,
opts: o,
gcpConfig: meOpts.GRPCgcpConfig,
}
if err := gme.UpdateMultiEndpoints(meOpts); err != nil {
return nil, err
}
return gme, nil
}

func makeOpts(meOpts *GCPMultiEndpointOptions, opts []grpc.DialOption) ([]grpc.DialOption, error) {
grpcGCPjsonConfig, err := protojson.Marshal(meOpts.GRPCgcpConfig)
if err != nil {
return nil, err
}
o := append([]grpc.DialOption{}, opts...)
o = append(o, []grpc.DialOption{
grpc.WithDisableServiceConfig(),
grpc.WithDefaultServiceConfig(fmt.Sprintf(`{"loadBalancingConfig": [{"%s":%s}]}`, Name, string(grpcGCPjsonConfig))),
grpc.WithChainUnaryInterceptor(GCPUnaryClientInterceptor),
grpc.WithChainStreamInterceptor(GCPStreamClientInterceptor),
}...)

return o, nil
}

type monitoredConn struct {
endpoint string
conn *grpc.ClientConn
gme *GCPMultiEndpoint
cancel context.CancelFunc
}

func (sm *monitoredConn) monitor() {
var ctx context.Context
ctx, sm.cancel = context.WithCancel(context.Background())
currentState := sm.conn.GetState()
for sm.conn.WaitForStateChange(ctx, currentState) {
currentState = sm.conn.GetState()
// Inform all multiendpoints.
for _, me := range sm.gme.mes {
me.SetEndpointAvailability(sm.endpoint, currentState == connectivity.Ready)
}
}
}

func (sm *monitoredConn) stopMonitoring() {
sm.cancel()
}

// UpdateMultiEndpoints reconfigures MultiEndpoints.
//
// MultiEndpoints are matched with the current ones by name.
//
// - If a current MultiEndpoint is missing in the updated list, the MultiEndpoint will be
// removed.
// - A new MultiEndpoint will be created for every new name in the list.
// - For an existing MultiEndpoint only its endpoints will be updated (no recovery timeout
// change).
//
// Endpoints are matched by the endpoint address (usually in the form of address:port).
//
// - If an existing endpoint is not used by any MultiEndpoint in the updated list, then the
// connection poll for this endpoint will be shutdown.
// - A connection pool will be created for every new endpoint.
// - For an existing endpoint nothing will change (the connection pool will not be re-created,
// thus no connection credentials change, nor connection configuration change).
func (gme *GCPMultiEndpoint) UpdateMultiEndpoints(meOpts *GCPMultiEndpointOptions) error {
gme.Lock()
defer gme.Unlock()
if _, ok := meOpts.MultiEndpoints[meOpts.Default]; !ok {
return fmt.Errorf("default MultiEndpoint %q missing options", meOpts.Default)
}

validPools := make(map[string]bool)
for _, meo := range meOpts.MultiEndpoints {
for _, e := range meo.Endpoints {
validPools[e] = true
}
}

// Add missing pools.
for e := range validPools {
if _, ok := gme.pools[e]; !ok {
// This creates a ClientConn with the gRPC-GCP balancer managing connection pool.
conn, err := grpcDial(e, gme.opts...)
if err != nil {
return err
}
gme.pools[e] = &monitoredConn{
endpoint: e,
conn: conn,
gme: gme,
}
go gme.pools[e].monitor()
}
}

// Add new multi-endpoints and update existing.
for name, meo := range meOpts.MultiEndpoints {
if me, ok := gme.mes[name]; ok {
// Updating existing MultiEndpoint.
me.SetEndpoints(meo.Endpoints)
continue
}

// Add new MultiEndpoint.
me, err := multiendpoint.NewMultiEndpoint(meo)
if err != nil {
return err
}
gme.mes[name] = me
}
gme.defaultName = meOpts.Default

// Remove obsolete MultiEndpoints.
for name := range gme.mes {
if _, ok := meOpts.MultiEndpoints[name]; !ok {
delete(gme.mes, name)
}
}

// Remove obsolete pools.
for e, mc := range gme.pools {
if _, ok := validPools[e]; !ok {
if err := mc.conn.Close(); err != nil {
// TODO: log error.
}
mc.stopMonitoring()
delete(gme.pools, e)
}
}

// Trigger status update.
for e, mc := range gme.pools {
s := mc.conn.GetState()
for _, me := range gme.mes {
me.SetEndpointAvailability(e, s == connectivity.Ready)
}
}
return nil
}

type multiError []error
lasradoVinod marked this conversation as resolved.
Show resolved Hide resolved

func (m multiError) Error() string {
s, n := "", 0
for _, e := range m {
if e != nil {
if n == 0 {
s = e.Error()
}
n++
}
}
switch n {
case 0:
return "(0 errors)"
case 1:
return s
case 2:
return s + " (and 1 other error)"
}
return fmt.Sprintf("%s (and %d other errors)", s, n-1)
}

func (m multiError) Combine() error {
if len(m) == 0 {
return nil
}

return m
}
Loading