Skip to content

Commit

Permalink
rework registry to use 'begin' pattern
Browse files Browse the repository at this point in the history
Signed-off-by: Denis Tingaikin <denis.tingajkin@xored.com>
  • Loading branch information
denis-tingaikin committed Jan 24, 2022
1 parent c6568d1 commit 2995e87
Show file tree
Hide file tree
Showing 121 changed files with 4,340 additions and 3,738 deletions.
6 changes: 1 addition & 5 deletions .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ linters-settings:
dupl:
threshold: 150
funlen:
Lines: 100
Lines: 110
Statements: 50
goconst:
min-len: 2
Expand Down Expand Up @@ -219,10 +219,6 @@ issues:
linters:
- interfacer
text: "can be `fmt.Stringer`"
- path: pkg/networkservice/chains/nsmgr/peertracker/server.go
linters:
- interfacer
text: "can be `fmt.Stringer`"
- path: pkg/networkservice/core/trace/client.go
linters:
- dupl
Expand Down
23 changes: 11 additions & 12 deletions pkg/networkservice/chains/nsmgr/heal_test.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright (c) 2020-2021 Doc.ai and/or its affiliates.
// Copyright (c) 2020-2022 Doc.ai and/or its affiliates.
//
// SPDX-License-Identifier: Apache-2.0
//
Expand Down Expand Up @@ -71,7 +71,7 @@ func testNSMGRHealEndpoint(t *testing.T, nodeNum int) {

nsRegistryClient := domain.NewNSRegistryClient(ctx, sandbox.GenerateTestToken)

nsReg, err := nsRegistryClient.Register(ctx, defaultRegistryService())
nsReg, err := nsRegistryClient.Register(ctx, defaultRegistryService(t.Name()))
require.NoError(t, err)

nseReg := defaultRegistryEndpoint(nsReg.Name)
Expand Down Expand Up @@ -148,7 +148,7 @@ func testNSMGRHealForwarder(t *testing.T, nodeNum int) {

nsRegistryClient := domain.NewNSRegistryClient(ctx, sandbox.GenerateTestToken)

nsReg, err := nsRegistryClient.Register(ctx, defaultRegistryService())
nsReg, err := nsRegistryClient.Register(ctx, defaultRegistryService(t.Name()))
require.NoError(t, err)

counter := new(count.Server)
Expand Down Expand Up @@ -230,7 +230,7 @@ func testNSMGRHealNSMgr(t *testing.T, nodeNum int, restored bool) {

nsRegistryClient := domain.NewNSRegistryClient(ctx, sandbox.GenerateTestToken)

nsReg, err := nsRegistryClient.Register(ctx, defaultRegistryService())
nsReg, err := nsRegistryClient.Register(ctx, defaultRegistryService(t.Name()))
require.NoError(t, err)

nseReg := defaultRegistryEndpoint(nsReg.Name)
Expand Down Expand Up @@ -300,7 +300,7 @@ func TestNSMGR_HealRegistry(t *testing.T) {

nsRegistryClient := domain.NewNSRegistryClient(ctx, sandbox.GenerateTestToken)

nsReg, err := nsRegistryClient.Register(ctx, defaultRegistryService())
nsReg, err := nsRegistryClient.Register(ctx, defaultRegistryService(t.Name()))
require.NoError(t, err)

nseReg := defaultRegistryEndpoint(nsReg.Name)
Expand Down Expand Up @@ -366,14 +366,14 @@ func testNSMGRCloseHeal(t *testing.T, withNSEExpiration bool) {
SetRegistryProxySupplier(nil)

if withNSEExpiration {
builder = builder.SetRegistryExpiryDuration(sandbox.RegistryExpiryDuration)
builder = builder.SetRegistryExpiryDuration(time.Second)
}

domain := builder.Build()

nsRegistryClient := domain.NewNSRegistryClient(ctx, sandbox.GenerateTestToken)

nsReg, err := nsRegistryClient.Register(ctx, defaultRegistryService())
nsReg, err := nsRegistryClient.Register(ctx, defaultRegistryService(t.Name()))
require.NoError(t, err)

nseCtx, nseCtxCancel := context.WithCancel(ctx)
Expand Down Expand Up @@ -407,19 +407,18 @@ func testNSMGRCloseHeal(t *testing.T, withNSEExpiration bool) {

if withNSEExpiration {
// 3.1 Wait for the endpoint expiration
time.Sleep(sandbox.RegistryExpiryDuration)
time.Sleep(time.Second)
}

// 4. Close connection
_, _ = nsc.Close(nscCtx, conn.Clone())
_, err = nsc.Close(nscCtx, conn.Clone())
require.NoError(t, err)

nscCtxCancel()

require.NoError(t, ctx.Err())
require.Eventually(t, func() bool {
return goleak.Find(ignoreCurrent) == nil
}, timeout, tick)

require.NoError(t, ctx.Err())
}

func checkSecondRequestsReceived(requestsDone func() int) func() bool {
Expand Down
112 changes: 0 additions & 112 deletions pkg/networkservice/chains/nsmgr/peertracker/server.go

This file was deleted.

79 changes: 55 additions & 24 deletions pkg/networkservice/chains/nsmgr/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,18 +43,21 @@ import (
"github.com/networkservicemesh/sdk/pkg/networkservice/common/metrics"
"github.com/networkservicemesh/sdk/pkg/networkservice/core/adapters"
"github.com/networkservicemesh/sdk/pkg/registry"
"github.com/networkservicemesh/sdk/pkg/registry/common/begin"
"github.com/networkservicemesh/sdk/pkg/registry/common/clientconn"
registryclientinfo "github.com/networkservicemesh/sdk/pkg/registry/common/clientinfo"
"github.com/networkservicemesh/sdk/pkg/registry/common/clienturl"
registryconnect "github.com/networkservicemesh/sdk/pkg/registry/common/connect"
"github.com/networkservicemesh/sdk/pkg/registry/common/connect2"
"github.com/networkservicemesh/sdk/pkg/registry/common/dial"
"github.com/networkservicemesh/sdk/pkg/registry/common/expire"
"github.com/networkservicemesh/sdk/pkg/registry/common/localbypass"
"github.com/networkservicemesh/sdk/pkg/registry/common/memory"
registryrecvfd "github.com/networkservicemesh/sdk/pkg/registry/common/recvfd"
registrysendfd "github.com/networkservicemesh/sdk/pkg/registry/common/sendfd"
"github.com/networkservicemesh/sdk/pkg/registry/switchcase"

registryserialize "github.com/networkservicemesh/sdk/pkg/registry/common/serialize"
registryadapter "github.com/networkservicemesh/sdk/pkg/registry/core/adapters"
registrychain "github.com/networkservicemesh/sdk/pkg/registry/core/chain"
"github.com/networkservicemesh/sdk/pkg/registry/core/chain"
"github.com/networkservicemesh/sdk/pkg/tools/grpcutils"
"github.com/networkservicemesh/sdk/pkg/tools/token"
)
Expand Down Expand Up @@ -160,38 +163,65 @@ func NewServer(ctx context.Context, tokenGenerator token.GeneratorFunc, options
var nsRegistry = memory.NewNetworkServiceRegistryServer()
if opts.regURL != nil {
// Use remote registry
nsRegistry = registrychain.NewNetworkServiceRegistryServer(
clienturl.NewNetworkServiceRegistryServer(opts.regURL),
registryconnect.NewNetworkServiceRegistryServer(ctx, registryconnect.WithDialOptions(opts.regDialOptions...)),
nsRegistry = connect2.NewNetworkServiceRegistryServer(
chain.NewNetworkServiceRegistryClient(
clienturl.NewNetworkServiceRegistryClient(opts.regURL),
begin.NewNetworkServiceRegistryClient(),
clientconn.NewNetworkServiceRegistryClient(),
dial.NewNetworkServiceRegistryClient(ctx,
dial.WithDialOptions(opts.dialOptions...),
dial.WithDialTimeout(time.Millisecond*100),
),
connect2.NewNetworkServiceRegistryClient(),
),
)
}

nsRegistry = registrychain.NewNetworkServiceRegistryServer(
registryserialize.NewNetworkServiceRegistryServer(),
nsRegistry = chain.NewNetworkServiceRegistryServer(
nsRegistry,
)

var nseInMemoryRegistry = memory.NewNetworkServiceEndpointRegistryServer()

var nseRegistry = registrychain.NewNetworkServiceEndpointRegistryServer(
var nseRegistry = chain.NewNetworkServiceEndpointRegistryServer(
registryclientinfo.NewNetworkServiceEndpointRegistryServer(),
registryserialize.NewNetworkServiceEndpointRegistryServer(),
begin.NewNetworkServiceEndpointRegistryServer(),
expire.NewNetworkServiceEndpointRegistryServer(ctx, time.Minute),
registryrecvfd.NewNetworkServiceEndpointRegistryServer(), // Allow to receive a passed files
registrysendfd.NewNetworkServiceEndpointRegistryServer(),
nseInMemoryRegistry,
switchcase.NewNetworkServiceEndpointRegistryServer(
switchcase.NSEServerCase{
Condition: func(c context.Context, nse *registryapi.NetworkServiceEndpoint) bool {
return opts.regURL != nil
},
Action: registrysendfd.NewNetworkServiceEndpointRegistryServer(),
},
),
localbypass.NewNetworkServiceEndpointRegistryServer(opts.url),
switchcase.NewNetworkServiceEndpointRegistryServer(
switchcase.NSEServerCase{
Condition: func(c context.Context, nse *registryapi.NetworkServiceEndpoint) bool {
return opts.regURL == nil
},
Action: memory.NewNetworkServiceEndpointRegistryServer(),
},
switchcase.NSEServerCase{
Condition: func(c context.Context, nse *registryapi.NetworkServiceEndpoint) bool {
return opts.regURL != nil
},
Action: connect2.NewNetworkServiceEndpointRegistryServer(
chain.NewNetworkServiceEndpointRegistryClient(
clienturl.NewNetworkServiceEndpointRegistryClient(opts.regURL),
// retry.NewNetworkServiceEndpointRegistryClient(),
begin.NewNetworkServiceEndpointRegistryClient(),
clientconn.NewNetworkServiceEndpointRegistryClient(),
dial.NewNetworkServiceEndpointRegistryClient(ctx,
dial.WithDialOptions(opts.dialOptions...),
dial.WithDialTimeout(time.Millisecond*100),
),
connect2.NewNetworkServiceEndpointRegistryClient(),
),
),
}),
)

if opts.regURL != nil {
// Add remote registry
nseRegistry = registrychain.NewNetworkServiceEndpointRegistryServer(
nseRegistry,
clienturl.NewNetworkServiceEndpointRegistryServer(opts.regURL),
registryconnect.NewNetworkServiceEndpointRegistryServer(ctx, registryconnect.WithDialOptions(opts.regDialOptions...)),
)
}

// Construct Endpoint
rv.Endpoint = endpoint.NewServer(ctx, tokenGenerator,
endpoint.WithName(opts.name),
Expand All @@ -200,8 +230,9 @@ func NewServer(ctx context.Context, tokenGenerator token.GeneratorFunc, options
adapters.NewClientToServer(clientinfo.NewClient()),
discoverforwarder.NewServer(
registryadapter.NetworkServiceServerToClient(nsRegistry),
registryadapter.NetworkServiceEndpointServerToClient(nseInMemoryRegistry),
registryadapter.NetworkServiceEndpointServerToClient(nseRegistry),
discoverforwarder.WithForwarderServiceName(opts.forwarderServiceName),
discoverforwarder.WithNSMgrURL(opts.url),
),
excludedprefixes.NewServer(ctx),
recvfd.NewServer(), // Receive any files passed
Expand Down
12 changes: 6 additions & 6 deletions pkg/networkservice/chains/nsmgr/single_test.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright (c) 2020-2021 Doc.ai and/or its affiliates.
// Copyright (c) 2020-2022 Doc.ai and/or its affiliates.
//
// SPDX-License-Identifier: Apache-2.0
//
Expand Down Expand Up @@ -50,7 +50,7 @@ func Test_DNSUsecase(t *testing.T) {

nsRegistryClient := domain.NewNSRegistryClient(ctx, sandbox.GenerateTestToken)

nsReg, err := nsRegistryClient.Register(ctx, defaultRegistryService())
nsReg, err := nsRegistryClient.Register(ctx, defaultRegistryService(t.Name()))
require.NoError(t, err)

nseReg := defaultRegistryEndpoint(nsReg.Name)
Expand Down Expand Up @@ -126,7 +126,7 @@ func Test_ShouldParseNetworkServiceLabelsTemplate(t *testing.T) {

nsRegistryClient := domain.NewNSRegistryClient(ctx, sandbox.GenerateTestToken)

nsReg := defaultRegistryService()
nsReg := defaultRegistryService(t.Name())
nsReg.Matches = []*registry.Match{
{
Routes: []*registry.Destination{
Expand Down Expand Up @@ -167,7 +167,7 @@ func Test_ShouldParseNetworkServiceLabelsTemplate(t *testing.T) {
func Test_UsecasePoint2MultiPoint(t *testing.T) {
t.Cleanup(func() { goleak.VerifyNone(t) })

ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
ctx, cancel := context.WithTimeout(context.Background(), time.Minute*5)
defer cancel()

domain := sandbox.NewBuilder(ctx, t).
Expand All @@ -176,7 +176,7 @@ func Test_UsecasePoint2MultiPoint(t *testing.T) {
SetNodeSetup(func(ctx context.Context, node *sandbox.Node, _ int) {
node.NewNSMgr(ctx, "nsmgr", nil, sandbox.GenerateTestToken, nsmgr.NewServer)
}).
SetRegistryExpiryDuration(sandbox.RegistryExpiryDuration).
SetRegistryExpiryDuration(time.Second).
Build()

domain.Nodes[0].NewForwarder(ctx, &registry.NetworkServiceEndpoint{
Expand Down Expand Up @@ -297,7 +297,7 @@ func Test_RemoteUsecase_Point2MultiPoint(t *testing.T) {
SetNodeSetup(func(ctx context.Context, node *sandbox.Node, _ int) {
node.NewNSMgr(ctx, "nsmgr", nil, sandbox.GenerateTestToken, nsmgr.NewServer)
}).
SetRegistryExpiryDuration(sandbox.RegistryExpiryDuration).
SetRegistryExpiryDuration(time.Second).
Build()

for i := 0; i < nodeCount; i++ {
Expand Down
Loading

0 comments on commit 2995e87

Please sign in to comment.