Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

NSP/IPAM reconnect improvements #430

Merged
merged 3 commits on Jun 21, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions cmd/frontend/internal/env/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ type Config struct {
LogLevel string `default:"DEBUG" desc:"Log level" split_words:"true"`
NSPEntryTimeout time.Duration `default:"30s" desc:"Timeout of the entries" envconfig:"nsp_entry_timeout"`
GRPCKeepaliveTime time.Duration `default:"30s" desc:"gRPC keepalive timeout"`
GRPCMaxBackoff time.Duration `default:"5s" desc:"Upper bound on gRPC connection backoff delay" envconfig:"grpc_max_backoff"`
DelayConnectivity time.Duration `default:"1s" desc:"Delay between checks with connectivity"`
DelayNoConnectivity time.Duration `default:"3s" desc:"Delay between checks without connectivity"`
MaxSessionErrors int `default:"5" desc:"Max session errors when checking Bird until denounce"`
Expand Down
13 changes: 10 additions & 3 deletions cmd/frontend/internal/frontend/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -368,10 +368,17 @@ func (fes *FrontEndService) Monitor(ctx context.Context, errCh chan<- error) {
var refreshCtx context.Context
refreshCtx, refreshCancel = context.WithCancel(ctx)
go func() {
// refresh
_ = retry.Do(func() error {
ctxTimeout, cancel := context.WithTimeout(ctx, time.Second*30)
defer cancel()
return announceFrontend(ctxTimeout, fes.targetRegistryClient)
// guarantee connection with server (if announce timed out FE could retry right away)
_ = retry.Do(func() error {
ctxTimeout, cancel := context.WithTimeout(refreshCtx, time.Second*30)
defer cancel()
return announceFrontend(ctxTimeout, fes.targetRegistryClient)
}, retry.WithContext(refreshCtx),
retry.WithDelay(time.Second))

return nil
}, retry.WithContext(refreshCtx),
retry.WithDelay(fes.nspEntryTimeout),
retry.WithErrorIngnored())
Expand Down
8 changes: 8 additions & 0 deletions cmd/frontend/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/go-logr/logr"
"github.com/kelseyhightower/envconfig"
"google.golang.org/grpc"
"google.golang.org/grpc/backoff"
"google.golang.org/grpc/keepalive"

nspAPI "github.com/nordix/meridio/api/nsp/v1"
Expand Down Expand Up @@ -92,6 +93,10 @@ func main() {

// connect NSP
logger.Info("Dial NSP", "NSPService", config.NSPService)
grpcBackoffCfg := backoff.DefaultConfig
if grpcBackoffCfg.MaxDelay != config.GRPCMaxBackoff {
grpcBackoffCfg.MaxDelay = config.GRPCMaxBackoff
}
conn, err := grpc.DialContext(ctx,
config.NSPService,
grpc.WithTransportCredentials(
Expand All @@ -100,6 +105,9 @@ func main() {
grpc.WithDefaultCallOptions(
grpc.WaitForReady(true),
),
grpc.WithConnectParams(grpc.ConnectParams{
Backoff: grpcBackoffCfg,
}),
grpc.WithKeepaliveParams(keepalive.ClientParameters{
Time: config.GRPCKeepaliveTime,
}),
Expand Down
1 change: 1 addition & 0 deletions cmd/ipam/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,4 +34,5 @@ type Config struct {
LogLevel string `default:"DEBUG" desc:"Log level" split_words:"true"`
GRPCKeepaliveTime time.Duration `default:"30s" desc:"gRPC keepalive timeout"`
GRPCProbeRPCTimeout time.Duration `default:"1s" desc:"RPC timeout of internal gRPC health probe" envconfig:"grpc_probe_rpc_timeout"`
GRPCMaxBackoff time.Duration `default:"5s" desc:"Upper bound on gRPC connection backoff delay" envconfig:"grpc_max_backoff"`
}
8 changes: 8 additions & 0 deletions cmd/ipam/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import (
"github.com/nordix/meridio/pkg/security/credentials"
"github.com/sirupsen/logrus"
"google.golang.org/grpc"
"google.golang.org/grpc/backoff"
grpcHealth "google.golang.org/grpc/health"
"google.golang.org/grpc/health/grpc_health_v1"
"google.golang.org/grpc/keepalive"
Expand Down Expand Up @@ -99,6 +100,10 @@ func main() {
}

// connect NSP
grpcBackoffCfg := backoff.DefaultConfig
if grpcBackoffCfg.MaxDelay != config.GRPCMaxBackoff {
grpcBackoffCfg.MaxDelay = config.GRPCMaxBackoff
}
conn, err := grpc.DialContext(
ctx,
config.NSPService,
Expand All @@ -108,6 +113,9 @@ func main() {
grpc.WithDefaultCallOptions(
grpc.WaitForReady(true),
),
grpc.WithConnectParams(grpc.ConnectParams{
Backoff: grpcBackoffCfg,
}),
grpc.WithKeepaliveParams(keepalive.ClientParameters{
Time: config.GRPCKeepaliveTime,
}),
Expand Down
1 change: 1 addition & 0 deletions cmd/proxy/internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ type Config struct {
MTU int `default:"1500" desc:"Conduit MTU considered by local NSCs and NSE composing the network mesh" split_words:"true"`
GRPCKeepaliveTime time.Duration `default:"30s" desc:"gRPC keepalive timeout"`
GRPCProbeRPCTimeout time.Duration `default:"1s" desc:"RPC timeout of internal gRPC health probe" envconfig:"grpc_probe_rpc_timeout"`
GRPCMaxBackoff time.Duration `default:"5s" desc:"Upper bound on gRPC connection backoff delay" envconfig:"grpc_max_backoff"`
}

// IsValid checks if the configuration is valid
Expand Down
11 changes: 11 additions & 0 deletions cmd/proxy/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ import (
"github.com/nordix/meridio/pkg/security/credentials"
"github.com/sirupsen/logrus"
"google.golang.org/grpc"
"google.golang.org/grpc/backoff"
"google.golang.org/grpc/keepalive"
)

Expand Down Expand Up @@ -125,6 +126,10 @@ func main() {

// connect IPAM the proxy relies on to assign IPs both locally and remote via nsc and nse
logger.Info("Dial IPAM", "service", config.IPAMService)
grpcBackoffCfg := backoff.DefaultConfig
if grpcBackoffCfg.MaxDelay != config.GRPCMaxBackoff {
grpcBackoffCfg.MaxDelay = config.GRPCMaxBackoff
}
conn, err := grpc.DialContext(signalCtx,
config.IPAMService,
grpc.WithTransportCredentials(
Expand All @@ -133,6 +138,9 @@ func main() {
grpc.WithDefaultCallOptions(
grpc.WaitForReady(true),
),
grpc.WithConnectParams(grpc.ConnectParams{
Backoff: grpcBackoffCfg,
}),
grpc.WithKeepaliveParams(keepalive.ClientParameters{
Time: config.GRPCKeepaliveTime,
}),
Expand Down Expand Up @@ -217,6 +225,9 @@ func main() {
grpc.WithDefaultCallOptions(
grpc.WaitForReady(true),
),
grpc.WithConnectParams(grpc.ConnectParams{
Backoff: grpcBackoffCfg,
}),
grpc.WithKeepaliveParams(keepalive.ClientParameters{
Time: config.GRPCKeepaliveTime,
}),
Expand Down
1 change: 1 addition & 0 deletions cmd/stateless-lb/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ type Config struct {
IdentifierOffsetStart int `default:"5000" desc:"Each Stream will get a unique identifier range starting from that value" split_words:"true"`
GRPCKeepaliveTime time.Duration `default:"30s" desc:"gRPC keepalive timeout"`
GRPCProbeRPCTimeout time.Duration `default:"1s" desc:"RPC timeout of internal gRPC health probe" envconfig:"grpc_probe_rpc_timeout"`
GRPCMaxBackoff time.Duration `default:"5s" desc:"Upper bound on gRPC connection backoff delay" envconfig:"grpc_max_backoff"`
}

// IsValid checks if the configuration is valid
Expand Down
8 changes: 8 additions & 0 deletions cmd/stateless-lb/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ import (
"github.com/sirupsen/logrus"
"github.com/vishvananda/netlink"
"google.golang.org/grpc"
"google.golang.org/grpc/backoff"
"google.golang.org/grpc/keepalive"
)

Expand Down Expand Up @@ -125,6 +126,10 @@ func main() {
}

logger.Info("Dial NSP", "NSPService", config.NSPService)
grpcBackoffCfg := backoff.DefaultConfig
if grpcBackoffCfg.MaxDelay != config.GRPCMaxBackoff {
grpcBackoffCfg.MaxDelay = config.GRPCMaxBackoff
}
conn, err := grpc.DialContext(ctx,
config.NSPService,
grpc.WithTransportCredentials(
Expand All @@ -133,6 +138,9 @@ func main() {
grpc.WithDefaultCallOptions(
grpc.WaitForReady(true),
),
grpc.WithConnectParams(grpc.ConnectParams{
Backoff: grpcBackoffCfg,
}),
grpc.WithKeepaliveParams(keepalive.ClientParameters{
Time: config.GRPCKeepaliveTime,
}),
Expand Down
1 change: 1 addition & 0 deletions cmd/tapa/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ type Config struct {
MaxTokenLifetime time.Duration `default:"24h" desc:"maximum lifetime of tokens" split_words:"true"`
LogLevel string `default:"DEBUG" desc:"Log level" split_words:"true"`
NSPEntryTimeout time.Duration `default:"30s" desc:"Timeout of the entries" envconfig:"nsp_entry_timeout"`
GRPCMaxBackoff time.Duration `default:"5s" desc:"Upper bound on gRPC connection backoff delay" envconfig:"grpc_max_backoff"`
}

// IsValid checks if the configuration is valid
Expand Down
13 changes: 12 additions & 1 deletion cmd/tapa/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,18 @@ func main() {
s := grpc.NewServer()
defer s.Stop()

ambassador, err := tap.New(config.Name, config.Namespace, config.Node, networkServiceClient, monitorClient, config.NSPServiceName, config.NSPServicePort, config.NSPEntryTimeout, netUtils)
ambassador, err := tap.New(
config.Name,
config.Namespace,
config.Node,
networkServiceClient,
monitorClient,
config.NSPServiceName,
config.NSPServicePort,
config.NSPEntryTimeout,
config.GRPCMaxBackoff,
netUtils,
)
if err != nil {
log.Fatal(logger, "creating new tap ambassador", "error", err)
}
Expand Down
7 changes: 3 additions & 4 deletions pkg/ambassador/tap/conduit/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -249,11 +249,10 @@ func (sr *streamRetry) Open(nspStream *nspAPI.Stream) {
go func() {
// retry to refresh
_ = retry.Do(func() error {
openCtx, cancel := context.WithTimeout(ctx, sr.Timeout)
defer cancel()

// retry to open
_ = retry.Do(func() error {
openCtx, cancel := context.WithTimeout(ctx, sr.Timeout)
defer cancel()
err := sr.Stream.Open(openCtx, nspStream)
if err != nil {
log.Logger.Error(err, "opening stream", "stream", sr.Stream.GetStream())
Expand All @@ -263,7 +262,7 @@ func (sr *streamRetry) Open(nspStream *nspAPI.Stream) {
}
sr.setStatus(ambassadorAPI.StreamStatus_OPEN)
return nil
}, retry.WithContext(openCtx),
}, retry.WithContext(ctx),
retry.WithDelay(sr.RetryDelay))

return nil
Expand Down
4 changes: 4 additions & 0 deletions pkg/ambassador/tap/tap.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ type Tap struct {
NSPServiceName string
NSPServicePort int
NSPEntryTimeout time.Duration
GRPCMaxBackoff time.Duration
NetUtils networking.Utils
StreamRegistry types.Registry
currentTrench types.Trench
Expand All @@ -59,6 +60,7 @@ func New(targetName string,
nspServiceName string,
nspServicePort int,
nspEntryTimeout time.Duration,
grpcMaxBackoff time.Duration,
netUtils networking.Utils) (*Tap, error) {
tap := &Tap{
TargetName: targetName,
Expand All @@ -69,6 +71,7 @@ func New(targetName string,
NSPServiceName: nspServiceName,
NSPServicePort: nspServicePort,
NSPEntryTimeout: nspEntryTimeout,
GRPCMaxBackoff: grpcMaxBackoff,
NetUtils: netUtils,
logger: log.Logger.WithValues("class", "Tap"),
}
Expand Down Expand Up @@ -175,6 +178,7 @@ func (tap *Tap) setTrench(t *ambassadorAPI.Trench) (types.Trench, error) {
tap.NSPServiceName,
tap.NSPServicePort,
tap.NSPEntryTimeout,
tap.GRPCMaxBackoff,
tap.NetUtils)
if err != nil {
return nil, err
Expand Down
20 changes: 18 additions & 2 deletions pkg/ambassador/tap/trench/trench.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"github.com/nordix/meridio/pkg/nsp"
"github.com/nordix/meridio/pkg/security/credentials"
"google.golang.org/grpc"
"google.golang.org/grpc/backoff"
"google.golang.org/grpc/keepalive"
)

Expand Down Expand Up @@ -73,6 +74,7 @@ func New(trench *ambassadorAPI.Trench,
nspServiceName string,
nspServicePort int,
nspEntryTimeout time.Duration,
grpcMaxBackoff time.Duration,
netUtils networking.Utils) (*Trench, error) {

t := &Trench{
Expand All @@ -88,7 +90,7 @@ func New(trench *ambassadorAPI.Trench,
}

var err error
t.nspConn, err = t.connectNSPService(context.TODO(), nspServiceName, nspServicePort)
t.nspConn, err = t.connectNSPService(context.TODO(), nspServiceName, nspServicePort, grpcMaxBackoff)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -210,16 +212,30 @@ func (t *Trench) GetTrench() *ambassadorAPI.Trench {
return t.Trench
}

func (t *Trench) connectNSPService(ctx context.Context, nspServiceName string, nspServicePort int) (*grpc.ClientConn, error) {
func (t *Trench) connectNSPService(ctx context.Context,
nspServiceName string,
nspServicePort int,
grpcMaxBackoff time.Duration) (*grpc.ClientConn, error) {
service := nsp.GetService(nspServiceName, t.Trench.GetName(), t.Namespace, nspServicePort)
t.logger.Info("Connect to NSP Service", "service", service)
// Allow changing max backoff delay from gRPC default 120s to limit reconnect interval.
// Thus, allow faster reconnect if NSP has been unavailable. Otherwise gRPC might
// wait up to 2 minutes to attempt reconnect due to the default backoff algorithm.
// (refer to: https://github.com/grpc/grpc/blob/master/doc/connection-backoff.md)
grpcBackoffCfg := backoff.DefaultConfig
if grpcBackoffCfg.MaxDelay != grpcMaxBackoff {
grpcBackoffCfg.MaxDelay = grpcMaxBackoff
}
return grpc.Dial(service,
grpc.WithTransportCredentials(
credentials.GetClient(ctx),
),
grpc.WithDefaultCallOptions(
grpc.WaitForReady(true),
),
grpc.WithConnectParams(grpc.ConnectParams{
Backoff: grpcBackoffCfg,
}),
grpc.WithKeepaliveParams(keepalive.ClientParameters{
// if the NSP service is re-created, the TAPA will take around 15 minutes to re-connect to the NSP service without this setting.
Time: grpcKeepaliveTime,
Expand Down