Skip to content

Commit

Permalink
Listen for NetworkPolicies
Browse files Browse the repository at this point in the history
  • Loading branch information
Lykos153 committed Sep 29, 2022
1 parent 7d5310a commit 0bb49c8
Show file tree
Hide file tree
Showing 10 changed files with 314 additions and 24 deletions.
6 changes: 6 additions & 0 deletions cmd/ch-k8s-lbaas-controller/ch-k8s-lbaas-controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/klog"

// Uncomment the following line to load the gcp plugin (only required to authenticate against GKE clusters).
// _ "k8s.io/client-go/plugin/pkg/client/auth/gcp"

Expand Down Expand Up @@ -90,13 +91,17 @@ func main() {
servicesInformer := kubeInformerFactory.Core().V1().Services()
nodesInformer := kubeInformerFactory.Core().V1().Nodes()
endpointsInformer := kubeInformerFactory.Core().V1().Endpoints()
networkPoliciesInformer := kubeInformerFactory.Networking().V1().NetworkPolicies()
podsInformer := kubeInformerFactory.Core().V1().Pods() // TODO: I don't want to be informed about pods. Just need to list them

modelGenerator, err := controller.NewLoadBalancerModelGenerator(
fileCfg.BackendLayer,
l3portmanager,
servicesInformer.Lister(),
nodesInformer.Lister(),
endpointsInformer.Lister(),
networkPoliciesInformer.Lister(),
podsInformer.Lister(),
)

if fileCfg.BackendLayer != config.BackendLayerNodePort {
Expand All @@ -116,6 +121,7 @@ func main() {
servicesInformer,
nodesInformer,
endpointsInformer,
networkPoliciesInformer,
l3portmanager,
agentController,
modelGenerator,
Expand Down
2 changes: 1 addition & 1 deletion internal/agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -274,7 +274,7 @@ func (h *ApplyHandlerv1) preflightCheck(w http.ResponseWriter, r *http.Request)
}

func (h *ApplyHandlerv1) ProcessRequest(lbcfg *model.LoadBalancer) (int, string) {
klog.V(1).Infof("received config: %#v", lbcfg)
klog.Infof("received config: %#v", lbcfg)

changed, err := h.KeepalivedConfig.WriteWithRollback(lbcfg)
if err != nil {
Expand Down
131 changes: 124 additions & 7 deletions internal/agent/nftables_generator.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,19 +31,51 @@ var (
{{ $cfg := . }}
table {{ .FilterTableType }} {{ .FilterTableName }} {
chain {{ .FilterForwardChainName }} {
{{- range $dest := $cfg.PolicyAssignments }}
{{- range $pol := $dest.NetworkPolicies }}
{{- if eq ((index $cfg.NetworkPolicies $pol).Ports | len) 0 }}
ct mark {{ $cfg.FWMarkBits | printf "0x%x" }} and {{ $cfg.FWMarkMask | printf "0x%x" }} ip daddr {{ $dest.Address }} jump {{ $pol }};
{{- else }}
{{- range $port := (index $cfg.NetworkPolicies $pol).Ports }}
ct mark {{ $cfg.FWMarkBits | printf "0x%x" }} and {{ $cfg.FWMarkMask | printf "0x%x" }} ip daddr {{ $dest.Address }} {{ $port.Protocol }} {{- if $port.Port }} dport {{ $port.Port -}} {{- if $port.EndPort -}} - {{- $port.EndPort -}} {{- end -}} {{- end }} jump {{ $pol }};
{{- end }}
{{- end }}
{{- end }}
{{- end }}
ct mark {{ $cfg.NPMarkBits | printf "0x%x" }} or {{ $cfg.FWMarkBits | printf "0x%x" }} drop;
ct mark {{ $cfg.FWMarkBits | printf "0x%x" }} and {{ $cfg.FWMarkMask | printf "0x%x" }} accept;
}
{{- range $policy := $cfg.NetworkPolicies }}
chain {{ $policy.Name }} {
mark set {{ $cfg.NPMarkBits | printf "0x%x" }} or {{ $cfg.FWMarkBits | printf "0x%x" }} ct mark set meta mark
{{- range $index, $ipblock := $policy.AllowedIPBlocks }}
ip saddr {{ $ipblock.Cidr }} {{ if eq ($ipblock.Except | len) 0 -}} accept {{- else -}} jump {{ $policy.Name }}-cidr{{ $index -}} {{- end }};
{{- end }}
return;
}
{{- range $index, $ipblock := $policy.AllowedIPBlocks }}
{{- if ne ($ipblock.Except | len) 0 }}
chain {{ $policy.Name }}-cidr{{ $index }} {
{{- range $except := $ipblock.Except }}
ip saddr {{ $except }} return;
{{- end }}
accept;
}
{{- end }}
{{- end }}
{{- end }}
}
table ip {{ .NATTableName }} {
chain {{ .NATPreroutingChainName }} {
{{ range $fwd := .Forwards }}
{{ if ne ($fwd.DestinationAddresses | len) 0 }}
ip daddr {{ $fwd.InboundIP }} {{ $fwd.Protocol }} dport {{ $fwd.InboundPort }} mark set {{ $cfg.FWMarkBits | printf "0x%x" }} and {{ $cfg.FWMarkMask | printf "0x%x" }} ct mark set meta mark dnat to numgen inc mod {{ $fwd.DestinationAddresses | len }} map {
{{- range $fwd := .Forwards }}
{{- if ne ($fwd.DestinationAddresses | len) 0 }}
ip daddr {{ $fwd.InboundIP }} {{ $fwd.Protocol }} dport {{ $fwd.InboundPort }} mark set {{ $cfg.FWMarkBits | printf "0x%x" }} and {{ $cfg.FWMarkMask | printf "0x%x" }} ct mark set meta mark dnat to numgen inc mod {{ $fwd.DestinationAddresses | len }} map {
{{- range $index, $daddr := $fwd.DestinationAddresses }}{{ $index }} : {{ $daddr }}, {{ end -}}
} : {{ $fwd.DestinationPort }};
{{ end }}
{{ end }}
{{- end }}
{{- end }}
}
chain {{ .NATPostroutingChainName }} {
Expand All @@ -55,6 +87,28 @@ table ip {{ .NATTableName }} {
ErrProtocolNotSupported = fmt.Errorf("Protocol is not supported")
)

type allowedIPBlock struct {
Cidr string
Except []string
}

type policyPort struct {
Protocol string
Port *int32
EndPort *int32
}

type networkPolicy struct {
Name string
AllowedIPBlocks []allowedIPBlock
Ports []policyPort
}

type policyAssignment struct {
Address string
NetworkPolicies []string
}

type nftablesForward struct {
Protocol string
InboundIP string
Expand All @@ -72,7 +126,10 @@ type nftablesConfig struct {
NATPreroutingChainName string
FWMarkBits uint32
FWMarkMask uint32
NPMarkBits uint32
Forwards []nftablesForward
NetworkPolicies map[string]networkPolicy
PolicyAssignments []policyAssignment
}

type NftablesGenerator struct {
Expand All @@ -85,7 +142,54 @@ func copyAddresses(in []string) []string {
return result
}

func (g *NftablesGenerator) mapProtocol(k8sproto corev1.Protocol) (string, error) {
func copyIPBlocks(in []model.AllowedIPBlock) []allowedIPBlock {
result := make([]allowedIPBlock, len(in))
for i, block := range in {
result[i].Cidr = block.Cidr
result[i].Except = copyAddresses(block.Except)
}
return result
}

func copyPolicyPorts(in []model.PolicyPort) ([]policyPort, error) {
result := make([]policyPort, len(in))
var err error
for i, port := range in {
result[i].Protocol, err = mapProtocol(port.Protocol)
if err != nil {
return nil, err
}
result[i].Port = port.Port
result[i].EndPort = port.EndPort
}
return result, nil
}

func copyNetworkPolicies(in []model.NetworkPolicy) ([]networkPolicy, error) {
result := make([]networkPolicy, len(in))
var err error
for i, policy := range in {
result[i].Name = policy.Name
result[i].AllowedIPBlocks = copyIPBlocks(policy.AllowedIPBlocks)
result[i].Ports, err = copyPolicyPorts(policy.Ports)
if err != nil {
return nil, err
}
}
return result, nil
}

func copyPolicyAssignment(in []model.PolicyAssignment) []policyAssignment {
result := make([]policyAssignment, len(in))
for i, assignment := range in {
result[i].Address = assignment.Address
result[i].NetworkPolicies = copyAddresses(assignment.NetworkPolicies)
}
return result
}

// Maps from k8s.io/api/core/v1.Protocol objects to strings understood by nftables
func mapProtocol(k8sproto corev1.Protocol) (string, error) {
switch k8sproto {
case corev1.ProtocolTCP:
return "tcp", nil
Expand All @@ -96,6 +200,7 @@ func (g *NftablesGenerator) mapProtocol(k8sproto corev1.Protocol) (string, error
}
}

// Generates a config suitable for nftablesTemplate from a LoadBalancer model
func (g *NftablesGenerator) GenerateStructuredConfig(m *model.LoadBalancer) (*nftablesConfig, error) {
result := &nftablesConfig{
FilterTableName: g.Cfg.FilterTableName,
Expand All @@ -106,12 +211,15 @@ func (g *NftablesGenerator) GenerateStructuredConfig(m *model.LoadBalancer) (*nf
NATPreroutingChainName: g.Cfg.NATPreroutingChainName,
FWMarkBits: g.Cfg.FWMarkBits,
FWMarkMask: g.Cfg.FWMarkMask,
NPMarkBits: g.Cfg.NPMarkBits,
Forwards: []nftablesForward{},
NetworkPolicies: map[string]networkPolicy{},
PolicyAssignments: []policyAssignment{},
}

for _, ingress := range m.Ingress {
for _, port := range ingress.Ports {
mappedProtocol, err := g.mapProtocol(port.Protocol)
mappedProtocol, err := mapProtocol(port.Protocol)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -143,6 +251,15 @@ func (g *NftablesGenerator) GenerateStructuredConfig(m *model.LoadBalancer) (*nf
return fwdA.InboundPort < fwdB.InboundPort
})

result.PolicyAssignments = copyPolicyAssignment(m.PolicyAssignments)
policies, err := copyNetworkPolicies(m.NetworkPolicies)
if err != nil {
return nil, err
}
for _, policy := range policies {
result.NetworkPolicies[policy.Name] = policy
}

return result, nil
}

Expand Down
5 changes: 5 additions & 0 deletions internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ type Nftables struct {
NATPostroutingChainName string `toml:"nat-postrouting-chain"`
FWMarkBits uint32 `toml:"fwmark-bits"`
FWMarkMask uint32 `toml:"fwmark-mask"`
NPMarkBits uint32 `toml:"npmark-bits"`

Service ServiceConfig `toml:"service"`
}
Expand Down Expand Up @@ -154,6 +155,10 @@ func FillNftablesConfig(cfg *Nftables) {
cfg.FWMarkMask = 1
}

if cfg.NPMarkBits == 0 {
cfg.NPMarkBits = 2
}

defaultStringList(&cfg.Service.ReloadCommand, []string{"sudo", "systemctl", "reload", "nftables"})
defaultStringList(&cfg.Service.StartCommand, []string{"sudo", "systemctl", "restart", "nftables"})
}
Expand Down
15 changes: 14 additions & 1 deletion internal/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
coreinformers "k8s.io/client-go/informers/core/v1"
networkinginformers "k8s.io/client-go/informers/networking/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/scheme"
typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1"
Expand Down Expand Up @@ -82,6 +83,7 @@ func NewController(
serviceInformer coreinformers.ServiceInformer,
nodeInformer coreinformers.NodeInformer,
endpointsInformer coreinformers.EndpointsInformer,
networkPoliciesInformer networkinginformers.NetworkPolicyInformer,
l3portmanager openstack.L3PortManager,
agentController AgentController,
generator LoadBalancerModelGenerator,
Expand Down Expand Up @@ -167,6 +169,17 @@ func NewController(
})
}

if networkPoliciesInformer != nil {
networkPoliciesInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: controller.handleAuxUpdated,
UpdateFunc: func(old, new interface{}) {
klog.Info("UpdateFunc called")
controller.handleAuxUpdated(new)
},
DeleteFunc: controller.handleAuxUpdated,
})
}

return controller, nil
}

Expand Down Expand Up @@ -245,7 +258,7 @@ func (c *Controller) periodicCleanup() {
func (c *Controller) handleObject(obj interface{}) {
var object metav1.Object
var ok bool
klog.Info("handleObject called")
klog.Infof("handleObject called with %T", obj)
if object, ok = obj.(metav1.Object); !ok {
klog.V(5).Infof("ignoring non-castable object in handleObject; expecting deletion event")
return
Expand Down
1 change: 1 addition & 0 deletions internal/controller/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ func (f *fixture) newController() (*Controller, kubeinformers.SharedInformerFact
k8sI.Core().V1().Services(),
k8sI.Core().V1().Nodes(),
k8sI.Core().V1().Endpoints(),
k8sI.Networking().V1().NetworkPolicies(),
ostesting.NewMockL3PortManager(),
controllertesting.NewMockAgentController(),
controllertesting.NewMockLoadBalancerModelGenerator(),
Expand Down
7 changes: 5 additions & 2 deletions internal/controller/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"fmt"

corelisters "k8s.io/client-go/listers/core/v1"
networkinglisters "k8s.io/client-go/listers/networking/v1"

"github.com/cloudandheat/ch-k8s-lbaas/internal/config"
"github.com/cloudandheat/ch-k8s-lbaas/internal/model"
Expand All @@ -33,7 +34,9 @@ func NewLoadBalancerModelGenerator(
l3portmanager openstack.L3PortManager,
services corelisters.ServiceLister,
nodes corelisters.NodeLister,
endpoints corelisters.EndpointsLister) (LoadBalancerModelGenerator, error) {
endpoints corelisters.EndpointsLister,
networkpolicies networkinglisters.NetworkPolicyLister,
pods corelisters.PodLister) (LoadBalancerModelGenerator, error) {
switch backendLayer {
case config.BackendLayerNodePort:
return NewNodePortLoadBalancerModelGenerator(
Expand All @@ -45,7 +48,7 @@ func NewLoadBalancerModelGenerator(
), nil
case config.BackendLayerPod:
return NewPodLoadBalancerModelGenerator(
l3portmanager, services, endpoints,
l3portmanager, services, endpoints, networkpolicies, pods,
), nil
default:
return nil, fmt.Errorf("invalid backend type: %q", backendLayer)
Expand Down
Loading

0 comments on commit 0bb49c8

Please sign in to comment.