Skip to content

Commit

Permalink
enable grpc broker work with openshift route and customize certificate.
Browse files Browse the repository at this point in the history
Signed-off-by: morvencao <lcao@redhat.com>
  • Loading branch information
morvencao committed Jan 13, 2025
1 parent 9368aa6 commit 9464d47
Show file tree
Hide file tree
Showing 13 changed files with 160 additions and 36 deletions.
6 changes: 6 additions & 0 deletions cmd/maestro/server/auth_interceptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,9 @@ func newAuthUnaryInterceptor(authNType string, authorizer grpcauthorizer.GRPCAut
klog.Errorf("unable to get user and groups from certificate: %v", err)
return nil, err
}
case "mock":
user = "mock"
groups = []string{"mock-group"}
default:
return nil, fmt.Errorf("unsupported authentication type %s", authNType)
}
Expand Down Expand Up @@ -165,6 +168,9 @@ func newAuthStreamInterceptor(authNType string, authorizer grpcauthorizer.GRPCAu
klog.Errorf("unable to get user and groups from certificate: %v", err)
return err
}
case "mock":
user = "mock"
groups = []string{"mock-group"}
default:
return fmt.Errorf("unsupported authentication Type %s", authNType)
}
Expand Down
57 changes: 52 additions & 5 deletions cmd/maestro/server/grpc_broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,11 @@ package server

