Skip to content

Commit

Permalink
Add more mutexes in dial chain element to fix race conditions (networ…
Browse files Browse the repository at this point in the history
…kservicemesh#1670)

* some minor change

Signed-off-by: NikitaSkrynnik <nikita.skrynnik@xored.com>

* add more locks

Signed-off-by: NikitaSkrynnik <nikita.skrynnik@xored.com>

---------

Signed-off-by: NikitaSkrynnik <nikita.skrynnik@xored.com>
  • Loading branch information
NikitaSkrynnik authored Sep 27, 2024
1 parent 052aded commit b66e1bf
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 8 deletions.
25 changes: 18 additions & 7 deletions pkg/networkservice/common/dial/dialer.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,9 @@ import (
)

type dialer struct {
ctx context.Context
cleanupContext context.Context
clientURL *url.URL
cleanupCancel context.CancelFunc
ctx context.Context
clientURL *url.URL
cleanupCancel context.CancelFunc
*grpc.ClientConn
dialOptions []grpc.DialOption
dialTimeout time.Duration
Expand Down Expand Up @@ -70,41 +69,53 @@ func (di *dialer) Dial(ctx context.Context, clientURL *url.URL) error {
}

// Dial
di.mu.Lock()
target := grpcutils.URLToTarget(di.clientURL)
di.mu.Unlock()

cc, err := grpc.DialContext(dialCtx, target, di.dialOptions...)
if err != nil {
if cc != nil {
_ = cc.Close()
}
return errors.Wrapf(err, "failed to dial %s", target)
}
di.mu.Lock()
di.ClientConn = cc

di.cleanupContext, di.cleanupCancel = context.WithCancel(di.ctx)
var cleanupContext context.Context
cleanupContext, di.cleanupCancel = context.WithCancel(di.ctx)
di.mu.Unlock()

go func(cleanupContext context.Context, cc *grpc.ClientConn) {
<-cleanupContext.Done()
_ = cc.Close()
}(di.cleanupContext, cc)
}(cleanupContext, cc)
return nil
}

func (di *dialer) Close() error {
if di != nil && di.cleanupCancel != nil {
di.mu.Lock()
di.cleanupCancel()
di.mu.Unlock()
runtime.Gosched()
}
return nil
}

func (di *dialer) Invoke(ctx context.Context, method string, args, reply interface{}, opts ...grpc.CallOption) error {
di.mu.Lock()
defer di.mu.Unlock()
if di.ClientConn == nil {
return errors.New("no dialer.ClientConn found")
}
return di.ClientConn.Invoke(ctx, method, args, reply, opts...)
}

func (di *dialer) NewStream(ctx context.Context, desc *grpc.StreamDesc, method string, opts ...grpc.CallOption) (grpc.ClientStream, error) {
di.mu.Lock()
defer di.mu.Unlock()

if di.ClientConn == nil {
return nil, errors.New("no dialer.ClientConn found")
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/networkservice/common/discoverforwarder/server.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
// Copyright (c) 2021-2022 Doc.ai and/or its affiliates.
//
// Copyright (c) 2023 Cisco and/or its affiliates.
// Copyright (c) 2023-2024 Cisco and/or its affiliates.
//
// SPDX-License-Identifier: Apache-2.0
//
Expand Down

0 comments on commit b66e1bf

Please sign in to comment.