diff --git a/Documentation/dev-guide/grpc_naming.md b/Documentation/dev-guide/grpc_naming.md index 4c7c01d5c1c..1750be9be6a 100644 --- a/Documentation/dev-guide/grpc_naming.md +++ b/Documentation/dev-guide/grpc_naming.md @@ -18,7 +18,7 @@ import ( cli, cerr := clientv3.NewFromURL("http://localhost:2379") etcdResolver, err := resolver.NewBuilder(clus.RandClient()); -conn, gerr := grpc.Dial("etcd://foo/bar/my-service", grpc.WithResolvers(etcdResolver)) +conn, gerr := grpc.Dial("etcd:///foo/bar/my-service", grpc.WithResolvers(etcdResolver)) ``` ## Managing service endpoints @@ -84,4 +84,4 @@ em := endpoints.NewManager(c, "foo") err := em.Update(context.TODO(), []*endpoints.UpdateWithOpts{ endpoints.NewDeleteUpdateOpts("foo/bar/my-service/e1", endpoints.Endpoint{Addr: "1.2.3.4"}), endpoints.NewAddUpdateOpts("foo/bar/my-service/e1", endpoints.Endpoint{Addr: "1.2.3.14"})}) -``` \ No newline at end of file +``` diff --git a/clientv3/client.go b/clientv3/client.go index 77ebb2f6cf6..9c1c9c4816d 100644 --- a/clientv3/client.go +++ b/clientv3/client.go @@ -26,18 +26,19 @@ import ( "time" "github.com/google/uuid" - "go.etcd.io/etcd/clientv3/balancer" - "go.etcd.io/etcd/clientv3/balancer/picker" - "go.etcd.io/etcd/clientv3/balancer/resolver/endpoint" - "go.etcd.io/etcd/clientv3/credentials" - "go.etcd.io/etcd/etcdserver/api/v3rpc/rpctypes" - "go.etcd.io/etcd/pkg/logutil" "go.uber.org/zap" "google.golang.org/grpc" "google.golang.org/grpc/codes" grpccredentials "google.golang.org/grpc/credentials" "google.golang.org/grpc/keepalive" "google.golang.org/grpc/status" + + "go.etcd.io/etcd/clientv3/balancer" + "go.etcd.io/etcd/clientv3/balancer/picker" + "go.etcd.io/etcd/clientv3/balancer/resolver/endpoint" + "go.etcd.io/etcd/clientv3/credentials" + "go.etcd.io/etcd/etcdserver/api/v3rpc/rpctypes" + "go.etcd.io/etcd/pkg/logutil" ) var ( @@ -95,7 +96,8 @@ type Client struct { callOpts []grpc.CallOption - lg *zap.Logger + lgMu *sync.RWMutex + lg *zap.Logger } // New creates a new etcdv3 client from a given configuration. @@ -112,7 +114,7 @@ func New(cfg Config) (*Client, error) { // service interface implementations and do not need connection management. func NewCtxClient(ctx context.Context) *Client { cctx, cancel := context.WithCancel(ctx) - return &Client{ctx: cctx, cancel: cancel} + return &Client{ctx: cctx, cancel: cancel, lgMu: new(sync.RWMutex), lg: zap.NewNop()} } // NewFromURL creates a new etcdv3 client from a URL. @@ -125,6 +127,23 @@ func NewFromURLs(urls []string) (*Client, error) { return New(Config{Endpoints: urls}) } +// WithLogger sets a logger +func (c *Client) WithLogger(lg *zap.Logger) *Client { + c.lgMu.Lock() + c.lg = lg + c.lgMu.Unlock() + return c +} + +// GetLogger gets the logger. +// NOTE: This method is for internal use of etcd-client library and should not be used as general-purpose logger. +func (c *Client) GetLogger() *zap.Logger { + c.lgMu.RLock() + l := c.lg + c.lgMu.RUnlock() + return l +} + // Close shuts down the client's etcd connections. 
func (c *Client) Close() error { c.cancel() @@ -423,6 +442,7 @@ func newClient(cfg *Config) (*Client, error) { cancel: cancel, mu: new(sync.RWMutex), callOpts: defaultCallOpts, + lgMu: new(sync.RWMutex), } lcfg := logutil.DefaultZapLoggerConfig diff --git a/clientv3/integration/naming/endpoints_test.go b/clientv3/integration/naming/endpoints_test.go index be59479eb9f..76748032b3f 100644 --- a/clientv3/integration/naming/endpoints_test.go +++ b/clientv3/integration/naming/endpoints_test.go @@ -27,8 +27,6 @@ import ( ) func TestEndpointManager(t *testing.T) { - t.Skip("Not implemented yet") - defer testutil.AfterTest(t) clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) @@ -54,10 +52,10 @@ func TestEndpointManager(t *testing.T) { us := <-w if us == nil { - t.Fatal("failed to get update", err) + t.Fatal("failed to get update") } - wu := endpoints.Update{ + wu := &endpoints.Update{ Op: endpoints.Add, Key: "foo/a1", Endpoint: e1, @@ -69,21 +67,21 @@ func TestEndpointManager(t *testing.T) { err = em.DeleteEndpoint(context.TODO(), "foo/a1") if err != nil { - t.Fatalf("failed to udpate %v", err) + t.Fatalf("failed to update %v", err) } us = <-w - if err != nil { - t.Fatalf("failed to get udpate %v", err) + if us == nil { + t.Fatal("failed to get update") } - wu = endpoints.Update{ + wu = &endpoints.Update{ Op: endpoints.Delete, Key: "foo/a1", } - if !reflect.DeepEqual(us, wu) { - t.Fatalf("up = %#v, want %#v", us[1], wu) + if !reflect.DeepEqual(us[0], wu) { + t.Fatalf("up = %#v, want %#v", us[0], wu) } } @@ -91,8 +89,6 @@ func TestEndpointManager(t *testing.T) { // correctly with multiple hosts and correctly receive multiple // updates in a single revision. func TestEndpointManagerAtomicity(t *testing.T) { - t.Skip("Not implemented yet") - defer testutil.AfterTest(t) clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) @@ -133,3 +129,84 @@ func TestEndpointManagerAtomicity(t *testing.T) { t.Fatalf("expected two delete updates, got %+v", updates) } } + +func TestEndpointManagerCRUD(t *testing.T) { + defer testutil.AfterTest(t) + + clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) + defer clus.Terminate(t) + + em, err := endpoints.NewManager(clus.RandClient(), "foo") + if err != nil { + t.Fatal("failed to create EndpointManager", err) + } + + // Add + k1 := "foo/a1" + e1 := endpoints.Endpoint{Addr: "127.0.0.1", Metadata: "metadata1"} + err = em.AddEndpoint(context.TODO(), k1, e1) + if err != nil { + t.Fatal("failed to add", k1, err) + } + + k2 := "foo/a2" + e2 := endpoints.Endpoint{Addr: "127.0.0.2", Metadata: "metadata2"} + err = em.AddEndpoint(context.TODO(), k2, e2) + if err != nil { + t.Fatal("failed to add", k2, err) + } + + eps, err := em.List(context.TODO()) + if err != nil { + t.Fatal("failed to list foo") + } + if len(eps) != 2 { + t.Fatalf("unexpected the number of endpoints: %d", len(eps)) + } + if !reflect.DeepEqual(eps[k1], e1) { + t.Fatalf("unexpected endpoints: %s", k1) + } + if !reflect.DeepEqual(eps[k2], e2) { + t.Fatalf("unexpected endpoints: %s", k2) + } + + // Delete + err = em.DeleteEndpoint(context.TODO(), k1) + if err != nil { + t.Fatal("failed to delete", k2, err) + } + + eps, err = em.List(context.TODO()) + if err != nil { + t.Fatal("failed to list foo") + } + if len(eps) != 1 { + t.Fatalf("unexpected the number of endpoints: %d", len(eps)) + } + if !reflect.DeepEqual(eps[k2], e2) { + t.Fatalf("unexpected endpoints: %s", k2) + } + + // Update + k3 := "foo/a3" + e3 := endpoints.Endpoint{Addr: "127.0.0.3", 
Metadata: "metadata3"} + updates := []*endpoints.UpdateWithOpts{ + {Update: endpoints.Update{Op: endpoints.Add, Key: k3, Endpoint: e3}}, + {Update: endpoints.Update{Op: endpoints.Delete, Key: k2}}, + } + err = em.Update(context.TODO(), updates) + if err != nil { + t.Fatal("failed to update", err) + } + + eps, err = em.List(context.TODO()) + if err != nil { + t.Fatal("failed to list foo") + } + if len(eps) != 1 { + t.Fatalf("unexpected the number of endpoints: %d", len(eps)) + } + if !reflect.DeepEqual(eps[k3], e3) { + t.Fatalf("unexpected endpoints: %s", k3) + } +} diff --git a/clientv3/integration/naming/resolver_test.go b/clientv3/integration/naming/resolver_test.go index ecb90d59108..85951b56808 100644 --- a/clientv3/integration/naming/resolver_test.go +++ b/clientv3/integration/naming/resolver_test.go @@ -15,56 +15,112 @@ package naming import ( + "bytes" "context" "testing" + "time" "google.golang.org/grpc" + testpb "google.golang.org/grpc/test/grpc_testing" "go.etcd.io/etcd/clientv3/naming/endpoints" "go.etcd.io/etcd/clientv3/naming/resolver" "go.etcd.io/etcd/integration" + grpctest "go.etcd.io/etcd/pkg/grpc_testing" "go.etcd.io/etcd/pkg/testutil" ) // This test mimics scenario described in grpc_naming.md doc. func TestEtcdGrpcResolver(t *testing.T) { - t.Skip("Not implemented yet") - defer testutil.AfterTest(t) + s1PayloadBody := []byte{'1'} + s1 := newDummyStubServer(s1PayloadBody) + if err := s1.Start(nil); err != nil { + t.Fatal("failed to start dummy grpc server (s1)", err) + } + defer s1.Stop() - // s1 := // TODO: Dummy GRPC service listening on 127.0.0.1:20000 - // s2 := // TODO: Dummy GRPC service listening on 127.0.0.1:20001 + s2PayloadBody := []byte{'2'} + s2 := newDummyStubServer(s2PayloadBody) + if err := s2.Start(nil); err != nil { + t.Fatal("failed to start dummy grpc server (s2)", err) + } + defer s2.Stop() - clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) + clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3}) defer clus.Terminate(t) - em, err := endpoints.NewManager(clus.RandClient(), "foo") + em, err := endpoints.NewManager(clus.Client(0), "foo") if err != nil { t.Fatal("failed to create EndpointManager", err) } - e1 := endpoints.Endpoint{Addr: "127.0.0.1:20000"} - e2 := endpoints.Endpoint{Addr: "127.0.0.1:20001"} + e1 := endpoints.Endpoint{Addr: s1.Addr()} + e2 := endpoints.Endpoint{Addr: s2.Addr()} err = em.AddEndpoint(context.TODO(), "foo/e1", e1) if err != nil { t.Fatal("failed to add foo", err) } - etcdResolver, err := resolver.NewBuilder(clus.RandClient()) - conn, err := grpc.Dial("etc://foo", grpc.WithResolvers(etcdResolver)) + b, err := resolver.NewBuilder(clus.Client(1)) if err != nil { - t.Fatal("failed to connect to foo (e1)", err) + t.Fatal("failed to new resolver builder", err) } + conn, err := grpc.Dial("etcd:///foo", grpc.WithInsecure(), grpc.WithResolvers(b)) + if err != nil { + t.Fatal("failed to connect to foo", err) + } + defer conn.Close() - // TODO: send requests to conn, ensure s1 received it. 
+ c := testpb.NewTestServiceClient(conn) + resp, err := c.UnaryCall(context.TODO(), &testpb.SimpleRequest{}, grpc.WaitForReady(true)) + if err != nil { + t.Fatal("failed to invoke rpc to foo (e1)", err) + } + if resp.GetPayload() == nil || !bytes.Equal(resp.GetPayload().GetBody(), s1PayloadBody) { + t.Fatalf("unexpected response from foo (e1): %s", resp.GetPayload().GetBody()) + } em.DeleteEndpoint(context.TODO(), "foo/e1") em.AddEndpoint(context.TODO(), "foo/e2", e2) - // TODO: Send requests to conn and make sure s2 receive it. - // Might require restarting s1 to break the existing (open) connection. + // We use a loop with deadline of 30s to avoid test getting flake + // as it's asynchronous for gRPC Client to update underlying connections. + maxRetries := 300 + retryPeriod := 100 * time.Millisecond + retries := 0 + for { + time.Sleep(retryPeriod) + retries++ + + resp, err = c.UnaryCall(context.TODO(), &testpb.SimpleRequest{}) + if err != nil { + if retries < maxRetries { + continue + } + t.Fatal("failed to invoke rpc to foo (e2)", err) + } + if resp.GetPayload() == nil || !bytes.Equal(resp.GetPayload().GetBody(), s2PayloadBody) { + if retries < maxRetries { + continue + } + t.Fatalf("unexpected response from foo (e2): %s", resp.GetPayload().GetBody()) + } + break + } +} - conn.GetState() // this line is to avoid compiler warning that conn is unused. +func newDummyStubServer(body []byte) *grpctest.StubServer { + return &grpctest.StubServer{ + UnaryCallF: func(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) { + return &testpb.SimpleResponse{ + Payload: &testpb.Payload{ + Type: testpb.PayloadType_COMPRESSABLE, + Body: body, + }, + }, nil + }, + } } diff --git a/clientv3/naming/endpoints/endpoints.go b/clientv3/naming/endpoints/endpoints.go index f0afaf31ea7..6621037b284 100644 --- a/clientv3/naming/endpoints/endpoints.go +++ b/clientv3/naming/endpoints/endpoints.go @@ -53,7 +53,7 @@ type Update struct { } // WatchChannel is used to deliver notifications about endpoints updates. -type WatchChannel chan []*Update +type WatchChannel <-chan []*Update // Key2EndpointMap maps etcd key into struct describing the endpoint. type Key2EndpointMap map[string]Endpoint diff --git a/clientv3/naming/endpoints/endpoints_impl.go b/clientv3/naming/endpoints/endpoints_impl.go index beeb58eea41..ffba6a925d5 100644 --- a/clientv3/naming/endpoints/endpoints_impl.go +++ b/clientv3/naming/endpoints/endpoints_impl.go @@ -18,37 +18,66 @@ package endpoints import ( "context" - "fmt" + "encoding/json" + "errors" + "strings" + + "go.uber.org/zap" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" "go.etcd.io/etcd/clientv3" "go.etcd.io/etcd/clientv3/naming/endpoints/internal" ) type endpointManager struct { - // TODO: To be implemented, tracked by: https://github.com/etcd-io/etcd/issues/12652 + client *clientv3.Client + target string } func NewManager(client *clientv3.Client, target string) (Manager, error) { - // To be implemented (https://github.com/etcd-io/etcd/issues/12652) - return nil, fmt.Errorf("Not implemented yet") + if client == nil { + return nil, errors.New("invalid etcd client") + } + + if target == "" { + return nil, errors.New("invalid target") + } + + em := &endpointManager{ + client: client, + target: target, + } + return em, nil } -func (m *endpointManager) Update(ctx context.Context, updates []*UpdateWithOpts) error { - // TODO: For loop in a single transaction: - internalUpdate := &internal.Update{} // translate UpdateWithOpts into json format. 
- switch internalUpdate.Op { - //case internal.Add: - // var v []byte - // if v, err = json.Marshal(internalUpdate); err != nil { - // return status.Error(codes.InvalidArgument, err.Error()) - // } - // _, err = gr.Client.KV.Put(ctx, target+"/"+nm.Addr, string(v), opts...) - //case internal.Delete: - // _, err = gr.Client.Delete(ctx, target+"/"+nm.Addr, opts...) - //default: - // return status.Error(codes.InvalidArgument, "naming: bad naming op") +func (m *endpointManager) Update(ctx context.Context, updates []*UpdateWithOpts) (err error) { + ops := make([]clientv3.Op, 0, len(updates)) + for _, update := range updates { + if !strings.HasPrefix(update.Key, m.target+"/") { + return status.Errorf(codes.InvalidArgument, "endpoints: endpoint key should be prefixed with '%s/' got: '%s'", m.target, update.Key) + } + switch update.Op { + case Add: + internalUpdate := &internal.Update{ + Op: internal.Add, + Addr: update.Endpoint.Addr, + Metadata: update.Endpoint.Metadata, + } + + var v []byte + if v, err = json.Marshal(internalUpdate); err != nil { + return status.Error(codes.InvalidArgument, err.Error()) + } + ops = append(ops, clientv3.OpPut(update.Key, string(v), update.Opts...)) + case Delete: + ops = append(ops, clientv3.OpDelete(update.Key, update.Opts...)) + default: + return status.Error(codes.InvalidArgument, "endpoints: bad update op") + } } - return fmt.Errorf("Not implemented yet") + _, err = m.client.KV.Txn(ctx).Then(ops...).Commit() + return err } func (m *endpointManager) AddEndpoint(ctx context.Context, key string, endpoint Endpoint, opts ...clientv3.OpOption) error { @@ -60,76 +89,98 @@ func (m *endpointManager) DeleteEndpoint(ctx context.Context, key string, opts . } func (m *endpointManager) NewWatchChannel(ctx context.Context) (WatchChannel, error) { - return nil, fmt.Errorf("Not implemented yet") - - // TODO: Implementation to be inspired by: - // Next gets the next set of updates from the etcd resolver. - //// Calls to Next should be serialized; concurrent calls are not safe since - //// there is no way to reconcile the update ordering. - //func (gw *gRPCWatcher) Next() ([]*naming.Update, error) { - // if gw.wch == nil { - // // first Next() returns all addresses - // return gw.firstNext() - // } - // if gw.err != nil { - // return nil, gw.err - // } - // - // // process new events on target/* - // wr, ok := <-gw.wch - // if !ok { - // gw.err = status.Error(codes.Unavailable, ErrWatcherClosed.Error()) - // return nil, gw.err - // } - // if gw.err = wr.Err(); gw.err != nil { - // return nil, gw.err - // } - // - // updates := make([]*naming.Update, 0, len(wr.Events)) - // for _, e := range wr.Events { - // var jupdate naming.Update - // var err error - // switch e.Type { - // case etcd.EventTypePut: - // err = json.Unmarshal(e.Kv.Value, &jupdate) - // jupdate.Op = naming.Add - // case etcd.EventTypeDelete: - // err = json.Unmarshal(e.PrevKv.Value, &jupdate) - // jupdate.Op = naming.Delete - // default: - // continue - // } - // if err == nil { - // updates = append(updates, &jupdate) - // } - // } - // return updates, nil - //} - // - //func (gw *gRPCWatcher) firstNext() ([]*naming.Update, error) { - // // Use serialized request so resolution still works if the target etcd - // // server is partitioned away from the quorum. 
- // resp, err := gw.c.Get(gw.ctx, gw.target, etcd.WithPrefix(), etcd.WithSerializable()) - // if gw.err = err; err != nil { - // return nil, err - // } - // - // updates := make([]*naming.Update, 0, len(resp.Kvs)) - // for _, kv := range resp.Kvs { - // var jupdate naming.Update - // if err := json.Unmarshal(kv.Value, &jupdate); err != nil { - // continue - // } - // updates = append(updates, &jupdate) - // } - // - // opts := []etcd.OpOption{etcd.WithRev(resp.Header.Revision + 1), etcd.WithPrefix(), etcd.WithPrevKV()} - // gw.wch = gw.c.Watch(gw.ctx, gw.target, opts...) - // return updates, nil - //} + resp, err := m.client.Get(ctx, m.target, clientv3.WithPrefix(), clientv3.WithSerializable()) + if err != nil { + return nil, err + } + + lg := m.client.GetLogger() + initUpdates := make([]*Update, 0, len(resp.Kvs)) + for _, kv := range resp.Kvs { + var iup internal.Update + if err := json.Unmarshal(kv.Value, &iup); err != nil { + lg.Warn("unmarshal endpoint update failed", zap.String("key", string(kv.Key)), zap.Error(err)) + continue + } + up := &Update{ + Op: Add, + Key: string(kv.Key), + Endpoint: Endpoint{Addr: iup.Addr, Metadata: iup.Metadata}, + } + initUpdates = append(initUpdates, up) + } + + upch := make(chan []*Update, 1) + if len(initUpdates) > 0 { + upch <- initUpdates + } + go m.watch(ctx, resp.Header.Revision+1, upch) + return upch, nil +} + +func (m *endpointManager) watch(ctx context.Context, rev int64, upch chan []*Update) { + defer close(upch) + + lg := m.client.GetLogger() + opts := []clientv3.OpOption{clientv3.WithRev(rev), clientv3.WithPrefix()} + wch := m.client.Watch(ctx, m.target, opts...) + for { + select { + case <-ctx.Done(): + return + case wresp, ok := <-wch: + if !ok { + lg.Warn("watch closed", zap.String("target", m.target)) + return + } + if wresp.Err() != nil { + lg.Warn("watch failed", zap.String("target", m.target), zap.Error(wresp.Err())) + return + } + + deltaUps := make([]*Update, 0, len(wresp.Events)) + for _, e := range wresp.Events { + var iup internal.Update + var err error + var op Operation + switch e.Type { + case clientv3.EventTypePut: + err = json.Unmarshal(e.Kv.Value, &iup) + op = Add + if err != nil { + lg.Warn("unmarshal endpoint update failed", zap.String("key", string(e.Kv.Key)), zap.Error(err)) + continue + } + case clientv3.EventTypeDelete: + iup = internal.Update{Op: internal.Delete} + op = Delete + default: + continue + } + up := &Update{Op: op, Key: string(e.Kv.Key), Endpoint: Endpoint{Addr: iup.Addr, Metadata: iup.Metadata}} + deltaUps = append(deltaUps, up) + } + if len(deltaUps) > 0 { + upch <- deltaUps + } + } + } } func (m *endpointManager) List(ctx context.Context) (Key2EndpointMap, error) { - // TODO: Implementation - return nil, fmt.Errorf("Not implemented yet") + resp, err := m.client.Get(ctx, m.target, clientv3.WithPrefix(), clientv3.WithSerializable()) + if err != nil { + return nil, err + } + + eps := make(Key2EndpointMap) + for _, kv := range resp.Kvs { + var iup internal.Update + if err := json.Unmarshal(kv.Value, &iup); err != nil { + continue + } + + eps[string(kv.Key)] = Endpoint{Addr: iup.Addr, Metadata: iup.Metadata} + } + return eps, nil } diff --git a/clientv3/naming/grpc.go b/clientv3/naming/grpc.go deleted file mode 100644 index 7eed84bfb18..00000000000 --- a/clientv3/naming/grpc.go +++ /dev/null @@ -1,133 +0,0 @@ -// Copyright 2016 The etcd Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. 
-// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package naming - -import ( - "context" - "encoding/json" - "fmt" - - etcd "go.etcd.io/etcd/clientv3" - - "google.golang.org/grpc/codes" - "google.golang.org/grpc/naming" - "google.golang.org/grpc/status" -) - -var ErrWatcherClosed = fmt.Errorf("naming: watch closed") - -// GRPCResolver creates a grpc.Watcher for a target to track its resolution changes. -type GRPCResolver struct { - // Client is an initialized etcd client. - Client *etcd.Client -} - -func (gr *GRPCResolver) Update(ctx context.Context, target string, nm naming.Update, opts ...etcd.OpOption) (err error) { - switch nm.Op { - case naming.Add: - var v []byte - if v, err = json.Marshal(nm); err != nil { - return status.Error(codes.InvalidArgument, err.Error()) - } - _, err = gr.Client.KV.Put(ctx, target+"/"+nm.Addr, string(v), opts...) - case naming.Delete: - _, err = gr.Client.Delete(ctx, target+"/"+nm.Addr, opts...) - default: - return status.Error(codes.InvalidArgument, "naming: bad naming op") - } - return err -} - -func (gr *GRPCResolver) Resolve(target string) (naming.Watcher, error) { - ctx, cancel := context.WithCancel(context.Background()) - w := &gRPCWatcher{c: gr.Client, target: target + "/", ctx: ctx, cancel: cancel} - return w, nil -} - -type gRPCWatcher struct { - c *etcd.Client - target string - ctx context.Context - cancel context.CancelFunc - wch etcd.WatchChan - err error -} - -// Next gets the next set of updates from the etcd resolver. -// Calls to Next should be serialized; concurrent calls are not safe since -// there is no way to reconcile the update ordering. -func (gw *gRPCWatcher) Next() ([]*naming.Update, error) { - if gw.wch == nil { - // first Next() returns all addresses - return gw.firstNext() - } - if gw.err != nil { - return nil, gw.err - } - - // process new events on target/* - wr, ok := <-gw.wch - if !ok { - gw.err = status.Error(codes.Unavailable, ErrWatcherClosed.Error()) - return nil, gw.err - } - if gw.err = wr.Err(); gw.err != nil { - return nil, gw.err - } - - updates := make([]*naming.Update, 0, len(wr.Events)) - for _, e := range wr.Events { - var jupdate naming.Update - var err error - switch e.Type { - case etcd.EventTypePut: - err = json.Unmarshal(e.Kv.Value, &jupdate) - jupdate.Op = naming.Add - case etcd.EventTypeDelete: - err = json.Unmarshal(e.PrevKv.Value, &jupdate) - jupdate.Op = naming.Delete - default: - continue - } - if err == nil { - updates = append(updates, &jupdate) - } - } - return updates, nil -} - -func (gw *gRPCWatcher) firstNext() ([]*naming.Update, error) { - // Use serialized request so resolution still works if the target etcd - // server is partitioned away from the quorum. 
- resp, err := gw.c.Get(gw.ctx, gw.target, etcd.WithPrefix(), etcd.WithSerializable()) - if gw.err = err; err != nil { - return nil, err - } - - updates := make([]*naming.Update, 0, len(resp.Kvs)) - for _, kv := range resp.Kvs { - var jupdate naming.Update - if err := json.Unmarshal(kv.Value, &jupdate); err != nil { - continue - } - updates = append(updates, &jupdate) - } - - opts := []etcd.OpOption{etcd.WithRev(resp.Header.Revision + 1), etcd.WithPrefix(), etcd.WithPrevKV()} - gw.wch = gw.c.Watch(gw.ctx, gw.target, opts...) - return updates, nil -} - -func (gw *gRPCWatcher) Close() { gw.cancel() } diff --git a/clientv3/naming/grpc_test.go b/clientv3/naming/grpc_test.go deleted file mode 100644 index 0041a89a807..00000000000 --- a/clientv3/naming/grpc_test.go +++ /dev/null @@ -1,139 +0,0 @@ -// Copyright 2016 The etcd Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package naming_test - -import ( - "context" - "encoding/json" - "reflect" - "testing" - - etcd "go.etcd.io/etcd/clientv3" - namingv3 "go.etcd.io/etcd/clientv3/naming" - "go.etcd.io/etcd/integration" - "go.etcd.io/etcd/pkg/testutil" - - "google.golang.org/grpc/naming" -) - -func TestGRPCResolver(t *testing.T) { - defer testutil.AfterTest(t) - - clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) - defer clus.Terminate(t) - - r := namingv3.GRPCResolver{ - Client: clus.RandClient(), - } - - w, err := r.Resolve("foo") - if err != nil { - t.Fatal("failed to resolve foo", err) - } - defer w.Close() - - addOp := naming.Update{Op: naming.Add, Addr: "127.0.0.1", Metadata: "metadata"} - err = r.Update(context.TODO(), "foo", addOp) - if err != nil { - t.Fatal("failed to add foo", err) - } - - us, err := w.Next() - if err != nil { - t.Fatal("failed to get udpate", err) - } - - wu := &naming.Update{ - Op: naming.Add, - Addr: "127.0.0.1", - Metadata: "metadata", - } - - if !reflect.DeepEqual(us[0], wu) { - t.Fatalf("up = %#v, want %#v", us[0], wu) - } - - delOp := naming.Update{Op: naming.Delete, Addr: "127.0.0.1"} - err = r.Update(context.TODO(), "foo", delOp) - if err != nil { - t.Fatalf("failed to udpate %v", err) - } - - us, err = w.Next() - if err != nil { - t.Fatalf("failed to get udpate %v", err) - } - - wu = &naming.Update{ - Op: naming.Delete, - Addr: "127.0.0.1", - Metadata: "metadata", - } - - if !reflect.DeepEqual(us[0], wu) { - t.Fatalf("up = %#v, want %#v", us[0], wu) - } -} - -// TestGRPCResolverMulti ensures the resolver will initialize -// correctly with multiple hosts and correctly receive multiple -// updates in a single revision. 
-func TestGRPCResolverMulti(t *testing.T) { - defer testutil.AfterTest(t) - - clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) - defer clus.Terminate(t) - c := clus.RandClient() - - v, verr := json.Marshal(naming.Update{Addr: "127.0.0.1", Metadata: "md"}) - if verr != nil { - t.Fatal(verr) - } - if _, err := c.Put(context.TODO(), "foo/host", string(v)); err != nil { - t.Fatal(err) - } - if _, err := c.Put(context.TODO(), "foo/host2", string(v)); err != nil { - t.Fatal(err) - } - - r := namingv3.GRPCResolver{c} - - w, err := r.Resolve("foo") - if err != nil { - t.Fatal("failed to resolve foo", err) - } - defer w.Close() - - updates, nerr := w.Next() - if nerr != nil { - t.Fatal(nerr) - } - if len(updates) != 2 { - t.Fatalf("expected two updates, got %+v", updates) - } - - _, err = c.Txn(context.TODO()).Then(etcd.OpDelete("foo/host"), etcd.OpDelete("foo/host2")).Commit() - if err != nil { - t.Fatal(err) - } - - updates, nerr = w.Next() - if nerr != nil { - t.Fatal(nerr) - } - if len(updates) != 2 || (updates[0].Op != naming.Delete && updates[1].Op != naming.Delete) { - t.Fatalf("expected two updates, got %+v", updates) - } -} diff --git a/clientv3/naming/resolver/resolver.go b/clientv3/naming/resolver/resolver.go index f3bace6e5c0..f2da05b4e63 100644 --- a/clientv3/naming/resolver/resolver.go +++ b/clientv3/naming/resolver/resolver.go @@ -15,25 +15,107 @@ package resolver import ( - "google.golang.org/grpc/resolver" + "context" + "sync" + + "google.golang.org/grpc/codes" + gresolver "google.golang.org/grpc/resolver" + "google.golang.org/grpc/status" "go.etcd.io/etcd/clientv3" + "go.etcd.io/etcd/clientv3/naming/endpoints" ) type builder struct { - // ... + c *clientv3.Client } -func (b builder) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) { - // To be implemented... - // Using endpoints.NewWatcher() to subscribe for endpoints changes. - return nil, nil +func (b builder) Build(target gresolver.Target, cc gresolver.ClientConn, opts gresolver.BuildOptions) (gresolver.Resolver, error) { + r := &resolver{ + c: b.c, + target: target.Endpoint, + cc: cc, + } + r.ctx, r.cancel = context.WithCancel(context.Background()) + + em, err := endpoints.NewManager(r.c, r.target) + if err != nil { + return nil, status.Errorf(codes.InvalidArgument, "resolver: failed to new endpoint manager: %s", err) + } + r.wch, err = em.NewWatchChannel(r.ctx) + if err != nil { + return nil, status.Errorf(codes.Internal, "resolver: failed to new watch channer: %s", err) + } + + r.wg.Add(1) + go r.watch() + return r, nil } func (b builder) Scheme() string { return "etcd" } -func NewBuilder(client *clientv3.Client) (resolver.Builder, error) { - return builder{}, nil +// NewBuilder creates a resolver builder. 
+func NewBuilder(client *clientv3.Client) (gresolver.Builder, error) { + return builder{c: client}, nil +} + +type resolver struct { + c *clientv3.Client + target string + cc gresolver.ClientConn + wch endpoints.WatchChannel + ctx context.Context + cancel context.CancelFunc + wg sync.WaitGroup +} + +func (r *resolver) watch() { + defer r.wg.Done() + + allUps := make(map[string]*endpoints.Update) + for { + select { + case <-r.ctx.Done(): + return + case ups, ok := <-r.wch: + if !ok { + return + } + + for _, up := range ups { + switch up.Op { + case endpoints.Add: + allUps[up.Key] = up + case endpoints.Delete: + delete(allUps, up.Key) + } + } + + addrs := convertToGRPCAddress(allUps) + r.cc.UpdateState(gresolver.State{Addresses: addrs}) + } + } +} + +func convertToGRPCAddress(ups map[string]*endpoints.Update) []gresolver.Address { + var addrs []gresolver.Address + for _, up := range ups { + addr := gresolver.Address{ + Addr: up.Endpoint.Addr, + Metadata: up.Endpoint.Metadata, + } + addrs = append(addrs, addr) + } + return addrs +} + +// ResolveNow is a no-op here. +// It's just a hint, resolver can ignore this if it's not necessary. +func (r *resolver) ResolveNow(gresolver.ResolveNowOptions) {} + +func (r *resolver) Close() { + r.cancel() + r.wg.Wait() } diff --git a/pkg/grpc_testing/stub_server.go b/pkg/grpc_testing/stub_server.go new file mode 100644 index 00000000000..b2892ef0d97 --- /dev/null +++ b/pkg/grpc_testing/stub_server.go @@ -0,0 +1,100 @@ +// Copyright 2023 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package grpc_testing + +import ( + "context" + "fmt" + "net" + + "google.golang.org/grpc" + testpb "google.golang.org/grpc/test/grpc_testing" +) + +// StubServer is borrowed from the interal package of grpc-go. +// See https://github.com/grpc/grpc-go/blob/master/internal/stubserver/stubserver.go +// Since it cannot be imported directly, we have to copy and paste it here, +// and useless code for our testing is removed. + +// StubServer is a server that is easy to customize within individual test +// cases. +type StubServer struct { + // Guarantees we satisfy this interface; panics if unimplemented methods are called. + testpb.TestServiceServer + + EmptyCallF func(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) + UnaryCallF func(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) + FullDuplexCallF func(stream testpb.TestService_FullDuplexCallServer) error + + s *grpc.Server + + // Network and Address are parameters for Listen. Defaults will be used if these are empty before Start. + Network string + Address string + + cleanups []func() // Lambdas executed in Stop(); populated by Start(). +} + +// EmptyCall is the handler for testpb.EmptyCall. +func (ss *StubServer) EmptyCall(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) { + return ss.EmptyCallF(ctx, in) +} + +// UnaryCall is the handler for testpb.UnaryCall. 
+func (ss *StubServer) UnaryCall(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) { + return ss.UnaryCallF(ctx, in) +} + +// FullDuplexCall is the handler for testpb.FullDuplexCall. +func (ss *StubServer) FullDuplexCall(stream testpb.TestService_FullDuplexCallServer) error { + return ss.FullDuplexCallF(stream) +} + +// Start starts the server and creates a client connected to it. +func (ss *StubServer) Start(sopts []grpc.ServerOption, dopts ...grpc.DialOption) error { + if ss.Network == "" { + ss.Network = "tcp" + } + if ss.Address == "" { + ss.Address = "localhost:0" + } + + lis, err := net.Listen(ss.Network, ss.Address) + if err != nil { + return fmt.Errorf("net.Listen(%q, %q) = %v", ss.Network, ss.Address, err) + } + ss.Address = lis.Addr().String() + ss.cleanups = append(ss.cleanups, func() { lis.Close() }) + + s := grpc.NewServer(sopts...) + testpb.RegisterTestServiceServer(s, ss) + go s.Serve(lis) + ss.cleanups = append(ss.cleanups, s.Stop) + ss.s = s + + return nil +} + +// Stop stops ss and cleans up all resources it consumed. +func (ss *StubServer) Stop() { + for i := len(ss.cleanups) - 1; i >= 0; i-- { + ss.cleanups[i]() + } +} + +// Addr gets the address the server listening on. +func (ss *StubServer) Addr() string { + return ss.Address +} diff --git a/proxy/grpcproxy/cluster.go b/proxy/grpcproxy/cluster.go index 7e5059cfb31..338827d4644 100644 --- a/proxy/grpcproxy/cluster.go +++ b/proxy/grpcproxy/cluster.go @@ -22,12 +22,10 @@ import ( "sync" "go.etcd.io/etcd/clientv3" - "go.etcd.io/etcd/clientv3/naming" - "go.etcd.io/etcd/etcdserver/api/v3rpc/rpctypes" + "go.etcd.io/etcd/clientv3/naming/endpoints" pb "go.etcd.io/etcd/etcdserver/etcdserverpb" "golang.org/x/time/rate" - gnaming "google.golang.org/grpc/naming" ) // allow maximum 1 retry per second @@ -36,35 +34,46 @@ const resolveRetryRate = 1 type clusterProxy struct { clus clientv3.Cluster ctx context.Context - gr *naming.GRPCResolver // advertise client URL advaddr string prefix string + em endpoints.Manager + umu sync.RWMutex - umap map[string]gnaming.Update + umap map[string]endpoints.Endpoint } // NewClusterProxy takes optional prefix to fetch grpc-proxy member endpoints. // The returned channel is closed when there is grpc-proxy endpoint registered // and the client's context is canceled so the 'register' loop returns. 
+// TODO: Expand the API to report creation errors func NewClusterProxy(c *clientv3.Client, advaddr string, prefix string) (pb.ClusterServer, <-chan struct{}) { + var em endpoints.Manager + if advaddr != "" && prefix != "" { + var err error + if em, err = endpoints.NewManager(c, prefix); err != nil { + plog.Errorf("failed to provision endpointsManager %q (%v)", prefix, err) + return nil, nil + } + } + cp := &clusterProxy{ clus: c.Cluster, ctx: c.Ctx(), - gr: &naming.GRPCResolver{Client: c}, advaddr: advaddr, prefix: prefix, - umap: make(map[string]gnaming.Update), + umap: make(map[string]endpoints.Endpoint), + em: em, } donec := make(chan struct{}) - if advaddr != "" && prefix != "" { + if em != nil { go func() { defer close(donec) - cp.resolve(prefix) + cp.establishEndpointWatch(prefix) }() return cp, donec } @@ -73,38 +82,36 @@ func NewClusterProxy(c *clientv3.Client, advaddr string, prefix string) (pb.Clus return cp, donec } -func (cp *clusterProxy) resolve(prefix string) { +func (cp *clusterProxy) establishEndpointWatch(prefix string) { rm := rate.NewLimiter(rate.Limit(resolveRetryRate), resolveRetryRate) for rm.Wait(cp.ctx) == nil { - wa, err := cp.gr.Resolve(prefix) + wc, err := cp.em.NewWatchChannel(cp.ctx) if err != nil { - plog.Warningf("failed to resolve %q (%v)", prefix, err) + plog.Warningf("failed to establish endpoint watch %q (%v)", prefix, err) continue } - cp.monitor(wa) + cp.monitor(wc) } } -func (cp *clusterProxy) monitor(wa gnaming.Watcher) { - for cp.ctx.Err() == nil { - ups, err := wa.Next() - if err != nil { - plog.Warningf("clusterProxy watcher error (%v)", err) - if rpctypes.ErrorDesc(err) == naming.ErrWatcherClosed.Error() { - return - } - } - - cp.umu.Lock() - for i := range ups { - switch ups[i].Op { - case gnaming.Add: - cp.umap[ups[i].Addr] = *ups[i] - case gnaming.Delete: - delete(cp.umap, ups[i].Addr) +func (cp *clusterProxy) monitor(wc endpoints.WatchChannel) { + for { + select { + case <-cp.ctx.Done(): + plog.Info("watching endpoints interrupted (%v)", cp.ctx.Err()) + return + case updates := <-wc: + cp.umu.Lock() + for _, up := range updates { + switch up.Op { + case endpoints.Add: + cp.umap[up.Endpoint.Addr] = up.Endpoint + case endpoints.Delete: + delete(cp.umap, up.Endpoint.Addr) + } } + cp.umu.Unlock() } - cp.umu.Unlock() } } diff --git a/proxy/grpcproxy/register.go b/proxy/grpcproxy/register.go index ba628c3ebca..b02faeb83ba 100644 --- a/proxy/grpcproxy/register.go +++ b/proxy/grpcproxy/register.go @@ -20,10 +20,9 @@ import ( "go.etcd.io/etcd/clientv3" "go.etcd.io/etcd/clientv3/concurrency" - "go.etcd.io/etcd/clientv3/naming" + "go.etcd.io/etcd/clientv3/naming/endpoints" "golang.org/x/time/rate" - gnaming "google.golang.org/grpc/naming" ) // allow maximum 1 retry per second @@ -67,8 +66,12 @@ func registerSession(c *clientv3.Client, prefix string, addr string, ttl int) (* return nil, err } - gr := &naming.GRPCResolver{Client: c} - if err = gr.Update(c.Ctx(), prefix, gnaming.Update{Op: gnaming.Add, Addr: addr, Metadata: getMeta()}, clientv3.WithLease(ss.Lease())); err != nil { + em, err := endpoints.NewManager(c, prefix) + if err != nil { + return nil, err + } + endpoint := endpoints.Endpoint{Addr: addr, Metadata: getMeta()} + if err = em.AddEndpoint(c.Ctx(), prefix+"/"+addr, endpoint, clientv3.WithLease(ss.Lease())); err != nil { return nil, err } diff --git a/proxy/grpcproxy/register_test.go b/proxy/grpcproxy/register_test.go index 33b01547c0e..0a15bcda563 100644 --- a/proxy/grpcproxy/register_test.go +++ b/proxy/grpcproxy/register_test.go @@ -19,11 
+19,9 @@ import ( "time" "go.etcd.io/etcd/clientv3" - "go.etcd.io/etcd/clientv3/naming" + "go.etcd.io/etcd/clientv3/naming/endpoints" "go.etcd.io/etcd/integration" "go.etcd.io/etcd/pkg/testutil" - - gnaming "google.golang.org/grpc/naming" ) func TestRegister(t *testing.T) { @@ -35,26 +33,16 @@ func TestRegister(t *testing.T) { paddr := clus.Members[0].GRPCAddr() testPrefix := "test-name" - wa := createWatcher(t, cli, testPrefix) - ups, err := wa.Next() - if err != nil { - t.Fatal(err) - } - if len(ups) != 0 { - t.Fatalf("len(ups) expected 0, got %d (%v)", len(ups), ups) - } + wa := mustCreateWatcher(t, cli, testPrefix) donec := Register(cli, testPrefix, paddr, 5) - ups, err = wa.Next() - if err != nil { - t.Fatal(err) - } + ups := <-wa if len(ups) != 1 { t.Fatalf("len(ups) expected 1, got %d (%v)", len(ups), ups) } - if ups[0].Addr != paddr { - t.Fatalf("ups[0].Addr expected %q, got %q", paddr, ups[0].Addr) + if ups[0].Endpoint.Addr != paddr { + t.Fatalf("ups[0].Addr expected %q, got %q", paddr, ups[0].Endpoint.Addr) } cli.Close() @@ -66,11 +54,14 @@ func TestRegister(t *testing.T) { } } -func createWatcher(t *testing.T, c *clientv3.Client, prefix string) gnaming.Watcher { - gr := &naming.GRPCResolver{Client: c} - watcher, err := gr.Resolve(prefix) +func mustCreateWatcher(t *testing.T, c *clientv3.Client, prefix string) endpoints.WatchChannel { + em, err := endpoints.NewManager(c, prefix) + if err != nil { + t.Fatalf("failed to create endpoints.Manager: %v", err) + } + wc, err := em.NewWatchChannel(c.Ctx()) if err != nil { t.Fatalf("failed to resolve %q (%v)", prefix, err) } - return watcher + return wc } diff --git a/test b/test index 31a686dbe39..d90f1133e3f 100755 --- a/test +++ b/test @@ -192,7 +192,8 @@ function integration_pass { INTEGTESTPKG=("${REPO_PATH}/integration" "${REPO_PATH}/client/integration" "${REPO_PATH}/clientv3/integration/..." - "${REPO_PATH}/contrib/raftexample") + "${REPO_PATH}/contrib/raftexample" + "${REPO_PATH}/proxy/grpcproxy") else INTEGTESTPKG=("${TEST[@]}") fi @@ -205,6 +206,7 @@ function integration_extra { go test -timeout 25m -v ${RACE} -cpu "${TEST_CPUS}" "$@" "${REPO_PATH}/clientv3/integration/..." go test -timeout 1m -v -cpu "${TEST_CPUS}" "$@" "${REPO_PATH}/contrib/raftexample" go test -timeout 5m -v ${RACE} -tags v2v3 "$@" "${REPO_PATH}/etcdserver/api/v2store" + go test -timeout 5m -v ${RACE} -cpu "${TEST_CPUS}" "$@" "${REPO_PATH}/proxy/grpcproxy" go test -timeout 1m -v ${RACE} -cpu "${TEST_CPUS}" -run=Example "$@" "${TEST[@]}" }
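Taken together, the new `clientv3/naming/endpoints` and `clientv3/naming/resolver` packages replace the deleted `clientv3/naming` package, which was built on gRPC's deprecated `naming.Watcher` API. Below is a minimal client-side sketch of the intended flow, matching the updated grpc_naming.md; it assumes an etcd endpoint at `http://localhost:2379`, and the backend address, key name, and error handling are placeholders.

```go
package main

import (
	"context"
	"log"

	"google.golang.org/grpc"

	"go.etcd.io/etcd/clientv3"
	"go.etcd.io/etcd/clientv3/naming/endpoints"
	"go.etcd.io/etcd/clientv3/naming/resolver"
)

func main() {
	cli, err := clientv3.NewFromURL("http://localhost:2379")
	if err != nil {
		log.Fatal(err)
	}
	defer cli.Close()

	// Register a backend under the target prefix; keys must be prefixed
	// with "<target>/", here "foo/bar/my-service/".
	em, err := endpoints.NewManager(cli, "foo/bar/my-service")
	if err != nil {
		log.Fatal(err)
	}
	if err := em.AddEndpoint(context.TODO(), "foo/bar/my-service/e1",
		endpoints.Endpoint{Addr: "10.0.0.1:8080"}); err != nil { // placeholder backend address
		log.Fatal(err)
	}

	// Dial through the etcd resolver. WithResolvers registers the builder
	// for this connection only, so no global resolver registration is needed.
	etcdResolver, err := resolver.NewBuilder(cli)
	if err != nil {
		log.Fatal(err)
	}
	conn, err := grpc.Dial("etcd:///foo/bar/my-service",
		grpc.WithInsecure(), grpc.WithResolvers(etcdResolver))
	if err != nil {
		log.Fatal(err)
	}
	defer conn.Close()

	_ = conn // hand conn to a generated service client, e.g. pb.NewMyServiceClient(conn)
}
```

The three-slash `etcd:///` form matters: with an empty authority, the whole path (`foo/bar/my-service`) becomes the resolver's target, which is why the grpc_naming.md example was corrected from `etcd://foo/bar/my-service`.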
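`endpointManager.Update` checks that every key is prefixed with `<target>/`, turns each Add into a Put of a JSON-encoded `internal.Update` and each Delete into an OpDelete, and commits all of them in a single Txn, so mixed changes land in one revision (the property TestEndpointManagerAtomicity exercises). A short sketch of an atomic address swap follows; the `NewAddUpdateOpts`/`NewDeleteUpdateOpts` helper signatures are taken from the grpc_naming.md example, and the wrapper function is illustrative.

```go
import (
	"context"

	"go.etcd.io/etcd/clientv3/naming/endpoints"
)

// swapAddr replaces the address stored under key in one Txn, so watchers
// and resolvers see the delete and the re-add in a single revision rather
// than an intermediate state with the endpoint missing.
func swapAddr(ctx context.Context, em endpoints.Manager, key, oldAddr, newAddr string) error {
	return em.Update(ctx, []*endpoints.UpdateWithOpts{
		endpoints.NewDeleteUpdateOpts(key, endpoints.Endpoint{Addr: oldAddr}),
		endpoints.NewAddUpdateOpts(key, endpoints.Endpoint{Addr: newAddr}),
	})
}
```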
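`NewWatchChannel` first lists the target prefix with a serializable Get (so resolution keeps working when the contacted member is partitioned from quorum), delivers that snapshot as one batch of Add updates, and then streams deltas from a Watch starting at the next revision; the channel is closed when the context is canceled or the watch fails. Below is a sketch of a consumer in the same spirit as the resolver's `watch` loop and the grpc-proxy `monitor` loop, keeping a key-to-endpoint view; the function name and logging are illustrative.

```go
import (
	"context"
	"log"

	"go.etcd.io/etcd/clientv3"
	"go.etcd.io/etcd/clientv3/naming/endpoints"
)

// trackEndpoints maintains an in-memory key-to-endpoint view of everything
// registered under target, applying each batch of updates as it arrives.
func trackEndpoints(ctx context.Context, c *clientv3.Client, target string) error {
	em, err := endpoints.NewManager(c, target)
	if err != nil {
		return err
	}
	wch, err := em.NewWatchChannel(ctx)
	if err != nil {
		return err
	}

	view := make(map[string]endpoints.Endpoint)
	for ups := range wch { // channel is closed on cancellation or watch failure
		for _, up := range ups {
			switch up.Op {
			case endpoints.Add:
				view[up.Key] = up.Endpoint
			case endpoints.Delete:
				delete(view, up.Key)
			}
		}
		log.Printf("%q now has %d endpoints", target, len(view))
	}
	return ctx.Err()
}
```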
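`Client` now carries a mutex-guarded `*zap.Logger`: `WithLogger` swaps it after construction and `GetLogger` is how library code such as the endpoints watch loop picks it up (per its comment, it is meant for the etcd client library itself rather than as a general-purpose logger). A small sketch, assuming a caller-supplied zap development logger; the constructor wrapper is illustrative.

```go
import (
	"go.uber.org/zap"

	"go.etcd.io/etcd/clientv3"
)

// newLoggedClient attaches a development logger to a freshly built client.
func newLoggedClient(eps []string) (*clientv3.Client, error) {
	cli, err := clientv3.New(clientv3.Config{Endpoints: eps})
	if err != nil {
		return nil, err
	}
	lg, err := zap.NewDevelopment()
	if err != nil {
		cli.Close()
		return nil, err
	}
	// WithLogger returns the client, so it can be chained; the logger is
	// guarded by lgMu and later read by library code through GetLogger.
	return cli.WithLogger(lg), nil
}
```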
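`pkg/grpc_testing.StubServer` is a trimmed copy of grpc-go's internal stubserver: a test assigns only the handlers it needs (resolver_test.go sets `UnaryCallF`), calls `Start`, and reads the bound address from `Addr`. A sketch of standing one up in an ordinary test; the test name, payload body, and dial options are illustrative.

```go
import (
	"context"
	"testing"

	"google.golang.org/grpc"
	testpb "google.golang.org/grpc/test/grpc_testing"

	grpctest "go.etcd.io/etcd/pkg/grpc_testing"
)

func TestPingViaStubServer(t *testing.T) {
	ss := &grpctest.StubServer{
		UnaryCallF: func(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) {
			return &testpb.SimpleResponse{
				Payload: &testpb.Payload{Type: testpb.PayloadType_COMPRESSABLE, Body: []byte("pong")},
			}, nil
		},
	}
	if err := ss.Start(nil); err != nil { // nil server options; defaults to tcp on localhost:0
		t.Fatal(err)
	}
	defer ss.Stop()

	conn, err := grpc.Dial(ss.Addr(), grpc.WithInsecure())
	if err != nil {
		t.Fatal(err)
	}
	defer conn.Close()

	resp, err := testpb.NewTestServiceClient(conn).UnaryCall(
		context.TODO(), &testpb.SimpleRequest{}, grpc.WaitForReady(true))
	if err != nil {
		t.Fatal(err)
	}
	if string(resp.GetPayload().GetBody()) != "pong" {
		t.Fatalf("unexpected payload: %q", resp.GetPayload().GetBody())
	}
}
```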
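In proxy/grpcproxy/register.go the proxy now advertises itself through the same endpoints API: it creates a `Manager` for the registration prefix and writes its own address as an `Endpoint` bound to the session lease, so the key disappears automatically when the proxy goes away. A sketch of that pattern in isolation, assuming the caller supplies the prefix, address, and TTL.

```go
import (
	"go.etcd.io/etcd/clientv3"
	"go.etcd.io/etcd/clientv3/concurrency"
	"go.etcd.io/etcd/clientv3/naming/endpoints"
)

// registerSelf publishes addr under prefix, tied to a TTL-backed lease, and
// returns the session so the caller can keep it alive or close it.
func registerSelf(c *clientv3.Client, prefix, addr string, ttl int) (*concurrency.Session, error) {
	ss, err := concurrency.NewSession(c, concurrency.WithTTL(ttl))
	if err != nil {
		return nil, err
	}
	em, err := endpoints.NewManager(c, prefix)
	if err != nil {
		ss.Close()
		return nil, err
	}
	ep := endpoints.Endpoint{Addr: addr}
	if err := em.AddEndpoint(c.Ctx(), prefix+"/"+addr, ep, clientv3.WithLease(ss.Lease())); err != nil {
		ss.Close()
		return nil, err
	}
	// When the session lease expires (proxy crash, network loss), etcd deletes
	// the key and watchers receive a Delete update for this endpoint.
	return ss, nil
}
```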