diff --git a/local-volume/provisioner/cmd/main.go b/local-volume/provisioner/cmd/main.go
index 347c9197026..e43731eaf95 100644
--- a/local-volume/provisioner/cmd/main.go
+++ b/local-volume/provisioner/cmd/main.go
@@ -29,9 +29,22 @@ import (
 	"k8s.io/client-go/kubernetes"
 )
 
+const (
+	defaultEnableMonitor     = false
+	defaultEnableProvisioner = true
+)
+
+var (
+	enableMonitor     *bool
+	enableProvisioner *bool
+)
+
 var provisionerConfig common.ProvisionerConfiguration
 
 func init() {
+	enableMonitor = flag.Bool("enable-monitor", defaultEnableMonitor, "If the monitor is enabled")
+	enableProvisioner = flag.Bool("enable-provisioner", defaultEnableProvisioner, "If the provisioner is enabled")
+
 	provisionerConfig = common.ProvisionerConfiguration{
 		StorageClassConfig: make(map[string]common.MountConfig),
 	}
@@ -55,10 +68,13 @@ func main() {
 
 	glog.Info("Starting controller\n")
 	controller.StartLocalController(client, &common.UserConfig{
-		Node:            node,
-		DiscoveryMap:    provisionerConfig.StorageClassConfig,
-		NodeLabelsForPV: provisionerConfig.NodeLabelsForPV,
-	})
+		Node:               node,
+		DiscoveryMap:       provisionerConfig.StorageClassConfig,
+		NodeLabelsForPV:    provisionerConfig.NodeLabelsForPV,
+		LabelSelectorForPV: provisionerConfig.LabelSelectorForPV,
+	},
+		*enableMonitor,
+		*enableProvisioner)
 }
 
 func getNode(client *kubernetes.Clientset, name string) *v1.Node {
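The flag hunk above is the only user-facing switch for the new monitor, so here is a hedged, standalone sketch of the default behaviour. The flag names and defaults are taken from this patch; the tiny `main` wrapper itself is illustrative only and not part of the change.

```go
package main

import (
	"flag"
	"fmt"
)

// Sketch, not part of the patch: with the defaults above, existing deployments
// keep their provisioning behaviour unchanged unless -enable-monitor=true is
// passed, as the monitor DaemonSet manifest below does.
func main() {
	enableMonitor := flag.Bool("enable-monitor", false, "If the monitor is enabled")
	enableProvisioner := flag.Bool("enable-provisioner", true, "If the provisioner is enabled")
	flag.Parse()
	fmt.Printf("monitor=%v provisioner=%v\n", *enableMonitor, *enableProvisioner)
}
```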
resources: ["persistentvolumeclaims"] + verbs: ["get", "list", "watch", "update"] + - apiGroups: ["storage.k8s.io"] + resources: ["storageclasses"] + verbs: ["get", "list", "watch"] + - apiGroups: [""] + resources: ["events"] + verbs: ["watch", "create", "update", "patch"] diff --git a/local-volume/provisioner/deployment/kubernetes/monitor/provisioner-monitor-daemonset.yaml b/local-volume/provisioner/deployment/kubernetes/monitor/provisioner-monitor-daemonset.yaml new file mode 100644 index 00000000000..92e392fd757 --- /dev/null +++ b/local-volume/provisioner/deployment/kubernetes/monitor/provisioner-monitor-daemonset.yaml @@ -0,0 +1,42 @@ +apiVersion: extensions/v1beta1 +kind: DaemonSet +metadata: + name: local-volume-provisioner +spec: + template: + metadata: + labels: + app: local-volume-provisioner + spec: + containers: + - name: provisioner + image: "quay.io/external_storage/local-volume-provisioner:latest" + args: + - "-enable-monitor=true" + imagePullPolicy: Always + securityContext: + privileged: true + volumeMounts: + - name: discovery-vol + mountPath: "/local-disks" + - name: local-volume-config + mountPath: "/etc/provisioner/config/" + env: + - name: MY_NODE_NAME + valueFrom: + fieldRef: + fieldPath: spec.nodeName + # If you want provisioner to use a kubeconfig file to access API server, instead of the default + # in-cluster config, then specify the following environment variable: + # - name: KUBECONFIG + # value: /path/to/kubeconfig + + volumes: + - name: discovery-vol + hostPath: + path: "/mnt/disks/vol" + - name: local-volume-config + configMap: + name: local-volume-config + serviceAccount: local-storage-admin + diff --git a/local-volume/provisioner/pkg/common/common.go b/local-volume/provisioner/pkg/common/common.go index 7c4f1613c8a..aad034bde99 100644 --- a/local-volume/provisioner/pkg/common/common.go +++ b/local-volume/provisioner/pkg/common/common.go @@ -67,6 +67,8 @@ const ( ProvisonerStorageClassConfig = "storageClassMap" // ProvisionerNodeLabelsForPV contains a list of node labels to be copied to the PVs created by the provisioner ProvisionerNodeLabelsForPV = "nodeLabelsForPV" + // MonitorLabelSelectorForPV is the label selector for monitor to filter PVs + MonitorLabelSelectorForPV = "labelSelectorForPV" // VolumeDelete copied from k8s.io/kubernetes/pkg/controller/volume/events VolumeDelete = "VolumeDelete" @@ -84,6 +86,8 @@ type UserConfig struct { DiscoveryMap map[string]MountConfig // Labels and their values that are added to PVs created by the provisioner NodeLabelsForPV []string + // LabelSelectorForPV is the label selector for monitor to filter PVs + LabelSelectorForPV string } // MountConfig stores a configuration for discoverying a specific storageclass @@ -144,6 +148,9 @@ type ProvisionerConfiguration struct { // NodeLabelsForPV contains a list of node labels to be copied to the PVs created by the provisioner // +optional NodeLabelsForPV []string `json:"nodeLabelsForPV" yaml:"nodeLabelsForPV"` + // LabelSelectorForPV is the label selector for monitor to filter PVs + // +optional + LabelSelectorForPV string `json:"labelSelectorForPV" yaml:"labelSelectorForPV"` } // CreateLocalPVSpec returns a PV spec that can be used for PV creation @@ -221,6 +228,9 @@ func VolumeConfigToConfigMapData(config *ProvisionerConfiguration) (map[string]s } configMapData[ProvisionerNodeLabelsForPV] = string(nodeLabels) } + if len(config.LabelSelectorForPV) > 0 { + configMapData[MonitorLabelSelectorForPV] = config.LabelSelectorForPV + } return configMapData, nil } diff --git 
diff --git a/local-volume/provisioner/pkg/controller/controller.go b/local-volume/provisioner/pkg/controller/controller.go
index 86aa7076f72..d9707f42409 100644
--- a/local-volume/provisioner/pkg/controller/controller.go
+++ b/local-volume/provisioner/pkg/controller/controller.go
@@ -26,10 +26,12 @@ import (
 	"github.com/kubernetes-incubator/external-storage/local-volume/provisioner/pkg/common"
 	"github.com/kubernetes-incubator/external-storage/local-volume/provisioner/pkg/deleter"
 	"github.com/kubernetes-incubator/external-storage/local-volume/provisioner/pkg/discovery"
+	"github.com/kubernetes-incubator/external-storage/local-volume/provisioner/pkg/monitor"
 	"github.com/kubernetes-incubator/external-storage/local-volume/provisioner/pkg/populator"
 	"github.com/kubernetes-incubator/external-storage/local-volume/provisioner/pkg/util"
 
 	"k8s.io/api/core/v1"
+	"k8s.io/apimachinery/pkg/util/wait"
 	"k8s.io/client-go/kubernetes"
 	"k8s.io/client-go/kubernetes/scheme"
 	v1core "k8s.io/client-go/kubernetes/typed/core/v1"
@@ -38,7 +40,7 @@ import (
 )
 
 // StartLocalController starts the sync loop for the local PV discovery and deleter
-func StartLocalController(client *kubernetes.Clientset, config *common.UserConfig) {
+func StartLocalController(client *kubernetes.Clientset, config *common.UserConfig, enableMonitor, enableProvisioner bool) {
 	glog.Info("Initializing volume cache\n")
 
 	provisionerName := fmt.Sprintf("local-volume-provisioner-%v-%v", config.Node.Name, config.Node.UID)
@@ -59,21 +61,30 @@ func StartLocalController(client *kubernetes.Clientset, config *common.UserConfi
 		Mounter:    mount.New("" /* default mount path */),
 	}
 
-	populator := populator.NewPopulator(runtimeConfig)
-	populator.Start()
-
-	ptable := deleter.NewProcTable()
-	discoverer, err := discovery.NewDiscoverer(runtimeConfig, ptable)
-	if err != nil {
-		glog.Fatalf("Error starting discoverer: %v", err)
+	if enableMonitor {
+		glog.Info("Monitor started\n")
+		monitor := monitor.NewMonitor(runtimeConfig)
+		go monitor.Run(wait.NeverStop)
 	}
-	deleter := deleter.NewDeleter(runtimeConfig, ptable)
+	if enableProvisioner {
+		populator := populator.NewPopulator(runtimeConfig)
+		populator.Start()
+
+		ptable := deleter.NewProcTable()
+		discoverer, err := discovery.NewDiscoverer(runtimeConfig, ptable)
+		if err != nil {
+			glog.Fatalf("Error starting discoverer: %v", err)
+		}
 
-	glog.Info("Controller started\n")
-	for {
-		deleter.DeletePVs()
-		discoverer.DiscoverLocalVolumes()
-		time.Sleep(10 * time.Second)
+		deleter := deleter.NewDeleter(runtimeConfig, ptable)
+
+		glog.Info("Controller started\n")
+		for {
+			deleter.DeletePVs()
+			discoverer.DiscoverLocalVolumes()
+			time.Sleep(10 * time.Second)
+		}
 	}
+
 }
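The controller starts the monitor with `go monitor.Run(wait.NeverStop)`. For comparison, a hedged sketch of the same wiring with a cancellable stop channel; nothing here is part of the patch, and `runtimeConfig` stands in for the `RuntimeConfig` assembled inside `StartLocalController`.

```go
package main

import (
	"os"
	"os/signal"
	"syscall"

	"github.com/kubernetes-incubator/external-storage/local-volume/provisioner/pkg/common"
	"github.com/kubernetes-incubator/external-storage/local-volume/provisioner/pkg/monitor"
)

// Sketch only: run the monitor on its own and shut it down on SIGINT/SIGTERM
// instead of using wait.NeverStop.
func runMonitorOnly(runtimeConfig *common.RuntimeConfig) {
	stopCh := make(chan struct{})
	m := monitor.NewMonitor(runtimeConfig)
	go m.Run(stopCh)

	sigCh := make(chan os.Signal, 1)
	signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)
	<-sigCh
	close(stopCh) // Run returns once the stop channel is closed
}
```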
diff --git a/local-volume/provisioner/pkg/discovery/discovery.go b/local-volume/provisioner/pkg/discovery/discovery.go
index 4b70970827d..12251a3223c 100644
--- a/local-volume/provisioner/pkg/discovery/discovery.go
+++ b/local-volume/provisioner/pkg/discovery/discovery.go
@@ -23,8 +23,8 @@ import (
 
 	"github.com/golang/glog"
 	"github.com/kubernetes-incubator/external-storage/local-volume/provisioner/pkg/common"
+	"github.com/kubernetes-incubator/external-storage/local-volume/provisioner/pkg/util"
 
-	esUtil "github.com/kubernetes-incubator/external-storage/lib/util"
 	"github.com/kubernetes-incubator/external-storage/local-volume/provisioner/pkg/deleter"
 	"k8s.io/api/core/v1"
 	"k8s.io/kubernetes/pkg/api/v1/helper"
@@ -210,7 +210,7 @@ func (d *Discoverer) createPV(file, class string, config common.MountConfig, cap
 	pvSpec := common.CreateLocalPVSpec(&common.LocalPVConfig{
 		Name:            pvName,
 		HostPath:        outsidePath,
-		Capacity:        roundDownCapacityPretty(capacityByte),
+		Capacity:        util.RoundDownCapacityPretty(capacityByte),
 		StorageClass:    class,
 		ProvisionerName: d.Name,
 		AffinityAnn:     d.nodeAffinityAnn,
@@ -224,20 +224,3 @@ func (d *Discoverer) createPV(file, class string, config common.MountConfig, cap
 	}
 	glog.Infof("Created PV %q for volume at %q", pvName, outsidePath)
 }
-
-// Round down the capacity to an easy to read value.
-func roundDownCapacityPretty(capacityBytes int64) int64 {
-
-	easyToReadUnitsBytes := []int64{esUtil.GiB, esUtil.MiB}
-
-	// Round down to the nearest easy to read unit
-	// such that there are at least 10 units at that size.
-	for _, easyToReadUnitBytes := range easyToReadUnitsBytes {
-		// Round down the capacity to the nearest unit.
-		size := capacityBytes / easyToReadUnitBytes
-		if size >= 10 {
-			return size * easyToReadUnitBytes
-		}
-	}
-	return capacityBytes
-}
diff --git a/local-volume/provisioner/pkg/discovery/discovery_test.go b/local-volume/provisioner/pkg/discovery/discovery_test.go
index 3c26fef9207..a5a3a84088c 100644
--- a/local-volume/provisioner/pkg/discovery/discovery_test.go
+++ b/local-volume/provisioner/pkg/discovery/discovery_test.go
@@ -408,7 +408,7 @@ func verifyCapacity(t *testing.T, createdPV *v1.PersistentVolume, expectedPV *te
 	if !ok {
 		t.Errorf("Unable to convert resource storage into int64")
 	}
-	if roundDownCapacityPretty(capacityInt) != expectedPV.capacity {
+	if util.RoundDownCapacityPretty(capacityInt) != expectedPV.capacity {
 		t.Errorf("Expected capacity %d, got %d", expectedPV.capacity, capacityInt)
 	}
 }
@@ -504,7 +504,7 @@ func TestRoundDownCapacityPretty(t *testing.T) {
 		{3*esUtil.TiB + 2*esUtil.GiB + 1*esUtil.MiB, 3*esUtil.TiB + 2*esUtil.GiB},
 	}
 	for _, tt := range capTests {
-		actual := roundDownCapacityPretty(tt.n)
+		actual := util.RoundDownCapacityPretty(tt.n)
 		if actual != tt.expected {
 			t.Errorf("roundDownCapacityPretty(%d): expected %d, actual %d", tt.n, tt.expected, actual)
 		}
diff --git a/local-volume/provisioner/pkg/monitor/localVolumeCache.go b/local-volume/provisioner/pkg/monitor/localVolumeCache.go
new file mode 100644
index 00000000000..a089b2d9bb3
--- /dev/null
+++ b/local-volume/provisioner/pkg/monitor/localVolumeCache.go
@@ -0,0 +1,84 @@
+/*
+Copyright 2017 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package monitor
+
+import (
+	"sync"
+
+	"k8s.io/api/core/v1"
+)
+
+// LocalVolumeMap is the interface to store local volumes
+type LocalVolumeMap interface {
+	AddLocalVolume(pv *v1.PersistentVolume)
+
+	UpdateLocalVolume(newPV *v1.PersistentVolume)
+
+	DeleteLocalVolume(pv *v1.PersistentVolume)
+
+	GetPVs() []*v1.PersistentVolume
+}
+
+type localVolumeMap struct {
+	// for guarding access to the volumeMap
+	sync.RWMutex
+
+	// local storage PV map of unique PV name to PV object
+	volumeMap map[string]*v1.PersistentVolume
+}
+
+// NewLocalVolumeMap returns a new LocalVolumeMap which acts as a cache
+// for holding local storage PVs.
+func NewLocalVolumeMap() LocalVolumeMap {
+	localVolumeMap := &localVolumeMap{}
+	localVolumeMap.volumeMap = make(map[string]*v1.PersistentVolume)
+	return localVolumeMap
+}
+
+// TODO: only add local storage PVs which belong to the specific node
+func (lvm *localVolumeMap) AddLocalVolume(pv *v1.PersistentVolume) {
+	lvm.Lock()
+	defer lvm.Unlock()
+
+	lvm.volumeMap[pv.Name] = pv
+}
+
+func (lvm *localVolumeMap) UpdateLocalVolume(newPV *v1.PersistentVolume) {
+	lvm.Lock()
+	defer lvm.Unlock()
+
+	lvm.volumeMap[newPV.Name] = newPV
+}
+
+func (lvm *localVolumeMap) DeleteLocalVolume(pv *v1.PersistentVolume) {
+	lvm.Lock()
+	defer lvm.Unlock()
+
+	delete(lvm.volumeMap, pv.Name)
+}
+
+func (lvm *localVolumeMap) GetPVs() []*v1.PersistentVolume {
+	lvm.RLock()
+	defer lvm.RUnlock()
+
+	pvs := []*v1.PersistentVolume{}
+	for _, pv := range lvm.volumeMap {
+		pvs = append(pvs, pv)
+	}
+
+	return pvs
+}
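A small usage sketch of the cache above, assuming only the exported `LocalVolumeMap` interface; the PV object is a bare stand-in rather than a real local PV.

```go
package main

import (
	"fmt"

	"github.com/kubernetes-incubator/external-storage/local-volume/provisioner/pkg/monitor"
	"k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// Sketch only: basic add/update/get/delete flow of the PV cache.
func main() {
	cache := monitor.NewLocalVolumeMap()

	pv := &v1.PersistentVolume{ObjectMeta: metav1.ObjectMeta{Name: "example-local-pv"}}
	cache.AddLocalVolume(pv)
	cache.UpdateLocalVolume(pv)
	fmt.Println(len(cache.GetPVs())) // 1

	cache.DeleteLocalVolume(pv)
	fmt.Println(len(cache.GetPVs())) // 0
}
```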
diff --git a/local-volume/provisioner/pkg/monitor/monitor.go b/local-volume/provisioner/pkg/monitor/monitor.go
new file mode 100644
index 00000000000..b87ae69466c
--- /dev/null
+++ b/local-volume/provisioner/pkg/monitor/monitor.go
@@ -0,0 +1,479 @@
+/*
+Copyright 2017 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package monitor
+
+import (
+	"fmt"
+	"strings"
+	"sync"
+	"time"
+
+	"github.com/golang/glog"
+	"github.com/kubernetes-incubator/external-storage/local-volume/provisioner/pkg/common"
+	"github.com/kubernetes-incubator/external-storage/local-volume/provisioner/pkg/util"
+
+	"k8s.io/api/core/v1"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/labels"
+	"k8s.io/apimachinery/pkg/runtime"
+	"k8s.io/apimachinery/pkg/watch"
+	"k8s.io/client-go/tools/cache"
+	"k8s.io/kubernetes/pkg/api/v1/helper"
+)
+
+const (
+	// DefaultInformerResyncPeriod is the resync period of the informer
+	DefaultInformerResyncPeriod = 15 * time.Second
+
+	// DefaultMonitorResyncPeriod is the resync period of the monitor
+	DefaultMonitorResyncPeriod = 1 * time.Minute
+
+	// UpdatePVRetryCount is the retry count for PV updates
+	UpdatePVRetryCount = 5
+
+	// UpdatePVInterval is the interval between PV update retries
+	UpdatePVInterval = 5 * time.Millisecond
+)
+
+// marking-event related constants
+const (
+	MarkPVFailed      = "MarkPVFailed"
+	UnMarkPVFailed    = "UnMarkPVFailed"
+	MarkPVSucceeded   = "MarkPVSucceeded"
+	UnMarkPVSucceeded = "UnMarkPVSucceeded"
+
+	HostPathNotExist  = "HostPathNotExist"
+	MisMatchedVolSize = "MisMatchedVolSize"
+	NotMountPoint     = "NotMountPoint"
+
+	FirstMarkTime = "FirstMarkTime"
+)
+
+// PVUnhealthyKeys stores all the unhealthy marking keys
+var PVUnhealthyKeys []string
+
+func init() {
+	PVUnhealthyKeys = append(PVUnhealthyKeys, HostPathNotExist)
+	PVUnhealthyKeys = append(PVUnhealthyKeys, MisMatchedVolSize)
+	PVUnhealthyKeys = append(PVUnhealthyKeys, NotMountPoint)
+}
+
+// Monitor checks the health condition of local PVs and marks them (via annotations) if they are unhealthy
+type Monitor struct {
+	*common.RuntimeConfig
+
+	volumeLW         cache.ListerWatcher
+	volumeController cache.Controller
+
+	localVolumeMap LocalVolumeMap
+
+	hasRun     bool
+	hasRunLock *sync.Mutex
+}
+
+// NewMonitor creates a monitor object that will scan through
+// the configured directories and check volume status
+func NewMonitor(config *common.RuntimeConfig) *Monitor {
+	monitor := &Monitor{
+		RuntimeConfig: config,
+		hasRun:        false,
+		hasRunLock:    &sync.Mutex{},
+	}
+
+	labelOps := metav1.ListOptions{
+		LabelSelector: labels.Everything().String(),
+	}
+	if len(monitor.UserConfig.LabelSelectorForPV) > 0 {
+		labelOps.LabelSelector = monitor.UserConfig.LabelSelectorForPV
+	}
+
+	monitor.localVolumeMap = NewLocalVolumeMap()
+
+	monitor.volumeLW = &cache.ListWatch{
+		ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
+			return config.Client.CoreV1().PersistentVolumes().List(labelOps)
+		},
+		WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
+			return config.Client.CoreV1().PersistentVolumes().Watch(labelOps)
+		},
+	}
+	_, monitor.volumeController = cache.NewInformer(
+		monitor.volumeLW,
+		&v1.PersistentVolume{},
+		DefaultInformerResyncPeriod,
+		cache.ResourceEventHandlerFuncs{
+			AddFunc:    monitor.addVolume,
+			UpdateFunc: monitor.updateVolume,
+			DeleteFunc: monitor.deleteVolume,
+		},
+	)
+
+	// fill the map with data from etcd at startup
+	if err := monitor.flushFromETCDFirst(); err != nil {
+		glog.Errorf("failed to list PVs at startup: %v", err)
+	}
+
+	return monitor
+}
+
+// flushFromETCDFirst fills the map with data from etcd at startup
+func (monitor *Monitor) flushFromETCDFirst() error {
+	pvs, err := monitor.Client.CoreV1().PersistentVolumes().List(metav1.ListOptions{})
+	if err != nil {
+		return err
+	}
+	if len(pvs.Items) == 0 {
+		glog.Infof("no PV in etcd at startup")
+		return nil
+	}
+
+	// index the slice to avoid taking the address of the loop variable
+	for i := range pvs.Items {
+		monitor.localVolumeMap.AddLocalVolume(&pvs.Items[i])
+	}
+	return nil
+}
+
+func (monitor *Monitor) addVolume(obj interface{}) {
+	volume, ok := obj.(*v1.PersistentVolume)
+	if !ok {
+		glog.Errorf("Expected PersistentVolume but handler received %#v", obj)
+		return
+	}
+
+	monitor.localVolumeMap.AddLocalVolume(volume)
+}
+
+func (monitor *Monitor) updateVolume(oldObj, newObj interface{}) {
+	newVolume, ok := newObj.(*v1.PersistentVolume)
+	if !ok {
+		glog.Errorf("Expected PersistentVolume but handler received %#v", newObj)
+		return
+	}
+
+	monitor.localVolumeMap.UpdateLocalVolume(newVolume)
+}
+
+func (monitor *Monitor) deleteVolume(obj interface{}) {
+	volume, ok := obj.(*v1.PersistentVolume)
+	if !ok {
+		glog.Errorf("Expected PersistentVolume but handler received %#v", obj)
+		return
+	}
+
+	monitor.localVolumeMap.DeleteLocalVolume(volume)
+}
+
+// Run starts all of this controller's control loops
+func (monitor *Monitor) Run(stopCh <-chan struct{}) {
+	glog.Infof("Starting monitor controller %s!", monitor.RuntimeConfig.Name)
+	monitor.hasRunLock.Lock()
+	monitor.hasRun = true
+	monitor.hasRunLock.Unlock()
+	go monitor.volumeController.Run(stopCh)
+
+	go monitor.MonitorLocalVolumes()
+	<-stopCh
+}
+
+// HasRun returns whether the volume controller has started
+func (monitor *Monitor) HasRun() bool {
+	monitor.hasRunLock.Lock()
+	defer monitor.hasRunLock.Unlock()
+	return monitor.hasRun
+}
+
+// MonitorLocalVolumes checks local PVs periodically
+func (monitor *Monitor) MonitorLocalVolumes() {
+	for {
+		if monitor.HasRun() {
+			pvs := monitor.localVolumeMap.GetPVs()
+			for _, pv := range pvs {
+				monitor.checkStatus(pv)
+			}
+		}
+
+		time.Sleep(DefaultMonitorResyncPeriod)
+	}
+}
+
+// checkStatus checks the health condition of a PV
+func (monitor *Monitor) checkStatus(pv *v1.PersistentVolume) {
+	// check if the PV is local storage
+	if pv.Spec.Local == nil {
+		glog.Infof("PV: %s is not local storage", pv.Name)
+		return
+	}
+	// check node and PV affinity
+	fit, err := CheckNodeAffinity(pv, monitor.Node.Labels)
+	if err != nil {
+		glog.Errorf("check node affinity error: %v", err)
+		return
+	}
+	if !fit {
+		glog.Errorf("pv: %s does not belong to this node: %s", pv.Name, monitor.Node.Name)
+		return
+	}
+
+	// check if the host dir still exists
+	mountPath, continueThisCheck := monitor.checkHostDir(pv)
+	if !continueThisCheck {
+		glog.Errorf("Host dir is missing or modified, PV has been marked")
+		return
+	}
+
+	// check if it is still a mount point
+	continueThisCheck = monitor.checkMountPoint(mountPath, pv)
+	if !continueThisCheck {
+		glog.Errorf("Failed to retrieve mount points or %s is no longer a mount point", mountPath)
+		return
+	}
+
+	// check PV size: PV capacity must not be greater than device capacity,
+	// and PV used bytes must not be greater than PV capacity
+	dir, _ := monitor.VolUtil.IsDir(mountPath)
+	if dir {
+		monitor.checkPVAndFSSize(mountPath, pv)
+	}
+	bl, _ := monitor.VolUtil.IsBlock(mountPath)
+	if bl {
+		monitor.checkPVAndBlockSize(mountPath, pv)
+	}
+}
+
+func (monitor *Monitor) checkMountPoint(mountPath string, pv *v1.PersistentVolume) bool {
+	// Retrieve the list of mount points
+	mountPoints, mountPointsErr := monitor.RuntimeConfig.Mounter.List()
+	if mountPointsErr != nil {
+		glog.Errorf("Error retrieving mount points: %v", mountPointsErr)
+		return false
+	}
+	// Check if mountPath is still a mount point
+	for _, mp := range mountPoints {
+		if mp.Path == mountPath {
+			glog.V(10).Infof("mountPath is still a mount point: %s", mountPath)
+			err := monitor.markOrUnmarkPV(pv, NotMountPoint, "yes", false)
+			if err != nil {
+				glog.Errorf("unmark PV: %s failed, err: %v", pv.Name, err)
+			}
+			return true
+		}
+	}
+
+	glog.V(6).Infof("mountPath is no longer a mount point: %s", mountPath)
+	err := monitor.markOrUnmarkPV(pv, NotMountPoint, "yes", true)
+	if err != nil {
+		glog.Errorf("mark PV: %s failed, err: %v", pv.Name, err)
+	}
+	return false
+}
+
+func (monitor *Monitor) checkHostDir(pv *v1.PersistentVolume) (mountPath string, continueThisCheck bool) {
+	var err error
+	for _, config := range monitor.DiscoveryMap {
+		if strings.Contains(pv.Spec.Local.Path, config.HostDir) {
+			mountPath, err = common.GetContainerPath(pv, config)
+			if err != nil {
+				glog.Errorf("get container path error: %v", err)
+			}
+			break
+		}
+	}
+	if len(mountPath) == 0 {
+		// cannot find the mount path; this may be because the admin modified the config (hostDir)
+		// mark PV and send an event
+		err = monitor.markOrUnmarkPV(pv, HostPathNotExist, "yes", true)
+		if err != nil {
+			glog.Errorf("mark PV: %s failed, err: %v", pv.Name, err)
+		}
+		return
+	}
+	dir, dirErr := monitor.VolUtil.IsDir(mountPath)
+	bl, blErr := monitor.VolUtil.IsBlock(mountPath)
+	if !dir && !bl && (dirErr != nil || blErr != nil) {
+		// mountPath does not exist or is neither a directory nor a block device
+		// mark PV and send an event
+		err = monitor.markOrUnmarkPV(pv, HostPathNotExist, "yes", true)
+		if err != nil {
+			glog.Errorf("mark PV: %s failed, err: %v", pv.Name, err)
+		}
+		return
+	}
+	continueThisCheck = true
+	// unmark PV if it was marked before
+	err = monitor.markOrUnmarkPV(pv, HostPathNotExist, "yes", false)
+	if err != nil {
+		glog.Errorf("unmark PV: %s failed, err: %v", pv.Name, err)
+	}
+	return
+}
+
+func (monitor *Monitor) checkPVAndFSSize(mountPath string, pv *v1.PersistentVolume) {
+	capacityByte, err := monitor.VolUtil.GetFsCapacityByte(mountPath)
+	if err != nil {
+		glog.Errorf("Path %q fs stats error: %v", mountPath, err)
+		return
+	}
+	// if the PV was provisioned by the provisioner the two values must be equal, but the PV may
+	// have been created manually, so the PV capacity must not be greater than the FS capacity
+	storage := pv.Spec.Capacity[v1.ResourceStorage]
+	if util.RoundDownCapacityPretty(capacityByte) < storage.Value() {
+		// mark PV and send an event
+		err = monitor.markOrUnmarkPV(pv, MisMatchedVolSize, "yes", true)
+		if err != nil {
+			glog.Errorf("mark PV: %s failed, err: %v", pv.Name, err)
+		}
+		return
+	}
+	// TODO: make sure that PV used bytes are not greater than PV capacity?
+
+	// unmark PV if it was marked before
+	err = monitor.markOrUnmarkPV(pv, MisMatchedVolSize, "yes", false)
+	if err != nil {
+		glog.Errorf("unmark PV: %s failed, err: %v", pv.Name, err)
+	}
+}
+
+func (monitor *Monitor) checkPVAndBlockSize(mountPath string, pv *v1.PersistentVolume) {
+	capacityByte, err := monitor.VolUtil.GetBlockCapacityByte(mountPath)
+	if err != nil {
+		glog.Errorf("Path %q block stats error: %v", mountPath, err)
+		return
+	}
+	// if the PV was provisioned by the provisioner the two values must be equal, but the PV may
+	// have been created manually, so the PV capacity must not be greater than the block device capacity
+	storage := pv.Spec.Capacity[v1.ResourceStorage]
+	if util.RoundDownCapacityPretty(capacityByte) < storage.Value() {
+		// mark PV and send an event
+		err = monitor.markOrUnmarkPV(pv, MisMatchedVolSize, "yes", true)
+		if err != nil {
+			glog.Errorf("mark PV: %s failed, err: %v", pv.Name, err)
+		}
+		return
+	}
+	// TODO: make sure that PV used bytes are not greater than PV capacity?
+
+	// unmark PV if it was marked before
+	err = monitor.markOrUnmarkPV(pv, MisMatchedVolSize, "yes", false)
+	if err != nil {
+		glog.Errorf("unmark PV: %s failed, err: %v", pv.Name, err)
+	}
+}
+
+// CheckNodeAffinity looks at the PV node affinity, and checks if the node has the same corresponding labels
+// This ensures that we don't check a volume that doesn't belong to this node
+func CheckNodeAffinity(pv *v1.PersistentVolume, nodeLabels map[string]string) (bool, error) {
+	affinity, err := helper.GetStorageNodeAffinityFromAnnotation(pv.Annotations)
+	if err != nil {
+		return false, fmt.Errorf("error getting storage node affinity: %v", err)
+	}
+	if affinity == nil {
+		return false, nil
+	}
+
+	if affinity.RequiredDuringSchedulingIgnoredDuringExecution != nil {
+		terms := affinity.RequiredDuringSchedulingIgnoredDuringExecution.NodeSelectorTerms
+		glog.V(10).Infof("Match for RequiredDuringSchedulingIgnoredDuringExecution node selector terms %+v", terms)
+		for _, term := range terms {
+			selector, err := helper.NodeSelectorRequirementsAsSelector(term.MatchExpressions)
+			if err != nil {
+				return false, fmt.Errorf("failed to parse MatchExpressions: %v", err)
+			}
+			if !selector.Matches(labels.Set(nodeLabels)) {
+				return false, fmt.Errorf("NodeSelectorTerm %+v does not match node labels", term.MatchExpressions)
+			}
+		}
+	}
+	return true, nil
+}
+
+// markOrUnmarkPV marks or unmarks a PV by adding or removing an annotation
+func (monitor *Monitor) markOrUnmarkPV(pv *v1.PersistentVolume, ann, value string, mark bool) error {
+	// The volume from the method args can be pointing to the watcher cache. We must not
+	// modify these, therefore create a copy.
+	volumeClone := pv.DeepCopy()
+	var eventMes string
+
+	if mark {
+		// mark PV
+		_, ok := volumeClone.ObjectMeta.Annotations[ann]
+		if ok {
+			glog.V(10).Infof("PV: %s is already marked with ann: %s", volumeClone.Name, ann)
+			return nil
+		}
+		metav1.SetMetaDataAnnotation(&volumeClone.ObjectMeta, ann, value)
+		_, ok = volumeClone.ObjectMeta.Annotations[FirstMarkTime]
+		if !ok {
+			firstMarkTime := time.Now()
+			metav1.SetMetaDataAnnotation(&volumeClone.ObjectMeta, FirstMarkTime, firstMarkTime.String())
+		}
+	} else {
+		// unmark PV
+		_, ok := volumeClone.ObjectMeta.Annotations[ann]
+		if !ok {
+			glog.V(10).Infof("PV: %s is not marked with ann: %s", volumeClone.Name, ann)
+			return nil
+		}
+		delete(volumeClone.ObjectMeta.Annotations, ann)
+		var hasOtherMarkKeys bool
+		for _, key := range PVUnhealthyKeys {
+			if _, ok = volumeClone.ObjectMeta.Annotations[key]; ok {
+				hasOtherMarkKeys = true
+				break
+			}
+		}
+		if !hasOtherMarkKeys {
+			delete(volumeClone.ObjectMeta.Annotations, FirstMarkTime)
+		}
+	}
+
+	var err error
+	var newVol *v1.PersistentVolume
+	// Try to update the PV object several times
+	for i := 0; i < UpdatePVRetryCount; i++ {
+		glog.V(4).Infof("try to update PV: %s", pv.Name)
+		newVol, err = monitor.APIUtil.UpdatePV(volumeClone)
+		if err != nil {
+			glog.V(4).Infof("updating PersistentVolume[%s] failed: %v", volumeClone.Name, err)
+			time.Sleep(UpdatePVInterval)
+			continue
+		}
+		monitor.localVolumeMap.UpdateLocalVolume(newVol)
+		glog.V(4).Infof("updated PersistentVolume[%s] successfully", newVol.Name)
+		if mark {
+			eventMes = "Marked PV successfully with annotation key: " + ann
+			monitor.Recorder.Event(pv, v1.EventTypeNormal, MarkPVSucceeded, eventMes)
+		} else {
+			eventMes = "Unmarked PV successfully, removed annotation key: " + ann
+			monitor.Recorder.Event(pv, v1.EventTypeNormal, UnMarkPVSucceeded, eventMes)
+		}
+		return nil
+	}
+
+	if mark {
+		eventMes = "Failed to mark PV with annotation key: " + ann
+		monitor.Recorder.Event(pv, v1.EventTypeWarning, MarkPVFailed, eventMes)
+	} else {
+		eventMes = "Failed to unmark PV, attempted to remove annotation key: " + ann
+		monitor.Recorder.Event(pv, v1.EventTypeWarning, UnMarkPVFailed, eventMes)
+	}
+	return err
+}
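The monitor communicates purely through PV annotations, so a short sketch of how a caller might read them back, using only the exported keys defined in monitor.go; the helper name `reportUnhealthy` is made up for illustration.

```go
package main

import (
	"fmt"

	"github.com/kubernetes-incubator/external-storage/local-volume/provisioner/pkg/monitor"
	"k8s.io/api/core/v1"
)

// Sketch only: inspect the annotations the monitor sets on unhealthy PVs.
// A real PV would come from the API server rather than being constructed here.
func reportUnhealthy(pv *v1.PersistentVolume) {
	for _, key := range monitor.PVUnhealthyKeys {
		if val, ok := pv.Annotations[key]; ok {
			fmt.Printf("PV %s marked %s=%s (first marked at %s)\n",
				pv.Name, key, val, pv.Annotations[monitor.FirstMarkTime])
		}
	}
}

func main() {
	reportUnhealthy(&v1.PersistentVolume{}) // no annotations: prints nothing
}
```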
diff --git a/local-volume/provisioner/pkg/util/api_util.go b/local-volume/provisioner/pkg/util/api_util.go
index ac47811b47c..966df590d5d 100644
--- a/local-volume/provisioner/pkg/util/api_util.go
+++ b/local-volume/provisioner/pkg/util/api_util.go
@@ -31,6 +31,9 @@ type APIUtil interface {
 	// Create PersistentVolume object
 	CreatePV(pv *v1.PersistentVolume) (*v1.PersistentVolume, error)
 
+	// UpdatePV updates a PersistentVolume object
+	UpdatePV(pv *v1.PersistentVolume) (*v1.PersistentVolume, error)
+
 	// Delete PersistentVolume object
 	DeletePV(pvName string) error
 }
@@ -51,6 +54,11 @@ func (u *apiUtil) CreatePV(pv *v1.PersistentVolume) (*v1.PersistentVolume, error
 	return u.client.Core().PersistentVolumes().Create(pv)
 }
 
+// UpdatePV will update a PersistentVolume
+func (u *apiUtil) UpdatePV(pv *v1.PersistentVolume) (*v1.PersistentVolume, error) {
+	return u.client.Core().PersistentVolumes().Update(pv)
+}
+
 // DeletePV will delete a PersistentVolume
 func (u *apiUtil) DeletePV(pvName string) error {
 	return u.client.Core().PersistentVolumes().Delete(pvName, &metav1.DeleteOptions{})
@@ -87,6 +95,21 @@ func (u *FakeAPIUtil) CreatePV(pv *v1.PersistentVolume) (*v1.PersistentVolume, e
 	return pv, nil
 }
 
+// UpdatePV will update the PV in the cache, recording it in the created list if it was not seen before
+func (u *FakeAPIUtil) UpdatePV(pv *v1.PersistentVolume) (*v1.PersistentVolume, error) {
+	if u.shouldFail {
+		return nil, fmt.Errorf("API failed")
+	}
+
+	_, exists := u.cache.GetPV(pv.Name)
+	if !exists {
+		u.createdPVs[pv.Name] = pv
+	}
+
+	u.cache.UpdatePV(pv)
+	return pv, nil
+}
+
 // DeletePV will delete the PV from the created list and cache, and also add it to the deleted list
 func (u *FakeAPIUtil) DeletePV(pvName string) error {
 	if u.shouldFail {
diff --git a/local-volume/provisioner/pkg/util/util.go b/local-volume/provisioner/pkg/util/util.go
new file mode 100644
index 00000000000..57c4e752976
--- /dev/null
+++ b/local-volume/provisioner/pkg/util/util.go
@@ -0,0 +1,35 @@
+/*
+Copyright 2017 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package util
+
+import esUtil "github.com/kubernetes-incubator/external-storage/lib/util"
+
+// RoundDownCapacityPretty rounds down the capacity to an easy to read value.
+func RoundDownCapacityPretty(capacityBytes int64) int64 {
+	easyToReadUnitsBytes := []int64{esUtil.GiB, esUtil.MiB}
+
+	// Round down to the nearest easy to read unit
+	// such that there are at least 10 units at that size.
+	for _, easyToReadUnitBytes := range easyToReadUnitsBytes {
+		// Round down the capacity to the nearest unit.
+		size := capacityBytes / easyToReadUnitBytes
+		if size >= 10 {
+			return size * easyToReadUnitBytes
+		}
+	}
+	return capacityBytes
+}
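For reference, a worked example of the rounding rule now exported here; the expected values follow directly from the algorithm and are consistent with the existing TestRoundDownCapacityPretty cases.

```go
package main

import (
	"fmt"

	esUtil "github.com/kubernetes-incubator/external-storage/lib/util"
	"github.com/kubernetes-incubator/external-storage/local-volume/provisioner/pkg/util"
)

// Sketch only: RoundDownCapacityPretty rounds down to GiB or MiB once there
// are at least 10 units at that size, and leaves smaller values untouched.
func main() {
	fmt.Println(util.RoundDownCapacityPretty(10*esUtil.GiB+17*esUtil.MiB) == 10*esUtil.GiB) // true: >=10 GiB, round to GiB
	fmt.Println(util.RoundDownCapacityPretty(3*esUtil.TiB+2*esUtil.GiB+1*esUtil.MiB) ==
		3*esUtil.TiB+2*esUtil.GiB) // true: rounded down to the nearest GiB
	fmt.Println(util.RoundDownCapacityPretty(5*esUtil.MiB) == 5*esUtil.MiB) // true: below 10 MiB, unchanged
}
```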