Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Initial rework pkg/registry to use begin pattern #1213

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 1 addition & 5 deletions .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ linters-settings:
dupl:
threshold: 150
funlen:
Lines: 100
Lines: 110
Statements: 50
goconst:
min-len: 2
Expand Down Expand Up @@ -219,10 +219,6 @@ issues:
linters:
- interfacer
text: "can be `fmt.Stringer`"
- path: pkg/networkservice/chains/nsmgr/peertracker/server.go
linters:
- interfacer
text: "can be `fmt.Stringer`"
- path: pkg/networkservice/core/trace/client.go
linters:
- dupl
Expand Down
34 changes: 26 additions & 8 deletions pkg/networkservice/chains/nsmgr/heal_test.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright (c) 2020-2021 Doc.ai and/or its affiliates.
// Copyright (c) 2020-2022 Doc.ai and/or its affiliates.
//
// SPDX-License-Identifier: Apache-2.0
//
Expand All @@ -21,12 +21,14 @@ import (
"testing"
"time"

"github.com/sirupsen/logrus"
"github.com/stretchr/testify/require"
"go.uber.org/goleak"

"github.com/networkservicemesh/api/pkg/api/registry"

"github.com/networkservicemesh/sdk/pkg/networkservice/utils/count"
"github.com/networkservicemesh/sdk/pkg/registry/chains/client"
"github.com/networkservicemesh/sdk/pkg/tools/sandbox"
)

