diff --git a/cmd/aws-k8s-agent/main.go b/cmd/aws-k8s-agent/main.go index e0da5944918..efb1e1d27eb 100644 --- a/cmd/aws-k8s-agent/main.go +++ b/cmd/aws-k8s-agent/main.go @@ -18,7 +18,6 @@ import ( "github.com/aws/amazon-vpc-cni-k8s/pkg/eniconfig" "github.com/aws/amazon-vpc-cni-k8s/pkg/ipamd" - "github.com/aws/amazon-vpc-cni-k8s/pkg/k8sapi" "github.com/aws/amazon-vpc-cni-k8s/pkg/utils/logger" ) @@ -39,21 +38,12 @@ func _main() int { log.Infof("Starting L-IPAMD %s ...", version) - kubeClient, err := k8sapi.CreateKubeClient() - if err != nil { - log.Errorf("Failed to create client: %v", err) - return 1 - } - - discoverController := k8sapi.NewController(kubeClient) - go discoverController.DiscoverLocalK8SPods() - eniConfigController := eniconfig.NewENIConfigController() if ipamd.UseCustomNetworkCfg() { go eniConfigController.Start() } - ipamContext, err := ipamd.New(discoverController, eniConfigController) + ipamContext, err := ipamd.New(eniConfigController) if err != nil { log.Errorf("Initialization failure: %v", err) diff --git a/config/next/aws-k8s-cni-cn.yaml b/config/next/aws-k8s-cni-cn.yaml new file mode 100644 index 00000000000..7041cb9feb5 --- /dev/null +++ b/config/next/aws-k8s-cni-cn.yaml @@ -0,0 +1,162 @@ +--- +apiVersion: rbac.authorization.k8s.io/v1 +kind: ClusterRole +metadata: + name: aws-node +rules: + - apiGroups: + - crd.k8s.amazonaws.com + resources: + - "*" + verbs: + - "*" + - apiGroups: [""] + resources: + - pods + - nodes + - namespaces + verbs: ["list", "watch", "get"] + - apiGroups: ["extensions"] + resources: + - daemonsets + verbs: ["list", "watch"] + +--- +apiVersion: v1 +kind: ServiceAccount +metadata: + name: aws-node + namespace: kube-system + +--- +apiVersion: rbac.authorization.k8s.io/v1 +kind: ClusterRoleBinding +metadata: + name: aws-node +roleRef: + apiGroup: rbac.authorization.k8s.io + kind: ClusterRole + name: aws-node +subjects: + - kind: ServiceAccount + name: aws-node + namespace: kube-system + +--- +kind: DaemonSet +apiVersion: apps/v1 +metadata: + name: aws-node + namespace: kube-system + labels: + k8s-app: aws-node +spec: + updateStrategy: + type: RollingUpdate + rollingUpdate: + maxUnavailable: "10%" + selector: + matchLabels: + k8s-app: aws-node + template: + metadata: + labels: + k8s-app: aws-node + spec: + priorityClassName: system-node-critical + affinity: + nodeAffinity: + requiredDuringSchedulingIgnoredDuringExecution: + nodeSelectorTerms: + - matchExpressions: + - key: "kubernetes.io/os" + operator: In + values: + - linux + - key: "kubernetes.io/arch" + operator: In + values: + - amd64 + - key: "eks.amazonaws.com/compute-type" + operator: NotIn + values: + - fargate + serviceAccountName: aws-node + hostNetwork: true + tolerations: + - operator: Exists + containers: + - image: 961992271922.dkr.ecr.cn-northwest-1.amazonaws.com.cn/amazon-k8s-cni:v1.6.1 + imagePullPolicy: Always + ports: + - containerPort: 61678 + name: metrics + name: aws-node + readinessProbe: + exec: + command: ["/app/grpc-health-probe", "-addr=:50051"] + initialDelaySeconds: 35 + livenessProbe: + exec: + command: ["/app/grpc-health-probe", "-addr=:50051"] + initialDelaySeconds: 35 + env: + - name: AWS_VPC_K8S_CNI_LOGLEVEL + value: DEBUG + - name: AWS_VPC_K8S_CNI_VETHPREFIX + value: eni + - name: AWS_VPC_ENI_MTU + value: "9001" + - name: MY_NODE_NAME + valueFrom: + fieldRef: + fieldPath: spec.nodeName + resources: + requests: + cpu: 10m + securityContext: + privileged: true + volumeMounts: + - mountPath: /host/opt/cni/bin + name: cni-bin-dir + - mountPath: /host/etc/cni/net.d + 
name: cni-net-dir + - mountPath: /host/var/log + name: log-dir + - mountPath: /var/run/docker.sock + name: dockersock + - mountPath: /var/run/dockershim.sock + name: dockershim + volumes: + - name: cni-bin-dir + hostPath: + path: /opt/cni/bin + - name: cni-net-dir + hostPath: + path: /etc/cni/net.d + - name: log-dir + hostPath: + path: /var/log + - name: dockersock + hostPath: + path: /var/run/docker.sock + - name: dockershim + hostPath: + path: /var/run/dockershim.sock + +--- +apiVersion: apiextensions.k8s.io/v1beta1 +kind: CustomResourceDefinition +metadata: + name: eniconfigs.crd.k8s.amazonaws.com +spec: + scope: Cluster + group: crd.k8s.amazonaws.com + versions: + - name: v1alpha1 + served: true + storage: true + names: + plural: eniconfigs + singular: eniconfig + kind: ENIConfig diff --git a/config/next/aws-k8s-cni-us-gov-east-1.yaml b/config/next/aws-k8s-cni-us-gov-east-1.yaml new file mode 100644 index 00000000000..b5aec5b332f --- /dev/null +++ b/config/next/aws-k8s-cni-us-gov-east-1.yaml @@ -0,0 +1,175 @@ +--- +apiVersion: rbac.authorization.k8s.io/v1 +kind: ClusterRole +metadata: + name: aws-node +rules: + - apiGroups: + - crd.k8s.amazonaws.com + resources: + - "*" + verbs: + - "*" + - apiGroups: [""] + resources: + - pods + - nodes + - namespaces + verbs: ["list", "watch", "get"] + - apiGroups: ["extensions"] + resources: + - daemonsets + verbs: ["list", "watch"] + +--- +apiVersion: v1 +kind: ServiceAccount +metadata: + name: aws-node + namespace: kube-system + +--- +apiVersion: rbac.authorization.k8s.io/v1 +kind: ClusterRoleBinding +metadata: + name: aws-node +roleRef: + apiGroup: rbac.authorization.k8s.io + kind: ClusterRole + name: aws-node +subjects: + - kind: ServiceAccount + name: aws-node + namespace: kube-system + +--- +kind: DaemonSet +apiVersion: apps/v1 +metadata: + name: aws-node + namespace: kube-system + labels: + k8s-app: aws-node +spec: + updateStrategy: + type: RollingUpdate + rollingUpdate: + maxUnavailable: "10%" + selector: + matchLabels: + k8s-app: aws-node + template: + metadata: + labels: + k8s-app: aws-node + spec: + priorityClassName: system-node-critical + affinity: + nodeAffinity: + requiredDuringSchedulingIgnoredDuringExecution: + nodeSelectorTerms: + - matchExpressions: + - key: "beta.kubernetes.io/os" + operator: In + values: + - linux + - key: "beta.kubernetes.io/arch" + operator: In + values: + - amd64 + - key: "eks.amazonaws.com/compute-type" + operator: NotIn + values: + - fargate + - matchExpressions: + - key: "kubernetes.io/os" + operator: In + values: + - linux + - key: "kubernetes.io/arch" + operator: In + values: + - amd64 + - key: "eks.amazonaws.com/compute-type" + operator: NotIn + values: + - fargate + serviceAccountName: aws-node + hostNetwork: true + tolerations: + - operator: Exists + containers: + - image: 151742754352.dkr.ecr.us-gov-east-1.amazonaws.com/amazon-k8s-cni:v1.6.1 + imagePullPolicy: Always + ports: + - containerPort: 61678 + name: metrics + name: aws-node + readinessProbe: + exec: + command: ["/app/grpc-health-probe", "-addr=:50051"] + initialDelaySeconds: 35 + livenessProbe: + exec: + command: ["/app/grpc-health-probe", "-addr=:50051"] + initialDelaySeconds: 35 + env: + - name: AWS_VPC_K8S_CNI_LOGLEVEL + value: DEBUG + - name: AWS_VPC_K8S_CNI_VETHPREFIX + value: eni + - name: AWS_VPC_ENI_MTU + value: "9001" + - name: MY_NODE_NAME + valueFrom: + fieldRef: + fieldPath: spec.nodeName + resources: + requests: + cpu: 10m + securityContext: + privileged: true + volumeMounts: + - mountPath: /host/opt/cni/bin + name: cni-bin-dir 
+ - mountPath: /host/etc/cni/net.d + name: cni-net-dir + - mountPath: /host/var/log + name: log-dir + - mountPath: /var/run/docker.sock + name: dockersock + - mountPath: /var/run/dockershim.sock + name: dockershim + volumes: + - name: cni-bin-dir + hostPath: + path: /opt/cni/bin + - name: cni-net-dir + hostPath: + path: /etc/cni/net.d + - name: log-dir + hostPath: + path: /var/log + - name: dockersock + hostPath: + path: /var/run/docker.sock + - name: dockershim + hostPath: + path: /var/run/dockershim.sock + +--- +apiVersion: apiextensions.k8s.io/v1beta1 +kind: CustomResourceDefinition +metadata: + name: eniconfigs.crd.k8s.amazonaws.com +spec: + scope: Cluster + group: crd.k8s.amazonaws.com + versions: + - name: v1alpha1 + served: true + storage: true + names: + plural: eniconfigs + singular: eniconfig + kind: ENIConfig diff --git a/config/next/aws-k8s-cni-us-gov-west-1.yaml b/config/next/aws-k8s-cni-us-gov-west-1.yaml new file mode 100644 index 00000000000..266b0359377 --- /dev/null +++ b/config/next/aws-k8s-cni-us-gov-west-1.yaml @@ -0,0 +1,175 @@ +--- +apiVersion: rbac.authorization.k8s.io/v1 +kind: ClusterRole +metadata: + name: aws-node +rules: + - apiGroups: + - crd.k8s.amazonaws.com + resources: + - "*" + verbs: + - "*" + - apiGroups: [""] + resources: + - pods + - nodes + - namespaces + verbs: ["list", "watch", "get"] + - apiGroups: ["extensions"] + resources: + - daemonsets + verbs: ["list", "watch"] + +--- +apiVersion: v1 +kind: ServiceAccount +metadata: + name: aws-node + namespace: kube-system + +--- +apiVersion: rbac.authorization.k8s.io/v1 +kind: ClusterRoleBinding +metadata: + name: aws-node +roleRef: + apiGroup: rbac.authorization.k8s.io + kind: ClusterRole + name: aws-node +subjects: + - kind: ServiceAccount + name: aws-node + namespace: kube-system + +--- +kind: DaemonSet +apiVersion: apps/v1 +metadata: + name: aws-node + namespace: kube-system + labels: + k8s-app: aws-node +spec: + updateStrategy: + type: RollingUpdate + rollingUpdate: + maxUnavailable: "10%" + selector: + matchLabels: + k8s-app: aws-node + template: + metadata: + labels: + k8s-app: aws-node + spec: + priorityClassName: system-node-critical + affinity: + nodeAffinity: + requiredDuringSchedulingIgnoredDuringExecution: + nodeSelectorTerms: + - matchExpressions: + - key: "beta.kubernetes.io/os" + operator: In + values: + - linux + - key: "beta.kubernetes.io/arch" + operator: In + values: + - amd64 + - key: "eks.amazonaws.com/compute-type" + operator: NotIn + values: + - fargate + - matchExpressions: + - key: "kubernetes.io/os" + operator: In + values: + - linux + - key: "kubernetes.io/arch" + operator: In + values: + - amd64 + - key: "eks.amazonaws.com/compute-type" + operator: NotIn + values: + - fargate + serviceAccountName: aws-node + hostNetwork: true + tolerations: + - operator: Exists + containers: + - image: 013241004608.dkr.ecr.us-gov-west-1.amazonaws.com/amazon-k8s-cni:v1.6.1 + imagePullPolicy: Always + ports: + - containerPort: 61678 + name: metrics + name: aws-node + readinessProbe: + exec: + command: ["/app/grpc-health-probe", "-addr=:50051"] + initialDelaySeconds: 35 + livenessProbe: + exec: + command: ["/app/grpc-health-probe", "-addr=:50051"] + initialDelaySeconds: 35 + env: + - name: AWS_VPC_K8S_CNI_LOGLEVEL + value: DEBUG + - name: AWS_VPC_K8S_CNI_VETHPREFIX + value: eni + - name: AWS_VPC_ENI_MTU + value: "9001" + - name: MY_NODE_NAME + valueFrom: + fieldRef: + fieldPath: spec.nodeName + resources: + requests: + cpu: 10m + securityContext: + privileged: true + volumeMounts: + - mountPath: 
/host/opt/cni/bin + name: cni-bin-dir + - mountPath: /host/etc/cni/net.d + name: cni-net-dir + - mountPath: /host/var/log + name: log-dir + - mountPath: /var/run/docker.sock + name: dockersock + - mountPath: /var/run/dockershim.sock + name: dockershim + volumes: + - name: cni-bin-dir + hostPath: + path: /opt/cni/bin + - name: cni-net-dir + hostPath: + path: /etc/cni/net.d + - name: log-dir + hostPath: + path: /var/log + - name: dockersock + hostPath: + path: /var/run/docker.sock + - name: dockershim + hostPath: + path: /var/run/dockershim.sock + +--- +apiVersion: apiextensions.k8s.io/v1beta1 +kind: CustomResourceDefinition +metadata: + name: eniconfigs.crd.k8s.amazonaws.com +spec: + scope: Cluster + group: crd.k8s.amazonaws.com + versions: + - name: v1alpha1 + served: true + storage: true + names: + plural: eniconfigs + singular: eniconfig + kind: ENIConfig diff --git a/config/next/aws-k8s-cni.yaml b/config/next/aws-k8s-cni.yaml new file mode 100644 index 00000000000..a2c97b5acef --- /dev/null +++ b/config/next/aws-k8s-cni.yaml @@ -0,0 +1,166 @@ +--- +apiVersion: rbac.authorization.k8s.io/v1 +kind: ClusterRole +metadata: + name: aws-node +rules: + - apiGroups: + - crd.k8s.amazonaws.com + resources: + - eniconfigs + verbs: + - get + - list + - watch + +--- +apiVersion: v1 +kind: ServiceAccount +metadata: + name: aws-node + namespace: kube-system + +--- +apiVersion: rbac.authorization.k8s.io/v1 +kind: ClusterRoleBinding +metadata: + name: aws-node +roleRef: + apiGroup: rbac.authorization.k8s.io + kind: ClusterRole + name: aws-node +subjects: + - kind: ServiceAccount + name: aws-node + namespace: kube-system + +--- +kind: DaemonSet +apiVersion: apps/v1 +metadata: + name: aws-node + namespace: kube-system + labels: + k8s-app: aws-node +spec: + updateStrategy: + type: RollingUpdate + rollingUpdate: + maxUnavailable: "10%" + selector: + matchLabels: + k8s-app: aws-node + template: + metadata: + labels: + k8s-app: aws-node + spec: + priorityClassName: system-node-critical + affinity: + nodeAffinity: + requiredDuringSchedulingIgnoredDuringExecution: + nodeSelectorTerms: + - matchExpressions: + - key: "beta.kubernetes.io/os" + operator: In + values: + - linux + - key: "beta.kubernetes.io/arch" + operator: In + values: + - amd64 + - key: "eks.amazonaws.com/compute-type" + operator: NotIn + values: + - fargate + - matchExpressions: + - key: "kubernetes.io/os" + operator: In + values: + - linux + - key: "kubernetes.io/arch" + operator: In + values: + - amd64 + - key: "eks.amazonaws.com/compute-type" + operator: NotIn + values: + - fargate + serviceAccountName: aws-node + hostNetwork: true + tolerations: + - operator: Exists + containers: + - image: 602401143452.dkr.ecr.us-west-2.amazonaws.com/amazon-k8s-cni:latest + imagePullPolicy: Always + ports: + - containerPort: 61678 + name: metrics + name: aws-node + readinessProbe: + exec: + command: ["/app/grpc-health-probe", "-addr=:50051"] + initialDelaySeconds: 35 + livenessProbe: + exec: + command: ["/app/grpc-health-probe", "-addr=:50051"] + initialDelaySeconds: 35 + env: + - name: AWS_VPC_K8S_CNI_LOGLEVEL + value: DEBUG + - name: AWS_VPC_K8S_CNI_LOG_FILE + value: /var/log/ipamd.log + - name: AWS_VPC_K8S_CNI_VETHPREFIX + value: eni + - name: AWS_VPC_ENI_MTU + value: "9001" + - name: MY_NODE_NAME + valueFrom: + fieldRef: + fieldPath: spec.nodeName + resources: + requests: + cpu: 10m + securityContext: + privileged: true + volumeMounts: + - mountPath: /host/opt/cni/bin + name: cni-bin-dir + - mountPath: /host/etc/cni/net.d + name: cni-net-dir + - 
mountPath: /var/log + name: log-dir + - mountPath: /var/run/aws-routed-eni + name: datadir + volumes: + - name: cni-bin-dir + hostPath: + path: /opt/cni/bin + - name: cni-net-dir + hostPath: + path: /etc/cni/net.d + - name: log-dir + hostPath: + path: /var/log/aws-routed-eni + type: DirectoryOrCreate + - name: datadir + hostPath: + path: /var/run/aws-routed-eni + type: DirectoryOrCreate + +--- +apiVersion: apiextensions.k8s.io/v1beta1 +kind: CustomResourceDefinition +metadata: + name: eniconfigs.crd.k8s.amazonaws.com +spec: + scope: Cluster + group: crd.k8s.amazonaws.com + versions: + - name: v1alpha1 + served: true + storage: true + names: + plural: eniconfigs + singular: eniconfig + kind: ENIConfig diff --git a/config/next/calico.yaml b/config/next/calico.yaml new file mode 100644 index 00000000000..89b591de26a --- /dev/null +++ b/config/next/calico.yaml @@ -0,0 +1,757 @@ +--- +kind: DaemonSet +apiVersion: apps/v1 +metadata: + name: calico-node + namespace: kube-system + labels: + k8s-app: calico-node +spec: + selector: + matchLabels: + k8s-app: calico-node + updateStrategy: + type: RollingUpdate + rollingUpdate: + maxUnavailable: 1 + template: + metadata: + labels: + k8s-app: calico-node + spec: + priorityClassName: system-node-critical + nodeSelector: + beta.kubernetes.io/os: linux + hostNetwork: true + serviceAccountName: calico-node + # Minimize downtime during a rolling upgrade or deletion; tell Kubernetes to do a "force + # deletion": https://kubernetes.io/docs/concepts/workloads/pods/pod/#termination-of-pods. + terminationGracePeriodSeconds: 0 + containers: + # Runs calico/node container on each Kubernetes node. This + # container programs network policy and routes on each + # host. + - name: calico-node + image: quay.io/calico/node:v3.13.0 + env: + # Use Kubernetes API as the backing datastore. + - name: DATASTORE_TYPE + value: "kubernetes" + # Use eni not cali for interface prefix + - name: FELIX_INTERFACEPREFIX + value: "eni" + # Enable felix info logging. + - name: FELIX_LOGSEVERITYSCREEN + value: "info" + # Don't enable BGP. + - name: CALICO_NETWORKING_BACKEND + value: "none" + # Cluster type to identify the deployment type + - name: CLUSTER_TYPE + value: "k8s,ecs" + # Disable file logging so `kubectl logs` works. + - name: CALICO_DISABLE_FILE_LOGGING + value: "true" + - name: FELIX_TYPHAK8SSERVICENAME + value: "calico-typha" + # Set Felix endpoint to host default action to ACCEPT. + - name: FELIX_DEFAULTENDPOINTTOHOSTACTION + value: "ACCEPT" + # This will make Felix honor AWS VPC CNI's mangle table + # rules. + - name: FELIX_IPTABLESMANGLEALLOWACTION + value: Return + # Disable IPV6 on Kubernetes. + - name: FELIX_IPV6SUPPORT + value: "false" + # Wait for the datastore. + - name: WAIT_FOR_DATASTORE + value: "true" + - name: FELIX_LOGSEVERITYSYS + value: "none" + - name: FELIX_PROMETHEUSMETRICSENABLED + value: "true" + - name: NO_DEFAULT_POOLS + value: "true" + # Set based on the k8s node name. + - name: NODENAME + valueFrom: + fieldRef: + fieldPath: spec.nodeName + # No IP address needed. 
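+            # With CALICO_NETWORKING_BACKEND set to "none" above, the AWS VPC CNI
+            # handles pod addressing, so calico/node runs without an IP of its own.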
+ - name: IP + value: "" + - name: FELIX_HEALTHENABLED + value: "true" + securityContext: + privileged: true + livenessProbe: + exec: + command: + - /bin/calico-node + - -felix-live + periodSeconds: 10 + initialDelaySeconds: 10 + failureThreshold: 6 + readinessProbe: + exec: + command: + - /bin/calico-node + - -felix-ready + periodSeconds: 10 + volumeMounts: + - mountPath: /lib/modules + name: lib-modules + readOnly: true + - mountPath: /run/xtables.lock + name: xtables-lock + readOnly: false + - mountPath: /var/run/calico + name: var-run-calico + readOnly: false + - mountPath: /var/lib/calico + name: var-lib-calico + readOnly: false + volumes: + # Used to ensure proper kmods are installed. + - name: lib-modules + hostPath: + path: /lib/modules + - name: var-run-calico + hostPath: + path: /var/run/calico + - name: var-lib-calico + hostPath: + path: /var/lib/calico + - name: xtables-lock + hostPath: + path: /run/xtables.lock + type: FileOrCreate + tolerations: + # Make sure calico/node gets scheduled on all nodes. + - effect: NoSchedule + operator: Exists + # Mark the pod as a critical add-on for rescheduling. + - key: CriticalAddonsOnly + operator: Exists + - effect: NoExecute + operator: Exists + +--- + +# Create all the CustomResourceDefinitions needed for +# Calico policy-only mode. + +apiVersion: apiextensions.k8s.io/v1beta1 +kind: CustomResourceDefinition +metadata: + name: felixconfigurations.crd.projectcalico.org +spec: + scope: Cluster + group: crd.projectcalico.org + versions: + - name: v1 + served: true + storage: true + names: + kind: FelixConfiguration + plural: felixconfigurations + singular: felixconfiguration + +--- + +apiVersion: apiextensions.k8s.io/v1beta1 +kind: CustomResourceDefinition +metadata: + name: ipamblocks.crd.projectcalico.org +spec: + scope: Cluster + group: crd.projectcalico.org + versions: + - name: v1 + served: true + storage: true + names: + kind: IPAMBlock + plural: ipamblocks + singular: ipamblock + +--- + +apiVersion: apiextensions.k8s.io/v1beta1 +kind: CustomResourceDefinition +metadata: + name: blockaffinities.crd.projectcalico.org +spec: + scope: Cluster + group: crd.projectcalico.org + versions: + - name: v1 + served: true + storage: true + names: + kind: BlockAffinity + plural: blockaffinities + singular: blockaffinity + +--- + +apiVersion: apiextensions.k8s.io/v1beta1 +kind: CustomResourceDefinition +metadata: + name: bgpconfigurations.crd.projectcalico.org +spec: + scope: Cluster + group: crd.projectcalico.org + versions: + - name: v1 + served: true + storage: true + names: + kind: BGPConfiguration + plural: bgpconfigurations + singular: bgpconfiguration + +--- +apiVersion: apiextensions.k8s.io/v1beta1 +kind: CustomResourceDefinition +metadata: + name: bgppeers.crd.projectcalico.org +spec: + scope: Cluster + group: crd.projectcalico.org + versions: + - name: v1 + served: true + storage: true + names: + kind: BGPPeer + plural: bgppeers + singular: bgppeer +--- + +apiVersion: apiextensions.k8s.io/v1beta1 +kind: CustomResourceDefinition +metadata: + name: ippools.crd.projectcalico.org +spec: + scope: Cluster + group: crd.projectcalico.org + versions: + - name: v1 + served: true + storage: true + names: + kind: IPPool + plural: ippools + singular: ippool + +--- + +apiVersion: apiextensions.k8s.io/v1beta1 +kind: CustomResourceDefinition +metadata: + name: hostendpoints.crd.projectcalico.org +spec: + scope: Cluster + group: crd.projectcalico.org + versions: + - name: v1 + served: true + storage: true + names: + kind: HostEndpoint + plural: 
hostendpoints + singular: hostendpoint + +--- + +apiVersion: apiextensions.k8s.io/v1beta1 +kind: CustomResourceDefinition +metadata: + name: clusterinformations.crd.projectcalico.org +spec: + scope: Cluster + group: crd.projectcalico.org + versions: + - name: v1 + served: true + storage: true + names: + kind: ClusterInformation + plural: clusterinformations + singular: clusterinformation + +--- + +apiVersion: apiextensions.k8s.io/v1beta1 +kind: CustomResourceDefinition +metadata: + name: globalnetworkpolicies.crd.projectcalico.org +spec: + scope: Cluster + group: crd.projectcalico.org + versions: + - name: v1 + served: true + storage: true + names: + kind: GlobalNetworkPolicy + plural: globalnetworkpolicies + singular: globalnetworkpolicy + +--- + +apiVersion: apiextensions.k8s.io/v1beta1 +kind: CustomResourceDefinition +metadata: + name: globalnetworksets.crd.projectcalico.org +spec: + scope: Cluster + group: crd.projectcalico.org + versions: + - name: v1 + served: true + storage: true + names: + kind: GlobalNetworkSet + plural: globalnetworksets + singular: globalnetworkset + +--- + +apiVersion: apiextensions.k8s.io/v1beta1 +kind: CustomResourceDefinition +metadata: + name: networkpolicies.crd.projectcalico.org +spec: + scope: Namespaced + group: crd.projectcalico.org + versions: + - name: v1 + served: true + storage: true + names: + kind: NetworkPolicy + plural: networkpolicies + singular: networkpolicy + +--- + +apiVersion: apiextensions.k8s.io/v1beta1 +kind: CustomResourceDefinition +metadata: + name: networksets.crd.projectcalico.org +spec: + scope: Namespaced + group: crd.projectcalico.org + versions: + - name: v1 + served: true + storage: true + names: + kind: NetworkSet + plural: networksets + singular: networkset + +--- + +# Create the ServiceAccount and roles necessary for Calico. + +apiVersion: v1 +kind: ServiceAccount +metadata: + name: calico-node + namespace: kube-system + +--- + +kind: ClusterRole +apiVersion: rbac.authorization.k8s.io/v1 +metadata: + name: calico-node +rules: + # The CNI plugin needs to get pods, nodes, and namespaces. + - apiGroups: [""] + resources: + - pods + - nodes + - namespaces + verbs: + - get + - apiGroups: [""] + resources: + - endpoints + - services + verbs: + # Used to discover service IPs for advertisement. + - watch + - list + # Used to discover Typhas. + - get + - apiGroups: [""] + resources: + - nodes/status + verbs: + # Needed for clearing NodeNetworkUnavailable flag. + - patch + # Calico stores some configuration information in node annotations. + - update + # Watch for changes to Kubernetes NetworkPolicies. + - apiGroups: ["networking.k8s.io"] + resources: + - networkpolicies + verbs: + - watch + - list + # Used by Calico for policy information. + - apiGroups: [""] + resources: + - pods + - namespaces + - serviceaccounts + verbs: + - list + - watch + # The CNI plugin patches pods/status. + - apiGroups: [""] + resources: + - pods/status + verbs: + - patch + # Calico monitors various CRDs for config. + - apiGroups: ["crd.projectcalico.org"] + resources: + - globalfelixconfigs + - felixconfigurations + - bgppeers + - globalbgpconfigs + - bgpconfigurations + - ippools + - ipamblocks + - globalnetworkpolicies + - globalnetworksets + - networkpolicies + - networksets + - clusterinformations + - hostendpoints + - blockaffinities + verbs: + - get + - list + - watch + # Calico must create and update some CRDs on startup. 
+ - apiGroups: ["crd.projectcalico.org"] + resources: + - ippools + - felixconfigurations + - clusterinformations + verbs: + - create + - update + # Calico stores some configuration information on the node. + - apiGroups: [""] + resources: + - nodes + verbs: + - get + - list + - watch + # These permissions are only requried for upgrade from v2.6, and can + # be removed after upgrade or on fresh installations. + - apiGroups: ["crd.projectcalico.org"] + resources: + - bgpconfigurations + - bgppeers + verbs: + - create + - update + # These permissions are required for Calico CNI to perform IPAM allocations. + - apiGroups: ["crd.projectcalico.org"] + resources: + - blockaffinities + - ipamblocks + - ipamhandles + verbs: + - get + - list + - create + - update + - delete + - apiGroups: ["crd.projectcalico.org"] + resources: + - ipamconfigs + verbs: + - get + # Block affinities must also be watchable by confd for route aggregation. + - apiGroups: ["crd.projectcalico.org"] + resources: + - blockaffinities + verbs: + - watch + # The Calico IPAM migration needs to get daemonsets. These permissions can be + # removed if not upgrading from an installation using host-local IPAM. + - apiGroups: ["apps"] + resources: + - daemonsets + verbs: + - get + +--- + +apiVersion: rbac.authorization.k8s.io/v1 +kind: ClusterRoleBinding +metadata: + name: calico-node +roleRef: + apiGroup: rbac.authorization.k8s.io + kind: ClusterRole + name: calico-node +subjects: + - kind: ServiceAccount + name: calico-node + namespace: kube-system + +--- + +apiVersion: apps/v1 +kind: Deployment +metadata: + name: calico-typha + namespace: kube-system + labels: + k8s-app: calico-typha +spec: + revisionHistoryLimit: 2 + selector: + matchLabels: + k8s-app: calico-typha + template: + metadata: + labels: + k8s-app: calico-typha + annotations: + cluster-autoscaler.kubernetes.io/safe-to-evict: 'true' + spec: + priorityClassName: system-cluster-critical + nodeSelector: + beta.kubernetes.io/os: linux + tolerations: + # Mark the pod as a critical add-on for rescheduling. + - key: CriticalAddonsOnly + operator: Exists + hostNetwork: true + serviceAccountName: calico-node + # fsGroup allows using projected serviceaccount tokens as described here kubernetes/kubernetes#82573 + securityContext: + fsGroup: 65534 + containers: + - image: quay.io/calico/typha:v3.13.0 + name: calico-typha + ports: + - containerPort: 5473 + name: calico-typha + protocol: TCP + env: + # Use eni not cali for interface prefix + - name: FELIX_INTERFACEPREFIX + value: "eni" + - name: TYPHA_LOGFILEPATH + value: "none" + - name: TYPHA_LOGSEVERITYSYS + value: "none" + - name: TYPHA_LOGSEVERITYSCREEN + value: "info" + - name: TYPHA_PROMETHEUSMETRICSENABLED + value: "true" + - name: TYPHA_CONNECTIONREBALANCINGMODE + value: "kubernetes" + - name: TYPHA_PROMETHEUSMETRICSPORT + value: "9093" + - name: TYPHA_DATASTORETYPE + value: "kubernetes" + - name: TYPHA_MAXCONNECTIONSLOWERLIMIT + value: "1" + - name: TYPHA_HEALTHENABLED + value: "true" + # This will make Felix honor AWS VPC CNI's mangle table + # rules. 
+ - name: FELIX_IPTABLESMANGLEALLOWACTION + value: Return + livenessProbe: + httpGet: + path: /liveness + port: 9098 + host: localhost + periodSeconds: 30 + initialDelaySeconds: 30 + securityContext: + runAsNonRoot: true + allowPrivilegeEscalation: false + readinessProbe: + httpGet: + path: /readiness + port: 9098 + host: localhost + periodSeconds: 10 + +--- + +# This manifest creates a Pod Disruption Budget for Typha to allow K8s Cluster Autoscaler to evict +apiVersion: policy/v1beta1 +kind: PodDisruptionBudget +metadata: + name: calico-typha + namespace: kube-system + labels: + k8s-app: calico-typha +spec: + maxUnavailable: 1 + selector: + matchLabels: + k8s-app: calico-typha + +--- +apiVersion: rbac.authorization.k8s.io/v1 +kind: ClusterRoleBinding +metadata: + name: typha-cpha +roleRef: + apiGroup: rbac.authorization.k8s.io + kind: ClusterRole + name: typha-cpha +subjects: + - kind: ServiceAccount + name: typha-cpha + namespace: kube-system + +--- + +apiVersion: rbac.authorization.k8s.io/v1 +kind: ClusterRole +metadata: + name: typha-cpha +rules: + - apiGroups: [""] + resources: ["nodes"] + verbs: ["watch", "list"] + +--- + +kind: ConfigMap +apiVersion: v1 +metadata: + name: calico-typha-horizontal-autoscaler + namespace: kube-system +data: + ladder: |- + { + "coresToReplicas": [], + "nodesToReplicas": + [ + [1, 1], + [10, 2], + [100, 3], + [250, 4], + [500, 5], + [1000, 6], + [1500, 7], + [2000, 8] + ] + } + +--- + +apiVersion: apps/v1 +kind: Deployment +metadata: + name: calico-typha-horizontal-autoscaler + namespace: kube-system + labels: + k8s-app: calico-typha-autoscaler +spec: + selector: + matchLabels: + k8s-app: calico-typha-autoscaler + replicas: 1 + template: + metadata: + labels: + k8s-app: calico-typha-autoscaler + spec: + priorityClassName: system-cluster-critical + nodeSelector: + beta.kubernetes.io/os: linux + containers: + - image: k8s.gcr.io/cluster-proportional-autoscaler-amd64:1.7.1 + name: autoscaler + command: + - /cluster-proportional-autoscaler + - --namespace=kube-system + - --configmap=calico-typha-horizontal-autoscaler + - --target=deployment/calico-typha + - --logtostderr=true + - --v=2 + resources: + requests: + cpu: 10m + limits: + cpu: 10m + serviceAccountName: typha-cpha + +--- + +apiVersion: rbac.authorization.k8s.io/v1 +kind: Role +metadata: + name: typha-cpha + namespace: kube-system +rules: + - apiGroups: [""] + resources: ["configmaps"] + verbs: ["get"] + - apiGroups: ["extensions", "apps"] + resources: ["deployments/scale"] + verbs: ["get", "update"] + +--- + +apiVersion: v1 +kind: ServiceAccount +metadata: + name: typha-cpha + namespace: kube-system + +--- + +apiVersion: rbac.authorization.k8s.io/v1 +kind: RoleBinding +metadata: + name: typha-cpha + namespace: kube-system +roleRef: + apiGroup: rbac.authorization.k8s.io + kind: Role + name: typha-cpha +subjects: + - kind: ServiceAccount + name: typha-cpha + namespace: kube-system + +--- + +apiVersion: v1 +kind: Service +metadata: + name: calico-typha + namespace: kube-system + labels: + k8s-app: calico-typha +spec: + ports: + - port: 5473 + protocol: TCP + targetPort: calico-typha + name: calico-typha + selector: + k8s-app: calico-typha diff --git a/config/next/cni-metrics-helper-cn.yaml b/config/next/cni-metrics-helper-cn.yaml new file mode 100644 index 00000000000..508b178ba53 --- /dev/null +++ b/config/next/cni-metrics-helper-cn.yaml @@ -0,0 +1,83 @@ +--- +apiVersion: rbac.authorization.k8s.io/v1 +kind: ClusterRole +metadata: + name: cni-metrics-helper +rules: + - apiGroups: [""] + resources: + - 
nodes
+      - pods
+      - pods/proxy
+      - services
+      - resourcequotas
+      - replicationcontrollers
+      - limitranges
+      - persistentvolumeclaims
+      - persistentvolumes
+      - namespaces
+      - endpoints
+    verbs: ["list", "watch", "get"]
+  - apiGroups: ["extensions"]
+    resources:
+      - daemonsets
+      - deployments
+      - replicasets
+    verbs: ["list", "watch"]
+  - apiGroups: ["apps"]
+    resources:
+      - statefulsets
+    verbs: ["list", "watch"]
+  - apiGroups: ["batch"]
+    resources:
+      - cronjobs
+      - jobs
+    verbs: ["list", "watch"]
+  - apiGroups: ["autoscaling"]
+    resources:
+      - horizontalpodautoscalers
+    verbs: ["list", "watch"]
+---
+apiVersion: v1
+kind: ServiceAccount
+metadata:
+  name: cni-metrics-helper
+  namespace: kube-system
+---
+apiVersion: rbac.authorization.k8s.io/v1
+kind: ClusterRoleBinding
+metadata:
+  name: cni-metrics-helper
+roleRef:
+  apiGroup: rbac.authorization.k8s.io
+  kind: ClusterRole
+  name: cni-metrics-helper
+subjects:
+  - kind: ServiceAccount
+    name: cni-metrics-helper
+    namespace: kube-system
+---
+apiVersion: apps/v1
+kind: Deployment
+metadata:
+  name: cni-metrics-helper
+  namespace: kube-system
+  labels:
+    k8s-app: cni-metrics-helper
+spec:
+  selector:
+    matchLabels:
+      k8s-app: cni-metrics-helper
+  template:
+    metadata:
+      labels:
+        k8s-app: cni-metrics-helper
+    spec:
+      serviceAccountName: cni-metrics-helper
+      containers:
+        - image: 961992271922.dkr.ecr.cn-northwest-1.amazonaws.com.cn/cni-metrics-helper:v1.6.1
+          imagePullPolicy: Always
+          name: cni-metrics-helper
+          env:
+            - name: USE_CLOUDWATCH
+              value: "true"
diff --git a/config/next/cni-metrics-helper-us-gov-east-1.yaml b/config/next/cni-metrics-helper-us-gov-east-1.yaml
new file mode 100644
index 00000000000..3640d402a14
--- /dev/null
+++ b/config/next/cni-metrics-helper-us-gov-east-1.yaml
@@ -0,0 +1,83 @@
+---
+apiVersion: rbac.authorization.k8s.io/v1
+kind: ClusterRole
+metadata:
+  name: cni-metrics-helper
+rules:
+  - apiGroups: [""]
+    resources:
+      - nodes
+      - pods
+      - pods/proxy
+      - services
+      - resourcequotas
+      - replicationcontrollers
+      - limitranges
+      - persistentvolumeclaims
+      - persistentvolumes
+      - namespaces
+      - endpoints
+    verbs: ["list", "watch", "get"]
+  - apiGroups: ["extensions"]
+    resources:
+      - daemonsets
+      - deployments
+      - replicasets
+    verbs: ["list", "watch"]
+  - apiGroups: ["apps"]
+    resources:
+      - statefulsets
+    verbs: ["list", "watch"]
+  - apiGroups: ["batch"]
+    resources:
+      - cronjobs
+      - jobs
+    verbs: ["list", "watch"]
+  - apiGroups: ["autoscaling"]
+    resources:
+      - horizontalpodautoscalers
+    verbs: ["list", "watch"]
+---
+apiVersion: v1
+kind: ServiceAccount
+metadata:
+  name: cni-metrics-helper
+  namespace: kube-system
+---
+apiVersion: rbac.authorization.k8s.io/v1
+kind: ClusterRoleBinding
+metadata:
+  name: cni-metrics-helper
+roleRef:
+  apiGroup: rbac.authorization.k8s.io
+  kind: ClusterRole
+  name: cni-metrics-helper
+subjects:
+  - kind: ServiceAccount
+    name: cni-metrics-helper
+    namespace: kube-system
+---
+apiVersion: apps/v1
+kind: Deployment
+metadata:
+  name: cni-metrics-helper
+  namespace: kube-system
+  labels:
+    k8s-app: cni-metrics-helper
+spec:
+  selector:
+    matchLabels:
+      k8s-app: cni-metrics-helper
+  template:
+    metadata:
+      labels:
+        k8s-app: cni-metrics-helper
+    spec:
+      serviceAccountName: cni-metrics-helper
+      containers:
+        - image: 151742754352.dkr.ecr.us-gov-east-1.amazonaws.com/cni-metrics-helper:v1.6.1
+          imagePullPolicy: Always
+          name: cni-metrics-helper
+          env:
+            - name: USE_CLOUDWATCH
+              value: "true"
diff --git a/config/next/cni-metrics-helper-us-gov-west-1.yaml
b/config/next/cni-metrics-helper-us-gov-west-1.yaml new file mode 100644 index 00000000000..49450512fdf --- /dev/null +++ b/config/next/cni-metrics-helper-us-gov-west-1.yaml @@ -0,0 +1,83 @@ +--- +apiVersion: rbac.authorization.k8s.io/v1 +kind: ClusterRole +metadata: + name: cni-metrics-helper +rules: + - apiGroups: [""] + resources: + - nodes + - pods + - pods/proxy + - services + - resourcequotas + - replicationcontrollers + - limitranges + - persistentvolumeclaims + - persistentvolumes + - namespaces + - endpoints + verbs: ["list", "watch", "get"] + - apiGroups: ["extensions"] + resources: + - daemonsets + - deployments + - replicasets + verbs: ["list", "watch"] + - apiGroups: ["apps"] + resources: + - statefulsets + verbs: ["list", "watch"] + - apiGroups: ["batch"] + resources: + - cronjobs + - jobs + verbs: ["list", "watch"] + - apiGroups: ["autoscaling"] + resources: + - horizontalpodautoscalers + verbs: ["list", "watch"] +--- +apiVersion: v1 +kind: ServiceAccount +metadata: + name: cni-metrics-helper + namespace: kube-system +--- +apiVersion: rbac.authorization.k8s.io/v1 +kind: ClusterRoleBinding +metadata: + name: cni-metrics-helper +roleRef: + apiGroup: rbac.authorization.k8s.io + kind: ClusterRole + name: cni-metrics-helper +subjects: + - kind: ServiceAccount + name: cni-metrics-helper + namespace: kube-system +--- +apiVersion: apps/v1 +kind: Deployment +metadata: + name: cni-metrics-helper + namespace: kube-system + labels: + k8s-app: cni-metrics-helper +spec: + selector: + matchLabels: + k8s-app: cni-metrics-helper + template: + metadata: + labels: + k8s-app: cni-metrics-helper + spec: + serviceAccountName: cni-metrics-helper + containers: + - image: 013241004608.dkr.ecr.us-gov-west-1.amazonaws.com/cni-metrics-helper:v1.6.1 + imagePullPolicy: Always + name: cni-metrics-helper + env: + - name: USE_CLOUDWATCH + value: "true" diff --git a/config/next/cni-metrics-helper.yaml b/config/next/cni-metrics-helper.yaml new file mode 100644 index 00000000000..8e35af660fc --- /dev/null +++ b/config/next/cni-metrics-helper.yaml @@ -0,0 +1,83 @@ +--- +apiVersion: rbac.authorization.k8s.io/v1 +kind: ClusterRole +metadata: + name: cni-metrics-helper +rules: + - apiGroups: [""] + resources: + - nodes + - pods + - pods/proxy + - services + - resourcequotas + - replicationcontrollers + - limitranges + - persistentvolumeclaims + - persistentvolumes + - namespaces + - endpoints + verbs: ["list", "watch", "get"] + - apiGroups: ["extensions"] + resources: + - daemonsets + - deployments + - replicasets + verbs: ["list", "watch"] + - apiGroups: ["apps"] + resources: + - statefulsets + verbs: ["list", "watch"] + - apiGroups: ["batch"] + resources: + - cronjobs + - jobs + verbs: ["list", "watch"] + - apiGroups: ["autoscaling"] + resources: + - horizontalpodautoscalers + verbs: ["list", "watch"] +--- +apiVersion: v1 +kind: ServiceAccount +metadata: + name: cni-metrics-helper + namespace: kube-system +--- +apiVersion: rbac.authorization.k8s.io/v1 +kind: ClusterRoleBinding +metadata: + name: cni-metrics-helper +roleRef: + apiGroup: rbac.authorization.k8s.io + kind: ClusterRole + name: cni-metrics-helper +subjects: + - kind: ServiceAccount + name: cni-metrics-helper + namespace: kube-system +--- +apiVersion: apps/v1 +kind: Deployment +metadata: + name: cni-metrics-helper + namespace: kube-system + labels: + k8s-app: cni-metrics-helper +spec: + selector: + matchLabels: + k8s-app: cni-metrics-helper + template: + metadata: + labels: + k8s-app: cni-metrics-helper + spec: + serviceAccountName: cni-metrics-helper 
+ containers: + - image: 602401143452.dkr.ecr.us-west-2.amazonaws.com/cni-metrics-helper:v1.6.1 + imagePullPolicy: Always + name: cni-metrics-helper + env: + - name: USE_CLOUDWATCH + value: "true" diff --git a/pkg/cri/cri.go b/pkg/cri/cri.go index e2fe304d39f..12dc5b25265 100644 --- a/pkg/cri/cri.go +++ b/pkg/cri/cri.go @@ -89,10 +89,8 @@ func (c *Client) GetRunningPodSandboxes(log logger.Logger) (map[string]*SandboxI } sandboxInfos[uid] = &SandboxInfo{ - ID: sandbox.Id, - Namespace: sandbox.Metadata.Namespace, - Name: sandbox.Metadata.Name, - K8SUID: uid} + ID: sandbox.Id, + K8SUID: uid} } return sandboxInfos, nil } diff --git a/pkg/ipamd/datastore/checkpoint.go b/pkg/ipamd/datastore/checkpoint.go new file mode 100644 index 00000000000..a65513908e6 --- /dev/null +++ b/pkg/ipamd/datastore/checkpoint.go @@ -0,0 +1,113 @@ +package datastore + +import ( + "encoding/json" + "io/ioutil" + "os" + "path/filepath" +) + +// Checkpointer can persist data and (hopefully) restore it later +type Checkpointer interface { + Checkpoint(data interface{}) error + Restore(into interface{}) error +} + +// NullCheckpoint discards data and always returns "not found". For testing only! +type NullCheckpoint struct{} + +// Checkpoint implements the Checkpointer interface in the most +// trivial sense, by just discarding data. +func (c NullCheckpoint) Checkpoint(data interface{}) error { + return nil +} + +// Restore implements the Checkpointer interface in the most trivial +// sense, by always returning "not found". +func (c NullCheckpoint) Restore(into interface{}) error { + return os.ErrNotExist +} + +// TestCheckpoint maintains a snapshot in memory. +type TestCheckpoint struct { + Error error + Data interface{} +} + +// NewTestCheckpoint creates a new TestCheckpoint. +func NewTestCheckpoint(data interface{}) *TestCheckpoint { + return &TestCheckpoint{Data: data} +} + +// Checkpoint implements the Checkpointer interface. +func (c *TestCheckpoint) Checkpoint(data interface{}) error { + if c.Error != nil { + return c.Error + } + c.Data = data + return nil +} + +// Restore implements the Checkpointer interface. +func (c *TestCheckpoint) Restore(into interface{}) error { + if c.Error != nil { + return c.Error + } + // `into` is always a pointer to interface{}, but we can't + // actually make the Restore() function *interface{}, because + // that doesn't match the (widely used) `encoding.Unmarshal` + // interface :( + // Round trip through json strings instead because copying is + // hard. 
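+	// As with `encoding/json.Unmarshal`, `into` must be a non-nil pointer;
+	// decoding into it leaves the snapshot held in c.Data untouched.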
+ buf, err := json.Marshal(c.Data) + if err != nil { + return err + } + return json.Unmarshal(buf, into) +} + +// JSONFile is a checkpointer that writes to a JSON file +type JSONFile struct { + path string +} + +// NewJSONFile creates a new JsonFile +func NewJSONFile(path string) *JSONFile { + return &JSONFile{path: path} +} + +// Checkpoint implements the Checkpointer interface +func (c *JSONFile) Checkpoint(data interface{}) error { + f, err := ioutil.TempFile(filepath.Dir(c.path), filepath.Base(c.path)+".tmp*") + if err != nil { + return err + } + + if err := json.NewEncoder(f).Encode(&data); err != nil { + os.Remove(f.Name()) + return err + } + + if err := f.Sync(); err != nil { + os.Remove(f.Name()) + return err + } + + if err := os.Rename(f.Name(), c.path); err != nil { + os.Remove(f.Name()) + return err + } + + return nil +} + +// Restore implements the Checkpointer interface +func (c *JSONFile) Restore(into interface{}) error { + f, err := os.Open(c.path) + if err != nil { + return err + } + defer f.Close() + + return json.NewDecoder(f).Decode(into) +} diff --git a/pkg/ipamd/datastore/data_store.go b/pkg/ipamd/datastore/data_store.go index 4a2bda2941e..f97c8d50a17 100644 --- a/pkg/ipamd/datastore/data_store.go +++ b/pkg/ipamd/datastore/data_store.go @@ -14,11 +14,11 @@ package datastore import ( - "sort" + "fmt" + "os" "sync" "time" - "github.com/aws/amazon-vpc-cni-k8s/pkg/k8sapi" "github.com/aws/amazon-vpc-cni-k8s/pkg/utils/logger" "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" @@ -95,15 +95,12 @@ var ( // ENIIPPool contains ENI/IP Pool information. Exported fields will be marshaled for introspection. type ENIIPPool struct { - createTime time.Time - lastUnassignedTime time.Time + createTime time.Time // IsPrimary indicates whether ENI is a primary ENI IsPrimary bool ID string // DeviceNumber is the device number of ENI (0 means the primary ENI) DeviceNumber int - // AssignedIPv4Addresses is the number of IP addresses already been assigned - AssignedIPv4Addresses int // IPv4Addresses shows whether each address is assigned, the key is IP address, which must // be in dot-decimal notation with no leading zeros and no whitespace(eg: "10.1.0.253") IPv4Addresses map[string]*AddressInfo @@ -112,20 +109,66 @@ type ENIIPPool struct { // AddressInfo contains information about an IP, Exported fields will be marshaled for introspection. type AddressInfo struct { Address string - Assigned bool // true if it is assigned to a pod + SandboxID string UnassignedTime time.Time } -// PodKey is used to locate pod IP -type PodKey struct { - name string - namespace string - sandbox string +func (e *ENIIPPool) findAddressForSandbox(sandboxID string) *AddressInfo { + for _, addr := range e.IPv4Addresses { + if addr.SandboxID == sandboxID { + return addr + } + } + return nil +} + +// AssignedIPv4Addresses is the number of IP addresses already assigned +func (e *ENIIPPool) AssignedIPv4Addresses() int { + count := 0 + for _, addr := range e.IPv4Addresses { + if addr.Assigned() { + count++ + } + } + return count +} + +// Assigned returns true iff the address is allocated to a pod/sandbox. 
+func (addr AddressInfo) Assigned() bool { + return addr.SandboxID != "" +} + +// InCoolingPeriod checks whether an addr is in addressCoolingPeriod +func (addr AddressInfo) inCoolingPeriod() bool { + return time.Since(addr.UnassignedTime) <= addressCoolingPeriod +} + +// ENIPool is a collection of ENIIPPool, keyed by ENI ID +type ENIPool map[string]*ENIIPPool + +// AssignedIPv4Addresses is the number of IP addresses already assigned +func (p *ENIPool) AssignedIPv4Addresses() int { + count := 0 + for _, eni := range *p { + count += eni.AssignedIPv4Addresses() + } + return count +} + +// FindAddressForSandbox returns ENI and AddressInfo or (nil, nil) if not found +func (p *ENIPool) FindAddressForSandbox(sandboxID string) (*ENIIPPool, *AddressInfo) { + for _, eni := range *p { + if addr := eni.findAddressForSandbox(sandboxID); addr != nil { + return eni, addr + } + } + return nil, nil } // PodIPInfo contains pod's IP and the device number of the ENI type PodIPInfo struct { - // IP is the IP address of pod + SandboxID string + // IP is the IPv4 address of pod IP string // DeviceNumber is the device number of the ENI DeviceNumber int @@ -133,17 +176,14 @@ type PodIPInfo struct { // DataStore contains node level ENI/IP type DataStore struct { - total int - assigned int - eniIPPools map[string]*ENIIPPool - podsIP map[PodKey]PodIPInfo - lock sync.RWMutex - log logger.Logger + total int + assigned int + eniIPPools ENIPool + lock sync.Mutex + log logger.Logger + backingStore Checkpointer } -// PodInfos contains pods IP information which uses key name_namespace_sandbox -type PodInfos map[string]PodIPInfo - // ENIInfos contains ENI IP information type ENIInfos struct { // TotalIPs is the total number of IP addresses @@ -166,15 +206,102 @@ func prometheusRegister() { } // NewDataStore returns DataStore structure -func NewDataStore(log logger.Logger) *DataStore { +func NewDataStore(log logger.Logger, backingStore Checkpointer) *DataStore { prometheusRegister() return &DataStore{ - eniIPPools: make(map[string]*ENIIPPool), - podsIP: make(map[PodKey]PodIPInfo), - log: log, + eniIPPools: make(map[string]*ENIIPPool), + log: log, + backingStore: backingStore, } } +// CheckpointFormatVersion is the version stamp used on stored checkpoints. +const CheckpointFormatVersion = "vpc-cni-ipam/1" + +// CheckpointData is the format of stored checkpoints. Note this is +// deliberately a "dumb" format since efficiency is less important +// than version stability here. +type CheckpointData struct { + Version string `json:"version"` + Allocations []CheckpointEntry `json:"allocations"` +} + +// CheckpointEntry is a "row" in the conceptual IPAM datastore, as stored +// in checkpoints. +type CheckpointEntry struct { + SandboxID string `json:"sandboxID"` + IPv4 string `json:"ipv4"` +} + +// ReadBackingStore initialises the IP allocation state from the +// configured backing store. Should be called before using data +// store. +func (ds *DataStore) ReadBackingStore() error { + ds.log.Infof("Reading ipam state from backing store") + + var data CheckpointData + err := ds.backingStore.Restore(&data) + ds.log.Debugf("backing store restore returned err %v", err) + if os.IsNotExist(err) { + // Ok, and a no-op. + return nil + } else if err != nil { + return fmt.Errorf("datastore: error reading backing store: %v", err) + } + + if data.Version != CheckpointFormatVersion { + return fmt.Errorf("datastore: unknown backing store format (%s != %s) - wrong CNI/ipamd version? 
(Rebooting this node will restart local pods and probably help)", data.Version, CheckpointFormatVersion) + } + + ds.lock.Lock() + defer ds.lock.Unlock() + + eniIPs := make(map[string]*ENIIPPool) + for _, eni := range ds.eniIPPools { + for _, addr := range eni.IPv4Addresses { + eniIPs[addr.Address] = eni + } + } + + for _, allocation := range data.Allocations { + eni := eniIPs[allocation.IPv4] + if eni == nil { + ds.log.Infof("datastore: Sandbox %s uses unknown IPv4 %s - presuming stale/dead", allocation.SandboxID, allocation.IPv4) + continue + } + + addr := eni.IPv4Addresses[allocation.IPv4] + ds.assignPodIPv4AddressUnsafe(allocation.SandboxID, eni, addr) + ds.log.Debugf("Recovered %s => %s/%s", allocation.SandboxID, eni.ID, addr.Address) + } + + ds.log.Debugf("Completed reading ipam state from backing store") + return nil +} + +func (ds *DataStore) writeBackingStoreUnsafe() error { + allocations := make([]CheckpointEntry, 0, ds.assigned) + + for _, eni := range ds.eniIPPools { + for _, addr := range eni.IPv4Addresses { + if addr.Assigned() { + entry := CheckpointEntry{ + SandboxID: addr.SandboxID, + IPv4: addr.Address, + } + allocations = append(allocations, entry) + } + } + } + + data := CheckpointData{ + Version: CheckpointFormatVersion, + Allocations: allocations, + } + + return ds.backingStore.Checkpoint(&data) +} + // AddENI add ENI to data store func (ds *DataStore) AddENI(eniID string, deviceNumber int, isPrimary bool) error { ds.lock.Lock() @@ -216,7 +343,7 @@ func (ds *DataStore) AddIPv4AddressToStore(eniID string, ipv4 string) error { // Prometheus gauge totalIPs.Set(float64(ds.total)) - curENI.IPv4Addresses[ipv4] = &AddressInfo{Address: ipv4, Assigned: false} + curENI.IPv4Addresses[ipv4] = &AddressInfo{Address: ipv4} ds.log.Infof("Added ENI(%s)'s IP %s to datastore", eniID, ipv4) return nil } @@ -236,18 +363,16 @@ func (ds *DataStore) DelIPv4AddressFromStore(eniID string, ipv4 string, force bo return errors.New(UnknownIPError) } - if ipAddr.Assigned { + if ipAddr.Assigned() { if !force { return errors.New(IPInUseError) } ds.log.Warnf("Force deleting assigned ip %s on eni %s", ipv4, eniID) forceRemovedIPs.Inc() - decrementAssignedCount(ds, curENI, ipAddr) - for key, info := range ds.podsIP { - if info.IP == ipv4 { - delete(ds.podsIP, key) - break - } + ds.unassignPodIPv4AddressUnsafe(curENI, ipAddr) + if err := ds.writeBackingStoreUnsafe(); err != nil { + ds.log.Warnf("Unable to update backing store: %v", err) + // Continuing because 'force' } } @@ -263,79 +388,62 @@ func (ds *DataStore) DelIPv4AddressFromStore(eniID string, ipv4 string, force bo // AssignPodIPv4Address assigns an IPv4 address to pod // It returns the assigned IPv4 address, device number, error -func (ds *DataStore) AssignPodIPv4Address(k8sPod *k8sapi.K8SPodInfo) (ip string, deviceNumber int, err error) { +func (ds *DataStore) AssignPodIPv4Address(sandboxID string) (string, int, error) { ds.lock.Lock() defer ds.lock.Unlock() ds.log.Debugf("AssignIPv4Address: IP address pool stats: total: %d, assigned %d", ds.total, ds.assigned) - podKey := PodKey{ - name: k8sPod.Name, - namespace: k8sPod.Namespace, - sandbox: k8sPod.Sandbox, - } - ipAddr, ok := ds.podsIP[podKey] - if ok { - if ipAddr.IP == k8sPod.IP && k8sPod.IP != "" { - // The caller invoke multiple times to assign(PodName/NameSpace --> same IPAddress). It is not a error, but not very efficient. 
- ds.log.Infof("AssignPodIPv4Address: duplicate pod assign for IP %s, name %s, namespace %s, sandbox %s", - k8sPod.IP, k8sPod.Name, k8sPod.Namespace, k8sPod.Sandbox) - return ipAddr.IP, ipAddr.DeviceNumber, nil - } - ds.log.Errorf("AssignPodIPv4Address: current IP %s is changed to IP %s for pod(name %s, namespace %s, sandbox %s)", - ipAddr, k8sPod.IP, k8sPod.Name, k8sPod.Namespace, k8sPod.Sandbox) - return "", 0, errors.New("AssignPodIPv4Address: invalid pod with multiple IP addresses") + if eni, addr := ds.eniIPPools.FindAddressForSandbox(sandboxID); addr != nil { + ds.log.Infof("AssignPodIPv4Address: duplicate pod assign for sandbox %s", sandboxID) + return addr.Address, eni.DeviceNumber, nil } - return ds.assignPodIPv4AddressUnsafe(podKey, k8sPod) -} -// It returns the assigned IPv4 address, device number, error -func (ds *DataStore) assignPodIPv4AddressUnsafe(podKey PodKey, k8sPod *k8sapi.K8SPodInfo) (ip string, deviceNumber int, err error) { for _, eni := range ds.eniIPPools { - if (k8sPod.IP == "") && (len(eni.IPv4Addresses) == eni.AssignedIPv4Addresses) { - // Skip this ENI, since it has no available IP addresses - ds.log.Debugf("AssignPodIPv4Address: Skip ENI %s that does not have available addresses", eni.ID) - continue - } for _, addr := range eni.IPv4Addresses { - if k8sPod.IP == addr.Address { - // After L-IPAM restart and built IP warm-pool, it needs to take the existing running pod IP out of the pool. - if !addr.Assigned { - incrementAssignedCount(ds, eni, addr) + if !addr.Assigned() && !addr.inCoolingPeriod() { + ds.assignPodIPv4AddressUnsafe(sandboxID, eni, addr) + if err := ds.writeBackingStoreUnsafe(); err != nil { + ds.log.Warnf("Failed to update backing store: %v", err) + // Important! Unwind assignment + ds.unassignPodIPv4AddressUnsafe(eni, addr) + return "", 0, err } - ds.log.Infof("AssignPodIPv4Address: Reassign IP %v to pod (name %s, namespace %s)", - addr.Address, k8sPod.Name, k8sPod.Namespace) - ds.podsIP[podKey] = PodIPInfo{IP: addr.Address, DeviceNumber: eni.DeviceNumber} - return addr.Address, eni.DeviceNumber, nil - } - if !addr.Assigned && k8sPod.IP == "" && !addr.inCoolingPeriod() { - // This is triggered by a pod's Add Network command from CNI plugin - incrementAssignedCount(ds, eni, addr) - ds.log.Infof("AssignPodIPv4Address: Assign IP %v to pod (name %s, namespace %s sandbox %s)", - addr.Address, k8sPod.Name, k8sPod.Namespace, k8sPod.Sandbox) - ds.podsIP[podKey] = PodIPInfo{IP: addr.Address, DeviceNumber: eni.DeviceNumber} + return addr.Address, eni.DeviceNumber, nil } } + ds.log.Debugf("AssignPodIPv4Address: ENI %s does not have available addresses", eni.ID) } ds.log.Errorf("DataStore has no available IP addresses") return "", 0, errors.New("assignPodIPv4AddressUnsafe: no available IP addresses") } -func incrementAssignedCount(ds *DataStore, eni *ENIIPPool, addr *AddressInfo) { +// It returns the assigned IPv4 address, device number +func (ds *DataStore) assignPodIPv4AddressUnsafe(sandboxID string, eni *ENIIPPool, addr *AddressInfo) (string, int) { + ds.log.Infof("AssignPodIPv4Address: Assign IP %v to sandbox %s", + addr.Address, sandboxID) + + if addr.Assigned() { + panic("addr already assigned") + } + addr.SandboxID = sandboxID // This marks the addr as assigned + ds.assigned++ - eni.AssignedIPv4Addresses++ - addr.Assigned = true // Prometheus gauge assignedIPs.Set(float64(ds.assigned)) + + return addr.Address, eni.DeviceNumber } -func decrementAssignedCount(ds *DataStore, eni *ENIIPPool, addr *AddressInfo) { +func (ds *DataStore) 
unassignPodIPv4AddressUnsafe(eni *ENIIPPool, addr *AddressInfo) { + if addr.SandboxID == "" { + // Already unassigned + return + } + ds.log.Infof("UnAssignPodIPv4Address: Unassign IP %v from sandbox %s", + addr.Address, addr.SandboxID) + addr.SandboxID = "" // unassign the addr ds.assigned-- - eni.AssignedIPv4Addresses-- - addr.Assigned = false - curTime := time.Now() - eni.lastUnassignedTime = curTime - addr.UnassignedTime = curTime // Prometheus gauge assignedIPs.Set(float64(ds.assigned)) } @@ -351,7 +459,7 @@ func (ds *DataStore) isRequiredForWarmIPTarget(warmIPTarget int, eni *ENIIPPool) otherWarmIPs := 0 for _, other := range ds.eniIPPools { if other.ID != eni.ID { - otherWarmIPs += len(other.IPv4Addresses) - other.AssignedIPv4Addresses + otherWarmIPs += len(other.IPv4Addresses) - other.AssignedIPv4Addresses() } } return otherWarmIPs < warmIPTarget @@ -414,31 +522,29 @@ func (e *ENIIPPool) isTooYoung() bool { // HasIPInCooling returns true if an IP address was unassigned recently. func (e *ENIIPPool) hasIPInCooling() bool { - return time.Since(e.lastUnassignedTime) < addressENICoolingPeriod + for _, addr := range e.IPv4Addresses { + if addr.inCoolingPeriod() { + return true + } + } + return false } // HasPods returns true if the ENI has pods assigned to it. func (e *ENIIPPool) hasPods() bool { - return e.AssignedIPv4Addresses != 0 + return e.AssignedIPv4Addresses() != 0 } // GetENINeedsIP finds an ENI in the datastore that needs more IP addresses allocated func (ds *DataStore) GetENINeedsIP(maxIPperENI int, skipPrimary bool) *ENIIPPool { - // NOTE(jaypipes): Some tests rely on key order so we iterate over the IP - // pool structs here in sorted key order. - // TODO(jaypipes): Don't use a map as the primary iterator vehicle. - // Instead, use a slice of *ENIPool and use a map for existence checks only - eniIDs := make([]string, 0) - for eniID, eni := range ds.eniIPPools { + ds.lock.Lock() + defer ds.lock.Unlock() + + for _, eni := range ds.eniIPPools { if skipPrimary && eni.IsPrimary { ds.log.Debugf("Skip the primary ENI for need IP check") continue } - eniIDs = append(eniIDs, eniID) - } - sort.Strings(eniIDs) - for _, eniID := range eniIDs { - eni := ds.eniIPPools[eniID] if len(eni.IPv4Addresses) < maxIPperENI { ds.log.Debugf("Found ENI %s that has less than the maximum number of IP addresses allocated: cur=%d, max=%d", eni.ID, len(eni.IPv4Addresses), maxIPperENI) @@ -490,18 +596,17 @@ func (ds *DataStore) RemoveENIFromDataStore(eni string, force bool) error { // This scenario can occur if the reconciliation process discovered this eni was detached // from the EC2 instance outside of the control of ipamd. If this happens, there's nothing // we can do other than force all pods to be unassigned from the IPs on this eni. 
- ds.log.Warnf("Force removing eni %s with %d assigned pods", eni, eniIPPool.AssignedIPv4Addresses) + ds.log.Warnf("Force removing eni %s with %d assigned pods", eni, eniIPPool.AssignedIPv4Addresses()) forceRemovedENIs.Inc() - forceRemovedIPs.Add(float64(eniIPPool.AssignedIPv4Addresses)) + forceRemovedIPs.Add(float64(eniIPPool.AssignedIPv4Addresses())) for _, addr := range eniIPPool.IPv4Addresses { - if addr.Assigned { - decrementAssignedCount(ds, eniIPPool, addr) + if addr.Assigned() { + ds.unassignPodIPv4AddressUnsafe(eniIPPool, addr) } } - for key, info := range ds.podsIP { - if info.DeviceNumber == eniIPPool.DeviceNumber { - delete(ds.podsIP, key) - } + if err := ds.writeBackingStoreUnsafe(); err != nil { + ds.log.Warnf("Unable to update backing store: %v", err) + // Continuing, because 'force' } } @@ -517,55 +622,75 @@ func (ds *DataStore) RemoveENIFromDataStore(eni string, force bool) error { // UnassignPodIPv4Address a) find out the IP address based on PodName and PodNameSpace // b) mark IP address as unassigned c) returns IP address, ENI's device number, error -func (ds *DataStore) UnassignPodIPv4Address(k8sPod *k8sapi.K8SPodInfo) (ip string, deviceNumber int, err error) { +func (ds *DataStore) UnassignPodIPv4Address(sandboxID string) (ip string, deviceNumber int, err error) { ds.lock.Lock() defer ds.lock.Unlock() - ds.log.Debugf("UnassignPodIPv4Address: IP address pool stats: total:%d, assigned %d, pod(Name: %s, Namespace: %s, Sandbox %s)", - ds.total, ds.assigned, k8sPod.Name, k8sPod.Namespace, k8sPod.Sandbox) + ds.log.Debugf("UnassignPodIPv4Address: IP address pool stats: total:%d, assigned %d, sandbox %s", + ds.total, ds.assigned, sandboxID) - podKey := PodKey{ - name: k8sPod.Name, - namespace: k8sPod.Namespace, - sandbox: k8sPod.Sandbox, - } - ipAddr, ok := ds.podsIP[podKey] - if !ok { - ds.log.Warnf("UnassignPodIPv4Address: Failed to find pod %s namespace %q, sandbox %q", - k8sPod.Name, k8sPod.Namespace, k8sPod.Sandbox) + eni, addr := ds.eniIPPools.FindAddressForSandbox(sandboxID) + + if addr == nil { + ds.log.Warnf("UnassignPodIPv4Address: Failed to find sandbox %q", + sandboxID) return "", 0, ErrUnknownPod } + ds.unassignPodIPv4AddressUnsafe(eni, addr) + if err := ds.writeBackingStoreUnsafe(); err != nil { + // Unwind un-assignment + ds.assignPodIPv4AddressUnsafe(sandboxID, eni, addr) + return "", 0, err + } + addr.UnassignedTime = time.Now() + + ds.log.Infof("UnassignPodIPv4Address: sandbox %s's ipAddr %s, DeviceNumber %d", + sandboxID, addr.Address, eni.DeviceNumber) + return addr.Address, eni.DeviceNumber, nil +} + +// AllocatedIPs returns a recent snapshot of allocated sandbox<->IPs. +// Note result may already be stale by the time you look at it. 
+func (ds *DataStore) AllocatedIPs() []PodIPInfo {
+	ds.lock.Lock()
+	defer ds.lock.Unlock()
+
+	ret := make([]PodIPInfo, 0, ds.eniIPPools.AssignedIPv4Addresses())
 	for _, eni := range ds.eniIPPools {
-		ip, ok := eni.IPv4Addresses[ipAddr.IP]
-		if ok && ip.Assigned {
-			decrementAssignedCount(ds, eni, ip)
-			ds.log.Infof("UnassignPodIPv4Address: pod (Name: %s, NameSpace %s Sandbox %s)'s ipAddr %s, DeviceNumber%d",
-				k8sPod.Name, k8sPod.Namespace, k8sPod.Sandbox, ip.Address, eni.DeviceNumber)
-			delete(ds.podsIP, podKey)
-			return ip.Address, eni.DeviceNumber, nil
+		for _, addr := range eni.IPv4Addresses {
+			if addr.Assigned() {
+				info := PodIPInfo{
+					SandboxID:    addr.SandboxID,
+					IP:           addr.Address,
+					DeviceNumber: eni.DeviceNumber,
+				}
+				ret = append(ret, info)
+			}
 		}
 	}
-
-	ds.log.Warnf("UnassignPodIPv4Address: Failed to find pod %s namespace %s sandbox %s using IP %s",
-		k8sPod.Name, k8sPod.Namespace, k8sPod.Sandbox, ipAddr.IP)
-	return "", 0, ErrUnknownPodIP
+	return ret
 }

-// GetPodInfos provides pod IP information to introspection endpoint
-func (ds *DataStore) GetPodInfos() *map[string]PodIPInfo {
+// FreeableIPs returns a list of unused and potentially freeable IPs.
+// Note result may already be stale by the time you look at it.
+func (ds *DataStore) FreeableIPs(eniID string) []string {
 	ds.lock.Lock()
 	defer ds.lock.Unlock()

-	var podInfos = make(map[string]PodIPInfo, len(ds.podsIP))
+	eni := ds.eniIPPools[eniID]
+	if eni == nil {
+		// Can't free any IPs from an ENI we don't know about...
+		return []string{}
+	}

-	for podKey, podInfo := range ds.podsIP {
-		key := podKey.name + "_" + podKey.namespace + "_" + podKey.sandbox
-		podInfos[key] = podInfo
-		ds.log.Debugf("GetPodInfos: key %s", key)
+	freeable := make([]string, 0, len(eni.IPv4Addresses))
+	for _, addr := range eni.IPv4Addresses {
+		if !addr.Assigned() {
+			freeable = append(freeable, addr.Address)
+		}
 	}
-	ds.log.Debugf("GetPodInfos: len %d", len(ds.podsIP))
-	return &podInfos
+	return freeable
 }

 // GetENIInfos provides ENI IP information to introspection endpoint
@@ -608,8 +733,3 @@ func (ds *DataStore) GetENIIPPools(eni string) (map[string]*AddressInfo, error)
 	}
 	return ipPool, nil
 }
-
-// InCoolingPeriod checks whether an addr is in addressCoolingPeriod
-func (addr AddressInfo) inCoolingPeriod() bool {
-	return time.Since(addr.UnassignedTime) <= addressCoolingPeriod
-}
diff --git a/pkg/ipamd/datastore/data_store_test.go b/pkg/ipamd/datastore/data_store_test.go
index 2f6ae8c1c74..2efd2000db6 100644
--- a/pkg/ipamd/datastore/data_store_test.go
+++ b/pkg/ipamd/datastore/data_store_test.go
@@ -14,12 +14,12 @@ package datastore

 import (
+	"errors"
 	"testing"
 	"time"

 	"github.com/aws/amazon-vpc-cni-k8s/pkg/utils/logger"
-	"github.com/aws/amazon-vpc-cni-k8s/pkg/k8sapi"
 	"github.com/stretchr/testify/assert"
 )
@@ -32,7 +32,7 @@ var logConfig = logger.Configuration{
 var log = logger.New(&logConfig)

 func TestAddENI(t *testing.T) {
-	ds := NewDataStore(log)
+	ds := NewDataStore(log, NullCheckpoint{})

 	err := ds.AddENI("eni-1", 1, true)
 	assert.NoError(t, err)
@@ -50,7 +50,7 @@ func TestAddENI(t *testing.T) {
 }

 func TestDeleteENI(t *testing.T) {
-	ds := NewDataStore(log)
+	ds := NewDataStore(log, NullCheckpoint{})

 	err := ds.AddENI("eni-1", 1, true)
 	assert.NoError(t, err)
@@ -79,12 +79,7 @@ func TestDeleteENI(t *testing.T) {
 	// Add an IP and assign a pod.
 	err = ds.AddIPv4AddressToStore("eni-1", "1.1.1.1")
 	assert.NoError(t, err)
-	podInfo := &k8sapi.K8SPodInfo{
-		Name:      "pod-1",
-		Namespace: "ns-1",
-		IP:        "1.1.1.1",
-	}
-	ip, device, err := ds.AssignPodIPv4Address(podInfo)
+	ip, device, err := ds.AssignPodIPv4Address("sandbox-1")
 	assert.NoError(t, err)
 	assert.Equal(t, "1.1.1.1", ip)
 	assert.Equal(t, 1, device)
@@ -98,7 +93,7 @@
 }

 func TestAddENIIPv4Address(t *testing.T) {
-	ds := NewDataStore(log)
+	ds := NewDataStore(log, NullCheckpoint{})

 	err := ds.AddENI("eni-1", 1, true)
 	assert.NoError(t, err)
@@ -136,7 +131,7 @@ func TestAddENIIPv4Address(t *testing.T) {
 }

 func TestGetENIIPPools(t *testing.T) {
-	ds := NewDataStore(log)
+	ds := NewDataStore(log, NullCheckpoint{})

 	err := ds.AddENI("eni-1", 1, true)
 	assert.NoError(t, err)
@@ -169,7 +164,7 @@ func TestGetENIIPPools(t *testing.T) {
 }

 func TestDelENIIPv4Address(t *testing.T) {
-	ds := NewDataStore(log)
+	ds := NewDataStore(log, NullCheckpoint{})
 	err := ds.AddENI("eni-1", 1, true)
 	assert.NoError(t, err)
@@ -178,6 +173,12 @@
 	assert.Equal(t, ds.total, 1)
 	assert.Equal(t, len(ds.eniIPPools["eni-1"].IPv4Addresses), 1)

+	// Assign a pod.
+	ip, device, err := ds.AssignPodIPv4Address("sandbox-1")
+	assert.NoError(t, err)
+	assert.Equal(t, "1.1.1.1", ip)
+	assert.Equal(t, 1, device)
+
 	err = ds.AddIPv4AddressToStore("eni-1", "1.1.1.2")
 	assert.NoError(t, err)
 	assert.Equal(t, ds.total, 2)
@@ -199,17 +200,6 @@
 	assert.Equal(t, ds.total, 2)
 	assert.Equal(t, len(ds.eniIPPools["eni-1"].IPv4Addresses), 2)

-	// Assign a pod.
-	podInfo := &k8sapi.K8SPodInfo{
-		Name:      "pod-1",
-		Namespace: "ns-1",
-		IP:        "1.1.1.1",
-	}
-	ip, device, err := ds.AssignPodIPv4Address(podInfo)
-	assert.NoError(t, err)
-	assert.Equal(t, "1.1.1.1", ip)
-	assert.Equal(t, 1, device)
-
 	// Test force removal. The first call fails because the IP has a pod assigned to it, but the
 	// second call force-removes it and succeeds.
 	err = ds.DelIPv4AddressFromStore("eni-1", "1.1.1.1", false)
@@ -224,7 +214,8 @@
 }

 func TestPodIPv4Address(t *testing.T) {
-	ds := NewDataStore(log)
+	checkpoint := NewTestCheckpoint(struct{}{})
+	ds := NewDataStore(log, checkpoint)

 	ds.AddENI("eni-1", 1, true)
@@ -232,113 +223,86 @@

 	ds.AddIPv4AddressToStore("eni-1", "1.1.1.1")

-	ds.AddIPv4AddressToStore("eni-1", "1.1.1.2")
+	ip, _, err := ds.AssignPodIPv4Address("sandbox-1")

-	ds.AddIPv4AddressToStore("eni-2", "1.1.2.2")
-
-	podInfo := k8sapi.K8SPodInfo{
-		Name:      "pod-1",
-		Namespace: "ns-1",
-		IP:        "1.1.1.1",
-	}
-
-	ip, _, err := ds.AssignPodIPv4Address(&podInfo)
-
-	assert.NoError(t, err)
-	assert.Equal(t, "1.1.1.1", ip)
-	assert.Equal(t, 3, ds.total)
-	assert.Equal(t, 2, len(ds.eniIPPools["eni-1"].IPv4Addresses))
-	assert.Equal(t, 1, ds.eniIPPools["eni-1"].AssignedIPv4Addresses)
-
-	ip, _, err = ds.AssignPodIPv4Address(&podInfo)
 	assert.NoError(t, err)
 	assert.Equal(t, "1.1.1.1", ip)
+	assert.Equal(t, 1, ds.total)
+	assert.Equal(t, 1, len(ds.eniIPPools["eni-1"].IPv4Addresses))
+	assert.Equal(t, 1, ds.eniIPPools["eni-1"].AssignedIPv4Addresses())
+	assert.Equal(t, checkpoint.Data, &CheckpointData{
+		Version: CheckpointFormatVersion,
+		Allocations: []CheckpointEntry{
+			{SandboxID: "sandbox-1", IPv4: "1.1.1.1"},
+		},
+	})
+
+	podsInfos := ds.AllocatedIPs()
+	assert.Equal(t, len(podsInfos), 1)

-	podsInfos := ds.GetPodInfos()
-	assert.Equal(t, len(*podsInfos), 1)
+	ds.AddIPv4AddressToStore("eni-2", "1.1.2.2")

 	// duplicate add
-	ip, _, err = ds.AssignPodIPv4Address(&podInfo)
+	ip, _, err = ds.AssignPodIPv4Address("sandbox-1") // same id
 	assert.NoError(t, err)
 	assert.Equal(t, ip, "1.1.1.1")
-	assert.Equal(t, ds.total, 3)
-	assert.Equal(t, len(ds.eniIPPools["eni-1"].IPv4Addresses), 2)
-	assert.Equal(t, ds.eniIPPools["eni-1"].AssignedIPv4Addresses, 1)
-
-	// wrong ip address
-	podInfo = k8sapi.K8SPodInfo{
-		Name:      "pod-1",
-		Namespace: "ns-1",
-		IP:        "1.1.2.10",
-	}
+	assert.Equal(t, ds.total, 2)
+	assert.Equal(t, ds.assigned, 1)
+	assert.Equal(t, len(ds.eniIPPools["eni-1"].IPv4Addresses), 1)
+	assert.Equal(t, ds.eniIPPools["eni-1"].AssignedIPv4Addresses(), 1)
+	assert.Equal(t, len(ds.eniIPPools["eni-2"].IPv4Addresses), 1)
+	assert.Equal(t, ds.eniIPPools["eni-2"].AssignedIPv4Addresses(), 0)

-	_, _, err = ds.AssignPodIPv4Address(&podInfo)
+	// Checkpoint error
+	checkpoint.Error = errors.New("fake checkpoint error")
+	_, _, err = ds.AssignPodIPv4Address("sandbox-2")
 	assert.Error(t, err)
+	assert.Equal(t, checkpoint.Data, &CheckpointData{
+		Version: CheckpointFormatVersion,
+		Allocations: []CheckpointEntry{
+			{SandboxID: "sandbox-1", IPv4: "1.1.1.1"},
+		},
+	})
+	checkpoint.Error = nil

-	podInfo = k8sapi.K8SPodInfo{
-		Name:      "pod-1",
-		Namespace: "ns-2",
-		IP:        "1.1.2.2",
-	}
-
-	ip, pod1Ns2Device, err := ds.AssignPodIPv4Address(&podInfo)
+	ip, pod1Ns2Device, err := ds.AssignPodIPv4Address("sandbox-2")
 	assert.NoError(t, err)
 	assert.Equal(t, ip, "1.1.2.2")
-	assert.Equal(t, ds.total, 3)
+	assert.Equal(t, ds.total, 2)
 	assert.Equal(t, ds.assigned, 2)
 	assert.Equal(t, len(ds.eniIPPools["eni-2"].IPv4Addresses), 1)
-	assert.Equal(t, ds.eniIPPools["eni-2"].AssignedIPv4Addresses, 1)
+	assert.Equal(t, ds.eniIPPools["eni-2"].AssignedIPv4Addresses(), 1)
+	assert.Equal(t, len(checkpoint.Data.(*CheckpointData).Allocations), 2)

-	podsInfos = ds.GetPodInfos()
-	assert.Equal(t, len(*podsInfos), 2)
+	podsInfos = ds.AllocatedIPs()
+	assert.Equal(t, len(podsInfos), 2)

-	podInfo = k8sapi.K8SPodInfo{
-		Name:      "pod-1",
-		Namespace: "ns-3",
-		Sandbox:   "container-1",
-	}
+	ds.AddIPv4AddressToStore("eni-1", "1.1.1.2")

-	ip, _, err = ds.AssignPodIPv4Address(&podInfo)
+	ip, _, err = ds.AssignPodIPv4Address("sandbox-3")
 	assert.NoError(t, err)
 	assert.Equal(t, ip, "1.1.1.2")
 	assert.Equal(t, ds.total, 3)
 	assert.Equal(t, ds.assigned, 3)
 	assert.Equal(t, len(ds.eniIPPools["eni-1"].IPv4Addresses), 2)
-	assert.Equal(t, ds.eniIPPools["eni-1"].AssignedIPv4Addresses, 2)
+	assert.Equal(t, ds.eniIPPools["eni-1"].AssignedIPv4Addresses(), 2)
+	assert.Equal(t, len(checkpoint.Data.(*CheckpointData).Allocations), 3)

 	// no more IP addresses
-	podInfo = k8sapi.K8SPodInfo{
-		Name:      "pod-2",
-		Namespace: "ns-3",
-	}
-
-	_, _, err = ds.AssignPodIPv4Address(&podInfo)
+	_, _, err = ds.AssignPodIPv4Address("sandbox-4")
 	assert.Error(t, err)

 	// Unassign unknown Pod
-	_, _, err = ds.UnassignPodIPv4Address(&podInfo)
-	assert.Error(t, err)
-
-	// Unassign pod which have same name/namespace, but different container
-	podInfo = k8sapi.K8SPodInfo{
-		Name:      "pod-1",
-		Namespace: "ns-3",
-		Sandbox:   "container-2",
-	}
-	_, _, err = ds.UnassignPodIPv4Address(&podInfo)
+	_, _, err = ds.UnassignPodIPv4Address("sandbox-4")
 	assert.Error(t, err)

-	podInfo = k8sapi.K8SPodInfo{
-		Name:      "pod-1",
-		Namespace: "ns-2",
-	}
-
-	_, deviceNum, err := ds.UnassignPodIPv4Address(&podInfo)
+	_, deviceNum, err := ds.UnassignPodIPv4Address("sandbox-2")
 	assert.NoError(t, err)
 	assert.Equal(t, ds.total, 3)
 	assert.Equal(t, ds.assigned, 2)
 	assert.Equal(t, deviceNum, pod1Ns2Device)
 	assert.Equal(t, len(ds.eniIPPools["eni-2"].IPv4Addresses), 1)
-	assert.Equal(t, ds.eniIPPools["eni-2"].AssignedIPv4Addresses, 0)
+	assert.Equal(t, ds.eniIPPools["eni-2"].AssignedIPv4Addresses(), 0)
+	assert.Equal(t, len(checkpoint.Data.(*CheckpointData).Allocations), 2)

 	noWarmIPTarget := 0
 	noMinimumIPTarget := 0
@@ -348,7 +312,7 @@ func TestPodIPv4Address(t *testing.T) {
 	assert.True(t, eni == "")

 	ds.eniIPPools["eni-2"].createTime = time.Time{}
-	ds.eniIPPools["eni-2"].lastUnassignedTime = time.Time{}
+	ds.eniIPPools["eni-2"].IPv4Addresses["1.1.2.2"].UnassignedTime = time.Time{}
 	eni = ds.RemoveUnusedENIFromStore(noWarmIPTarget, noMinimumIPTarget)
 	assert.Equal(t, eni, "eni-2")
@@ -357,39 +321,28 @@
 }

 func TestWarmENIInteractions(t *testing.T) {
-	ds := NewDataStore(log)
+	ds := NewDataStore(log, NullCheckpoint{})

 	ds.AddENI("eni-1", 1, true)
 	ds.AddENI("eni-2", 2, false)
 	ds.AddENI("eni-3", 3, false)

+	ds.AddIPv4AddressToStore("eni-1", "1.1.1.1")
+	_, _, err := ds.AssignPodIPv4Address("sandbox-1")
+	assert.NoError(t, err)
+	ds.AddIPv4AddressToStore("eni-1", "1.1.1.2")
+	_, _, err = ds.AssignPodIPv4Address("sandbox-2")
+	assert.NoError(t, err)
+
 	ds.AddIPv4AddressToStore("eni-2", "1.1.2.1")
 	ds.AddIPv4AddressToStore("eni-2", "1.1.2.2")
 	ds.AddIPv4AddressToStore("eni-3", "1.1.3.1")

-	podInfo := k8sapi.K8SPodInfo{
-		Name:      "pod-1",
-		Namespace: "ns-1",
-		IP:        "1.1.1.1",
-	}
-	_, _, err := ds.AssignPodIPv4Address(&podInfo)
-	assert.NoError(t, err)
-
-	podInfo = k8sapi.K8SPodInfo{
-		Name:      "pod-2",
-		Namespace: "ns-2",
-		IP:        "1.1.1.2",
-	}
-	_, _, err = ds.AssignPodIPv4Address(&podInfo)
-	assert.NoError(t, err)
-
 	noWarmIPTarget := 0

 	ds.eniIPPools["eni-2"].createTime = time.Time{}
-	ds.eniIPPools["eni-2"].lastUnassignedTime = time.Time{}
 	ds.eniIPPools["eni-3"].createTime = time.Time{}
-	ds.eniIPPools["eni-3"].lastUnassignedTime = time.Time{}

 	// We have three ENIs, 5 IPs and two pods on ENI 1. Each ENI can handle two pods.
 	// We should not be able to remove any ENIs if either warmIPTarget >= 3 or minimumWarmIPTarget >= 5
diff --git a/pkg/ipamd/introspect.go b/pkg/ipamd/introspect.go
index dacef6f40f1..17037a2be42 100644
--- a/pkg/ipamd/introspect.go
+++ b/pkg/ipamd/introspect.go
@@ -84,7 +84,6 @@ func (c *IPAMContext) setupIntrospectionServer() *http.Server {
 	serverFunctions := map[string]func(w http.ResponseWriter, r *http.Request){
 		"/v1/enis":                      eniV1RequestHandler(c),
 		"/v1/eni-configs":               eniConfigRequestHandler(c),
-		"/v1/pods":                      podV1RequestHandler(c),
 		"/v1/networkutils-env-settings": networkEnvV1RequestHandler(),
 		"/v1/ipamd-env-settings":        ipamdEnvV1RequestHandler(),
 	}
@@ -141,18 +140,6 @@ func eniV1RequestHandler(ipam *IPAMContext) func(http.ResponseWriter, *http.Requ
 	}
 }

-func podV1RequestHandler(ipam *IPAMContext) func(http.ResponseWriter, *http.Request) {
-	return func(w http.ResponseWriter, r *http.Request) {
-		responseJSON, err := json.Marshal(ipam.dataStore.GetPodInfos())
-		if err != nil {
-			log.Errorf("Failed to marshal pod data: %v", err)
-			http.Error(w, http.StatusText(http.StatusInternalServerError), http.StatusInternalServerError)
-			return
-		}
-		logErr(w.Write(responseJSON))
-	}
-}
-
 func eniConfigRequestHandler(ipam *IPAMContext) func(http.ResponseWriter, *http.Request) {
 	return func(w http.ResponseWriter, r *http.Request) {
 		responseJSON, err := json.Marshal(ipam.eniConfig.Getter())
diff --git a/pkg/ipamd/ipamd.go b/pkg/ipamd/ipamd.go
index 3ddb66c094e..77eca32cbc4 100644
--- a/pkg/ipamd/ipamd.go
+++ b/pkg/ipamd/ipamd.go
@@ -28,13 +28,10 @@ import (
 	"github.com/aws/aws-sdk-go/service/ec2"
 	"github.com/pkg/errors"
 	"github.com/prometheus/client_golang/prometheus"
-	"k8s.io/apimachinery/pkg/util/sets"

 	"github.com/aws/amazon-vpc-cni-k8s/pkg/awsutils"
-	"github.com/aws/amazon-vpc-cni-k8s/pkg/cri"
 	"github.com/aws/amazon-vpc-cni-k8s/pkg/eniconfig"
 	"github.com/aws/amazon-vpc-cni-k8s/pkg/ipamd/datastore"
-	"github.com/aws/amazon-vpc-cni-k8s/pkg/k8sapi"
 	"github.com/aws/amazon-vpc-cni-k8s/pkg/networkutils"
 )
@@ -113,6 +110,10 @@ const (
 	// disableENIProvisioning is used to specify that ENI doesn't need to be synced during initializing a pod.
 	envDisableENIProvisioning = "DISABLE_NETWORK_RESOURCE_PROVISIONING"
 	noDisableENIProvisioning  = false
+
+	// Specify where ipam should persist its current IP<->container allocations.
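The checkpoint plumbing referenced throughout this patch (NewJSONFile, NullCheckpoint, NewTestCheckpoint, CheckpointFormatVersion) is defined outside the hunks shown. From those call sites, a plausible shape for the checkpointer — offered as an assumption, not as the patch's actual code:

	// Assumed interface, inferred from NewDataStore(log, checkpointer),
	// writeBackingStoreUnsafe and ReadBackingStore; method names are a guess.
	type Checkpointer interface {
		Checkpoint(data interface{}) error // persist a snapshot
		Restore(into interface{}) error    // load the last snapshot
	}

where NewJSONFile(path) would return an implementation persisting the JSON encoding of CheckpointData to the path configured by the constants just below, and NullCheckpoint{} would be a no-op used by tests.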
+	envBackingStorePath     = "AWS_VPC_K8S_CNI_BACKING_STORE"
+	defaultBackingStorePath = "/var/run/aws-routed-eni/ipam.json"
 )

 var log = logger.Get()
@@ -171,10 +172,8 @@ var (
 type IPAMContext struct {
 	awsClient           awsutils.APIs
 	dataStore           *datastore.DataStore
-	k8sClient           k8sapi.K8SAPIs
 	useCustomNetworking bool
 	eniConfig           eniconfig.ENIConfig
-	criClient           cri.APIs
 	networkClient       networkutils.NetworkAPIs
 	maxIPsPerENI        int
 	maxENI              int
@@ -288,13 +287,11 @@ func prometheusRegister() {

 // New retrieves IP address usage information from Instance MetaData service and Kubelet
 // then initializes IP address pool data store
-func New(k8sapiClient k8sapi.K8SAPIs, eniConfig *eniconfig.ENIConfigController) (*IPAMContext, error) {
+func New(eniConfig *eniconfig.ENIConfigController) (*IPAMContext, error) {
 	prometheusRegister()
 	c := &IPAMContext{}

-	c.k8sClient = k8sapiClient
 	c.networkClient = networkutils.New()
-	c.criClient = cri.New()
 	c.eniConfig = eniConfig

 	client, err := awsutils.New()
@@ -311,6 +308,9 @@ func New(k8sapiClient k8sapi.K8SAPIs, eniConfig *eniconfig.ENIConfigController)
 	c.useCustomNetworking = UseCustomNetworkCfg()
 	c.disableENIProvisioning = disablingENIProvisioning()

+	checkpointer := datastore.NewJSONFile(dsBackingStorePath())
+	c.dataStore = datastore.NewDataStore(log, checkpointer)
+
 	err = c.nodeInit()
 	if err != nil {
 		return nil, err
@@ -325,14 +325,6 @@ func (c *IPAMContext) nodeInit() error {
 	log.Debugf("Start node init")

-	eniMetadata, tagMap, err := c.awsClient.DescribeAllENIs()
-	if err != nil {
-		return errors.New("ipamd init: failed to retrieve attached ENIs info")
-	} else {
-		log.Debugf("DescribeAllENIs success: ENIs: %d, tagged: %d", len(eniMetadata), len(tagMap))
-	}
-	c.setUnmanagedENIs(tagMap)
-	enis := c.filterUnmanagedENIs(eniMetadata)
 	nodeMaxENI, err := c.getMaxENI()
 	if err != nil {
 		log.Error("Failed to get ENI limit")
@@ -362,7 +354,14 @@ func (c *IPAMContext) nodeInit() error {
 		return errors.Wrap(err, "ipamd init: failed to set up host network")
 	}

-	c.dataStore = datastore.NewDataStore(log)
+	eniMetadata, tagMap, err := c.awsClient.DescribeAllENIs()
+	if err != nil {
+		return errors.New("ipamd init: failed to retrieve attached ENIs info")
+	}
+	log.Debugf("DescribeAllENIs success: ENIs: %d, tagged: %d", len(eniMetadata), len(tagMap))
+	c.setUnmanagedENIs(tagMap)
+	enis := c.filterUnmanagedENIs(eniMetadata)
+
 	for _, eni := range enis {
 		log.Debugf("Discovered ENI %s, trying to set it up", eni.ENIID)
 		// Retry ENI sync
@@ -390,14 +389,10 @@ func (c *IPAMContext) nodeInit() error {
 			time.Sleep(eniAttachTime)
 		}
 	}
-	localPods, err := c.getLocalPodsWithRetry()
-	if err != nil {
-		log.Warnf("During ipamd init, failed to get Pod information from Kubernetes API Server %v", err)
-		ipamdErrInc("nodeInitK8SGetLocalPodIPsFailed")
-		// This can happens when L-IPAMD starts before kubelet.
-		return errors.Wrap(err, "failed to get running pods!")
+
+	if err := c.dataStore.ReadBackingStore(); err != nil {
+		return err
 	}
-	log.Debugf("getLocalPodsWithRetry() found %d local pods", len(localPods))

 	rules, err := c.networkClient.GetRuleList()
 	if err != nil {
@@ -405,28 +400,15 @@ func (c *IPAMContext) nodeInit() error {
 		return nil
 	}

-	for _, ip := range localPods {
-		if ip.Sandbox == "" {
-			log.Infof("Skipping Pod %s, Namespace %s, due to no matching sandbox", ip.Name, ip.Namespace)
-			continue
-		}
-		if ip.IP == "" {
-			log.Infof("Skipping Pod %s, Namespace %s, due to no IP", ip.Name, ip.Namespace)
-			continue
-		}
-		log.Infof("Recovered AddNetwork for Pod %s, Namespace %s, Sandbox %s", ip.Name, ip.Namespace, ip.Sandbox)
-		_, _, err = c.dataStore.AssignPodIPv4Address(ip)
-		if err != nil {
-			ipamdErrInc("nodeInitAssignPodIPv4AddressFailed")
-			log.Warnf("During ipamd init, failed to use pod IP %s returned from Kubernetes API Server %v", ip.IP, err)
-		}
+	for _, info := range c.dataStore.AllocatedIPs() {
+		// TODO(gus): This should really be done via CNI CHECK calls, rather than in ipam (requires upstream k8s changes).

 		// Update ip rules in case there is a change in VPC CIDRs, AWS_VPC_K8S_CNI_EXTERNALSNAT setting
-		srcIPNet := net.IPNet{IP: net.ParseIP(ip.IP), Mask: net.IPv4Mask(255, 255, 255, 255)}
+		srcIPNet := net.IPNet{IP: net.ParseIP(info.IP), Mask: net.IPv4Mask(255, 255, 255, 255)}

 		err = c.networkClient.UpdateRuleListBySrc(rules, srcIPNet, pbVPCcidrs, !c.networkClient.UseExternalSNAT())
 		if err != nil {
-			log.Errorf("UpdateRuleListBySrc in nodeInit() failed for IP %s: %v", ip.IP, err)
+			log.Errorf("UpdateRuleListBySrc in nodeInit() failed for IP %s: %v", info.IP, err)
 		}
 	}
 	// For a new node, attach IPs
@@ -442,68 +424,6 @@ func (c *IPAMContext) updateIPStats(unmanaged int) {
 	enisMax.Set(float64(c.maxENI - unmanaged))
 }

-func (c *IPAMContext) getLocalPodsWithRetry() ([]*k8sapi.K8SPodInfo, error) {
-	var pods []*k8sapi.K8SPodInfo
-	var err error
-	for retry := 1; retry <= maxK8SRetries; retry++ {
-		pods, err = c.k8sClient.K8SGetLocalPodIPs()
-		if err == nil {
-			// Check for pods with no IP since the API server might not have the latest state of the node.
-			allPodsHaveAnIP := true
-			for _, pod := range pods {
-				if pod.IP == "" {
-					log.Infof("Pod %s, Namespace %s, has no IP", pod.Name, pod.Namespace)
-					allPodsHaveAnIP = false
-				}
-			}
-			if allPodsHaveAnIP {
-				break
-			}
-			log.Warnf("Not all pods have an IP, trying again in %v seconds.", retryK8SInterval.Seconds())
-		}
-		log.Infof("Not able to get local pods yet (attempt %d/%d): %v", retry, maxK8SRetries, err)
-		time.Sleep(retryK8SInterval)
-	}
-
-	if err != nil {
-		return nil, errors.Wrap(err, "no pods because apiserver not running.")
-	}
-
-	if pods == nil {
-		return nil, nil
-	}
-
-	// Ask the CRI for the set of running pod sandboxes. These sandboxes are
-	// what the CNI operates on, but the Kubernetes API doesn't expose any
-	// information about them. If we relied only on the Kubernetes API, we
-	// could leak IPs or unassign an IP from a still-running pod.
-	var sandboxes map[string]*cri.SandboxInfo
-	for retry := 1; retry <= maxK8SRetries; retry++ {
-		sandboxes, err = c.criClient.GetRunningPodSandboxes(log)
-		if err == nil {
-			break
-		}
-		log.Infof("Not able to get local pod sandboxes yet (attempt %d/%d): %v", retry, maxK8SRetries, err)
-		time.Sleep(retryK8SInterval)
-	}
-	if err != nil {
-		return nil, errors.Wrap(err, "Unable to get local pod sandboxes")
-	}
-
-	// TODO consider using map
-	for _, pod := range pods {
-		// Fill in the sandbox ID by matching against the pod's UID
-		for _, sandbox := range sandboxes {
-			if sandbox.K8SUID == pod.UID {
-				log.Debugf("Found pod(%v)'s sandbox ID: %v ", sandbox.Name, sandbox.ID)
-				pod.Sandbox = sandbox.ID
-				break
-			}
-		}
-	}
-	return pods, nil
-}
-
 // StartNodeIPPoolManager monitors the IP pool, add or del them when it is required.
 func (c *IPAMContext) StartNodeIPPoolManager() {
 	sleepDuration := ipPoolMonitorInterval / 2
@@ -621,38 +541,14 @@ func (c *IPAMContext) tryUnassignIPsFromAll() {

 // findFreeableIPs finds and returns IPs that are not assigned to Pods but are attached
 // to ENIs on the node.
 func (c *IPAMContext) findFreeableIPs(eni string) ([]string, error) {
-	podIPInfos := c.dataStore.GetPodInfos()
-	usedIPs := sets.String{}
-	// Get IPs that are currently in use by pods
-	for _, pod := range *podIPInfos {
-		usedIPs.Insert(pod.IP)
-	}
-
-	// Get IPs that are currently attached to the instance
-	eniInfos := c.dataStore.GetENIInfos()
-	eniIPPools := eniInfos.ENIIPPools
-
-	pool, ok := eniIPPools[eni]
-	if !ok {
-		return nil, fmt.Errorf("error finding available IPs: eni %s does not exist", eni)
-	}
-
-	allocatedIPs := sets.String{}
-	for _, ip := range pool.IPv4Addresses {
-		allocatedIPs.Insert(ip.Address)
-	}
-
-	availableIPs := allocatedIPs.Difference(usedIPs).UnsortedList()
-	var freeableIPs []string
+	freeableIPs := c.dataStore.FreeableIPs(eni)

 	// Free the number of IPs `over` the warm IP target, unless `over` is greater than the number of available IPs on
 	// this ENI. In that case we should only free the number of available IPs.
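To make the slicing below concrete (numbers are illustrative, not from the diff): if ipTargetState() reports over = 3 and FreeableIPs returned 5 unassigned addresses for this ENI, then numFreeable = min(3, 5) = 3 and only the first three are freed; if over = 7 with the same 5 unassigned addresses, numFreeable = 5 and the whole free list is returned.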
 	_, over, _ := c.ipTargetState()
-	numFreeable := min(over, len(availableIPs))
+	numFreeable := min(over, len(freeableIPs))
+	freeableIPs = freeableIPs[:numFreeable]

-	for _, ip := range availableIPs[:numFreeable] {
-		freeableIPs = append(freeableIPs, ip)
-	}
 	return freeableIPs, nil
 }
@@ -1124,6 +1020,13 @@ func UseCustomNetworkCfg() bool {
 	return false
 }

+func dsBackingStorePath() string {
+	if value := os.Getenv(envBackingStorePath); value != "" {
+		return value
+	}
+	return defaultBackingStorePath
+}
+
 func getWarmIPTarget() int {
 	inputStr, found := os.LookupEnv(envWarmIPTarget)
diff --git a/pkg/ipamd/ipamd_test.go b/pkg/ipamd/ipamd_test.go
index 5e4f008a3b1..8f73e773c0f 100644
--- a/pkg/ipamd/ipamd_test.go
+++ b/pkg/ipamd/ipamd_test.go
@@ -22,12 +22,8 @@ import (
 	"github.com/aws/amazon-vpc-cni-k8s/pkg/apis/crd/v1alpha1"
 	"github.com/aws/amazon-vpc-cni-k8s/pkg/awsutils"
 	mock_awsutils "github.com/aws/amazon-vpc-cni-k8s/pkg/awsutils/mocks"
-	"github.com/aws/amazon-vpc-cni-k8s/pkg/cri"
-	mock_cri "github.com/aws/amazon-vpc-cni-k8s/pkg/cri/mocks"
 	mock_eniconfig "github.com/aws/amazon-vpc-cni-k8s/pkg/eniconfig/mocks"
 	"github.com/aws/amazon-vpc-cni-k8s/pkg/ipamd/datastore"
-	"github.com/aws/amazon-vpc-cni-k8s/pkg/k8sapi"
-	mock_k8sapi "github.com/aws/amazon-vpc-cni-k8s/pkg/k8sapi/mocks"
 	mock_networkutils "github.com/aws/amazon-vpc-cni-k8s/pkg/networkutils/mocks"
 	"github.com/aws/aws-sdk-go/aws"
 	"github.com/aws/aws-sdk-go/service/ec2"
@@ -55,41 +51,45 @@ const (

 func setup(t *testing.T) (*gomock.Controller,
 	*mock_awsutils.MockAPIs,
-	*mock_k8sapi.MockK8SAPIs,
-	*mock_cri.MockAPIs,
 	*mock_networkutils.MockNetworkAPIs,
 	*mock_eniconfig.MockENIConfig) {
 	ctrl := gomock.NewController(t)
 	return ctrl,
 		mock_awsutils.NewMockAPIs(ctrl),
-		mock_k8sapi.NewMockK8SAPIs(ctrl),
-		mock_cri.NewMockAPIs(ctrl),
 		mock_networkutils.NewMockNetworkAPIs(ctrl),
 		mock_eniconfig.NewMockENIConfig(ctrl)
 }

 func TestNodeInit(t *testing.T) {
-	ctrl, mockAWS, mockK8S, mockCRI, mockNetwork, _ := setup(t)
+	ctrl, mockAWS, mockNetwork, _ := setup(t)
 	defer ctrl.Finish()

+	fakeCheckpoint := datastore.CheckpointData{
+		Version: datastore.CheckpointFormatVersion,
+		Allocations: []datastore.CheckpointEntry{
+			{SandboxID: "sandbox-id", IPv4: ipaddr02},
+		},
+	}
+
 	mockContext := &IPAMContext{
 		awsClient:     mockAWS,
-		k8sClient:     mockK8S,
 		maxIPsPerENI:  14,
 		maxENI:        4,
 		warmENITarget: 1,
 		warmIPTarget:  3,
 		primaryIP:     make(map[string]string),
 		terminating:   int32(0),
-		criClient:     mockCRI,
-		networkClient: mockNetwork}
+		networkClient: mockNetwork,
+		dataStore:     datastore.NewDataStore(log, datastore.NewTestCheckpoint(fakeCheckpoint)),
+	}

-	eni1, eni2 := getDummyENIMetdata()
+	eni1, eni2 := getDummyENIMetadata()

 	var cidrs []*string
 	mockAWS.EXPECT().GetENILimit().Return(4, nil)
 	mockAWS.EXPECT().GetENIipLimit().Return(14, nil)
-	mockAWS.EXPECT().GetIPv4sFromEC2(eni1.ENIID).Return(eni1.IPv4Addresses, nil)
+	mockAWS.EXPECT().GetIPv4sFromEC2(eni1.ENIID).AnyTimes().Return(eni1.IPv4Addresses, nil)
+	mockAWS.EXPECT().GetIPv4sFromEC2(eni2.ENIID).AnyTimes().Return(eni2.IPv4Addresses, nil)
 	mockAWS.EXPECT().GetVPCIPv4CIDR().Return(vpcCIDR)

 	_, parsedVPCCIDR, _ := net.ParseCIDR(vpcCIDR)
@@ -105,14 +105,6 @@ func TestNodeInit(t *testing.T) {
 	mockNetwork.EXPECT().SetupENINetwork(gomock.Any(), secMAC, secDevice, secSubnet)

 	mockAWS.EXPECT().GetLocalIPv4().Return(ipaddr01)
-	k8sName := "/k8s_POD_" + "pod1" + "_" + "default" + "_" + "pod-uid" + "_0"
-	mockK8S.EXPECT().K8SGetLocalPodIPs().Return([]*k8sapi.K8SPodInfo{{Name: "pod1",
-		Namespace: "default", UID: "pod-uid", IP: ipaddr02}}, nil)
-
-	var criList = make(map[string]*cri.SandboxInfo, 0)
-	criList["pod-uid"] = &cri.SandboxInfo{ID: "sandbox-id",
-		Name: k8sName, K8SUID: "pod-uid"}
-	mockCRI.EXPECT().GetRunningPodSandboxes(gomock.Any()).Return(criList, nil)

 	var rules []netlink.Rule
 	mockNetwork.EXPECT().GetRuleList().Return(rules, nil)
@@ -126,7 +118,7 @@
 	assert.NoError(t, err)
 }

-func getDummyENIMetdata() (awsutils.ENIMetadata, awsutils.ENIMetadata) {
+func getDummyENIMetadata() (awsutils.ENIMetadata, awsutils.ENIMetadata) {
 	primary := true
 	notPrimary := false
 	testAddr1 := ipaddr01
@@ -176,12 +168,11 @@ func TestIncreaseIPPoolCustomENI(t *testing.T) {
 }

 func testIncreaseIPPool(t *testing.T, useENIConfig bool) {
-	ctrl, mockAWS, mockK8S, _, mockNetwork, mockENIConfig := setup(t)
+	ctrl, mockAWS, mockNetwork, mockENIConfig := setup(t)
 	defer ctrl.Finish()

 	mockContext := &IPAMContext{
 		awsClient:     mockAWS,
-		k8sClient:     mockK8S,
 		maxIPsPerENI:  14,
 		maxENI:        4,
 		warmENITarget: 1,
@@ -192,7 +183,7 @@
 		terminating:   int32(0),
 	}

-	mockContext.dataStore = datastore.NewDataStore(log)
+	mockContext.dataStore = testDatastore()

 	primary := true
 	notPrimary := false
@@ -261,7 +252,7 @@

 func TestTryAddIPToENI(t *testing.T) {
 	_ = os.Unsetenv(envCustomNetworkCfg)
-	ctrl, mockAWS, mockK8S, _, mockNetwork, mockENIConfig := setup(t)
+	ctrl, mockAWS, mockNetwork, mockENIConfig := setup(t)
 	defer ctrl.Finish()

 	primary := true
@@ -274,7 +265,7 @@
 	warmIpTarget := 3
 	mockContext := &IPAMContext{
 		awsClient:     mockAWS,
-		k8sClient:     mockK8S,
 		maxIPsPerENI:  14,
 		maxENI:        4,
 		warmENITarget: 1,
@@ -285,7 +275,7 @@
 		terminating:   int32(0),
 	}

-	mockContext.dataStore = datastore.NewDataStore(log)
+	mockContext.dataStore = testDatastore()

 	podENIConfig := &v1alpha1.ENIConfigSpec{
 		SecurityGroups: []string{"sg1-id", "sg2-id"},
@@ -336,18 +326,17 @@
 }

 func TestNodeIPPoolReconcile(t *testing.T) {
-	ctrl, mockAWS, mockK8S, _, mockNetwork, _ := setup(t)
+	ctrl, mockAWS, mockNetwork, _ := setup(t)
 	defer ctrl.Finish()

 	mockContext := &IPAMContext{
 		awsClient:     mockAWS,
-		k8sClient:     mockK8S,
 		networkClient: mockNetwork,
 		primaryIP:     make(map[string]string),
 		terminating:   int32(0),
 	}

-	mockContext.dataStore = datastore.NewDataStore(log)
+	mockContext.dataStore = testDatastore()

 	primary := true
 	notPrimary := false
@@ -410,7 +399,7 @@
 }

 func TestGetWarmENITarget(t *testing.T) {
-	ctrl, _, _, _, _, _ := setup(t)
+	ctrl, _, _, _ := setup(t)
 	defer ctrl.Finish()

 	_ = os.Setenv("WARM_IP_TARGET", "5")
@@ -427,18 +416,17 @@
 }

 func TestGetWarmIPTargetState(t *testing.T) {
-	ctrl, mockAWS, mockK8S, _, mockNetwork, _ := setup(t)
+	ctrl, mockAWS, mockNetwork, _ := setup(t)
 	defer ctrl.Finish()

 	mockContext := &IPAMContext{
 		awsClient:     mockAWS,
-		k8sClient:     mockK8S,
 		networkClient: mockNetwork,
 		primaryIP:     make(map[string]string),
 		terminating:   int32(0),
 	}

-	mockContext.dataStore = datastore.NewDataStore(log)
+	mockContext.dataStore = testDatastore()

 	_, _, warmIPTargetDefined := mockContext.ipTargetState()
 	assert.False(t, warmIPTargetDefined)
@@ -471,7 +459,7 @@
 }

 func TestIPAMContext_nodeIPPoolTooLow(t *testing.T) {
-	ctrl, mockAWS, mockK8S, _, mockNetwork, mockENIConfig := setup(t)
+	ctrl, mockAWS, mockNetwork, mockENIConfig := setup(t)
 	defer ctrl.Finish()

 	type fields struct {
@@ -486,9 +474,9 @@
 		fields fields
 		want   bool
 	}{
-		{"Test new ds, all defaults", fields{14, 1, 0, datastore.NewDataStore(log)}, true},
-		{"Test new ds, 0 ENIs", fields{14, 0, 0, datastore.NewDataStore(log)}, true},
-		{"Test new ds, 3 warm IPs", fields{14, 0, 3, datastore.NewDataStore(log)}, true},
+		{"Test new ds, all defaults", fields{14, 1, 0, testDatastore()}, true},
+		{"Test new ds, 0 ENIs", fields{14, 0, 0, testDatastore()}, true},
+		{"Test new ds, 3 warm IPs", fields{14, 0, 3, testDatastore()}, true},
 		{"Test 3 unused IPs, 1 warm", fields{3, 1, 1, datastoreWith3FreeIPs()}, false},
 		{"Test 1 used, 1 warm ENI", fields{3, 1, 0, datastoreWith1Pod1()}, true},
 		{"Test 1 used, 0 warm ENI", fields{3, 0, 0, datastoreWith1Pod1()}, false},
@@ -500,7 +488,6 @@
 			c := &IPAMContext{
 				awsClient:           mockAWS,
 				dataStore:           tt.fields.datastore,
-				k8sClient:           mockK8S,
 				useCustomNetworking: false,
 				eniConfig:           mockENIConfig,
 				networkClient:       mockNetwork,
@@ -516,8 +503,12 @@
 	}
 }

+func testDatastore() *datastore.DataStore {
+	return datastore.NewDataStore(log, datastore.NewTestCheckpoint(datastore.CheckpointData{Version: datastore.CheckpointFormatVersion}))
+}
+
 func datastoreWith3FreeIPs() *datastore.DataStore {
-	datastoreWith3FreeIPs := datastore.NewDataStore(log)
+	datastoreWith3FreeIPs := testDatastore()
 	_ = datastoreWith3FreeIPs.AddENI(primaryENIid, 1, true)
 	_ = datastoreWith3FreeIPs.AddIPv4AddressToStore(primaryENIid, ipaddr01)
 	_ = datastoreWith3FreeIPs.AddIPv4AddressToStore(primaryENIid, ipaddr02)
@@ -528,45 +519,23 @@

 func datastoreWith1Pod1() *datastore.DataStore {
 	datastoreWith1Pod1 := datastoreWith3FreeIPs()

-	podInfo1 := k8sapi.K8SPodInfo{
-		Name:      "pod-1",
-		Namespace: "ns-1",
-		IP:        ipaddr01,
-	}
-	_, _, _ = datastoreWith1Pod1.AssignPodIPv4Address(&podInfo1)
+	_, _, _ = datastoreWith1Pod1.AssignPodIPv4Address("sandbox-1")
 	return datastoreWith1Pod1
 }

 func datastoreWith3Pods() *datastore.DataStore {
 	datastoreWith3Pods := datastoreWith3FreeIPs()

-	podInfo1 := k8sapi.K8SPodInfo{
-		Name:      "pod-1",
-		Namespace: "ns-1",
-		IP:        ipaddr01,
-	}
-	_, _, _ = datastoreWith3Pods.AssignPodIPv4Address(&podInfo1)
-
-	podInfo2 := k8sapi.K8SPodInfo{
-		Name:      "pod-2",
-		Namespace: "ns-1",
-		IP:        ipaddr02,
-	}
-	_, _, _ = datastoreWith3Pods.AssignPodIPv4Address(&podInfo2)
-
-	podInfo3 := k8sapi.K8SPodInfo{
-		Name:      "pod-3",
-		Namespace: "ns-1",
-		IP:        ipaddr03,
-	}
-	_, _, _ = datastoreWith3Pods.AssignPodIPv4Address(&podInfo3)
+	_, _, _ = datastoreWith3Pods.AssignPodIPv4Address("sandbox-1")
+	_, _, _ = datastoreWith3Pods.AssignPodIPv4Address("sandbox-2")
+	_, _, _ = datastoreWith3Pods.AssignPodIPv4Address("sandbox-3")
 	return datastoreWith3Pods
 }

 func TestIPAMContext_filterUnmanagedENIs(t *testing.T) {
 	ctrl := gomock.NewController(t)
-	eni1, eni2 := getDummyENIMetdata()
+	eni1, eni2 := getDummyENIMetadata()
 	allENIs := []awsutils.ENIMetadata{eni1, eni2}
 	primaryENIonly := []awsutils.ENIMetadata{eni1}
 	eni1TagMap := map[string]awsutils.TagMap{eni1.ENIID: {"hi": "tag", eniNoManageTagKey: "true"}}
@@ -597,7 +566,7 @@
 }

 func TestDisablingENIProvisioning(t *testing.T) {
-	ctrl, _, _, _, _, _ := setup(t)
+	ctrl, _, _, _ := setup(t)
 	defer ctrl.Finish()

 	_ = os.Setenv(envDisableENIProvisioning, "true")
diff --git a/pkg/ipamd/rpc_handler.go b/pkg/ipamd/rpc_handler.go
index 9ba52fb3000..816dae9e2f7 100644
--- a/pkg/ipamd/rpc_handler.go
+++ b/pkg/ipamd/rpc_handler.go
@@ -27,8 +27,6 @@ import (
 	healthpb "google.golang.org/grpc/health/grpc_health_v1"
 	"google.golang.org/grpc/reflection"

-	"github.com/aws/amazon-vpc-cni-k8s/pkg/ipamd/datastore"
-	"github.com/aws/amazon-vpc-cni-k8s/pkg/k8sapi"
 	"github.com/aws/amazon-vpc-cni-k8s/rpc"
 )
@@ -44,13 +42,10 @@ type server struct {

 // AddNetwork processes CNI add network request and return an IP address for container
 func (s *server) AddNetwork(ctx context.Context, in *rpc.AddNetworkRequest) (*rpc.AddNetworkReply, error) {
-	log.Infof("Received AddNetwork for NS %s, Pod %s, NameSpace %s, Sandbox %s, ifname %s",
-		in.Netns, in.K8S_POD_NAME, in.K8S_POD_NAMESPACE, in.K8S_POD_INFRA_CONTAINER_ID, in.IfName)
+	log.Infof("Received AddNetwork for NS %s, Sandbox %s, ifname %s",
+		in.Netns, in.K8S_POD_INFRA_CONTAINER_ID, in.IfName)

-	addr, deviceNumber, err := s.ipamContext.dataStore.AssignPodIPv4Address(&k8sapi.K8SPodInfo{
-		Name:      in.K8S_POD_NAME,
-		Namespace: in.K8S_POD_NAMESPACE,
-		Sandbox:   in.K8S_POD_INFRA_CONTAINER_ID})
+	addr, deviceNumber, err := s.ipamContext.dataStore.AssignPodIPv4Address(in.K8S_POD_INFRA_CONTAINER_ID)

 	var pbVPCcidrs []string
 	for _, cidr := range s.ipamContext.awsClient.GetVPCIPv4CIDRs() {
@@ -80,21 +75,12 @@ func (s *server) AddNetwork(ctx context.Context, in *rpc.AddNetworkRequest) (*rp
 }

 func (s *server) DelNetwork(ctx context.Context, in *rpc.DelNetworkRequest) (*rpc.DelNetworkReply, error) {
-	log.Infof("Received DelNetwork for Pod %s, Namespace %s, Sandbox %s",
-		in.K8S_POD_NAME, in.K8S_POD_NAMESPACE, in.K8S_POD_INFRA_CONTAINER_ID)
+	log.Infof("Received DelNetwork for Sandbox %s",
+		in.K8S_POD_INFRA_CONTAINER_ID)
 	delIPCnt.With(prometheus.Labels{"reason": in.Reason}).Inc()

-	ip, deviceNumber, err := s.ipamContext.dataStore.UnassignPodIPv4Address(&k8sapi.K8SPodInfo{
-		Name:      in.K8S_POD_NAME,
-		Namespace: in.K8S_POD_NAMESPACE,
-		Sandbox:   in.K8S_POD_INFRA_CONTAINER_ID})
+	ip, deviceNumber, err := s.ipamContext.dataStore.UnassignPodIPv4Address(in.K8S_POD_INFRA_CONTAINER_ID)

-	if err != nil && err == datastore.ErrUnknownPod {
-		// If L-IPAMD restarts, the pod's IP address are assigned by only pod's name and namespace due to kubelet's introspection.
-		ip, deviceNumber, err = s.ipamContext.dataStore.UnassignPodIPv4Address(&k8sapi.K8SPodInfo{
-			Name:      in.K8S_POD_NAME,
-			Namespace: in.K8S_POD_NAMESPACE})
-	}
 	log.Infof("Send DelNetworkReply: IPv4Addr %s, DeviceNumber: %d, err: %v", ip, deviceNumber, err)

 	return &rpc.DelNetworkReply{Success: err == nil, IPv4Addr: ip, DeviceNumber: int32(deviceNumber)}, err
diff --git a/pkg/ipamd/rpc_handler_test.go b/pkg/ipamd/rpc_handler_test.go
index 8e9057d8470..827336bcd3c 100644
--- a/pkg/ipamd/rpc_handler_test.go
+++ b/pkg/ipamd/rpc_handler_test.go
@@ -26,19 +26,17 @@ import (
 )

 func TestServer_AddNetwork(t *testing.T) {
-	ctrl, mockAWS, mockK8S, mockCRI, mockNetwork, _ := setup(t)
+	ctrl, mockAWS, mockNetwork, _ := setup(t)
 	defer ctrl.Finish()

 	mockContext := &IPAMContext{
 		awsClient:     mockAWS,
-		k8sClient:     mockK8S,
 		maxIPsPerENI:  14,
 		maxENI:        4,
 		warmENITarget: 1,
 		warmIPTarget:  3,
-		criClient:     mockCRI,
 		networkClient: mockNetwork,
-		dataStore:     datastore.NewDataStore(log),
+		dataStore:     datastore.NewDataStore(log, datastore.NullCheckpoint{}),
 	}

 	rpcServer := server{ipamContext: mockContext}
diff --git a/rpc/rpc.proto b/rpc/rpc.proto
index bb3281f1ce9..34c68a99ea4 100644
--- a/rpc/rpc.proto
+++ b/rpc/rpc.proto
@@ -9,8 +9,6 @@ service CNIBackend {
 }

 message AddNetworkRequest {
-    string K8S_POD_NAME = 1;
-    string K8S_POD_NAMESPACE = 2;
     string K8S_POD_INFRA_CONTAINER_ID = 3;
     string Netns = 4;
     string IfName = 5;
@@ -27,8 +25,6 @@ message AddNetworkReply {
 }

 message DelNetworkRequest {
-    string K8S_POD_NAME = 1;
-    string K8S_POD_NAMESPACE = 2;
     string K8S_POD_INFRA_CONTAINER_ID = 3;
     string Reason = 5; // next field: 6
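To show the effect of the trimmed protobuf contract, here is a hypothetical CNI-plugin-side caller after this change. The client constructor follows the standard protoc-gen-go naming for service CNIBackend, and the endpoint address is assumed; neither is shown in this diff. Note, too, that fields 1 and 2 are dropped without a `reserved 1, 2;` declaration, which leaves those numbers free for accidental reuse by a later edit.

	package main

	import (
		"context"
		"log"

		"google.golang.org/grpc"

		"github.com/aws/amazon-vpc-cni-k8s/rpc"
	)

	func main() {
		// Address assumed: ipamd's local gRPC endpoint.
		conn, err := grpc.Dial("127.0.0.1:50051", grpc.WithInsecure())
		if err != nil {
			log.Fatalf("dial ipamd: %v", err)
		}
		defer conn.Close()

		c := rpc.NewCNIBackendClient(conn)
		// Only the infra-container (sandbox) ID identifies the allocation now;
		// pod name and namespace are no longer part of the request.
		reply, err := c.AddNetwork(context.Background(), &rpc.AddNetworkRequest{
			K8S_POD_INFRA_CONTAINER_ID: "sandbox-id",
			Netns:                      "/proc/1234/ns/net", // illustrative
			IfName:                     "eth0",
		})
		if err != nil {
			log.Fatalf("AddNetwork: %v", err)
		}
		log.Printf("AddNetwork reply: %+v", reply)
	}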