Skip to content

Commit

Permalink
Persist IPAM state to local file and use across restarts
Browse files Browse the repository at this point in the history
Persist IPAM state to a file in /var/run (by default), and use this to
recover state across a restart.  Note no state needs to be preserved
across a reboot, since all containers are also restarted.

Removes the need[*] for docker/CRI and the Kubernetes API from ipamd.

[*] But not "use" of docker/CRI :(
- CRI is still necessary in this release to handle upgrades from earlier
  versions without requiring a reboot of the node.  We can
  drop CRI for real in the release after the one that contains this PR.
- The CNI K8S_POD_* arguments are still passed in the gRPC request(s)
  to ipamd.  It is expected that pod name/namespace _will_ be necessary
  at some point (to fetch pod annotations).
  • Loading branch information
anguslees authored and mogren committed Jun 5, 2020
1 parent a3d6dc3 commit e38a4d8
Show file tree
Hide file tree
Showing 22 changed files with 1,039 additions and 893 deletions.
5 changes: 1 addition & 4 deletions cmd/aws-k8s-agent/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,15 +45,12 @@ func _main() int {
return 1
}

discoverController := k8sapi.NewController(kubeClient)
go discoverController.DiscoverLocalK8SPods()

eniConfigController := eniconfig.NewENIConfigController()
if ipamd.UseCustomNetworkCfg() {
go eniConfigController.Start()
}

ipamContext, err := ipamd.New(discoverController, eniConfigController)
ipamContext, err := ipamd.New(kubeClient, eniConfigController)

if err != nil {
log.Errorf("Initialization failure: %v", err)
Expand Down
126 changes: 58 additions & 68 deletions cmd/routed-eni-cni-plugin/cni.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,14 +47,7 @@ var version string

// NetConf stores the common network config for the CNI plugin
type NetConf struct {
// CNIVersion is the plugin version
CNIVersion string `json:"cniVersion,omitempty"`

// Name is the plugin name
Name string `json:"name"`

// Type is the plugin type
Type string `json:"type"`
types.NetConf

// VethPrefix is the prefix to use when constructing the host-side
// veth device name. It should be no more than four characters, and
Expand Down Expand Up @@ -91,8 +84,8 @@ func init() {

// LoadNetConf converts inputs (i.e. stdin) to NetConf
func LoadNetConf(bytes []byte) (*NetConf, logger.Logger, error) {
conf := &NetConf{}
if err := json.Unmarshal(bytes, conf); err != nil {
var conf NetConf
if err := json.Unmarshal(bytes, &conf); err != nil {
return nil, nil, errors.Wrap(err, "add cmd: error loading config from args")
}

Expand All @@ -118,7 +111,7 @@ func LoadNetConf(bytes []byte) (*NetConf, logger.Logger, error) {
return nil, nil, errors.New("conf.VethPrefix can be at most 4 characters long")
}

return conf, log, nil
return &conf, log, nil
}

func cmdAdd(args *skel.CmdArgs) error {
Expand All @@ -136,7 +129,7 @@ func add(args *skel.CmdArgs, cniTypes typeswrapper.CNITYPES, grpcClient grpcwrap
log.Infof("Received CNI add request: ContainerID(%s) Netns(%s) IfName(%s) Args(%s) Path(%s) argsStdinData(%s)",
args.ContainerID, args.Netns, args.IfName, args.Args, args.Path, args.StdinData)

k8sArgs := K8sArgs{}
var k8sArgs K8sArgs
if err := cniTypes.LoadArgs(args.Args, &k8sArgs); err != nil {
log.Errorf("Failed to load k8s config from arg: %v", err)
return errors.Wrap(err, "add cmd: failed to load k8s config from arg")
Expand All @@ -148,11 +141,8 @@ func add(args *skel.CmdArgs, cniTypes typeswrapper.CNITYPES, grpcClient grpcwrap
// Set up a connection to the ipamD server.
conn, err := grpcClient.Dial(ipamdAddress, grpc.WithInsecure())
if err != nil {
log.Errorf("Failed to connect to backend server for pod %s namespace %s sandbox %s: %v",
string(k8sArgs.K8S_POD_NAME),
string(k8sArgs.K8S_POD_NAMESPACE),
string(k8sArgs.K8S_POD_INFRA_CONTAINER_ID),
err)
log.Errorf("Failed to connect to backend server for container %s: %v",
args.ContainerID, err)
return errors.Wrap(err, "add cmd: failed to connect to backend server")
}
defer conn.Close()
Expand All @@ -161,31 +151,30 @@ func add(args *skel.CmdArgs, cniTypes typeswrapper.CNITYPES, grpcClient grpcwrap

r, err := c.AddNetwork(context.Background(),
&pb.AddNetworkRequest{
Netns: args.Netns,
K8S_POD_NAME: string(k8sArgs.K8S_POD_NAME),
K8S_POD_NAMESPACE: string(k8sArgs.K8S_POD_NAMESPACE),
K8S_POD_INFRA_CONTAINER_ID: string(k8sArgs.K8S_POD_INFRA_CONTAINER_ID),
IfName: args.IfName})
Netns: args.Netns,
ContainerID: args.ContainerID,
NetworkName: conf.Name,
IfName: args.IfName,
})

if err != nil {
log.Errorf("Error received from AddNetwork grpc call for pod %s namespace %s sandbox %s: %v",
string(k8sArgs.K8S_POD_NAME),
string(k8sArgs.K8S_POD_NAMESPACE),
string(k8sArgs.K8S_POD_INFRA_CONTAINER_ID),
log.Errorf("Error received from AddNetwork grpc call for containerID %s: %v",
args.ContainerID,
err)
return errors.Wrap(err, "add cmd: Error received from AddNetwork gRPC call")
}

if !r.Success {
log.Errorf("Failed to assign an IP address to pod %s, namespace %s sandbox %s",
string(k8sArgs.K8S_POD_NAME),
string(k8sArgs.K8S_POD_NAMESPACE),
string(k8sArgs.K8S_POD_INFRA_CONTAINER_ID))
log.Errorf("Failed to assign an IP address to container %s",
args.ContainerID)
return errors.New("add cmd: failed to assign an IP address to container")
}

log.Infof("Received add network response for pod %s namespace %s sandbox %s: %s, table %d, external-SNAT: %v, vpcCIDR: %v",
string(k8sArgs.K8S_POD_NAME), string(k8sArgs.K8S_POD_NAMESPACE), string(k8sArgs.K8S_POD_INFRA_CONTAINER_ID),
log.Infof("Received add network response for container %s interface %s: %s, table %d, external-SNAT: %v, vpcCIDR: %v",
args.ContainerID, args.IfName,
r.IPv4Addr, r.DeviceNumber, r.UseExternalSNAT, r.VPCcidrs)

addr := &net.IPNet{
Expand All @@ -195,30 +184,33 @@ func add(args *skel.CmdArgs, cniTypes typeswrapper.CNITYPES, grpcClient grpcwrap

// build hostVethName
// Note: the maximum length for linux interface name is 15
hostVethName := generateHostVethName(conf.VethPrefix, string(k8sArgs.K8S_POD_NAMESPACE), string(k8sArgs.K8S_POD_NAME))
hostVethName := generateHostVethName(conf.VethPrefix, conf.Name, args.ContainerID, args.IfName)

err = driverClient.SetupNS(hostVethName, args.IfName, args.Netns, addr, int(r.DeviceNumber), r.VPCcidrs, r.UseExternalSNAT, mtu, log)

if err != nil {
log.Errorf("Failed SetupPodNetwork for pod %s namespace %s sandbox %s: %v",
string(k8sArgs.K8S_POD_NAME), string(k8sArgs.K8S_POD_NAMESPACE), string(k8sArgs.K8S_POD_INFRA_CONTAINER_ID), err)
log.Errorf("Failed SetupPodNetwork for container %s: %v",
args.ContainerID, err)

// return allocated IP back to IP pool
r, delErr := c.DelNetwork(context.Background(),
&pb.DelNetworkRequest{
K8S_POD_NAME: string(k8sArgs.K8S_POD_NAME),
K8S_POD_NAMESPACE: string(k8sArgs.K8S_POD_NAMESPACE),
K8S_POD_INFRA_CONTAINER_ID: string(k8sArgs.K8S_POD_INFRA_CONTAINER_ID),
Reason: "SetupNSFailed"})
r, delErr := c.DelNetwork(context.Background(), &pb.DelNetworkRequest{
K8S_POD_NAME: string(k8sArgs.K8S_POD_NAME),
K8S_POD_NAMESPACE: string(k8sArgs.K8S_POD_NAMESPACE),
K8S_POD_INFRA_CONTAINER_ID: string(k8sArgs.K8S_POD_INFRA_CONTAINER_ID),
ContainerID: args.ContainerID,
IfName: args.IfName,
NetworkName: conf.Name,
Reason: "SetupNSFailed",
})

if delErr != nil {
log.Errorf("Error received from DelNetwork grpc call for pod %s namespace %s sandbox %s: %v",
string(k8sArgs.K8S_POD_NAME), string(k8sArgs.K8S_POD_NAMESPACE), string(k8sArgs.K8S_POD_INFRA_CONTAINER_ID), delErr)
log.Errorf("Error received from DelNetwork grpc call for container %s: %v",
args.ContainerID, delErr)
}

if !r.Success {
log.Errorf("Failed to release IP of pod %s namespace %s sandbox %s: %v",
string(k8sArgs.K8S_POD_NAME), string(k8sArgs.K8S_POD_NAMESPACE), string(k8sArgs.K8S_POD_INFRA_CONTAINER_ID), delErr)
log.Errorf("Failed to release IP of container %s: %v",
args.ContainerID, delErr)
}
return errors.Wrap(err, "add command: failed to setup network")
}
Expand All @@ -238,9 +230,9 @@ func add(args *skel.CmdArgs, cniTypes typeswrapper.CNITYPES, grpcClient grpcwrap
}

// generateHostVethName returns a name to be used on the host-side veth device.
func generateHostVethName(prefix, namespace, podname string) string {
func generateHostVethName(prefix, netname, containerid, ifname string) string {
h := sha1.New()
h.Write([]byte(fmt.Sprintf("%s.%s", namespace, podname)))
fmt.Fprintf(h, "%s.%s.%s", netname, containerid, ifname)
return fmt.Sprintf("%s%s", prefix, hex.EncodeToString(h.Sum(nil))[:11])
}

Expand All @@ -251,15 +243,15 @@ func cmdDel(args *skel.CmdArgs) error {
func del(args *skel.CmdArgs, cniTypes typeswrapper.CNITYPES, grpcClient grpcwrapper.GRPC, rpcClient rpcwrapper.RPC,
driverClient driver.NetworkAPIs) error {

_, log, err := LoadNetConf(args.StdinData)
conf, log, err := LoadNetConf(args.StdinData)
if err != nil {
return errors.Wrap(err, "add cmd: error loading config from args")
}

log.Infof("Received CNI del request: ContainerID(%s) Netns(%s) IfName(%s) Args(%s) Path(%s) argsStdinData(%s)",
args.ContainerID, args.Netns, args.IfName, args.Args, args.Path, args.StdinData)

k8sArgs := K8sArgs{}
var k8sArgs K8sArgs
if err := cniTypes.LoadArgs(args.Args, &k8sArgs); err != nil {
log.Errorf("Failed to load k8s config from args: %v", err)
return errors.Wrap(err, "del cmd: failed to load k8s config from args")
Expand All @@ -269,42 +261,41 @@ func del(args *skel.CmdArgs, cniTypes typeswrapper.CNITYPES, grpcClient grpcwrap
// Set up a connection to the server.
conn, err := grpcClient.Dial(ipamdAddress, grpc.WithInsecure())
if err != nil {
log.Errorf("Failed to connect to backend server for pod %s namespace %s sandbox %s: %v",
string(k8sArgs.K8S_POD_NAME),
string(k8sArgs.K8S_POD_NAMESPACE),
string(k8sArgs.K8S_POD_INFRA_CONTAINER_ID),
err)
log.Errorf("Failed to connect to backend server for container %s: %v",
args.ContainerID, err)

return errors.Wrap(err, "del cmd: failed to connect to backend server")
}
defer conn.Close()

c := rpcClient.NewCNIBackendClient(conn)

r, err := c.DelNetwork(context.Background(),
&pb.DelNetworkRequest{
K8S_POD_NAME: string(k8sArgs.K8S_POD_NAME),
K8S_POD_NAMESPACE: string(k8sArgs.K8S_POD_NAMESPACE),
K8S_POD_INFRA_CONTAINER_ID: string(k8sArgs.K8S_POD_INFRA_CONTAINER_ID),
Reason: "PodDeleted"})
r, err := c.DelNetwork(context.Background(), &pb.DelNetworkRequest{
K8S_POD_NAME: string(k8sArgs.K8S_POD_NAME),
K8S_POD_NAMESPACE: string(k8sArgs.K8S_POD_NAMESPACE),
K8S_POD_INFRA_CONTAINER_ID: string(k8sArgs.K8S_POD_INFRA_CONTAINER_ID),
NetworkName: conf.Name,
ContainerID: args.ContainerID,
IfName: args.IfName,
Reason: "PodDeleted",
})

if err != nil {
if strings.Contains(err.Error(), datastore.ErrUnknownPod.Error()) {
// Plugins should generally complete a DEL action without error even if some resources are missing. For example,
// an IPAM plugin should generally release an IP allocation and return success even if the container network
// namespace no longer exists, unless that network namespace is critical for IPAM management
log.Infof("Pod %s in namespace %s not found", string(k8sArgs.K8S_POD_NAME), string(k8sArgs.K8S_POD_NAMESPACE))
log.Infof("Container %s not found", args.ContainerID)
return nil
} else {
log.Errorf("Error received from DelNetwork gRPC call for pod %s namespace %s sandbox %s: %v",
string(k8sArgs.K8S_POD_NAME), string(k8sArgs.K8S_POD_NAMESPACE), string(k8sArgs.K8S_POD_INFRA_CONTAINER_ID), err)
return errors.Wrap(err, "del cmd: error received from DelNetwork gRPC call")
}
log.Errorf("Error received from DelNetwork gRPC call for container %s: %v",
args.ContainerID, err)
return errors.Wrap(err, "del cmd: error received from DelNetwork gRPC call")
}

if !r.Success {
log.Errorf("Failed to process delete request for pod %s namespace %s sandbox %s: Success == false",
string(k8sArgs.K8S_POD_NAME), string(k8sArgs.K8S_POD_NAMESPACE), string(k8sArgs.K8S_POD_INFRA_CONTAINER_ID))
log.Errorf("Failed to process delete request for container %s: Success == false",
args.ContainerID)
return errors.New("del cmd: failed to process delete request")
}

Expand All @@ -316,13 +307,12 @@ func del(args *skel.CmdArgs, cniTypes typeswrapper.CNITYPES, grpcClient grpcwrap
}
err = driverClient.TeardownNS(addr, int(r.DeviceNumber), log)
if err != nil {
log.Errorf("Failed on TeardownPodNetwork for pod %s namespace %s sandbox %s: %v",
string(k8sArgs.K8S_POD_NAME), string(k8sArgs.K8S_POD_NAMESPACE), string(k8sArgs.K8S_POD_INFRA_CONTAINER_ID), err)
log.Errorf("Failed on TeardownPodNetwork for container ID %s: %v",
args.ContainerID, err)
return errors.Wrap(err, "del cmd: failed on tear down pod network")
}
} else {
log.Warnf("Pod %s in namespace %s did not have a valid IP %s", string(k8sArgs.K8S_POD_NAME),
string(k8sArgs.K8S_POD_NAMESPACE), r.IPv4Addr)
log.Warnf("Container %s did not have a valid IP %s", args.ContainerID, r.IPv4Addr)
}
return nil
}
Expand Down
10 changes: 7 additions & 3 deletions cmd/routed-eni-cni-plugin/cni_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"testing"

"github.com/containernetworking/cni/pkg/skel"
"github.com/containernetworking/cni/pkg/types"
"github.com/golang/mock/gomock"
"github.com/stretchr/testify/assert"
"google.golang.org/grpc"
Expand All @@ -45,9 +46,12 @@ const (
devNum = 4
)

var netConf = &NetConf{CNIVersion: cniVersion,
Name: cniName,
Type: cniType,
var netConf = &NetConf{
NetConf: types.NetConf{
CNIVersion: cniVersion,
Name: cniName,
Type: cniType,
},
PluginLogLevel: pluginLogLevel,
PluginLogFile: pluginLogFile,
}
Expand Down
31 changes: 11 additions & 20 deletions config/master/aws-k8s-cni-cn.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -20,24 +20,9 @@
- "apiGroups":
- "crd.k8s.amazonaws.com"
"resources":
- "*"
- "eniconfigs"
"verbs":
- "*"
- "apiGroups":
- ""
"resources":
- "pods"
- "nodes"
- "namespaces"
"verbs":
- "list"
- "watch"
- "get"
- "apiGroups":
- "extensions"
"resources":
- "daemonsets"
"verbs":
- "list"
- "watch"
---
Expand Down Expand Up @@ -151,6 +136,8 @@
"name": "cni-net-dir"
- "mountPath": "/host/var/log/aws-routed-eni"
"name": "log-dir"
- "mountPath": "/var/run/aws-node"
"name": "run-dir"
- "mountPath": "/var/run/docker.sock"
"name": "dockersock"
- "mountPath": "/var/run/dockershim.sock"
Expand All @@ -176,16 +163,20 @@
- "hostPath":
"path": "/etc/cni/net.d"
"name": "cni-net-dir"
- "hostPath":
"path": "/var/log/aws-routed-eni"
"type": "DirectoryOrCreate"
"name": "log-dir"
- "hostPath":
"path": "/var/run/docker.sock"
"name": "dockersock"
- "hostPath":
"path": "/var/run/dockershim.sock"
"name": "dockershim"
- "hostPath":
"path": "/var/log/aws-routed-eni"
"type": "DirectoryOrCreate"
"name": "log-dir"
- "hostPath":
"path": "/var/run/aws-node"
"type": "DirectoryOrCreate"
"name": "run-dir"
"updateStrategy":
"rollingUpdate":
"maxUnavailable": "10%"
Expand Down
Loading

0 comments on commit e38a4d8

Please sign in to comment.