import (
"context"
"crypto/tls"
"crypto/x509"
"fmt"
"net"
"os"
"strconv"
"sync"
"time"
Expand All @@ -14,6 +17,7 @@ import (
"github.com/google/uuid"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/keepalive"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/types/known/emptypb"
Expand Down Expand Up @@ -71,11 +75,55 @@ func NewGRPCBroker(eventBroadcaster *event.EventBroadcaster) EventServer {
grpcServerOptions = append(grpcServerOptions, grpc.ConnectionTimeout(config.ConnectionTimeout))
grpcServerOptions = append(grpcServerOptions, grpc.WriteBufferSize(config.WriteBufferSize))
grpcServerOptions = append(grpcServerOptions, grpc.ReadBufferSize(config.ReadBufferSize))
grpcServerOptions = append(grpcServerOptions, grpc.KeepaliveEnforcementPolicy(keepalive.EnforcementPolicy{
MinTime: config.ClientMinPingInterval,
PermitWithoutStream: config.PermitPingWithoutStream,
}))
grpcServerOptions = append(grpcServerOptions, grpc.KeepaliveParams(keepalive.ServerParameters{
MaxConnectionAge: config.MaxConnectionAge,
Time: config.ServerPingInterval,
Timeout: config.ServerPingTimeout,
}))

klog.Infof("Serving gRPC broker without TLS at %s", config.BrokerBindPort)
if !config.DisableTLS {
// Check tls cert and key path path
if config.BrokerTLSCertFile == "" || config.BrokerTLSKeyFile == "" {
check(
fmt.Errorf("unspecified required --grpc-broker-tls-cert-file, --grpc-broker-tls-key-file"),
"Can't start gRPC broker",
)
}
// Serve with TLS
serverCerts, err := tls.LoadX509KeyPair(config.BrokerTLSCertFile, config.BrokerTLSKeyFile)
if err != nil {
check(fmt.Errorf("failed to load broker certificates: %v", err), "Can't start gRPC broker")
}
tlsConfig := &tls.Config{
Certificates: []tls.Certificate{serverCerts},
MinVersion: tls.VersionTLS13,
MaxVersion: tls.VersionTLS13,
}
if config.BrokerClientCAFile != "" {
certPool, err := x509.SystemCertPool()
if err != nil {
check(fmt.Errorf("failed to load system cert pool: %v", err), "Can't start gRPC broker")
}
caPEM, err := os.ReadFile(config.BrokerClientCAFile)
if err != nil {
check(fmt.Errorf("failed to read broker client CA file: %v", err), "Can't start gRPC broker")
}
if ok := certPool.AppendCertsFromPEM(caPEM); !ok {
check(fmt.Errorf("failed to append broker client CA to cert pool"), "Can't start gRPC broker")
}
tlsConfig.ClientCAs = certPool
tlsConfig.ClientAuth = tls.RequireAndVerifyClientCert
}
grpcServerOptions = append(grpcServerOptions, grpc.Creds(credentials.NewTLS(tlsConfig)))
klog.Infof("Serving gRPC broker with TLS at %s", config.ServerBindPort)
} else {
klog.Infof("Serving gRPC broker without TLS at %s", config.ServerBindPort)
}

sessionFactory := env().Database.SessionFactory
return &GRPCBroker{
grpcServer: grpc.NewServer(grpcServerOptions...),
Expand Down Expand Up @@ -160,8 +208,7 @@ func (bkr *GRPCBroker) register(clusterName string, handler resourceHandler) (st
errChan: errChan,
}

klog.V(4).Infof("register a subscriber %s (cluster name = %s)", id, clusterName)

klog.V(4).Infof("registered a subscriber %s (cluster name = %s)", id, clusterName)
return id, errChan
}

Expand All @@ -170,9 +217,9 @@ func (bkr *GRPCBroker) unregister(id string) {
bkr.mu.Lock()
defer bkr.mu.Unlock()

klog.V(10).Infof("unregister subscriber %s", id)
close(bkr.subscribers[id].errChan)
delete(bkr.subscribers, id)
klog.V(4).Infof("unregistered subscriber %s", id)
}

// Subscribe in stub implementation for maestro agent subscribe resource spec from maestro server.
Expand Down Expand Up @@ -215,7 +262,7 @@ func (bkr *GRPCBroker) Subscribe(subReq *pbv1.SubscriptionRequest, subServer pbv
case err := <-errChan:
// When reaching this point, an unrecoverable error occurred while sending the event,
// such as the connection being closed. Unregister the subscriber to trigger agent reconnection.
klog.Errorf("unregister subscriber %s, error= %v", subscriberID, err)
klog.Infof("unregistering subscriber %s because unrecoverable error= %v", subscriberID, err)
bkr.unregister(subscriberID)
return err
case <-subServer.Context().Done():
Expand Down
9 changes: 7 additions & 2 deletions cmd/maestro/server/grpc_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,14 @@ func NewGRPCServer(resourceService services.ResourceService, eventBroadcaster *e
grpcServerOptions = append(grpcServerOptions, grpc.ConnectionTimeout(config.ConnectionTimeout))
grpcServerOptions = append(grpcServerOptions, grpc.WriteBufferSize(config.WriteBufferSize))
grpcServerOptions = append(grpcServerOptions, grpc.ReadBufferSize(config.ReadBufferSize))
grpcServerOptions = append(grpcServerOptions, grpc.KeepaliveEnforcementPolicy(keepalive.EnforcementPolicy{
MinTime: config.ClientMinPingInterval,
PermitWithoutStream: config.PermitPingWithoutStream,
}))
grpcServerOptions = append(grpcServerOptions, grpc.KeepaliveParams(keepalive.ServerParameters{
MaxConnectionAge: config.MaxConnectionAge,
Time: config.ServerPingInterval,
Timeout: config.ServerPingTimeout,
}))

if !config.DisableTLS {
Expand Down Expand Up @@ -266,11 +272,10 @@ func (svr *GRPCServer) Subscribe(subReq *pbv1.SubscriptionRequest, subServer pbv

select {
case err := <-errChan:
klog.Errorf("unregister client %s, error= %v", clientID, err)
klog.Infof("unregistering client %s due to error= %v", clientID, err)
svr.eventBroadcaster.Unregister(clientID)
return err
case <-subServer.Context().Done():
klog.V(10).Infof("unregister client %s", clientID)
svr.eventBroadcaster.Unregister(clientID)
return nil
}
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ require (
k8s.io/klog/v2 v2.130.1
open-cluster-management.io/api v0.15.1-0.20241210025410-0ba6809d0ae2
open-cluster-management.io/ocm v0.15.1-0.20250108154653-2397c4e91119
open-cluster-management.io/sdk-go v0.15.1-0.20241224013925-71378a533f22
open-cluster-management.io/sdk-go v0.15.1-0.20250106052515-7c50bbf220a9
sigs.k8s.io/yaml v1.4.0
)

Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -893,8 +893,8 @@ open-cluster-management.io/api v0.15.1-0.20241210025410-0ba6809d0ae2 h1:zkp3VJnv
open-cluster-management.io/api v0.15.1-0.20241210025410-0ba6809d0ae2/go.mod h1:9erZEWEn4bEqh0nIX2wA7f/s3KCuFycQdBrPrRzi0QM=
open-cluster-management.io/ocm v0.15.1-0.20250108154653-2397c4e91119 h1:Ftx7vxDumTB9d4+ZdcqYwQavTOVzgF5h6vAXJ/gh0IE=
open-cluster-management.io/ocm v0.15.1-0.20250108154653-2397c4e91119/go.mod h1:T9pfSm3EYHnysEP9JYfCojV2pI44IYMz3zaZNylulz8=
open-cluster-management.io/sdk-go v0.15.1-0.20241224013925-71378a533f22 h1:w15NHc6cBfYxKHtF6zGLeQ1iTUqtN53sdONi9XXy5Xc=
open-cluster-management.io/sdk-go v0.15.1-0.20241224013925-71378a533f22/go.mod h1:fi5WBsbC5K3txKb8eRLuP0Sim/Oqz/PHX18skAEyjiA=
open-cluster-management.io/sdk-go v0.15.1-0.20250106052515-7c50bbf220a9 h1:yxkdser0gmaUryPRA33Fb3I+DDzXlPsDUNFIK5DVbPI=
open-cluster-management.io/sdk-go v0.15.1-0.20250106052515-7c50bbf220a9/go.mod h1:fi5WBsbC5K3txKb8eRLuP0Sim/Oqz/PHX18skAEyjiA=
sigs.k8s.io/apiserver-network-proxy/konnectivity-client v0.30.3 h1:2770sDpzrjjsAtVhSeUFseziht227YAWYHLGNM8QPwY=
sigs.k8s.io/apiserver-network-proxy/konnectivity-client v0.30.3/go.mod h1:Ve9uj1L+deCXFrPOk1LpFXqTg7LCFzFso6PA48q/XZw=
sigs.k8s.io/controller-runtime v0.19.3 h1:XO2GvC9OPftRst6xWCpTgBZO04S2cbp0Qqkj8bX1sPw=
Expand Down
3 changes: 3 additions & 0 deletions pkg/client/cloudevents/source_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,9 @@ func (s *SourceClientImpl) ReconnectedChan() <-chan struct{} {
// with the agent's status calculation. The resource status is converted to
// manifestwork status based on resource type before calculating the hash.
func ResourceStatusHashGetter(res *api.Resource) (string, error) {
if len(res.Status) == 0 {
return fmt.Sprintf("%x", sha256.Sum256([]byte(""))), nil
}
evt, err := api.JSONMAPToCloudEvent(res.Status)
if err != nil {
return "", fmt.Errorf("failed to convert resource status to cloud event, %v", err)
Expand Down
46 changes: 30 additions & 16 deletions pkg/config/grpc_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,22 +8,29 @@ import (
)

type GRPCServerConfig struct {
EnableGRPCServer bool `json:"enable_grpc_server"`
DisableTLS bool `json:"disable_grpc_tls"`
TLSCertFile string `json:"grpc_tls_cert_file"`
TLSKeyFile string `json:"grpc_tls_key_file"`
GRPCAuthNType string `json:"grpc_authn_type"`
GRPCAuthorizerConfig string `json:"grpc_authorizer_config"`
ClientCAFile string `json:"grpc_client_ca_file"`
ServerBindPort string `json:"server_bind_port"`
BrokerBindPort string `json:"broker_bind_port"`
MaxConcurrentStreams uint32 `json:"max_concurrent_steams"`
MaxReceiveMessageSize int `json:"max_receive_message_size"`
MaxSendMessageSize int `json:"max_send_message_size"`
ConnectionTimeout time.Duration `json:"connection_timeout"`
WriteBufferSize int `json:"write_buffer_size"`
ReadBufferSize int `json:"read_buffer_size"`
MaxConnectionAge time.Duration `json:"max_connection_age"`
EnableGRPCServer bool `json:"enable_grpc_server"`
DisableTLS bool `json:"disable_grpc_tls"`
TLSCertFile string `json:"grpc_tls_cert_file"`
TLSKeyFile string `json:"grpc_tls_key_file"`
BrokerTLSCertFile string `json:"grpc_broker_tls_cert_file"`
BrokerTLSKeyFile string `json:"grpc_broker_tls_key_file"`
GRPCAuthNType string `json:"grpc_authn_type"`
GRPCAuthorizerConfig string `json:"grpc_authorizer_config"`
ClientCAFile string `json:"grpc_client_ca_file"`
BrokerClientCAFile string `json:"grpc_broker_client_ca_file"`
ServerBindPort string `json:"server_bind_port"`
BrokerBindPort string `json:"broker_bind_port"`
MaxConcurrentStreams uint32 `json:"max_concurrent_steams"`
MaxReceiveMessageSize int `json:"max_receive_message_size"`
MaxSendMessageSize int `json:"max_send_message_size"`
ConnectionTimeout time.Duration `json:"connection_timeout"`
WriteBufferSize int `json:"write_buffer_size"`
ReadBufferSize int `json:"read_buffer_size"`
MaxConnectionAge time.Duration `json:"max_connection_age"`
ClientMinPingInterval time.Duration `json:"client_min_ping_interval"`
ServerPingInterval time.Duration `json:"server_ping_interval"`
ServerPingTimeout time.Duration `json:"server_ping_timeout"`
PermitPingWithoutStream bool `json:"permit_ping_without_stream"`
}

func NewGRPCServerConfig() *GRPCServerConfig {
Expand All @@ -39,12 +46,19 @@ func (s *GRPCServerConfig) AddFlags(fs *pflag.FlagSet) {
fs.IntVar(&s.MaxSendMessageSize, "grpc-max-send-message-size", math.MaxInt32, "gPRC max send message size")
fs.DurationVar(&s.ConnectionTimeout, "grpc-connection-timeout", 120*time.Second, "gPRC connection timeout")
fs.DurationVar(&s.MaxConnectionAge, "grpc-max-connection-age", time.Duration(math.MaxInt64), "A duration for the maximum amount of time connection may exist before closing")
fs.DurationVar(&s.ClientMinPingInterval, "grpc-client-min-ping-interval", 5*time.Second, "Server will terminate the connection if the client pings more than once within this duration")
fs.DurationVar(&s.ServerPingInterval, "grpc-server-ping-interval", 30*time.Second, "Duration after which the server pings the client if no activity is detected")
fs.DurationVar(&s.ServerPingTimeout, "grpc-server-ping-timeout", 10*time.Second, "Duration the client waits for a response after sending a keepalive ping")
fs.BoolVar(&s.PermitPingWithoutStream, "permit-ping-without-stream", false, "Allow keepalive pings even when there are no active streams")
fs.IntVar(&s.WriteBufferSize, "grpc-write-buffer-size", 32*1024, "gPRC write buffer size")
fs.IntVar(&s.ReadBufferSize, "grpc-read-buffer-size", 32*1024, "gPRC read buffer size")
fs.BoolVar(&s.DisableTLS, "disable-grpc-tls", false, "Disable TLS for gRPC server, default is false")
fs.StringVar(&s.TLSCertFile, "grpc-tls-cert-file", "", "The path to the tls.crt file")
fs.StringVar(&s.TLSKeyFile, "grpc-tls-key-file", "", "The path to the tls.key file")
fs.StringVar(&s.BrokerTLSCertFile, "grpc-broker-tls-cert-file", "", "The path to the broker tls.crt file")
fs.StringVar(&s.BrokerTLSKeyFile, "grpc-broker-tls-key-file", "", "The path to the broker tls.key file")
fs.StringVar(&s.GRPCAuthNType, "grpc-authn-type", "mock", "Specify the gRPC authentication type (e.g., mock, mtls or token)")
fs.StringVar(&s.GRPCAuthorizerConfig, "grpc-authorizer-config", "", "Path to the gRPC authorizer configuration file")
fs.StringVar(&s.ClientCAFile, "grpc-client-ca-file", "", "The path to the client ca file, must specify if using mtls authentication type")
fs.StringVar(&s.BrokerClientCAFile, "grpc-broker-client-ca-file", "", "The path to the broker client ca file")
}
4 changes: 2 additions & 2 deletions pkg/event/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,7 @@ func (h *EventBroadcaster) Register(source string, handler resourceHandler) (str
errChan: errChan,
}

klog.V(4).Infof("register a broadcaster client %s (source=%s)", id, source)

klog.V(4).Infof("registered a broadcaster client %s (source=%s)", id, source)
return id, errChan
}

Expand All @@ -63,6 +62,7 @@ func (h *EventBroadcaster) Unregister(id string) {

close(h.clients[id].errChan)
delete(h.clients, id)
klog.V(4).Infof("unregistered broadcaster client %s", id)
}

// Broadcast broadcasts a resource status change event to all registered clients.
Expand Down
19 changes: 17 additions & 2 deletions templates/route-template.yml
Original file line number Diff line number Diff line change
Expand Up @@ -39,5 +39,20 @@ objects:
kind: Service
name: maestro-grpc
tls:
termination: reencrypt
insecureEdgeTerminationPolicy: Redirect
termination: passthrough
insecureEdgeTerminationPolicy: None

- apiVersion: route.openshift.io/v1
kind: Route
metadata:
name: maestro-grpc-broker
labels:
app: maestro-grpc-broker
spec:
host: maestro-grpc-broker.${EXTERNAL_APPS_DOMAIN}
to:
kind: Service
name: maestro-grpc-broker
tls:
termination: passthrough
insecureEdgeTerminationPolicy: None
17 changes: 15 additions & 2 deletions templates/service-template.yml
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,12 @@ objects:
- name: tls
secret:
secretName: maestro-tls
- name: grpc-server-tls
secret:
secretName: maestro-grpc-server-tls
- name: grpc-broker-tls
secret:
secretName: maestro-grpc-broker-tls
- name: service
secret:
secretName: maestro
Expand Down Expand Up @@ -305,6 +311,10 @@ objects:
volumeMounts:
- name: tls
mountPath: /secrets/tls
- name: grpc-server-tls
mountPath: /secrets/grpc-server-tls
- name: grpc-broker-tls
mountPath: /secrets/grpc-broker-tls
- name: service
mountPath: /secrets/service
- name: rds
Expand Down Expand Up @@ -342,8 +352,10 @@ objects:
- --https-key-file=/secrets/tls/tls.key
- --enable-grpc-server=${ENABLE_GRPC_SERVER}
- --disable-grpc-tls=${DISABLE_GRPC_TLS}
- --grpc-tls-cert-file=/secrets/tls/tls.crt
- --grpc-tls-key-file=/secrets/tls/tls.key
- --grpc-tls-cert-file=/secrets/grpc-server-tls/tls.crt
- --grpc-tls-key-file=/secrets/grpc-server-tls/tls.key
- --grpc-broker-tls-cert-file=/secrets/grpc-broker-tls/tls.crt
- --grpc-broker-tls-key-file=/secrets/grpc-broker-tls/tls.key
- --acl-file=/configs/authentication/acl.yml
- --jwk-cert-file=/configs/authentication/jwks.json
- --jwk-cert-url=${JWKS_URL}
Expand Down Expand Up @@ -442,6 +454,7 @@ objects:
port: grpc
annotations:
description: Exposes and load balances the maestro pods grpc endpoint
service.alpha.openshift.io/serving-cert-secret-name: maestro-grpc-server-tls
spec:
selector:
app: maestro
Expand Down
3 changes: 3 additions & 0 deletions test/e2e/pkg/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,9 @@ var _ = BeforeSuite(func() {
// initialize the grpc source options
grpcOptions = grpcoptions.NewGRPCOptions()
grpcOptions.URL = grpcServerAddress
grpcOptions.KeepAliveOptions.Enable = true
grpcOptions.KeepAliveOptions.Time = 6 * time.Second
grpcOptions.KeepAliveOptions.Timeout = 1 * time.Second
sourceID = "sourceclient-test" + rand.String(5)
grpcCertSrt, err := serverTestOpts.kubeClientSet.CoreV1().Secrets(serverTestOpts.serverNamespace).Get(ctx, "maestro-grpc-cert", metav1.GetOptions{})
if !errors.IsNotFound(err) {
Expand Down
Loading

0 comments on commit 9464d47

Please sign in to comment.