Skip to content
This repository has been archived by the owner on Oct 21, 2020. It is now read-only.

[WIP]: Monitor local pv #528

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 20 additions & 4 deletions local-volume/provisioner/cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,22 @@ import (
"k8s.io/client-go/kubernetes"
)

const (
defaultEnableMonitor = false
defaultEnableProvisioner = true
)

var (
enableMonitor *bool
enableProvisioner *bool
)

var provisionerConfig common.ProvisionerConfiguration

func init() {
enableMonitor = flag.Bool("enable-monitor", defaultEnableMonitor, "If the monitor is enabled")
enableProvisioner = flag.Bool("enable-provisioner", defaultEnableProvisioner, "If the provisioner is enabled")

provisionerConfig = common.ProvisionerConfiguration{
StorageClassConfig: make(map[string]common.MountConfig),
}
Expand All @@ -55,10 +68,13 @@ func main() {

glog.Info("Starting controller\n")
controller.StartLocalController(client, &common.UserConfig{
Node: node,
DiscoveryMap: provisionerConfig.StorageClassConfig,
NodeLabelsForPV: provisionerConfig.NodeLabelsForPV,
})
Node: node,
DiscoveryMap: provisionerConfig.StorageClassConfig,
NodeLabelsForPV: provisionerConfig.NodeLabelsForPV,
LabelSelectorForPV: provisionerConfig.LabelSelectorForPV,
},
*enableMonitor,
*enableProvisioner)
}

