diff --git a/pkg/operator/client.go b/pkg/operator/client.go index 7d1ba6c..a465554 100644 --- a/pkg/operator/client.go +++ b/pkg/operator/client.go @@ -4,7 +4,7 @@ import ( "context" jsoniter "github.com/json-iterator/go" "google.golang.org/grpc" - "google.golang.org/grpc/connectivity" + "google.golang.org/grpc/credentials/insecure" "k8s.io/klog/v2" "rusi/internal/kube" "rusi/pkg/custom-resource/components" @@ -13,33 +13,34 @@ import ( "sync" ) -var conn *grpc.ClientConn +func newClient(ctx context.Context, address string) (operatorv1.RusiOperatorClient, error) { + //var retryPolicy = `{ + // "methodConfig": [{ + // "name": [{"service": "rusi.proto.operator.v1.RusiOperator"}], + // "waitForReady": true, + // "retryPolicy": { + // "MaxAttempts": 4, + // "InitialBackoff": ".01s", + // "MaxBackoff": ".01s", + // "BackoffMultiplier": 1.0, + // "RetryableStatusCodes": [ "UNAVAILABLE" ] + // } + // }]}` -func newClient(ctx context.Context, address string) (cl operatorv1.RusiOperatorClient, err error) { - var retryPolicy = `{ - "methodConfig": [{ - "name": [{"service": "rusi.proto.operator.v1.RusiOperator"}], - "waitForReady": true, - "retryPolicy": { - "MaxAttempts": 4, - "InitialBackoff": ".01s", - "MaxBackoff": ".01s", - "BackoffMultiplier": 1.0, - "RetryableStatusCodes": [ "UNAVAILABLE" ] - } - }]}` + //conn, err = grpc.DialContext(ctx, address, grpc.WithInsecure(), grpc.WithDefaultServiceConfig(retryPolicy)) - if conn == nil { - conn, err = grpc.DialContext(ctx, address, grpc.WithInsecure(), grpc.WithDefaultServiceConfig(retryPolicy)) - if err != nil { - return nil, err - } + conn, conErr := grpc.DialContext(ctx, address, grpc.WithTransportCredentials(insecure.NewCredentials())) + if conErr != nil { + return nil, conErr } return operatorv1.NewRusiOperatorClient(conn), nil } func GetComponentsWatcher(ctx context.Context, address string, wg *sync.WaitGroup) func(context.Context) (<-chan components.Spec, error) { - client, _ := newClient(ctx, address) + client, err := newClient(ctx, address) + if err != nil { + klog.ErrorS(err, "error creating grpc operator client") + } return func(ctx context.Context) (<-chan components.Spec, error) { c := make(chan components.Spec) namespace := kube.GetCurrentNamespace() @@ -84,7 +85,10 @@ func GetComponentsWatcher(ctx context.Context, address string, wg *sync.WaitGrou } func GetConfigurationWatcher(ctx context.Context, address, configName string, wg *sync.WaitGroup) func(context.Context) (<-chan configuration.Spec, error) { - client, _ := newClient(ctx, address) + client, err := newClient(ctx, address) + if err != nil { + klog.ErrorS(err, "error creating grpc operator client") + } return func(ctx context.Context) (<-chan configuration.Spec, error) { c := make(chan configuration.Spec) namespace := kube.GetCurrentNamespace() @@ -125,6 +129,6 @@ func GetConfigurationWatcher(ctx context.Context, address, configName string, wg } } -func IsOperatorClientAlive() bool { - return conn != nil && conn.GetState() == connectivity.Ready -} +//func IsOperatorClientAlive() bool { +// return conn != nil && conn.GetState() == connectivity.Ready +//} diff --git a/pkg/runtime/components_manager.go b/pkg/runtime/components_manager.go index 27f2df2..49fd1d6 100644 --- a/pkg/runtime/components_manager.go +++ b/pkg/runtime/components_manager.go @@ -38,10 +38,7 @@ func NewComponentsManager(ctx context.Context, appId string, for _, opt := range opts { opt(&runtimeOpts) } - klog.V(4).InfoS("Components channel creation started") - compChan, err := componentsLoader(ctx) - klog.V(4).InfoS("Components channel created") if err != nil { klog.ErrorS(err, "error loading components") @@ -61,7 +58,6 @@ func NewComponentsManager(ctx context.Context, appId string, manager.pubSubRegistry.Register(runtimeOpts.pubsubs...) manager.middlewareRegistry.Register(runtimeOpts.pubsubMiddleware...) - klog.V(4).InfoS("Components added to registry") go manager.watchComponentsUpdates()