Expand Down Expand Up @@ -71,7 +73,7 @@ func testNSMGRHealEndpoint(t *testing.T, nodeNum int) {

nsRegistryClient := domain.NewNSRegistryClient(ctx, sandbox.GenerateTestToken)

nsReg, err := nsRegistryClient.Register(ctx, defaultRegistryService())
nsReg, err := nsRegistryClient.Register(ctx, defaultRegistryService(t.Name()))
require.NoError(t, err)

nseReg := defaultRegistryEndpoint(nsReg.Name)
Expand Down Expand Up @@ -148,7 +150,7 @@ func testNSMGRHealForwarder(t *testing.T, nodeNum int) {

nsRegistryClient := domain.NewNSRegistryClient(ctx, sandbox.GenerateTestToken)

nsReg, err := nsRegistryClient.Register(ctx, defaultRegistryService())
nsReg, err := nsRegistryClient.Register(ctx, defaultRegistryService(t.Name()))
require.NoError(t, err)

counter := new(count.Server)
Expand Down Expand Up @@ -230,7 +232,7 @@ func testNSMGRHealNSMgr(t *testing.T, nodeNum int, restored bool) {

nsRegistryClient := domain.NewNSRegistryClient(ctx, sandbox.GenerateTestToken)

nsReg, err := nsRegistryClient.Register(ctx, defaultRegistryService())
nsReg, err := nsRegistryClient.Register(ctx, defaultRegistryService(t.Name()))
require.NoError(t, err)

nseReg := defaultRegistryEndpoint(nsReg.Name)
Expand Down Expand Up @@ -300,7 +302,7 @@ func TestNSMGR_HealRegistry(t *testing.T) {

nsRegistryClient := domain.NewNSRegistryClient(ctx, sandbox.GenerateTestToken)

nsReg, err := nsRegistryClient.Register(ctx, defaultRegistryService())
nsReg, err := nsRegistryClient.Register(ctx, defaultRegistryService(t.Name()))
require.NoError(t, err)

nseReg := defaultRegistryEndpoint(nsReg.Name)
Expand Down Expand Up @@ -366,14 +368,14 @@ func testNSMGRCloseHeal(t *testing.T, withNSEExpiration bool) {
SetRegistryProxySupplier(nil)

if withNSEExpiration {
builder = builder.SetRegistryExpiryDuration(sandbox.RegistryExpiryDuration)
builder = builder.SetRegistryExpiryDuration(time.Second / 2)
}

domain := builder.Build()

nsRegistryClient := domain.NewNSRegistryClient(ctx, sandbox.GenerateTestToken)

nsReg, err := nsRegistryClient.Register(ctx, defaultRegistryService())
nsReg, err := nsRegistryClient.Register(ctx, defaultRegistryService(t.Name()))
require.NoError(t, err)

nseCtx, nseCtxCancel := context.WithCancel(ctx)
Expand Down Expand Up @@ -407,15 +409,31 @@ func testNSMGRCloseHeal(t *testing.T, withNSEExpiration bool) {

if withNSEExpiration {
// 3.1 Wait for the endpoint expiration
time.Sleep(sandbox.RegistryExpiryDuration)
time.Sleep(time.Second)
c := client.NewNetworkServiceEndpointRegistryClient(ctx, domain.Nodes[0].NSMgr.URL, client.WithDialOptions(sandbox.DialOptions(sandbox.WithTokenGenerator(sandbox.GenerateTestToken))...))

stream, err := c.Find(ctx, &registry.NetworkServiceEndpointQuery{
NetworkServiceEndpoint: &registry.NetworkServiceEndpoint{
Name: "final-endpoint",
},
})

require.NoError(t, err)

require.Len(t, registry.ReadNetworkServiceEndpointList(stream), 0)
}

// 4. Close connection
_, _ = nsc.Close(nscCtx, conn.Clone())

nscCtxCancel()

for _, fwd := range domain.Nodes[0].Forwarders {
fwd.Cancel()
}

require.Eventually(t, func() bool {
logrus.Error(goleak.Find())
return goleak.Find(ignoreCurrent) == nil
}, timeout, tick)

Expand Down
112 changes: 0 additions & 112 deletions pkg/networkservice/chains/nsmgr/peertracker/server.go

This file was deleted.

74 changes: 51 additions & 23 deletions pkg/networkservice/chains/nsmgr/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,18 +43,21 @@ import (
"github.com/networkservicemesh/sdk/pkg/networkservice/common/metrics"
"github.com/networkservicemesh/sdk/pkg/networkservice/core/adapters"
"github.com/networkservicemesh/sdk/pkg/registry"
"github.com/networkservicemesh/sdk/pkg/registry/common/begin"
"github.com/networkservicemesh/sdk/pkg/registry/common/clientconn"
registryclientinfo "github.com/networkservicemesh/sdk/pkg/registry/common/clientinfo"
"github.com/networkservicemesh/sdk/pkg/registry/common/clienturl"
registryconnect "github.com/networkservicemesh/sdk/pkg/registry/common/connect"
"github.com/networkservicemesh/sdk/pkg/registry/common/dial"
"github.com/networkservicemesh/sdk/pkg/registry/common/expire"
"github.com/networkservicemesh/sdk/pkg/registry/common/localbypass"
"github.com/networkservicemesh/sdk/pkg/registry/common/memory"
registryrecvfd "github.com/networkservicemesh/sdk/pkg/registry/common/recvfd"
registrysendfd "github.com/networkservicemesh/sdk/pkg/registry/common/sendfd"
"github.com/networkservicemesh/sdk/pkg/registry/switchcase"

registryserialize "github.com/networkservicemesh/sdk/pkg/registry/common/serialize"
registryadapter "github.com/networkservicemesh/sdk/pkg/registry/core/adapters"
registrychain "github.com/networkservicemesh/sdk/pkg/registry/core/chain"
"github.com/networkservicemesh/sdk/pkg/registry/core/chain"
"github.com/networkservicemesh/sdk/pkg/tools/grpcutils"
"github.com/networkservicemesh/sdk/pkg/tools/token"
)
Expand Down Expand Up @@ -160,38 +163,62 @@ func NewServer(ctx context.Context, tokenGenerator token.GeneratorFunc, options
var nsRegistry = memory.NewNetworkServiceRegistryServer()
if opts.regURL != nil {
// Use remote registry
nsRegistry = registrychain.NewNetworkServiceRegistryServer(
clienturl.NewNetworkServiceRegistryServer(opts.regURL),
registryconnect.NewNetworkServiceRegistryServer(ctx, registryconnect.WithDialOptions(opts.regDialOptions...)),
nsRegistry = registryconnect.NewNetworkServiceRegistryServer(
chain.NewNetworkServiceRegistryClient(
clienturl.NewNetworkServiceRegistryClient(opts.regURL),
begin.NewNetworkServiceRegistryClient(),
clientconn.NewNetworkServiceRegistryClient(),
dial.NewNetworkServiceRegistryClient(ctx,
dial.WithDialOptions(opts.dialOptions...),
),
registryconnect.NewNetworkServiceRegistryClient(),
),
)
}

nsRegistry = registrychain.NewNetworkServiceRegistryServer(
registryserialize.NewNetworkServiceRegistryServer(),
nsRegistry = chain.NewNetworkServiceRegistryServer(
nsRegistry,
)

var nseInMemoryRegistry = memory.NewNetworkServiceEndpointRegistryServer()

var nseRegistry = registrychain.NewNetworkServiceEndpointRegistryServer(
var nseRegistry = chain.NewNetworkServiceEndpointRegistryServer(
registryclientinfo.NewNetworkServiceEndpointRegistryServer(),
registryserialize.NewNetworkServiceEndpointRegistryServer(),
begin.NewNetworkServiceEndpointRegistryServer(),
expire.NewNetworkServiceEndpointRegistryServer(ctx, time.Minute),
registryrecvfd.NewNetworkServiceEndpointRegistryServer(), // Allow to receive a passed files
registrysendfd.NewNetworkServiceEndpointRegistryServer(),
nseInMemoryRegistry,
switchcase.NewNetworkServiceEndpointRegistryServer(
switchcase.NSEServerCase{
Condition: func(c context.Context, nse *registryapi.NetworkServiceEndpoint) bool {
return opts.regURL != nil
},
Action: registrysendfd.NewNetworkServiceEndpointRegistryServer(),
},
),
localbypass.NewNetworkServiceEndpointRegistryServer(opts.url),
switchcase.NewNetworkServiceEndpointRegistryServer(
switchcase.NSEServerCase{
Condition: func(c context.Context, nse *registryapi.NetworkServiceEndpoint) bool {
return opts.regURL == nil
},
Action: memory.NewNetworkServiceEndpointRegistryServer(),
},
switchcase.NSEServerCase{
Condition: func(c context.Context, nse *registryapi.NetworkServiceEndpoint) bool {
return opts.regURL != nil
},
Action: registryconnect.NewNetworkServiceEndpointRegistryServer(
chain.NewNetworkServiceEndpointRegistryClient(
begin.NewNetworkServiceEndpointRegistryClient(),
clienturl.NewNetworkServiceEndpointRegistryClient(opts.regURL),
clientconn.NewNetworkServiceEndpointRegistryClient(),
dial.NewNetworkServiceEndpointRegistryClient(ctx,
dial.WithDialOptions(opts.dialOptions...),
),
registryconnect.NewNetworkServiceEndpointRegistryClient(),
),
),
}),
)

if opts.regURL != nil {
// Add remote registry
nseRegistry = registrychain.NewNetworkServiceEndpointRegistryServer(
nseRegistry,
clienturl.NewNetworkServiceEndpointRegistryServer(opts.regURL),
registryconnect.NewNetworkServiceEndpointRegistryServer(ctx, registryconnect.WithDialOptions(opts.regDialOptions...)),
)
}

// Construct Endpoint
rv.Endpoint = endpoint.NewServer(ctx, tokenGenerator,
endpoint.WithName(opts.name),
Expand All @@ -200,8 +227,9 @@ func NewServer(ctx context.Context, tokenGenerator token.GeneratorFunc, options
adapters.NewClientToServer(clientinfo.NewClient()),
discoverforwarder.NewServer(
registryadapter.NetworkServiceServerToClient(nsRegistry),
registryadapter.NetworkServiceEndpointServerToClient(nseInMemoryRegistry),
registryadapter.NetworkServiceEndpointServerToClient(nseRegistry),
discoverforwarder.WithForwarderServiceName(opts.forwarderServiceName),
discoverforwarder.WithNSMgrURL(opts.url),
),
excludedprefixes.NewServer(ctx),
recvfd.NewServer(), // Receive any files passed
Expand Down
10 changes: 5 additions & 5 deletions pkg/networkservice/chains/nsmgr/single_test.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright (c) 2020-2021 Doc.ai and/or its affiliates.
// Copyright (c) 2020-2022 Doc.ai and/or its affiliates.
//
// SPDX-License-Identifier: Apache-2.0
//
Expand Down Expand Up @@ -50,7 +50,7 @@ func Test_DNSUsecase(t *testing.T) {

nsRegistryClient := domain.NewNSRegistryClient(ctx, sandbox.GenerateTestToken)

nsReg, err := nsRegistryClient.Register(ctx, defaultRegistryService())
nsReg, err := nsRegistryClient.Register(ctx, defaultRegistryService(t.Name()))
require.NoError(t, err)

nseReg := defaultRegistryEndpoint(nsReg.Name)
Expand Down Expand Up @@ -126,7 +126,7 @@ func Test_ShouldParseNetworkServiceLabelsTemplate(t *testing.T) {

nsRegistryClient := domain.NewNSRegistryClient(ctx, sandbox.GenerateTestToken)

nsReg := defaultRegistryService()
nsReg := defaultRegistryService(t.Name())
nsReg.Matches = []*registry.Match{
{
Routes: []*registry.Destination{
Expand Down Expand Up @@ -176,7 +176,7 @@ func Test_UsecasePoint2MultiPoint(t *testing.T) {
SetNodeSetup(func(ctx context.Context, node *sandbox.Node, _ int) {
node.NewNSMgr(ctx, "nsmgr", nil, sandbox.GenerateTestToken, nsmgr.NewServer)
}).
SetRegistryExpiryDuration(sandbox.RegistryExpiryDuration).
SetRegistryExpiryDuration(time.Second).
Build()

domain.Nodes[0].NewForwarder(ctx, &registry.NetworkServiceEndpoint{
Expand Down Expand Up @@ -297,7 +297,7 @@ func Test_RemoteUsecase_Point2MultiPoint(t *testing.T) {
SetNodeSetup(func(ctx context.Context, node *sandbox.Node, _ int) {
node.NewNSMgr(ctx, "nsmgr", nil, sandbox.GenerateTestToken, nsmgr.NewServer)
}).
SetRegistryExpiryDuration(sandbox.RegistryExpiryDuration).
SetRegistryExpiryDuration(time.Second).
Build()

for i := 0; i < nodeCount; i++ {
Expand Down
Loading