-
Notifications
You must be signed in to change notification settings - Fork 23
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
OSPP2023-23aaf0426 #65
Changes from 2 commits
44fac51
b7073e6
bb76a33
fbf3f8d
7f7f6e7
2c859b9
6c9a5dd
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -15,21 +15,28 @@ | |
package opensergo | ||
|
||
import ( | ||
discovery "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3" | ||
"github.com/opensergo/opensergo-control-plane/pkg/util" | ||
"google.golang.org/protobuf/types/known/anypb" | ||
"log" | ||
"os" | ||
"strings" | ||
"sync" | ||
|
||
"github.com/opensergo/opensergo-control-plane/pkg/controller" | ||
"github.com/opensergo/opensergo-control-plane/pkg/model" | ||
trpb "github.com/opensergo/opensergo-control-plane/pkg/proto/transport/v1" | ||
"github.com/opensergo/opensergo-control-plane/pkg/transport/grpc" | ||
transport "github.com/opensergo/opensergo-control-plane/pkg/transport/grpc" | ||
"github.com/pkg/errors" | ||
) | ||
|
||
type ControlPlane struct { | ||
operator *controller.KubernetesOperator | ||
server *transport.Server | ||
const delimiter = "/" | ||
|
||
type ControlPlane struct { | ||
operator *controller.KubernetesOperator | ||
server *transport.Server | ||
xdsServer *transport.DiscoveryServer | ||
protoDesc *trpb.ControlPlaneDesc | ||
|
||
mux sync.RWMutex | ||
|
@@ -38,12 +45,13 @@ type ControlPlane struct { | |
func NewControlPlane() (*ControlPlane, error) { | ||
cp := &ControlPlane{} | ||
|
||
operator, err := controller.NewKubernetesOperator(cp.sendMessage) | ||
operator, err := controller.NewKubernetesOperator(cp.sendMessage, cp.pushXds) | ||
if err != nil { | ||
return nil, err | ||
} | ||
|
||
cp.server = transport.NewServer(uint32(10246), []model.SubscribeRequestHandler{cp.handleSubscribeRequest}) | ||
cp.xdsServer = transport.NewDiscoveryServer(uint32(8002), []model.SubscribeXDsRequestHandler{cp.handleXDSSubscribeRequest}) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 这个端口是你自定义的? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 是的,因为我看原项目也是固定端口是10246,所以我就写了8002 There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 你看看10248 有没有用?没有,就用这个吧 There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 好的,我看一下 |
||
cp.operator = operator | ||
|
||
hostname, herr := os.Hostname() | ||
|
@@ -59,13 +67,23 @@ func NewControlPlane() (*ControlPlane, error) { | |
func (c *ControlPlane) Start() error { | ||
// Run the Kubernetes operator | ||
err := c.operator.Run() | ||
|
||
if err != nil { | ||
return err | ||
} | ||
// Run the transport server | ||
err = c.server.Run() | ||
if err != nil { | ||
return err | ||
|
||
if model.GlobalBoolVariable { | ||
//Run the transport server | ||
err = c.server.Run() | ||
if err != nil { | ||
return err | ||
} | ||
} else { | ||
//Run the xDS Server | ||
err = c.xdsServer.Run() | ||
if err != nil { | ||
return err | ||
} | ||
} | ||
|
||
return nil | ||
|
@@ -106,16 +124,80 @@ func (c *ControlPlane) sendMessageToStream(stream model.OpenSergoTransportStream | |
}) | ||
} | ||
|
||
// handleXDSSubscribeRequest handles the XDS subscription request. | ||
func (c *ControlPlane) handleXDSSubscribeRequest(req *discovery.DiscoveryRequest, con *model.XDSConnection) error { | ||
// Check if the request is for ExtensionConfigType. | ||
if req.TypeUrl != model.ExtensionConfigType { | ||
return nil | ||
} | ||
|
||
// Determine whether to respond and calculate the delta. | ||
shouldRespond, delta := grpc.ShouldRespond(con, req) | ||
subscribed := delta.Subscribed | ||
unsubscribed := delta.Unsubscribed | ||
|
||
if !shouldRespond { | ||
// If there's no need to respond, return early. | ||
return nil | ||
} | ||
|
||
if len(subscribed) != 0 { | ||
var rules []*anypb.Any | ||
for resourcename := range subscribed { | ||
// Split the resource name into its components. | ||
request := strings.Split(resourcename, delimiter) | ||
|
||
// Register a watcher for the specified resource. | ||
crdWatcher, err := c.operator.RegisterWatcher(model.SubscribeTarget{ | ||
Namespace: request[4], | ||
AppName: request[3], | ||
Kind: request[0] + delimiter + request[1] + delimiter + request[2], | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 这里 数字的可维护性太差了,建议弄一个函数包一下。 |
||
}) | ||
|
||
if err != nil { | ||
// Log the error and continue to the next resource. | ||
log.Printf("Error registering watcher for resource %s: %s\n", resourcename, err.Error()) | ||
continue | ||
} | ||
|
||
// Add the connection to the connection map. | ||
c.xdsServer.AddConnectionToMap(request[4], request[3], request[0]+"/"+request[1]+"/"+request[2], con) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 这里同理,建议,在最上面提前计算好 |
||
|
||
// Get the current rules for the resource. | ||
curRules, _ := crdWatcher.GetRules(model.NamespacedApp{ | ||
Namespace: request[4], | ||
App: request[3], | ||
}) | ||
|
||
if len(curRules) > 0 { | ||
// Append the current rules to the rules slice. | ||
rules = append(rules, curRules...) | ||
} | ||
} | ||
|
||
// Push XDS rules to the connection. | ||
err := c.pushXdsToStream(con, con.Watched(req.TypeUrl), rules) | ||
if err != nil { | ||
// Log the error if pushing XDS rules fails. | ||
log.Printf("Failed to push XDS rules to connection: %s\n", err.Error()) | ||
} | ||
} | ||
|
||
if len(unsubscribed) != 0 { | ||
// Handle unsubscribed resources. | ||
for resourcename := range unsubscribed { | ||
// Split the resource name into its components. | ||
request := strings.Split(resourcename, delimiter) | ||
|
||
// Remove the connection from the connection map. | ||
c.xdsServer.RemoveConnectionFromMap(model.NamespacedApp{request[0], request[1]}, request[2], con.Identifier) | ||
} | ||
} | ||
|
||
return nil | ||
} | ||
|
||
func (c *ControlPlane) handleSubscribeRequest(clientIdentifier model.ClientIdentifier, request *trpb.SubscribeRequest, stream model.OpenSergoTransportStream) error { | ||
// var labels []model.LabelKV | ||
// if request.Target.Labels != nil { | ||
// for _, label := range request.Target.Labels { | ||
// labels = append(labels, model.LabelKV{ | ||
// Key: label.Key, | ||
// Value: label.Value, | ||
// }) | ||
// } | ||
// } | ||
for _, kind := range request.Target.Kinds { | ||
crdWatcher, err := c.operator.RegisterWatcher(model.SubscribeTarget{ | ||
Namespace: request.Target.Namespace, | ||
|
@@ -135,7 +217,8 @@ func (c *ControlPlane) handleSubscribeRequest(clientIdentifier model.ClientIdent | |
} | ||
continue | ||
} | ||
_ = c.server.ConnectionManager().Add(request.Target.Namespace, request.Target.App, kind, transport.NewConnection(clientIdentifier, stream)) | ||
curConnection := transport.NewConnection(clientIdentifier, stream) | ||
_ = c.server.ConnectionManager().Add(request.Target.Namespace, request.Target.App, kind, curConnection, curConnection.Identifier()) | ||
// send if the watcher cache is not empty | ||
rules, version := crdWatcher.GetRules(model.NamespacedApp{ | ||
Namespace: request.Target.Namespace, | ||
|
@@ -160,3 +243,64 @@ func (c *ControlPlane) handleSubscribeRequest(clientIdentifier model.ClientIdent | |
} | ||
return nil | ||
} | ||
|
||
func (c *ControlPlane) pushXdsToStream(con *model.XDSConnection, w *model.WatchedResource, rules []*anypb.Any) error { | ||
res := &discovery.DiscoveryResponse{ | ||
TypeUrl: w.TypeUrl, | ||
VersionInfo: c.xdsServer.NextVersion(), | ||
|
||
// TODO: RECORD THE NONCE AND CHECK THE NONCE | ||
Nonce: util.Nonce(), | ||
Resources: rules, | ||
} | ||
|
||
// Set nonce in the XDSConnection's WatchedResource | ||
con.Lock() | ||
if con.WatchedResources[model.ExtensionConfigType] == nil { | ||
con.WatchedResources[res.TypeUrl] = &model.WatchedResource{TypeUrl: res.TypeUrl} | ||
} | ||
con.WatchedResources[res.TypeUrl].NonceSent = res.Nonce | ||
con.Unlock() | ||
|
||
// Send the DiscoveryResponse over the stream | ||
err := con.Stream.Send(res) | ||
if err != nil { | ||
// Handle the error, e.g., log it or return it | ||
// TODO: You can log the error or handle it as needed. | ||
log.Println("Failed to send DiscoveryResponse:", err) | ||
return err | ||
} | ||
|
||
return nil | ||
} | ||
|
||
func (c *ControlPlane) pushXds(namespace, app, kind string, rules []*anypb.Any) error { | ||
// Retrieve the XDS connections for the specified namespace, app, and kind. | ||
connections, exists := c.xdsServer.XDSConnectionManager.Get(namespace, app, kind) | ||
if !exists || connections == nil { | ||
// Log that there is no connection for this kind. | ||
// Replace this with your actual logging mechanism. | ||
log.Println("No XDS connection found for namespace:", namespace, "app:", app, "kind:", kind) | ||
return errors.New("There is no connection for this kind") | ||
} | ||
|
||
for _, connection := range connections { | ||
if connection == nil { | ||
// Log a debug message for a nil connection. | ||
// Replace this with your actual logging mechanism. | ||
log.Println("Encountered a nil XDS connection") | ||
continue | ||
} | ||
err := c.pushXdsToStream(connection, connection.WatchedResources[model.ExtensionConfigType], rules) | ||
if err != nil { | ||
// Log an error and return it if there is an error pushing XDS rules. | ||
// Replace this with your actual logging mechanism. | ||
log.Println("Failed to push XDS rules to connection:", err) | ||
// TODO: You might want to consider handling partial failures here. | ||
return err | ||
} | ||
} | ||
|
||
// Return nil to indicate success. | ||
return nil | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,24 +1,68 @@ | ||
module github.com/opensergo/opensergo-control-plane | ||
|
||
go 1.14 | ||
go 1.18 | ||
|
||
require ( | ||
github.com/alibaba/sentinel-golang v1.0.3 | ||
github.com/cncf/xds/go v0.0.0-20220314180256-7f1daf1720fc | ||
github.com/envoyproxy/go-control-plane v0.10.3-0.20221109183938-2935a23e638f | ||
github.com/envoyproxy/protoc-gen-validate v0.6.7 | ||
github.com/go-logr/logr v0.4.0 | ||
github.com/golang/protobuf v1.5.2 | ||
github.com/json-iterator/go v1.1.12 // indirect | ||
github.com/kr/pretty v0.3.0 // indirect | ||
github.com/google/uuid v1.1.2 | ||
github.com/pkg/errors v0.9.1 | ||
github.com/rogpeppe/go-internal v1.8.0 // indirect | ||
go.uber.org/atomic v1.7.0 | ||
google.golang.org/genproto v0.0.0-20220329172620-7be39ac1afc7 | ||
google.golang.org/grpc v1.51.0 | ||
google.golang.org/protobuf v1.28.1 | ||
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c // indirect | ||
k8s.io/apimachinery v0.21.4 | ||
k8s.io/client-go v0.21.4 | ||
sigs.k8s.io/controller-runtime v0.9.7 | ||
) | ||
|
||
require ( | ||
cloud.google.com/go v0.65.0 // indirect | ||
github.com/beorn7/perks v1.0.1 // indirect | ||
github.com/cespare/xxhash/v2 v2.1.1 // indirect | ||
github.com/cncf/xds/go v0.0.0-20220314180256-7f1daf1720fc // indirect | ||
github.com/davecgh/go-spew v1.1.1 // indirect | ||
github.com/evanphx/json-patch v4.11.0+incompatible // indirect | ||
github.com/fsnotify/fsnotify v1.4.9 // indirect | ||
github.com/gogo/protobuf v1.3.2 // indirect | ||
github.com/golang/groupcache v0.0.0-20200121045136-8c9f03a8e57e // indirect | ||
github.com/google/go-cmp v0.5.7 // indirect | ||
github.com/google/gofuzz v1.1.0 // indirect | ||
github.com/googleapis/gnostic v0.5.5 // indirect | ||
github.com/hashicorp/golang-lru v0.5.4 // indirect | ||
github.com/imdario/mergo v0.3.12 // indirect | ||
github.com/json-iterator/go v1.1.12 // indirect | ||
github.com/kr/pretty v0.3.0 // indirect | ||
github.com/matttproud/golang_protobuf_extensions v1.0.2-0.20181231171920-c182affec369 // indirect | ||
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect | ||
github.com/modern-go/reflect2 v1.0.2 // indirect | ||
github.com/prometheus/client_golang v1.11.0 // indirect | ||
github.com/prometheus/client_model v0.3.0 // indirect | ||
github.com/prometheus/common v0.26.0 // indirect | ||
github.com/prometheus/procfs v0.6.0 // indirect | ||
github.com/rogpeppe/go-internal v1.8.0 // indirect | ||
github.com/spf13/pflag v1.0.5 // indirect | ||
golang.org/x/net v0.0.0-20220722155237-a158d28d115b // indirect | ||
golang.org/x/oauth2 v0.0.0-20211104180415-d3ed0bb246c8 // indirect | ||
golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f // indirect | ||
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211 // indirect | ||
golang.org/x/text v0.4.0 // indirect | ||
golang.org/x/time v0.0.0-20210723032227-1f47c861a9ac // indirect | ||
gomodules.xyz/jsonpatch/v2 v2.2.0 // indirect | ||
google.golang.org/appengine v1.6.7 // indirect | ||
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c // indirect | ||
gopkg.in/inf.v0 v0.9.1 // indirect | ||
gopkg.in/yaml.v2 v2.4.0 // indirect | ||
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b // indirect | ||
k8s.io/api v0.21.4 // indirect | ||
k8s.io/apiextensions-apiserver v0.21.4 // indirect | ||
k8s.io/component-base v0.21.4 // indirect | ||
k8s.io/klog/v2 v2.8.0 // indirect | ||
k8s.io/kube-openapi v0.0.0-20210305001622-591a79e4bda7 // indirect | ||
k8s.io/utils v0.0.0-20210802155522-efc7438f0176 // indirect | ||
sigs.k8s.io/structured-merge-diff/v4 v4.1.2 // indirect | ||
sigs.k8s.io/yaml v1.2.0 // indirect | ||
) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
所有的import 都需要保持以下格式
基础库
第三方库
该项目