func getNode(client *kubernetes.Clientset, name string) *v1.Node {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
apiVersion: v1
kind: ServiceAccount
metadata:
name: local-storage-admin
---
apiVersion: rbac.authorization.k8s.io/v1beta1
kind: ClusterRoleBinding
metadata:
name: local-storage-provisioner-pv-binding
namespace: default
subjects:
- kind: ServiceAccount
name: local-storage-admin
namespace: default
roleRef:
kind: ClusterRole
name: system:persistent-volume-monitor
apiGroup: rbac.authorization.k8s.io
---
apiVersion: rbac.authorization.k8s.io/v1beta1
kind: ClusterRoleBinding
metadata:
name: local-storage-provisioner-node-binding
namespace: default
subjects:
- kind: ServiceAccount
name: local-storage-admin
namespace: default
roleRef:
kind: ClusterRole
name: system:node
apiGroup: rbac.authorization.k8s.io

Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
apiVersion: v1
kind: ConfigMap
metadata:
name: local-volume-config
data:
storageClassMap: |
local-storage:
hostDir: "/mnt/disks/vol"
mountDir: "/local-disks"
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
kind: ClusterRole
apiVersion: rbac.authorization.k8s.io/v1beta1
metadata:
name: system:persistent-volume-monitor
rules:
- apiGroups: [""]
resources: ["persistentvolumes"]
verbs: ["get", "list", "watch", "create", "delete", "update"]
- apiGroups: [""]
resources: ["persistentvolumeclaims"]
verbs: ["get", "list", "watch", "update"]
- apiGroups: ["storage.k8s.io"]
resources: ["storageclasses"]
verbs: ["get", "list", "watch"]
- apiGroups: [""]
resources: ["events"]
verbs: ["watch", "create", "update", "patch"]
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
apiVersion: extensions/v1beta1
kind: DaemonSet
metadata:
name: local-volume-provisioner
spec:
template:
metadata:
labels:
app: local-volume-provisioner
spec:
containers:
- name: provisioner
image: "quay.io/external_storage/local-volume-provisioner:latest"
args:
- "-enable-monitor=true"
imagePullPolicy: Always
securityContext:
privileged: true
volumeMounts:
- name: discovery-vol
mountPath: "/local-disks"
- name: local-volume-config
mountPath: "/etc/provisioner/config/"
env:
- name: MY_NODE_NAME
valueFrom:
fieldRef:
fieldPath: spec.nodeName
# If you want provisioner to use a kubeconfig file to access API server, instead of the default
# in-cluster config, then specify the following environment variable:
# - name: KUBECONFIG
# value: /path/to/kubeconfig

volumes:
- name: discovery-vol
hostPath:
path: "/mnt/disks/vol"
- name: local-volume-config
configMap:
name: local-volume-config
serviceAccount: local-storage-admin

10 changes: 10 additions & 0 deletions local-volume/provisioner/pkg/common/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,8 @@ const (
ProvisonerStorageClassConfig = "storageClassMap"
// ProvisionerNodeLabelsForPV contains a list of node labels to be copied to the PVs created by the provisioner
ProvisionerNodeLabelsForPV = "nodeLabelsForPV"
// MonitorLabelSelectorForPV is the label selector for monitor to filter PVs
MonitorLabelSelectorForPV = "labelSelectorForPV"
// VolumeDelete copied from k8s.io/kubernetes/pkg/controller/volume/events
VolumeDelete = "VolumeDelete"

Expand All @@ -84,6 +86,8 @@ type UserConfig struct {
DiscoveryMap map[string]MountConfig
// Labels and their values that are added to PVs created by the provisioner
NodeLabelsForPV []string
// LabelSelectorForPV is the label selector for monitor to filter PVs
LabelSelectorForPV string
}

// MountConfig stores a configuration for discoverying a specific storageclass
Expand Down Expand Up @@ -144,6 +148,9 @@ type ProvisionerConfiguration struct {
// NodeLabelsForPV contains a list of node labels to be copied to the PVs created by the provisioner
// +optional
NodeLabelsForPV []string `json:"nodeLabelsForPV" yaml:"nodeLabelsForPV"`
// LabelSelectorForPV is the label selector for monitor to filter PVs
// +optional
LabelSelectorForPV string `json:"labelSelectorForPV" yaml:"labelSelectorForPV"`
}

// CreateLocalPVSpec returns a PV spec that can be used for PV creation
Expand Down Expand Up @@ -221,6 +228,9 @@ func VolumeConfigToConfigMapData(config *ProvisionerConfiguration) (map[string]s
}
configMapData[ProvisionerNodeLabelsForPV] = string(nodeLabels)
}
if len(config.LabelSelectorForPV) > 0 {
configMapData[MonitorLabelSelectorForPV] = config.LabelSelectorForPV
}

return configMapData, nil
}
Expand Down
39 changes: 25 additions & 14 deletions local-volume/provisioner/pkg/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,12 @@ import (
"github.com/kubernetes-incubator/external-storage/local-volume/provisioner/pkg/common"
"github.com/kubernetes-incubator/external-storage/local-volume/provisioner/pkg/deleter"
"github.com/kubernetes-incubator/external-storage/local-volume/provisioner/pkg/discovery"
"github.com/kubernetes-incubator/external-storage/local-volume/provisioner/pkg/monitor"
"github.com/kubernetes-incubator/external-storage/local-volume/provisioner/pkg/populator"
"github.com/kubernetes-incubator/external-storage/local-volume/provisioner/pkg/util"

"k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/scheme"
v1core "k8s.io/client-go/kubernetes/typed/core/v1"
Expand All @@ -38,7 +40,7 @@ import (
)

// StartLocalController starts the sync loop for the local PV discovery and deleter
func StartLocalController(client *kubernetes.Clientset, config *common.UserConfig) {
func StartLocalController(client *kubernetes.Clientset, config *common.UserConfig, enableMonitor, enableProvisioner bool) {
glog.Info("Initializing volume cache\n")

provisionerName := fmt.Sprintf("local-volume-provisioner-%v-%v", config.Node.Name, config.Node.UID)
Expand All @@ -59,21 +61,30 @@ func StartLocalController(client *kubernetes.Clientset, config *common.UserConfi
Mounter: mount.New("" /* default mount path */),
}

populator := populator.NewPopulator(runtimeConfig)
populator.Start()

ptable := deleter.NewProcTable()
discoverer, err := discovery.NewDiscoverer(runtimeConfig, ptable)
if err != nil {
glog.Fatalf("Error starting discoverer: %v", err)
if enableMonitor {
glog.Info("Monitor started\n")
monitor := monitor.NewMonitor(runtimeConfig)
go monitor.Run(wait.NeverStop)
}

deleter := deleter.NewDeleter(runtimeConfig, ptable)
if enableProvisioner {
populator := populator.NewPopulator(runtimeConfig)
populator.Start()

ptable := deleter.NewProcTable()
discoverer, err := discovery.NewDiscoverer(runtimeConfig, ptable)
if err != nil {
glog.Fatalf("Error starting discoverer: %v", err)
}

glog.Info("Controller started\n")
for {
deleter.DeletePVs()
discoverer.DiscoverLocalVolumes()
time.Sleep(10 * time.Second)
deleter := deleter.NewDeleter(runtimeConfig, ptable)

glog.Info("Controller started\n")
for {
deleter.DeletePVs()
discoverer.DiscoverLocalVolumes()
time.Sleep(10 * time.Second)
}
}

}
21 changes: 2 additions & 19 deletions local-volume/provisioner/pkg/discovery/discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ import (

"github.com/golang/glog"
"github.com/kubernetes-incubator/external-storage/local-volume/provisioner/pkg/common"
"github.com/kubernetes-incubator/external-storage/local-volume/provisioner/pkg/util"

esUtil "github.com/kubernetes-incubator/external-storage/lib/util"
"github.com/kubernetes-incubator/external-storage/local-volume/provisioner/pkg/deleter"
"k8s.io/api/core/v1"
"k8s.io/kubernetes/pkg/api/v1/helper"
Expand Down Expand Up @@ -210,7 +210,7 @@ func (d *Discoverer) createPV(file, class string, config common.MountConfig, cap
pvSpec := common.CreateLocalPVSpec(&common.LocalPVConfig{
Name: pvName,
HostPath: outsidePath,
Capacity: roundDownCapacityPretty(capacityByte),
Capacity: util.RoundDownCapacityPretty(capacityByte),
StorageClass: class,
ProvisionerName: d.Name,
AffinityAnn: d.nodeAffinityAnn,
Expand All @@ -224,20 +224,3 @@ func (d *Discoverer) createPV(file, class string, config common.MountConfig, cap
}
glog.Infof("Created PV %q for volume at %q", pvName, outsidePath)
}

// Round down the capacity to an easy to read value.
func roundDownCapacityPretty(capacityBytes int64) int64 {

easyToReadUnitsBytes := []int64{esUtil.GiB, esUtil.MiB}

// Round down to the nearest easy to read unit
// such that there are at least 10 units at that size.
for _, easyToReadUnitBytes := range easyToReadUnitsBytes {
// Round down the capacity to the nearest unit.
size := capacityBytes / easyToReadUnitBytes
if size >= 10 {
return size * easyToReadUnitBytes
}
}
return capacityBytes
}
4 changes: 2 additions & 2 deletions local-volume/provisioner/pkg/discovery/discovery_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -408,7 +408,7 @@ func verifyCapacity(t *testing.T, createdPV *v1.PersistentVolume, expectedPV *te
if !ok {
t.Errorf("Unable to convert resource storage into int64")
}
if roundDownCapacityPretty(capacityInt) != expectedPV.capacity {
if util.RoundDownCapacityPretty(capacityInt) != expectedPV.capacity {
t.Errorf("Expected capacity %d, got %d", expectedPV.capacity, capacityInt)
}
}
Expand Down Expand Up @@ -504,7 +504,7 @@ func TestRoundDownCapacityPretty(t *testing.T) {
{3*esUtil.TiB + 2*esUtil.GiB + 1*esUtil.MiB, 3*esUtil.TiB + 2*esUtil.GiB},
}
for _, tt := range capTests {
actual := roundDownCapacityPretty(tt.n)
actual := util.RoundDownCapacityPretty(tt.n)
if actual != tt.expected {
t.Errorf("roundDownCapacityPretty(%d): expected %d, actual %d", tt.n, tt.expected, actual)
}
Expand Down
84 changes: 84 additions & 0 deletions local-volume/provisioner/pkg/monitor/localVolumeCache.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
/*
Copyright 2017 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package monitor

import (
"sync"

"k8s.io/api/core/v1"
)

// LocalVolumeMap is the interface to store local volumes
type LocalVolumeMap interface {
AddLocalVolume(pv *v1.PersistentVolume)

UpdateLocalVolume(newPV *v1.PersistentVolume)

DeleteLocalVolume(pv *v1.PersistentVolume)

GetPVs() []*v1.PersistentVolume
}

type localVolumeMap struct {
// for guarding access to pvs map
sync.RWMutex

// local storage PV map of unique pv name and pv obj
volumeMap map[string]*v1.PersistentVolume
}

// NewLocalVolumeMap returns new LocalVolumeMap which acts as a cache
// for holding local storage PVs.
func NewLocalVolumeMap() LocalVolumeMap {
localVolumeMap := &localVolumeMap{}
localVolumeMap.volumeMap = make(map[string]*v1.PersistentVolume)
return localVolumeMap
}

// TODO: just add local storage PVs which belongs to the specific node
func (lvm *localVolumeMap) AddLocalVolume(pv *v1.PersistentVolume) {
lvm.Lock()
defer lvm.Unlock()

lvm.volumeMap[pv.Name] = pv
}

func (lvm *localVolumeMap) UpdateLocalVolume(newPV *v1.PersistentVolume) {
lvm.Lock()
defer lvm.Unlock()

lvm.volumeMap[newPV.Name] = newPV
}

func (lvm *localVolumeMap) DeleteLocalVolume(pv *v1.PersistentVolume) {
lvm.Lock()
defer lvm.Unlock()

delete(lvm.volumeMap, pv.Name)
}

func (lvm *localVolumeMap) GetPVs() []*v1.PersistentVolume {
lvm.Lock()
defer lvm.Unlock()

pvs := []*v1.PersistentVolume{}
for _, pv := range lvm.volumeMap {
pvs = append(pvs, pv)
}

return pvs
}
Loading