Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove forced delay for linux interface notifications #1742

Merged
merged 4 commits into from
Oct 13, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions plugins/configurator/configurator.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,15 +126,15 @@ func (svc *configuratorServer) Update(ctx context.Context, req *pb.UpdateRequest
}

if req.WaitDone {
waitStart := time.Now()
var pendingKeys []string
for _, res := range results {
if res.Status.GetState() == kvscheduler.ValueState_PENDING {
pendingKeys = append(pendingKeys, res.Key)
}
}
if len(pendingKeys) > 0 {
waitStart := time.Now()
svc.log.Infof("waiting for %d pending keys to be done", len(pendingKeys))
svc.log.Infof("waiting for %d pending keys", len(pendingKeys))
for len(pendingKeys) > 0 {
select {
case <-time.After(waitDoneCheckPendingPeriod):
Expand All @@ -144,10 +144,10 @@ func (svc *configuratorServer) Update(ctx context.Context, req *pb.UpdateRequest
return nil, ctx.Err()
}
}
svc.log.Infof("finished waiting for pending keys to be done (took %v)", time.Since(waitStart))
} else {
svc.log.Debugf("no pendings keys to wait for")
}
svc.log.Infof("finished waiting for done (took %v)", time.Since(waitStart))
}

svc.log.Debugf("config update finished with %d results", len(results))
Expand Down
84 changes: 18 additions & 66 deletions plugins/linux/ifplugin/descriptor/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import (
"context"
"strings"
"sync"
"time"

"github.com/golang/protobuf/proto"
prototypes "github.com/golang/protobuf/ptypes/empty"
Expand All @@ -35,11 +34,6 @@ const (
// InterfaceWatcherName is the name of the descriptor watching Linux interfaces
// in the default namespace.
InterfaceWatcherName = "linux-interface-watcher"

// notificationDelay specifies how long to delay notification when interface changes.
// Typically interface is created in multiple stages and we do not want to notify
// scheduler about intermediate states.
notificationDelay = 500 * time.Millisecond
)

// InterfaceWatcher watches default namespace for newly added/removed Linux interfaces.
Expand All @@ -58,9 +52,6 @@ type InterfaceWatcher struct {
ifacesMu sync.Mutex
ifaces map[string]struct{}

// interface changes delayed to give Linux time to "finalize" them
pendingIntfs map[string]bool // interface name -> exists?

// conditional variable to check if the list of interfaces is in-sync with
// Linux network stack
intfsInSync bool
Expand All @@ -74,13 +65,12 @@ type InterfaceWatcher struct {
// NewInterfaceWatcher creates a new instance of the Interface Watcher.
func NewInterfaceWatcher(kvscheduler kvs.KVScheduler, ifHandler linuxcalls.NetlinkAPI, log logging.PluginLogger) *InterfaceWatcher {
descriptor := &InterfaceWatcher{
log: log.NewLogger("if-watcher"),
kvscheduler: kvscheduler,
ifHandler: ifHandler,
ifaces: make(map[string]struct{}),
pendingIntfs: make(map[string]bool),
notifCh: make(chan netlink.LinkUpdate),
doneCh: make(chan struct{}),
log: log.NewLogger("if-watcher"),
kvscheduler: kvscheduler,
ifHandler: ifHandler,
ifaces: make(map[string]struct{}),
notifCh: make(chan netlink.LinkUpdate),
doneCh: make(chan struct{}),
}
descriptor.intfsInSyncCond = sync.NewCond(&descriptor.ifacesMu)
descriptor.ctx, descriptor.cancel = context.WithCancel(context.Background())
Expand Down Expand Up @@ -185,59 +175,14 @@ func (w *InterfaceWatcher) processLinkNotification(linkUpdate netlink.LinkUpdate
defer w.ifacesMu.Unlock()

ifName := linkUpdate.Attrs().Name
isEnabled := linkUpdate.Attrs().OperState != netlink.OperDown &&
linkUpdate.Attrs().OperState != netlink.OperNotPresent

_, isPendingNotif := w.pendingIntfs[ifName]
if isPendingNotif {
// notification for this interface is already scheduled, just update the state
w.pendingIntfs[ifName] = isEnabled
return
}
isUp := isLinkUp(linkUpdate)

if !w.needsUpdate(ifName, isEnabled) {
if !w.needsUpdate(ifName, isUp) {
// ignore notification if the interface operational state remained the same
return
}

if isEnabled {
// do not notify until interface is truly finished
w.pendingIntfs[ifName] = true
w.wg.Add(1)
go w.delayNotification(ifName)
return
}

// notification about removed interface is propagated immediately
w.notifyScheduler(ifName, false)
}

// delayNotification delays notification about enabled interface - typically
// interface is created in multiple stages and we do not want to notify scheduler
// about intermediate states.
func (w *InterfaceWatcher) delayNotification(ifName string) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Don't remember the exact scenario(s), but the reason for this was that newly created interface would send multiple UP/DOWN notifications, as if it was being created in multiple steps (causing dependent stuff to be created/deleted multiple times). Actually I think it is TAP that in VPP is being created through multiple syscalls. Anyway, let's get rid of the timeout and see if any issues appear (if yes, we can do this for example for VPP TAP, but not for VETH).
Btw. I don't think pendingIntfs map and running applyDelayedNotification as go routine is needed anymore. Also "applyDelayedNotification" is probably not a good name anymore.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Don't remember the exact scenario(s), but the reason for this was that newly created interface would send multiple UP/DOWN notifications, as if it was being created in multiple steps (causing dependent stuff to be created/deleted multiple times).

For the simple case of VETHs+AFPACKET I see several notifications, most likely because of various things configured on the interfaces (alias, namespace, peer, ...), and these don't seem to cause issues for me. I am running more e2e tests to confirm this, though.

Actually I think it is TAP that in VPP is being created through multiple syscalls.

Will run e2e tests for TAPs as well.

Anyway, let's get rid of the timeout and see if any issues appear (if yes, we can do this for example for VPP TAP, but not for VETH).

I think we will need to find a way to avoid sleeps/delays entirely and somehow figure out which intermediate states to ignore, because these sleeps make configuration of interfaces veeeery slow (see #1739)

Btw. I don't think pendingIntfs map and running applyDelayedNotification as go routine is needed anymore. Also "applyDelayedNotification" is probably not a good name anymore.

Will fix that before merge, just wanted travis to run tests on this.

defer w.wg.Done()

select {
case <-w.ctx.Done():
return
case <-time.After(notificationDelay):
w.applyDelayedNotification(ifName)
}
}

// applyDelayedNotification applies delayed interface notification.
func (w *InterfaceWatcher) applyDelayedNotification(ifName string) {
w.ifacesMu.Lock()
defer w.ifacesMu.Unlock()

// in the meantime the status may have changed and may not require update anymore
isEnabled := w.pendingIntfs[ifName]
if w.needsUpdate(ifName, isEnabled) {
w.notifyScheduler(ifName, isEnabled)
}

delete(w.pendingIntfs, ifName)
w.notifyScheduler(ifName, isUp)
}

// notifyScheduler notifies scheduler about interface change.
Expand All @@ -251,14 +196,21 @@ func (w *InterfaceWatcher) notifyScheduler(ifName string, enabled bool) {
delete(w.ifaces, ifName)
}

w.kvscheduler.PushSBNotification(kvs.KVWithMetadata{
if err := w.kvscheduler.PushSBNotification(kvs.KVWithMetadata{
Key: ifmodel.InterfaceHostNameKey(ifName),
Value: value,
Metadata: nil,
})
}); err != nil {
w.log.Warnf("pushing SB notification failed: %v", err)
}
}

// needsUpdate reports whether the state passed in isEnabled differs from the
// state currently recorded for ifName. An interface counts as previously
// enabled exactly when it has an entry in w.ifaces.
//
// The method reads w.ifaces without taking any lock itself.
// NOTE(review): callers appear to hold w.ifacesMu — confirm.
func (w *InterfaceWatcher) needsUpdate(ifName string, isEnabled bool) bool {
	if _, recorded := w.ifaces[ifName]; recorded {
		// Already tracked as enabled: only a transition to disabled matters.
		return !isEnabled
	}
	// Not tracked: only a transition to enabled matters.
	return isEnabled
}

// isLinkUp reports whether the link described by the update is considered up:
// its OperState is neither netlink.OperDown nor netlink.OperNotPresent.
// Every other operational state is treated as up.
func isLinkUp(update netlink.LinkUpdate) bool {
	switch update.Attrs().OperState {
	case netlink.OperDown, netlink.OperNotPresent:
		return false
	default:
		return true
	}
}
1 change: 1 addition & 0 deletions plugins/linux/nsplugin/ns_plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -248,6 +248,7 @@ func (p *NsPlugin) getOrCreateNs(ctx nsLinuxcalls.NamespaceMgmtCtx, ns *nsmodel.
case nsmodel.NetNamespace_NSID:
nsHandle, err = p.sysHandler.GetNamespaceFromName(ns.Reference)
if err != nil {
p.Log.Warnf("GetNamespaceFromName %s failed: %v", ns.Reference, err)
// Create named namespace if it doesn't exist yet.
_, err = p.namedNsHandler.CreateNamedNetNs(ctx, ns.Reference)
if err != nil {
Expand Down