Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

OSPP2023-23aaf0426 #65

Merged
merged 7 commits into from
Oct 14, 2023
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
180 changes: 162 additions & 18 deletions control_plane.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,21 +15,28 @@
package opensergo

import (
discovery "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

所有的import 都需要保持以下格式

基础库

第三方库

该项目

"github.com/opensergo/opensergo-control-plane/pkg/util"
"google.golang.org/protobuf/types/known/anypb"
"log"
"os"
"strings"
"sync"

"github.com/opensergo/opensergo-control-plane/pkg/controller"
"github.com/opensergo/opensergo-control-plane/pkg/model"
trpb "github.com/opensergo/opensergo-control-plane/pkg/proto/transport/v1"
"github.com/opensergo/opensergo-control-plane/pkg/transport/grpc"
transport "github.com/opensergo/opensergo-control-plane/pkg/transport/grpc"
"github.com/pkg/errors"
)

type ControlPlane struct {
operator *controller.KubernetesOperator
server *transport.Server
const delimiter = "/"

type ControlPlane struct {
operator *controller.KubernetesOperator
server *transport.Server
xdsServer *transport.DiscoveryServer
protoDesc *trpb.ControlPlaneDesc

mux sync.RWMutex
Expand All @@ -38,12 +45,13 @@ type ControlPlane struct {
func NewControlPlane() (*ControlPlane, error) {
cp := &ControlPlane{}

operator, err := controller.NewKubernetesOperator(cp.sendMessage)
operator, err := controller.NewKubernetesOperator(cp.sendMessage, cp.pushXds)
if err != nil {
return nil, err
}

cp.server = transport.NewServer(uint32(10246), []model.SubscribeRequestHandler{cp.handleSubscribeRequest})
cp.xdsServer = transport.NewDiscoveryServer(uint32(8002), []model.SubscribeXDsRequestHandler{cp.handleXDSSubscribeRequest})
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这个端口是你自定义的?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

是的,因为我看原项目也是固定端口是10246,所以我就写了8002

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

你看看10248 有没有用?没有,就用这个吧

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

好的,我看一下

cp.operator = operator

hostname, herr := os.Hostname()
Expand All @@ -59,13 +67,23 @@ func NewControlPlane() (*ControlPlane, error) {
func (c *ControlPlane) Start() error {
// Run the Kubernetes operator
err := c.operator.Run()

if err != nil {
return err
}
// Run the transport server
err = c.server.Run()
if err != nil {
return err

if model.GlobalBoolVariable {
//Run the transport server
err = c.server.Run()
if err != nil {
return err
}
} else {
//Run the xDS Server
err = c.xdsServer.Run()
if err != nil {
return err
}
}

return nil
Expand Down Expand Up @@ -106,16 +124,80 @@ func (c *ControlPlane) sendMessageToStream(stream model.OpenSergoTransportStream
})
}

// handleXDSSubscribeRequest handles the XDS subscription request.
func (c *ControlPlane) handleXDSSubscribeRequest(req *discovery.DiscoveryRequest, con *model.XDSConnection) error {
// Check if the request is for ExtensionConfigType.
if req.TypeUrl != model.ExtensionConfigType {
return nil
}

// Determine whether to respond and calculate the delta.
shouldRespond, delta := grpc.ShouldRespond(con, req)
subscribed := delta.Subscribed
unsubscribed := delta.Unsubscribed

if !shouldRespond {
// If there's no need to respond, return early.
return nil
}

if len(subscribed) != 0 {
var rules []*anypb.Any
for resourcename := range subscribed {
// Split the resource name into its components.
request := strings.Split(resourcename, delimiter)

// Register a watcher for the specified resource.
crdWatcher, err := c.operator.RegisterWatcher(model.SubscribeTarget{
Namespace: request[4],
AppName: request[3],
Kind: request[0] + delimiter + request[1] + delimiter + request[2],
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这里 数字的可维护性太差了,建议弄一个函数包一下。
namespace, appname, kind := tool(xxxx)

})

if err != nil {
// Log the error and continue to the next resource.
log.Printf("Error registering watcher for resource %s: %s\n", resourcename, err.Error())
continue
}

// Add the connection to the connection map.
c.xdsServer.AddConnectionToMap(request[4], request[3], request[0]+"/"+request[1]+"/"+request[2], con)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这里同理,建议,在最上面提前计算好


// Get the current rules for the resource.
curRules, _ := crdWatcher.GetRules(model.NamespacedApp{
Namespace: request[4],
App: request[3],
})

if len(curRules) > 0 {
// Append the current rules to the rules slice.
rules = append(rules, curRules...)
}
}

// Push XDS rules to the connection.
err := c.pushXdsToStream(con, con.Watched(req.TypeUrl), rules)
if err != nil {
// Log the error if pushing XDS rules fails.
log.Printf("Failed to push XDS rules to connection: %s\n", err.Error())
}
}

if len(unsubscribed) != 0 {
// Handle unsubscribed resources.
for resourcename := range unsubscribed {
// Split the resource name into its components.
request := strings.Split(resourcename, delimiter)

// Remove the connection from the connection map.
c.xdsServer.RemoveConnectionFromMap(model.NamespacedApp{request[0], request[1]}, request[2], con.Identifier)
}
}

return nil
}

func (c *ControlPlane) handleSubscribeRequest(clientIdentifier model.ClientIdentifier, request *trpb.SubscribeRequest, stream model.OpenSergoTransportStream) error {
// var labels []model.LabelKV
// if request.Target.Labels != nil {
// for _, label := range request.Target.Labels {
// labels = append(labels, model.LabelKV{
// Key: label.Key,
// Value: label.Value,
// })
// }
// }
for _, kind := range request.Target.Kinds {
crdWatcher, err := c.operator.RegisterWatcher(model.SubscribeTarget{
Namespace: request.Target.Namespace,
Expand All @@ -135,7 +217,8 @@ func (c *ControlPlane) handleSubscribeRequest(clientIdentifier model.ClientIdent
}
continue
}
_ = c.server.ConnectionManager().Add(request.Target.Namespace, request.Target.App, kind, transport.NewConnection(clientIdentifier, stream))
curConnection := transport.NewConnection(clientIdentifier, stream)
_ = c.server.ConnectionManager().Add(request.Target.Namespace, request.Target.App, kind, curConnection, curConnection.Identifier())
// send if the watcher cache is not empty
rules, version := crdWatcher.GetRules(model.NamespacedApp{
Namespace: request.Target.Namespace,
Expand All @@ -160,3 +243,64 @@ func (c *ControlPlane) handleSubscribeRequest(clientIdentifier model.ClientIdent
}
return nil
}

func (c *ControlPlane) pushXdsToStream(con *model.XDSConnection, w *model.WatchedResource, rules []*anypb.Any) error {
res := &discovery.DiscoveryResponse{
TypeUrl: w.TypeUrl,
VersionInfo: c.xdsServer.NextVersion(),

// TODO: RECORD THE NONCE AND CHECK THE NONCE
Nonce: util.Nonce(),
Resources: rules,
}

// Set nonce in the XDSConnection's WatchedResource
con.Lock()
if con.WatchedResources[model.ExtensionConfigType] == nil {
con.WatchedResources[res.TypeUrl] = &model.WatchedResource{TypeUrl: res.TypeUrl}
}
con.WatchedResources[res.TypeUrl].NonceSent = res.Nonce
con.Unlock()

// Send the DiscoveryResponse over the stream
err := con.Stream.Send(res)
if err != nil {
// Handle the error, e.g., log it or return it
// TODO: You can log the error or handle it as needed.
log.Println("Failed to send DiscoveryResponse:", err)
return err
}

return nil
}

func (c *ControlPlane) pushXds(namespace, app, kind string, rules []*anypb.Any) error {
// Retrieve the XDS connections for the specified namespace, app, and kind.
connections, exists := c.xdsServer.XDSConnectionManager.Get(namespace, app, kind)
if !exists || connections == nil {
// Log that there is no connection for this kind.
// Replace this with your actual logging mechanism.
log.Println("No XDS connection found for namespace:", namespace, "app:", app, "kind:", kind)
return errors.New("There is no connection for this kind")
}

for _, connection := range connections {
if connection == nil {
// Log a debug message for a nil connection.
// Replace this with your actual logging mechanism.
log.Println("Encountered a nil XDS connection")
continue
}
err := c.pushXdsToStream(connection, connection.WatchedResources[model.ExtensionConfigType], rules)
if err != nil {
// Log an error and return it if there is an error pushing XDS rules.
// Replace this with your actual logging mechanism.
log.Println("Failed to push XDS rules to connection:", err)
// TODO: You might want to consider handling partial failures here.
return err
}
}

// Return nil to indicate success.
return nil
}
56 changes: 50 additions & 6 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,24 +1,68 @@
module github.com/opensergo/opensergo-control-plane

go 1.14
go 1.18

require (
github.com/alibaba/sentinel-golang v1.0.3
github.com/cncf/xds/go v0.0.0-20220314180256-7f1daf1720fc
github.com/envoyproxy/go-control-plane v0.10.3-0.20221109183938-2935a23e638f
github.com/envoyproxy/protoc-gen-validate v0.6.7
github.com/go-logr/logr v0.4.0
github.com/golang/protobuf v1.5.2
github.com/json-iterator/go v1.1.12 // indirect
github.com/kr/pretty v0.3.0 // indirect
github.com/google/uuid v1.1.2
github.com/pkg/errors v0.9.1
github.com/rogpeppe/go-internal v1.8.0 // indirect
go.uber.org/atomic v1.7.0
google.golang.org/genproto v0.0.0-20220329172620-7be39ac1afc7
google.golang.org/grpc v1.51.0
google.golang.org/protobuf v1.28.1
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c // indirect
k8s.io/apimachinery v0.21.4
k8s.io/client-go v0.21.4
sigs.k8s.io/controller-runtime v0.9.7
)

require (
cloud.google.com/go v0.65.0 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/cespare/xxhash/v2 v2.1.1 // indirect
github.com/cncf/xds/go v0.0.0-20220314180256-7f1daf1720fc // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/evanphx/json-patch v4.11.0+incompatible // indirect
github.com/fsnotify/fsnotify v1.4.9 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang/groupcache v0.0.0-20200121045136-8c9f03a8e57e // indirect
github.com/google/go-cmp v0.5.7 // indirect
github.com/google/gofuzz v1.1.0 // indirect
github.com/googleapis/gnostic v0.5.5 // indirect
github.com/hashicorp/golang-lru v0.5.4 // indirect
github.com/imdario/mergo v0.3.12 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/kr/pretty v0.3.0 // indirect
github.com/matttproud/golang_protobuf_extensions v1.0.2-0.20181231171920-c182affec369 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/prometheus/client_golang v1.11.0 // indirect
github.com/prometheus/client_model v0.3.0 // indirect
github.com/prometheus/common v0.26.0 // indirect
github.com/prometheus/procfs v0.6.0 // indirect
github.com/rogpeppe/go-internal v1.8.0 // indirect
github.com/spf13/pflag v1.0.5 // indirect
golang.org/x/net v0.0.0-20220722155237-a158d28d115b // indirect
golang.org/x/oauth2 v0.0.0-20211104180415-d3ed0bb246c8 // indirect
golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f // indirect
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211 // indirect
golang.org/x/text v0.4.0 // indirect
golang.org/x/time v0.0.0-20210723032227-1f47c861a9ac // indirect
gomodules.xyz/jsonpatch/v2 v2.2.0 // indirect
google.golang.org/appengine v1.6.7 // indirect
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c // indirect
gopkg.in/inf.v0 v0.9.1 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b // indirect
k8s.io/api v0.21.4 // indirect
k8s.io/apiextensions-apiserver v0.21.4 // indirect
k8s.io/component-base v0.21.4 // indirect
k8s.io/klog/v2 v2.8.0 // indirect
k8s.io/kube-openapi v0.0.0-20210305001622-591a79e4bda7 // indirect
k8s.io/utils v0.0.0-20210802155522-efc7438f0176 // indirect
sigs.k8s.io/structured-merge-diff/v4 v4.1.2 // indirect
sigs.k8s.io/yaml v1.2.0 // indirect
)
Loading