Skip to content

Commit

Permalink
Added link_addrs
Browse files Browse the repository at this point in the history
  • Loading branch information
song-jiang committed Mar 2, 2025
1 parent d871b0b commit 32fade4
Show file tree
Hide file tree
Showing 3 changed files with 175 additions and 266 deletions.
164 changes: 26 additions & 138 deletions felix/dataplane/linux/endpoint_mgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,14 @@ import (

apiv3 "github.com/projectcalico/api/pkg/apis/projectcalico/v3"
log "github.com/sirupsen/logrus"
"github.com/vishvananda/netlink"

"github.com/projectcalico/calico/felix/dataplane/common"
"github.com/projectcalico/calico/felix/environment"
"github.com/projectcalico/calico/felix/generictables"
"github.com/projectcalico/calico/felix/ifacemonitor"
"github.com/projectcalico/calico/felix/ip"
"github.com/projectcalico/calico/felix/iptables"
"github.com/projectcalico/calico/felix/linkaddrs"
"github.com/projectcalico/calico/felix/netlinkshim"
"github.com/projectcalico/calico/felix/netlinkshim/handlemgr"
"github.com/projectcalico/calico/felix/nftables"
Expand Down Expand Up @@ -212,6 +212,8 @@ type endpointManager struct {
newLocalBGPPeerIP string
needToCheckLocalBGPPeerIP bool

linkAddrsMgr *linkaddrs.LinkAddrsManager

needToCheckDispatchChains bool
needToCheckEndpointMarkChains bool

Expand Down Expand Up @@ -248,6 +250,7 @@ func newEndpointManager(
callbacks *common.Callbacks,
floatingIPsEnabled bool,
nft bool,
linkAddrsMgr *linkaddrs.LinkAddrsManager,
featureDetector environment.FeatureDetectorIface,
netlinkTimeout time.Duration,
) *endpointManager {
Expand All @@ -271,6 +274,7 @@ func newEndpointManager(
callbacks,
floatingIPsEnabled,
nft,
linkAddrsMgr,
featureDetector,
netlinkTimeout,
)
Expand All @@ -296,6 +300,7 @@ func newEndpointManagerWithShims(
callbacks *common.Callbacks,
floatingIPsEnabled bool,
nft bool,
linkAddrsMgr *linkaddrs.LinkAddrsManager,
featureDetector environment.FeatureDetectorIface,
netlinkTimeout time.Duration,
) *endpointManager {
Expand Down Expand Up @@ -379,6 +384,8 @@ func newEndpointManagerWithShims(
OnEndpointStatusUpdate: onWorkloadEndpointStatusUpdate,
callbacks: newEndpointManagerCallbacks(callbacks, ipVersion),
newNetlinkHandle: netlinkshim.NewRealNetlink,

linkAddrsMgr: linkAddrsMgr,
}

epManager.nl = handlemgr.NewHandleManager(
Expand Down Expand Up @@ -713,6 +720,7 @@ func (m *endpointManager) resolveWorkloadEndpoints() {
logCxt.Info("Workload removed, deleting old state.")
m.routeTable.SetRoutes(oldWorkload.Name, nil)
m.wlIfaceNamesToReconfigure.Discard(oldWorkload.Name)
m.linkAddrsMgr.RemoveLinkLocalAddress(oldWorkload.Name)
delete(m.activeWlIfaceNameToID, oldWorkload.Name)
if m.hasSourceSpoofingConfiguration(oldWorkload.Name) {
logCxt.Debugf("Removing RPF configuration for old workload %s", oldWorkload.Name)
Expand Down Expand Up @@ -770,6 +778,7 @@ func (m *endpointManager) resolveWorkloadEndpoints() {
}
m.routeTable.SetRoutes(oldWorkload.Name, nil)
m.wlIfaceNamesToReconfigure.Discard(oldWorkload.Name)
m.linkAddrsMgr.RemoveLinkLocalAddress(oldWorkload.Name)
delete(m.activeWlIfaceNameToID, oldWorkload.Name)
}
adminUp := workload.State == "active"
Expand Down Expand Up @@ -891,23 +900,11 @@ func (m *endpointManager) resolveWorkloadEndpoints() {
}

if m.needToCheckLocalBGPPeerIP {
var err error
// If LocalBGPPeerIP has been updated, we need to remove old peer IP from all workload interfaces.
m.needToCheckLocalBGPPeerIP = false
m.localBGPPeerIP = m.newLocalBGPPeerIP
// Reconfigure the interfaces of all active workload endpoints.
for ifaceName := range m.activeWlIfaceNameToID {
err = m.removeBGPPeerIPOnInterface(ifaceName, m.localBGPPeerIP)
if err != nil {
log.WithError(err).Warn("Failed to remove old peer ip from interface, will retry")
break
}
}

if err == nil {
m.needToCheckLocalBGPPeerIP = false
m.localBGPPeerIP = m.newLocalBGPPeerIP
// Reconfigure the interfaces of all active workload endpoints.
for ifaceName := range m.activeWlIfaceNameToID {
m.wlIfaceNamesToReconfigure.Add(ifaceName)
}
m.wlIfaceNamesToReconfigure.Add(ifaceName)
}
}

Expand Down Expand Up @@ -1682,21 +1679,19 @@ func (m *endpointManager) onBGPConfigUpdate(update *proto.GlobalBGPConfigUpdate)
}
}

func (m *endpointManager) ipToNetlinkAddr(ipString string) (*netlink.Addr, error) {
func (m *endpointManager) ipToIPNetString(ipString string) (string, error) {
if m.ipVersion == 4 {
ip, net, err := net.ParseCIDR(ipString + "/32")
_, _, err := net.ParseCIDR(ipString + "/32")
if err != nil {
return nil, err
return "", err
}
net.IP = ip
return &netlink.Addr{IPNet: net, Scope: int(netlink.SCOPE_LINK)}, nil
return ipString + "/32", nil
} else {
ip, net, err := net.ParseCIDR(ipString + "/128")
_, _, err := net.ParseCIDR(ipString + "/128")
if err != nil {
return nil, err
return "", err
}
net.IP = ip
return &netlink.Addr{IPNet: net, Scope: int(netlink.SCOPE_LINK)}, nil
return ipString + "/128", nil
}
}

Expand All @@ -1709,138 +1704,31 @@ func (m *endpointManager) ifaceIsForLocalBGPPeer(name string) bool {
return ep != nil && ep.LocalBgpPeer != nil && len(ep.LocalBgpPeer.BgpPeerName) != 0
}

func netlinkAddrsContains(addrs []netlink.Addr, ip string) bool {
parsedIP := net.ParseIP(ip)
if parsedIP == nil {
return false
}

for _, addr := range addrs {
if addr.IP.Equal(parsedIP) {
return true
}
}
return false
}

func (m *endpointManager) removeBGPPeerIPOnInterface(name string, peerIP string) error {
nl, err := m.nl.Handle()
if err != nil {
return fmt.Errorf("failed to connect to netlink")
}

// Remove local BGP peer IP from the inteface if it is present.
family := netlink.FAMILY_V4
if m.ipVersion == 6 {
family = netlink.FAMILY_V6
}

// Look up the interface.
link, err := lookupLink(nl, name)
if _, ok := err.(netlink.LinkNotFoundError); ok {
// The link has been removed. Address already gone.
return nil
} else if err != nil {
log.WithError(err).Warning("Failed to look up device link")
return err
}

addrs, err := nl.AddrList(link, family)
if err != nil {
// CNI may delete link at this point, pass it up.
log.WithError(err).Warning("Failed to list address on the link")
return err
}

if !netlinkAddrsContains(addrs, peerIP) {
return nil
}

log.WithField("iface", name).Debug("About to remove peer ip on device link")

addr, err := m.ipToNetlinkAddr(peerIP)
if err != nil {
log.WithError(err).Warning("Failed to get netlink addr")
return err
}

if err = nl.AddrDel(link, addr); err != nil {
// Only emit the following warning log if the link still exists.
if _, ok := err.(netlink.LinkNotFoundError); ok {
// The link has been removed. Address already gone.
return nil
} else if err != nil {
log.WithField("address", addr).WithError(err).Warning("Failed to remove host side address on workload interface")
}
return err
}

log.WithField("address", addr).Info("Removed host side address on workload interface")
return nil
}

func (m *endpointManager) ensureLocalBGPPeerIPOnInterface(name string) error {
logCtx := log.WithField("iface", name)
logCtx.Debug("Configure interface for local bpg peer role")

nl, err := m.nl.Handle()
if err != nil {
return fmt.Errorf("failed to connect to netlink")
}

if m.ifaceIsForLocalBGPPeer(name) {
if len(m.localBGPPeerIP) == 0 {
logCtx.Warning("no peer ip is defined trying to configure local BGP peer ip on interface")
return fmt.Errorf("interface belongs to a local BGP peer but peer IP is not defined yet.")
}

family := netlink.FAMILY_V4
if m.ipVersion == 6 {
family = netlink.FAMILY_V6
}

link, err := nl.LinkByName(name)
if err != nil {
// Presumably the link is not up yet. We will be called again when it is.
log.WithError(err).Warning("Failed to look up device link")
return err
}
addrs, err := nl.AddrList(link, family)
if err != nil {
// Not sure why this would happen, but pass it up.
logCtx.WithError(err).Warning("Failed to list address on the link")
return err
}

// Do nothing if the address is already configured.
if netlinkAddrsContains(addrs, m.localBGPPeerIP) {
return nil
}

addr, err := m.ipToNetlinkAddr(m.localBGPPeerIP)
ipNetString, err := m.ipToIPNetString(m.localBGPPeerIP)
if err != nil {
log.WithError(err).Warning("Failed to get netlink addr")
log.WithError(err).Warning("Failed to parse peer ip")
return err
}

if err = nl.AddrAdd(link, addr); err != nil {
if err := m.linkAddrsMgr.SetLinkLocalAddress(name, ipNetString); err != nil {
log.WithError(err).Warning("Failed to add peer ip")
return err
}
logCtx.WithFields(log.Fields{"address": addr}).Info("Assigned host side address to workload interface to set up local BGP peer")
logCtx.WithFields(log.Fields{"address": ipNetString}).Info("Assigned host side address to workload interface to set up local BGP peer")
} else {
err := m.removeBGPPeerIPOnInterface(name, m.localBGPPeerIP)
if err != nil {
log.WithError(err).Warning("Failed to remove peer ip")
return err
}
m.linkAddrsMgr.RemoveLinkLocalAddress(name)
}

logCtx.Debug("Completed configure local bgp role on device")
return nil
}

func lookupLink(nlHandle netlinkHandle, name string) (link netlink.Link, err error) {
link, err = nlHandle.LinkByName(name)
return
}
20 changes: 20 additions & 0 deletions felix/dataplane/linux/int_dataplane.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ import (
"github.com/projectcalico/calico/felix/iptables/cmdshim"
"github.com/projectcalico/calico/felix/jitter"
"github.com/projectcalico/calico/felix/labelindex"
"github.com/projectcalico/calico/felix/linkaddrs"
"github.com/projectcalico/calico/felix/logutils"
"github.com/projectcalico/calico/felix/netlinkshim"
"github.com/projectcalico/calico/felix/nftables"
Expand Down Expand Up @@ -326,6 +327,8 @@ type InternalDataplane struct {
vxlanParentCV6 chan string
vxlanFDBs []*vxlanfdb.VXLANFDB

linkAddrsManagers []*linkaddrs.LinkAddrsManager

wireguardManager *wireguardManager
wireguardManagerV6 *wireguardManager

Expand Down Expand Up @@ -1011,6 +1014,9 @@ func NewIntDataplaneDriver(config Config) *InternalDataplane {
nftMaps = nftablesV4RootTable
}

linkAddrsManagerV4 := linkaddrs.New(4, config.RulesConfig.WorkloadIfacePrefixes, featureDetector, config.NetlinkTimeout)
dp.linkAddrsManagers = append(dp.linkAddrsManagers, linkAddrsManagerV4)

epManager := newEndpointManager(
rawTableV4,
mangleTableV4,
Expand All @@ -1029,6 +1035,7 @@ func NewIntDataplaneDriver(config Config) *InternalDataplane {
callbacks,
config.FloatingIPsEnabled,
config.RulesConfig.NFTables,
linkAddrsManagerV4,
featureDetector,
config.NetlinkTimeout,
)
Expand Down Expand Up @@ -1152,6 +1159,9 @@ func NewIntDataplaneDriver(config Config) *InternalDataplane {
nftMapsV6 = nftablesV6RootTable
}

linkAddrsManagerV6 := linkaddrs.New(6, config.RulesConfig.WorkloadIfacePrefixes, featureDetector, config.NetlinkTimeout)
dp.linkAddrsManagers = append(dp.linkAddrsManagers, linkAddrsManagerV6)

dp.RegisterManager(newEndpointManager(
rawTableV6,
mangleTableV6,
Expand All @@ -1170,6 +1180,7 @@ func NewIntDataplaneDriver(config Config) *InternalDataplane {
callbacks,
config.FloatingIPsEnabled,
config.RulesConfig.NFTables,
linkAddrsManagerV6,
featureDetector,
config.NetlinkTimeout,
))
Expand Down Expand Up @@ -2465,6 +2476,15 @@ func (d *InternalDataplane) apply() {
}
}

// Update any linkAddrs entries.
for _, la := range d.linkAddrsManagers {
err := la.Apply()
if err != nil {
log.WithError(err).Warn("Failed to synchronize link addr entries, will retry...")
d.dataplaneNeedsSync = true
}
}

// Update the routing table in parallel with the other updates. We'll wait for it to finish
// before we return.
var routesWG sync.WaitGroup
Expand Down
Loading

0 comments on commit 32fade4

Please sign in to comment.