From 44fac5186037f43b5c9b25e73a479de066ff4af9 Mon Sep 17 00:00:00 2001 From: changxinzhang Date: Wed, 9 Aug 2023 21:35:22 -0500 Subject: [PATCH 1/7] first commit --- control_plane.go | 119 +++++++++++- go.mod | 56 +++++- go.sum | 14 -- pkg/controller/crd_watcher.go | 72 +++++-- pkg/controller/k8s_operator.go | 11 +- pkg/model/XDsConnection.go | 62 ++++++ pkg/model/model.go | 50 ++++- pkg/test/grpcClient.go | 46 +++++ pkg/transport/grpc/connection.go | 48 ++--- pkg/transport/grpc/server.go | 15 +- pkg/transport/grpc/xDSServer.go | 313 +++++++++++++++++++++++++++++++ pkg/util/nonce.go | 10 + pkg/util/set_util.go | 45 +++++ 13 files changed, 782 insertions(+), 79 deletions(-) create mode 100644 pkg/model/XDsConnection.go create mode 100644 pkg/test/grpcClient.go create mode 100644 pkg/transport/grpc/xDSServer.go create mode 100644 pkg/util/nonce.go create mode 100644 pkg/util/set_util.go diff --git a/control_plane.go b/control_plane.go index 73c31b7..c8af3ca 100644 --- a/control_plane.go +++ b/control_plane.go @@ -15,21 +15,29 @@ package opensergo import ( + discovery "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3" + "github.com/opensergo/opensergo-control-plane/pkg/util" + "google.golang.org/protobuf/types/known/anypb" "log" "os" + "strconv" + "strings" "sync" "github.com/opensergo/opensergo-control-plane/pkg/controller" "github.com/opensergo/opensergo-control-plane/pkg/model" trpb "github.com/opensergo/opensergo-control-plane/pkg/proto/transport/v1" + "github.com/opensergo/opensergo-control-plane/pkg/transport/grpc" transport "github.com/opensergo/opensergo-control-plane/pkg/transport/grpc" "github.com/pkg/errors" ) -type ControlPlane struct { - operator *controller.KubernetesOperator - server *transport.Server +const delimiter = "/" +type ControlPlane struct { + operator *controller.KubernetesOperator + server *transport.Server + xdsServer *transport.DiscoveryServer protoDesc *trpb.ControlPlaneDesc mux sync.RWMutex @@ -38,12 +46,13 @@ type ControlPlane struct { func NewControlPlane() (*ControlPlane, error) { cp := &ControlPlane{} - operator, err := controller.NewKubernetesOperator(cp.sendMessage) + operator, err := controller.NewKubernetesOperator(cp.sendMessage, cp.pushXds) if err != nil { return nil, err } cp.server = transport.NewServer(uint32(10246), []model.SubscribeRequestHandler{cp.handleSubscribeRequest}) + cp.xdsServer = transport.NewDiscoveryServer(uint32(8002), []model.SubscribeXDsRequestHandler{cp.handleXDsSubscribeRequest}) cp.operator = operator hostname, herr := os.Hostname() @@ -67,6 +76,10 @@ func (c *ControlPlane) Start() error { if err != nil { return err } + err = c.xdsServer.Run() + if err != nil { + return err + } return nil } @@ -106,6 +119,58 @@ func (c *ControlPlane) sendMessageToStream(stream model.OpenSergoTransportStream }) } +// cxz +func (c *ControlPlane) handleXDsSubscribeRequest(req *discovery.DiscoveryRequest, con *model.XDsConnection) error { + if req.TypeUrl != model.ExtensionConfigType { + return nil + } + shouldRespond, delta := grpc.ShouldRespond(con, req) + + subscribed := delta.Subscribed + unsubscribed := delta.Unsubscribed + if !shouldRespond { + return nil + } + if len(subscribed) != 0 { + for resourcename := range subscribed { + request := strings.Split(resourcename, delimiter) + crdWatcher, err := c.operator.RegisterWatcher(model.SubscribeTarget{ + Namespace: request[0], + AppName: request[1], + Kind: request[2], + }) + // TODO: unhandled err + if err != nil { + continue + } + + c.xdsServer.AddConnectioonToMap(request[0], request[1], request[2], con) + + rules, version := crdWatcher.GetRules(model.NamespacedApp{ + Namespace: request[0], + App: request[1], + }) + if len(rules) > 0 { + err := c.pushXdsToStream(con, con.Watched(req.TypeUrl), version, rules) + if err != nil { + // TODO: log here + log.Printf("sendMessageToStream failed, err=%s\n", err.Error()) + } + } + + } + } + + if len(unsubscribed) != 0 { + for resourcename := range subscribed { + request := strings.Split(resourcename, delimiter) + c.xdsServer.RemoveConnectionFromMap(model.NamespacedApp{request[0], request[1]}, request[2], con.Identifier) + } + } + + return nil +} + func (c *ControlPlane) handleSubscribeRequest(clientIdentifier model.ClientIdentifier, request *trpb.SubscribeRequest, stream model.OpenSergoTransportStream) error { // var labels []model.LabelKV // if request.Target.Labels != nil { @@ -135,7 +200,8 @@ func (c *ControlPlane) handleSubscribeRequest(clientIdentifier model.ClientIdent } continue } - _ = c.server.ConnectionManager().Add(request.Target.Namespace, request.Target.App, kind, transport.NewConnection(clientIdentifier, stream)) + curConnection := transport.NewConnection(clientIdentifier, stream) + _ = c.server.ConnectionManager().Add(request.Target.Namespace, request.Target.App, kind, curConnection, curConnection.Identifier()) // send if the watcher cache is not empty rules, version := crdWatcher.GetRules(model.NamespacedApp{ Namespace: request.Target.Namespace, @@ -160,3 +226,46 @@ func (c *ControlPlane) handleSubscribeRequest(clientIdentifier model.ClientIdent } return nil } + +// cxz +func (c *ControlPlane) pushXdsToStream(con *model.XDsConnection, w *model.WatchedResource, version int64, rules []*anypb.Any) error { + + res := &discovery.DiscoveryResponse{ + TypeUrl: w.TypeUrl, + VersionInfo: strconv.FormatInt(version, 10), + + // TODO: RECORD THE NONCE AND CHECK THE NONCE + Nonce: util.Nonce(), + Resources: rules, + } + // set nonce + con.Lock() + if con.WatchedResources[model.ExtensionConfigType] == nil { + con.WatchedResources[res.TypeUrl] = &model.WatchedResource{TypeUrl: res.TypeUrl} + } + con.WatchedResources[res.TypeUrl].NonceSent = res.Nonce + con.Unlock() + + return con.Stream.Send(res) +} + +func (c *ControlPlane) pushXds(namespace, app, kind string, rules []*anypb.Any, version int64) error { + connections, exists := c.xdsServer.XDSConnectionManeger.Get(namespace, app, kind) + if !exists || connections == nil { + return errors.New("There is no connection for this kind") + } + + for _, connection := range connections { + if connection == nil { + // TODO: log.Debug + continue + } + err := c.pushXdsToStream(connection, connection.WatchedResources[model.ExtensionConfigType], version, rules) + if err != nil { + // TODO: should not short-break here. Handle partial failure here. + return err + } + } + + return nil +} diff --git a/go.mod b/go.mod index 61ecf71..e4fd20b 100644 --- a/go.mod +++ b/go.mod @@ -1,24 +1,68 @@ module github.com/opensergo/opensergo-control-plane -go 1.14 +go 1.18 require ( github.com/alibaba/sentinel-golang v1.0.3 - github.com/cncf/xds/go v0.0.0-20220314180256-7f1daf1720fc github.com/envoyproxy/go-control-plane v0.10.3-0.20221109183938-2935a23e638f github.com/envoyproxy/protoc-gen-validate v0.6.7 github.com/go-logr/logr v0.4.0 github.com/golang/protobuf v1.5.2 - github.com/json-iterator/go v1.1.12 // indirect - github.com/kr/pretty v0.3.0 // indirect + github.com/google/uuid v1.1.2 github.com/pkg/errors v0.9.1 - github.com/rogpeppe/go-internal v1.8.0 // indirect go.uber.org/atomic v1.7.0 google.golang.org/genproto v0.0.0-20220329172620-7be39ac1afc7 google.golang.org/grpc v1.51.0 google.golang.org/protobuf v1.28.1 - gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c // indirect k8s.io/apimachinery v0.21.4 k8s.io/client-go v0.21.4 sigs.k8s.io/controller-runtime v0.9.7 ) + +require ( + cloud.google.com/go v0.65.0 // indirect + github.com/beorn7/perks v1.0.1 // indirect + github.com/cespare/xxhash/v2 v2.1.1 // indirect + github.com/cncf/xds/go v0.0.0-20220314180256-7f1daf1720fc // indirect + github.com/davecgh/go-spew v1.1.1 // indirect + github.com/evanphx/json-patch v4.11.0+incompatible // indirect + github.com/fsnotify/fsnotify v1.4.9 // indirect + github.com/gogo/protobuf v1.3.2 // indirect + github.com/golang/groupcache v0.0.0-20200121045136-8c9f03a8e57e // indirect + github.com/google/go-cmp v0.5.7 // indirect + github.com/google/gofuzz v1.1.0 // indirect + github.com/googleapis/gnostic v0.5.5 // indirect + github.com/hashicorp/golang-lru v0.5.4 // indirect + github.com/imdario/mergo v0.3.12 // indirect + github.com/json-iterator/go v1.1.12 // indirect + github.com/kr/pretty v0.3.0 // indirect + github.com/matttproud/golang_protobuf_extensions v1.0.2-0.20181231171920-c182affec369 // indirect + github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect + github.com/modern-go/reflect2 v1.0.2 // indirect + github.com/prometheus/client_golang v1.11.0 // indirect + github.com/prometheus/client_model v0.3.0 // indirect + github.com/prometheus/common v0.26.0 // indirect + github.com/prometheus/procfs v0.6.0 // indirect + github.com/rogpeppe/go-internal v1.8.0 // indirect + github.com/spf13/pflag v1.0.5 // indirect + golang.org/x/net v0.0.0-20220722155237-a158d28d115b // indirect + golang.org/x/oauth2 v0.0.0-20211104180415-d3ed0bb246c8 // indirect + golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f // indirect + golang.org/x/term v0.0.0-20210927222741-03fcf44c2211 // indirect + golang.org/x/text v0.4.0 // indirect + golang.org/x/time v0.0.0-20210723032227-1f47c861a9ac // indirect + gomodules.xyz/jsonpatch/v2 v2.2.0 // indirect + google.golang.org/appengine v1.6.7 // indirect + gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c // indirect + gopkg.in/inf.v0 v0.9.1 // indirect + gopkg.in/yaml.v2 v2.4.0 // indirect + gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b // indirect + k8s.io/api v0.21.4 // indirect + k8s.io/apiextensions-apiserver v0.21.4 // indirect + k8s.io/component-base v0.21.4 // indirect + k8s.io/klog/v2 v2.8.0 // indirect + k8s.io/kube-openapi v0.0.0-20210305001622-591a79e4bda7 // indirect + k8s.io/utils v0.0.0-20210802155522-efc7438f0176 // indirect + sigs.k8s.io/structured-merge-diff/v4 v4.1.2 // indirect + sigs.k8s.io/yaml v1.2.0 // indirect +) diff --git a/go.sum b/go.sum index 736c4bc..b795a73 100644 --- a/go.sum +++ b/go.sum @@ -72,7 +72,6 @@ github.com/asaskevich/govalidator v0.0.0-20190424111038-f61b66f89f4a/go.mod h1:l github.com/aws/aws-lambda-go v1.13.3/go.mod h1:4UKl9IzQMoD+QF79YdCuzCwp8VbmG4VAQwij/eHl5CU= github.com/aws/aws-sdk-go v1.27.0/go.mod h1:KmX6BPdI08NWTb3/sm4ZGu5ShLoqVDhKgpiN924inxo= github.com/aws/aws-sdk-go-v2 v0.18.0/go.mod h1:JWVYvqSMppoMJC0x5wdwiImzgXTI9FuZwxzkQq9wy+g= -github.com/benbjohnson/clock v1.1.0 h1:Q92kusRqC1XV2MjkWETPvjJVqKetz1OzxZB7mHJLju8= github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA= github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q= github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8= @@ -85,7 +84,6 @@ github.com/casbin/casbin/v2 v2.1.2/go.mod h1:YcPU1XXisHhLzuxH9coDNf2FbKpjGlbCg3n github.com/cenkalti/backoff v2.2.1+incompatible/go.mod h1:90ReRw6GdpyfrHakVjL/QHaoyV4aDUVVkXQJJJ3NXXM= github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= github.com/census-instrumentation/opencensus-proto v0.3.0/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= -github.com/cespare/xxhash v1.1.0 h1:a6HrQnmkObjyL+Gs60czilIUGqrzKutQD6XZog3p+ko= github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghfAqPWnc= github.com/cespare/xxhash/v2 v2.1.1 h1:6MnRN8NT7+YBpUIWxHtefFZOKTAPgGjpQSxqLNn0+qY= github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= @@ -100,7 +98,6 @@ github.com/cncf/udpa/go v0.0.0-20210930031921-04548b0d99d4/go.mod h1:6pvJx4me5XP github.com/cncf/xds/go v0.0.0-20210312221358-fbca930ec8ed/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs= github.com/cncf/xds/go v0.0.0-20210805033703-aa0b78936158/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs= github.com/cncf/xds/go v0.0.0-20210922020428-25de7278fc84/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs= -github.com/cncf/xds/go v0.0.0-20211001041855-01bcc9b48dfe/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs= github.com/cncf/xds/go v0.0.0-20211011173535-cb28da3451f1/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs= github.com/cncf/xds/go v0.0.0-20220314180256-7f1daf1720fc h1:PYXxkRUBGUMa5xgMVMDl62vEklZvKpVaxQeN9ie7Hfk= github.com/cncf/xds/go v0.0.0-20220314180256-7f1daf1720fc/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs= @@ -142,7 +139,6 @@ github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1m github.com/envoyproxy/go-control-plane v0.9.9-0.20201210154907-fd9021fe5dad/go.mod h1:cXg6YxExXjJnVBQHBLXeUAgxn2UodCpnH306RInaBQk= github.com/envoyproxy/go-control-plane v0.9.9-0.20210512163311-63b5d3c536b0/go.mod h1:hliV/p42l8fGbc6Y9bQ70uLwIvmJyVE5k4iMKlh8wCQ= github.com/envoyproxy/go-control-plane v0.9.10-0.20210907150352-cf90f659a021/go.mod h1:AFq3mo9L8Lqqiid3OhADV3RfLJnjiw63cSpi+fDTRC0= -github.com/envoyproxy/go-control-plane v0.10.2-0.20220325020618-49ff273808a1/go.mod h1:KJwIaB5Mv44NWtYuAOFCVOjcI94vtpEz2JU/D2v6IjE= github.com/envoyproxy/go-control-plane v0.10.3-0.20221109183938-2935a23e638f h1:WM6jD/5NGnwG5ZiZIZtYldAt0j+Q7xOvEEEMQtbuk5M= github.com/envoyproxy/go-control-plane v0.10.3-0.20221109183938-2935a23e638f/go.mod h1:ufpOdMVWU+v42FYQiIBUhSWglFcK3S1Ml8bbzLwkdcE= github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c= @@ -539,7 +535,6 @@ github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9de github.com/yuin/goldmark v1.1.32/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k= -github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY= go.etcd.io/bbolt v1.3.2/go.mod h1:IbVyRI1SCnLcuJnV2u8VeU0CEYM7e686BmAb1XKL+uU= go.etcd.io/bbolt v1.3.3/go.mod h1:IbVyRI1SCnLcuJnV2u8VeU0CEYM7e686BmAb1XKL+uU= go.etcd.io/bbolt v1.3.5/go.mod h1:G5EMThwa9y8QZGBClrRx5EY+Yw9kAhnjy3bSjsnlVTQ= @@ -584,7 +579,6 @@ golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8U golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20201002170205-7f63de1d35b0/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20210220033148-5ea612d1eb83/go.mod h1:jdWPYTVW3xRLrWPugEBEK3UY2ZEsg3UU495nc5E+M+I= -golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20190306152737-a1d7652674e8/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20190510132918-efd6b22b2522/go.mod h1:ZjyILWgesfNpC6sMxTJOJm9Kp84zZh5NQWvqDGG3Qr8= @@ -620,8 +614,6 @@ golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.3.1-0.20200828183125-ce943fd02449/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.4.2/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.5.0/go.mod h1:5OXOZSfqPIIbmVBIIKWRFfZjPR0E5r58TLhUjH0a2Ro= -golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4 h1:6zppjxzCulZykYSLyVDYbneBfbaBIQPYMevg0bEwv2s= -golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4= golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= @@ -660,7 +652,6 @@ golang.org/x/net v0.0.0-20200707034311-ab3426394381/go.mod h1:/O7V0waA8r7cgGh81R golang.org/x/net v0.0.0-20200822124328-c89045814202/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA= golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= golang.org/x/net v0.0.0-20210224082022-3d97a244fca7/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= -golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM= golang.org/x/net v0.0.0-20210428140749-89ef3d95e781/go.mod h1:OJAsFXCWl8Ukc7SiCT/9KSuxbyM7479/AVlXFRxuMCk= golang.org/x/net v0.0.0-20210813160813-60bc85c4be6d/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= @@ -684,7 +675,6 @@ golang.org/x/sync v0.0.0-20200625203802-6e8e738ad208/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= -golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.0.0-20180823144017-11551d06cbcc/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= @@ -741,10 +731,8 @@ golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20210426230700-d19ff857e887/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210603081109-ebe580a85c40/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210816183151-1e6c022a8912/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210817190340-bfb29a6856f2/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f h1:v4INt8xihDGvnrfjMDVXGxw9wrfxYyCjk0KbXjhR55s= golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/term v0.0.0-20201117132131-f5c789dd3221/go.mod h1:Nr5EML6q2oocZ2LXRh80K7BxOlk5/8JxuGnuhpl+muw= @@ -827,7 +815,6 @@ golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4f golang.org/x/tools v0.1.0/go.mod h1:xkSsbof2nBLbhDlRMhhhyNLN/zl3eTqcnHD5viDpcZ0= golang.org/x/tools v0.1.5/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk= golang.org/x/tools v0.1.12 h1:VveCTK38A2rkS8ZqFY25HIDFscX5X9OoEhJd3quQmXU= -golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= @@ -934,7 +921,6 @@ google.golang.org/protobuf v1.25.0/go.mod h1:9JNX74DMeImyA3h4bdi1ymwjUzf21/xIlba google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= google.golang.org/protobuf v1.27.1/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= -google.golang.org/protobuf v1.28.0 h1:w43yiav+6bVFTBQFZX0r7ipe9JQ1QsbMgHwbBziscLw= google.golang.org/protobuf v1.28.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I= google.golang.org/protobuf v1.28.1 h1:d0NfwRgPtno5B1Wa6L2DAG+KivqkdutMf1UhdNx175w= google.golang.org/protobuf v1.28.1/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I= diff --git a/pkg/controller/crd_watcher.go b/pkg/controller/crd_watcher.go index bcc42ad..4f8864c 100644 --- a/pkg/controller/crd_watcher.go +++ b/pkg/controller/crd_watcher.go @@ -18,6 +18,8 @@ import ( "context" "log" "net/http" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/client" "strconv" "sync" @@ -33,8 +35,6 @@ import ( "google.golang.org/protobuf/types/known/anypb" k8sApiError "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/runtime" - ctrl "sigs.k8s.io/controller-runtime" - "sigs.k8s.io/controller-runtime/pkg/client" ) // CRDWatcher watches a specific kind of CRD. @@ -49,14 +49,18 @@ type CRDWatcher struct { crdCache *CRDCache // subscribedList consists of all subscribed target of current kind of CRD. - subscribedList map[model.SubscribeTarget]bool + subscribedList map[model.SubscribeTarget]bool + subscribedNamespaces map[string]bool - subscribedApps map[model.NamespacedApp]bool + //cxz + numberOfAppsInNamedspaces map[string]int + + subscribedApps map[model.NamespacedApp]bool crdGenerator func() client.Object sendDataHandler model.DataEntirePushHandler - - updateMux sync.RWMutex + xDSPushHandler model.XDSPushHandler + updateMux sync.RWMutex } const ( @@ -88,16 +92,46 @@ func (r *CRDWatcher) AddSubscribeTarget(target model.SubscribeTarget) error { } r.updateMux.Lock() defer r.updateMux.Unlock() - r.subscribedList[target] = true r.subscribedNamespaces[target.Namespace] = true r.subscribedApps[target.NamespacedApp()] = true - + //cxz + r.numberOfAppsInNamedspaces[target.Namespace]++ return nil } +// cxz func (r *CRDWatcher) RemoveSubscribeTarget(target model.SubscribeTarget) error { // TODO: implement me + if target.Kind != r.kind { + return errors.New("target kind mismatch, expected: " + target.Kind + ", r.kind: " + r.kind) + } + r.updateMux.Lock() + defer r.updateMux.Unlock() + + // remove target from subscribedList + curNamespace := target.Namespace + if _, ok := r.subscribedList[target]; !ok { + // not exist + return errors.New("didn't susbscribe this object before") + } + r.subscribedList[target] = false + + // remove subscribed namespace + numberofApps := r.numberOfAppsInNamedspaces[curNamespace] + if numberofApps == 1 { + r.subscribedNamespaces[curNamespace] = false + } + if numberofApps > 0 { + r.numberOfAppsInNamedspaces[curNamespace]-- + } + + // remove namedspaceApp + if _, ok := r.subscribedApps[target.NamespacedApp()]; !ok { + // not exist + return errors.New("didn't susbscribe this object before") + } + r.subscribedApps[target.NamespacedApp()] = false return nil } @@ -192,10 +226,16 @@ func (r *CRDWatcher) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Resu Details: nil, } dataWithVersion := &trpb.DataWithVersion{Data: rules, Version: version} - err := r.sendDataHandler(req.Namespace, app, r.kind, dataWithVersion, status, "") - if err != nil { + if err := r.sendDataHandler(req.Namespace, app, r.kind, dataWithVersion, status, ""); err != nil { logger.Error(err, "Failed to send rules", "kind", r.kind) } + + //cxz + // push xds rules + if err := r.xDSPushHandler(req.Namespace, app, r.kind, rules, version); err != nil { + logger.Error(err, "Failed to pushxds rules", "kind", r.kind) + } + return ctrl.Result{}, nil } @@ -325,10 +365,9 @@ func (r *CRDWatcher) translateCrdToProto(object client.Object) (*anypb.Any, erro return nil, err } return packRule, nil - } -func NewCRDWatcher(crdManager ctrl.Manager, kind model.SubscribeKind, crdGenerator func() client.Object, sendDataHandler model.DataEntirePushHandler) *CRDWatcher { +func NewCRDWatcher(crdManager ctrl.Manager, kind model.SubscribeKind, crdGenerator func() client.Object, sendDataHandler model.DataEntirePushHandler, xdspushhandler model.XDSPushHandler) *CRDWatcher { return &CRDWatcher{ kind: kind, Client: crdManager.GetClient(), @@ -337,8 +376,11 @@ func NewCRDWatcher(crdManager ctrl.Manager, kind model.SubscribeKind, crdGenerat subscribedList: make(map[model.SubscribeTarget]bool, 4), subscribedNamespaces: make(map[string]bool), subscribedApps: make(map[model.NamespacedApp]bool), - crdGenerator: crdGenerator, - crdCache: NewCRDCache(kind), - sendDataHandler: sendDataHandler, + //cxz + numberOfAppsInNamedspaces: make(map[string]int), + crdGenerator: crdGenerator, + crdCache: NewCRDCache(kind), + sendDataHandler: sendDataHandler, + xDSPushHandler: xdspushhandler, } } diff --git a/pkg/controller/k8s_operator.go b/pkg/controller/k8s_operator.go index 6c72356..5a53c80 100644 --- a/pkg/controller/k8s_operator.go +++ b/pkg/controller/k8s_operator.go @@ -73,12 +73,12 @@ type KubernetesOperator struct { started atomic.Value sendDataHandler model.DataEntirePushHandler - - controllerMux sync.RWMutex + XDspushHandler model.XDSPushHandler + controllerMux sync.RWMutex } // NewKubernetesOperator creates a OpenSergo Kubernetes operator. -func NewKubernetesOperator(sendDataHandler model.DataEntirePushHandler) (*KubernetesOperator, error) { +func NewKubernetesOperator(sendDataHandler model.DataEntirePushHandler, xdspushhandler model.XDSPushHandler) (*KubernetesOperator, error) { ctrl.SetLogger(&k8SLogger{ l: logging.GetGlobalLogger(), level: logging.GetGlobalLoggerLevel(), @@ -107,6 +107,7 @@ func NewKubernetesOperator(sendDataHandler model.DataEntirePushHandler) (*Kubern ctx: ctx, ctxCancel: cancel, sendDataHandler: sendDataHandler, + XDspushHandler: xdspushhandler, } return k, nil } @@ -145,7 +146,7 @@ func (k *KubernetesOperator) RegisterWatcher(target model.SubscribeTarget) (*CRD return nil, errors.New("CRD not supported: " + target.Kind) } // This kind of CRD has never been watched. - crdWatcher := NewCRDWatcher(k.crdManager, target.Kind, crdMetadata.Generator(), k.sendDataHandler) + crdWatcher := NewCRDWatcher(k.crdManager, target.Kind, crdMetadata.Generator(), k.sendDataHandler, k.XDspushHandler) err = crdWatcher.AddSubscribeTarget(target) if err != nil { return nil, err @@ -178,7 +179,7 @@ func (k *KubernetesOperator) AddWatcher(target model.SubscribeTarget) error { if !crdSupports { return errors.New("CRD not supported: " + target.Kind) } - crdWatcher := NewCRDWatcher(k.crdManager, target.Kind, crdMetadata.Generator(), k.sendDataHandler) + crdWatcher := NewCRDWatcher(k.crdManager, target.Kind, crdMetadata.Generator(), k.sendDataHandler, k.XDspushHandler) err = crdWatcher.AddSubscribeTarget(target) if err != nil { return err diff --git a/pkg/model/XDsConnection.go b/pkg/model/XDsConnection.go new file mode 100644 index 0000000..c60710f --- /dev/null +++ b/pkg/model/XDsConnection.go @@ -0,0 +1,62 @@ +package model + +import ( + core "github.com/envoyproxy/go-control-plane/envoy/config/core/v3" + discovery "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3" + "sync" + "time" +) + +type XDsConnection struct { + sync.RWMutex + // peerAddr is the address of the client, from network layer. + peerAddr string + + // WatchedResources contains the list of watched resources for the proxy, keyed by the DiscoveryRequest TypeUrl. + WatchedResources map[string]*WatchedResource + + // Time of connection, for debugging + connectedAt time.Time + + // conID is the connection conID, used as a key in the connection table. + // Currently based on the node name and a counter. + Identifier ClientIdentifier + + // Both ADS and SDS streams implement this interface + Stream DiscoveryStream + + // Original node metadata, to avoid unmarshal/marshal. + // This is included in internal events. + Node *core.Node + + // initialized channel will be closed when proxy is initialized. Pushes, or anything accessing + // the proxy, should not be started until this channel is closed. + Initialized chan struct{} + + // stop can be used to end the connection manually via debug endpoints. Only to be used for testing. + Stop chan struct{} + + // reqChan is used to receive discovery requests for this connection. + ReqChan chan *discovery.DiscoveryRequest +} + +func NewConnection(peerAddr string, stream DiscoveryStream) *XDsConnection { + return &XDsConnection{ + + Initialized: make(chan struct{}), + Stop: make(chan struct{}), + ReqChan: make(chan *discovery.DiscoveryRequest, 1), + peerAddr: peerAddr, + connectedAt: time.Now(), + Stream: stream, + } +} + +func (conn *XDsConnection) Watched(typeUrl string) *WatchedResource { + conn.RLock() + defer conn.RUnlock() + if conn.WatchedResources != nil && conn.WatchedResources[typeUrl] != nil { + return conn.WatchedResources[typeUrl] + } + return nil +} diff --git a/pkg/model/model.go b/pkg/model/model.go index 29140e4..f87c3c1 100644 --- a/pkg/model/model.go +++ b/pkg/model/model.go @@ -15,14 +15,13 @@ package model import ( + discovery "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3" + extension "github.com/envoyproxy/go-control-plane/envoy/service/extension/v3" trpb "github.com/opensergo/opensergo-control-plane/pkg/proto/transport/v1" + "github.com/opensergo/opensergo-control-plane/pkg/util" + "google.golang.org/protobuf/types/known/anypb" ) -type NamespacedApp struct { - Namespace string - App string -} - // ClientIdentifier represents a unique identifier for an OpenSergo client. type ClientIdentifier string @@ -30,4 +29,45 @@ type OpenSergoTransportStream = trpb.OpenSergoUniversalTransportService_Subscrib type SubscribeRequestHandler func(ClientIdentifier, *trpb.SubscribeRequest, OpenSergoTransportStream) error +type SubscribeXDsRequestHandler func(*discovery.DiscoveryRequest, *XDsConnection) error + +const ExtensionConfigType = "type.googleapis.com/envoy.config.core.v3.TypedExtensionConfig" + type DataEntirePushHandler func(namespace, app, kind string, dataWithVersion *trpb.DataWithVersion, status *trpb.Status, respId string) error + +type XDSPushHandler func(namespace, app, kind string, rules []*anypb.Any, version int64) error + +type DiscoveryStream = extension.ExtensionConfigDiscoveryService_StreamExtensionConfigsServer + +// WatchedResource tracks an active DiscoveryRequest subscription. +type WatchedResource struct { + // TypeUrl is copied from the DiscoveryRequest.TypeUrl that initiated watching this resource. + // nolint + TypeUrl string + + // ResourceNames tracks the list of resources that are actively watched. + // For LDS and CDS, all resources of the TypeUrl type are watched if it is empty. + // For endpoints the resource names will have list of clusters and for clusters it is empty. + // For Delta Xds, all resources of the TypeUrl that a client has subscribed to. + ResourceNames []string + + // NonceSent is the nonce sent in the last sent response. If it is equal with NonceAcked, the + // last message has been processed. If empty: we never sent a message of this type. + NonceSent string + + // NonceAcked is the last acked message. + NonceAcked string +} + +// ResourceDelta records the difference in requested resources by an XDS client +type ResourceDelta struct { + // Subscribed indicates the client requested these additional resources + Subscribed util.String + // Unsubscribed indicates the client no longer requires these resources + Unsubscribed util.String +} + +type NamespacedApp struct { + Namespace string + App string +} diff --git a/pkg/test/grpcClient.go b/pkg/test/grpcClient.go new file mode 100644 index 0000000..e312e69 --- /dev/null +++ b/pkg/test/grpcClient.go @@ -0,0 +1,46 @@ +package main + +import ( + "context" + "fmt" + v3 "github.com/envoyproxy/go-control-plane/envoy/config/core/v3" + discovery "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3" + "github.com/opensergo/opensergo-control-plane/pkg/model" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" + "log" +) + +func main() { + conn, err := grpc.Dial(":8002", grpc.WithTransportCredentials(insecure.NewCredentials())) + if err != nil { + log.Fatal("服务端出错,连接不上", err) + } + discoveryClient := discovery.NewAggregatedDiscoveryServiceClient(conn) + + // build request + request := discovery.DiscoveryRequest{ + TypeUrl: model.ExtensionConfigType, + Node: &v3.Node{ + Id: "testNode", + }, + ResourceNames: []string{"testns/testapp/RateLimitStrategy", "testns/testapp/FaultToleranceRule", "testns/testapp/ConcurrencyLimitStrategy"}, + } + + streamAggregatedResourcesClient, err := discoveryClient.StreamAggregatedResources(context.Background()) + if err != nil { + log.Fatal("得不到streamAggregatedResourcesClient", err) + } + + if err := streamAggregatedResourcesClient.Send(&request); err != nil { + log.Fatal("发送消息错误", err) + } + for { + discoveryResponse, err := streamAggregatedResourcesClient.Recv() + if err != nil { + log.Fatal("回收消息错误", err) + } + fmt.Println(discoveryResponse) + } + +} diff --git a/pkg/transport/grpc/connection.go b/pkg/transport/grpc/connection.go index f8f9d56..539e797 100644 --- a/pkg/transport/grpc/connection.go +++ b/pkg/transport/grpc/connection.go @@ -43,7 +43,12 @@ func (c *Connection) IsValid() bool { return c.stream != nil && c.valid } -type ConnectionMap map[model.ClientIdentifier]*Connection +type mapValue interface { + interface { + *Connection | *model.XDsConnection + } +} +type ConnectionMap[T mapValue] map[model.ClientIdentifier]T func NewConnection(identifier model.ClientIdentifier, stream OpenSergoTransportStream) *Connection { return &Connection{ @@ -53,10 +58,10 @@ func NewConnection(identifier model.ClientIdentifier, stream OpenSergoTransportS } } -type ConnectionManager struct { +type ConnectionManager[T mapValue] struct { // connectionMap is used to save the connections which subscribed to the same namespace, app and kind. // (namespace+app, (kind, connections...)) - connectionMap map[model.NamespacedApp]map[string]ConnectionMap + connectionMap map[model.NamespacedApp]map[string]ConnectionMap[T] // identifier: NamespaceApp: kinds // The identifier is used to distinguish the requested process instance and remove stream when disconnected identifierMap map[model.ClientIdentifier]map[model.NamespacedApp][]string @@ -64,7 +69,7 @@ type ConnectionManager struct { updateMux sync.RWMutex } -func (c *ConnectionManager) Add(namespace, app, kind string, connection *Connection) error { +func (c *ConnectionManager[T]) Add(namespace, app, kind string, connection T, identifier model.ClientIdentifier) error { if connection == nil { return errors.New("nil connection") } @@ -77,27 +82,27 @@ func (c *ConnectionManager) Add(namespace, app, kind string, connection *Connect App: app, } if c.connectionMap[nsa] == nil { - c.connectionMap[nsa] = make(map[string]ConnectionMap) + c.connectionMap[nsa] = make(map[string]ConnectionMap[T]) } connectionMap := c.connectionMap[nsa][kind] if connectionMap == nil { - connectionMap = make(ConnectionMap) + connectionMap = make(ConnectionMap[T]) c.connectionMap[nsa][kind] = connectionMap } - if connectionMap[connection.identifier] == nil { - connectionMap[connection.identifier] = connection + if connectionMap[identifier] == nil { + connectionMap[identifier] = connection } // TODO: legacy logic, rearrange it later - if c.identifierMap[connection.identifier] == nil { - c.identifierMap[connection.identifier] = make(map[model.NamespacedApp][]string) + if c.identifierMap[identifier] == nil { + c.identifierMap[identifier] = make(map[model.NamespacedApp][]string) } - c.identifierMap[connection.identifier][nsa] = append(c.identifierMap[connection.identifier][nsa], kind) + c.identifierMap[identifier][nsa] = append(c.identifierMap[identifier][nsa], kind) return nil } -func (c *ConnectionManager) Get(namespace, app, kind string) ([]*Connection, bool) { +func (c *ConnectionManager[T]) Get(namespace, app, kind string) ([]T, bool) { c.updateMux.RLock() defer c.updateMux.RUnlock() @@ -112,18 +117,19 @@ func (c *ConnectionManager) Get(namespace, app, kind string) ([]*Connection, boo if !exists || connectionMap == nil { return nil, false } - connectionList := make([]*Connection, len(connectionMap)) + + connectionList := make([]T, len(connectionMap)) for _, conn := range connectionMap { - if conn.IsValid() { - connectionList = append(connectionList, conn) - } + connectionList = append(connectionList, conn) } return connectionList, true } -func (c *ConnectionManager) removeInternal(n model.NamespacedApp, kind string, identifier model.ClientIdentifier) error { +func (c *ConnectionManager[mapValue]) removeInternal(n model.NamespacedApp, kind string, identifier model.ClientIdentifier) error { // Guarded in the outer function, if a lock is added here, it will deadlock kindMap, exists := c.connectionMap[n] + + // TODO: handle error if !exists || kindMap == nil { return nil } @@ -135,7 +141,7 @@ func (c *ConnectionManager) removeInternal(n model.NamespacedApp, kind string, i return nil } -func (c *ConnectionManager) RemoveByIdentifier(identifier model.ClientIdentifier) error { +func (c *ConnectionManager[mapValue]) RemoveByIdentifier(identifier model.ClientIdentifier) error { c.updateMux.Lock() defer c.updateMux.Unlock() @@ -154,9 +160,9 @@ func (c *ConnectionManager) RemoveByIdentifier(identifier model.ClientIdentifier return nil } -func NewConnectionManager() *ConnectionManager { - return &ConnectionManager{ - connectionMap: make(map[model.NamespacedApp]map[string]ConnectionMap), +func NewConnectionManager[T mapValue]() *ConnectionManager[T] { + return &ConnectionManager[T]{ + connectionMap: make(map[model.NamespacedApp]map[string]ConnectionMap[T]), identifierMap: make(map[model.ClientIdentifier]map[model.NamespacedApp][]string), } } diff --git a/pkg/transport/grpc/server.go b/pkg/transport/grpc/server.go index ba94169..d4214d2 100644 --- a/pkg/transport/grpc/server.go +++ b/pkg/transport/grpc/server.go @@ -33,17 +33,16 @@ const ( // Server represents the transport server of OpenSergo universal transport service (OUTS). type Server struct { - transportServer *TransportServer - grpcServer *grpc.Server - - connectionManager *ConnectionManager + transportServer *TransportServer + grpcServer *grpc.Server + connectionManager *ConnectionManager[*Connection] port uint32 started *atomic.Bool } func NewServer(port uint32, subscribeHandlers []model.SubscribeRequestHandler) *Server { - connectionManager := NewConnectionManager() + connectionManager := NewConnectionManager[*Connection]() return &Server{ transportServer: newTransportServer(connectionManager, subscribeHandlers), port: port, @@ -53,7 +52,7 @@ func NewServer(port uint32, subscribeHandlers []model.SubscribeRequestHandler) * } } -func (s *Server) ConnectionManager() *ConnectionManager { +func (s *Server) ConnectionManager() *ConnectionManager[*Connection] { return s.connectionManager } @@ -81,7 +80,7 @@ func (s *Server) Run() error { type TransportServer struct { trpb.OpenSergoUniversalTransportServiceServer - connectionManager *ConnectionManager + connectionManager *ConnectionManager[*Connection] subscribeHandlers []model.SubscribeRequestHandler } @@ -155,7 +154,7 @@ func (s *TransportServer) SubscribeConfig(stream trpb.OpenSergoUniversalTranspor } } -func newTransportServer(connectionManager *ConnectionManager, subscribeHandlers []model.SubscribeRequestHandler) *TransportServer { +func newTransportServer(connectionManager *ConnectionManager[*Connection], subscribeHandlers []model.SubscribeRequestHandler) *TransportServer { return &TransportServer{ connectionManager: connectionManager, subscribeHandlers: subscribeHandlers, diff --git a/pkg/transport/grpc/xDSServer.go b/pkg/transport/grpc/xDSServer.go new file mode 100644 index 0000000..29eda87 --- /dev/null +++ b/pkg/transport/grpc/xDSServer.go @@ -0,0 +1,313 @@ +package grpc + +import ( + context "context" + "fmt" + core "github.com/envoyproxy/go-control-plane/envoy/config/core/v3" + discovery "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3" + extension "github.com/envoyproxy/go-control-plane/envoy/service/extension/v3" + "github.com/opensergo/opensergo-control-plane/pkg/model" + "github.com/opensergo/opensergo-control-plane/pkg/util" + "google.golang.org/grpc" + "net" + + "google.golang.org/grpc/peer" + "google.golang.org/protobuf/types/known/anypb" + "log" + "strconv" + "sync" + "sync/atomic" +) + +var connectionNumber = int64(0) + +type DiscoveryServer struct { + port uint32 + // adsClients reflect active gRPC channels, for both ADS and EDS. + ecdsClients map[model.ClientIdentifier]*model.XDsConnection + ecdsClientsMutex sync.RWMutex + grpcServer *grpc.Server + // cxz + // map from to xdsconnection + // namespace appname and kind + XDSConnectionManeger *ConnectionManager[*model.XDsConnection] + subscribeHandlers []model.SubscribeXDsRequestHandler +} + +func (s *DiscoveryServer) StreamExtensionConfigs(stream model.DiscoveryStream) error { + return s.Stream(stream) +} + +//func (s *DiscoveryServer) WaitForRequestLimit(ctx context.Context) error { +// if s.RequestRateLimit.Limit() == 0 { +// // Allow opt out when rate limiting is set to 0qps +// return nil +// } +// // Give a bit of time for queue to clear out, but if not fail fast. Client will connect to another +// // instance in best case, or retry with backoff. +// wait, cancel := context.WithTimeout(ctx, time.Second) +// defer cancel() +// return s.RequestRateLimit.Wait(wait) +//} + +func (s *DiscoveryServer) Stream(stream model.DiscoveryStream) error { + ctx := stream.Context() + peerAddr := "0.0.0.0" + if peerInfo, ok := peer.FromContext(ctx); ok { + peerAddr = peerInfo.Addr.String() + } + + // TODO: Rate Limit + //if err := s.WaitForRequestLimit(stream.Context()); err != nil { + // //log.Warnf("ADS: %q exceeded rate limit: %v", peerAddr, err) + // return status.Errorf(codes.ResourceExhausted, "request rate limit exceeded: %v", err) + //} + + con := model.NewConnection(peerAddr, stream) + + go s.receive(con) + + // Wait for the proxy to be fully initialized before we start serving traffic. Because + // initialization doesn't have dependencies that will block, there is no need to add any timeout + // here. Prior to this explicit wait, we were implicitly waiting by receive() not sending to + // reqChannel and the connection not being enqueued for pushes to pushChannel until the + // initialization is complete. + <-con.Initialized + + for { + // Go select{} statements are not ordered; the same channel can be chosen many times. + // For requests, these are higher priority (client may be blocked on startup until these are done) + // and often very cheap to handle (simple ACK), so we check it first. + select { + case req, ok := <-con.ReqChan: + if ok { + for _, handler := range s.subscribeHandlers { + err := handler(req, con) + if err != nil { + // TODO: handle error + log.Printf("Failed to handle SubscribeRequest, err=%s\n", err.Error()) + } + } + } + case <-con.Stop: + return nil + default: + } + + } + + return nil +} + +func (s *DiscoveryServer) receive(con *model.XDsConnection) { + defer func() { + + close(con.ReqChan) + // Close the initialized channel, if its not already closed, to prevent blocking the stream. + select { + case <-con.Initialized: + default: + close(con.Initialized) + } + }() + + firstRequest := true + for { + req, err := con.Stream.Recv() + log.Printf("Stream received message failed, err=%s\n", err.Error()) + + if firstRequest { + firstRequest = false + if err := s.initConnection(req.Node, con); err != nil { + log.Printf("initConnection failed, err=%s\n", err.Error()) + return + } + } + select { + case con.ReqChan <- req: + case <-con.Stream.Context().Done(): + //log.Infof("ADS: %q %s terminated with stream closed", con.peerAddr, con.conID) + return + } + } +} + +func (s *DiscoveryServer) initConnection(node *core.Node, con *model.XDsConnection) error { + // First request so initialize connection id and start tracking it. + con.Identifier = connectionID(node.Id) + con.Node = node + + // Register the connection. this allows pushes to be triggered for the proxy. Note: the timing of + // this and initializeProxy important. While registering for pushes *after* initialization is complete seems like + // a better choice, it introduces a race condition; If we complete initialization of a new push + // context between initializeProxy and addCon, we would not get any pushes triggered for the new + // push context, leading the proxy to have a stale state until the next full push. + s.addCon(con.Identifier, con) + + defer close(con.Initialized) + + return nil +} + +func connectionID(node string) model.ClientIdentifier { + id := atomic.AddInt64(&connectionNumber, 1) + return model.ClientIdentifier(node + "-" + strconv.FormatInt(id, 10)) +} + +func (s *DiscoveryServer) addCon(identifier model.ClientIdentifier, con *model.XDsConnection) { + s.ecdsClientsMutex.Lock() + defer s.ecdsClientsMutex.Unlock() + s.ecdsClients[identifier] = con + +} + +func (s *DiscoveryServer) removeCon(conID model.ClientIdentifier) { + s.ecdsClientsMutex.Lock() + defer s.ecdsClientsMutex.Unlock() + + if _, exist := s.ecdsClients[conID]; !exist { + //log.Errorf("ADS: Removing connection for non-existing node:%v.", conID) + + } else { + delete(s.ecdsClients, conID) + } +} + +func shouldUnsubscribe(request *discovery.DiscoveryRequest) bool { + return len(request.ResourceNames) == 0 +} + +var emptyResourceDelta = model.ResourceDelta{} + +func ShouldRespond(con *model.XDsConnection, request *discovery.DiscoveryRequest) (bool, model.ResourceDelta) { + + // NACK + if request.ErrorDetail != nil { + //LOG + return false, emptyResourceDelta + } + + if shouldUnsubscribe(request) { + con.Lock() + delete(con.WatchedResources, request.TypeUrl) + con.Unlock() + return false, emptyResourceDelta + } + + con.RLock() + previousInfo := con.WatchedResources[request.TypeUrl] + con.RUnlock() + + // We should always respond with the current resource names. + if request.ResponseNonce == "" || previousInfo == nil { + log.Println("ECDS: INIT/RECONNECT %s %s %s", con.Identifier, request.VersionInfo, request.ResponseNonce) + con.Lock() + con.WatchedResources[request.TypeUrl] = &model.WatchedResource{TypeUrl: request.TypeUrl, ResourceNames: request.ResourceNames} + con.Unlock() + return true, model.ResourceDelta{ + Subscribed: util.New(request.ResourceNames...), + Unsubscribed: util.String{}, + } + } + + // If there is mismatch in the nonce, that is a case of expired/stale nonce. + // A nonce becomes stale following a newer nonce being sent to Envoy. + // previousInfo.NonceSent can be empty if we previously had shouldRespond=true but didn't send any resources. + if request.ResponseNonce != previousInfo.NonceSent { + + log.Println("ECDS: REQ %s Expired nonce received %s, sent %s", + con.Identifier, request.ResponseNonce, previousInfo.NonceSent) + return false, emptyResourceDelta + } + + // If it comes here, that means nonce match. + con.Lock() + previousResources := con.WatchedResources[request.TypeUrl].ResourceNames + con.WatchedResources[request.TypeUrl].NonceAcked = request.ResponseNonce + con.WatchedResources[request.TypeUrl].ResourceNames = request.ResourceNames + con.Unlock() + + // Envoy can send two DiscoveryRequests with same version and nonce. + // when it detects a new resource. We should respond if they change. + prev := util.New(previousResources...) + cur := util.New(request.ResourceNames...) + removed := prev.Difference(cur) + added := cur.Difference(prev) + + if len(removed) == 0 && len(added) == 0 { + // this is an ack nonce matched + return false, emptyResourceDelta + } + + return true, model.ResourceDelta{ + Subscribed: added, + Unsubscribed: removed, + } +} + +func (s *DiscoveryServer) pushXds(con *model.XDsConnection, w *model.WatchedResource, version int64, rules []*anypb.Any) error { + + resp := &discovery.DiscoveryResponse{ + TypeUrl: w.TypeUrl, + VersionInfo: strconv.FormatInt(version, 10), + Nonce: util.Nonce(), + Resources: rules, + } + + return con.Stream.Send(resp) +} + +// cxz +func (s *DiscoveryServer) AddConnectioonToMap(namespace, appname, kind string, con *model.XDsConnection) { + s.ecdsClientsMutex.Lock() + defer s.ecdsClientsMutex.Unlock() + + s.XDSConnectionManeger.Add(namespace, appname, kind, con, con.Identifier) + +} + +func (s *DiscoveryServer) RemoveConnectionFromMap(n model.NamespacedApp, kind string, identifier model.ClientIdentifier) error { + s.ecdsClientsMutex.Lock() + defer s.ecdsClientsMutex.Unlock() + // TODO: HANDLE Error + if err := s.XDSConnectionManeger.removeInternal(n, kind, identifier); err != nil { + return err + } + return nil + +} + +func NewDiscoveryServer(port uint32, subscribeHandlers []model.SubscribeXDsRequestHandler) *DiscoveryServer { + connectionManager := NewConnectionManager[*model.XDsConnection]() + return &DiscoveryServer{ + port: port, + ecdsClients: make(map[model.ClientIdentifier]*model.XDsConnection), + grpcServer: grpc.NewServer(), + XDSConnectionManeger: connectionManager, + subscribeHandlers: subscribeHandlers, + } +} + +// TODO : Unimplemented +func (s *DiscoveryServer) DeltaExtensionConfigs(stream extension.ExtensionConfigDiscoveryService_DeltaExtensionConfigsServer) error { + return nil +} + +func (s *DiscoveryServer) FetchExtensionConfigs(context.Context, *discovery.DiscoveryRequest) (*discovery.DiscoveryResponse, error) { + return &discovery.DiscoveryResponse{}, nil +} + +func (s *DiscoveryServer) Run() error { + + listener, err := net.Listen("tcp", fmt.Sprintf(":%d", s.port)) + if err != nil { + return err + } + + extension.RegisterExtensionConfigDiscoveryServiceServer(s.grpcServer, s) + err = s.grpcServer.Serve(listener) + if err != nil { + return err + } + return nil +} diff --git a/pkg/util/nonce.go b/pkg/util/nonce.go new file mode 100644 index 0000000..e48c42e --- /dev/null +++ b/pkg/util/nonce.go @@ -0,0 +1,10 @@ +package util + +import ( + "github.com/google/uuid" + "strings" +) + +func Nonce() string { + return strings.Replace(uuid.New().String(), "-", "", -1) +} diff --git a/pkg/util/set_util.go b/pkg/util/set_util.go new file mode 100644 index 0000000..1e1e91d --- /dev/null +++ b/pkg/util/set_util.go @@ -0,0 +1,45 @@ +package util + +type Set[T comparable] map[T]struct{} + +type String = Set[string] + +func NewWithLength[T comparable](l int) Set[T] { + return make(Set[T], l) +} + +// New creates a new Set with the given items. +func New[T comparable](items ...T) Set[T] { + s := NewWithLength[T](len(items)) + return s.InsertAll(items...) +} + +// InsertAll adds the items to this Set. +func (s Set[T]) InsertAll(items ...T) Set[T] { + for _, item := range items { + s[item] = struct{}{} + } + return s +} + +func (s Set[T]) Difference(s2 Set[T]) Set[T] { + result := New[T]() + for key := range s { + if !s2.Contains(key) { + result.Insert(key) + } + } + return result +} + +// Contains returns whether the given item is in the set. +func (s Set[T]) Contains(item T) bool { + _, ok := s[item] + return ok +} + +// Insert a single item to this Set. +func (s Set[T]) Insert(item T) Set[T] { + s[item] = struct{}{} + return s +} From b7073e67a42bc308e8438ebd42e7cc7321168613 Mon Sep 17 00:00:00 2001 From: changxinzhang Date: Tue, 26 Sep 2023 09:00:55 -0400 Subject: [PATCH 2/7] final pr --- control_plane.go | 131 +++++++++++++-------- k8s/rbac/rbac.yaml | 1 - pkg/controller/crd_cache.go | 1 + pkg/controller/crd_watcher.go | 52 ++++----- pkg/main/main.go | 5 +- pkg/model/XDsConnection.go | 62 ---------- pkg/model/model.go | 14 +-- pkg/model/xDS_connection.go | 67 +++++++++++ pkg/test/grpcClient.go | 46 -------- pkg/test/init_connection_test.go | 189 +++++++++++++++++++++++++++++++ pkg/test/xDSClient_test.go | 40 +++++++ pkg/transport/grpc/connection.go | 2 +- pkg/transport/grpc/xDSClient.go | 113 ++++++++++++++++++ pkg/transport/grpc/xDSServer.go | 82 +++++++------- samples/test.yaml | 11 ++ 15 files changed, 580 insertions(+), 236 deletions(-) delete mode 100644 pkg/model/XDsConnection.go create mode 100644 pkg/model/xDS_connection.go delete mode 100644 pkg/test/grpcClient.go create mode 100644 pkg/test/init_connection_test.go create mode 100644 pkg/test/xDSClient_test.go create mode 100644 pkg/transport/grpc/xDSClient.go create mode 100644 samples/test.yaml diff --git a/control_plane.go b/control_plane.go index c8af3ca..5aed23b 100644 --- a/control_plane.go +++ b/control_plane.go @@ -20,7 +20,6 @@ import ( "google.golang.org/protobuf/types/known/anypb" "log" "os" - "strconv" "strings" "sync" @@ -52,7 +51,7 @@ func NewControlPlane() (*ControlPlane, error) { } cp.server = transport.NewServer(uint32(10246), []model.SubscribeRequestHandler{cp.handleSubscribeRequest}) - cp.xdsServer = transport.NewDiscoveryServer(uint32(8002), []model.SubscribeXDsRequestHandler{cp.handleXDsSubscribeRequest}) + cp.xdsServer = transport.NewDiscoveryServer(uint32(8002), []model.SubscribeXDsRequestHandler{cp.handleXDSSubscribeRequest}) cp.operator = operator hostname, herr := os.Hostname() @@ -68,17 +67,23 @@ func NewControlPlane() (*ControlPlane, error) { func (c *ControlPlane) Start() error { // Run the Kubernetes operator err := c.operator.Run() + if err != nil { return err } - // Run the transport server - err = c.server.Run() - if err != nil { - return err - } - err = c.xdsServer.Run() - if err != nil { - return err + + if model.GlobalBoolVariable { + //Run the transport server + err = c.server.Run() + if err != nil { + return err + } + } else { + //Run the xDS Server + err = c.xdsServer.Run() + if err != nil { + return err + } } return nil @@ -119,51 +124,72 @@ func (c *ControlPlane) sendMessageToStream(stream model.OpenSergoTransportStream }) } -// cxz -func (c *ControlPlane) handleXDsSubscribeRequest(req *discovery.DiscoveryRequest, con *model.XDsConnection) error { +// handleXDSSubscribeRequest handles the XDS subscription request. +func (c *ControlPlane) handleXDSSubscribeRequest(req *discovery.DiscoveryRequest, con *model.XDSConnection) error { + // Check if the request is for ExtensionConfigType. if req.TypeUrl != model.ExtensionConfigType { return nil } - shouldRespond, delta := grpc.ShouldRespond(con, req) + // Determine whether to respond and calculate the delta. + shouldRespond, delta := grpc.ShouldRespond(con, req) subscribed := delta.Subscribed unsubscribed := delta.Unsubscribed + if !shouldRespond { + // If there's no need to respond, return early. return nil } + if len(subscribed) != 0 { + var rules []*anypb.Any for resourcename := range subscribed { + // Split the resource name into its components. request := strings.Split(resourcename, delimiter) + + // Register a watcher for the specified resource. crdWatcher, err := c.operator.RegisterWatcher(model.SubscribeTarget{ - Namespace: request[0], - AppName: request[1], - Kind: request[2], + Namespace: request[4], + AppName: request[3], + Kind: request[0] + delimiter + request[1] + delimiter + request[2], }) - // TODO: unhandled err + if err != nil { + // Log the error and continue to the next resource. + log.Printf("Error registering watcher for resource %s: %s\n", resourcename, err.Error()) continue } - c.xdsServer.AddConnectioonToMap(request[0], request[1], request[2], con) + // Add the connection to the connection map. + c.xdsServer.AddConnectionToMap(request[4], request[3], request[0]+"/"+request[1]+"/"+request[2], con) - rules, version := crdWatcher.GetRules(model.NamespacedApp{ - Namespace: request[0], - App: request[1], + // Get the current rules for the resource. + curRules, _ := crdWatcher.GetRules(model.NamespacedApp{ + Namespace: request[4], + App: request[3], }) - if len(rules) > 0 { - err := c.pushXdsToStream(con, con.Watched(req.TypeUrl), version, rules) - if err != nil { - // TODO: log here - log.Printf("sendMessageToStream failed, err=%s\n", err.Error()) - } + + if len(curRules) > 0 { + // Append the current rules to the rules slice. + rules = append(rules, curRules...) } + } + // Push XDS rules to the connection. + err := c.pushXdsToStream(con, con.Watched(req.TypeUrl), rules) + if err != nil { + // Log the error if pushing XDS rules fails. + log.Printf("Failed to push XDS rules to connection: %s\n", err.Error()) } } if len(unsubscribed) != 0 { - for resourcename := range subscribed { + // Handle unsubscribed resources. + for resourcename := range unsubscribed { + // Split the resource name into its components. request := strings.Split(resourcename, delimiter) + + // Remove the connection from the connection map. c.xdsServer.RemoveConnectionFromMap(model.NamespacedApp{request[0], request[1]}, request[2], con.Identifier) } } @@ -172,15 +198,6 @@ func (c *ControlPlane) handleXDsSubscribeRequest(req *discovery.DiscoveryRequest } func (c *ControlPlane) handleSubscribeRequest(clientIdentifier model.ClientIdentifier, request *trpb.SubscribeRequest, stream model.OpenSergoTransportStream) error { - // var labels []model.LabelKV - // if request.Target.Labels != nil { - // for _, label := range request.Target.Labels { - // labels = append(labels, model.LabelKV{ - // Key: label.Key, - // Value: label.Value, - // }) - // } - // } for _, kind := range request.Target.Kinds { crdWatcher, err := c.operator.RegisterWatcher(model.SubscribeTarget{ Namespace: request.Target.Namespace, @@ -227,18 +244,17 @@ func (c *ControlPlane) handleSubscribeRequest(clientIdentifier model.ClientIdent return nil } -// cxz -func (c *ControlPlane) pushXdsToStream(con *model.XDsConnection, w *model.WatchedResource, version int64, rules []*anypb.Any) error { - +func (c *ControlPlane) pushXdsToStream(con *model.XDSConnection, w *model.WatchedResource, rules []*anypb.Any) error { res := &discovery.DiscoveryResponse{ TypeUrl: w.TypeUrl, - VersionInfo: strconv.FormatInt(version, 10), + VersionInfo: c.xdsServer.NextVersion(), // TODO: RECORD THE NONCE AND CHECK THE NONCE Nonce: util.Nonce(), Resources: rules, } - // set nonce + + // Set nonce in the XDSConnection's WatchedResource con.Lock() if con.WatchedResources[model.ExtensionConfigType] == nil { con.WatchedResources[res.TypeUrl] = &model.WatchedResource{TypeUrl: res.TypeUrl} @@ -246,26 +262,45 @@ func (c *ControlPlane) pushXdsToStream(con *model.XDsConnection, w *model.Watche con.WatchedResources[res.TypeUrl].NonceSent = res.Nonce con.Unlock() - return con.Stream.Send(res) + // Send the DiscoveryResponse over the stream + err := con.Stream.Send(res) + if err != nil { + // Handle the error, e.g., log it or return it + // TODO: You can log the error or handle it as needed. + log.Println("Failed to send DiscoveryResponse:", err) + return err + } + + return nil } -func (c *ControlPlane) pushXds(namespace, app, kind string, rules []*anypb.Any, version int64) error { - connections, exists := c.xdsServer.XDSConnectionManeger.Get(namespace, app, kind) +func (c *ControlPlane) pushXds(namespace, app, kind string, rules []*anypb.Any) error { + // Retrieve the XDS connections for the specified namespace, app, and kind. + connections, exists := c.xdsServer.XDSConnectionManager.Get(namespace, app, kind) if !exists || connections == nil { + // Log that there is no connection for this kind. + // Replace this with your actual logging mechanism. + log.Println("No XDS connection found for namespace:", namespace, "app:", app, "kind:", kind) return errors.New("There is no connection for this kind") } for _, connection := range connections { if connection == nil { - // TODO: log.Debug + // Log a debug message for a nil connection. + // Replace this with your actual logging mechanism. + log.Println("Encountered a nil XDS connection") continue } - err := c.pushXdsToStream(connection, connection.WatchedResources[model.ExtensionConfigType], version, rules) + err := c.pushXdsToStream(connection, connection.WatchedResources[model.ExtensionConfigType], rules) if err != nil { - // TODO: should not short-break here. Handle partial failure here. + // Log an error and return it if there is an error pushing XDS rules. + // Replace this with your actual logging mechanism. + log.Println("Failed to push XDS rules to connection:", err) + // TODO: You might want to consider handling partial failures here. return err } } + // Return nil to indicate success. return nil } diff --git a/k8s/rbac/rbac.yaml b/k8s/rbac/rbac.yaml index af578d3..0e114ad 100644 --- a/k8s/rbac/rbac.yaml +++ b/k8s/rbac/rbac.yaml @@ -1,4 +1,3 @@ - apiVersion: v1 kind: ServiceAccount metadata: diff --git a/pkg/controller/crd_cache.go b/pkg/controller/crd_cache.go index b343d28..3a130aa 100644 --- a/pkg/controller/crd_cache.go +++ b/pkg/controller/crd_cache.go @@ -27,6 +27,7 @@ type CRDObjectsHolder struct { version int64 } +// TODO: need use generic to support two kinds of connection xDS Connection and normal Connection // CRDCache caches versioned CRD objects in local. type CRDCache struct { kind string diff --git a/pkg/controller/crd_watcher.go b/pkg/controller/crd_watcher.go index 4f8864c..ff781f0 100644 --- a/pkg/controller/crd_watcher.go +++ b/pkg/controller/crd_watcher.go @@ -16,6 +16,7 @@ package controller import ( "context" + trpb "github.com/opensergo/opensergo-control-plane/pkg/proto/transport/v1" "log" "net/http" ctrl "sigs.k8s.io/controller-runtime" @@ -28,7 +29,6 @@ import ( crdv1alpha1traffic "github.com/opensergo/opensergo-control-plane/pkg/api/v1alpha1/traffic" "github.com/opensergo/opensergo-control-plane/pkg/model" pb "github.com/opensergo/opensergo-control-plane/pkg/proto/fault_tolerance/v1" - trpb "github.com/opensergo/opensergo-control-plane/pkg/proto/transport/v1" "github.com/opensergo/opensergo-control-plane/pkg/util" "github.com/pkg/errors" "google.golang.org/protobuf/proto" @@ -52,7 +52,7 @@ type CRDWatcher struct { subscribedList map[model.SubscribeTarget]bool subscribedNamespaces map[string]bool - //cxz + numberOfAppsInNamedspaces map[string]int subscribedApps map[model.NamespacedApp]bool @@ -95,12 +95,10 @@ func (r *CRDWatcher) AddSubscribeTarget(target model.SubscribeTarget) error { r.subscribedList[target] = true r.subscribedNamespaces[target.Namespace] = true r.subscribedApps[target.NamespacedApp()] = true - //cxz r.numberOfAppsInNamedspaces[target.Namespace]++ return nil } -// cxz func (r *CRDWatcher) RemoveSubscribeTarget(target model.SubscribeTarget) error { // TODO: implement me if target.Kind != r.kind { @@ -218,22 +216,25 @@ func (r *CRDWatcher) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Resu Namespace: req.Namespace, App: app, } - // TODO: Now we can do something for the crd object! + //// TODO: Now we can do something for the crd object! rules, version := r.GetRules(nsa) - status := &trpb.Status{ - Code: int32(200), - Message: "Get and send rule success", - Details: nil, - } - dataWithVersion := &trpb.DataWithVersion{Data: rules, Version: version} - if err := r.sendDataHandler(req.Namespace, app, r.kind, dataWithVersion, status, ""); err != nil { - logger.Error(err, "Failed to send rules", "kind", r.kind) - } - //cxz - // push xds rules - if err := r.xDSPushHandler(req.Namespace, app, r.kind, rules, version); err != nil { - logger.Error(err, "Failed to pushxds rules", "kind", r.kind) + // If GlobalBoolVariable is True send data to native server else send data to xdsserver + if model.GlobalBoolVariable { + status := &trpb.Status{ + Code: int32(200), + Message: "Get and send rule success", + Details: nil, + } + dataWithVersion := &trpb.DataWithVersion{Data: rules, Version: version} + if err := r.sendDataHandler(req.Namespace, app, r.kind, dataWithVersion, status, ""); err != nil { + logger.Error(err, "Failed to send rules", "kind", r.kind) + } + } else { + // push xds rules + if err := r.xDSPushHandler(req.Namespace, app, r.kind, rules); err != nil { + logger.Error(err, "Failed to pushxds rules", "kind", r.kind) + } } return ctrl.Result{}, nil @@ -369,14 +370,13 @@ func (r *CRDWatcher) translateCrdToProto(object client.Object) (*anypb.Any, erro func NewCRDWatcher(crdManager ctrl.Manager, kind model.SubscribeKind, crdGenerator func() client.Object, sendDataHandler model.DataEntirePushHandler, xdspushhandler model.XDSPushHandler) *CRDWatcher { return &CRDWatcher{ - kind: kind, - Client: crdManager.GetClient(), - logger: ctrl.Log.WithName("controller").WithName(kind), - scheme: crdManager.GetScheme(), - subscribedList: make(map[model.SubscribeTarget]bool, 4), - subscribedNamespaces: make(map[string]bool), - subscribedApps: make(map[model.NamespacedApp]bool), - //cxz + kind: kind, + Client: crdManager.GetClient(), + logger: ctrl.Log.WithName("controller").WithName(kind), + scheme: crdManager.GetScheme(), + subscribedList: make(map[model.SubscribeTarget]bool, 4), + subscribedNamespaces: make(map[string]bool), + subscribedApps: make(map[model.NamespacedApp]bool), numberOfAppsInNamedspaces: make(map[string]int), crdGenerator: crdGenerator, crdCache: NewCRDCache(kind), diff --git a/pkg/main/main.go b/pkg/main/main.go index d58df5a..66ffb46 100644 --- a/pkg/main/main.go +++ b/pkg/main/main.go @@ -15,9 +15,8 @@ package main import ( - "log" - "github.com/opensergo/opensergo-control-plane" + "log" ) func main() { @@ -25,7 +24,9 @@ func main() { if err != nil { log.Fatal(err) } + err = cp.Start() + if err != nil { log.Fatal(err) } diff --git a/pkg/model/XDsConnection.go b/pkg/model/XDsConnection.go deleted file mode 100644 index c60710f..0000000 --- a/pkg/model/XDsConnection.go +++ /dev/null @@ -1,62 +0,0 @@ -package model - -import ( - core "github.com/envoyproxy/go-control-plane/envoy/config/core/v3" - discovery "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3" - "sync" - "time" -) - -type XDsConnection struct { - sync.RWMutex - // peerAddr is the address of the client, from network layer. - peerAddr string - - // WatchedResources contains the list of watched resources for the proxy, keyed by the DiscoveryRequest TypeUrl. - WatchedResources map[string]*WatchedResource - - // Time of connection, for debugging - connectedAt time.Time - - // conID is the connection conID, used as a key in the connection table. - // Currently based on the node name and a counter. - Identifier ClientIdentifier - - // Both ADS and SDS streams implement this interface - Stream DiscoveryStream - - // Original node metadata, to avoid unmarshal/marshal. - // This is included in internal events. - Node *core.Node - - // initialized channel will be closed when proxy is initialized. Pushes, or anything accessing - // the proxy, should not be started until this channel is closed. - Initialized chan struct{} - - // stop can be used to end the connection manually via debug endpoints. Only to be used for testing. - Stop chan struct{} - - // reqChan is used to receive discovery requests for this connection. - ReqChan chan *discovery.DiscoveryRequest -} - -func NewConnection(peerAddr string, stream DiscoveryStream) *XDsConnection { - return &XDsConnection{ - - Initialized: make(chan struct{}), - Stop: make(chan struct{}), - ReqChan: make(chan *discovery.DiscoveryRequest, 1), - peerAddr: peerAddr, - connectedAt: time.Now(), - Stream: stream, - } -} - -func (conn *XDsConnection) Watched(typeUrl string) *WatchedResource { - conn.RLock() - defer conn.RUnlock() - if conn.WatchedResources != nil && conn.WatchedResources[typeUrl] != nil { - return conn.WatchedResources[typeUrl] - } - return nil -} diff --git a/pkg/model/model.go b/pkg/model/model.go index f87c3c1..8cf3981 100644 --- a/pkg/model/model.go +++ b/pkg/model/model.go @@ -22,6 +22,11 @@ import ( "google.golang.org/protobuf/types/known/anypb" ) +// Users could control this variable to determine whether use +var GlobalBoolVariable bool = false + +type ChooseRulesServer bool + // ClientIdentifier represents a unique identifier for an OpenSergo client. type ClientIdentifier string @@ -29,30 +34,25 @@ type OpenSergoTransportStream = trpb.OpenSergoUniversalTransportService_Subscrib type SubscribeRequestHandler func(ClientIdentifier, *trpb.SubscribeRequest, OpenSergoTransportStream) error -type SubscribeXDsRequestHandler func(*discovery.DiscoveryRequest, *XDsConnection) error +type SubscribeXDsRequestHandler func(*discovery.DiscoveryRequest, *XDSConnection) error const ExtensionConfigType = "type.googleapis.com/envoy.config.core.v3.TypedExtensionConfig" type DataEntirePushHandler func(namespace, app, kind string, dataWithVersion *trpb.DataWithVersion, status *trpb.Status, respId string) error -type XDSPushHandler func(namespace, app, kind string, rules []*anypb.Any, version int64) error +type XDSPushHandler func(namespace, app, kind string, rules []*anypb.Any) error type DiscoveryStream = extension.ExtensionConfigDiscoveryService_StreamExtensionConfigsServer // WatchedResource tracks an active DiscoveryRequest subscription. type WatchedResource struct { // TypeUrl is copied from the DiscoveryRequest.TypeUrl that initiated watching this resource. - // nolint TypeUrl string // ResourceNames tracks the list of resources that are actively watched. - // For LDS and CDS, all resources of the TypeUrl type are watched if it is empty. - // For endpoints the resource names will have list of clusters and for clusters it is empty. - // For Delta Xds, all resources of the TypeUrl that a client has subscribed to. ResourceNames []string // NonceSent is the nonce sent in the last sent response. If it is equal with NonceAcked, the - // last message has been processed. If empty: we never sent a message of this type. NonceSent string // NonceAcked is the last acked message. diff --git a/pkg/model/xDS_connection.go b/pkg/model/xDS_connection.go new file mode 100644 index 0000000..3cc8cd8 --- /dev/null +++ b/pkg/model/xDS_connection.go @@ -0,0 +1,67 @@ +package model + +import ( + core "github.com/envoyproxy/go-control-plane/envoy/config/core/v3" + discovery "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3" + "sync" + "time" +) + +// XDSConnection represents a connection to the xDS server. +type XDSConnection struct { + sync.RWMutex + + // peerAddr is the address of the client, from the network layer. + peerAddr string + + // WatchedResources contains the list of watched resources for the proxy, + // keyed by the DiscoveryRequest TypeUrl. + WatchedResources map[string]*WatchedResource + + // connectedAt stores the time of connection, mainly for debugging purposes. + connectedAt time.Time + + // Identifier represents the unique connection identifier (conID), + // used as a key in the connection table. Currently based on the node name and a counter. + Identifier ClientIdentifier + + // Stream represents the DiscoveryStream interface implemented by both ADS and SDS streams. + Stream DiscoveryStream + + // Node stores the original node metadata, to avoid unmarshal/marshal operations. + // This information is included in internal events. + Node *core.Node + + // Initialized channel is closed when the proxy is initialized. + // Pushes or any other operations accessing the proxy should not start until this channel is closed. + Initialized chan struct{} + + // Stop channel can be used to manually end the connection, typically via debug endpoints. + // It should only be used for testing purposes. + Stop chan struct{} + + // ReqChan is used to receive discovery requests for this connection. + ReqChan chan *discovery.DiscoveryRequest +} + +func NewConnection(peerAddr string, stream DiscoveryStream) *XDSConnection { + return &XDSConnection{ + + Initialized: make(chan struct{}), + Stop: make(chan struct{}), + WatchedResources: make(map[string]*WatchedResource), + ReqChan: make(chan *discovery.DiscoveryRequest, 1), + peerAddr: peerAddr, + connectedAt: time.Now(), + Stream: stream, + } +} + +func (conn *XDSConnection) Watched(typeUrl string) *WatchedResource { + conn.RLock() + defer conn.RUnlock() + if conn.WatchedResources != nil && conn.WatchedResources[typeUrl] != nil { + return conn.WatchedResources[typeUrl] + } + return nil +} diff --git a/pkg/test/grpcClient.go b/pkg/test/grpcClient.go deleted file mode 100644 index e312e69..0000000 --- a/pkg/test/grpcClient.go +++ /dev/null @@ -1,46 +0,0 @@ -package main - -import ( - "context" - "fmt" - v3 "github.com/envoyproxy/go-control-plane/envoy/config/core/v3" - discovery "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3" - "github.com/opensergo/opensergo-control-plane/pkg/model" - "google.golang.org/grpc" - "google.golang.org/grpc/credentials/insecure" - "log" -) - -func main() { - conn, err := grpc.Dial(":8002", grpc.WithTransportCredentials(insecure.NewCredentials())) - if err != nil { - log.Fatal("服务端出错,连接不上", err) - } - discoveryClient := discovery.NewAggregatedDiscoveryServiceClient(conn) - - // build request - request := discovery.DiscoveryRequest{ - TypeUrl: model.ExtensionConfigType, - Node: &v3.Node{ - Id: "testNode", - }, - ResourceNames: []string{"testns/testapp/RateLimitStrategy", "testns/testapp/FaultToleranceRule", "testns/testapp/ConcurrencyLimitStrategy"}, - } - - streamAggregatedResourcesClient, err := discoveryClient.StreamAggregatedResources(context.Background()) - if err != nil { - log.Fatal("得不到streamAggregatedResourcesClient", err) - } - - if err := streamAggregatedResourcesClient.Send(&request); err != nil { - log.Fatal("发送消息错误", err) - } - for { - discoveryResponse, err := streamAggregatedResourcesClient.Recv() - if err != nil { - log.Fatal("回收消息错误", err) - } - fmt.Println(discoveryResponse) - } - -} diff --git a/pkg/test/init_connection_test.go b/pkg/test/init_connection_test.go new file mode 100644 index 0000000..0c02161 --- /dev/null +++ b/pkg/test/init_connection_test.go @@ -0,0 +1,189 @@ +package test + +import ( + "context" + "fmt" + v3 "github.com/envoyproxy/go-control-plane/envoy/config/core/v3" + discovery "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3" + extension "github.com/envoyproxy/go-control-plane/envoy/service/extension/v3" + "github.com/opensergo/opensergo-control-plane/pkg/controller" + "github.com/opensergo/opensergo-control-plane/pkg/model" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" + "log" + "testing" + "time" +) + +// test connection init +func TestInitConnection(t *testing.T) { + conn, err := grpc.Dial(":8002", grpc.WithTransportCredentials(insecure.NewCredentials())) + if err != nil { + log.Fatal("服务端出错,连接不上", err) + } + discoveryClient := extension.NewExtensionConfigDiscoveryServiceClient(conn) + + // build request + request1 := discovery.DiscoveryRequest{ + TypeUrl: model.ExtensionConfigType, + Node: &v3.Node{ + Id: "testNode", + }, + ResourceNames: []string{"default/foo-app/" + controller.RateLimitStrategyKind}, + } + + streamExtensionClients, err := discoveryClient.StreamExtensionConfigs(context.Background()) + + if err != nil { + log.Fatal("得不到streamAggregatedResourcesClient", err) + } + + if err := streamExtensionClients.Send(&request1); err != nil { + log.Fatal("发送消息错误", err) + } + + for { + discoveryResponse, err := streamExtensionClients.Recv() + if err != nil { + log.Fatal("回收消息错误", err) + } + fmt.Println(discoveryResponse) + } + +} + +func TestMultiConnection(t *testing.T) { + firstconn, err := grpc.Dial(":8002", grpc.WithTransportCredentials(insecure.NewCredentials())) + if err != nil { + log.Fatal("服务端出错,连接不上", err) + } + discoveryClient := extension.NewExtensionConfigDiscoveryServiceClient(firstconn) + + // build request + firstrequest := discovery.DiscoveryRequest{ + TypeUrl: model.ExtensionConfigType, + Node: &v3.Node{ + Id: "testNode", + }, + ResourceNames: []string{"default/foo-app/" + controller.ConcurrencyLimitStrategyKind}, + } + + firststreamExtensionClient, err := discoveryClient.StreamExtensionConfigs(context.Background()) + + if err != nil { + log.Fatal("得不到streamAggregatedResourcesClient", err) + } + + if err := firststreamExtensionClient.Send(&firstrequest); err != nil { + log.Fatal("发送消息错误", err) + } + + for { + discoveryResponse, err := firststreamExtensionClient.Recv() + if err != nil { + log.Fatal("回收消息错误", err) + } + fmt.Println(discoveryResponse) + } + +} + +func TestResponseNonce(t *testing.T) { + conn, err := grpc.Dial(":8002", grpc.WithTransportCredentials(insecure.NewCredentials())) + if err != nil { + log.Fatal("服务端出错,连接不上", err) + } + discoveryClient := extension.NewExtensionConfigDiscoveryServiceClient(conn) + + // build request + request1 := discovery.DiscoveryRequest{ + TypeUrl: model.ExtensionConfigType, + Node: &v3.Node{ + Id: "testNode", + }, + ResourceNames: []string{"default/foo-app/" + controller.RateLimitStrategyKind}, + } + + streamExtensionClients, err := discoveryClient.StreamExtensionConfigs(context.Background()) + + if err != nil { + log.Fatal("得不到streamAggregatedResourcesClient", err) + } + + if err := streamExtensionClients.Send(&request1); err != nil { + log.Fatal("发送消息错误", err) + } + + for { + discoveryResponse, err := streamExtensionClients.Recv() + if err != nil { + log.Fatal("回收消息错误", err) + } + fmt.Println(discoveryResponse) + // build response + request2 := &discovery.DiscoveryRequest{ + TypeUrl: model.ExtensionConfigType, + Node: &v3.Node{ + Id: "testNode", + }, + ResponseNonce: discoveryResponse.Nonce, + ResourceNames: []string{"default/foo-app/" + controller.RateLimitStrategyKind}, + } + // send ack + streamExtensionClients.Send(request2) + } +} + +func TestSubScribetionChange(t *testing.T) { + conn, err := grpc.Dial(":8002", grpc.WithTransportCredentials(insecure.NewCredentials())) + if err != nil { + log.Fatal("服务端出错,连接不上", err) + } + discoveryClient := extension.NewExtensionConfigDiscoveryServiceClient(conn) + + // build request + request1 := discovery.DiscoveryRequest{ + TypeUrl: model.ExtensionConfigType, + Node: &v3.Node{ + Id: "testNode", + }, + ResourceNames: []string{"default/foo-app/" + controller.RateLimitStrategyKind}, + } + + //build request + request2 := discovery.DiscoveryRequest{ + TypeUrl: model.ExtensionConfigType, + Node: &v3.Node{ + Id: "testNode", + }, + // TODO : kind 放到namespace 前面 + ResourceNames: []string{"default/foo-app/" + controller.RateLimitStrategyKind, "default/foo-app/" + controller.ConcurrencyLimitStrategyKind, "default/foo-app/" + controller.ThrottlingStrategyKind}, + } + + streamExtensionClients, err := discoveryClient.StreamExtensionConfigs(context.Background()) + if err != nil { + log.Fatal("得不到streamAggregatedResourcesClient", err) + } + go func() { + fmt.Println("request1 send") + if err := streamExtensionClients.Send(&request1); err != nil { + log.Fatal("发送消息错误", err) + } + }() + + go func() { + time.Sleep(5 * time.Second) + fmt.Println("request2 send") + if err := streamExtensionClients.Send(&request2); err != nil { + log.Fatal("发送消息错误", err) + } + }() + + for { + discoveryResponse, err := streamExtensionClients.Recv() + if err != nil { + log.Fatal("回收消息错误", err) + } + fmt.Println(discoveryResponse) + } +} diff --git a/pkg/test/xDSClient_test.go b/pkg/test/xDSClient_test.go new file mode 100644 index 0000000..33e7868 --- /dev/null +++ b/pkg/test/xDSClient_test.go @@ -0,0 +1,40 @@ +package test + +import ( + "github.com/opensergo/opensergo-control-plane/pkg/controller" + "github.com/opensergo/opensergo-control-plane/pkg/transport/grpc" + "log" + "testing" +) + +func TestXDSClient(t *testing.T) { + // Define the address of the server + serverAddr := ":8002" // Replace with the actual address + + // Create a new XDSClient instance + client := grpc.NewxDSClient(serverAddr, "testNode", []string{controller.FaultToleranceRuleKind, controller.ThrottlingStrategyKind}) + + // Initialize the gRPC connection to the server + conn, err := client.InitGRPCConnection() + if err != nil { + log.Fatalf("Failed to initialize gRPC connection: %v", err) + } + + // Create a DiscoveryRequest + discoveryRequest := client.CreateDiscoveryRequest("default", "foo-app") + + // Create a stream extension client for handling streaming requests + streamExtensionClient, _ := client.CreateStreamExtensionClient(conn) + + // Send the DiscoveryRequest to the server + err = client.SendDiscoveryRequest(streamExtensionClient, discoveryRequest) + if err != nil { + log.Fatalf("Failed to send DiscoveryRequest: %v", err) + } + + // Receive and handle DiscoveryResponses from the server + client.ReceiveDiscoveryResponses(streamExtensionClient) + + // The client will continuously receive and print DiscoveryResponses + // You can add your own logic to handle the responses as needed +} diff --git a/pkg/transport/grpc/connection.go b/pkg/transport/grpc/connection.go index 539e797..323a549 100644 --- a/pkg/transport/grpc/connection.go +++ b/pkg/transport/grpc/connection.go @@ -45,7 +45,7 @@ func (c *Connection) IsValid() bool { type mapValue interface { interface { - *Connection | *model.XDsConnection + *Connection | *model.XDSConnection } } type ConnectionMap[T mapValue] map[model.ClientIdentifier]T diff --git a/pkg/transport/grpc/xDSClient.go b/pkg/transport/grpc/xDSClient.go new file mode 100644 index 0000000..c5d605d --- /dev/null +++ b/pkg/transport/grpc/xDSClient.go @@ -0,0 +1,113 @@ +package grpc + +import ( + "context" + "fmt" + v3 "github.com/envoyproxy/go-control-plane/envoy/config/core/v3" + discovery "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3" + extension "github.com/envoyproxy/go-control-plane/envoy/service/extension/v3" + "github.com/opensergo/opensergo-control-plane/pkg/model" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" + "log" +) + +const delimiter = "/" + +type XDSClient struct { + address string + id string + // store the rules subscribed + subscribedRules []string + // response nonce + nonce string + + allResponses []*discovery.DiscoveryResponse +} + +// Getter method for the 'nonce' field +func (c *XDSClient) GetNonce() string { + return c.nonce +} + +// Setter method for the 'nonce' field +func (c *XDSClient) SetNonce(nonce string) { + c.nonce = nonce +} + +// Setter method for 'subscribedRules' +func (x *XDSClient) SetSubscribedRules(rules []string) { + x.subscribedRules = rules +} + +// Getter method for 'subscribedRules' +func (x *XDSClient) GetSubscribedRules() []string { + return x.subscribedRules +} + +func NewxDSClient(address string, id string, subscribedRules []string) *XDSClient { + return &XDSClient{ + address: address, + id: id, + subscribedRules: subscribedRules, + } +} + +// InitGRPCConnection initializes a gRPC connection to the server. +func (c *XDSClient) InitGRPCConnection() (*grpc.ClientConn, error) { + ipaddr := c.address + conn, err := grpc.Dial(ipaddr, grpc.WithTransportCredentials(insecure.NewCredentials())) + if err != nil { + log.Fatal("Failed to connect to the server: ", err) + return nil, err + } + return conn, nil +} + +// CreateDiscoveryRequest creates a DiscoveryRequest. +func (c *XDSClient) CreateDiscoveryRequest(namespace string, app string) *discovery.DiscoveryRequest { + resourceNames := make([]string, 0) + for _, rule := range c.subscribedRules { + resourceName := rule + delimiter + app + delimiter + namespace + resourceNames = append(resourceNames, resourceName) + } + return &discovery.DiscoveryRequest{ + TypeUrl: model.ExtensionConfigType, + Node: &v3.Node{ + Id: c.id, + }, + ResponseNonce: c.nonce, + ResourceNames: resourceNames, + } +} + +// createDiscoveryClient creates a DiscoveryServiceClient using the provided gRPC ClientConn. +func (c *XDSClient) CreateStreamExtensionClient(conn *grpc.ClientConn) (extension.ExtensionConfigDiscoveryService_StreamExtensionConfigsClient, error) { + discoveryClient := extension.NewExtensionConfigDiscoveryServiceClient(conn) + streamExtensionClients, err := discoveryClient.StreamExtensionConfigs(context.Background()) + if err != nil { + log.Fatal("Failed to create streamClient: ", err) + return nil, err + } + return streamExtensionClients, nil +} + +// sendDiscoveryRequest sends a DiscoveryRequest to the server using the provided client and request. +func (c *XDSClient) SendDiscoveryRequest(client extension.ExtensionConfigDiscoveryService_StreamExtensionConfigsClient, request *discovery.DiscoveryRequest) error { + if err := client.Send(request); err != nil { + return err + } + return nil +} + +// receiveDiscoveryResponses receives and processes DiscoveryResponses from the server. +func (c *XDSClient) ReceiveDiscoveryResponses(client extension.ExtensionConfigDiscoveryService_StreamExtensionConfigsClient) error { + for { + discoveryResponse, err := client.Recv() + if err != nil { + return err + } + c.SetNonce(discoveryResponse.Nonce) + fmt.Println(discoveryResponse) + } +} diff --git a/pkg/transport/grpc/xDSServer.go b/pkg/transport/grpc/xDSServer.go index 29eda87..e96dcdb 100644 --- a/pkg/transport/grpc/xDSServer.go +++ b/pkg/transport/grpc/xDSServer.go @@ -8,48 +8,34 @@ import ( extension "github.com/envoyproxy/go-control-plane/envoy/service/extension/v3" "github.com/opensergo/opensergo-control-plane/pkg/model" "github.com/opensergo/opensergo-control-plane/pkg/util" + uatomic "go.uber.org/atomic" "google.golang.org/grpc" - "net" - "google.golang.org/grpc/peer" "google.golang.org/protobuf/types/known/anypb" "log" + "net" "strconv" "sync" "sync/atomic" + "time" ) var connectionNumber = int64(0) type DiscoveryServer struct { - port uint32 - // adsClients reflect active gRPC channels, for both ADS and EDS. - ecdsClients map[model.ClientIdentifier]*model.XDsConnection - ecdsClientsMutex sync.RWMutex - grpcServer *grpc.Server - // cxz - // map from to xdsconnection - // namespace appname and kind - XDSConnectionManeger *ConnectionManager[*model.XDsConnection] - subscribeHandlers []model.SubscribeXDsRequestHandler + port uint32 // Port on which the server is running. + ecdsClients map[model.ClientIdentifier]*model.XDSConnection // Active gRPC channels for both ADS and EDS. + ecdsClientsMutex sync.RWMutex // Mutex for managing concurrent access to ecdsClients. + grpcServer *grpc.Server // gRPC server instance. + XDSConnectionManager *ConnectionManager[*model.XDSConnection] // Connection manager for XDS connections. + subscribeHandlers []model.SubscribeXDsRequestHandler // List of request handlers for XDS subscription. + pushVersion uatomic.Uint64 // Atomic counter for push version. } func (s *DiscoveryServer) StreamExtensionConfigs(stream model.DiscoveryStream) error { return s.Stream(stream) } -//func (s *DiscoveryServer) WaitForRequestLimit(ctx context.Context) error { -// if s.RequestRateLimit.Limit() == 0 { -// // Allow opt out when rate limiting is set to 0qps -// return nil -// } -// // Give a bit of time for queue to clear out, but if not fail fast. Client will connect to another -// // instance in best case, or retry with backoff. -// wait, cancel := context.WithTimeout(ctx, time.Second) -// defer cancel() -// return s.RequestRateLimit.Wait(wait) -//} - func (s *DiscoveryServer) Stream(stream model.DiscoveryStream) error { ctx := stream.Context() peerAddr := "0.0.0.0" @@ -99,7 +85,7 @@ func (s *DiscoveryServer) Stream(stream model.DiscoveryStream) error { return nil } -func (s *DiscoveryServer) receive(con *model.XDsConnection) { +func (s *DiscoveryServer) receive(con *model.XDSConnection) { defer func() { close(con.ReqChan) @@ -114,7 +100,10 @@ func (s *DiscoveryServer) receive(con *model.XDsConnection) { firstRequest := true for { req, err := con.Stream.Recv() - log.Printf("Stream received message failed, err=%s\n", err.Error()) + if err != nil { + log.Printf("Stream received message failed, err=%s\n", err.Error()) + return + } if firstRequest { firstRequest = false @@ -132,7 +121,7 @@ func (s *DiscoveryServer) receive(con *model.XDsConnection) { } } -func (s *DiscoveryServer) initConnection(node *core.Node, con *model.XDsConnection) error { +func (s *DiscoveryServer) initConnection(node *core.Node, con *model.XDSConnection) error { // First request so initialize connection id and start tracking it. con.Identifier = connectionID(node.Id) con.Node = node @@ -154,7 +143,7 @@ func connectionID(node string) model.ClientIdentifier { return model.ClientIdentifier(node + "-" + strconv.FormatInt(id, 10)) } -func (s *DiscoveryServer) addCon(identifier model.ClientIdentifier, con *model.XDsConnection) { +func (s *DiscoveryServer) addCon(identifier model.ClientIdentifier, con *model.XDSConnection) { s.ecdsClientsMutex.Lock() defer s.ecdsClientsMutex.Unlock() s.ecdsClients[identifier] = con @@ -179,7 +168,7 @@ func shouldUnsubscribe(request *discovery.DiscoveryRequest) bool { var emptyResourceDelta = model.ResourceDelta{} -func ShouldRespond(con *model.XDsConnection, request *discovery.DiscoveryRequest) (bool, model.ResourceDelta) { +func ShouldRespond(con *model.XDSConnection, request *discovery.DiscoveryRequest) (bool, model.ResourceDelta) { // NACK if request.ErrorDetail != nil { @@ -187,6 +176,10 @@ func ShouldRespond(con *model.XDsConnection, request *discovery.DiscoveryRequest return false, emptyResourceDelta } + con.RLock() + previousInfo := con.WatchedResources[request.TypeUrl] + con.RUnlock() + if shouldUnsubscribe(request) { con.Lock() delete(con.WatchedResources, request.TypeUrl) @@ -194,13 +187,9 @@ func ShouldRespond(con *model.XDsConnection, request *discovery.DiscoveryRequest return false, emptyResourceDelta } - con.RLock() - previousInfo := con.WatchedResources[request.TypeUrl] - con.RUnlock() - // We should always respond with the current resource names. if request.ResponseNonce == "" || previousInfo == nil { - log.Println("ECDS: INIT/RECONNECT %s %s %s", con.Identifier, request.VersionInfo, request.ResponseNonce) + log.Printf("ECDS: INIT/RECONNECT %s %s %s", con.Identifier, request.VersionInfo, request.ResponseNonce) con.Lock() con.WatchedResources[request.TypeUrl] = &model.WatchedResource{TypeUrl: request.TypeUrl, ResourceNames: request.ResourceNames} con.Unlock() @@ -220,6 +209,8 @@ func ShouldRespond(con *model.XDsConnection, request *discovery.DiscoveryRequest return false, emptyResourceDelta } + // log for test + log.Printf("nonce before %s nonce now %s,", request.ResponseNonce, previousInfo.NonceSent) // If it comes here, that means nonce match. con.Lock() previousResources := con.WatchedResources[request.TypeUrl].ResourceNames @@ -235,6 +226,7 @@ func ShouldRespond(con *model.XDsConnection, request *discovery.DiscoveryRequest added := cur.Difference(prev) if len(removed) == 0 && len(added) == 0 { + log.Println("ack received") // this is an ack nonce matched return false, emptyResourceDelta } @@ -245,7 +237,7 @@ func ShouldRespond(con *model.XDsConnection, request *discovery.DiscoveryRequest } } -func (s *DiscoveryServer) pushXds(con *model.XDsConnection, w *model.WatchedResource, version int64, rules []*anypb.Any) error { +func (s *DiscoveryServer) pushXds(con *model.XDSConnection, w *model.WatchedResource, version int64, rules []*anypb.Any) error { resp := &discovery.DiscoveryResponse{ TypeUrl: w.TypeUrl, @@ -257,12 +249,11 @@ func (s *DiscoveryServer) pushXds(con *model.XDsConnection, w *model.WatchedReso return con.Stream.Send(resp) } -// cxz -func (s *DiscoveryServer) AddConnectioonToMap(namespace, appname, kind string, con *model.XDsConnection) { +func (s *DiscoveryServer) AddConnectionToMap(namespace, appname, kind string, con *model.XDSConnection) { s.ecdsClientsMutex.Lock() defer s.ecdsClientsMutex.Unlock() - s.XDSConnectionManeger.Add(namespace, appname, kind, con, con.Identifier) + s.XDSConnectionManager.Add(namespace, appname, kind, con, con.Identifier) } @@ -270,7 +261,7 @@ func (s *DiscoveryServer) RemoveConnectionFromMap(n model.NamespacedApp, kind st s.ecdsClientsMutex.Lock() defer s.ecdsClientsMutex.Unlock() // TODO: HANDLE Error - if err := s.XDSConnectionManeger.removeInternal(n, kind, identifier); err != nil { + if err := s.XDSConnectionManager.removeInternal(n, kind, identifier); err != nil { return err } return nil @@ -278,12 +269,12 @@ func (s *DiscoveryServer) RemoveConnectionFromMap(n model.NamespacedApp, kind st } func NewDiscoveryServer(port uint32, subscribeHandlers []model.SubscribeXDsRequestHandler) *DiscoveryServer { - connectionManager := NewConnectionManager[*model.XDsConnection]() + connectionManager := NewConnectionManager[*model.XDSConnection]() return &DiscoveryServer{ port: port, - ecdsClients: make(map[model.ClientIdentifier]*model.XDsConnection), + ecdsClients: make(map[model.ClientIdentifier]*model.XDSConnection), grpcServer: grpc.NewServer(), - XDSConnectionManeger: connectionManager, + XDSConnectionManager: connectionManager, subscribeHandlers: subscribeHandlers, } } @@ -303,11 +294,16 @@ func (s *DiscoveryServer) Run() error { if err != nil { return err } - + fmt.Println(listener) extension.RegisterExtensionConfigDiscoveryServiceServer(s.grpcServer, s) err = s.grpcServer.Serve(listener) + if err != nil { return err } return nil } + +func (s *DiscoveryServer) NextVersion() string { + return time.Now().Format(time.RFC3339) + "/" + strconv.FormatUint(s.pushVersion.Inc(), 10) +} diff --git a/samples/test.yaml b/samples/test.yaml new file mode 100644 index 0000000..ae98641 --- /dev/null +++ b/samples/test.yaml @@ -0,0 +1,11 @@ +apiVersion: fault-tolerance.opensergo.io/v1alpha1 +kind: RateLimitStrategy +metadata: + name: rate-limit-foo + labels: + app: foo-app +spec: + metricType: RequestAmount + limitMode: Local + threshold: 4 + statDurationSeconds: 5 \ No newline at end of file From bb76a33fe83530dcee85ea1fecf46a463b29a3c6 Mon Sep 17 00:00:00 2001 From: changxinzhang Date: Sun, 8 Oct 2023 18:49:01 -0500 Subject: [PATCH 3/7] OSPP --- control_plane.go | 48 ++++++++++++++++++++++++-------- pkg/main/main.go | 2 -- pkg/model/model.go | 3 +- pkg/model/xDS_connection.go | 7 +++-- pkg/test/init_connection_test.go | 12 ++++---- pkg/test/xDSClient_test.go | 5 ++-- pkg/transport/grpc/connection.go | 3 +- pkg/transport/grpc/server.go | 5 ++-- pkg/transport/grpc/xDSClient.go | 6 ++-- pkg/transport/grpc/xDSServer.go | 40 +++++++------------------- pkg/util/nonce.go | 3 +- 11 files changed, 74 insertions(+), 60 deletions(-) diff --git a/control_plane.go b/control_plane.go index 5aed23b..692fa9e 100644 --- a/control_plane.go +++ b/control_plane.go @@ -15,20 +15,22 @@ package opensergo import ( - discovery "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3" - "github.com/opensergo/opensergo-control-plane/pkg/util" - "google.golang.org/protobuf/types/known/anypb" + "fmt" "log" "os" "strings" "sync" + discovery "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3" + "github.com/pkg/errors" + "google.golang.org/protobuf/types/known/anypb" + "github.com/opensergo/opensergo-control-plane/pkg/controller" "github.com/opensergo/opensergo-control-plane/pkg/model" trpb "github.com/opensergo/opensergo-control-plane/pkg/proto/transport/v1" "github.com/opensergo/opensergo-control-plane/pkg/transport/grpc" transport "github.com/opensergo/opensergo-control-plane/pkg/transport/grpc" - "github.com/pkg/errors" + "github.com/opensergo/opensergo-control-plane/pkg/util" ) const delimiter = "/" @@ -42,6 +44,12 @@ type ControlPlane struct { mux sync.RWMutex } +type Request struct { + Kind string + AppName string + Namespace string +} + func NewControlPlane() (*ControlPlane, error) { cp := &ControlPlane{} @@ -145,13 +153,16 @@ func (c *ControlPlane) handleXDSSubscribeRequest(req *discovery.DiscoveryRequest var rules []*anypb.Any for resourcename := range subscribed { // Split the resource name into its components. - request := strings.Split(resourcename, delimiter) + request, err := splitRequest(resourcename) + if err != nil { + continue + } // Register a watcher for the specified resource. crdWatcher, err := c.operator.RegisterWatcher(model.SubscribeTarget{ - Namespace: request[4], - AppName: request[3], - Kind: request[0] + delimiter + request[1] + delimiter + request[2], + Namespace: request.Namespace, + AppName: request.AppName, + Kind: request.Kind, }) if err != nil { @@ -161,12 +172,12 @@ func (c *ControlPlane) handleXDSSubscribeRequest(req *discovery.DiscoveryRequest } // Add the connection to the connection map. - c.xdsServer.AddConnectionToMap(request[4], request[3], request[0]+"/"+request[1]+"/"+request[2], con) + c.xdsServer.AddConnectionToMap(request.Namespace, request.AppName, request.Kind, con) // Get the current rules for the resource. curRules, _ := crdWatcher.GetRules(model.NamespacedApp{ - Namespace: request[4], - App: request[3], + Namespace: request.Namespace, + App: request.AppName, }) if len(curRules) > 0 { @@ -304,3 +315,18 @@ func (c *ControlPlane) pushXds(namespace, app, kind string, rules []*anypb.Any) // Return nil to indicate success. return nil } + +func splitRequest(request string) (req Request, err error) { + requestArray := strings.Split(request, delimiter) + if len(requestArray) != 5 { + return req, fmt.Errorf("invalid request format") + } + + req = Request{ + Kind: requestArray[0] + delimiter + requestArray[1] + delimiter + requestArray[2], + AppName: requestArray[3], + Namespace: requestArray[4], + } + + return req, nil +} diff --git a/pkg/main/main.go b/pkg/main/main.go index 66ffb46..f360880 100644 --- a/pkg/main/main.go +++ b/pkg/main/main.go @@ -24,9 +24,7 @@ func main() { if err != nil { log.Fatal(err) } - err = cp.Start() - if err != nil { log.Fatal(err) } diff --git a/pkg/model/model.go b/pkg/model/model.go index 8cf3981..3a8c51a 100644 --- a/pkg/model/model.go +++ b/pkg/model/model.go @@ -17,9 +17,10 @@ package model import ( discovery "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3" extension "github.com/envoyproxy/go-control-plane/envoy/service/extension/v3" + "google.golang.org/protobuf/types/known/anypb" + trpb "github.com/opensergo/opensergo-control-plane/pkg/proto/transport/v1" "github.com/opensergo/opensergo-control-plane/pkg/util" - "google.golang.org/protobuf/types/known/anypb" ) // Users could control this variable to determine whether use diff --git a/pkg/model/xDS_connection.go b/pkg/model/xDS_connection.go index 3cc8cd8..174d3ff 100644 --- a/pkg/model/xDS_connection.go +++ b/pkg/model/xDS_connection.go @@ -1,10 +1,11 @@ package model import ( - core "github.com/envoyproxy/go-control-plane/envoy/config/core/v3" - discovery "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3" "sync" "time" + + core "github.com/envoyproxy/go-control-plane/envoy/config/core/v3" + discovery "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3" ) // XDSConnection represents a connection to the xDS server. @@ -60,7 +61,7 @@ func NewConnection(peerAddr string, stream DiscoveryStream) *XDSConnection { func (conn *XDSConnection) Watched(typeUrl string) *WatchedResource { conn.RLock() defer conn.RUnlock() - if conn.WatchedResources != nil && conn.WatchedResources[typeUrl] != nil { + if conn.WatchedResources[typeUrl] != nil { return conn.WatchedResources[typeUrl] } return nil diff --git a/pkg/test/init_connection_test.go b/pkg/test/init_connection_test.go index 0c02161..05a4fe1 100644 --- a/pkg/test/init_connection_test.go +++ b/pkg/test/init_connection_test.go @@ -3,16 +3,18 @@ package test import ( "context" "fmt" + "log" + "testing" + "time" + v3 "github.com/envoyproxy/go-control-plane/envoy/config/core/v3" discovery "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3" extension "github.com/envoyproxy/go-control-plane/envoy/service/extension/v3" - "github.com/opensergo/opensergo-control-plane/pkg/controller" - "github.com/opensergo/opensergo-control-plane/pkg/model" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" - "log" - "testing" - "time" + + "github.com/opensergo/opensergo-control-plane/pkg/controller" + "github.com/opensergo/opensergo-control-plane/pkg/model" ) // test connection init diff --git a/pkg/test/xDSClient_test.go b/pkg/test/xDSClient_test.go index 33e7868..f512745 100644 --- a/pkg/test/xDSClient_test.go +++ b/pkg/test/xDSClient_test.go @@ -1,10 +1,11 @@ package test import ( - "github.com/opensergo/opensergo-control-plane/pkg/controller" - "github.com/opensergo/opensergo-control-plane/pkg/transport/grpc" "log" "testing" + + "github.com/opensergo/opensergo-control-plane/pkg/controller" + "github.com/opensergo/opensergo-control-plane/pkg/transport/grpc" ) func TestXDSClient(t *testing.T) { diff --git a/pkg/transport/grpc/connection.go b/pkg/transport/grpc/connection.go index 323a549..3c14a31 100644 --- a/pkg/transport/grpc/connection.go +++ b/pkg/transport/grpc/connection.go @@ -17,9 +17,10 @@ package grpc import ( "sync" + "github.com/pkg/errors" + "github.com/opensergo/opensergo-control-plane/pkg/model" pb "github.com/opensergo/opensergo-control-plane/pkg/proto/transport/v1" - "github.com/pkg/errors" ) type OpenSergoTransportStream = pb.OpenSergoUniversalTransportService_SubscribeConfigServer diff --git a/pkg/transport/grpc/server.go b/pkg/transport/grpc/server.go index d4214d2..ee19f3a 100644 --- a/pkg/transport/grpc/server.go +++ b/pkg/transport/grpc/server.go @@ -20,11 +20,12 @@ import ( "log" "net" + "go.uber.org/atomic" + "google.golang.org/grpc" + "github.com/opensergo/opensergo-control-plane/pkg/model" trpb "github.com/opensergo/opensergo-control-plane/pkg/proto/transport/v1" "github.com/opensergo/opensergo-control-plane/pkg/util" - "go.uber.org/atomic" - "google.golang.org/grpc" ) const ( diff --git a/pkg/transport/grpc/xDSClient.go b/pkg/transport/grpc/xDSClient.go index c5d605d..9245907 100644 --- a/pkg/transport/grpc/xDSClient.go +++ b/pkg/transport/grpc/xDSClient.go @@ -3,13 +3,15 @@ package grpc import ( "context" "fmt" + "log" + v3 "github.com/envoyproxy/go-control-plane/envoy/config/core/v3" discovery "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3" extension "github.com/envoyproxy/go-control-plane/envoy/service/extension/v3" - "github.com/opensergo/opensergo-control-plane/pkg/model" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" - "log" + + "github.com/opensergo/opensergo-control-plane/pkg/model" ) const delimiter = "/" diff --git a/pkg/transport/grpc/xDSServer.go b/pkg/transport/grpc/xDSServer.go index e96dcdb..7bb3dbc 100644 --- a/pkg/transport/grpc/xDSServer.go +++ b/pkg/transport/grpc/xDSServer.go @@ -3,21 +3,23 @@ package grpc import ( context "context" "fmt" + "log" + "net" + "strconv" + "sync" + "sync/atomic" + "time" + core "github.com/envoyproxy/go-control-plane/envoy/config/core/v3" discovery "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3" extension "github.com/envoyproxy/go-control-plane/envoy/service/extension/v3" - "github.com/opensergo/opensergo-control-plane/pkg/model" - "github.com/opensergo/opensergo-control-plane/pkg/util" uatomic "go.uber.org/atomic" "google.golang.org/grpc" "google.golang.org/grpc/peer" "google.golang.org/protobuf/types/known/anypb" - "log" - "net" - "strconv" - "sync" - "sync/atomic" - "time" + + "github.com/opensergo/opensergo-control-plane/pkg/model" + "github.com/opensergo/opensergo-control-plane/pkg/util" ) var connectionNumber = int64(0) @@ -43,27 +45,13 @@ func (s *DiscoveryServer) Stream(stream model.DiscoveryStream) error { peerAddr = peerInfo.Addr.String() } - // TODO: Rate Limit - //if err := s.WaitForRequestLimit(stream.Context()); err != nil { - // //log.Warnf("ADS: %q exceeded rate limit: %v", peerAddr, err) - // return status.Errorf(codes.ResourceExhausted, "request rate limit exceeded: %v", err) - //} - con := model.NewConnection(peerAddr, stream) go s.receive(con) - // Wait for the proxy to be fully initialized before we start serving traffic. Because - // initialization doesn't have dependencies that will block, there is no need to add any timeout - // here. Prior to this explicit wait, we were implicitly waiting by receive() not sending to - // reqChannel and the connection not being enqueued for pushes to pushChannel until the - // initialization is complete. <-con.Initialized for { - // Go select{} statements are not ordered; the same channel can be chosen many times. - // For requests, these are higher priority (client may be blocked on startup until these are done) - // and often very cheap to handle (simple ACK), so we check it first. select { case req, ok := <-con.ReqChan: if ok { @@ -126,11 +114,6 @@ func (s *DiscoveryServer) initConnection(node *core.Node, con *model.XDSConnecti con.Identifier = connectionID(node.Id) con.Node = node - // Register the connection. this allows pushes to be triggered for the proxy. Note: the timing of - // this and initializeProxy important. While registering for pushes *after* initialization is complete seems like - // a better choice, it introduces a race condition; If we complete initialization of a new push - // context between initializeProxy and addCon, we would not get any pushes triggered for the new - // push context, leading the proxy to have a stale state until the next full push. s.addCon(con.Identifier, con) defer close(con.Initialized) @@ -199,9 +182,6 @@ func ShouldRespond(con *model.XDSConnection, request *discovery.DiscoveryRequest } } - // If there is mismatch in the nonce, that is a case of expired/stale nonce. - // A nonce becomes stale following a newer nonce being sent to Envoy. - // previousInfo.NonceSent can be empty if we previously had shouldRespond=true but didn't send any resources. if request.ResponseNonce != previousInfo.NonceSent { log.Println("ECDS: REQ %s Expired nonce received %s, sent %s", diff --git a/pkg/util/nonce.go b/pkg/util/nonce.go index e48c42e..a77c246 100644 --- a/pkg/util/nonce.go +++ b/pkg/util/nonce.go @@ -1,8 +1,9 @@ package util import ( - "github.com/google/uuid" "strings" + + "github.com/google/uuid" ) func Nonce() string { From fbf3f8ddcf7c6eff321f49ef854c6a932419cab5 Mon Sep 17 00:00:00 2001 From: changxinzhang Date: Sun, 8 Oct 2023 18:57:45 -0500 Subject: [PATCH 4/7] fourth commit --- pkg/controller/crd_cache.go | 3 ++- pkg/controller/crd_meta.go | 3 ++- pkg/controller/crd_watcher.go | 17 +++++++++-------- pkg/controller/k8s_operator.go | 5 +++-- pkg/controller/traffic_router_transform.go | 1 + 5 files changed, 17 insertions(+), 12 deletions(-) diff --git a/pkg/controller/crd_cache.go b/pkg/controller/crd_cache.go index 3a130aa..e97d1a3 100644 --- a/pkg/controller/crd_cache.go +++ b/pkg/controller/crd_cache.go @@ -17,9 +17,10 @@ package controller import ( "sync" - "github.com/opensergo/opensergo-control-plane/pkg/model" "k8s.io/apimachinery/pkg/types" "sigs.k8s.io/controller-runtime/pkg/client" + + "github.com/opensergo/opensergo-control-plane/pkg/model" ) type CRDObjectsHolder struct { diff --git a/pkg/controller/crd_meta.go b/pkg/controller/crd_meta.go index 859de32..2d1ab3b 100644 --- a/pkg/controller/crd_meta.go +++ b/pkg/controller/crd_meta.go @@ -15,9 +15,10 @@ package controller import ( + "sigs.k8s.io/controller-runtime/pkg/client" + "github.com/opensergo/opensergo-control-plane/pkg/api/v1alpha1" "github.com/opensergo/opensergo-control-plane/pkg/api/v1alpha1/traffic" - "sigs.k8s.io/controller-runtime/pkg/client" ) type CRDKind = string diff --git a/pkg/controller/crd_watcher.go b/pkg/controller/crd_watcher.go index ff781f0..889bb87 100644 --- a/pkg/controller/crd_watcher.go +++ b/pkg/controller/crd_watcher.go @@ -16,25 +16,26 @@ package controller import ( "context" - trpb "github.com/opensergo/opensergo-control-plane/pkg/proto/transport/v1" "log" "net/http" - ctrl "sigs.k8s.io/controller-runtime" - "sigs.k8s.io/controller-runtime/pkg/client" "strconv" "sync" "github.com/go-logr/logr" - crdv1alpha1 "github.com/opensergo/opensergo-control-plane/pkg/api/v1alpha1" - crdv1alpha1traffic "github.com/opensergo/opensergo-control-plane/pkg/api/v1alpha1/traffic" - "github.com/opensergo/opensergo-control-plane/pkg/model" - pb "github.com/opensergo/opensergo-control-plane/pkg/proto/fault_tolerance/v1" - "github.com/opensergo/opensergo-control-plane/pkg/util" "github.com/pkg/errors" "google.golang.org/protobuf/proto" "google.golang.org/protobuf/types/known/anypb" k8sApiError "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/runtime" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/client" + + crdv1alpha1 "github.com/opensergo/opensergo-control-plane/pkg/api/v1alpha1" + crdv1alpha1traffic "github.com/opensergo/opensergo-control-plane/pkg/api/v1alpha1/traffic" + "github.com/opensergo/opensergo-control-plane/pkg/model" + pb "github.com/opensergo/opensergo-control-plane/pkg/proto/fault_tolerance/v1" + trpb "github.com/opensergo/opensergo-control-plane/pkg/proto/transport/v1" + "github.com/opensergo/opensergo-control-plane/pkg/util" ) // CRDWatcher watches a specific kind of CRD. diff --git a/pkg/controller/k8s_operator.go b/pkg/controller/k8s_operator.go index 5a53c80..173ee50 100644 --- a/pkg/controller/k8s_operator.go +++ b/pkg/controller/k8s_operator.go @@ -21,8 +21,6 @@ import ( "github.com/alibaba/sentinel-golang/logging" "github.com/alibaba/sentinel-golang/util" - crdv1alpha1 "github.com/opensergo/opensergo-control-plane/pkg/api/v1alpha1" - crdv1alpha1traffic "github.com/opensergo/opensergo-control-plane/pkg/api/v1alpha1/traffic" "github.com/opensergo/opensergo-control-plane/pkg/model" "github.com/pkg/errors" "k8s.io/apimachinery/pkg/runtime" @@ -30,6 +28,9 @@ import ( _ "k8s.io/client-go/plugin/pkg/client/auth/gcp" ctrl "sigs.k8s.io/controller-runtime" // +kubebuilder:scaffold:imports + + crdv1alpha1 "github.com/opensergo/opensergo-control-plane/pkg/api/v1alpha1" + crdv1alpha1traffic "github.com/opensergo/opensergo-control-plane/pkg/api/v1alpha1/traffic" ) var ( diff --git a/pkg/controller/traffic_router_transform.go b/pkg/controller/traffic_router_transform.go index 144c7b5..02bc672 100644 --- a/pkg/controller/traffic_router_transform.go +++ b/pkg/controller/traffic_router_transform.go @@ -19,6 +19,7 @@ import ( routev3 "github.com/envoyproxy/go-control-plane/envoy/config/route/v3" matcherv3 "github.com/envoyproxy/go-control-plane/envoy/type/matcher/v3" "github.com/golang/protobuf/ptypes/wrappers" + "github.com/opensergo/opensergo-control-plane/pkg/api/v1alpha1/traffic" route "github.com/opensergo/opensergo-control-plane/pkg/proto/router/v1" "github.com/opensergo/opensergo-control-plane/pkg/util" From 7f7f6e757936f4d8b3d10ce6cc4ea903263a5032 Mon Sep 17 00:00:00 2001 From: changxinzhang Date: Mon, 9 Oct 2023 11:15:04 -0500 Subject: [PATCH 5/7] fifth commit --- control_plane.go | 2 +- pkg/test/xDSClient_test.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/control_plane.go b/control_plane.go index 692fa9e..be0a9b6 100644 --- a/control_plane.go +++ b/control_plane.go @@ -59,7 +59,7 @@ func NewControlPlane() (*ControlPlane, error) { } cp.server = transport.NewServer(uint32(10246), []model.SubscribeRequestHandler{cp.handleSubscribeRequest}) - cp.xdsServer = transport.NewDiscoveryServer(uint32(8002), []model.SubscribeXDsRequestHandler{cp.handleXDSSubscribeRequest}) + cp.xdsServer = transport.NewDiscoveryServer(uint32(10248), []model.SubscribeXDsRequestHandler{cp.handleXDSSubscribeRequest}) cp.operator = operator hostname, herr := os.Hostname() diff --git a/pkg/test/xDSClient_test.go b/pkg/test/xDSClient_test.go index f512745..8c6a500 100644 --- a/pkg/test/xDSClient_test.go +++ b/pkg/test/xDSClient_test.go @@ -10,7 +10,7 @@ import ( func TestXDSClient(t *testing.T) { // Define the address of the server - serverAddr := ":8002" // Replace with the actual address + serverAddr := ":10248" // Replace with the actual address // Create a new XDSClient instance client := grpc.NewxDSClient(serverAddr, "testNode", []string{controller.FaultToleranceRuleKind, controller.ThrottlingStrategyKind}) From 2c859b9cd8047caf9d54cf1b9991f76f7c4747be Mon Sep 17 00:00:00 2001 From: changxinzhang Date: Fri, 13 Oct 2023 11:47:59 -0500 Subject: [PATCH 6/7] sixth commit --- pkg/test/init_connection_test.go | 8 ++++---- pkg/transport/grpc/xDSServer.go | 2 +- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/pkg/test/init_connection_test.go b/pkg/test/init_connection_test.go index 05a4fe1..c2f1b5a 100644 --- a/pkg/test/init_connection_test.go +++ b/pkg/test/init_connection_test.go @@ -19,7 +19,7 @@ import ( // test connection init func TestInitConnection(t *testing.T) { - conn, err := grpc.Dial(":8002", grpc.WithTransportCredentials(insecure.NewCredentials())) + conn, err := grpc.Dial(":10248", grpc.WithTransportCredentials(insecure.NewCredentials())) if err != nil { log.Fatal("服务端出错,连接不上", err) } @@ -55,7 +55,7 @@ func TestInitConnection(t *testing.T) { } func TestMultiConnection(t *testing.T) { - firstconn, err := grpc.Dial(":8002", grpc.WithTransportCredentials(insecure.NewCredentials())) + firstconn, err := grpc.Dial(":10248", grpc.WithTransportCredentials(insecure.NewCredentials())) if err != nil { log.Fatal("服务端出错,连接不上", err) } @@ -91,7 +91,7 @@ func TestMultiConnection(t *testing.T) { } func TestResponseNonce(t *testing.T) { - conn, err := grpc.Dial(":8002", grpc.WithTransportCredentials(insecure.NewCredentials())) + conn, err := grpc.Dial(":10248", grpc.WithTransportCredentials(insecure.NewCredentials())) if err != nil { log.Fatal("服务端出错,连接不上", err) } @@ -137,7 +137,7 @@ func TestResponseNonce(t *testing.T) { } func TestSubScribetionChange(t *testing.T) { - conn, err := grpc.Dial(":8002", grpc.WithTransportCredentials(insecure.NewCredentials())) + conn, err := grpc.Dial(":10248", grpc.WithTransportCredentials(insecure.NewCredentials())) if err != nil { log.Fatal("服务端出错,连接不上", err) } diff --git a/pkg/transport/grpc/xDSServer.go b/pkg/transport/grpc/xDSServer.go index 7bb3dbc..2ceb575 100644 --- a/pkg/transport/grpc/xDSServer.go +++ b/pkg/transport/grpc/xDSServer.go @@ -184,7 +184,7 @@ func ShouldRespond(con *model.XDSConnection, request *discovery.DiscoveryRequest if request.ResponseNonce != previousInfo.NonceSent { - log.Println("ECDS: REQ %s Expired nonce received %s, sent %s", + log.Printf("ECDS: REQ %s Expired nonce received %s, sent %s", con.Identifier, request.ResponseNonce, previousInfo.NonceSent) return false, emptyResourceDelta } From 6c9a5dd1ddf007d18d2881df63c8f932710faca8 Mon Sep 17 00:00:00 2001 From: changxinzhang Date: Fri, 13 Oct 2023 23:14:44 -0500 Subject: [PATCH 7/7] seventh commit --- control_plane.go | 8 +++++--- pkg/test/init_connection_test.go | 4 ++++ pkg/test/xDSClient_test.go | 1 + pkg/transport/grpc/xDSServer.go | 1 - 4 files changed, 10 insertions(+), 4 deletions(-) diff --git a/control_plane.go b/control_plane.go index be0a9b6..80c71ee 100644 --- a/control_plane.go +++ b/control_plane.go @@ -198,10 +198,12 @@ func (c *ControlPlane) handleXDSSubscribeRequest(req *discovery.DiscoveryRequest // Handle unsubscribed resources. for resourcename := range unsubscribed { // Split the resource name into its components. - request := strings.Split(resourcename, delimiter) - + request, err := splitRequest(resourcename) + if err != nil { + continue + } // Remove the connection from the connection map. - c.xdsServer.RemoveConnectionFromMap(model.NamespacedApp{request[0], request[1]}, request[2], con.Identifier) + c.xdsServer.RemoveConnectionFromMap(model.NamespacedApp{Namespace: request.Namespace, App: request.AppName}, request.Kind, con.Identifier) } } diff --git a/pkg/test/init_connection_test.go b/pkg/test/init_connection_test.go index c2f1b5a..99b5168 100644 --- a/pkg/test/init_connection_test.go +++ b/pkg/test/init_connection_test.go @@ -19,6 +19,7 @@ import ( // test connection init func TestInitConnection(t *testing.T) { + t.Skip() conn, err := grpc.Dial(":10248", grpc.WithTransportCredentials(insecure.NewCredentials())) if err != nil { log.Fatal("服务端出错,连接不上", err) @@ -55,6 +56,7 @@ func TestInitConnection(t *testing.T) { } func TestMultiConnection(t *testing.T) { + t.Skip() firstconn, err := grpc.Dial(":10248", grpc.WithTransportCredentials(insecure.NewCredentials())) if err != nil { log.Fatal("服务端出错,连接不上", err) @@ -91,6 +93,7 @@ func TestMultiConnection(t *testing.T) { } func TestResponseNonce(t *testing.T) { + t.Skip() conn, err := grpc.Dial(":10248", grpc.WithTransportCredentials(insecure.NewCredentials())) if err != nil { log.Fatal("服务端出错,连接不上", err) @@ -137,6 +140,7 @@ func TestResponseNonce(t *testing.T) { } func TestSubScribetionChange(t *testing.T) { + t.Skip() conn, err := grpc.Dial(":10248", grpc.WithTransportCredentials(insecure.NewCredentials())) if err != nil { log.Fatal("服务端出错,连接不上", err) diff --git a/pkg/test/xDSClient_test.go b/pkg/test/xDSClient_test.go index 8c6a500..63d9c0b 100644 --- a/pkg/test/xDSClient_test.go +++ b/pkg/test/xDSClient_test.go @@ -9,6 +9,7 @@ import ( ) func TestXDSClient(t *testing.T) { + t.Skip() // Define the address of the server serverAddr := ":10248" // Replace with the actual address diff --git a/pkg/transport/grpc/xDSServer.go b/pkg/transport/grpc/xDSServer.go index 2ceb575..99e0aa9 100644 --- a/pkg/transport/grpc/xDSServer.go +++ b/pkg/transport/grpc/xDSServer.go @@ -70,7 +70,6 @@ func (s *DiscoveryServer) Stream(stream model.DiscoveryStream) error { } - return nil } func (s *DiscoveryServer) receive(con *model.XDSConnection) {