Skip to content

Commit

Permalink
VIP implementation in TAPA replaced by NSM IP and policy update
Browse files Browse the repository at this point in the history
* The initial NSM request does not contain any information about the ip
  context. Once the conduit connected (sucessful NSM request), the VIP
  watcher will call SetVIPs which will update (new request) the NSM
  connection (SrcIPs and policies).

* the initial SrcIPs are saved in the conduit struct (to not mix the
  target IPs and VIPs)

* The previous code has been removed
  • Loading branch information
LionelJouin committed Mar 30, 2022
1 parent 916aee7 commit ee80de0
Show file tree
Hide file tree
Showing 2 changed files with 62 additions and 121 deletions.
109 changes: 62 additions & 47 deletions pkg/ambassador/tap/conduit/conduit.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ import (
"context"
"errors"
"fmt"
"net"
"strings"
"sync"

"github.com/networkservicemesh/api/pkg/api/networkservice"
Expand Down Expand Up @@ -53,8 +55,7 @@ type Conduit struct {
StreamFactory StreamFactory
connection *networkservice.Connection
mu sync.Mutex
vips []*virtualIP
tableID int
localIPs []string
}

// New is the constructor of Conduit.
Expand All @@ -76,8 +77,7 @@ func New(conduit *ambassadorAPI.Conduit,
NetworkServiceClient: networkServiceClient,
NetUtils: netUtils,
connection: nil,
vips: []*virtualIP{},
tableID: 1,
localIPs: []string{},
}
c.StreamFactory = stream.NewFactory(targetRegistryClient, stream.MaxNumberOfTargets, c)
c.StreamManager = NewStreamManager(configurationManagerClient, targetRegistryClient, streamRegistry, c.StreamFactory, PendingTime)
Expand Down Expand Up @@ -117,6 +117,7 @@ func (c *Conduit) Connect(ctx context.Context) error {
}
logrus.Infof("Conduit connected: %v", c.Conduit)
c.connection = connection
c.localIPs = c.connection.GetContext().GetIpContext().GetSrcIpAddrs()

c.Configuration.Watch()

Expand All @@ -132,9 +133,6 @@ func (c *Conduit) Disconnect(ctx context.Context) error {
logrus.Infof("Disconnect from conduit: %v", c.Conduit)
// Stops the configuration
c.Configuration.Stop()
// reset the VIPs related configuration
c.deleteVIPs(c.vips)
c.tableID = 1
var errFinal error
// Stop the stream manager (close the streams)
errFinal = c.StreamManager.Stop(ctx)
Expand Down Expand Up @@ -174,7 +172,7 @@ func (c *Conduit) GetConduit() *ambassadorAPI.Conduit {
// GetStreams returns the local IPs for this conduit
func (c *Conduit) GetIPs() []string {
if c.connection != nil {
return c.connection.GetContext().GetIpContext().GetSrcIpAddrs()
return c.localIPs
}
return []string{}
}
Expand All @@ -186,34 +184,65 @@ func (c *Conduit) SetVIPs(vips []string) error {
if !c.isConnected() {
return nil
}
currentVIPs := make(map[string]*virtualIP)
for _, vip := range c.vips {
currentVIPs[vip.prefix] = vip
// prepare SrcIpAddrs (IPs allocated by the proxy + VIPs)
c.connection.Context.IpContext.SrcIpAddrs = append(c.GetIPs(), vips...)
// prepare the routes (nexthops = proxy bridge IPs)
ipv4Nexthops := []*networkservice.Route{}
ipv6Nexthops := []*networkservice.Route{}
for _, nexthop := range c.getGateways() {
gw, _, err := net.ParseCIDR(nexthop)
if err != nil {
continue
}
route := &networkservice.Route{
NextHop: gw.String(),
}
if isIPv6(nexthop) {
route.Prefix = "::/0"
ipv6Nexthops = append(ipv6Nexthops, route)
} else {
route.Prefix = "0.0.0.0/0"
ipv4Nexthops = append(ipv4Nexthops, route)
}
}
// prepare the policies (only based on VIP address for now)
c.connection.Context.IpContext.Policies = []*networkservice.PolicyRoute{}
for _, vip := range vips {
if _, ok := currentVIPs[vip]; !ok {
newVIP, err := newVirtualIP(vip, c.tableID, c.NetUtils)
if err != nil {
logrus.Errorf("SimpleTarget: Error adding SourceBaseRoute: %v", err) // todo: err handling
continue
}
c.tableID++
c.vips = append(c.vips, newVIP)
for _, nexthop := range c.getGateways() {
err = newVIP.AddNexthop(nexthop)
if err != nil {
logrus.Errorf("Client: Error adding nexthop: %v", err) // todo: err handling
}
}
nexthops := ipv4Nexthops
if isIPv6(vip) {
nexthops = ipv6Nexthops
}
delete(currentVIPs, vip)
newPolicyRoute := &networkservice.PolicyRoute{
From: vip,
Routes: nexthops,
}
c.connection.Context.IpContext.Policies = append(c.connection.Context.IpContext.Policies, newPolicyRoute)
}
// delete remaining vips
vipsSlice := []*virtualIP{}
for _, vip := range currentVIPs {
vipsSlice = append(vipsSlice, vip)
var err error
// update the NSM connection
// TODO: retry if error returned?
c.connection, err = c.NetworkServiceClient.Request(context.TODO(),
&networkservice.NetworkServiceRequest{
Connection: &networkservice.Connection{
Id: c.connection.GetId(),
NetworkService: c.connection.GetNetworkService(),
Labels: c.connection.GetLabels(),
Payload: c.connection.GetPayload(),
Context: &networkservice.ConnectionContext{
IpContext: c.connection.GetContext().GetIpContext(),
},
},
MechanismPreferences: []*networkservice.Mechanism{
{
Cls: cls.LOCAL,
Type: kernelmech.MECHANISM,
},
},
})
if err != nil {
return fmt.Errorf("error updating the VIPs in conduit: %v - %v", c.Conduit, err)
}
c.deleteVIPs(vipsSlice)
logrus.Infof("VIPs in conduit updated: %v - %v", c.Conduit, vips)
return nil
}

Expand All @@ -226,22 +255,8 @@ func (c *Conduit) isConnected() bool {
return c.connection != nil
}

func (c *Conduit) deleteVIPs(vips []*virtualIP) {
vipsMap := make(map[string]*virtualIP)
for _, vip := range vips {
vipsMap[vip.prefix] = vip
}
for index := 0; index < len(c.vips); index++ {
vip := c.vips[index]
if _, ok := vipsMap[vip.prefix]; ok {
c.vips = append(c.vips[:index], c.vips[index+1:]...)
index--
err := vip.Delete()
if err != nil {
logrus.Errorf("Client: Error deleting vip: %v", err) // todo: err handling
}
}
}
func isIPv6(address string) bool {
return strings.Count(address, ":") >= 2
}

// TODO: Requires the IPs of the bridge
Expand Down
74 changes: 0 additions & 74 deletions pkg/ambassador/tap/conduit/vip.go

This file was deleted.

0 comments on commit ee80de0

Please sign in to comment.