diff --git a/felix/bpf/attach.go b/felix/bpf/attach.go index 9ac79f45ad4..49ba857d3f6 100644 --- a/felix/bpf/attach.go +++ b/felix/bpf/attach.go @@ -74,6 +74,10 @@ func (ap *AttachPoint) PolicyIdx(family int) int { return -1 } +type AttachResult interface { + ProgID() int +} + // AlreadyAttachedProg checks that the program we are going to attach has the // same parameters as what we remembered about the currently attached. func AlreadyAttachedProg(a AttachPointInfo, object string, id int) (bool, error) { diff --git a/felix/bpf/bpf_defs.go b/felix/bpf/bpf_defs.go index f6730dfe8d9..a00029ec2c0 100644 --- a/felix/bpf/bpf_defs.go +++ b/felix/bpf/bpf_defs.go @@ -32,6 +32,10 @@ func (f ProgFD) Close() error { return unix.Close(int(f)) } +func (f ProgFD) FD() uint32 { + return uint32(f) +} + type ProgResult struct { RC int32 Duration time.Duration diff --git a/felix/bpf/jump/map.go b/felix/bpf/jump/map.go index 18ea0ded2de..57030406283 100644 --- a/felix/bpf/jump/map.go +++ b/felix/bpf/jump/map.go @@ -17,7 +17,6 @@ package jump import ( "encoding/binary" - "github.com/projectcalico/calico/felix/bpf" "github.com/projectcalico/calico/felix/bpf/maps" ) @@ -58,8 +57,8 @@ func Key(idx int) []byte { return k[:] } -func Value(fd bpf.ProgFD) []byte { +func Value(fd uint32) []byte { var v [4]byte - binary.LittleEndian.PutUint32(v[:], uint32(fd)) + binary.LittleEndian.PutUint32(v[:], fd) return v[:] } diff --git a/felix/bpf/libbpf/libbpf.go b/felix/bpf/libbpf/libbpf.go index 13f91b86c5d..7e493e1ea72 100644 --- a/felix/bpf/libbpf/libbpf.go +++ b/felix/bpf/libbpf/libbpf.go @@ -151,22 +151,35 @@ func (o *Obj) ProgramFD(secname string) (int, error) { return int(ret), nil } -func (o *Obj) AttachClassifier(secName, ifName string, ingress bool) (int, error) { +func QueryClassifier(ifindex, handle, pref int, ingress bool) (int, error) { + opts, err := C.bpf_tc_program_query(C.int(ifindex), C.int(handle), C.int(pref), C.bool(ingress)) + + return int(opts.prog_id), err +} + +func DetachClassifier(ifindex, handle, pref int, ingress bool) error { + _, err := C.bpf_tc_program_detach(C.int(ifindex), C.int(handle), C.int(pref), C.bool(ingress)) + + return err +} + +// AttachClassifier return the program id and pref and handle of the qdisc +func (o *Obj) AttachClassifier(secName, ifName string, ingress bool) (int, int, int, error) { cSecName := C.CString(secName) cIfName := C.CString(ifName) defer C.free(unsafe.Pointer(cSecName)) defer C.free(unsafe.Pointer(cIfName)) ifIndex, err := C.if_nametoindex(cIfName) if err != nil { - return -1, err + return -1, -1, -1, err } ret, err := C.bpf_tc_program_attach(o.obj, cSecName, C.int(ifIndex), C.bool(ingress)) if err != nil { - return -1, fmt.Errorf("error attaching tc program %w", err) + return -1, -1, -1, fmt.Errorf("error attaching tc program %w", err) } - return int(ret.prog_id), nil + return int(ret.prog_id), int(ret.priority), int(ret.handle), nil } func (o *Obj) AttachXDP(ifName, progName string, oldID int, mode uint) (int, error) { diff --git a/felix/bpf/libbpf/libbpf_api.h b/felix/bpf/libbpf/libbpf_api.h index 638aae60a57..fb149a08d6f 100644 --- a/felix/bpf/libbpf/libbpf_api.h +++ b/felix/bpf/libbpf/libbpf_api.h @@ -66,29 +66,51 @@ struct bpf_tc_opts bpf_tc_program_attach(struct bpf_object *obj, char *secName, return attach; } -int bpf_tc_query_iface (int ifIndex, struct bpf_tc_opts opts, int isIngress) { +void bpf_tc_program_detach(int ifindex, int handle, int pref, bool ingress) +{ + DECLARE_LIBBPF_OPTS(bpf_tc_hook, hook, + .ifindex = ifindex, + .attach_point = ingress ? BPF_TC_INGRESS : BPF_TC_EGRESS, + ); + DECLARE_LIBBPF_OPTS(bpf_tc_opts, opts, + .handle = handle, + .priority = pref, + ); + + set_errno(bpf_tc_detach(&hook, &opts)); +} + +struct bpf_tc_opts bpf_tc_program_query(int ifindex, int handle, int pref, bool ingress) +{ + DECLARE_LIBBPF_OPTS(bpf_tc_hook, hook, + .ifindex = ifindex, + .attach_point = ingress ? BPF_TC_INGRESS : BPF_TC_EGRESS, + ); + DECLARE_LIBBPF_OPTS(bpf_tc_opts, opts, + .handle = handle, + .priority = pref, + ); - DECLARE_LIBBPF_OPTS(bpf_tc_hook, hook, .attach_point = BPF_TC_EGRESS); - if (isIngress) { - hook.attach_point = BPF_TC_INGRESS; - } - hook.ifindex = ifIndex; - opts.prog_fd = opts.prog_id = opts.flags = 0; set_errno(bpf_tc_query(&hook, &opts)); - return opts.prog_id; + + return opts; } -void bpf_tc_create_qdisc (int ifIndex) { +void bpf_tc_create_qdisc(int ifIndex) +{ DECLARE_LIBBPF_OPTS(bpf_tc_hook, hook, .attach_point = BPF_TC_INGRESS); hook.ifindex = ifIndex; set_errno(bpf_tc_hook_create(&hook)); } -void bpf_tc_remove_qdisc (int ifIndex) { - DECLARE_LIBBPF_OPTS(bpf_tc_hook, hook, .attach_point = BPF_TC_EGRESS | BPF_TC_INGRESS); - hook.ifindex = ifIndex; +void bpf_tc_remove_qdisc(int ifindex) +{ + DECLARE_LIBBPF_OPTS(bpf_tc_hook, hook, + .attach_point = BPF_TC_EGRESS | BPF_TC_INGRESS, + .ifindex = ifindex, + ); + set_errno(bpf_tc_hook_destroy(&hook)); - return; } int bpf_update_jump_map(struct bpf_object *obj, char* mapName, char *progName, int progIndex) { diff --git a/felix/bpf/libbpf/libbpf_stub.go b/felix/bpf/libbpf/libbpf_stub.go index 61eba3b94c6..ae089f84f93 100644 --- a/felix/bpf/libbpf/libbpf_stub.go +++ b/felix/bpf/libbpf/libbpf_stub.go @@ -58,7 +58,15 @@ func (m *Map) NextMap() (*Map, error) { panic("LIBBPF syscall stub") } -func (o *Obj) AttachClassifier(secName, ifName string, ingress bool) (int, error) { +func QueryClassifier(ifindex, handle, pref int, ingress bool) (int, error) { + panic("LIBBPF syscall stub") +} + +func DetachClassifier(ifindex, handle, pref int, ingress bool) error { + panic("LIBBPF syscall stub") +} + +func (o *Obj) AttachClassifier(secName, ifName string, ingress bool) (int, int, int, error) { panic("LIBBPF syscall stub") } diff --git a/felix/bpf/tc/attach.go b/felix/bpf/tc/attach.go index b69b6fa4141..5ff296db2af 100644 --- a/felix/bpf/tc/attach.go +++ b/felix/bpf/tc/attach.go @@ -104,8 +104,26 @@ func (ap *AttachPoint) loadObject(ipVer int, file string) (*libbpf.Obj, error) { return obj, nil } +type AttachResult struct { + progId int + prio int + handle int +} + +func (ar AttachResult) ProgID() int { + return ar.progId +} + +func (ar AttachResult) Prio() int { + return ar.prio +} + +func (ar AttachResult) Handle() int { + return ar.handle +} + // AttachProgram attaches a BPF program from a file to the TC attach point -func (ap *AttachPoint) AttachProgram() (int, error) { +func (ap *AttachPoint) AttachProgram() (bpf.AttachResult, error) { logCxt := log.WithField("attachPoint", ap) // By now the attach type specific generic set of programs is loaded and we @@ -113,32 +131,34 @@ func (ap *AttachPoint) AttachProgram() (int, error) { // configuration further to the selected set of programs. binaryToLoad := path.Join(bpfdefs.ObjectDir, "tc_preamble.o") + var res AttachResult + /* XXX we should remember the tag of the program and skip the rest if the tag is * still the same */ progsToClean, err := ap.listAttachedPrograms(true) if err != nil { - return -1, err + return nil, err } obj, err := ap.loadObject(4, binaryToLoad) if err != nil { logCxt.Warn("Failed to load program") - return -1, fmt.Errorf("object v4: %w", err) + return nil, fmt.Errorf("object v4: %w", err) } defer obj.Close() - progId, err := obj.AttachClassifier("cali_tc_preamble", ap.Iface, ap.Hook == hook.Ingress) + res.progId, res.prio, res.handle, err = obj.AttachClassifier("cali_tc_preamble", ap.Iface, ap.Hook == hook.Ingress) if err != nil { logCxt.Warnf("Failed to attach to TC section cali_tc_preamble") - return -1, err + return nil, err } logCxt.Info("Program attached to TC.") if err := ap.detachPrograms(progsToClean); err != nil { - return -1, err + return nil, err } - return progId, nil + return res, nil } func (ap *AttachPoint) DetachProgram() error { @@ -305,16 +325,16 @@ func (ap *AttachPoint) IsAttached() (bool, error) { } // EnsureQdisc makes sure that qdisc is attached to the given interface -func EnsureQdisc(ifaceName string) error { +func EnsureQdisc(ifaceName string) (bool, error) { hasQdisc, err := HasQdisc(ifaceName) if err != nil { - return err + return false, err } if hasQdisc { log.WithField("iface", ifaceName).Debug("Already have a clsact qdisc on this interface") - return nil + return true, nil } - return libbpf.CreateQDisc(ifaceName) + return false, libbpf.CreateQDisc(ifaceName) } func HasQdisc(ifaceName string) (bool, error) { diff --git a/felix/bpf/ut/attach_test.go b/felix/bpf/ut/attach_test.go index 0dc37393281..5864fac1784 100644 --- a/felix/bpf/ut/attach_test.go +++ b/felix/bpf/ut/attach_test.go @@ -544,7 +544,7 @@ func BenchmarkAttachProgram(b *testing.B) { vethName, veth := createVeth() defer deleteLink(veth) - err := tc.EnsureQdisc(vethName) + _, err := tc.EnsureQdisc(vethName) Expect(err).NotTo(HaveOccurred()) ap := tc.AttachPoint{ diff --git a/felix/bpf/xdp/attach.go b/felix/bpf/xdp/attach.go index ec48dcba69c..5e392b50fc5 100644 --- a/felix/bpf/xdp/attach.go +++ b/felix/bpf/xdp/attach.go @@ -115,7 +115,13 @@ func ConfigureProgram(m *libbpf.Map, iface string, globalData *libbpf.XDPGlobalD return nil } -func (ap *AttachPoint) AttachProgram() (int, error) { +type AttachResult int + +func (ar AttachResult) ProgID() int { + return int(ar) +} + +func (ap *AttachPoint) AttachProgram() (bpf.AttachResult, error) { // By now the attach type specific generic set of programs is loaded and we // only need to load and configure the preamble that will pass the // configuration further to the selected set of programs. @@ -123,7 +129,7 @@ func (ap *AttachPoint) AttachProgram() (int, error) { obj, err := libbpf.OpenObject(binaryToLoad) if err != nil { - return -1, err + return nil, err } defer obj.Close() @@ -137,14 +143,14 @@ func (ap *AttachPoint) AttachProgram() (int, error) { globals.Jumps[tcdefs.ProgIndexPolicy] = uint32(ap.PolicyIdx(4)) if err := ConfigureProgram(m, ap.Iface, &globals); err != nil { - return -1, err + return nil, err } continue } // TODO: We need to set map size here like tc. pinDir := bpf.MapPinDir(m.Type(), m.Name(), ap.Iface, hook.XDP) if err := m.SetPinPath(path.Join(pinDir, m.Name())); err != nil { - return -1, fmt.Errorf("error pinning map %s: %w", m.Name(), err) + return nil, fmt.Errorf("error pinning map %s: %w", m.Name(), err) } } @@ -152,18 +158,18 @@ func (ap *AttachPoint) AttachProgram() (int, error) { progID, isAttached := ap.AlreadyAttached(binaryToLoad) if isAttached { ap.Log().Infof("Programs already attached, skip reattaching %s", binaryToLoad) - return progID, nil + return AttachResult(progID), nil } ap.Log().Infof("Continue with attaching BPF program %s", binaryToLoad) if err := obj.Load(); err != nil { ap.Log().Warn("Failed to load program") - return -1, fmt.Errorf("error loading program: %w", err) + return nil, fmt.Errorf("error loading program: %w", err) } oldID, err := ap.ProgramID() if err != nil { - return -1, fmt.Errorf("failed to get the attached XDP program ID: %w", err) + return nil, fmt.Errorf("failed to get the attached XDP program ID: %w", err) } attachmentSucceeded := false @@ -182,11 +188,11 @@ func (ap *AttachPoint) AttachProgram() (int, error) { } if !attachmentSucceeded { - return -1, fmt.Errorf("failed to attach XDP program with program name %v to interface %v", + return nil, fmt.Errorf("failed to attach XDP program with program name %v to interface %v", ap.ProgramName(), ap.Iface) } - return progID, nil + return AttachResult(progID), nil } func (ap *AttachPoint) DetachProgram() error { diff --git a/felix/dataplane/linux/bpf_ep_mgr.go b/felix/dataplane/linux/bpf_ep_mgr.go index 81512f803ba..1d2762932da 100644 --- a/felix/dataplane/linux/bpf_ep_mgr.go +++ b/felix/dataplane/linux/bpf_ep_mgr.go @@ -119,7 +119,7 @@ type attachPoint interface { IfaceName() string HookName() hook.Hook IsAttached() (bool, error) - AttachProgram() (int, error) + AttachProgram() (bpf.AttachResult, error) DetachProgram() error Log() *log.Entry PolicyIdx(int) int @@ -131,11 +131,16 @@ type attachPointWithPolicyJumps interface { PolicyDenyJumpIdx(int) int } +type fileDescriptor interface { + FD() uint32 + Close() error +} + type bpfDataplane interface { ensureStarted() - ensureProgramAttached(ap attachPoint) error + ensureProgramAttached(ap attachPoint) (bpf.AttachResult, error) ensureNoProgram(ap attachPoint) error - ensureQdisc(iface string) error + ensureQdisc(iface string) (bool, error) ensureBPFDevices() error updatePolicyProgram(rules polprog.Rules, polDir string, ap attachPoint) error removePolicyProgram(ap attachPoint) error @@ -145,13 +150,15 @@ type bpfDataplane interface { delRoute(ip.V4CIDR) ruleMatchID(dir, action, owner, name string, idx int) polprog.RuleMatchID loadDefaultPolicies() error - loadTCLogFilter(ap *tc.AttachPoint) (bpf.ProgFD, int, error) + loadTCLogFilter(ap *tc.AttachPoint) (fileDescriptor, int, error) + interfaceByIndex(int) (*net.Interface, error) + queryClassifier(int, int, int, bool) (int, error) } type hasLoadPolicyProgram interface { loadPolicyProgram(progName string, ipFamily proto.IPVersion, rules polprog.Rules, progsMap maps.Map, opts ...polprog.Option) ( - bpf.ProgFD, asm.Insns, error) + fileDescriptor, asm.Insns, error) } type bpfInterface struct { @@ -185,10 +192,27 @@ func (i bpfInterfaceInfo) ifaceIsUp() bool { return i.isUP } +type ifaceReadiness int + +const ( + ifaceNotReady ifaceReadiness = iota + ifaceIsReady + // We know it was ready at some point in time and we + // assume it still is, but we need to reassure outselves. + ifaceIsReadyNotAssured +) + type bpfInterfaceState struct { policyIdx [hook.Count]int filterIdx [hook.Count]int - isReady bool + readiness ifaceReadiness + qdisc qDiscInfo +} + +type qDiscInfo struct { + valid bool + prio int + handle int } type ctlbWorkaroundMode int @@ -258,7 +282,7 @@ type bpfEndpointManager struct { loadPolicyProgramFn func(progName string, ipFamily proto.IPVersion, rules polprog.Rules, progsMap maps.Map, opts ...polprog.Option) ( - bpf.ProgFD, asm.Insns, error) + fileDescriptor, asm.Insns, error) updatePolicyProgramFn func(rules polprog.Rules, polDir string, ap attachPoint) error // HEP processing. @@ -709,7 +733,7 @@ func (m *bpfEndpointManager) updateIfaceStateMap(name string, iface *bpfInterfac if m.isWorkloadIface(name) { flags |= ifstate.FlgWEP } - if iface.dpState.isReady { + if iface.dpState.readiness != ifaceNotReady { flags |= ifstate.FlgReady } v := ifstate.NewValue(flags, name, @@ -877,7 +901,7 @@ func (m *bpfEndpointManager) onInterfaceUpdate(update *ifaceStateUpdate) { delete(m.hostIfaceToEpMap, update.Name) } m.deleteIfaceCounters(update.Name, iface.info.ifIndex) - iface.dpState.isReady = false + iface.dpState.readiness = ifaceNotReady iface.info.isUP = false m.updateIfaceStateMap(update.Name, iface) iface.info.ifIndex = 0 @@ -1039,6 +1063,10 @@ func jumpMapDeleteEntry(m maps.Map, idx int) error { return nil } +func (m *bpfEndpointManager) interfaceByIndex(ifindex int) (*net.Interface, error) { + return net.InterfaceByIndex(ifindex) +} + func (m *bpfEndpointManager) syncIfStateMap() { palloc := set.New[int]() xdpPalloc := set.New[int]() @@ -1048,7 +1076,7 @@ func (m *bpfEndpointManager) syncIfStateMap() { m.ifStateMap.IterDataplaneCache(func(k ifstate.Key, v ifstate.Value) { ifindex := int(k.IfIndex()) - netiface, err := net.InterfaceByIndex(ifindex) + netiface, err := m.dp.interfaceByIndex(ifindex) if err != nil { // "net" does not export the strings or err types :( if strings.Contains(err.Error(), "no such network interface") { @@ -1081,7 +1109,7 @@ func (m *bpfEndpointManager) syncIfStateMap() { iface.info.ifIndex = netiface.Index iface.info.isUP = true if v.Flags()&ifstate.FlgReady != 0 { - iface.dpState.isReady = true + iface.dpState.readiness = ifaceIsReadyNotAssured } } @@ -1239,6 +1267,10 @@ func (m *bpfEndpointManager) CompleteDeferredWork() error { m.startupOnce.Do(func() { m.dp.ensureStarted() + if err := m.ifStateMap.LoadCacheFromDataplane(); err != nil { + log.WithError(err).Fatal("Cannot load interface state map - essential for consistent operation.") + } + m.initUnknownIfaces.Iter(func(iface string) error { if ai, ok := m.initAttaches[iface]; ok { if err := m.cleanupOldAttach(iface, ai); err != nil { @@ -1254,16 +1286,19 @@ func (m *bpfEndpointManager) CompleteDeferredWork() error { // Makes sure that we delete entries for non-existing devices and preserve entries // for those that exists until we can make sure that they did (not) change. m.syncIfStateMap() + log.Info("BPF Interface state map synced.") if err := m.dp.loadDefaultPolicies(); err != nil { log.WithError(err).Warn("Failed to load default policies, some programs may default to DENY.") } + log.Info("Default BPF policy programs loaded.") m.initUnknownIfaces = nil if err := m.syncIfaceProperties(); err != nil { log.WithError(err).Warn("Failed to sync counters map with existing interfaces - some counters may have leaked.") } + log.Info("BPF counters synced.") }) m.applyProgramsToDirtyDataInterfaces() @@ -1404,7 +1439,7 @@ func (m *bpfEndpointManager) applyProgramsToDirtyDataInterfaces() { defer wg.Done() // Attach the qdisc first; it is shared between the directions. - err := m.dp.ensureQdisc(iface) + _, err := m.dp.ensureQdisc(iface) if err != nil { mutex.Lock() errs[iface] = err @@ -1474,7 +1509,11 @@ func (m *bpfEndpointManager) applyProgramsToDirtyDataInterfaces() { } m.withIface(iface, func(i *bpfInterface) bool { - i.dpState.isReady = isReady + if isReady { + i.dpState.readiness = ifaceIsReady + } else { + i.dpState.readiness = ifaceNotReady + } m.updateIfaceStateMap(iface, i) return false // no need to enforce dirty }) @@ -1564,43 +1603,20 @@ func (m *bpfEndpointManager) updateWEPsInDataplane() { } } -func (m *bpfEndpointManager) doApplyPolicy(ifaceName string) (bpfInterfaceState, error) { - startTime := time.Now() - - var state bpfInterfaceState - - // Other threads might be filling in jump map FDs in the map so take the lock. - m.ifacesLock.Lock() - var endpointID *proto.WorkloadEndpointID - var ifaceUp bool - m.withIface(ifaceName, func(iface *bpfInterface) (forceDirty bool) { - ifaceUp = iface.info.ifaceIsUp() - endpointID = iface.info.endpointID - state = iface.dpState - return false - }) - m.ifacesLock.Unlock() - - if !ifaceUp { - // Interface is gone, nothing to do. - log.WithField("ifaceName", ifaceName).Debug( - "Ignoring request to program interface that is not present.") - return state, nil - } - +func (m *bpfEndpointManager) wepStateFillJumps(state *bpfInterfaceState) error { var err error if state.policyIdx[hook.Ingress] == -1 { state.policyIdx[hook.Ingress], err = m.jumpMapAlloc.Get() if err != nil { - return state, err + return err } } if state.policyIdx[hook.Egress] == -1 { state.policyIdx[hook.Egress], err = m.jumpMapAlloc.Get() if err != nil { - return state, err + return err } } @@ -1608,24 +1624,63 @@ func (m *bpfEndpointManager) doApplyPolicy(ifaceName string) (bpfInterfaceState, if state.filterIdx[hook.Ingress] == -1 { state.filterIdx[hook.Ingress], err = m.jumpMapAlloc.Get() if err != nil { - return state, err + return err } } if state.filterIdx[hook.Egress] == -1 { state.filterIdx[hook.Egress], err = m.jumpMapAlloc.Get() if err != nil { - return state, err + return err } } } + return nil +} + +func (m *bpfEndpointManager) queryClassifier(ifindex, handle, prio int, ingress bool) (int, error) { + return libbpf.QueryClassifier(ifindex, handle, prio, ingress) +} + +func (m *bpfEndpointManager) doApplyPolicy(ifaceName string) (bpfInterfaceState, error) { + startTime := time.Now() + + var ( + state bpfInterfaceState + endpointID *proto.WorkloadEndpointID + ifaceUp bool + ifindex int + ) + + // Other threads might be filling in jump map FDs in the map so take the lock. + m.ifacesLock.Lock() + m.withIface(ifaceName, func(iface *bpfInterface) (forceDirty bool) { + ifaceUp = iface.info.ifaceIsUp() + ifindex = iface.info.ifIndex + endpointID = iface.info.endpointID + state = iface.dpState + return false + }) + m.ifacesLock.Unlock() + + if !ifaceUp { + // Interface is gone, nothing to do. + log.WithField("ifaceName", ifaceName).Debug( + "Ignoring request to program interface that is not present.") + return state, nil + } + + if err := m.wepStateFillJumps(&state); err != nil { + return state, err + } + // Otherwise, the interface appears to be present but we may or may not have an endpoint from the // datastore. If we don't have an endpoint then we'll attach a program to block traffic and we'll // get the jump map ready to insert the policy if the endpoint shows up. // Attach the qdisc first; it is shared between the directions. - err = m.dp.ensureQdisc(ifaceName) + existed, err := m.dp.ensureQdisc(ifaceName) if err != nil { if isLinkNotFoundError(err) { // Interface is gone, nothing to do. @@ -1635,10 +1690,19 @@ func (m *bpfEndpointManager) doApplyPolicy(ifaceName string) (bpfInterfaceState, } return state, err } + if !existed { + // Cannot be ready if the qdisc is not there so no program can be + // attached. Do the full attach! + state.readiness = ifaceNotReady + } + + var ( + ingressErr, egressErr error + ingressQdisc, egressQdisc qDiscInfo + wg sync.WaitGroup + wep *proto.WorkloadEndpoint + ) - var ingressErr, egressErr error - var wg sync.WaitGroup - var wep *proto.WorkloadEndpoint if endpointID != nil { wep = m.allWEPs[*endpointID] } @@ -1646,12 +1710,24 @@ func (m *bpfEndpointManager) doApplyPolicy(ifaceName string) (bpfInterfaceState, wg.Add(2) go func() { defer wg.Done() - ingressErr = m.attachWorkloadProgram(ifaceName, + readiness := state.readiness + if readiness == ifaceIsReady { + if _, err := m.dp.queryClassifier(ifindex, state.qdisc.handle, state.qdisc.prio, true); err != nil { + readiness = ifaceNotReady + } + } + ingressQdisc, ingressErr = m.wepApplyPolicyToDirection(readiness, ifaceName, state.policyIdx[hook.Ingress], state.filterIdx[hook.Ingress], wep, PolDirnIngress) }() go func() { defer wg.Done() - egressErr = m.attachWorkloadProgram(ifaceName, + readiness := state.readiness + if readiness == ifaceIsReady { + if _, err := m.dp.queryClassifier(ifindex, state.qdisc.handle, state.qdisc.prio, false); err != nil { + readiness = ifaceNotReady + } + } + egressQdisc, egressErr = m.wepApplyPolicyToDirection(readiness, ifaceName, state.policyIdx[hook.Egress], state.filterIdx[hook.Egress], wep, PolDirnEgress) }() wg.Wait() @@ -1663,11 +1739,17 @@ func (m *bpfEndpointManager) doApplyPolicy(ifaceName string) (bpfInterfaceState, return state, egressErr } + if egressQdisc != ingressQdisc { + return state, fmt.Errorf("ingress qdisc info (%v) does not equal egress qdist info (%v)", + ingressQdisc, egressQdisc) + } + applyTime := time.Since(startTime) log.WithFields(log.Fields{"timeTaken": applyTime, "ifaceName": ifaceName}). Info("Finished applying BPF programs for workload") - state.isReady = true + state.qdisc = ingressQdisc + state.readiness = ifaceIsReady return state, nil } @@ -1698,13 +1780,8 @@ func isLinkNotFoundError(err error) bool { var calicoRouterIP = net.IPv4(169, 254, 1, 1).To4() -func (m *bpfEndpointManager) attachWorkloadProgram(ifaceName string, policyIdx, filterIdx int, - endpoint *proto.WorkloadEndpoint, polDirection PolDirection) error { - - if m.hostIP == nil { - // Do not bother and wait - return fmt.Errorf("unknown host IP") - } +func (m *bpfEndpointManager) wepTCAttachPoint(ifaceName string, policyIdx, filterIdx int, + polDirection PolDirection) *tc.AttachPoint { ap := m.calculateTCAttachPoint(polDirection, ifaceName) ap.HostIP = m.hostIP @@ -1716,9 +1793,44 @@ func (m *bpfEndpointManager) attachWorkloadProgram(ifaceName string, policyIdx, ap.PolicyIdx4 = policyIdx ap.LogFilterIdx = filterIdx - err := m.dp.ensureProgramAttached(ap) + return ap +} + +func (m *bpfEndpointManager) wepApplyPolicyToDirection(readiness ifaceReadiness, ifaceName string, policyIdx, + filterIdx int, endpoint *proto.WorkloadEndpoint, polDirection PolDirection) (qDiscInfo, error) { + + var qdisc qDiscInfo + + if m.hostIP == nil { + // Do not bother and wait + return qdisc, fmt.Errorf("unknown host IP") + } + + ap := m.wepTCAttachPoint(ifaceName, policyIdx, filterIdx, polDirection) + + log.WithField("iface", ifaceName).Debugf("readiness: %d", readiness) + if readiness != ifaceIsReady { + res, err := m.wepAttachProgram(ap) + if err != nil { + return qdisc, fmt.Errorf("attaching program to wep: %w", err) + } + qdisc.valid = true + qdisc.prio = res.(tc.AttachResult).Prio() + qdisc.handle = res.(tc.AttachResult).Handle() + ap.Log().Info("Attached programs to the WEP") + } + + if err := m.wepApplyPolicy(ap, endpoint, polDirection); err != nil { + return qdisc, fmt.Errorf("applying policy to wep: %w", err) + } + + return qdisc, nil +} + +func (m *bpfEndpointManager) wepAttachProgram(ap *tc.AttachPoint) (bpf.AttachResult, error) { + res, err := m.dp.ensureProgramAttached(ap) if err != nil { - return err + return nil, err } if ap.LogLevel == "debug" { @@ -1727,6 +1839,12 @@ func (m *bpfEndpointManager) attachWorkloadProgram(ifaceName string, policyIdx, } } + return res, nil +} + +func (m *bpfEndpointManager) wepApplyPolicy(ap *tc.AttachPoint, + endpoint *proto.WorkloadEndpoint, polDirection PolDirection) error { + var profileIDs []string var tier *proto.TierInfo if endpoint != nil { @@ -1735,7 +1853,7 @@ func (m *bpfEndpointManager) attachWorkloadProgram(ifaceName string, policyIdx, tier = endpoint.Tiers[0] } } else { - log.WithField("name", ifaceName).Debug( + log.WithField("name", ap.IfaceName()).Debug( "Workload interface with no endpoint in datastore, installing default-drop program.") } @@ -1808,7 +1926,7 @@ func (m *bpfEndpointManager) attachDataIfaceProgram(ifaceName string, ep *proto. ap.PolicyIdx4 = policyIdx ap.LogFilterIdx = filterIdx - if err := m.dp.ensureProgramAttached(ap); err != nil { + if _, err := m.dp.ensureProgramAttached(ap); err != nil { return err } @@ -1844,7 +1962,7 @@ func (m *bpfEndpointManager) attachXDPProgram(ifaceName string, ep *proto.HostEn } if ep != nil && len(ep.UntrackedTiers) == 1 { - err := m.dp.ensureProgramAttached(ap) + _, err := m.dp.ensureProgramAttached(ap) if err != nil { return err } @@ -1856,7 +1974,10 @@ func (m *bpfEndpointManager) attachXDPProgram(ifaceName string, ep *proto.HostEn ForXDP: true, } ap.Log().Debugf("Rules: %v", rules) - return m.updatePolicyProgramFn(rules, "xdp", ap) + err = m.updatePolicyProgramFn(rules, "xdp", ap) + ap.Log().WithError(err).Debugf("Applied untracked policy hep=%v", ep.Name) + + return err } else { return m.dp.ensureNoProgram(ap) } @@ -2347,10 +2468,6 @@ func (m *bpfEndpointManager) ensureStarted() { if err != nil { log.WithError(err).Warn("Failed to list previously attached programs. We may not clean up some.") } - - if err := m.ifStateMap.LoadCacheFromDataplane(); err != nil { - log.WithError(err).Fatal("Cannot load interface state map - essential for consistent operation.") - } } func (m *bpfEndpointManager) ensureBPFDevices() error { @@ -2421,12 +2538,12 @@ func (m *bpfEndpointManager) ensureBPFDevices() error { return fmt.Errorf("failed to configure %s parameters: %w", bpfOutDev, err) } - err = m.ensureQdisc(bpfInDev) + _, err = m.ensureQdisc(bpfInDev) if err != nil { return fmt.Errorf("failed to set qdisc on %s: %w", bpfOutDev, err) } - err = m.ensureQdisc("lo") + _, err = m.ensureQdisc("lo") if err != nil { log.WithError(err).Fatalf("Failed to set qdisc on lo.") } @@ -2444,7 +2561,7 @@ func (m *bpfEndpointManager) ensureBPFDevices() error { return nil } -func (m *bpfEndpointManager) ensureQdisc(iface string) error { +func (m *bpfEndpointManager) ensureQdisc(iface string) (bool, error) { return tc.EnsureQdisc(iface) } @@ -2470,7 +2587,7 @@ func (m *bpfEndpointManager) loadTCObj(at hook.AttachType) (hook.Layout, error) } // Ensure TC/XDP program is attached to the specified interface. -func (m *bpfEndpointManager) ensureProgramAttached(ap attachPoint) error { +func (m *bpfEndpointManager) ensureProgramAttached(ap attachPoint) (bpf.AttachResult, error) { var err error if aptc, ok := ap.(*tc.AttachPoint); ok { @@ -2485,27 +2602,27 @@ func (m *bpfEndpointManager) ensureProgramAttached(ap attachPoint) error { at.Family = 4 if aptc.HookLayout4, err = m.loadTCObj(at); err != nil { - return fmt.Errorf("loading generic v4 tc hook program: %w", err) + return nil, fmt.Errorf("loading generic v4 tc hook program: %w", err) } // Load deafault policy before the real policy is created and loaded. switch at.DefaultPolicy() { case hook.DefPolicyAllow: err = maps.UpdateMapEntry(m.bpfmaps.JumpMap.MapFD(), - jump.Key(ap.PolicyIdx(4)), jump.Value(m.policyTcAllowFD)) + jump.Key(ap.PolicyIdx(4)), jump.Value(m.policyTcAllowFD.FD())) case hook.DefPolicyDeny: err = maps.UpdateMapEntry(m.bpfmaps.JumpMap.MapFD(), - jump.Key(ap.PolicyIdx(4)), jump.Value(m.policyTcDenyFD)) + jump.Key(ap.PolicyIdx(4)), jump.Value(m.policyTcDenyFD.FD())) } if err != nil { - return fmt.Errorf("failed to set default policy: %w", err) + return nil, fmt.Errorf("failed to set default policy: %w", err) } if aptc.IPv6Enabled { at.Family = 6 if aptc.HookLayout6, err = m.loadTCObj(at); err != nil { - return fmt.Errorf("loading generic v6 tc hook program: %w", err) + return nil, fmt.Errorf("loading generic v6 tc hook program: %w", err) } } @@ -2517,15 +2634,13 @@ func (m *bpfEndpointManager) ensureProgramAttached(ap attachPoint) error { pm := m.bpfmaps.XDPProgramsMap.(*hook.ProgramsMap) if apxdp.HookLayout, err = pm.LoadObj(at); err != nil { - return fmt.Errorf("loading generic xdp hook program: %w", err) + return nil, fmt.Errorf("loading generic xdp hook program: %w", err) } } else { - return fmt.Errorf("unknown attach type") + return nil, fmt.Errorf("unknown attach type") } - _, err = ap.AttachProgram() - - return err + return ap.AttachProgram() } // Ensure that the specified attach point does not have our program. @@ -2610,7 +2725,22 @@ func (m *bpfEndpointManager) updatePolicyProgram(rules polprog.Rules, polDir str for _, ipFamily := range ipVersions { progName := policyProgramName(ap.IfaceName(), polDir, ipFamily) - insns, err := m.doUpdatePolicyProgram(ap, progName, rules, ipFamily) + + var opts []polprog.Option + if apj, ok := ap.(attachPointWithPolicyJumps); ok { + allow := apj.PolicyAllowJumpIdx(int(ipFamily)) + if allow == -1 { + return fmt.Errorf("no allow jump index") + } + + deny := apj.PolicyDenyJumpIdx(int(ipFamily)) + if deny == -1 { + return fmt.Errorf("no deny jump index") + } + opts = append(opts, polprog.WithAllowDenyJumps(allow, deny)) + } + insns, err := m.doUpdatePolicyProgram(ap.HookName(), progName, + ap.PolicyIdx(int(ipFamily)), rules, ipFamily, opts...) perr := m.writePolicyDebugInfo(insns, ap.IfaceName(), ipFamily, polDir, ap.HookName(), err) if perr != nil { log.WithError(perr).Warn("error writing policy debug information") @@ -2622,17 +2752,17 @@ func (m *bpfEndpointManager) updatePolicyProgram(rules polprog.Rules, polDir str return nil } -func (m *bpfEndpointManager) loadTCLogFilter(ap *tc.AttachPoint) (bpf.ProgFD, int, error) { +func (m *bpfEndpointManager) loadTCLogFilter(ap *tc.AttachPoint) (fileDescriptor, int, error) { logFilter, err := filter.New(ap.Type, 64, ap.LogFilter, m.bpfmaps.ProgramsMap.MapFD()) if err != nil { - return 0, 0, err + return nil, 0, err } fd, err := bpf.LoadBPFProgramFromInsns(logFilter, "calico_log_filter", "Apache-2.0", uint32(unix.BPF_PROG_TYPE_SCHED_CLS)) if err != nil { - return 0, 0, fmt.Errorf("failed to load BPF log filter program: %w", err) + return nil, 0, fmt.Errorf("failed to load BPF log filter program: %w", err) } return fd, ap.LogFilterIdx, nil @@ -2646,8 +2776,8 @@ func (m *bpfEndpointManager) updateLogFilter(ap attachPoint) error { return err } defer fd.Close() - if err := m.bpfmaps.JumpMap.Update(jump.Key(idx), jump.Value(fd)); err != nil { - return fmt.Errorf("failed to update %s policy jump map [%d]=%d: %w", ap.HookName(), idx, fd, err) + if err := m.bpfmaps.JumpMap.Update(jump.Key(idx), jump.Value(fd.FD())); err != nil { + return fmt.Errorf("failed to update %s policy jump map [%d]=%d: %w", ap.HookName(), idx, fd.FD(), err) } ap.Log().Debugf("Loaded filter at %d", idx) @@ -2669,7 +2799,7 @@ func policyProgramName(iface, polDir string, ipFamily proto.IPVersion) string { func (m *bpfEndpointManager) loadPolicyProgram(progName string, ipFamily proto.IPVersion, rules polprog.Rules, progsMap maps.Map, opts ...polprog.Option) ( - bpf.ProgFD, asm.Insns, error) { + fileDescriptor, asm.Insns, error) { pg := polprog.NewBuilder(m.ipSetIDAlloc, m.bpfmaps.IpsetsMap.MapFD(), m.bpfmaps.StateMap.MapFD(), progsMap.MapFD(), opts...) @@ -2678,7 +2808,7 @@ func (m *bpfEndpointManager) loadPolicyProgram(progName string, } insns, err := pg.Instructions(rules) if err != nil { - return 0, nil, fmt.Errorf("failed to generate policy bytecode v%v: %w", ipFamily, err) + return nil, nil, fmt.Errorf("failed to generate policy bytecode v%v: %w", ipFamily, err) } progType := unix.BPF_PROG_TYPE_SCHED_CLS if rules.ForXDP { @@ -2686,39 +2816,24 @@ func (m *bpfEndpointManager) loadPolicyProgram(progName string, } progFD, err := bpf.LoadBPFProgramFromInsns(insns, progName, "Apache-2.0", uint32(progType)) if err != nil { - return 0, nil, fmt.Errorf("failed to load BPF policy program v%v: %w", ipFamily, err) + return nil, nil, fmt.Errorf("failed to load BPF policy program v%v: %w", ipFamily, err) } return progFD, insns, nil } -func (m *bpfEndpointManager) doUpdatePolicyProgram(ap attachPoint, progName string, rules polprog.Rules, - ipFamily proto.IPVersion) (asm.Insns, error) { +func (m *bpfEndpointManager) doUpdatePolicyProgram(hk hook.Hook, progName string, jmp int, rules polprog.Rules, + ipFamily proto.IPVersion, opts ...polprog.Option) (asm.Insns, error) { - opts := []polprog.Option{} if m.bpfPolicyDebugEnabled { opts = append(opts, polprog.WithPolicyDebugEnabled()) } progsMap := m.bpfmaps.ProgramsMap - if ap.HookName() == hook.XDP { + if hk == hook.XDP { progsMap = m.bpfmaps.XDPProgramsMap } - if apj, ok := ap.(attachPointWithPolicyJumps); ok { - allow := apj.PolicyAllowJumpIdx(int(ipFamily)) - if allow == -1 { - return nil, fmt.Errorf("no allow jump index") - } - - deny := apj.PolicyDenyJumpIdx(int(ipFamily)) - if deny == -1 { - return nil, fmt.Errorf("no deny jump index") - } - - opts = append(opts, polprog.WithAllowDenyJumps(allow, deny)) - } - progFD, insns, err := m.loadPolicyProgramFn(progName, ipFamily, rules, progsMap, opts...) if err != nil { return nil, err @@ -2732,25 +2847,16 @@ func (m *bpfEndpointManager) doUpdatePolicyProgram(ap attachPoint, progName stri } }() - if err := m.jumpMapUpdate(ap, int(ipFamily), progFD); err != nil { - return nil, err - } - - return insns, nil -} - -func (m *bpfEndpointManager) jumpMapUpdate(ap attachPoint, family int, fd bpf.ProgFD) error { jumpMap := m.bpfmaps.JumpMap - if ap.HookName() == hook.XDP { + if hk == hook.XDP { jumpMap = m.bpfmaps.XDPJumpMap } - jumpIdx := ap.PolicyIdx(int(family)) - if err := jumpMap.Update(jump.Key(jumpIdx), jump.Value(fd)); err != nil { - return fmt.Errorf("failed to update %s policy jump map [%d]=%d: %w", ap.HookName(), jumpIdx, fd, err) + if err := jumpMap.Update(jump.Key(jmp), jump.Value(progFD.FD())); err != nil { + return nil, fmt.Errorf("failed to update %s policy jump map [%d]=%d: %w", hk, jmp, progFD, err) } - return nil + return insns, nil } func (m *bpfEndpointManager) jumpMapDelete(h hook.Hook, idx int) error { diff --git a/felix/dataplane/linux/bpf_ep_mgr_test.go b/felix/dataplane/linux/bpf_ep_mgr_test.go index 762260343bc..41399eb7750 100644 --- a/felix/dataplane/linux/bpf_ep_mgr_test.go +++ b/felix/dataplane/linux/bpf_ep_mgr_test.go @@ -18,10 +18,10 @@ package intdataplane import ( "encoding/binary" + "errors" "fmt" "hash/fnv" "net" - "os" "regexp" "strconv" "sync" @@ -63,7 +63,9 @@ type mockDataplane struct { policy map[string]polprog.Rules routes map[ip.V4CIDR]struct{} - ensureStartedFn func() + ensureStartedFn func() + ensureQdiscFn func(string) (bool, error) + interfaceByIndexFn func(ifindex int) (*net.Interface, error) } func newMockDataplane() *mockDataplane { @@ -81,6 +83,14 @@ func (m *mockDataplane) ensureStarted() { } } +func (m *mockDataplane) interfaceByIndex(ifindex int) (*net.Interface, error) { + if m.interfaceByIndexFn != nil { + return m.interfaceByIndexFn(ifindex) + } + + return nil, errors.New("no such network interface") +} + func (m *mockDataplane) ensureBPFDevices() error { return nil } @@ -89,10 +99,12 @@ func (m *mockDataplane) loadDefaultPolicies() error { return nil } -func (m *mockDataplane) ensureProgramAttached(ap attachPoint) error { +func (m *mockDataplane) ensureProgramAttached(ap attachPoint) (bpf.AttachResult, error) { m.mutex.Lock() defer m.mutex.Unlock() + var res tc.AttachResult // we don't care about the values + if apxdp, ok := ap.(*xdp.AttachPoint); ok { apxdp.HookLayout = hook.Layout{ hook.SubProgXDPAllowed: 123, @@ -102,11 +114,11 @@ func (m *mockDataplane) ensureProgramAttached(ap attachPoint) error { key := ap.IfaceName() + ":" + ap.HookName().String() if _, exists := m.progs[key]; exists { - return nil + return res, nil } m.lastProgID += 1 m.progs[key] = m.lastProgID - return nil + return res, nil } func (m *mockDataplane) ensureNoProgram(ap attachPoint) error { @@ -120,8 +132,11 @@ func (m *mockDataplane) ensureNoProgram(ap attachPoint) error { return nil } -func (m *mockDataplane) ensureQdisc(iface string) error { - return nil +func (m *mockDataplane) ensureQdisc(iface string) (bool, error) { + if m.ensureQdiscFn != nil { + return m.ensureQdiscFn(iface) + } + return false, nil } func (m *mockDataplane) updatePolicyProgram(rules polprog.Rules, polDir string, ap attachPoint) error { @@ -193,17 +208,15 @@ func (m *mockDataplane) ruleMatchID(dir, action, owner, name string, idx int) po return h.Sum64() } -func (m *mockDataplane) loadTCLogFilter(ap *tc.AttachPoint) (bpf.ProgFD, int, error) { - file, err := os.CreateTemp("/tmp", "test_file") - if err != nil { - return 0, 0, err - } +func (m *mockDataplane) queryClassifier(ifindex, handle, prio int, ingress bool) (int, error) { + return 0, nil +} - if err := os.Remove(file.Name()); err != nil { - return 0, 0, err - } +var fdCounter = uint32(1234) - return bpf.ProgFD(file.Fd()), ap.LogFilterIdx, nil +func (m *mockDataplane) loadTCLogFilter(ap *tc.AttachPoint) (fileDescriptor, int, error) { + fdCounter++ + return mockFD(fdCounter), ap.LogFilterIdx, nil } type mockProgMapDP struct { @@ -211,18 +224,21 @@ type mockProgMapDP struct { } func (m *mockProgMapDP) loadPolicyProgram(_ string, _ proto.IPVersion, _ polprog.Rules, _ maps.Map, _ ...polprog.Option) ( - bpf.ProgFD, asm.Insns, error) { + fileDescriptor, asm.Insns, error) { + fdCounter++ - file, err := os.CreateTemp("/tmp", "test_file") - if err != nil { - return 0, nil, err - } + return mockFD(fdCounter), []asm.Insn{{Comments: []string{"blah"}}}, nil +} - if err := os.Remove(file.Name()); err != nil { - return 0, nil, err - } +type mockFD uint32 + +func (f mockFD) Close() error { + log.WithField("fd", int(f)).Debug("Closing mockFD") + return nil +} - return bpf.ProgFD(file.Fd()), []asm.Insn{{Comments: []string{"blah"}}}, nil +func (f mockFD) FD() uint32 { + return uint32(f) } var _ = Describe("BPF Endpoint Manager", func() { @@ -1143,5 +1159,95 @@ var _ = Describe("BPF Endpoint Manager", func() { Expect(jumpMap.Contents).To(HaveLen(0)) Expect(xdpJumpMap.Contents).To(HaveLen(0)) }) + + It("should not update the wep log filter if only policy changes", func() { + dp.ensureQdiscFn = func(iface string) (bool, error) { + if iface == "cali12345" { + return true, nil + } + return false, nil + } + genIfaceUpdate("cali12345", ifacemonitor.StateUp, 15)() + genPolicy("default", "mypolicy")() + genWLUpdate("cali12345", "mypolicy")() + + err := bpfEpMgr.CompleteDeferredWork() + Expect(err).NotTo(HaveOccurred()) + + Expect(jumpMap.Contents).To(HaveLen(4)) // 2x policy and 2x filter + Expect(xdpJumpMap.Contents).To(HaveLen(0)) + + jumpCopyContents := make(map[string]string, len(jumpMap.Contents)) + for k, v := range jumpMap.Contents { + jumpCopyContents[k] = v + } + + genPolicy("default", "anotherpolicy")() + genWLUpdate("cali12345", "anotherpolicy")() + + err = bpfEpMgr.CompleteDeferredWork() + Expect(err).NotTo(HaveOccurred()) + + Expect(len(jumpCopyContents)).To(Equal(len(jumpMap.Contents))) + Expect(jumpCopyContents).NotTo(Equal(jumpMap.Contents)) + + changes := 0 + + for k, v := range jumpCopyContents { + if v != jumpMap.Contents[k] { + changes++ + } + } + + Expect(changes).To(Equal(2)) // only policies have changed + + // restart + + jumpCopyContents = make(map[string]string, len(jumpMap.Contents)) + for k, v := range jumpMap.Contents { + jumpCopyContents[k] = v + } + + dp = newMockDataplane() + mockDP = &mockProgMapDP{ + dp, + } + newBpfEpMgr() + + bpfEpMgr.bpfLogLevel = "debug" + bpfEpMgr.logFilters = map[string]string{"all": "tcp"} + + dp.interfaceByIndexFn = func(ifindex int) (*net.Interface, error) { + if ifindex == 15 { + return &net.Interface{ + Index: 15, + Name: "cali12345", + Flags: net.FlagUp, + }, nil + } + return nil, errors.New("no such network interface") + } + genPolicy("default", "anotherpolicy")() + genWLUpdate("cali12345", "anotherpolicy")() + + err = bpfEpMgr.CompleteDeferredWork() + Expect(err).NotTo(HaveOccurred()) + + Expect(len(jumpCopyContents)).To(Equal(len(jumpMap.Contents))) + Expect(jumpCopyContents).NotTo(Equal(jumpMap.Contents)) + + changes = 0 + + for k, v := range jumpCopyContents { + if v != jumpMap.Contents[k] { + changes++ + } + } + + // After a restart, even devices that are in ready state get both + // programs reapplied as the configuration of logfilters coul dhave + // changed. + Expect(changes).To(Equal(4)) + }) }) })