Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Persist IPAM state to local file and use across restarts #972

Merged
merged 1 commit into from
Jun 5, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 1 addition & 4 deletions cmd/aws-k8s-agent/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,15 +45,12 @@ func _main() int {
return 1
}

discoverController := k8sapi.NewController(kubeClient)
go discoverController.DiscoverLocalK8SPods()

Comment on lines -48 to -50
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Getting rid of this, and just reading it from the CRI socket will resolve #711 and replace #738 as well...

eniConfigController := eniconfig.NewENIConfigController()
if ipamd.UseCustomNetworkCfg() {
go eniConfigController.Start()
}

ipamContext, err := ipamd.New(discoverController, eniConfigController)
ipamContext, err := ipamd.New(kubeClient, eniConfigController)

if err != nil {
log.Errorf("Initialization failure: %v", err)
Expand Down
126 changes: 58 additions & 68 deletions cmd/routed-eni-cni-plugin/cni.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,14 +47,7 @@ var version string

// NetConf stores the common network config for the CNI plugin
type NetConf struct {
// CNIVersion is the plugin version
CNIVersion string `json:"cniVersion,omitempty"`

// Name is the plugin name
Name string `json:"name"`

// Type is the plugin type
Type string `json:"type"`
types.NetConf
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍


// VethPrefix is the prefix to use when constructing the host-side
// veth device name. It should be no more than four characters, and
Expand Down Expand Up @@ -91,8 +84,8 @@ func init() {

// LoadNetConf converts inputs (i.e. stdin) to NetConf
func LoadNetConf(bytes []byte) (*NetConf, logger.Logger, error) {
conf := &NetConf{}
if err := json.Unmarshal(bytes, conf); err != nil {
var conf NetConf
if err := json.Unmarshal(bytes, &conf); err != nil {
return nil, nil, errors.Wrap(err, "add cmd: error loading config from args")
}

Expand All @@ -118,7 +111,7 @@ func LoadNetConf(bytes []byte) (*NetConf, logger.Logger, error) {
return nil, nil, errors.New("conf.VethPrefix can be at most 4 characters long")
}

return conf, log, nil
return &conf, log, nil
}

func cmdAdd(args *skel.CmdArgs) error {
Expand All @@ -136,7 +129,7 @@ func add(args *skel.CmdArgs, cniTypes typeswrapper.CNITYPES, grpcClient grpcwrap
log.Infof("Received CNI add request: ContainerID(%s) Netns(%s) IfName(%s) Args(%s) Path(%s) argsStdinData(%s)",
args.ContainerID, args.Netns, args.IfName, args.Args, args.Path, args.StdinData)

k8sArgs := K8sArgs{}
var k8sArgs K8sArgs
if err := cniTypes.LoadArgs(args.Args, &k8sArgs); err != nil {
log.Errorf("Failed to load k8s config from arg: %v", err)
return errors.Wrap(err, "add cmd: failed to load k8s config from arg")
Expand All @@ -148,11 +141,8 @@ func add(args *skel.CmdArgs, cniTypes typeswrapper.CNITYPES, grpcClient grpcwrap
// Set up a connection to the ipamD server.
conn, err := grpcClient.Dial(ipamdAddress, grpc.WithInsecure())
if err != nil {
log.Errorf("Failed to connect to backend server for pod %s namespace %s sandbox %s: %v",
string(k8sArgs.K8S_POD_NAME),
string(k8sArgs.K8S_POD_NAMESPACE),
string(k8sArgs.K8S_POD_INFRA_CONTAINER_ID),
err)
log.Errorf("Failed to connect to backend server for container %s: %v",
args.ContainerID, err)
return errors.Wrap(err, "add cmd: failed to connect to backend server")
}
defer conn.Close()
Expand All @@ -161,31 +151,30 @@ func add(args *skel.CmdArgs, cniTypes typeswrapper.CNITYPES, grpcClient grpcwrap

r, err := c.AddNetwork(context.Background(),
&pb.AddNetworkRequest{
Netns: args.Netns,
K8S_POD_NAME: string(k8sArgs.K8S_POD_NAME),
K8S_POD_NAMESPACE: string(k8sArgs.K8S_POD_NAMESPACE),
K8S_POD_INFRA_CONTAINER_ID: string(k8sArgs.K8S_POD_INFRA_CONTAINER_ID),
IfName: args.IfName})
Netns: args.Netns,
ContainerID: args.ContainerID,
NetworkName: conf.Name,
IfName: args.IfName,
})

if err != nil {
log.Errorf("Error received from AddNetwork grpc call for pod %s namespace %s sandbox %s: %v",
string(k8sArgs.K8S_POD_NAME),
string(k8sArgs.K8S_POD_NAMESPACE),
string(k8sArgs.K8S_POD_INFRA_CONTAINER_ID),
log.Errorf("Error received from AddNetwork grpc call for containerID %s: %v",
args.ContainerID,
err)
return errors.Wrap(err, "add cmd: Error received from AddNetwork gRPC call")
}

if !r.Success {
log.Errorf("Failed to assign an IP address to pod %s, namespace %s sandbox %s",
string(k8sArgs.K8S_POD_NAME),
string(k8sArgs.K8S_POD_NAMESPACE),
string(k8sArgs.K8S_POD_INFRA_CONTAINER_ID))
log.Errorf("Failed to assign an IP address to container %s",
args.ContainerID)
return errors.New("add cmd: failed to assign an IP address to container")
}

log.Infof("Received add network response for pod %s namespace %s sandbox %s: %s, table %d, external-SNAT: %v, vpcCIDR: %v",
string(k8sArgs.K8S_POD_NAME), string(k8sArgs.K8S_POD_NAMESPACE), string(k8sArgs.K8S_POD_INFRA_CONTAINER_ID),
log.Infof("Received add network response for container %s interface %s: %s, table %d, external-SNAT: %v, vpcCIDR: %v",
args.ContainerID, args.IfName,
r.IPv4Addr, r.DeviceNumber, r.UseExternalSNAT, r.VPCcidrs)

addr := &net.IPNet{
Expand All @@ -195,30 +184,33 @@ func add(args *skel.CmdArgs, cniTypes typeswrapper.CNITYPES, grpcClient grpcwrap

// build hostVethName
// Note: the maximum length for linux interface name is 15
hostVethName := generateHostVethName(conf.VethPrefix, string(k8sArgs.K8S_POD_NAMESPACE), string(k8sArgs.K8S_POD_NAME))
hostVethName := generateHostVethName(conf.VethPrefix, conf.Name, args.ContainerID, args.IfName)

err = driverClient.SetupNS(hostVethName, args.IfName, args.Netns, addr, int(r.DeviceNumber), r.VPCcidrs, r.UseExternalSNAT, mtu, log)

if err != nil {
log.Errorf("Failed SetupPodNetwork for pod %s namespace %s sandbox %s: %v",
string(k8sArgs.K8S_POD_NAME), string(k8sArgs.K8S_POD_NAMESPACE), string(k8sArgs.K8S_POD_INFRA_CONTAINER_ID), err)
log.Errorf("Failed SetupPodNetwork for container %s: %v",
args.ContainerID, err)

// return allocated IP back to IP pool
r, delErr := c.DelNetwork(context.Background(),
&pb.DelNetworkRequest{
K8S_POD_NAME: string(k8sArgs.K8S_POD_NAME),
K8S_POD_NAMESPACE: string(k8sArgs.K8S_POD_NAMESPACE),
K8S_POD_INFRA_CONTAINER_ID: string(k8sArgs.K8S_POD_INFRA_CONTAINER_ID),
Reason: "SetupNSFailed"})
r, delErr := c.DelNetwork(context.Background(), &pb.DelNetworkRequest{
K8S_POD_NAME: string(k8sArgs.K8S_POD_NAME),
K8S_POD_NAMESPACE: string(k8sArgs.K8S_POD_NAMESPACE),
K8S_POD_INFRA_CONTAINER_ID: string(k8sArgs.K8S_POD_INFRA_CONTAINER_ID),
ContainerID: args.ContainerID,
IfName: args.IfName,
NetworkName: conf.Name,
Reason: "SetupNSFailed",
})

if delErr != nil {
log.Errorf("Error received from DelNetwork grpc call for pod %s namespace %s sandbox %s: %v",
string(k8sArgs.K8S_POD_NAME), string(k8sArgs.K8S_POD_NAMESPACE), string(k8sArgs.K8S_POD_INFRA_CONTAINER_ID), delErr)
log.Errorf("Error received from DelNetwork grpc call for container %s: %v",
args.ContainerID, delErr)
}

if !r.Success {
log.Errorf("Failed to release IP of pod %s namespace %s sandbox %s: %v",
string(k8sArgs.K8S_POD_NAME), string(k8sArgs.K8S_POD_NAMESPACE), string(k8sArgs.K8S_POD_INFRA_CONTAINER_ID), delErr)
log.Errorf("Failed to release IP of container %s: %v",
args.ContainerID, delErr)
}
return errors.Wrap(err, "add command: failed to setup network")
}
Expand All @@ -238,9 +230,9 @@ func add(args *skel.CmdArgs, cniTypes typeswrapper.CNITYPES, grpcClient grpcwrap
}

// generateHostVethName returns a name to be used on the host-side veth device.
func generateHostVethName(prefix, namespace, podname string) string {
func generateHostVethName(prefix, netname, containerid, ifname string) string {
h := sha1.New()
h.Write([]byte(fmt.Sprintf("%s.%s", namespace, podname)))
fmt.Fprintf(h, "%s.%s.%s", netname, containerid, ifname)
return fmt.Sprintf("%s%s", prefix, hex.EncodeToString(h.Sum(nil))[:11])
}

Expand All @@ -251,15 +243,15 @@ func cmdDel(args *skel.CmdArgs) error {
func del(args *skel.CmdArgs, cniTypes typeswrapper.CNITYPES, grpcClient grpcwrapper.GRPC, rpcClient rpcwrapper.RPC,
driverClient driver.NetworkAPIs) error {

_, log, err := LoadNetConf(args.StdinData)
conf, log, err := LoadNetConf(args.StdinData)
if err != nil {
return errors.Wrap(err, "add cmd: error loading config from args")
}

log.Infof("Received CNI del request: ContainerID(%s) Netns(%s) IfName(%s) Args(%s) Path(%s) argsStdinData(%s)",
args.ContainerID, args.Netns, args.IfName, args.Args, args.Path, args.StdinData)

k8sArgs := K8sArgs{}
var k8sArgs K8sArgs
if err := cniTypes.LoadArgs(args.Args, &k8sArgs); err != nil {
log.Errorf("Failed to load k8s config from args: %v", err)
return errors.Wrap(err, "del cmd: failed to load k8s config from args")
Expand All @@ -269,42 +261,41 @@ func del(args *skel.CmdArgs, cniTypes typeswrapper.CNITYPES, grpcClient grpcwrap
// Set up a connection to the server.
conn, err := grpcClient.Dial(ipamdAddress, grpc.WithInsecure())
if err != nil {
log.Errorf("Failed to connect to backend server for pod %s namespace %s sandbox %s: %v",
string(k8sArgs.K8S_POD_NAME),
string(k8sArgs.K8S_POD_NAMESPACE),
string(k8sArgs.K8S_POD_INFRA_CONTAINER_ID),
err)
log.Errorf("Failed to connect to backend server for container %s: %v",
args.ContainerID, err)

return errors.Wrap(err, "del cmd: failed to connect to backend server")
}
defer conn.Close()

c := rpcClient.NewCNIBackendClient(conn)

r, err := c.DelNetwork(context.Background(),
&pb.DelNetworkRequest{
K8S_POD_NAME: string(k8sArgs.K8S_POD_NAME),
K8S_POD_NAMESPACE: string(k8sArgs.K8S_POD_NAMESPACE),
K8S_POD_INFRA_CONTAINER_ID: string(k8sArgs.K8S_POD_INFRA_CONTAINER_ID),
Reason: "PodDeleted"})
r, err := c.DelNetwork(context.Background(), &pb.DelNetworkRequest{
K8S_POD_NAME: string(k8sArgs.K8S_POD_NAME),
K8S_POD_NAMESPACE: string(k8sArgs.K8S_POD_NAMESPACE),
K8S_POD_INFRA_CONTAINER_ID: string(k8sArgs.K8S_POD_INFRA_CONTAINER_ID),
NetworkName: conf.Name,
ContainerID: args.ContainerID,
IfName: args.IfName,
Reason: "PodDeleted",
})

if err != nil {
if strings.Contains(err.Error(), datastore.ErrUnknownPod.Error()) {
// Plugins should generally complete a DEL action without error even if some resources are missing. For example,
// an IPAM plugin should generally release an IP allocation and return success even if the container network
// namespace no longer exists, unless that network namespace is critical for IPAM management
log.Infof("Pod %s in namespace %s not found", string(k8sArgs.K8S_POD_NAME), string(k8sArgs.K8S_POD_NAMESPACE))
log.Infof("Container %s not found", args.ContainerID)
return nil
} else {
log.Errorf("Error received from DelNetwork gRPC call for pod %s namespace %s sandbox %s: %v",
string(k8sArgs.K8S_POD_NAME), string(k8sArgs.K8S_POD_NAMESPACE), string(k8sArgs.K8S_POD_INFRA_CONTAINER_ID), err)
return errors.Wrap(err, "del cmd: error received from DelNetwork gRPC call")
}
log.Errorf("Error received from DelNetwork gRPC call for container %s: %v",
args.ContainerID, err)
return errors.Wrap(err, "del cmd: error received from DelNetwork gRPC call")
}

if !r.Success {
log.Errorf("Failed to process delete request for pod %s namespace %s sandbox %s: Success == false",
string(k8sArgs.K8S_POD_NAME), string(k8sArgs.K8S_POD_NAMESPACE), string(k8sArgs.K8S_POD_INFRA_CONTAINER_ID))
log.Errorf("Failed to process delete request for container %s: Success == false",
args.ContainerID)
return errors.New("del cmd: failed to process delete request")
}

Expand All @@ -316,13 +307,12 @@ func del(args *skel.CmdArgs, cniTypes typeswrapper.CNITYPES, grpcClient grpcwrap
}
err = driverClient.TeardownNS(addr, int(r.DeviceNumber), log)
if err != nil {
log.Errorf("Failed on TeardownPodNetwork for pod %s namespace %s sandbox %s: %v",
string(k8sArgs.K8S_POD_NAME), string(k8sArgs.K8S_POD_NAMESPACE), string(k8sArgs.K8S_POD_INFRA_CONTAINER_ID), err)
log.Errorf("Failed on TeardownPodNetwork for container ID %s: %v",
args.ContainerID, err)
return errors.Wrap(err, "del cmd: failed on tear down pod network")
}
} else {
log.Warnf("Pod %s in namespace %s did not have a valid IP %s", string(k8sArgs.K8S_POD_NAME),
string(k8sArgs.K8S_POD_NAMESPACE), r.IPv4Addr)
log.Warnf("Container %s did not have a valid IP %s", args.ContainerID, r.IPv4Addr)
}
return nil
}
Expand Down
10 changes: 7 additions & 3 deletions cmd/routed-eni-cni-plugin/cni_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"testing"

"github.com/containernetworking/cni/pkg/skel"
"github.com/containernetworking/cni/pkg/types"
"github.com/golang/mock/gomock"
"github.com/stretchr/testify/assert"
"google.golang.org/grpc"
Expand All @@ -45,9 +46,12 @@ const (
devNum = 4
)

var netConf = &NetConf{CNIVersion: cniVersion,
Name: cniName,
Type: cniType,
var netConf = &NetConf{
NetConf: types.NetConf{
CNIVersion: cniVersion,
Name: cniName,
Type: cniType,
},
PluginLogLevel: pluginLogLevel,
PluginLogFile: pluginLogFile,
}
Expand Down
31 changes: 11 additions & 20 deletions config/master/aws-k8s-cni-cn.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -20,24 +20,9 @@
- "apiGroups":
- "crd.k8s.amazonaws.com"
"resources":
- "*"
- "eniconfigs"
"verbs":
- "*"
- "apiGroups":
- ""
"resources":
- "pods"
- "nodes"
- "namespaces"
"verbs":
- "list"
- "watch"
- "get"
- "apiGroups":
- "extensions"
"resources":
- "daemonsets"
"verbs":
- "list"
- "watch"
---
Expand Down Expand Up @@ -151,6 +136,8 @@
"name": "cni-net-dir"
- "mountPath": "/host/var/log/aws-routed-eni"
"name": "log-dir"
- "mountPath": "/var/run/aws-node"
"name": "run-dir"
- "mountPath": "/var/run/docker.sock"
"name": "dockersock"
- "mountPath": "/var/run/dockershim.sock"
Expand All @@ -176,16 +163,20 @@
- "hostPath":
"path": "/etc/cni/net.d"
"name": "cni-net-dir"
- "hostPath":
"path": "/var/log/aws-routed-eni"
"type": "DirectoryOrCreate"
"name": "log-dir"
- "hostPath":
"path": "/var/run/docker.sock"
"name": "dockersock"
- "hostPath":
"path": "/var/run/dockershim.sock"
"name": "dockershim"
- "hostPath":
"path": "/var/log/aws-routed-eni"
"type": "DirectoryOrCreate"
"name": "log-dir"
- "hostPath":
"path": "/var/run/aws-node"
"type": "DirectoryOrCreate"
"name": "run-dir"
"updateStrategy":
"rollingUpdate":
"maxUnavailable": "10%"
Expand Down
Loading