Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Leak fixes #1643

Merged
merged 7 commits into from
Aug 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ require (
github.com/benbjohnson/clock v1.3.0
github.com/edwarnicke/exechelper v1.0.2
github.com/edwarnicke/genericsync v0.0.0-20220910010113-61a344f9bc29
github.com/edwarnicke/grpcfd v1.1.2
github.com/edwarnicke/grpcfd v1.1.4
github.com/edwarnicke/serialize v1.0.7
github.com/fsnotify/fsnotify v1.5.4
github.com/ghodss/yaml v1.0.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,8 @@ github.com/edwarnicke/exechelper v1.0.2 h1:dD49Ui2U0FBFxxhalnKw6vLS0P0TkgnXBRvKL
github.com/edwarnicke/exechelper v1.0.2/go.mod h1:/T271jtNX/ND4De6pa2aRy2+8sNtyCDB1A2pp4M+fUs=
github.com/edwarnicke/genericsync v0.0.0-20220910010113-61a344f9bc29 h1:4/2wgileNvQB4HfJbq7u4FFLKIfc38a6P0S/51ZGgX8=
github.com/edwarnicke/genericsync v0.0.0-20220910010113-61a344f9bc29/go.mod h1:3m+ZfVq+z0pTLW798jmqnifMsalrVLIKmfXaMFvqSuc=
github.com/edwarnicke/grpcfd v1.1.2 h1:2b8kCABQ1+JjSKGDoHadqSW7whCeTXMqtyo6jmB5B8k=
github.com/edwarnicke/grpcfd v1.1.2/go.mod h1:rHihB9YvNMixz8rS+ZbwosI2kj65VLkeyYAI2M+/cGA=
github.com/edwarnicke/grpcfd v1.1.4 h1:MuXeJTyIyWuUMYJJBIW7Cr8TUBWPXRxop3aGudhzV2I=
github.com/edwarnicke/grpcfd v1.1.4/go.mod h1:rHihB9YvNMixz8rS+ZbwosI2kj65VLkeyYAI2M+/cGA=
github.com/edwarnicke/serialize v0.0.0-20200705214914-ebc43080eecf/go.mod h1:XvbCO/QGsl3X8RzjBMoRpkm54FIAZH5ChK2j+aox7pw=
github.com/edwarnicke/serialize v1.0.7 h1:geX8vmyu8Ij2S5fFIXjy9gBDkKxXnrMIzMoDvV0Ddac=
github.com/edwarnicke/serialize v1.0.7/go.mod h1:y79KgU2P7ALH/4j37uTSIdNavHFNttqN7pzO6Y8B2aw=
Expand Down
5 changes: 4 additions & 1 deletion pkg/networkservice/common/monitor/client_filter.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
// Copyright (c) 2021 Cisco and/or its affiliates.
//
// Copyright (c) 2023 Cisco Systems, Inc.
// Copyright (c) 2023-2024 Cisco Systems, Inc.
//
// SPDX-License-Identifier: Apache-2.0
//
Expand Down Expand Up @@ -37,6 +37,9 @@ func newClientFilter(client networkservice.MonitorConnection_MonitorConnectionsC

func (c *clientFilter) Recv() (*networkservice.ConnectionEvent, error) {
for {
if c == nil || c.MonitorConnection_MonitorConnectionsClient == nil {
return nil, errors.New("MonitorConnections cilent is nil")
}
eventIn, err := c.MonitorConnection_MonitorConnectionsClient.Recv()
if err != nil {
return nil, errors.Wrap(err, "MonitorConnections client failed to receive an event")
Expand Down
58 changes: 35 additions & 23 deletions pkg/networkservice/common/monitor/eventloop.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright (c) 2021-2023 Cisco and/or its affiliates.
// Copyright (c) 2021-2024 Cisco and/or its affiliates.
//
// SPDX-License-Identifier: Apache-2.0
//
Expand All @@ -20,65 +20,77 @@ import (
"context"

"github.com/networkservicemesh/api/pkg/api/networkservice"
"github.com/pkg/errors"
"google.golang.org/grpc"

"github.com/networkservicemesh/sdk/pkg/tools/log"
)

type eventLoop struct {
eventLoopCtx context.Context
conn *networkservice.Connection
eventConsumer EventConsumer
client networkservice.MonitorConnection_MonitorConnectionsClient
cancel func()
cc grpc.ClientConnInterface
}

func newEventLoop(ctx context.Context, ec EventConsumer, cc grpc.ClientConnInterface, conn *networkservice.Connection) (context.CancelFunc, error) {
func newEventLoop(ctx context.Context, ec EventConsumer, cc grpc.ClientConnInterface, conn *networkservice.Connection) context.CancelFunc {
conn = conn.Clone()
// Is another chain element asking for events? If not, no need to monitor
if ec == nil {
return func() {}, nil
return func() {}
}

// Create new eventLoopCtx and store its eventLoopCancel
eventLoopCtx, eventLoopCancel := context.WithCancel(ctx)
cev := &eventLoop{
eventLoopCtx: eventLoopCtx,
conn: conn,
eventConsumer: ec,
cc: cc,
cancel: eventLoopCancel,
}

// Start the eventLoop
go cev.eventLoop()
return eventLoopCancel
}

func (cev *eventLoop) eventLoop() {
// Create selector to only ask for events related to our Connection
selector := &networkservice.MonitorScopeSelector{
PathSegments: []*networkservice.PathSegment{
{
Id: conn.GetCurrentPathSegment().GetId(),
Name: conn.GetCurrentPathSegment().GetName(),
Id: cev.conn.GetCurrentPathSegment().GetId(),
Name: cev.conn.GetCurrentPathSegment().GetName(),
},
},
}

client, err := networkservice.NewMonitorConnectionClient(cc).MonitorConnections(eventLoopCtx, selector)
client, err := networkservice.NewMonitorConnectionClient(cev.cc).MonitorConnections(cev.eventLoopCtx, selector)
if err != nil {
eventLoopCancel()
return nil, errors.Wrap(err, "failed to get a MonitorConnections client")
log.FromContext(cev.eventLoopCtx).Infof("failed to get a MonitorConnections client: %s", err.Error())
cev.cancel()
return
}

cev := &eventLoop{
eventLoopCtx: eventLoopCtx,
conn: conn,
eventConsumer: ec,
client: newClientFilter(client, conn),
if client == nil {
log.FromContext(cev.eventLoopCtx).Infof("failed to get a MonitorConnections client: client is nil")
cev.cancel()
return
}

// Start the eventLoop
go cev.eventLoop()
return eventLoopCancel, nil
}
filter := newClientFilter(client, cev.conn)

func (cev *eventLoop) eventLoop() {
// So we have a client, and can receive events
for {
eventIn, err := cev.client.Recv()
eventIn, err := filter.Recv()
if cev.eventLoopCtx.Err() != nil {
return
}
if err != nil {

connOut := cev.conn.Clone()
if err != nil && connOut != nil {
// If we get an error, we've lost our connection... Send Down update
connOut := cev.conn.Clone()
connOut.State = networkservice.State_DOWN
eventOut := &networkservice.ConnectionEvent{
Type: networkservice.ConnectionEventType_UPDATE,
Expand Down
15 changes: 3 additions & 12 deletions pkg/networkservice/common/monitor/server.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// Copyright (c) 2020-2023 Cisco Systems, Inc.
//
// Copyright (c) 2021-2023 Doc.ai and/or its affiliates.
//
// Copyright (c) 2020-2024 Cisco Systems, Inc.
//
// SPDX-License-Identifier: Apache-2.0
//
// Licensed under the Apache License, Version 2.0 (the "License");
Expand All @@ -24,11 +24,9 @@ import (
"context"

"github.com/golang/protobuf/ptypes/empty"
"github.com/pkg/errors"

"github.com/networkservicemesh/sdk/pkg/networkservice/common/clientconn"
"github.com/networkservicemesh/sdk/pkg/networkservice/utils/metadata"
"github.com/networkservicemesh/sdk/pkg/tools/postpone"

"github.com/networkservicemesh/api/pkg/api/networkservice"

Expand Down Expand Up @@ -58,7 +56,6 @@ func NewServer(chainCtx context.Context, monitorServerPtr *networkservice.Monito
}

func (m *monitorServer) Request(ctx context.Context, request *networkservice.NetworkServiceRequest) (*networkservice.Connection, error) {
closeCtxFunc := postpone.ContextWithValues(ctx)
// Cancel any existing eventLoop
cancelEventLoop, loaded := loadAndDelete(ctx, metadata.IsClient(m))
if loaded {
Expand Down Expand Up @@ -88,13 +85,7 @@ func (m *monitorServer) Request(ctx context.Context, request *networkservice.Net
// events through from, so start an eventLoop
cc, ccLoaded := clientconn.Load(ctx)
if ccLoaded {
cancelEventLoop, eventLoopErr := newEventLoop(m.chainCtx, m.MonitorConnectionServer.(EventConsumer), cc, conn)
if eventLoopErr != nil {
closeCtx, closeCancel := closeCtxFunc()
defer closeCancel()
_, _ = next.Client(closeCtx).Close(closeCtx, conn)
return nil, errors.Wrap(eventLoopErr, "unable to monitor")
}
cancelEventLoop := newEventLoop(m.chainCtx, m.MonitorConnectionServer.(EventConsumer), cc, conn)
store(ctx, metadata.IsClient(m), cancelEventLoop)
}

Expand Down
33 changes: 32 additions & 1 deletion pkg/networkservice/common/monitor/server_test.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright (c) 2020-2023 Cisco and/or its affiliates.
// Copyright (c) 2020-2024 Cisco and/or its affiliates.
//
// SPDX-License-Identifier: Apache-2.0
//
Expand Down Expand Up @@ -34,8 +34,10 @@ import (
"github.com/networkservicemesh/api/pkg/api/networkservice/mechanisms/cls"
kernelmech "github.com/networkservicemesh/api/pkg/api/networkservice/mechanisms/kernel"

"github.com/networkservicemesh/sdk/pkg/networkservice/common/clientconn"
"github.com/networkservicemesh/sdk/pkg/networkservice/common/monitor"
"github.com/networkservicemesh/sdk/pkg/networkservice/core/chain"
"github.com/networkservicemesh/sdk/pkg/networkservice/utils/checks/checkcontext"
"github.com/networkservicemesh/sdk/pkg/networkservice/utils/metadata"

"github.com/networkservicemesh/sdk/pkg/networkservice/core/next"
Expand Down Expand Up @@ -209,6 +211,35 @@ func TestMonitorServer_RequestConnEqualsToMonitorConn(t *testing.T) {
require.NoError(t, err)
}

func TestMonitorServer_FailedConnect(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
defer cancel()

// Create a grpc connection to non existing address
cc, err := grpc.Dial("1.1.1.1:5000", grpc.WithTransportCredentials(insecure.NewCredentials()))
require.NoError(t, err)
require.NotNil(t, cc)

// Create a server
var monitorServer networkservice.MonitorConnectionServer
server := chain.NewNetworkServiceServer(
metadata.NewServer(),
checkcontext.NewServer(t, func(t *testing.T, ctx context.Context) {
clientconn.Store(ctx, cc)
}),
monitor.NewServer(ctx, &monitorServer),
)

request := &networkservice.NetworkServiceRequest{
Connection: &networkservice.Connection{Id: "id"},
}

// Make a request that should be successful and immediate (because monitor server connects to 1.1.1.1:5000 in background)
conn, err := server.Request(ctx, request)
require.NoError(t, err)
require.NotNil(t, conn)
}

type metricsServer struct{}

func (m *metricsServer) Request(ctx context.Context, request *networkservice.NetworkServiceRequest) (*networkservice.Connection, error) {
Expand Down
8 changes: 5 additions & 3 deletions pkg/networkservice/common/updatepath/client.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright (c) 2020-2023 Cisco Systems, Inc.
// Copyright (c) 2020-2024 Cisco Systems, Inc.
//
// SPDX-License-Identifier: Apache-2.0
//
Expand Down Expand Up @@ -56,8 +56,10 @@ func (i *updatePathClient) Request(ctx context.Context, request *networkservice.
return nil, err
}

conn.Id = conn.Path.PathSegments[index].Id
conn.Path.Index = index
if conn.GetPath() != nil && len(conn.GetPath().GetPathSegments()) > int(index) {
conn.Id = conn.GetPath().GetPathSegments()[index].Id
conn.GetPath().Index = index
}

return conn, nil
}
Expand Down
Loading