Skip to content

Commit

Permalink
add reselectcleanup, modify monitor server
Browse files Browse the repository at this point in the history
monitor server can now provide connections locally

Signed-off-by: Danil Uzlov <DanilUzlov@yandex.ru>
  • Loading branch information
d-uzlov committed Jun 20, 2023
1 parent cf89e72 commit 04f8683
Show file tree
Hide file tree
Showing 8 changed files with 136 additions and 13 deletions.
3 changes: 3 additions & 0 deletions .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -292,6 +292,9 @@ issues:
- path: pkg/networkservice/chains/nsmgrproxy/server_test.go
linters:
- funlen
- path: pkg/networkservice/connectioncontext/mtu/vl3mtu/server_test.go
linters:
- funlen
- path: pkg/networkservice/core/next/.*_test.go
linters:
- dupl
Expand Down
5 changes: 4 additions & 1 deletion pkg/networkservice/chains/endpoint/combine_test.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
// Copyright (c) 2021-2022 Doc.ai and/or its affiliates.
//
// Copyright (c) 2023 Cisco Systems, Inc.
//
// SPDX-License-Identifier: Apache-2.0
//
// Licensed under the Apache License, Version 2.0 (the "License");
Expand Down Expand Up @@ -312,11 +314,12 @@ type testEndpoint struct {

func newTestEndpoint(ctx context.Context, name string) *testEndpoint {
e := new(testEndpoint)
var connectionProvider monitor.ConnectionProvider
e.NetworkServiceServer = next.NewNetworkServiceServer(
updatepath.NewServer(name),
begin.NewServer(),
metadata.NewServer(),
monitor.NewServer(ctx, &e.MonitorConnectionServer),
monitor.NewServer(ctx, &e.MonitorConnectionServer, &connectionProvider),
)
return e
}
Expand Down
10 changes: 7 additions & 3 deletions pkg/networkservice/chains/endpoint/server.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// Copyright (c) 2020-2022 Cisco Systems, Inc.
//
// Copyright (c) 2020-2022 Doc.ai and/or its affiliates.
//
// Copyright (c) 2020-2023 Cisco Systems, Inc.
//
// SPDX-License-Identifier: Apache-2.0
//
// Licensed under the Apache License, Version 2.0 (the "License");
Expand All @@ -25,6 +25,7 @@ import (

"google.golang.org/grpc"

"github.com/networkservicemesh/sdk/pkg/networkservice/common/reselectcleanup"
"github.com/networkservicemesh/sdk/pkg/networkservice/common/trimpath"

"github.com/networkservicemesh/sdk/pkg/networkservice/utils/metadata"
Expand Down Expand Up @@ -116,6 +117,8 @@ func NewServer(ctx context.Context, tokenGenerator token.GeneratorFunc, options
opt(opts)
}
var mcsPtr networkservice.MonitorConnectionServer
var connectionProvider monitor.ConnectionProvider
monitorServer := monitor.NewServer(ctx, &mcsPtr, &connectionProvider)

rv := &endpoint{}
rv.NetworkServiceServer = chain.NewNetworkServiceServer(
Expand All @@ -124,9 +127,10 @@ func NewServer(ctx context.Context, tokenGenerator token.GeneratorFunc, options
begin.NewServer(),
updatetoken.NewServer(tokenGenerator),
opts.authorizeServer,
reselectcleanup.NewServer(connectionProvider),
metadata.NewServer(),
timeout.NewServer(ctx),
monitor.NewServer(ctx, &mcsPtr),
monitorServer,
trimpath.NewServer(),
}, opts.additionalFunctionality...)...)
rv.MonitorConnectionServer = next.NewMonitorConnectionServer(opts.authorizeMonitorConnectionServer, mcsPtr)
Expand Down
20 changes: 19 additions & 1 deletion pkg/networkservice/common/monitor/monitor_connection_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ type monitorConnectionServer struct {
executor serialize.Executor
}

func newMonitorConnectionServer(chainCtx context.Context) networkservice.MonitorConnectionServer {
func newMonitorConnectionServer(chainCtx context.Context) *monitorConnectionServer {
return &monitorConnectionServer{
chainCtx: chainCtx,
connections: make(map[string]*networkservice.Connection),
Expand Down Expand Up @@ -105,9 +105,27 @@ func (m *monitorConnectionServer) Send(event *networkservice.ConnectionEvent) (_
return nil
}

func (m *monitorConnectionServer) Find(selector *networkservice.MonitorScopeSelector) (connections map[string]*networkservice.Connection, err error) {
rv := make(map[string]*networkservice.Connection)
<-m.executor.AsyncExec(func() {
connections = networkservice.FilterMapOnManagerScopeSelector(m.connections, selector)
for k, v := range connections {
rv[k] = v.Clone()
}
})
return rv, nil
}

// EventConsumer - interface for monitor events sending
type EventConsumer interface {
Send(event *networkservice.ConnectionEvent) (err error)
}

var _ EventConsumer = &monitorConnectionServer{}

// ConnectionProvider - interface for providing connections locally and synchronously, without a network connection
type ConnectionProvider interface {
Find(selector *networkservice.MonitorScopeSelector) (connections map[string]*networkservice.Connection, err error)
}

var _ ConnectionProvider = &monitorConnectionServer{}
13 changes: 9 additions & 4 deletions pkg/networkservice/common/monitor/server.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// Copyright (c) 2020-2022 Cisco Systems, Inc.
//
// Copyright (c) 2021-2022 Doc.ai and/or its affiliates.
//
// Copyright (c) 2020-2023 Cisco Systems, Inc.
//
// SPDX-License-Identifier: Apache-2.0
//
// Licensed under the Apache License, Version 2.0 (the "License");
Expand Down Expand Up @@ -49,8 +49,13 @@ type monitorServer struct {
// networkservice.MonitorConnectionServer that can be used either standalone or in a
// networkservice.MonitorConnectionServer chain
// chainCtx - context for lifecycle management
func NewServer(chainCtx context.Context, monitorServerPtr *networkservice.MonitorConnectionServer) networkservice.NetworkServiceServer {
*monitorServerPtr = newMonitorConnectionServer(chainCtx)
// - connectionProvider - allows you to get monitor.ConnectionProvider,
// that will give you a way to get connections locally,
// without having to connect to monitor server via loopback.
func NewServer(chainCtx context.Context, monitorServerPtr *networkservice.MonitorConnectionServer, connectionProvider *ConnectionProvider) networkservice.NetworkServiceServer {
connectionsServer := newMonitorConnectionServer(chainCtx)
*monitorServerPtr = connectionsServer
*connectionProvider = connectionsServer
return &monitorServer{
chainCtx: chainCtx,
MonitorConnectionServer: *monitorServerPtr,
Expand Down
5 changes: 3 additions & 2 deletions pkg/networkservice/common/monitor/server_test.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright (c) 2020-2021 Cisco and/or its affiliates.
// Copyright (c) 2020-2023 Cisco and/or its affiliates.
//
// SPDX-License-Identifier: Apache-2.0
//
Expand Down Expand Up @@ -43,9 +43,10 @@ func TestMonitorServer(t *testing.T) {

// Create monitorServer, monitorClient, and server.
var monitorServer networkservice.MonitorConnectionServer
var connectionProvider monitor.ConnectionProvider
server := chain.NewNetworkServiceServer(
metadata.NewServer(),
monitor.NewServer(ctx, &monitorServer),
monitor.NewServer(ctx, &monitorServer, &connectionProvider),
)
monitorClient := adapters.NewMonitorServerToClient(monitorServer)

Expand Down
88 changes: 88 additions & 0 deletions pkg/networkservice/common/reselectcleanup/server.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
// Copyright (c) 2023 Cisco and/or its affiliates.
//
// SPDX-License-Identifier: Apache-2.0
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at:
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// Package reselectcleanup provides a server chain element
// that will call Close before request
// if an already existing connection has reselect flag
package reselectcleanup

import (
"context"

"github.com/golang/protobuf/ptypes/empty"
"github.com/networkservicemesh/api/pkg/api/networkservice"

"github.com/networkservicemesh/sdk/pkg/networkservice/common/monitor"
"github.com/networkservicemesh/sdk/pkg/networkservice/core/next"
"github.com/networkservicemesh/sdk/pkg/tools/log"
)

type reselectcleanupServer struct {
connectionProvider monitor.ConnectionProvider
}

// NewServer - create a new reselectcleanup server
func NewServer(connectionProvider monitor.ConnectionProvider) networkservice.NetworkServiceServer {
return &reselectcleanupServer{
connectionProvider: connectionProvider,
}
}

func (c *reselectcleanupServer) Request(ctx context.Context, request *networkservice.NetworkServiceRequest) (*networkservice.Connection, error) {
if request.GetConnection().GetState() != networkservice.State_RESELECT_REQUESTED {
return next.Server(ctx).Request(ctx, request)
}

conns, err := c.connectionProvider.Find(&networkservice.MonitorScopeSelector{
PathSegments: []*networkservice.PathSegment{
{
Id: request.GetConnection().GetCurrentPathSegment().GetId(),
Name: request.GetConnection().GetCurrentPathSegment().GetName(),
},
},
})
if err != nil {
log.FromContext(ctx).Errorf("Can't check if we have an old connection to close on reselect: %v", err)
conn, err2 := next.Server(ctx).Request(ctx, request)
if err2 == nil {
conn.State = networkservice.State_UP
}
return conn, err2
}
oldConnection, ok := conns[request.GetConnection().GetId()]
if !ok || oldConnection == nil {
// most likely the connection has already been closed
conn, err2 := next.Server(ctx).Request(ctx, request)
if err2 == nil {
conn.State = networkservice.State_UP
}
return conn, err2
}
log.FromContext(ctx).Info("Closing connection due to RESELECT_REQUESTED state")
_, err = next.Server(ctx).Close(ctx, oldConnection)
if err != nil {
log.FromContext(ctx).Errorf("Can't close old connection: %v", err)
}
conn, err := next.Server(ctx).Request(ctx, request)
if err == nil {
conn.State = networkservice.State_UP
}
return conn, err
}

func (c *reselectcleanupServer) Close(ctx context.Context, conn *networkservice.Connection) (*empty.Empty, error) {
return next.Server(ctx).Close(ctx, conn)
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright (c) 2022 Cisco and/or its affiliates.
// Copyright (c) 2022-2023 Cisco and/or its affiliates.
//
// SPDX-License-Identifier: Apache-2.0
//
Expand Down Expand Up @@ -45,9 +45,10 @@ func Test_vl3MtuServer(t *testing.T) {

// Create monitorServer
var monitorServer networkservice.MonitorConnectionServer
var connectionProvider monitor.ConnectionProvider
server := chain.NewNetworkServiceServer(
metadata.NewServer(),
monitor.NewServer(ctx, &monitorServer),
monitor.NewServer(ctx, &monitorServer, &connectionProvider),
vl3mtu.NewServer(),
)
monitorClient := adapters.NewMonitorServerToClient(monitorServer)
Expand Down

0 comments on commit 04f8683

Please sign in to comment.