Skip to content

Commit

Permalink
apply self code review
Browse files Browse the repository at this point in the history
Signed-off-by: Denis Tingaikin <denis.tingajkin@xored.com>
  • Loading branch information
denis-tingaikin committed Jan 25, 2022
1 parent 2787d18 commit 2da42b0
Show file tree
Hide file tree
Showing 21 changed files with 144 additions and 129 deletions.
27 changes: 23 additions & 4 deletions pkg/networkservice/chains/nsmgr/heal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,14 @@ import (
"testing"
"time"

"github.com/sirupsen/logrus"
"github.com/stretchr/testify/require"
"go.uber.org/goleak"

"github.com/networkservicemesh/api/pkg/api/registry"

"github.com/networkservicemesh/sdk/pkg/networkservice/utils/count"
"github.com/networkservicemesh/sdk/pkg/registry/chains/client"
"github.com/networkservicemesh/sdk/pkg/tools/sandbox"
)

Expand Down Expand Up @@ -366,7 +368,7 @@ func testNSMGRCloseHeal(t *testing.T, withNSEExpiration bool) {
SetRegistryProxySupplier(nil)

if withNSEExpiration {
builder = builder.SetRegistryExpiryDuration(time.Second)
builder = builder.SetRegistryExpiryDuration(time.Second / 2)
}

domain := builder.Build()
Expand Down Expand Up @@ -408,17 +410,34 @@ func testNSMGRCloseHeal(t *testing.T, withNSEExpiration bool) {
if withNSEExpiration {
// 3.1 Wait for the endpoint expiration
time.Sleep(time.Second)
c := client.NewNetworkServiceEndpointRegistryClient(ctx, domain.Nodes[0].NSMgr.URL, client.WithDialOptions(sandbox.DialOptions(sandbox.WithTokenGenerator(sandbox.GenerateTestToken))...))

stream, err := c.Find(ctx, &registry.NetworkServiceEndpointQuery{
NetworkServiceEndpoint: &registry.NetworkServiceEndpoint{
Name: "final-endpoint",
},
})

require.NoError(t, err)

require.Len(t, registry.ReadNetworkServiceEndpointList(stream), 0)
}

// 4. Close connection
_, err = nsc.Close(nscCtx, conn.Clone())
require.NoError(t, err)
_, _ = nsc.Close(nscCtx, conn.Clone())

nscCtxCancel()
require.NoError(t, ctx.Err())

for _, fwd := range domain.Nodes[0].Forwarders {
fwd.Cancel()
}

require.Eventually(t, func() bool {
logrus.Error(goleak.Find())
return goleak.Find(ignoreCurrent) == nil
}, timeout, tick)

require.NoError(t, ctx.Err())
}

func checkSecondRequestsReceived(requestsDone func() int) func() bool {
Expand Down
15 changes: 6 additions & 9 deletions pkg/networkservice/chains/nsmgr/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ import (
"github.com/networkservicemesh/sdk/pkg/registry/common/clientconn"
registryclientinfo "github.com/networkservicemesh/sdk/pkg/registry/common/clientinfo"
"github.com/networkservicemesh/sdk/pkg/registry/common/clienturl"
"github.com/networkservicemesh/sdk/pkg/registry/common/connect2"
registryconnect "github.com/networkservicemesh/sdk/pkg/registry/common/connect"
"github.com/networkservicemesh/sdk/pkg/registry/common/dial"
"github.com/networkservicemesh/sdk/pkg/registry/common/expire"
"github.com/networkservicemesh/sdk/pkg/registry/common/localbypass"
Expand Down Expand Up @@ -163,16 +163,15 @@ func NewServer(ctx context.Context, tokenGenerator token.GeneratorFunc, options
var nsRegistry = memory.NewNetworkServiceRegistryServer()
if opts.regURL != nil {
// Use remote registry
nsRegistry = connect2.NewNetworkServiceRegistryServer(
nsRegistry = registryconnect.NewNetworkServiceRegistryServer(
chain.NewNetworkServiceRegistryClient(
clienturl.NewNetworkServiceRegistryClient(opts.regURL),
begin.NewNetworkServiceRegistryClient(),
clientconn.NewNetworkServiceRegistryClient(),
dial.NewNetworkServiceRegistryClient(ctx,
dial.WithDialOptions(opts.dialOptions...),
dial.WithDialTimeout(time.Millisecond*100),
),
connect2.NewNetworkServiceRegistryClient(),
registryconnect.NewNetworkServiceRegistryClient(),
),
)
}
Expand Down Expand Up @@ -206,17 +205,15 @@ func NewServer(ctx context.Context, tokenGenerator token.GeneratorFunc, options
Condition: func(c context.Context, nse *registryapi.NetworkServiceEndpoint) bool {
return opts.regURL != nil
},
Action: connect2.NewNetworkServiceEndpointRegistryServer(
Action: registryconnect.NewNetworkServiceEndpointRegistryServer(
chain.NewNetworkServiceEndpointRegistryClient(
clienturl.NewNetworkServiceEndpointRegistryClient(opts.regURL),
// retry.NewNetworkServiceEndpointRegistryClient(),
begin.NewNetworkServiceEndpointRegistryClient(),
clienturl.NewNetworkServiceEndpointRegistryClient(opts.regURL),
clientconn.NewNetworkServiceEndpointRegistryClient(),
dial.NewNetworkServiceEndpointRegistryClient(ctx,
dial.WithDialOptions(opts.dialOptions...),
dial.WithDialTimeout(time.Millisecond*100),
),
connect2.NewNetworkServiceEndpointRegistryClient(),
registryconnect.NewNetworkServiceEndpointRegistryClient(),
),
),
}),
Expand Down
39 changes: 18 additions & 21 deletions pkg/networkservice/chains/nsmgrproxy/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ import (
"github.com/networkservicemesh/sdk/pkg/registry/common/begin"
"github.com/networkservicemesh/sdk/pkg/registry/common/clientconn"
"github.com/networkservicemesh/sdk/pkg/registry/common/clienturl"
"github.com/networkservicemesh/sdk/pkg/registry/common/connect2"
registryconnect "github.com/networkservicemesh/sdk/pkg/registry/common/connect"
"github.com/networkservicemesh/sdk/pkg/registry/common/dial"
registryswapip "github.com/networkservicemesh/sdk/pkg/registry/common/swapip"
"github.com/networkservicemesh/sdk/pkg/registry/core/chain"
Expand Down Expand Up @@ -159,27 +159,25 @@ func NewServer(ctx context.Context, regURL, proxyURL *url.URL, tokenGenerator to

var interdomainBypassNSEServer registryapi.NetworkServiceEndpointRegistryServer

nseClient :=
chain.NewNetworkServiceEndpointRegistryClient(
clienturl.NewNetworkServiceEndpointRegistryClient(regURL),
begin.NewNetworkServiceEndpointRegistryClient(),
clientconn.NewNetworkServiceEndpointRegistryClient(),
dial.NewNetworkServiceEndpointRegistryClient(ctx,
dial.WithDialOptions(opts.dialOptions...),
dial.WithDialTimeout(opts.dialTimeout),
),
connect2.NewNetworkServiceEndpointRegistryClient(),
)
nseClient := chain.NewNetworkServiceEndpointRegistryClient(
begin.NewNetworkServiceEndpointRegistryClient(),
clienturl.NewNetworkServiceEndpointRegistryClient(regURL),
clientconn.NewNetworkServiceEndpointRegistryClient(),
dial.NewNetworkServiceEndpointRegistryClient(ctx,
dial.WithDialOptions(opts.dialOptions...),
dial.WithDialTimeout(opts.dialTimeout),
),
registryconnect.NewNetworkServiceEndpointRegistryClient(),
)

nsClient := chain.NewNetworkServiceRegistryClient(
begin.NewNetworkServiceRegistryClient(),
clienturl.NewNetworkServiceRegistryClient(regURL),
clientconn.NewNetworkServiceRegistryClient(),
dial.NewNetworkServiceRegistryClient(ctx,
dial.WithDialOptions(opts.dialOptions...),
dial.WithDialTimeout(time.Millisecond*100),
),
connect2.NewNetworkServiceRegistryClient(),
registryconnect.NewNetworkServiceRegistryClient(),
)

rv.Endpoint = endpoint.NewServer(ctx, tokenGenerator,
Expand All @@ -201,32 +199,31 @@ func NewServer(ctx context.Context, regURL, proxyURL *url.URL, tokenGenerator to
),
)

var nsServerChain = connect2.NewNetworkServiceRegistryServer(
var nsServerChain = registryconnect.NewNetworkServiceRegistryServer(
chain.NewNetworkServiceRegistryClient(
clienturl.NewNetworkServiceRegistryClient(proxyURL),
begin.NewNetworkServiceRegistryClient(),
clienturl.NewNetworkServiceRegistryClient(proxyURL),
clientconn.NewNetworkServiceRegistryClient(),
dial.NewNetworkServiceRegistryClient(ctx,
dial.WithDialOptions(opts.dialOptions...),
dial.WithDialTimeout(time.Millisecond*100),
),
connect2.NewNetworkServiceRegistryClient(),
registryconnect.NewNetworkServiceRegistryClient(),
),
)

var nseServerChain = chain.NewNetworkServiceEndpointRegistryServer(
begin.NewNetworkServiceEndpointRegistryServer(),
clienturl.NewNetworkServiceEndpointRegistryServer(proxyURL),
interdomainBypassNSEServer,
registryswapip.NewNetworkServiceEndpointRegistryServer(opts.openMapIPChannel(ctx)),
connect2.NewNetworkServiceEndpointRegistryServer(
registryconnect.NewNetworkServiceEndpointRegistryServer(
chain.NewNetworkServiceEndpointRegistryClient(
begin.NewNetworkServiceEndpointRegistryClient(),
clientconn.NewNetworkServiceEndpointRegistryClient(),
dial.NewNetworkServiceEndpointRegistryClient(ctx,
dial.WithDialOptions(opts.dialOptions...),
dial.WithDialTimeout(opts.dialTimeout),
),
connect2.NewNetworkServiceEndpointRegistryClient(),
registryconnect.NewNetworkServiceEndpointRegistryClient(),
),
),
)
Expand Down
4 changes: 2 additions & 2 deletions pkg/registry/chains/client/ns_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import (
"github.com/networkservicemesh/sdk/pkg/registry/common/begin"
"github.com/networkservicemesh/sdk/pkg/registry/common/clientconn"
"github.com/networkservicemesh/sdk/pkg/registry/common/clienturl"
"github.com/networkservicemesh/sdk/pkg/registry/common/connect2"
"github.com/networkservicemesh/sdk/pkg/registry/common/connect"
"github.com/networkservicemesh/sdk/pkg/registry/common/dial"
"github.com/networkservicemesh/sdk/pkg/registry/common/heal"
"github.com/networkservicemesh/sdk/pkg/registry/common/retry"
Expand All @@ -50,6 +50,6 @@ func NewNetworkServiceRegistryClient(ctx context.Context, connectTo *url.URL, op
dial.WithDialOptions(clientOpts.dialOptions...),
dial.WithDialTimeout(time.Second),
),
connect2.NewNetworkServiceRegistryClient(),
connect.NewNetworkServiceRegistryClient(),
)
}
6 changes: 2 additions & 4 deletions pkg/registry/chains/client/nse_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,13 @@ package client
import (
"context"
"net/url"
"time"

"github.com/networkservicemesh/api/pkg/api/registry"

"github.com/networkservicemesh/sdk/pkg/registry/common/begin"
"github.com/networkservicemesh/sdk/pkg/registry/common/clientconn"
"github.com/networkservicemesh/sdk/pkg/registry/common/clienturl"
"github.com/networkservicemesh/sdk/pkg/registry/common/connect2"
"github.com/networkservicemesh/sdk/pkg/registry/common/connect"
"github.com/networkservicemesh/sdk/pkg/registry/common/dial"
"github.com/networkservicemesh/sdk/pkg/registry/common/heal"
"github.com/networkservicemesh/sdk/pkg/registry/common/refresh"
Expand All @@ -51,8 +50,7 @@ func NewNetworkServiceEndpointRegistryClient(ctx context.Context, connectTo *url
clientconn.NewNetworkServiceEndpointRegistryClient(),
dial.NewNetworkServiceEndpointRegistryClient(ctx,
dial.WithDialOptions(clientOpts.dialOptions...),
dial.WithDialTimeout(time.Millisecond*100),
),
connect2.NewNetworkServiceEndpointRegistryClient(),
connect.NewNetworkServiceEndpointRegistryClient(),
)
}
12 changes: 5 additions & 7 deletions pkg/registry/chains/memory/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import (
"github.com/networkservicemesh/sdk/pkg/registry/common/begin"
"github.com/networkservicemesh/sdk/pkg/registry/common/clientconn"
"github.com/networkservicemesh/sdk/pkg/registry/common/clienturl"
"github.com/networkservicemesh/sdk/pkg/registry/common/connect2"
"github.com/networkservicemesh/sdk/pkg/registry/common/connect"
"github.com/networkservicemesh/sdk/pkg/registry/common/dial"
"github.com/networkservicemesh/sdk/pkg/registry/common/expire"
"github.com/networkservicemesh/sdk/pkg/registry/common/memory"
Expand Down Expand Up @@ -58,16 +58,15 @@ func NewServer(ctx context.Context, expiryDuration time.Duration, proxyRegistryU
return false
},
Action: chain.NewNetworkServiceEndpointRegistryServer(
connect2.NewNetworkServiceEndpointRegistryServer(
connect.NewNetworkServiceEndpointRegistryServer(
chain.NewNetworkServiceEndpointRegistryClient(
begin.NewNetworkServiceEndpointRegistryClient(),
clienturl.NewNetworkServiceEndpointRegistryClient(proxyRegistryURL),
clientconn.NewNetworkServiceEndpointRegistryClient(),
dial.NewNetworkServiceEndpointRegistryClient(ctx,
dial.WithDialOptions(dialOptions...),
dial.WithDialTimeout(time.Millisecond*100),
),
connect2.NewNetworkServiceEndpointRegistryClient(),
connect.NewNetworkServiceEndpointRegistryClient(),
),
),
),
Expand All @@ -89,16 +88,15 @@ func NewServer(ctx context.Context, expiryDuration time.Duration, proxyRegistryU
Condition: func(c context.Context, ns *registry.NetworkService) bool {
return interdomain.Is(ns.GetName())
},
Action: connect2.NewNetworkServiceRegistryServer(
Action: connect.NewNetworkServiceRegistryServer(
chain.NewNetworkServiceRegistryClient(
clienturl.NewNetworkServiceRegistryClient(proxyRegistryURL),
begin.NewNetworkServiceRegistryClient(),
clientconn.NewNetworkServiceRegistryClient(),
dial.NewNetworkServiceRegistryClient(ctx,
dial.WithDialOptions(dialOptions...),
dial.WithDialTimeout(time.Millisecond*100),
),
connect2.NewNetworkServiceRegistryClient(),
connect.NewNetworkServiceRegistryClient(),
),
),
},
Expand Down
13 changes: 5 additions & 8 deletions pkg/registry/chains/proxydns/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,13 @@ package proxydns

import (
"context"
"time"

"google.golang.org/grpc"

"github.com/networkservicemesh/sdk/pkg/registry"
"github.com/networkservicemesh/sdk/pkg/registry/common/begin"
"github.com/networkservicemesh/sdk/pkg/registry/common/clientconn"
"github.com/networkservicemesh/sdk/pkg/registry/common/connect2"
"github.com/networkservicemesh/sdk/pkg/registry/common/connect"
"github.com/networkservicemesh/sdk/pkg/registry/common/dial"
"github.com/networkservicemesh/sdk/pkg/registry/common/dnsresolve"
"github.com/networkservicemesh/sdk/pkg/registry/core/chain"
Expand All @@ -37,28 +36,26 @@ func NewServer(ctx context.Context, dnsResolver dnsresolve.Resolver, dialOptions
nseChain := chain.NewNetworkServiceEndpointRegistryServer(
begin.NewNetworkServiceEndpointRegistryServer(),
dnsresolve.NewNetworkServiceEndpointRegistryServer(dnsresolve.WithResolver(dnsResolver)),
connect2.NewNetworkServiceEndpointRegistryServer(
connect.NewNetworkServiceEndpointRegistryServer(
chain.NewNetworkServiceEndpointRegistryClient(
clientconn.NewNetworkServiceEndpointRegistryClient(),
dial.NewNetworkServiceEndpointRegistryClient(ctx,
dial.WithDialOptions(dialOptions...),
dial.WithDialTimeout(time.Millisecond*100),
),
connect2.NewNetworkServiceEndpointRegistryClient(),
connect.NewNetworkServiceEndpointRegistryClient(),
),
))
nsChain := chain.NewNetworkServiceRegistryServer(
begin.NewNetworkServiceRegistryServer(),
dnsresolve.NewNetworkServiceRegistryServer(dnsresolve.WithResolver(dnsResolver)),
connect2.NewNetworkServiceRegistryServer(
connect.NewNetworkServiceRegistryServer(
chain.NewNetworkServiceRegistryClient(
clientconn.NewNetworkServiceRegistryClient(),
dial.NewNetworkServiceRegistryClient(
ctx,
dial.WithDialOptions(dialOptions...),
dial.WithDialTimeout(time.Millisecond*100),
),
connect2.NewNetworkServiceRegistryClient(),
connect.NewNetworkServiceRegistryClient(),
),
))
return registry.NewServer(nsChain, nseChain)
Expand Down
15 changes: 10 additions & 5 deletions pkg/registry/common/clientconn/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,8 @@ func nameFromContext(ctx context.Context) string {
return ""
}

// LoadAndDelete -
// LoadAndDelete deletes the value for a key, returning the previous value if any.
// The loaded result reports whether the key was present.
func LoadAndDelete(ctx context.Context) (grpc.ClientConnInterface, bool) {
k := nameFromContext(ctx)

Expand All @@ -54,7 +55,7 @@ func LoadAndDelete(ctx context.Context) (grpc.ClientConnInterface, bool) {
return nil, false
}

// Store -
// Store sets the value for a key.
func Store(ctx context.Context, cc grpc.ClientConnInterface) {
k := nameFromContext(ctx)

Expand All @@ -63,7 +64,7 @@ func Store(ctx context.Context, cc grpc.ClientConnInterface) {
}
}

// Delete -
// Delete deletes the value for a key.
func Delete(ctx context.Context) {
k := nameFromContext(ctx)

Expand All @@ -72,7 +73,9 @@ func Delete(ctx context.Context) {
}
}

// Load -
// Load returns the value stored in the map for a key, or nil if no
// value is present.
// The ok result indicates whether value was found in the map.
func Load(ctx context.Context) (grpc.ClientConnInterface, bool) {
k := nameFromContext(ctx)

Expand All @@ -83,7 +86,9 @@ func Load(ctx context.Context) (grpc.ClientConnInterface, bool) {
return nil, false
}

// LoadOrStore -
// LoadOrStore returns the existing value for the key if present.
// Otherwise, it stores and returns the given value.
// The loaded result is true if the value was loaded, false if stored.
func LoadOrStore(ctx context.Context, cc grpc.ClientConnInterface) (grpc.ClientConnInterface, bool) {
k := nameFromContext(ctx)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package connect2
package connect

import (
"context"
Expand Down
Loading

0 comments on commit 2da42b0

Please sign in to comment.