[BPF] BPFExcludeIPsFromNAT allows excluding service IPs from being NATed
The main use case is a node-local DNS cache. In iptables mode with the k8s-provided
kube-proxy, the cache installs iptables rules that intercept the traffic
before the kube-proxy rules would NAT it.

However, in eBPF mode NAT happens well before packets may hit iptables; often
they do not hit iptables at all. Therefore, we need to tell our
kube-proxy not to translate a service IP that we want the DNS cache
to process.

Unfortunately, we cannot annotate the service, as annotations are
not propagated to the kube-proxy code by the k8s front-end.

To set up a node-local DNS cache, set BPFExcludeIPsFromNAT as follows:

kubedns=`kubectl get svc kube-dns -n kube-system -o jsonpath={.spec.clusterIP}`

BPFExcludeIPsFromNAT=$kubedns
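
For example, with calicoctl managing the FelixConfiguration resource, the same value can be applied like this (a minimal sketch, not part of this commit: it assumes the active resource is named "default" and passes the ClusterIP as a /32 CIDR):

calicoctl patch felixconfiguration default --patch \
  "{\"spec\": {\"bpfExcludeIPsFromNAT\": [\"$kubedns/32\"]}}"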
tomastigera committed Dec 15, 2023
1 parent 1c29421 commit b5c7cb6
Showing 35 changed files with 274 additions and 19 deletions.
4 changes: 4 additions & 0 deletions api/pkg/apis/projectcalico/v3/felixconfig.go
@@ -549,6 +549,10 @@ type FelixConfigurationSpec struct {
// BPFDisableGROForIfaces is a regular expression that controls on which interfaces Felix should disable the
// Generic Receive Offload [GRO] option. It should not match the workload interfaces (usually named cali...).
BPFDisableGROForIfaces string `json:"bpfDisableGROForIfaces,omitempty" validate:"omitempty,regexp"`
// BPFExcludeIPsFromNAT is a list of CIDRs that are to be excluded from NAT
// resolution so that the host can handle them. A typical use case is a
// node-local DNS cache.
BPFExcludeIPsFromNAT *[]string `json:"bpfExcludeIPsFromNAT,omitempty" validate:"omitempty,cidrs"`

// RouteSource configures where Felix gets its routing information.
// - WorkloadIPs: use workload endpoints to construct routes.
9 changes: 9 additions & 0 deletions api/pkg/apis/projectcalico/v3/zz_generated.deepcopy.go

Some generated files are not rendered by default.

15 changes: 15 additions & 0 deletions api/pkg/openapi/openapi_generated.go

Some generated files are not rendered by default.

2 changes: 1 addition & 1 deletion calicoctl/calicoctl/commands/crds/crds.go

Large diffs are not rendered by default.

5 changes: 5 additions & 0 deletions felix/bpf-gpl/nat_lookup.h
@@ -114,6 +114,11 @@ static CALI_BPF_INLINE struct calico_nat_dest* calico_nat_lookup(ipv46_addr_t *i
}
__u32 count = nat_lv1_val->count;

if (nat_lv1_val->flags & NAT_FLG_NAT_EXCLUDE) {
*res = NAT_EXCLUDE;
return NULL;
}

if (from_tun) {
count = nat_lv1_val->local;
} else if (nat_lv1_val->flags & (NAT_FLG_INTERNAL_LOCAL | NAT_FLG_EXTERNAL_LOCAL)) {
2 changes: 2 additions & 0 deletions felix/bpf-gpl/nat_types.h
@@ -11,6 +11,7 @@ typedef enum calico_nat_lookup_result {
NAT_LOOKUP_ALLOW,
NAT_FE_LOOKUP_DROP,
NAT_NO_BACKEND,
NAT_EXCLUDE,
} nat_lookup_result;


@@ -58,6 +59,7 @@ struct calico_nat_value {

#define NAT_FLG_EXTERNAL_LOCAL 0x1
#define NAT_FLG_INTERNAL_LOCAL 0x2
#define NAT_FLG_NAT_EXCLUDE 0x4

#ifdef IPVER6
CALI_MAP_NAMED(cali_v6_nat_fe, cali_nat_fe, 3,
6 changes: 6 additions & 0 deletions felix/bpf-gpl/tc.c
@@ -407,6 +407,12 @@ static CALI_BPF_INLINE void calico_tc_process_ct_lookup(struct cali_tc_ctx *ctx)
} else {
ctx->state->post_nat_ip_dst = ctx->state->ip_dst;
ctx->state->post_nat_dport = ctx->state->dport;
if (nat_res == NAT_EXCLUDE) {
/* We want such packets to go through the host namespace. The main
 * use case for this is node-local DNS.
 */
ctx->state->flags |= CALI_ST_SKIP_FIB;
}
}

syn_force_policy:
2 changes: 2 additions & 0 deletions felix/bpf/nat/maps.go
@@ -202,11 +202,13 @@ func FrontendKeyFromBytes(b []byte) FrontendKeyInterface {
const (
NATFlgExternalLocal = 0x1
NATFlgInternalLocal = 0x2
NATFlgExclude = 0x4
)

var flgTostr = map[int]string{
NATFlgExternalLocal: "external-local",
NATFlgInternalLocal: "internal-local",
NATFlgExclude: "nat-exclude",
}

type FrontendValue [frontendValueSize]byte
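With the flag string registered in flgTostr, an excluded service should show up as nat-exclude when the NAT frontend map is dumped. A minimal sketch, assuming the calico-bpf debug tool shipped in the calico-node image and a hypothetical pod name calico-node-abcde in the calico-system namespace:

kubectl exec -n calico-system calico-node-abcde -- calico-bpf nat dump | grep nat-exclude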
8 changes: 6 additions & 2 deletions felix/bpf/proxy/kube-proxy.go
@@ -25,6 +25,7 @@ import (
"github.com/projectcalico/calico/felix/bpf/bpfmap"
"github.com/projectcalico/calico/felix/bpf/maps"
"github.com/projectcalico/calico/felix/bpf/routes"
"github.com/projectcalico/calico/felix/ip"
)

// KubeProxy is a wrapper of Proxy that deals with higher level issue like
@@ -49,6 +50,8 @@ type KubeProxy struct {
rt *RTCache
opts []Option

excludedIPs *ip.CIDRTrie

dsrEnabled bool
}

@@ -128,7 +131,8 @@ func (kp *KubeProxy) run(hostIPs []net.IP) error {
withLocalNP = append(withLocalNP, podNPIPV6)
}

syncer, err := NewSyncer(kp.ipFamily, withLocalNP, kp.frontendMap, kp.backendMap, kp.affinityMap, kp.rt)
syncer, err := NewSyncer(kp.ipFamily, withLocalNP, kp.frontendMap, kp.backendMap, kp.affinityMap,
kp.rt, kp.excludedIPs)
if err != nil {
return errors.WithMessage(err, "new bpf syncer")
}
@@ -150,7 +154,7 @@ func (kp *KubeProxy) start() error {
withLocalNP = append(withLocalNP, podNPIPV6)
}

syncer, err := NewSyncer(kp.ipFamily, withLocalNP, kp.frontendMap, kp.backendMap, kp.affinityMap, kp.rt)
syncer, err := NewSyncer(kp.ipFamily, withLocalNP, kp.frontendMap, kp.backendMap, kp.affinityMap, kp.rt, kp.excludedIPs)
if err != nil {
return errors.WithMessage(err, "new bpf syncer")
}
6 changes: 3 additions & 3 deletions felix/bpf/proxy/lb_src_range_test.go
@@ -48,7 +48,7 @@ func testfn(makeIPs func(ips []string) proxy.K8sServicePortOption) {
externalIP := makeIPs([]string{"35.0.0.2"})
twoExternalIPs := makeIPs([]string{"35.0.0.2", "45.0.1.2"})

s, _ := proxy.NewSyncer(4, nodeIPs, svcs, eps, aff, rt)
s, _ := proxy.NewSyncer(4, nodeIPs, svcs, eps, aff, rt, nil)

svcKey := k8sp.ServicePortName{
NamespacedName: types.NamespacedName{
@@ -201,7 +201,7 @@ func testfn(makeIPs func(ips []string) proxy.K8sServicePortOption) {
externalIP,
proxy.K8sSvcWithLBSourceRangeIPs([]string{"35.0.1.2/24"}),
)
s, _ = proxy.NewSyncer(4, nodeIPs, svcs, eps, aff, rt)
s, _ = proxy.NewSyncer(4, nodeIPs, svcs, eps, aff, rt, nil)
err := s.Apply(state)
Expect(err).NotTo(HaveOccurred())
Expect(svcs.m).To(HaveLen(3))
@@ -214,7 +214,7 @@
v1.ProtocolTCP,
externalIP,
)
s, _ = proxy.NewSyncer(4, nodeIPs, svcs, eps, aff, rt)
s, _ = proxy.NewSyncer(4, nodeIPs, svcs, eps, aff, rt, nil)
err := s.Apply(state)
Expect(err).NotTo(HaveOccurred())
Expect(svcs.m).To(HaveLen(2))
30 changes: 30 additions & 0 deletions felix/bpf/proxy/options.go
@@ -15,9 +15,12 @@
package proxy

import (
"fmt"
"time"

log "github.com/sirupsen/logrus"

"github.com/projectcalico/calico/felix/ip"
)

// Option defines Proxy options
@@ -82,3 +85,30 @@ func WithIPFamily(ipFamily int) Option {
return nil
}
}

var excludeIPsMatch = 1

func WithExcludedIPs(cidrs []string) Option {
return makeKubeProxyOption(func(kp *KubeProxy) error {
if kp.ipFamily == 0 {
return fmt.Errorf("ip family is not set")
}

kp.excludedIPs = ip.NewCIDRTrie()

for _, c := range cidrs {
cidr, err := ip.CIDRFromString(c)
if err != nil {
return fmt.Errorf("bad CIDR %s: %w", c, err)
}

if int(cidr.Version()) != kp.ipFamily {
continue
}

kp.excludedIPs.Update(cidr, &excludeIPsMatch)
}

return nil
})
}
1 change: 1 addition & 0 deletions felix/bpf/proxy/proxy_bench_test.go
@@ -47,6 +47,7 @@ func benchmarkProxyUpdates(b *testing.B, svcN, epsN int) {
&mock.DummyMap{},
&mock.DummyMap{},
proxy.NewRTCache(),
nil,
)
Expect(err).ShouldNot(HaveOccurred())

11 changes: 11 additions & 0 deletions felix/bpf/proxy/syncer.go
@@ -154,6 +154,8 @@ type Syncer struct {
newBackendValue func(addr net.IP, port uint16) nat.BackendValueInterface
affinityKeyFromBytes func([]byte) nat.AffinityKeyInterface
affinityValueFromBytes func([]byte) nat.AffinityValueInterface

excludedIPs *ip.CIDRTrie
}

type ipPort struct {
@@ -207,6 +209,7 @@ func uniqueIPs(ips []net.IP) []net.IP {
func NewSyncer(family int, nodePortIPs []net.IP,
frontendMap maps.MapWithExistsCheck, backendMap maps.MapWithExistsCheck,
affmap maps.Map, rt Routes,
excludedIPs *ip.CIDRTrie,
) (*Syncer, error) {

s := &Syncer{
@@ -217,6 +220,7 @@ func NewSyncer(family int, nodePortIPs []net.IP,
prevSvcMap: make(map[svcKey]svcInfo),
prevEpsMap: make(k8sp.EndpointsMap),
stop: make(chan struct{}),
excludedIPs: excludedIPs,
}

switch family {
@@ -906,6 +910,13 @@ func (s *Syncer) writeSvc(svc k8sp.ServicePort, svcID uint32, count, local int,
return err
}

if s.excludedIPs != nil {
_, v := s.excludedIPs.LPM(ip.CIDRFromNetIP(svc.ClusterIP()))
if v != nil {
flags |= nat.NATFlgExclude
}
}

affinityTimeo := uint32(0)
if svc.SessionAffinityType() == v1.ServiceAffinityClientIP {
affinityTimeo = uint32(svc.StickyMaxAgeSeconds())
2 changes: 2 additions & 0 deletions felix/bpf/proxy/syncer_bench_test.go
@@ -174,6 +174,7 @@ func runBenchmarkServiceUpdate(b *testing.B, svcCnt, epCnt int, mockMaps bool, o
&mock.DummyMap{},
&mock.DummyMap{},
NewRTCache(),
nil,
)
Expect(err).ShouldNot(HaveOccurred())
} else {
@@ -190,6 +191,7 @@
&mock.DummyMap{},
&mock.DummyMap{},
NewRTCache(),
nil,
)
Expect(err).ShouldNot(HaveOccurred())
}
12 changes: 6 additions & 6 deletions felix/bpf/proxy/syncer_test.go
@@ -56,7 +56,7 @@ var _ = Describe("BPF Syncer", func() {
nodeIPs := []net.IP{net.IPv4(192, 168, 0, 1), net.IPv4(10, 123, 0, 1)}
rt := proxy.NewRTCache()

s, _ := proxy.NewSyncer(4, nodeIPs, svcs, eps, aff, rt)
s, _ := proxy.NewSyncer(4, nodeIPs, svcs, eps, aff, rt, nil)

svcKey := k8sp.ServicePortName{
NamespacedName: types.NamespacedName{
@@ -395,15 +395,15 @@
}))

By("resyncing after creating a new syncer with the same result", makestep(func() {
s, _ = proxy.NewSyncer(4, nodeIPs, svcs, eps, aff, rt)
s, _ = proxy.NewSyncer(4, nodeIPs, svcs, eps, aff, rt, nil)
checkAfterResync()
}))

By("resyncing after creating a new syncer and delete stale entries", makestep(func() {
svcs.m[nat.NewNATKey(net.IPv4(5, 5, 5, 5), 1111, 6)] = nat.NewNATValue(0xdeadbeef, 2, 2, 0)
eps.m[nat.NewNATBackendKey(0xdeadbeef, 0)] = nat.NewNATBackendValue(net.IPv4(6, 6, 6, 6), 666)
eps.m[nat.NewNATBackendKey(0xdeadbeef, 1)] = nat.NewNATBackendValue(net.IPv4(7, 7, 7, 7), 777)
s, _ = proxy.NewSyncer(4, nodeIPs, svcs, eps, aff, rt)
s, _ = proxy.NewSyncer(4, nodeIPs, svcs, eps, aff, rt, nil)
checkAfterResync()
}))

@@ -551,7 +551,7 @@ var _ = Describe("BPF Syncer", func() {

By("inserting non-local eps for a NodePort - no route", makestep(func() {
// use the meta node IP for nodeports as well
s, _ = proxy.NewSyncer(4, append(nodeIPs, net.IPv4(255, 255, 255, 255)), svcs, eps, aff, rt)
s, _ = proxy.NewSyncer(4, append(nodeIPs, net.IPv4(255, 255, 255, 255)), svcs, eps, aff, rt, nil)
state.SvcMap[svcKey2] = proxy.NewK8sServicePort(
net.IPv4(10, 0, 0, 2),
2222,
@@ -704,7 +704,7 @@ var _ = Describe("BPF Syncer", func() {

By("inserting only non-local eps for a NodePort - multiple nodes & pods/node", makestep(func() {
// use the meta node IP for nodeports as well
s, _ = proxy.NewSyncer(4, append(nodeIPs, net.IPv4(255, 255, 255, 255)), svcs, eps, aff, rt)
s, _ = proxy.NewSyncer(4, append(nodeIPs, net.IPv4(255, 255, 255, 255)), svcs, eps, aff, rt, nil)
state.SvcMap[svcKey2] = proxy.NewK8sServicePort(
net.IPv4(10, 0, 0, 2),
2222,
@@ -784,7 +784,7 @@ var _ = Describe("BPF Syncer", func() {

By("restarting Syncer to check if NodePortRemotes are picked up correctly", makestep(func() {
// use the meta node IP for nodeports as well
s, _ = proxy.NewSyncer(4, append(nodeIPs, net.IPv4(255, 255, 255, 255)), svcs, eps, aff, rt)
s, _ = proxy.NewSyncer(4, append(nodeIPs, net.IPv4(255, 255, 255, 255)), svcs, eps, aff, rt, nil)
err := s.Apply(state)
Expect(err).NotTo(HaveOccurred())

1 change: 1 addition & 0 deletions felix/config/config_params.go
@@ -203,6 +203,7 @@ type Config struct {
BPFPolicyDebugEnabled bool `config:"bool;true"`
BPFForceTrackPacketsFromIfaces []string `config:"iface-filter-slice;docker+"`
BPFDisableGROForIfaces *regexp.Regexp `config:"regexp;"`
BPFExcludeIPsFromNAT []string `config:"cidr-list;;"`

// DebugBPFCgroupV2 controls the cgroup v2 path that we apply the connect-time load balancer to. Most distros
// are configured for cgroup v1, which prevents all but the root cgroup v2 from working so this is only useful
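Like other Felix parameters, the new cidr-list option can also be supplied as an environment variable (FELIX_ prefix) on the calico-node container. A minimal sketch, assuming a manifest-based install with the calico-node DaemonSet in kube-system:

kubectl set env daemonset/calico-node -n kube-system FELIX_BPFEXCLUDEIPSFROMNAT=$kubedns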
1 change: 1 addition & 0 deletions felix/dataplane/driver.go
@@ -378,6 +378,7 @@ func StartDataplaneDriver(configParams *config.Config,
BPFConntrackTimeouts: conntrack.DefaultTimeouts(), // FIXME make timeouts configurable
RouteTableManager: routeTableIndexAllocator,
MTUIfacePattern: configParams.MTUIfacePattern,
BPFExcludeIPsFromNAT: configParams.BPFExcludeIPsFromNAT,

KubeClientSet: k8sClientSet,

27 changes: 24 additions & 3 deletions felix/dataplane/linux/bpf_ep_mgr.go
@@ -330,9 +330,10 @@ type bpfEndpointManager struct {

bpfPolicyDebugEnabled bool

routeTable routetable.RouteTableInterface
services map[serviceKey][]ip.CIDR
dirtyServices set.Set[serviceKey]
routeTable routetable.RouteTableInterface
services map[serviceKey][]ip.CIDR
dirtyServices set.Set[serviceKey]
natExcludedIPs *ip.CIDRTrie

// Maps for policy rule counters
polNameToMatchIDs map[string]set.Set[polprog.RuleMatchID]
@@ -505,6 +506,22 @@ func newBPFEndpointManager(
)
m.services = make(map[serviceKey][]ip.CIDR)
m.dirtyServices = set.New[serviceKey]()
m.natExcludedIPs = ip.NewCIDRTrie()

var excludeIPsMatch = 1

for _, c := range config.BPFExcludeIPsFromNAT {
cidr, err := ip.CIDRFromString(c)
if err != nil {
log.WithError(err).Warnf("Bad CIDR %s to exclude from NAT", c)
// Skip the bad entry; calling cidr.Version() on a failed parse would panic.
continue
}

if int(cidr.Version()) == 6 && !m.ipv6Enabled {
continue
}

m.natExcludedIPs.Update(cidr, &excludeIPsMatch)
}

// Anything else would prevent packets being accepted from the special
// service veth. It does not create a security hole since BPF does the RPF
@@ -3242,6 +3259,10 @@ func (m *bpfEndpointManager) onServiceUpdate(update *proto.ServiceUpdate) {
if err != nil {
log.WithFields(log.Fields{"service": key, "ip": i}).Warn("Not a valid CIDR.")
} else {
_, v := m.natExcludedIPs.LPM(cidr)
if v != nil {
continue
}
if m.ipv6Enabled {
if _, ok := cidr.(ip.V6CIDR); ok {
ips = append(ips, cidr)