From 15303a0ed799c4cc3f18ea26c2613465b95010a2 Mon Sep 17 00:00:00 2001 From: Divyen Patel Date: Tue, 27 Aug 2019 13:22:08 -0700 Subject: [PATCH] common library for CNS CSI driver --- .gitignore | 3 + go.mod | 16 +- go.sum | 21 + pkg/common/cns-lib/node/cache.go | 190 ++++++++ pkg/common/cns-lib/node/manager.go | 252 ++++++++++ pkg/common/cns-lib/volume/manager.go | 432 ++++++++++++++++++ pkg/common/cns-lib/volume/util.go | 67 +++ pkg/common/cns-lib/vsphere/cns.go | 60 +++ pkg/common/cns-lib/vsphere/datacenter.go | 226 +++++++++ pkg/common/cns-lib/vsphere/datastore.go | 58 +++ pkg/common/cns-lib/vsphere/hostsystem.go | 64 +++ pkg/common/cns-lib/vsphere/pbm.go | 60 +++ pkg/common/cns-lib/vsphere/utils.go | 163 +++++++ pkg/common/cns-lib/vsphere/virtualcenter.go | 353 ++++++++++++++ .../cns-lib/vsphere/virtualcentermanager.go | 141 ++++++ pkg/common/cns-lib/vsphere/virtualmachine.go | 353 ++++++++++++++ pkg/common/config/config.go | 285 ++++++++++++ pkg/common/config/types.go | 61 +++ .../common/common_controller_helper.go | 135 ++++++ pkg/csi/service/common/constants.go | 72 +++ pkg/csi/service/common/types.go | 53 +++ pkg/csi/service/common/util.go | 95 ++++ pkg/csi/service/common/vsphereutil.go | 172 +++++++ pkg/kubernetes/informers.go | 108 +++++ pkg/kubernetes/kubernetes.go | 71 +++ pkg/kubernetes/types.go | 46 ++ 26 files changed, 3552 insertions(+), 5 deletions(-) create mode 100644 pkg/common/cns-lib/node/cache.go create mode 100644 pkg/common/cns-lib/node/manager.go create mode 100644 pkg/common/cns-lib/volume/manager.go create mode 100644 pkg/common/cns-lib/volume/util.go create mode 100644 pkg/common/cns-lib/vsphere/cns.go create mode 100644 pkg/common/cns-lib/vsphere/datacenter.go create mode 100644 pkg/common/cns-lib/vsphere/datastore.go create mode 100644 pkg/common/cns-lib/vsphere/hostsystem.go create mode 100644 pkg/common/cns-lib/vsphere/pbm.go create mode 100644 pkg/common/cns-lib/vsphere/utils.go create mode 100644 pkg/common/cns-lib/vsphere/virtualcenter.go create mode 100644 pkg/common/cns-lib/vsphere/virtualcentermanager.go create mode 100644 pkg/common/cns-lib/vsphere/virtualmachine.go create mode 100644 pkg/common/config/config.go create mode 100644 pkg/common/config/types.go create mode 100644 pkg/csi/service/common/common_controller_helper.go create mode 100644 pkg/csi/service/common/constants.go create mode 100644 pkg/csi/service/common/types.go create mode 100644 pkg/csi/service/common/util.go create mode 100644 pkg/csi/service/common/vsphereutil.go create mode 100644 pkg/kubernetes/informers.go create mode 100644 pkg/kubernetes/kubernetes.go create mode 100644 pkg/kubernetes/types.go diff --git a/.gitignore b/.gitignore index 66a94f3e39..e1e7c0befa 100644 --- a/.gitignore +++ b/.gitignore @@ -4,6 +4,9 @@ # Ignore the build output. /.build +# Ingore project specific setting files created by Intellij IDE +/.idea + # Ignore the environment variable and Prow configuration files at # the root of the project. 
/config.env diff --git a/go.mod b/go.mod index 1ce556448f..83249de22c 100644 --- a/go.mod +++ b/go.mod @@ -7,6 +7,7 @@ require ( github.com/akutz/gosync v0.1.0 // indirect github.com/akutz/memconn v0.1.0 github.com/container-storage-interface/spec v1.0.0 + github.com/davecgh/go-spew v1.1.1 github.com/google/btree v1.0.0 // indirect github.com/google/uuid v1.1.1 // indirect github.com/gregjones/httpcache v0.0.0-20190212212710-3befbb6ad0cc // indirect @@ -17,16 +18,21 @@ require ( github.com/sirupsen/logrus v1.4.1 github.com/thecodeteam/gofsutil v0.1.2 // indirect github.com/thecodeteam/gosync v0.1.0 // indirect - github.com/vmware/govmomi v0.20.0 - golang.org/x/crypto v0.0.0-20190701094942-4def268fd1a4 // indirect - golang.org/x/net v0.0.0-20190628185345-da137c7871d7 - golang.org/x/sync v0.0.0-20190423024810-112230192c58 // indirect - golang.org/x/sys v0.0.0-20190712062909-fae7ac547cb7 // indirect + github.com/vmware/govmomi v0.21.1-0.20190821201433-8bdc2d6fc858 + golang.org/x/crypto v0.0.0-20190820162420-60c769a6c586 // indirect + golang.org/x/lint v0.0.0-20190409202823-959b441ac422 // indirect + golang.org/x/net v0.0.0-20190827160401-ba9fcec4b297 + golang.org/x/sys v0.0.0-20190826190057-c7b8b68b1456 // indirect golang.org/x/text v0.3.2 // indirect golang.org/x/time v0.0.0-20190308202827-9d24e82272b4 // indirect + golang.org/x/tools v0.0.0-20190827205025-b29f5f60c37a // indirect google.golang.org/grpc v1.19.0 + gopkg.in/gcfg.v1 v1.2.3 + k8s.io/apimachinery v0.0.0-20180621070125-103fd098999d + k8s.io/client-go v8.0.0+incompatible k8s.io/cloud-provider-vsphere v0.2.1 k8s.io/klog v0.2.0 k8s.io/kubernetes v1.11.2 + k8s.io/sample-controller v0.0.0-20180822125000-be98dc6210ab k8s.io/utils v0.0.0-20190308190857-21c4ce38f2a7 // indirect ) diff --git a/go.sum b/go.sum index 16518e655e..5dc1cc8f45 100644 --- a/go.sum +++ b/go.sum @@ -28,6 +28,7 @@ github.com/coreos/pkg v0.0.0-20180928190104-399ea9e2e55f/go.mod h1:E3G3o1h8I7cfc github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-xdr v0.0.0-20161123171359-e6a2ba005892/go.mod h1:CTDl0pzVzE5DEzZhPfvhY/9sPFMQIxaJ9VAMs9AagrE= github.com/dgrijalva/jwt-go v3.2.0+incompatible h1:7qlOGliEKZXTDg6OTjfoBKDXWrumCAMpl/TFQ4/5kLM= github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZmtrrCbhqsmaPHjLKYnJCaQ= github.com/docker/distribution v0.0.0-20180820212402-02bf4a2887a4 h1:8UiRPZ+dol3ndYHduUTj2JM4gG4wcsHW6kZoSAJowBc= @@ -60,6 +61,7 @@ github.com/google/btree v1.0.0/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ github.com/google/gofuzz v0.0.0-20170612174753-24818f796faf h1:+RRA9JqSOZFfKrOeqr2z77+8R2RKyh8PG66dcu1V0ck= github.com/google/gofuzz v0.0.0-20170612174753-24818f796faf/go.mod h1:HP5RmnzzSNb993RKQDq4+1A4ia9nllfqcQFTQJedwGI= github.com/google/uuid v0.0.0-20161128191214-064e2069ce9c/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/google/uuid v0.0.0-20170306145142-6a5e28554805/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/google/uuid v1.1.1 h1:Gkbcsh/GbpXz7lPftLA3P6TYMwjCLYm83jiFQZF/3gY= github.com/google/uuid v1.1.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/googleapis/gnostic v0.2.0 h1:l6N3VoaVzTncYYW+9yOz2LJJammFZGBO13sqgEhpy9g= @@ -149,6 +151,9 @@ github.com/ugorji/go v1.1.1 h1:gmervu+jDMvXTbcHQ0pd2wee85nEoE0BsVyEuzkfK8w= github.com/ugorji/go 
v1.1.1/go.mod h1:hnLbHMwcvSihnDhEfx2/BzKp2xb0Y+ErdfYcrs9tkJQ= github.com/vmware/govmomi v0.20.0 h1:+1IyhvoVb5JET2Wvgw9J3ZDv6CK4sxzUunpH8LhQqm4= github.com/vmware/govmomi v0.20.0/go.mod h1:URlwyTFZX72RmxtxuaFL2Uj3fD1JTvZdx59bHWk6aFU= +github.com/vmware/govmomi v0.21.1-0.20190821201433-8bdc2d6fc858 h1:kVY/A59m1eRowd1b9ECtZsm0jieqcmOm48p7JAeyFug= +github.com/vmware/govmomi v0.21.1-0.20190821201433-8bdc2d6fc858/go.mod h1:zbnFoBQ9GIjs2RVETy8CNEpb+L+Lwkjs3XZUL0B3/m0= +github.com/vmware/vmw-guestinfo v0.0.0-20170707015358-25eff159a728/go.mod h1:x9oS4Wk2s2u4tS29nEaDLdzvuHdB19CvSGJjPgkZJNk= github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2 h1:eY9dn8+vbi4tKz5Qo6v2eYzo7kUS51QINcR5jNpbZS8= github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2/go.mod h1:UETIi67q53MR2AWcXfiuqkDkRtnGDLqkBTpCHuJHxtU= go.etcd.io/bbolt v1.3.2 h1:Z/90sZLPOeCy2PwprqkFa25PdkusRzaj9P8zm/KNyvk= @@ -158,14 +163,22 @@ golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2 h1:VklqNMn3ovrHsnt90Pveol golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20190701094942-4def268fd1a4 h1:HuIa8hRrWRSrqYzx1qI49NNxhdi2PrY7gxVSq1JjLDc= golang.org/x/crypto v0.0.0-20190701094942-4def268fd1a4/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= +golang.org/x/crypto v0.0.0-20190820162420-60c769a6c586 h1:7KByu05hhLed2MO29w7p1XfZvZ13m8mub3shuVftRs0= +golang.org/x/crypto v0.0.0-20190820162420-60c769a6c586/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3 h1:x/bBzNauLQAlE3fLku/xy92Y8QwKX5HZymrMz2IiKFc= golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE= +golang.org/x/lint v0.0.0-20190409202823-959b441ac422 h1:QzoH/1pFpZguR8NrRHLcO6jKqfv2zpuSqZLgdm7ZmjI= +golang.org/x/lint v0.0.0-20190409202823-959b441ac422/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc= golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20181220203305-927f97764cc3/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= +golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= +golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20190628185345-da137c7871d7 h1:rTIdg5QFRR7XCaK4LCjBiPbx8j4DQRpdYMnGn/bJUEU= golang.org/x/net v0.0.0-20190628185345-da137c7871d7/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20190827160401-ba9fcec4b297 h1:k7pJ2yAPLPgbskkFdhRCsA77k2fySZ1zf2zCjvQCiIM= +golang.org/x/net v0.0.0-20190827160401-ba9fcec4b297/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be h1:vEDujvNQGv4jgYKudGeI/+DAX4Jffq6hpD55MmoEvKs= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= @@ -182,6 +195,8 @@ golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5h golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys 
v0.0.0-20190712062909-fae7ac547cb7 h1:LepdCS8Gf/MVejFIt8lsiexZATdoGVyp5bcyS+rYoUI= golang.org/x/sys v0.0.0-20190712062909-fae7ac547cb7/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20190826190057-c7b8b68b1456 h1:ng0gs1AKnRRuEMZoTLLlbOd+C17zUDepwGQBb/n+JVg= +golang.org/x/sys v0.0.0-20190826190057-c7b8b68b1456/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/text v0.3.0 h1:g61tztE5qeGQ89tm6NTjjM9VPIm088od1l6aSorWRWg= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.2 h1:tW2bmiBqwgJj/UpqtC8EpXEZVYOwU0yG4iWbprSVAcs= @@ -192,6 +207,12 @@ golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxb golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20190114222345-bf090417da8b h1:qMK98NmNCRVDIYFycQ5yVRkvgDUFfdP8Ip4KqmDEB7g= golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs= +golang.org/x/tools v0.0.0-20190827152308-062dbaebb618 h1:WtF22n/HcPWMhvZm4KWiQ0FcC1m8kk5ILpXYtY+qN7s= +golang.org/x/tools v0.0.0-20190827152308-062dbaebb618/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= +golang.org/x/tools v0.0.0-20190827205025-b29f5f60c37a h1:0JEq5ZQ3TgsRlFmz4BcD+E6U6cOk4pOImCQSyIG59ZM= +golang.org/x/tools v0.0.0-20190827205025-b29f5f60c37a/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= +golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM= google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8 h1:Nw54tB0rB7hY/N0NQvRW8DG4Yk3Q6T9cu9RcFQDu1tc= google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc= diff --git a/pkg/common/cns-lib/node/cache.go b/pkg/common/cns-lib/node/cache.go new file mode 100644 index 0000000000..a7863c0861 --- /dev/null +++ b/pkg/common/cns-lib/node/cache.go @@ -0,0 +1,190 @@ +/* +Copyright 2019 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package node + +import ( + "errors" + "strings" + "sync" + + "k8s.io/klog" +) + +// ErrNodeAlreadyExists is returned if there's exists a node with +// the same name but different UUID. +var ErrNodeAlreadyExists = errors.New("another node with the same name exists") + +// Cache provides thread-safe functionality to cache node information. Note that +// node names are handled in a case sensitive manner, and must be unique. +type Cache interface { + // DeleteNodeByUUID deletes a node entry by its UUID and returns its current name. + DeleteNodeByUUID(nodeUUID string) (string, error) + // DeleteNodeByName deletes a node entry by its name and returns its current UUID. 
+ DeleteNodeByName(nodeName string) (string, error) + // LoadNodeNameByUUID returns a node's name given its UUID. + LoadNodeNameByUUID(nodeUUID string) (string, error) + // LoadNodeUUIDByName returns a node's UUID given its name. + LoadNodeUUIDByName(nodeName string) (string, error) + // Range calls f sequentially for each node entry. + Range(f func(nodeUUID, nodeName string) bool) + // Store associates the node UUID with its name. If the node UUID already + // exists in the Cache, the name associated with it is updated. + Store(nodeUUID, nodeName string) error +} + +var ( + // cacheInstance is a Cache singleton. + cacheInstance *defaultCache + // onceForCache is used for initializing the Cache singleton. + onceForCache sync.Once +) + +// GetCache returns the Cache singleton. +func GetCache() Cache { + onceForCache.Do(func() { + klog.V(1).Info("Initializing node.defaultCache") + cacheInstance = &defaultCache{ + uuidsToNames: make(map[string]string), + namesToUUIDs: make(map[string]string), + } + klog.V(1).Info("node.defaultCache initialized") + }) + return cacheInstance +} + +// defaultCache caches node information and provides functionality around it. +type defaultCache struct { + // mutex is used to ensure atomicity. + sync.Mutex + // uuidsToNames map node UUIDs to their names. + uuidsToNames map[string]string + // namesToUUIDs map node names to their UUIDs. + namesToUUIDs map[string]string +} + +func normalizeUUID(nodeUUID string) string { + return strings.ToLower(nodeUUID) +} + +func (c *defaultCache) DeleteNodeByUUID(nodeUUID string) (string, error) { + c.Lock() + defer c.Unlock() + + nodeUUID = normalizeUUID(nodeUUID) + nodeName, exists := c.uuidsToNames[nodeUUID] + if !exists { + klog.Warningf("Node entry wasn't found with nodeUUID %s", nodeUUID) + return "", ErrNodeNotFound + } + + delete(c.uuidsToNames, nodeUUID) + delete(c.namesToUUIDs, nodeName) + klog.V(2).Infof("Node entry was deleted with nodeUUID %s and nodeName %s", nodeUUID, nodeName) + + return nodeName, nil +} + +func (c *defaultCache) DeleteNodeByName(nodeName string) (string, error) { + c.Lock() + defer c.Unlock() + + nodeUUID, exists := c.namesToUUIDs[nodeName] + if !exists { + klog.Warningf("Node entry wasn't found with nodeName %s", nodeName) + return "", ErrNodeNotFound + } + + delete(c.namesToUUIDs, nodeName) + delete(c.uuidsToNames, nodeUUID) + klog.V(2).Infof("Node entry was deleted with nodeUUID %s and nodeName %s", nodeUUID, nodeName) + return nodeUUID, nil +} + +func (c *defaultCache) LoadNodeNameByUUID(nodeUUID string) (string, error) { + nodeUUID = normalizeUUID(nodeUUID) + c.Lock() + nodeName, exists := c.uuidsToNames[nodeUUID] + c.Unlock() + + if !exists { + klog.Warningf("Node entry wasn't found with nodeUUID %s", nodeUUID) + return "", ErrNodeNotFound + } + klog.V(2).Infof("Node entry was loaded with nodeUUID %s and nodeName %s", nodeUUID, nodeName) + return nodeName, nil +} + +func (c *defaultCache) LoadNodeUUIDByName(nodeName string) (string, error) { + c.Lock() + nodeUUID, exists := c.namesToUUIDs[nodeName] + c.Unlock() + + if !exists { + klog.Warningf("Node entry wasn't found with nodeName %s", nodeName) + return "", ErrNodeNotFound + } + + klog.V(2).Infof("Node entry was loaded with nodeUUID %s and nodeName %s", nodeUUID, nodeName) + return nodeUUID, nil +} + +func (c *defaultCache) Range(findNodeEntry func(string, string) bool) { + c.Lock() + defer c.Unlock() + + for nodeUUID, nodeName := range c.uuidsToNames { + klog.V(4).Infof("Calling findNodeEntry func for node entry with nodeUUID %s and 
nodeName %s", + nodeUUID, nodeName) + + if !findNodeEntry(nodeUUID, nodeName) { + klog.V(4).Infof("findNodeEntry func returned false for node entry with nodeUUID %s and nodeName %s, "+ + "breaking", nodeUUID, nodeName) + break + } + } +} + +func (c *defaultCache) Store(nodeUUID, nodeName string) error { + c.Lock() + defer c.Unlock() + + // Return an error if there exists a node with the same name but different UUID. + nodeUUID = normalizeUUID(nodeUUID) + prevNameForUUID, prevNameExistsForUUID := c.uuidsToNames[nodeUUID] + prevUUIDForName, prevUUIDExistsForName := c.namesToUUIDs[nodeName] + if prevNameExistsForUUID && prevUUIDExistsForName && prevUUIDForName != nodeUUID { + klog.Errorf("Another node with the same name %s exists with nodeUUID %s and prevUUIDForName %s", + nodeName, nodeUUID, prevUUIDForName) + return ErrNodeAlreadyExists + } + + // Clear cache entry for the given UUID and name, along with previously associated values. + delete(c.uuidsToNames, nodeUUID) + delete(c.namesToUUIDs, nodeName) + if prevNameExistsForUUID { + delete(c.namesToUUIDs, prevNameForUUID) + } + if prevUUIDExistsForName { + delete(c.uuidsToNames, prevUUIDForName) + } + + // Store the new node UUID and name. + c.uuidsToNames[nodeUUID] = nodeName + c.namesToUUIDs[nodeName] = nodeUUID + klog.V(2).Infof("Node entry was stored with nodeUUID %s and nodeName %s", nodeUUID, nodeName) + return nil +} diff --git a/pkg/common/cns-lib/node/manager.go b/pkg/common/cns-lib/node/manager.go new file mode 100644 index 0000000000..642eb9a621 --- /dev/null +++ b/pkg/common/cns-lib/node/manager.go @@ -0,0 +1,252 @@ +/* +Copyright 2019 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package node + +import ( + "errors" + "sync" + + clientset "k8s.io/client-go/kubernetes" + "k8s.io/klog" + k8s "sigs.k8s.io/vsphere-csi-driver/pkg/kubernetes" + + "sigs.k8s.io/vsphere-csi-driver/pkg/common/cns-lib/vsphere" +) + +var ( + // ErrNodeNotFound is returned when a node isn't found. + ErrNodeNotFound = errors.New("node wasn't found") + // ErrEmptyProviderID is returned when it is observed that provider id is not set on the kubernetes cluster + ErrEmptyProviderID = errors.New("node with empty providerId present in the cluster") +) + +// Manager provides functionality to manage nodes. +type Manager interface { + // SetKubernetesClient sets kubernetes client for node manager + SetKubernetesClient(clientset.Interface) + // RegisterNode registers a node given its UUID, name. + RegisterNode(nodeUUID string, nodeName string) error + // DiscoverNode discovers a registered node given its UUID. This method + // scans all virtual centers registered on the VirtualCenterManager for a + // virtual machine with the given UUID. + DiscoverNode(nodeUUID string) error + // GetNode refreshes and returns the VirtualMachine for a registered node + // given its UUID. + GetNode(nodeUUID string) (*vsphere.VirtualMachine, error) + // GetNodeByName refreshes and returns the VirtualMachine for a registered node + // given its name. 
+	GetNodeByName(nodeName string) (*vsphere.VirtualMachine, error)
+	// GetAllNodes refreshes and returns VirtualMachine for all registered
+	// nodes. If nodes are added or removed concurrently, they may or may not be
+	// reflected in the result of a call to this method.
+	GetAllNodes() ([]*vsphere.VirtualMachine, error)
+	// UnregisterNode unregisters a registered node given its name.
+	UnregisterNode(nodeName string) error
+}
+
+// Metadata represents node metadata.
+type Metadata interface{}
+
+var (
+	// managerInstance is a Manager singleton.
+	managerInstance *nodeManager
+	// onceForManager is used for initializing the Manager singleton.
+	onceForManager sync.Once
+)
+
+// GetManager returns the Manager singleton.
+func GetManager() Manager {
+	onceForManager.Do(func() {
+		klog.V(1).Info("Initializing node.nodeManager...")
+		managerInstance = &nodeManager{
+			nodeVMs: sync.Map{},
+		}
+		klog.V(1).Info("node.nodeManager initialized")
+	})
+	return managerInstance
+}
+
+// nodeManager holds node information and provides functionality around it.
+type nodeManager struct {
+	// nodeVMs maps node UUIDs to VirtualMachine objects.
+	nodeVMs sync.Map
+	// node name to node UUID map.
+	nodeNameToUUID sync.Map
+	// k8s client
+	k8sClient clientset.Interface
+}
+
+// SetKubernetesClient sets the specified Kubernetes client on nodeManager.k8sClient.
+func (m *nodeManager) SetKubernetesClient(client clientset.Interface) {
+	m.k8sClient = client
+}
+
+// RegisterNode registers a node with the node manager using its UUID and name.
+func (m *nodeManager) RegisterNode(nodeUUID string, nodeName string) error {
+	m.nodeNameToUUID.Store(nodeName, nodeUUID)
+	klog.V(2).Infof("Successfully registered node: %q with nodeUUID %q", nodeName, nodeUUID)
+	err := m.DiscoverNode(nodeUUID)
+	if err != nil {
+		klog.Errorf("Failed to discover VM with uuid: %q for node: %q", nodeUUID, nodeName)
+		return err
+	}
+	klog.V(2).Infof("Successfully discovered node: %q with nodeUUID %q", nodeName, nodeUUID)
+	return nil
+}
+
+// DiscoverNode discovers a registered node given its UUID from vCenter.
+// If the node is not found in vCenter for the given UUID, ErrVMNotFound is returned to the caller.
+func (m *nodeManager) DiscoverNode(nodeUUID string) error {
+	vm, err := vsphere.GetVirtualMachineByUUID(nodeUUID, false)
+	if err != nil {
+		klog.Errorf("Couldn't find VM instance with nodeUUID %s, failed to discover with err: %v", nodeUUID, err)
+		return err
+	}
+	m.nodeVMs.Store(nodeUUID, vm)
+	klog.V(2).Infof("Successfully discovered node with nodeUUID %s in vm %v", nodeUUID, vm)
+	return nil
+}
+
+// GetNodeByName refreshes and returns the VirtualMachine for a registered node
+// given its name.
+func (m *nodeManager) GetNodeByName(nodeName string) (*vsphere.VirtualMachine, error) {
+	nodeUUID, found := m.nodeNameToUUID.Load(nodeName)
+	if !found {
+		klog.Errorf("Node not found with nodeName %s", nodeName)
+		return nil, ErrNodeNotFound
+	}
+	if nodeUUID != nil && nodeUUID.(string) != "" {
+		return m.GetNode(nodeUUID.(string))
+	}
+	klog.V(2).Infof("Empty nodeUUID observed in cache for the node: %q", nodeName)
+	k8snodeUUID, err := k8s.GetNodeVMUUID(m.k8sClient, nodeName)
+	if err != nil {
+		klog.Errorf("Failed to get providerId from node: %q. 
Err: %v", nodeName, err) + return nil, err + } + m.nodeNameToUUID.Store(nodeName, k8snodeUUID) + return m.GetNode(k8snodeUUID) + +} + +// GetNode refreshes and returns the VirtualMachine for a registered node +// given its UUID +func (m *nodeManager) GetNode(nodeUUID string) (*vsphere.VirtualMachine, error) { + vmInf, discovered := m.nodeVMs.Load(nodeUUID) + if !discovered { + klog.V(2).Infof("Node hasn't been discovered yet with nodeUUID %s", nodeUUID) + + if err := m.DiscoverNode(nodeUUID); err != nil { + klog.Errorf("Failed to discover node with nodeUUID %s with err: %v", nodeUUID, err) + return nil, err + } + + vmInf, _ = m.nodeVMs.Load(nodeUUID) + klog.V(2).Infof("Node was successfully discovered with nodeUUID %s in vm %v", nodeUUID, vmInf) + + return vmInf.(*vsphere.VirtualMachine), nil + } + + vm := vmInf.(*vsphere.VirtualMachine) + klog.V(1).Infof("Renewing virtual machine %v with nodeUUID %s", vm, nodeUUID) + + if err := vm.Renew(true); err != nil { + klog.Errorf("Failed to renew VM %v with nodeUUID %s with err: %v", vm, nodeUUID, err) + return nil, err + } + + klog.V(1).Infof("VM %v was successfully renewed with nodeUUID %s", vm, nodeUUID) + return vm, nil +} + +// GetAllNodes refreshes and returns VirtualMachine for all registered nodes. +func (m *nodeManager) GetAllNodes() ([]*vsphere.VirtualMachine, error) { + var vms []*vsphere.VirtualMachine + var err error + reconnectedHosts := make(map[string]bool) + + m.nodeNameToUUID.Range(func(nodeName, nodeUUID interface{}) bool { + if nodeName != nil && nodeUUID != nil && nodeUUID.(string) == "" { + klog.V(2).Infof("Empty node UUID observed for the node: %q", nodeName) + k8snodeUUID, err := k8s.GetNodeVMUUID(m.k8sClient, nodeName.(string)) + if err != nil { + klog.Errorf("Failed to get providerId from node: %q. Err: %v", nodeName, err) + return true + } + if k8snodeUUID == "" { + klog.Errorf("Node: %q with empty providerId found in the cluster. aborting get all nodes", nodeName) + err = ErrEmptyProviderID + return true + } + m.nodeNameToUUID.Store(nodeName, k8snodeUUID) + return false + } + return true + }) + + if err != nil { + return nil, err + } + m.nodeVMs.Range(func(nodeUUIDInf, vmInf interface{}) bool { + // If an entry was concurrently deleted from vm, Range could + // possibly return a nil value for that key. + // See https://golang.org/pkg/sync/#Map.Range for more info. + if vmInf == nil { + klog.Warningf("VM instance was nil, ignoring with nodeUUID %v", nodeUUIDInf) + return true + } + + nodeUUID := nodeUUIDInf.(string) + vm := vmInf.(*vsphere.VirtualMachine) + + if reconnectedHosts[vm.VirtualCenterHost] { + klog.V(3).Infof("Renewing VM %v, no new connection needed: nodeUUID %s", vm, nodeUUID) + err = vm.Renew(false) + } else { + klog.V(3).Infof("Renewing VM %v with new connection: nodeUUID %s", vm, nodeUUID) + err = vm.Renew(true) + reconnectedHosts[vm.VirtualCenterHost] = true + } + + if err != nil { + klog.Errorf("Failed to renew VM %v with nodeUUID %s, aborting get all nodes", vm, nodeUUID) + return false + } + + klog.V(3).Infof("Updated VM %v for node with nodeUUID %s", vm, nodeUUID) + vms = append(vms, vm) + return true + }) + + if err != nil { + return nil, err + } + return vms, nil +} + +// UnregisterNode unregisters a registered node given its name. 
+func (m *nodeManager) UnregisterNode(nodeName string) error { + nodeUUID, found := m.nodeNameToUUID.Load(nodeName) + if !found { + klog.Errorf("Node wasn't found, failed to unregister node: %q", nodeName) + return ErrNodeNotFound + } + m.nodeNameToUUID.Delete(nodeName) + m.nodeVMs.Delete(nodeUUID) + klog.V(2).Infof("Successfully unregistered node with nodeName %s", nodeName) + return nil +} diff --git a/pkg/common/cns-lib/volume/manager.go b/pkg/common/cns-lib/volume/manager.go new file mode 100644 index 0000000000..b924bc6b64 --- /dev/null +++ b/pkg/common/cns-lib/volume/manager.go @@ -0,0 +1,432 @@ +/* +Copyright 2019 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package volume + +import ( + "context" + "errors" + "sync" + + "github.com/davecgh/go-spew/spew" + "github.com/vmware/govmomi/cns" + cnstypes "github.com/vmware/govmomi/cns/types" + "k8s.io/klog" + + cnsvsphere "sigs.k8s.io/vsphere-csi-driver/pkg/common/cns-lib/vsphere" +) + +// Manager provides functionality to manage volumes. +type Manager interface { + // CreateVolume creates a new volume given its spec. + CreateVolume(spec *cnstypes.CnsVolumeCreateSpec) (*cnstypes.CnsVolumeId, error) + // AttachVolume attaches a volume to a virtual machine given the spec. + AttachVolume(vm *cnsvsphere.VirtualMachine, volumeID string) (string, error) + // DetachVolume detaches a volume from the virtual machine given the spec. + DetachVolume(vm *cnsvsphere.VirtualMachine, volumeID string) error + // DeleteVolume deletes a volume given its spec. + DeleteVolume(volumeID string, deleteDisk bool) error + // UpdateVolumeMetadata updates a volume metadata given its spec. + UpdateVolumeMetadata(spec *cnstypes.CnsVolumeMetadataUpdateSpec) error + // QueryVolume returns volumes matching the given filter. + QueryVolume(queryFilter cnstypes.CnsQueryFilter) (*cnstypes.CnsQueryResult, error) + // QueryAllVolume returns all volumes matching the given filter and selection. + QueryAllVolume(queryFilter cnstypes.CnsQueryFilter, querySelection cnstypes.CnsQuerySelection) (*cnstypes.CnsQueryResult, error) +} + +var ( + // managerInstance is a Manager singleton. + managerInstance *volumeManager + // onceForManager is used for initializing the Manager singleton. + onceForManager sync.Once +) + +// GetManager returns the Manager singleton. +func GetManager(vc *cnsvsphere.VirtualCenter) Manager { + onceForManager.Do(func() { + klog.V(1).Infof("Initializing volume.volumeManager...") + managerInstance = &volumeManager{ + virtualCenter: vc, + } + klog.V(1).Infof("volume.volumeManager initialized") + }) + return managerInstance +} + +// DefaultManager provides functionality to manage volumes. +type volumeManager struct { + virtualCenter *cnsvsphere.VirtualCenter +} + +// CreateVolume creates a new volume given its spec. 
+func (m *volumeManager) CreateVolume(spec *cnstypes.CnsVolumeCreateSpec) (*cnstypes.CnsVolumeId, error) { + err := validateManager(m) + if err != nil { + return nil, err + } + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + // Set up the VC connection + err = m.virtualCenter.ConnectCNS(ctx) + if err != nil { + klog.Errorf("ConnectCNS failed with err: %+v", err) + return nil, err + } + // If the VSphereUser in the CreateSpec is different from session user, update the CreateSpec + s, err := m.virtualCenter.Client.SessionManager.UserSession(ctx) + if err != nil { + klog.Errorf("Failed to get usersession with err: %v", err) + return nil, err + } + if s.UserName != spec.Metadata.ContainerCluster.VSphereUser { + klog.V(4).Infof("Update VSphereUser from %s to %s", spec.Metadata.ContainerCluster.VSphereUser, s.UserName) + spec.Metadata.ContainerCluster.VSphereUser = s.UserName + } + + // Construct the CNS VolumeCreateSpec list + var cnsCreateSpecList []cnstypes.CnsVolumeCreateSpec + cnsCreateSpecList = append(cnsCreateSpecList, *spec) + // Call the CNS CreateVolume + task, err := m.virtualCenter.CnsClient.CreateVolume(ctx, cnsCreateSpecList) + if err != nil { + klog.Errorf("CNS CreateVolume failed from vCenter %q with err: %v", m.virtualCenter.Config.Host, err) + return nil, err + } + // Get the taskInfo + taskInfo, err := cns.GetTaskInfo(ctx, task) + if err != nil { + klog.Errorf("Failed to get taskInfo for CreateVolume task from vCenter %q with err: %v", m.virtualCenter.Config.Host, err) + return nil, err + } + klog.V(2).Infof("CreateVolume: VolumeName: %q, opId: %q", spec.Name, taskInfo.ActivationId) + // Get the taskResult + taskResult, err := cns.GetTaskResult(ctx, taskInfo) + + if err != nil { + klog.Errorf("unable to find the task result for CreateVolume task from vCenter %q. taskID: %q, opId: %q createResults: %+v", + m.virtualCenter.Config.Host, taskInfo.Task.Value, taskInfo.ActivationId, taskResult) + return nil, err + } + + if taskResult == nil { + klog.Errorf("taskResult is empty for CreateVolume task: %q", taskInfo.ActivationId) + return nil, errors.New("taskResult is empty") + } + volumeOperationRes := taskResult.GetCnsVolumeOperationResult() + if volumeOperationRes.Fault != nil { + klog.Errorf("failed to create cns volume. createSpec: %q, fault: %q, opId: %q", spew.Sdump(spec), spew.Sdump(volumeOperationRes.Fault), taskInfo.ActivationId) + return nil, errors.New(volumeOperationRes.Fault.LocalizedMessage) + } + klog.V(2).Infof("CreateVolume: Volume created successfully. VolumeName: %q, opId: %q, volumeID: %q", spec.Name, taskInfo.ActivationId, volumeOperationRes.VolumeId.Id) + return &cnstypes.CnsVolumeId{ + Id: volumeOperationRes.VolumeId.Id, + }, nil +} + +// AttachVolume attaches a volume to a virtual machine given the spec. 
+func (m *volumeManager) AttachVolume(vm *cnsvsphere.VirtualMachine, volumeID string) (string, error) { + err := validateManager(m) + if err != nil { + return "", err + } + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + // Set up the VC connection + err = m.virtualCenter.ConnectCNS(ctx) + if err != nil { + klog.Errorf("ConnectCNS failed with err: %+v", err) + return "", err + } + // Construct the CNS AttachSpec list + var cnsAttachSpecList []cnstypes.CnsVolumeAttachDetachSpec + cnsAttachSpec := cnstypes.CnsVolumeAttachDetachSpec{ + VolumeId: cnstypes.CnsVolumeId{ + Id: volumeID, + }, + Vm: vm.Reference(), + } + cnsAttachSpecList = append(cnsAttachSpecList, cnsAttachSpec) + // Call the CNS AttachVolume + task, err := m.virtualCenter.CnsClient.AttachVolume(ctx, cnsAttachSpecList) + if err != nil { + klog.Errorf("CNS AttachVolume failed from vCenter %q with err: %v", m.virtualCenter.Config.Host, err) + return "", err + } + // Get the taskInfo + taskInfo, err := cns.GetTaskInfo(ctx, task) + if err != nil { + klog.Errorf("Failed to get taskInfo for AttachVolume task from vCenter %q with err: %v", m.virtualCenter.Config.Host, err) + return "", err + } + klog.V(2).Infof("AttachVolume: volumeID: %q, vm: %q, opId: %q", volumeID, vm.String(), taskInfo.ActivationId) + // Get the taskResult + taskResult, err := cns.GetTaskResult(ctx, taskInfo) + if err != nil { + klog.Errorf("unable to find the task result for AttachVolume task from vCenter %q with taskID %s and attachResults %v", + m.virtualCenter.Config.Host, taskInfo.Task.Value, taskResult) + return "", err + } + + if taskResult == nil { + klog.Errorf("taskResult is empty for AttachVolume task: %q, opId: %q", taskInfo.Task.Value, taskInfo.ActivationId) + return "", errors.New("taskResult is empty") + } + + volumeOperationRes := taskResult.GetCnsVolumeOperationResult() + if volumeOperationRes.Fault != nil { + if volumeOperationRes.Fault.LocalizedMessage == CNSVolumeResourceInUseFaultMessage { + // Volume is already attached to VM + diskUUID, err := GetDiskAttachedToVM(ctx, vm, volumeID) + if err != nil { + return "", err + } + if diskUUID != "" { + return diskUUID, nil + } + } + klog.Errorf("failed to attach cns volume: %q to node vm: %q. fault: %q. opId: %q", volumeID, vm.String(), spew.Sdump(volumeOperationRes.Fault), taskInfo.ActivationId) + return "", errors.New(volumeOperationRes.Fault.LocalizedMessage) + } + diskUUID := interface{}(taskResult).(*cnstypes.CnsVolumeAttachResult).DiskUUID + klog.V(2).Infof("AttachVolume: Volume attached successfully. volumeID: %q, opId: %q, vm: %q, diskUUID: %q", volumeID, taskInfo.ActivationId, vm.String(), diskUUID) + return diskUUID, nil +} + +// DetachVolume detaches a volume from the virtual machine given the spec. 
+func (m *volumeManager) DetachVolume(vm *cnsvsphere.VirtualMachine, volumeID string) error {
+	err := validateManager(m)
+	if err != nil {
+		return err
+	}
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+	// Set up the VC connection
+	err = m.virtualCenter.ConnectCNS(ctx)
+	if err != nil {
+		klog.Errorf("ConnectCNS failed with err: %+v", err)
+		return err
+	}
+	// Construct the CNS DetachSpec list
+	var cnsDetachSpecList []cnstypes.CnsVolumeAttachDetachSpec
+	cnsDetachSpec := cnstypes.CnsVolumeAttachDetachSpec{
+		VolumeId: cnstypes.CnsVolumeId{
+			Id: volumeID,
+		},
+		Vm: vm.Reference(),
+	}
+	cnsDetachSpecList = append(cnsDetachSpecList, cnsDetachSpec)
+	// Call the CNS DetachVolume
+	task, err := m.virtualCenter.CnsClient.DetachVolume(ctx, cnsDetachSpecList)
+	if err != nil {
+		klog.Errorf("CNS DetachVolume failed from vCenter %q with err: %v", m.virtualCenter.Config.Host, err)
+		return err
+	}
+	// Get the taskInfo
+	taskInfo, err := cns.GetTaskInfo(ctx, task)
+	if err != nil {
+		klog.Errorf("Failed to get taskInfo for DetachVolume task from vCenter %q with err: %v", m.virtualCenter.Config.Host, err)
+		return err
+	}
+	klog.V(2).Infof("DetachVolume: volumeID: %q, vm: %q, opId: %q", volumeID, vm.String(), taskInfo.ActivationId)
+	// Get the task results for the given task
+	taskResult, err := cns.GetTaskResult(ctx, taskInfo)
+	if err != nil {
+		klog.Errorf("unable to find the task result for DetachVolume task from vCenter %q with taskID %s and detachResults %v",
+			m.virtualCenter.Config.Host, taskInfo.Task.Value, taskResult)
+		return err
+	}
+
+	if taskResult == nil {
+		klog.Errorf("taskResult is empty for DetachVolume task: %q, opId: %q", taskInfo.Task.Value, taskInfo.ActivationId)
+		return errors.New("taskResult is empty")
+	}
+
+	volumeOperationRes := taskResult.GetCnsVolumeOperationResult()
+
+	if volumeOperationRes.Fault != nil {
+		klog.Errorf("failed to detach cns volume: %q from node vm: %q. fault: %q, opId: %q", volumeID, vm.InventoryPath, spew.Sdump(volumeOperationRes.Fault), taskInfo.ActivationId)
+		return errors.New(volumeOperationRes.Fault.LocalizedMessage)
+	}
+	klog.V(2).Infof("DetachVolume: Volume detached successfully. volumeID: %q, opId: %q, vm: %q", volumeID, taskInfo.ActivationId, vm.String())
+	return nil
+}
+
+// DeleteVolume deletes a volume given its spec. 
+func (m *volumeManager) DeleteVolume(volumeID string, deleteDisk bool) error { + err := validateManager(m) + if err != nil { + return err + } + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + // Set up the VC connection + err = m.virtualCenter.ConnectCNS(ctx) + if err != nil { + klog.Errorf("ConnectCNS failed with err: %+v", err) + return err + } + // Construct the CNS VolumeId list + var cnsVolumeIDList []cnstypes.CnsVolumeId + cnsVolumeID := cnstypes.CnsVolumeId{ + Id: volumeID, + } + // Call the CNS DeleteVolume + cnsVolumeIDList = append(cnsVolumeIDList, cnsVolumeID) + task, err := m.virtualCenter.CnsClient.DeleteVolume(ctx, cnsVolumeIDList, deleteDisk) + if err != nil { + klog.Errorf("CNS DeleteVolume failed from vCenter %q with err: %v", m.virtualCenter.Config.Host, err) + return err + } + // Get the taskInfo + taskInfo, err := cns.GetTaskInfo(ctx, task) + if err != nil { + klog.Errorf("Failed to get taskInfo for DeleteVolume task from vCenter %q with err: %v", m.virtualCenter.Config.Host, err) + return err + } + klog.V(2).Infof("DeleteVolume: volumeID: %q, opId: %q", volumeID, taskInfo.ActivationId) + // Get the task results for the given task + taskResult, err := cns.GetTaskResult(ctx, taskInfo) + if err != nil { + klog.Errorf("unable to find the task result for DeleteVolume task from vCenter %q with taskID %s and deleteResults %v", + m.virtualCenter.Config.Host, taskInfo.Task.Value, taskResult) + return err + } + if taskResult == nil { + klog.Errorf("taskResult is empty for DeleteVolume task: %q, opID: %q", taskInfo.Task.Value, taskInfo.ActivationId) + return errors.New("taskResult is empty") + } + + volumeOperationRes := taskResult.GetCnsVolumeOperationResult() + if volumeOperationRes.Fault != nil { + klog.Errorf("Failed to delete volume: %q, fault: %q, opID: %q", volumeID, spew.Sdump(volumeOperationRes.Fault), taskInfo.ActivationId) + return errors.New(volumeOperationRes.Fault.LocalizedMessage) + } + klog.V(2).Infof("DeleteVolume: Volume deleted successfully. volumeID: %q, opId: %q", volumeID, taskInfo.ActivationId) + return nil +} + +// UpdateVolume updates a volume given its spec. 
+func (m *volumeManager) UpdateVolumeMetadata(spec *cnstypes.CnsVolumeMetadataUpdateSpec) error { + err := validateManager(m) + if err != nil { + return err + } + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + // Set up the VC connection + err = m.virtualCenter.ConnectCNS(ctx) + if err != nil { + klog.Errorf("ConnectCNS failed with err: %+v", err) + return err + } + // If the VSphereUser in the VolumeMetadataUpdateSpec is different from session user, update the VolumeMetadataUpdateSpec + s, err := m.virtualCenter.Client.SessionManager.UserSession(ctx) + if err != nil { + klog.Errorf("Failed to get usersession with err: %v", err) + return err + } + if s.UserName != spec.Metadata.ContainerCluster.VSphereUser { + klog.V(4).Infof("Update VSphereUser from %s to %s", spec.Metadata.ContainerCluster.VSphereUser, s.UserName) + spec.Metadata.ContainerCluster.VSphereUser = s.UserName + } + + var cnsUpdateSpecList []cnstypes.CnsVolumeMetadataUpdateSpec + cnsUpdateSpec := cnstypes.CnsVolumeMetadataUpdateSpec{ + VolumeId: cnstypes.CnsVolumeId{ + Id: spec.VolumeId.Id, + }, + Metadata: spec.Metadata, + } + cnsUpdateSpecList = append(cnsUpdateSpecList, cnsUpdateSpec) + task, err := m.virtualCenter.CnsClient.UpdateVolumeMetadata(ctx, cnsUpdateSpecList) + if err != nil { + klog.Errorf("CNS UpdateVolume failed from vCenter %q with err: %v", m.virtualCenter.Config.Host, err) + return err + } + // Get the taskInfo + taskInfo, err := cns.GetTaskInfo(ctx, task) + if err != nil { + klog.Errorf("Failed to get taskInfo for UpdateVolume task from vCenter %q with err: %v", m.virtualCenter.Config.Host, err) + return err + } + klog.V(2).Infof("UpdateVolumeMetadata: volumeID: %q, opId: %q", spec.VolumeId.Id, taskInfo.ActivationId) + // Get the task results for the given task + taskResult, err := cns.GetTaskResult(ctx, taskInfo) + if err != nil { + klog.Errorf("unable to find the task result for UpdateVolume task from vCenter %q with taskID %q, opId: %q and updateResults %+v", + m.virtualCenter.Config.Host, taskInfo.Task.Value, taskInfo.ActivationId, taskResult) + return err + } + + if taskResult == nil { + klog.Errorf("taskResult is empty for UpdateVolume task: %q, opId: %q", taskInfo.Task.Value, taskInfo.ActivationId) + return errors.New("taskResult is empty") + } + volumeOperationRes := taskResult.GetCnsVolumeOperationResult() + if volumeOperationRes.Fault != nil { + klog.Errorf("Failed to update volume. updateSpec: %q, fault: %q, opID: %q", spew.Sdump(spec), spew.Sdump(volumeOperationRes.Fault), taskInfo.ActivationId) + return errors.New(volumeOperationRes.Fault.LocalizedMessage) + } + klog.V(2).Infof("UpdateVolumeMetadata: Volume metadata updated successfully. volumeID: %q, opId: %q", spec.VolumeId.Id, taskInfo.ActivationId) + return nil +} + +// QueryVolume returns volumes matching the given filter. 
+func (m *volumeManager) QueryVolume(queryFilter cnstypes.CnsQueryFilter) (*cnstypes.CnsQueryResult, error) { + err := validateManager(m) + if err != nil { + return nil, err + } + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + // Set up the VC connection + err = m.virtualCenter.ConnectCNS(ctx) + if err != nil { + klog.Errorf("ConnectCNS failed with err: %+v", err) + return nil, err + } + //Call the CNS QueryVolume + res, err := m.virtualCenter.CnsClient.QueryVolume(ctx, queryFilter) + if err != nil { + klog.Errorf("CNS QueryVolume failed from vCenter %q with err: %v", m.virtualCenter.Config.Host, err) + return nil, err + } + return res, err +} + +// QueryAllVolume returns all volumes matching the given filter and selection. +func (m *volumeManager) QueryAllVolume(queryFilter cnstypes.CnsQueryFilter, querySelection cnstypes.CnsQuerySelection) (*cnstypes.CnsQueryResult, error) { + err := validateManager(m) + if err != nil { + return nil, err + } + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + // Set up the VC connection + err = m.virtualCenter.ConnectCNS(ctx) + if err != nil { + klog.Errorf("ConnectCNS failed with err: %+v", err) + return nil, err + } + //Call the CNS QueryAllVolume + res, err := m.virtualCenter.CnsClient.QueryAllVolume(ctx, queryFilter, querySelection) + if err != nil { + klog.Errorf("CNS QueryAllVolume failed from vCenter %q with err: %v", m.virtualCenter.Config.Host, err) + return nil, err + } + return res, err +} diff --git a/pkg/common/cns-lib/volume/util.go b/pkg/common/cns-lib/volume/util.go new file mode 100644 index 0000000000..0b265e2b5a --- /dev/null +++ b/pkg/common/cns-lib/volume/util.go @@ -0,0 +1,67 @@ +/* +Copyright 2019 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package volume + +import ( + "context" + "errors" + + vimtypes "github.com/vmware/govmomi/vim25/types" + "k8s.io/klog" + + cnsvsphere "sigs.k8s.io/vsphere-csi-driver/pkg/common/cns-lib/vsphere" +) + +// version and namespace constants for task client +const ( + CNSVolumeResourceInUseFaultMessage = "The resource 'volume' is in use." +) + +func validateManager(m *volumeManager) error { + if m.virtualCenter == nil { + klog.Error( + "Virtual Center connection not established") + return errors.New("Virtual Center connection not established") + } + return nil +} + +// GetDiskAttachedToVM checks if the volume is attached to the VM. 
+// If the volume is attached to the VM, return disk uuid of the volume, else return empty string +func GetDiskAttachedToVM(ctx context.Context, vm *cnsvsphere.VirtualMachine, volumeID string) (string, error) { + // Verify if the volume id is on the VM backing virtual disk devices + vmDevices, err := vm.Device(ctx) + if err != nil { + klog.Errorf("Failed to get devices from vm: %s", vm.InventoryPath) + return "", err + } + for _, device := range vmDevices { + if vmDevices.TypeName(device) == "VirtualDisk" { + if virtualDisk, ok := device.(*vimtypes.VirtualDisk); ok { + if virtualDisk.VDiskId != nil && virtualDisk.VDiskId.Id == volumeID { + virtualDevice := device.GetVirtualDevice() + if backing, ok := virtualDevice.Backing.(*vimtypes.VirtualDiskFlatVer2BackingInfo); ok { + klog.V(3).Infof("Found diskUUID %s for volume %s on vm %s", backing.Uuid, volumeID, vm.InventoryPath) + return backing.Uuid, nil + } + } + } + } + } + klog.V(3).Infof("Volume %s is not attached to VM: %s", volumeID, vm.InventoryPath) + return "", nil +} diff --git a/pkg/common/cns-lib/vsphere/cns.go b/pkg/common/cns-lib/vsphere/cns.go new file mode 100644 index 0000000000..1ffbf388f6 --- /dev/null +++ b/pkg/common/cns-lib/vsphere/cns.go @@ -0,0 +1,60 @@ +/* +Copyright 2019 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package vsphere + +import ( + "context" + + "github.com/vmware/govmomi/cns" + "github.com/vmware/govmomi/vim25" + "k8s.io/klog" +) + +// NewCNSClient creates a new CNS client +func NewCNSClient(ctx context.Context, c *vim25.Client) (*cns.Client, error) { + cnsClient, err := cns.NewClient(ctx, c) + if err != nil { + klog.Errorf("Failed to create a new client for CNS. err: %v", err) + return nil, err + } + return cnsClient, nil +} + +// ConnectCNS creates a CNS client for the virtual center. +func (vc *VirtualCenter) ConnectCNS(ctx context.Context) error { + err := vc.Connect(ctx) + if err != nil { + klog.Errorf("Failed to connect to Virtual Center host %q with err: %v", vc.Config.Host, err) + return err + } + if vc.CnsClient == nil { + if vc.CnsClient, err = NewCNSClient(ctx, vc.Client.Client); err != nil { + klog.Errorf("Failed to create CNS client on vCenter host %q with err: %v", vc.Config.Host, err) + return err + } + } + return nil +} + +// DisconnectCNS destroys the CNS client for the virtual center. +func (vc *VirtualCenter) DisconnectCNS(ctx context.Context) { + if vc.CnsClient == nil { + klog.V(1).Info("CnsClient wasn't connected, ignoring") + } else { + vc.CnsClient = nil + } +} diff --git a/pkg/common/cns-lib/vsphere/datacenter.go b/pkg/common/cns-lib/vsphere/datacenter.go new file mode 100644 index 0000000000..dd3f87cbeb --- /dev/null +++ b/pkg/common/cns-lib/vsphere/datacenter.go @@ -0,0 +1,226 @@ +/* +Copyright 2019 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. 
+You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package vsphere + +import ( + "context" + "fmt" + "strings" + + "github.com/vmware/govmomi/find" + "github.com/vmware/govmomi/object" + "github.com/vmware/govmomi/property" + "github.com/vmware/govmomi/vim25/mo" + "github.com/vmware/govmomi/vim25/types" + "k8s.io/klog" +) + +// DatastoreInfoProperty refers to the property name info for the Datastore +const DatastoreInfoProperty = "info" + +// Datacenter holds virtual center information along with the Datacenter. +type Datacenter struct { + // Datacenter represents the govmomi Datacenter. + *object.Datacenter + // VirtualCenterHost represents the virtual center host address. + VirtualCenterHost string +} + +func (dc *Datacenter) String() string { + return fmt.Sprintf("Datacenter [Datacenter: %v, VirtualCenterHost: %v]", + dc.Datacenter, dc.VirtualCenterHost) +} + +// GetDatastoreByURL returns the *Datastore instance given its URL. +func (dc *Datacenter) GetDatastoreByURL(ctx context.Context, datastoreURL string) (*Datastore, error) { + finder := find.NewFinder(dc.Datacenter.Client(), false) + finder.SetDatacenter(dc.Datacenter) + datastores, err := finder.DatastoreList(ctx, "*") + if err != nil { + klog.Errorf("Failed to get all the datastores. err: %+v", err) + return nil, err + } + var dsList []types.ManagedObjectReference + for _, ds := range datastores { + dsList = append(dsList, ds.Reference()) + } + + var dsMoList []mo.Datastore + pc := property.DefaultCollector(dc.Client()) + properties := []string{DatastoreInfoProperty} + err = pc.Retrieve(ctx, dsList, properties, &dsMoList) + if err != nil { + klog.Errorf("Failed to get Datastore managed objects from datastore objects."+ + " dsObjList: %+v, properties: %+v, err: %v", dsList, properties, err) + return nil, err + } + for _, dsMo := range dsMoList { + if dsMo.Info.GetDatastoreInfo().Url == datastoreURL { + return &Datastore{object.NewDatastore(dc.Client(), dsMo.Reference()), + dc}, nil + } + } + err = fmt.Errorf("Couldn't find Datastore given URL %q", datastoreURL) + klog.Error(err) + return nil, err +} + +// GetVirtualMachineByUUID returns the VirtualMachine instance given its UUID in a datacenter. +// If instanceUUID is set to true, then UUID is an instance UUID. +// - In this case, this function searches for virtual machines whose instance UUID matches the given uuid. +// If instanceUUID is set to false, then UUID is BIOS UUID. +// - In this case, this function searches for virtual machines whose BIOS UUID matches the given uuid. 
+func (dc *Datacenter) GetVirtualMachineByUUID(ctx context.Context, uuid string, instanceUUID bool) (*VirtualMachine, error) { + uuid = strings.ToLower(strings.TrimSpace(uuid)) + searchIndex := object.NewSearchIndex(dc.Datacenter.Client()) + svm, err := searchIndex.FindByUuid(ctx, dc.Datacenter, uuid, true, &instanceUUID) + if err != nil { + klog.Errorf("Failed to find VM given uuid %s with err: %v", uuid, err) + return nil, err + } else if svm == nil { + klog.Errorf("Couldn't find VM given uuid %s", uuid) + return nil, ErrVMNotFound + } + vm := &VirtualMachine{ + VirtualCenterHost: dc.VirtualCenterHost, + UUID: uuid, + VirtualMachine: object.NewVirtualMachine(dc.Datacenter.Client(), svm.Reference()), + Datacenter: dc, + } + return vm, nil +} + +// asyncGetAllDatacenters returns *Datacenter instances over the given +// channel. If an error occurs, it will be returned via the given error channel. +// If the given context is canceled, the processing will be stopped as soon as +// possible, and the channels will be closed before returning. +func asyncGetAllDatacenters(ctx context.Context, dcsChan chan<- *Datacenter, errChan chan<- error) { + defer close(dcsChan) + defer close(errChan) + + for _, vc := range GetVirtualCenterManager().GetAllVirtualCenters() { + // If the context was canceled, we stop looking for more Datacenters. + select { + case <-ctx.Done(): + err := ctx.Err() + klog.V(2).Infof("Context was done, returning with err: %v", err) + errChan <- err + return + default: + } + + if err := vc.Connect(ctx); err != nil { + klog.Errorf("Failed connecting to VC %q with err: %v", vc.Config.Host, err) + errChan <- err + return + } + + dcs, err := vc.GetDatacenters(ctx) + if err != nil { + klog.Errorf("Failed to fetch datacenters for vc %v with err: %v", vc.Config.Host, err) + errChan <- err + return + } + + for _, dc := range dcs { + // If the context was canceled, we don't return more Datacenters. + select { + case <-ctx.Done(): + err := ctx.Err() + klog.V(2).Infof("Context was done, returning with err: %v", err) + errChan <- err + return + default: + klog.V(2).Infof("Publishing datacenter %v", dc) + dcsChan <- dc + } + } + } +} + +// AsyncGetAllDatacenters fetches all Datacenters asynchronously. The +// *Datacenter chan returns a *Datacenter on discovering one. The +// error chan returns a single error if one occurs. Both channels are closed +// when nothing more is to be sent. +// +// The buffer size for the *Datacenter chan can be specified via the +// buffSize parameter. For example, buffSize could be 1, in which case, the +// sender will buffer at most 1 *Datacenter instance (and possibly close +// the channel and terminate, if that was the only instance found). +// +// Note that a context.Canceled error would be returned if the context was +// canceled at some point during the execution of this function. 
+func AsyncGetAllDatacenters(ctx context.Context, buffSize int) (<-chan *Datacenter, <-chan error) { + dcsChan := make(chan *Datacenter, buffSize) + errChan := make(chan error, 1) + go asyncGetAllDatacenters(ctx, dcsChan, errChan) + return dcsChan, errChan +} + +// GetVMMoList gets the VM Managed Objects with the given properties from the VM object +func (dc *Datacenter) GetVMMoList(ctx context.Context, vmObjList []*VirtualMachine, properties []string) ([]mo.VirtualMachine, error) { + var vmMoList []mo.VirtualMachine + var vmRefs []types.ManagedObjectReference + if len(vmObjList) < 1 { + msg := fmt.Sprintf("VirtualMachine Object list is empty") + klog.Errorf(msg+": %v", vmObjList) + return nil, fmt.Errorf(msg) + } + + for _, vmObj := range vmObjList { + vmRefs = append(vmRefs, vmObj.Reference()) + } + pc := property.DefaultCollector(dc.Client()) + err := pc.Retrieve(ctx, vmRefs, properties, &vmMoList) + if err != nil { + klog.Errorf("Failed to get VM managed objects from VM objects. vmObjList: %+v, properties: %+v, err: %v", vmObjList, properties, err) + return nil, err + } + return vmMoList, nil +} + +// GetAllDatastores gets the datastore URL to DatastoreInfo map for all the datastores in +// the datacenter. +func (dc *Datacenter) GetAllDatastores(ctx context.Context) (map[string]*DatastoreInfo, error) { + finder := find.NewFinder(dc.Client(), false) + finder.SetDatacenter(dc.Datacenter) + datastores, err := finder.DatastoreList(ctx, "*") + if err != nil { + klog.Errorf("Failed to get all the datastores in the Datacenter %s with error: %v", dc.Datacenter.String(), err) + return nil, err + } + var dsList []types.ManagedObjectReference + for _, ds := range datastores { + dsList = append(dsList, ds.Reference()) + } + var dsMoList []mo.Datastore + pc := property.DefaultCollector(dc.Client()) + properties := []string{"info"} + err = pc.Retrieve(ctx, dsList, properties, &dsMoList) + if err != nil { + klog.Errorf("Failed to get datastore managed objects from datastore objects %v with properties %v: %v", dsList, properties, err) + return nil, err + } + dsURLInfoMap := make(map[string]*DatastoreInfo) + for _, dsMo := range dsMoList { + dsURLInfoMap[dsMo.Info.GetDatastoreInfo().Url] = &DatastoreInfo{ + &Datastore{object.NewDatastore(dc.Client(), dsMo.Reference()), + dc}, + dsMo.Info.GetDatastoreInfo()} + } + return dsURLInfoMap, nil +} diff --git a/pkg/common/cns-lib/vsphere/datastore.go b/pkg/common/cns-lib/vsphere/datastore.go new file mode 100644 index 0000000000..1f15016c5a --- /dev/null +++ b/pkg/common/cns-lib/vsphere/datastore.go @@ -0,0 +1,58 @@ +/* +Copyright 2019 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package vsphere + +import ( + "context" + "fmt" + + "github.com/vmware/govmomi/object" + "github.com/vmware/govmomi/property" + "github.com/vmware/govmomi/vim25/mo" + "github.com/vmware/govmomi/vim25/types" + "k8s.io/klog" +) + +// Datastore holds Datastore and Datacenter information. +type Datastore struct { + // Datastore represents the govmomi Datastore instance. 
+ *object.Datastore + // Datacenter represents the datacenter on which the Datastore resides. + Datacenter *Datacenter +} + +// DatastoreInfo is a structure to store the Datastore and it's Info. +type DatastoreInfo struct { + *Datastore + Info *types.DatastoreInfo +} + +func (di DatastoreInfo) String() string { + return fmt.Sprintf("Datastore: %+v, datastore URL: %s", di.Datastore, di.Info.Url) +} + +// GetDatastoreURL returns the URL of datastore +func (ds *Datastore) GetDatastoreURL(ctx context.Context) (string, error) { + var dsMo mo.Datastore + pc := property.DefaultCollector(ds.Client()) + err := pc.RetrieveOne(ctx, ds.Datastore.Reference(), []string{"summary"}, &dsMo) + if err != nil { + klog.Errorf("Failed to retrieve datastore summary property: %v", err) + return "", err + } + return dsMo.Summary.Url, nil +} diff --git a/pkg/common/cns-lib/vsphere/hostsystem.go b/pkg/common/cns-lib/vsphere/hostsystem.go new file mode 100644 index 0000000000..211175569c --- /dev/null +++ b/pkg/common/cns-lib/vsphere/hostsystem.go @@ -0,0 +1,64 @@ +/* +Copyright 2019 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package vsphere + +import ( + "context" + + "github.com/vmware/govmomi/object" + "github.com/vmware/govmomi/property" + "github.com/vmware/govmomi/vim25/mo" + "github.com/vmware/govmomi/vim25/types" + "k8s.io/klog" +) + +// HostSystem holds details of a host instance. +type HostSystem struct { + // HostSystem represents the host system. + *object.HostSystem +} + +// GetAllAccessibleDatastores gets the list of accessible datastores for the given host +func (host *HostSystem) GetAllAccessibleDatastores(ctx context.Context) ([]*DatastoreInfo, error) { + var hostSystemMo mo.HostSystem + s := object.NewSearchIndex(host.Client()) + err := s.Properties(ctx, host.Reference(), []string{"datastore"}, &hostSystemMo) + if err != nil { + klog.Errorf("Failed to retrieve datastores for host %v with err: %v", host, err) + return nil, err + } + var dsRefList []types.ManagedObjectReference + dsRefList = append(dsRefList, hostSystemMo.Datastore...) + + var dsMoList []mo.Datastore + pc := property.DefaultCollector(host.Client()) + properties := []string{"info"} + err = pc.Retrieve(ctx, dsRefList, properties, &dsMoList) + if err != nil { + klog.Errorf("Failed to get datastore managed objects from datastore objects %v with properties %v: %v", dsRefList, properties, err) + return nil, err + } + var dsObjList []*DatastoreInfo + for _, dsMo := range dsMoList { + dsObjList = append(dsObjList, + &DatastoreInfo{ + &Datastore{object.NewDatastore(host.Client(), dsMo.Reference()), + nil}, + dsMo.Info.GetDatastoreInfo()}) + } + return dsObjList, nil +} diff --git a/pkg/common/cns-lib/vsphere/pbm.go b/pkg/common/cns-lib/vsphere/pbm.go new file mode 100644 index 0000000000..fb3ac3546d --- /dev/null +++ b/pkg/common/cns-lib/vsphere/pbm.go @@ -0,0 +1,60 @@ +/* +Copyright 2019 The Kubernetes Authors. 
+ +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package vsphere + +import ( + "context" + + "github.com/vmware/govmomi/pbm" + "k8s.io/klog" +) + +// ConnectPbm creates a PBM client for the virtual center. +func (vc *VirtualCenter) ConnectPbm(ctx context.Context) error { + var err = vc.Connect(ctx) + if err != nil { + klog.Errorf("Failed to connect to Virtual Center %q with err: %v", vc.Config.Host, err) + return err + } + if vc.PbmClient == nil { + if vc.PbmClient, err = pbm.NewClient(ctx, vc.Client.Client); err != nil { + klog.Errorf("Failed to create pbm client with err: %v", err) + return err + } + } + return nil +} + +// DisconnectPbm destroys the PBM client for the virtual center. +func (vc *VirtualCenter) DisconnectPbm(ctx context.Context) error { + if vc.PbmClient == nil { + klog.V(1).Info("PbmClient wasn't connected, ignoring") + } else { + vc.PbmClient = nil + } + return nil +} + +// GetStoragePolicyIDByName gets storage policy ID by name. +func (vc *VirtualCenter) GetStoragePolicyIDByName(ctx context.Context, storagePolicyName string) (string, error) { + storagePolicyID, err := vc.PbmClient.ProfileIDByName(ctx, storagePolicyName) + if err != nil { + klog.Errorf("Failed to get StoragePolicyID from StoragePolicyName %s with err: %v", storagePolicyName, err) + return "", err + } + return storagePolicyID, nil +} diff --git a/pkg/common/cns-lib/vsphere/utils.go b/pkg/common/cns-lib/vsphere/utils.go new file mode 100644 index 0000000000..ead4ed4a27 --- /dev/null +++ b/pkg/common/cns-lib/vsphere/utils.go @@ -0,0 +1,163 @@ +/* +Copyright 2019 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/
+
+package vsphere
+
+import (
+	"context"
+	"crypto/tls"
+	"encoding/pem"
+	"errors"
+	"fmt"
+	"reflect"
+	"strconv"
+	"strings"
+
+	cnstypes "github.com/vmware/govmomi/cns/types"
+	"github.com/vmware/govmomi/sts"
+	"github.com/vmware/govmomi/vim25"
+	"github.com/vmware/govmomi/vim25/soap"
+	"github.com/vmware/govmomi/vim25/types"
+
+	"sigs.k8s.io/vsphere-csi-driver/pkg/common/config"
+)
+
+// IsInvalidCredentialsError returns true if the error is of type InvalidLogin.
+func IsInvalidCredentialsError(err error) bool {
+	isInvalidCredentialsError := false
+	if soap.IsSoapFault(err) {
+		_, isInvalidCredentialsError = soap.ToSoapFault(err).VimFault().(types.InvalidLogin)
+	}
+	return isInvalidCredentialsError
+}
+
+// GetCnsKubernetesEntityMetaData creates a CnsKubernetesEntityMetadata object from the given parameters.
+func GetCnsKubernetesEntityMetaData(entityName string, labels map[string]string, deleteFlag bool, entityType string, namespace string) *cnstypes.CnsKubernetesEntityMetadata {
+	// Create new metadata spec
+	var newLabels []types.KeyValue
+	for labelKey, labelVal := range labels {
+		newLabels = append(newLabels, types.KeyValue{
+			Key:   labelKey,
+			Value: labelVal,
+		})
+	}
+
+	entityMetadata := &cnstypes.CnsKubernetesEntityMetadata{}
+	entityMetadata.EntityName = entityName
+	entityMetadata.Delete = deleteFlag
+	if labels != nil {
+		entityMetadata.Labels = newLabels
+	}
+	entityMetadata.EntityType = entityType
+	entityMetadata.Namespace = namespace
+	return entityMetadata
+}
+
+// GetContainerCluster creates a CnsContainerCluster object from the given parameters.
+func GetContainerCluster(clusterid string, username string) cnstypes.CnsContainerCluster {
+	return cnstypes.CnsContainerCluster{
+		ClusterType: string(cnstypes.CnsClusterTypeKubernetes),
+		ClusterId:   clusterid,
+		VSphereUser: username,
+	}
+}
+
+// GetVirtualCenterConfig returns a VirtualCenterConfig object created from the vSphere configuration
+// specified in the argument.
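+// Only the first vCenter host returned by GetVcenterIPs is used to build the
+// returned config.
+//
+// Typical usage (illustrative sketch; errors elided, the config path is just
+// an example):
+//
+//	cfg, _ := config.GetCnsconfig("/etc/cloud/csi-vsphere.conf")
+//	vcConfig, _ := GetVirtualCenterConfig(cfg)
+//	vc, _ := GetVirtualCenterManager().RegisterVirtualCenter(vcConfig)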
+func GetVirtualCenterConfig(cfg *config.Config) (*VirtualCenterConfig, error) { + var err error + vCenterIPs, err := GetVcenterIPs(cfg) // make([]string, 0) + if err != nil { + return nil, err + } + host := vCenterIPs[0] + port, err := strconv.Atoi(cfg.VirtualCenter[host].VCenterPort) + if err != nil { + return nil, err + } + vcConfig := &VirtualCenterConfig{ + Host: host, + Port: port, + Username: cfg.VirtualCenter[host].User, + Password: cfg.VirtualCenter[host].Password, + Insecure: cfg.VirtualCenter[host].InsecureFlag, + DatacenterPaths: strings.Split(cfg.VirtualCenter[host].Datacenters, ","), + } + for idx := range vcConfig.DatacenterPaths { + vcConfig.DatacenterPaths[idx] = strings.TrimSpace(vcConfig.DatacenterPaths[idx]) + } + return vcConfig, nil +} + +// GetVcenterIPs returns list of vCenter IPs from VSphereConfig +func GetVcenterIPs(cfg *config.Config) ([]string, error) { + var err error + vCenterIPs := make([]string, 0) + for key := range cfg.VirtualCenter { + vCenterIPs = append(vCenterIPs, key) + } + if len(vCenterIPs) == 0 { + err = errors.New("Unable get vCenter Hosts from VSphereConfig") + } + return vCenterIPs, err +} + +// GetLabelsMapFromKeyValue creates a map object from given parameter +func GetLabelsMapFromKeyValue(labels []types.KeyValue) map[string]string { + labelsMap := make(map[string]string) + for _, label := range labels { + labelsMap[label.Key] = label.Value + } + return labelsMap +} + +// CompareKubernetesMetadata compares the whole cnskubernetesEntityMetadata from two given parameters +func CompareKubernetesMetadata(pvMetaData *cnstypes.CnsKubernetesEntityMetadata, cnsMetaData *cnstypes.CnsKubernetesEntityMetadata) bool { + if (pvMetaData.EntityName != cnsMetaData.EntityName) || (pvMetaData.Delete != cnsMetaData.Delete) || (pvMetaData.Namespace != cnsMetaData.Namespace) { + return false + } + labelsMatch := reflect.DeepEqual(GetLabelsMapFromKeyValue(pvMetaData.Labels), GetLabelsMapFromKeyValue(cnsMetaData.Labels)) + if !labelsMatch { + return false + } + return true +} + +// Signer decodes the certificate and private key and returns SAML token needed for authentication +func signer(ctx context.Context, client *vim25.Client, username string, password string) (*sts.Signer, error) { + pemBlock, _ := pem.Decode([]byte(username)) + if pemBlock == nil { + return nil, nil + } + certificate, err := tls.X509KeyPair([]byte(username), []byte(password)) + if err != nil { + return nil, fmt.Errorf("Failed to load X509 key pair. Error: %+v", err) + } + tokens, err := sts.NewClient(ctx, client) + if err != nil { + return nil, fmt.Errorf("Failed to create STS client. err: %+v", err) + } + req := sts.TokenRequest{ + Certificate: &certificate, + Delegatable: true, + } + signer, err := tokens.Issue(ctx, req) + if err != nil { + return nil, fmt.Errorf("Failed to issue SAML token. err: %+v", err) + } + return signer, nil +} diff --git a/pkg/common/cns-lib/vsphere/virtualcenter.go b/pkg/common/cns-lib/vsphere/virtualcenter.go new file mode 100644 index 0000000000..14aabecf45 --- /dev/null +++ b/pkg/common/cns-lib/vsphere/virtualcenter.go @@ -0,0 +1,353 @@ +/* +Copyright 2019 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. 
+You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package vsphere + +import ( + "context" + "crypto/tls" + "encoding/pem" + "fmt" + "net" + neturl "net/url" + "strconv" + "sync" + + csictx "github.com/rexray/gocsi/context" + "github.com/vmware/govmomi" + "github.com/vmware/govmomi/cns" + "github.com/vmware/govmomi/find" + "github.com/vmware/govmomi/object" + "github.com/vmware/govmomi/pbm" + "github.com/vmware/govmomi/session" + "github.com/vmware/govmomi/sts" + "github.com/vmware/govmomi/vim25" + "github.com/vmware/govmomi/vim25/mo" + "github.com/vmware/govmomi/vim25/soap" + "github.com/vmware/govmomi/vim25/types" + "k8s.io/klog" + + cnsconfig "sigs.k8s.io/vsphere-csi-driver/pkg/common/config" +) + +const ( + // DefaultScheme is the default connection scheme. + DefaultScheme = "https" + // DefaultRoundTripperCount is the default SOAP round tripper count. + DefaultRoundTripperCount = 3 +) + +// VirtualCenter holds details of a virtual center instance. +type VirtualCenter struct { + // Config represents the virtual center configuration. + Config *VirtualCenterConfig + // Client represents the govmomi client instance for the connection. + Client *govmomi.Client + // PbmClient represents the govmomi PBM Client instance. + PbmClient *pbm.Client + // CnsClient represents the CNS client instance. + CnsClient *cns.Client + credentialsLock sync.Mutex +} + +func (vc *VirtualCenter) String() string { + return fmt.Sprintf("VirtualCenter [Config: %v, Client: %v, PbmClient: %v]", + vc.Config, vc.Client, vc.PbmClient) +} + +// VirtualCenterConfig represents virtual center configuration. +type VirtualCenterConfig struct { + // Scheme represents the connection scheme. (Ex: https) + Scheme string + // Host represents the virtual center host address. + Host string + // Port represents the virtual center host port. + Port int + // Username represents the virtual center username. + Username string + // Password represents the virtual center password in clear text. + Password string + // Insecure tells if an insecure connection is allowed. + Insecure bool + // RoundTripperCount is the SOAP round tripper count. (retries = RoundTripperCount - 1) + RoundTripperCount int + // DatacenterPaths represents paths of datacenters on the virtual center. + DatacenterPaths []string +} + +func (vcc *VirtualCenterConfig) String() string { + return fmt.Sprintf("VirtualCenterConfig [Scheme: %v, Host: %v, Port: %v, "+ + "Username: %v, Password: %v, Insecure: %v, RoundTripperCount: %v, "+ + "DatacenterPaths: %v]", vcc.Scheme, vcc.Host, vcc.Port, vcc.Username, + vcc.Password, vcc.Insecure, vcc.RoundTripperCount, vcc.DatacenterPaths) +} + +// clientMutex is used for exclusive connection creation. +var clientMutex sync.Mutex + +// newClient creates a new govmomi Client instance. 
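+// It builds a SOAP client for the configured host and port, authenticates via
+// login (username/password or certificate based), and wraps the round tripper
+// so that temporary network errors are retried according to RoundTripperCount.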
+func (vc *VirtualCenter) newClient(ctx context.Context) (*govmomi.Client, error) { + if vc.Config.Scheme == "" { + vc.Config.Scheme = DefaultScheme + } + + url, err := soap.ParseURL(net.JoinHostPort(vc.Config.Host, strconv.Itoa(vc.Config.Port))) + if err != nil { + klog.Errorf("Failed to parse URL %s with err: %v", url, err) + return nil, err + } + + soapClient := soap.NewClient(url, vc.Config.Insecure) + vimClient, err := vim25.NewClient(ctx, soapClient) + if err != nil { + klog.Errorf("Failed to create new client with err: %v", err) + return nil, err + } + + vimClient.UserAgent = "k8s-csi-useragent" + + client := &govmomi.Client{ + Client: vimClient, + SessionManager: session.NewManager(vimClient), + } + + err = vc.login(ctx, client) + if err != nil { + return nil, err + } + + s, err := client.SessionManager.UserSession(ctx) + if err == nil { + klog.V(4).Infof("New session ID for '%s' = %s", s.UserName, s.Key) + } + + if vc.Config.RoundTripperCount == 0 { + vc.Config.RoundTripperCount = DefaultRoundTripperCount + } + client.RoundTripper = vim25.Retry(client.RoundTripper, vim25.TemporaryNetworkError(vc.Config.RoundTripperCount)) + return client, nil +} + +// login calls SessionManager.LoginByToken if certificate and private key are configured, +// otherwise calls SessionManager.Login with user and password. +func (vc *VirtualCenter) login(ctx context.Context, client *govmomi.Client) error { + var err error + vc.credentialsLock.Lock() + defer vc.credentialsLock.Unlock() + + b, _ := pem.Decode([]byte(vc.Config.Username)) + if b == nil { + return client.SessionManager.Login(ctx, neturl.UserPassword(vc.Config.Username, vc.Config.Password)) + } + + cert, err := tls.X509KeyPair([]byte(vc.Config.Username), []byte(vc.Config.Password)) + if err != nil { + klog.Errorf("Failed to load X509 key pair with err: %v", err) + return err + } + + tokens, err := sts.NewClient(ctx, client.Client) + if err != nil { + klog.Errorf("Failed to create STS client with err: %v", err) + return err + } + + req := sts.TokenRequest{ + Certificate: &cert, + } + + signer, err := tokens.Issue(ctx, req) + if err != nil { + klog.Errorf("Failed to issue SAML token with err: %v", err) + return err + } + + header := soap.Header{Security: signer} + return client.SessionManager.LoginByToken(client.Client.WithHeader(ctx, header)) +} + +// Connect establishes connection with vSphere with existing credentials if session doesn't exist. +// If credentials are invalid then it fetches latest credential from credential store and connects with it. +func (vc *VirtualCenter) Connect(ctx context.Context) error { + err := vc.connect(ctx) + if err == nil { + return nil + } + if !IsInvalidCredentialsError(err) { + klog.Errorf("Cannot connect to vCenter with err: %v", err) + return err + } + klog.V(2).Infof("Invalid credentials. Cannot connect to server %q. "+ + "Fetching credentials from secret.", vc.Config.Host) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + cfgPath := csictx.Getenv(ctx, cnsconfig.EnvCloudConfig) + if cfgPath == "" { + cfgPath = cnsconfig.DefaultCloudConfigPath + } + + cfg, err := cnsconfig.GetCnsconfig(cfgPath) + if err != nil { + klog.Errorf("Failed to read config with err: %v", err) + return err + } + vcenterconfig, err := GetVirtualCenterConfig(cfg) + if err != nil { + klog.Errorf("Failed to get VirtualCenterConfig. 
err=%v", err) + return err + } + vc.UpdateCredentials(vcenterconfig.Username, vcenterconfig.Password) + return vc.connect(ctx) +} + +// connect creates a connection to the virtual center host. +func (vc *VirtualCenter) connect(ctx context.Context) error { + clientMutex.Lock() + defer clientMutex.Unlock() + + // If client was never initialized, initialize one. + var err error + if vc.Client == nil { + if vc.Client, err = vc.newClient(ctx); err != nil { + klog.Errorf("Failed to create govmomi client with err: %v", err) + return err + } + return nil + } + + // If session hasn't expired, nothing to do. + sessionMgr := session.NewManager(vc.Client.Client) + // SessionMgr.UserSession(ctx) retrieves and returns the SessionManager's CurrentSession field + // Nil is returned if the session is not authenticated or timed out. + if userSession, err := sessionMgr.UserSession(ctx); err != nil { + klog.Errorf("Failed to obtain user session with err: %v", err) + return err + } else if userSession != nil { + return nil + } + // If session has expired, create a new instance. + klog.Warning("Creating a new client session as the existing session isn't valid or not authenticated") + if vc.Client, err = vc.newClient(ctx); err != nil { + klog.Errorf("Failed to create govmomi client with err: %v", err) + return err + } + // Recreate PbmClient If created using timed out VC Client + if vc.PbmClient != nil { + if vc.PbmClient, err = pbm.NewClient(ctx, vc.Client.Client); err != nil { + klog.Errorf("Failed to create pbm client with err: %v", err) + return err + } + } + // Recreate CNSClient If created using timed out VC Client + if vc.CnsClient != nil { + if vc.CnsClient, err = NewCNSClient(ctx, vc.Client.Client); err != nil { + klog.Errorf("Failed to create CNS client on vCenter host %v with err: %v", vc.Config.Host, err) + return err + } + } + return nil +} + +// listDatacenters returns all Datacenters. +func (vc *VirtualCenter) listDatacenters(ctx context.Context) ([]*Datacenter, error) { + finder := find.NewFinder(vc.Client.Client, false) + dcList, err := finder.DatacenterList(ctx, "*") + if err != nil { + klog.Errorf("Failed to list datacenters with err: %v", err) + return nil, err + } + + var dcs []*Datacenter + for _, dcObj := range dcList { + dc := &Datacenter{Datacenter: dcObj, VirtualCenterHost: vc.Config.Host} + dcs = append(dcs, dc) + } + return dcs, nil +} + +// getDatacenters returns Datacenter instances given their paths. +func (vc *VirtualCenter) getDatacenters(ctx context.Context, dcPaths []string) ([]*Datacenter, error) { + finder := find.NewFinder(vc.Client.Client, false) + var dcs []*Datacenter + for _, dcPath := range dcPaths { + dcObj, err := finder.Datacenter(ctx, dcPath) + if err != nil { + klog.Errorf("Failed to fetch datacenter given dcPath %s with err: %v", dcPath, err) + return nil, err + } + dc := &Datacenter{Datacenter: dcObj, VirtualCenterHost: vc.Config.Host} + dcs = append(dcs, dc) + } + return dcs, nil +} + +// GetDatacenters returns Datacenters found on the VirtualCenter. If no +// datacenters are mentioned in the VirtualCenterConfig during registration, all +// Datacenters for the given VirtualCenter will be returned. If DatacenterPaths +// is configured in VirtualCenterConfig during registration, only the listed +// Datacenters are returned. 
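+//
+// Call sketch (illustrative only; error handling elided):
+//
+//	if err := vc.Connect(ctx); err != nil {
+//		// handle connection error
+//	}
+//	dcs, err := vc.GetDatacenters(ctx)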
+func (vc *VirtualCenter) GetDatacenters(ctx context.Context) ([]*Datacenter, error) { + if len(vc.Config.DatacenterPaths) != 0 { + return vc.getDatacenters(ctx, vc.Config.DatacenterPaths) + } + return vc.listDatacenters(ctx) +} + +// Disconnect disconnects the virtual center host connection if connected. +func (vc *VirtualCenter) Disconnect(ctx context.Context) error { + if vc.Client == nil { + klog.V(1).Info("Client wasn't connected, ignoring") + return nil + } + if err := vc.Client.Logout(ctx); err != nil { + klog.Errorf("Failed to logout with err: %v", err) + return err + } + vc.Client = nil + return nil +} + +// UpdateCredentials updates username and password in the VirtualCenterConfig object +func (vc *VirtualCenter) UpdateCredentials(username, password string) { + vc.credentialsLock.Lock() + defer vc.credentialsLock.Unlock() + vc.Config.Username = username + vc.Config.Password = password +} + +// GetHostsByCluster return hosts inside the cluster using cluster moref. +func (vc *VirtualCenter) GetHostsByCluster(ctx context.Context, clusterMorefValue string) ([]*HostSystem, error) { + clusterMoref := types.ManagedObjectReference{ + Type: "ClusterComputeResource", + Value: clusterMorefValue, + } + clusterComputeResourceMo := mo.ClusterComputeResource{} + err := vc.Client.RetrieveOne(ctx, clusterMoref, []string{"host"}, &clusterComputeResourceMo) + if err != nil { + klog.Errorf("Failed to fetch hosts from cluster given clusterMorefValue %s with err: %v", clusterMorefValue, err) + return nil, err + } + var hostObjList []*HostSystem + for _, hostMoref := range clusterComputeResourceMo.Host { + hostObjList = append(hostObjList, + &HostSystem{ + HostSystem: object.NewHostSystem(vc.Client.Client, hostMoref), + }) + } + return hostObjList, nil +} diff --git a/pkg/common/cns-lib/vsphere/virtualcentermanager.go b/pkg/common/cns-lib/vsphere/virtualcentermanager.go new file mode 100644 index 0000000000..03d8ddee53 --- /dev/null +++ b/pkg/common/cns-lib/vsphere/virtualcentermanager.go @@ -0,0 +1,141 @@ +/* +Copyright 2019 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package vsphere + +import ( + "context" + "errors" + "sync" + + "k8s.io/klog" +) + +var ( + // ErrVCAlreadyRegistered is returned when registration for a previously + // registered virtual center is attempted. + ErrVCAlreadyRegistered = errors.New("virtual center was already registered") + // ErrVCNotFound is returned when a virtual center instance isn't found. + ErrVCNotFound = errors.New("virtual center wasn't found in registry") +) + +// VirtualCenterManager provides functionality to manage virtual centers. +type VirtualCenterManager interface { + // GetVirtualCenter returns the VirtualCenter instance given the host. + GetVirtualCenter(host string) (*VirtualCenter, error) + // GetAllVirtualCenters returns all VirtualCenter instances. If virtual + // centers are added or removed concurrently, they may or may not be + // reflected in the result of a call to this method. 
+ GetAllVirtualCenters() []*VirtualCenter + // RegisterVirtualCenter registers a virtual center, but doesn't initiate + // the connection to the host. + RegisterVirtualCenter(config *VirtualCenterConfig) (*VirtualCenter, error) + // UnregisterVirtualCenter disconnects and unregisters the virtual center + // given it's host. + UnregisterVirtualCenter(host string) error + // UnregisterAllVirtualCenters disconnects and unregisters all virtual centers. + UnregisterAllVirtualCenters() error +} + +var ( + // vcManagerInst is a VirtualCenterManager singleton. + vcManagerInst *defaultVirtualCenterManager + // onceForVCManager is used for initializing the VirtualCenterManager singleton. + onceForVCManager sync.Once +) + +// GetVirtualCenterManager returns the VirtualCenterManager singleton. +func GetVirtualCenterManager() VirtualCenterManager { + onceForVCManager.Do(func() { + klog.V(1).Info("Initializing defaultVirtualCenterManager...") + vcManagerInst = &defaultVirtualCenterManager{virtualCenters: sync.Map{}} + klog.V(1).Info("Successfully initialized defaultVirtualCenterManager") + }) + return vcManagerInst +} + +// defaultVirtualCenterManager holds virtual center information and provides +// functionality around it. +type defaultVirtualCenterManager struct { + // virtualCenters map hosts to *VirtualCenter instances. + virtualCenters sync.Map +} + +func (m *defaultVirtualCenterManager) GetVirtualCenter(host string) (*VirtualCenter, error) { + if vc, exists := m.virtualCenters.Load(host); exists { + return vc.(*VirtualCenter), nil + } + klog.Errorf("Couldn't find VC %s in registry", host) + return nil, ErrVCNotFound +} + +func (m *defaultVirtualCenterManager) GetAllVirtualCenters() []*VirtualCenter { + var vcs []*VirtualCenter + m.virtualCenters.Range(func(_, vcInf interface{}) bool { + // If an entry was concurrently deleted from virtualCenters, Range could + // possibly return a nil value for that key. + // See https://golang.org/pkg/sync/#Map.Range for more info. + if vcInf != nil { + vcs = append(vcs, vcInf.(*VirtualCenter)) + } + return true + }) + return vcs +} + +func (m *defaultVirtualCenterManager) RegisterVirtualCenter(config *VirtualCenterConfig) (*VirtualCenter, error) { + if _, exists := m.virtualCenters.Load(config.Host); exists { + klog.Errorf("VC was already found in registry, failed to register with config %v", config) + return nil, ErrVCAlreadyRegistered + } + + vc := &VirtualCenter{Config: config} // Note that the Client isn't initialized here. 
+ m.virtualCenters.Store(config.Host, vc) + klog.V(1).Infof("Successfully registered VC %q", vc.Config.Host) + return vc, nil +} + +func (m *defaultVirtualCenterManager) UnregisterVirtualCenter(host string) error { + vc, err := m.GetVirtualCenter(host) + if err != nil { + klog.Errorf("Failed to find VC %s, couldn't unregister", host) + return err + } + if err := vc.DisconnectPbm(context.Background()); err != nil { + klog.Errorf("Failed to disconnect VC pbm %s, couldn't unregister", host) + return err + } + if err := vc.Disconnect(context.Background()); err != nil { + klog.Errorf("Failed to disconnect VC %s, couldn't unregister", host) + return err + } + + m.virtualCenters.Delete(host) + klog.V(2).Infof("Successfully unregistered VC %s", host) + return nil +} + +func (m *defaultVirtualCenterManager) UnregisterAllVirtualCenters() error { + var err error + m.virtualCenters.Range(func(hostInf, _ interface{}) bool { + if err = m.UnregisterVirtualCenter(hostInf.(string)); err != nil { + klog.Errorf("Failed to unregister VC %v", hostInf) + return false + } + return true + }) + return err +} diff --git a/pkg/common/cns-lib/vsphere/virtualmachine.go b/pkg/common/cns-lib/vsphere/virtualmachine.go new file mode 100644 index 0000000000..fdc446eec6 --- /dev/null +++ b/pkg/common/cns-lib/vsphere/virtualmachine.go @@ -0,0 +1,353 @@ +/* +Copyright 2019 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package vsphere + +import ( + "context" + "errors" + "fmt" + "net/url" + "sync" + + "github.com/vmware/govmomi/object" + "github.com/vmware/govmomi/vapi/rest" + "github.com/vmware/govmomi/vapi/tags" + "github.com/vmware/govmomi/vim25/mo" + "github.com/vmware/govmomi/vim25/types" + "k8s.io/klog" +) + +// ErrVMNotFound is returned when a virtual machine isn't found. +var ErrVMNotFound = errors.New("virtual machine wasn't found") + +// VirtualMachine holds details of a virtual machine instance. +type VirtualMachine struct { + // VirtualCenterHost represents the virtual machine's vCenter host. + VirtualCenterHost string + // UUID represents the virtual machine's UUID. + UUID string + // VirtualMachine represents the virtual machine. + *object.VirtualMachine + // Datacenter represents the datacenter to which the virtual machine belongs. + Datacenter *Datacenter +} + +func (vm *VirtualMachine) String() string { + return fmt.Sprintf("%v [VirtualCenterHost: %v, UUID: %v, Datacenter: %v]", + vm.VirtualMachine, vm.VirtualCenterHost, vm.UUID, vm.Datacenter) +} + +// IsActive returns true if Virtual Machine is powered on, else returns false. +func (vm *VirtualMachine) IsActive(ctx context.Context) (bool, error) { + vmMoList, err := vm.Datacenter.GetVMMoList(ctx, []*VirtualMachine{vm}, []string{"summary"}) + if err != nil { + klog.Errorf("Failed to get VM Managed object with property summary. 
err: +%v", err) + return false, err + } + if vmMoList[0].Summary.Runtime.PowerState == types.VirtualMachinePowerStatePoweredOn { + return true, nil + } + return false, nil +} + +// renew renews the virtual machine and datacenter objects given its virtual center. +func (vm *VirtualMachine) renew(vc *VirtualCenter) { + vm.VirtualMachine = object.NewVirtualMachine(vc.Client.Client, vm.VirtualMachine.Reference()) + vm.Datacenter.Datacenter = object.NewDatacenter(vc.Client.Client, vm.Datacenter.Reference()) +} + +// GetAllAccessibleDatastores gets the list of accessible Datastores for the given Virtual Machine +func (vm *VirtualMachine) GetAllAccessibleDatastores(ctx context.Context) ([]*DatastoreInfo, error) { + host, err := vm.HostSystem(ctx) + if err != nil { + klog.Errorf("Failed to get host system for VM %v with err: %v", vm.InventoryPath, err) + return nil, err + } + hostObj := &HostSystem{ + HostSystem: object.NewHostSystem(vm.Client(), host.Reference()), + } + return hostObj.GetAllAccessibleDatastores(ctx) +} + +// Renew renews the virtual machine and datacenter information. If reconnect is +// set to true, the virtual center connection is also renewed. +func (vm *VirtualMachine) Renew(reconnect bool) error { + vc, err := GetVirtualCenterManager().GetVirtualCenter(vm.VirtualCenterHost) + if err != nil { + klog.Errorf("Failed to get VC while renewing VM %v with err: %v", vm, err) + return err + } + + if reconnect { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + if err := vc.Connect(ctx); err != nil { + klog.Errorf("Failed reconnecting to VC %q while renewing VM %v with err: %v", vc.Config.Host, vm, err) + return err + } + } + + vm.renew(vc) + return nil +} + +const ( + // poolSize is the number of goroutines to run while trying to find a + // virtual machine. + poolSize = 8 + // dcBufferSize is the buffer size for the channel that is used to + // asynchronously receive *Datacenter instances. + dcBufferSize = poolSize * 10 +) + +// GetVirtualMachineByUUID returns virtual machine given its UUID in entire VC. +// If instanceUuid is set to true, then UUID is an instance UUID. +// In this case, this function searches for virtual machines whose instance UUID matches the given uuid. +// If instanceUuid is set to false, then UUID is BIOS UUID. +// In this case, this function searches for virtual machines whose BIOS UUID matches the given uuid. +func GetVirtualMachineByUUID(uuid string, instanceUUID bool) (*VirtualMachine, error) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + klog.V(2).Infof("Initiating asynchronous datacenter listing with uuid %s", uuid) + dcsChan, errChan := AsyncGetAllDatacenters(ctx, dcBufferSize) + + var wg sync.WaitGroup + var nodeVM *VirtualMachine + var poolErr error + + for i := 0; i < poolSize; i++ { + wg.Add(1) + go func() { + defer wg.Done() + for { + select { + case err, ok := <-errChan: + if !ok { + // Async function finished. + klog.V(2).Infof("AsyncGetAllDatacenters finished with uuid %s", uuid) + return + } else if err == context.Canceled { + // Canceled by another instance of this goroutine. + klog.V(2).Infof("AsyncGetAllDatacenters ctx was canceled with uuid %s", uuid) + return + } else { + // Some error occurred. + klog.Errorf("AsyncGetAllDatacenters with uuid %s sent an error: %v", uuid, err) + poolErr = err + return + } + + case dc, ok := <-dcsChan: + if !ok { + // Async function finished. 
+ klog.V(2).Infof("AsyncGetAllDatacenters finished with uuid %s", uuid) + return + } + + // Found some Datacenter object. + klog.V(2).Infof("AsyncGetAllDatacenters with uuid %s sent a dc %v", uuid, dc) + if vm, err := dc.GetVirtualMachineByUUID(context.Background(), uuid, instanceUUID); err != nil { + if err == ErrVMNotFound { + // Didn't find VM on this DC, so, continue searching on other DCs. + klog.V(2).Infof("Couldn't find VM given uuid %s on DC %v with err: %v, continuing search", uuid, dc, err) + continue + } else { + // Some serious error occurred, so stop the async function. + klog.Errorf("Failed finding VM given uuid %s on DC %v with err: %v, canceling context", uuid, dc, err) + cancel() + poolErr = err + return + } + } else { + // Virtual machine was found, so stop the async function. + klog.V(2).Infof("Found VM %v given uuid %s on DC %v, canceling context", vm, uuid, dc) + nodeVM = vm + cancel() + return + } + } + } + }() + } + wg.Wait() + + if nodeVM != nil { + klog.V(2).Infof("Returning VM %v for UUID %s", nodeVM, uuid) + return nodeVM, nil + } else if poolErr != nil { + klog.Errorf("Returning err: %v for UUID %s", poolErr, uuid) + return nil, poolErr + } else { + klog.Errorf("Returning VM not found err for UUID %s", uuid) + return nil, ErrVMNotFound + } +} + +// GetHostSystem returns HostSystem object of the virtual machine +func (vm *VirtualMachine) GetHostSystem(ctx context.Context) (*object.HostSystem, error) { + vmHost, err := vm.VirtualMachine.HostSystem(ctx) + if err != nil { + klog.Errorf("Failed to get host system for vm: %v. err: %+v", vm, err) + return nil, err + } + var oHost mo.HostSystem + err = vmHost.Properties(ctx, vmHost.Reference(), []string{"summary"}, &oHost) + if err != nil { + klog.Errorf("Failed to get host system properties. err: %+v", err) + return nil, err + } + klog.V(4).Infof("Host owning node vm: %v is %s", vm, oHost.Summary.Config.Name) + return vmHost, nil +} + +// GetTagManager returns tagManager using vm client +func (vm *VirtualMachine) GetTagManager(ctx context.Context) (*tags.Manager, error) { + restClient := rest.NewClient(vm.Client()) + virtualCenter, err := GetVirtualCenterManager().GetVirtualCenter(vm.VirtualCenterHost) + if err != nil { + klog.Errorf("Failed to get virtualCenter. Error: %v", err) + return nil, err + } + signer, err := signer(ctx, vm.Client(), virtualCenter.Config.Username, virtualCenter.Config.Password) + if err != nil { + klog.Errorf("Failed to create the Signer. Error: %v", err) + return nil, err + } + if signer == nil { + klog.V(3).Info("Using plain text username and password") + user := url.UserPassword(virtualCenter.Config.Username, virtualCenter.Config.Password) + err = restClient.Login(ctx, user) + } else { + klog.V(3).Info("Using certificate and private key") + err = restClient.LoginByToken(restClient.WithSigner(ctx, signer)) + } + if err != nil { + klog.Errorf("Failed to login for the rest client. Error: %v", err) + } + tagManager := tags.NewManager(restClient) + if tagManager == nil { + klog.Errorf("Failed to create a tagManager") + } + return tagManager, nil +} + +// GetAncestors returns ancestors of VM +// example result: "Folder", "Datacenter", "Cluster" +func (vm *VirtualMachine) GetAncestors(ctx context.Context) ([]mo.ManagedEntity, error) { + vmHost, err := vm.GetHostSystem(ctx) + if err != nil { + klog.Errorf("Failed to get host system for vm: %v. 
err: %+v", vm, err) + return nil, err + } + var objects []mo.ManagedEntity + pc := vm.Datacenter.Client().ServiceContent.PropertyCollector + // example result: ["Folder", "Datacenter", "Cluster"] + objects, err = mo.Ancestors(ctx, vm.Datacenter.Client(), pc, vmHost.Reference()) + if err != nil { + klog.Errorf("GetAncestors failed for %s with err %v", vmHost.Reference(), err) + return nil, err + } + klog.V(4).Infof("Ancestors of node vm: %v are : [%+v]", vm, objects) + return objects, nil +} + +// GetZoneRegion returns zone and region of the node vm +func (vm *VirtualMachine) GetZoneRegion(ctx context.Context, zoneCategoryName string, regionCategoryName string) (zone string, region string, err error) { + klog.V(4).Infof("GetZoneRegion: called with zoneCategoryName: %s, regionCategoryName: %s", zoneCategoryName, regionCategoryName) + tagManager, err := vm.GetTagManager(ctx) + if err != nil || tagManager == nil { + klog.Errorf("Failed to get tagManager. Error: %v", err) + return "", "", err + } + defer tagManager.Logout(ctx) + var objects []mo.ManagedEntity + objects, err = vm.GetAncestors(ctx) + if err != nil { + klog.Errorf("GetAncestors failed for %s with err %v", vm.Reference(), err) + return "", "", err + } + // search the hierarchy, example order: ["Host", "Cluster", "Datacenter", "Folder"] + for i := range objects { + obj := objects[len(objects)-1-i] + klog.V(4).Infof("Name: %s, Type: %s", obj.Self.Value, obj.Self.Type) + tags, err := tagManager.ListAttachedTags(ctx, obj) + if err != nil { + klog.Errorf("Cannot list attached tags. Err: %v", err) + return "", "", err + } + if len(tags) > 0 { + klog.V(4).Infof("Object [%v] has attached Tags [%v]", obj, tags) + } + for _, value := range tags { + tag, err := tagManager.GetTag(ctx, value) + if err != nil { + klog.Errorf("Failed to get tag:%s, error:%v", value, err) + return "", "", err + } + klog.V(4).Infof("Found tag: %s for object %v", tag.Name, obj) + category, err := tagManager.GetCategory(ctx, tag.CategoryID) + if err != nil { + klog.Errorf("Failed to get category for tag: %s, error: %v", tag.Name, tag) + return "", "", err + } + klog.V(4).Infof("Found category: %s for object %v with tag: %s", category.Name, obj, tag.Name) + + if category.Name == zoneCategoryName && zone == "" { + zone = tag.Name + } else if category.Name == regionCategoryName && region == "" { + region = tag.Name + } + if zone != "" && region != "" { + return zone, region, nil + } + } + } + return zone, region, err +} + +// IsInZoneRegion checks if virtual machine belongs to specified zone and region +// This function returns true if virtual machine belongs to specified zone/region, else returns false. +func (vm *VirtualMachine) IsInZoneRegion(ctx context.Context, zoneCategoryName string, regionCategoryName string, zoneValue string, regionValue string) (bool, error) { + klog.V(4).Infof("IsInZoneRegion: called with zoneCategoryName: %s, regionCategoryName: %s, zoneValue: %s, regionValue: %s", zoneCategoryName, regionCategoryName, zoneValue, regionValue) + tagManager, err := vm.GetTagManager(ctx) + if err != nil || tagManager == nil { + klog.Errorf("Failed to get tagManager. 
Error: %v", err) + return false, err + } + defer tagManager.Logout(ctx) + vmZone, vmRegion, err := vm.GetZoneRegion(ctx, zoneCategoryName, regionCategoryName) + if err != nil { + klog.Errorf("failed to get accessibleTopology for vm: %v, err: %v", vm.Reference(), err) + return false, err + } + if regionValue == "" && zoneValue != "" && vmZone == zoneValue { + // region is not specified, if zone matches with look up zone value, return true + klog.V(4).Infof("MoRef [%v] belongs to zone [%s]", vm.Reference(), zoneValue) + return true, nil + } + if zoneValue == "" && regionValue != "" && vmRegion == regionValue { + // zone is not specified, if region matches with look up region value, return true + klog.V(4).Infof("MoRef [%v] belongs to region [%s]", vm.Reference(), regionValue) + return true, nil + } + if vmZone != "" && vmRegion != "" && vmRegion == regionValue && vmZone == zoneValue { + klog.V(4).Infof("MoRef [%v] belongs to zone [%s] and region [%s]", vm.Reference(), zoneValue, regionValue) + return true, nil + } + return false, nil +} diff --git a/pkg/common/config/config.go b/pkg/common/config/config.go new file mode 100644 index 0000000000..a1f98ea526 --- /dev/null +++ b/pkg/common/config/config.go @@ -0,0 +1,285 @@ +/* +Copyright 2019 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package config + +import ( + "errors" + "fmt" + "io" + "os" + "strconv" + "strings" + + "gopkg.in/gcfg.v1" + "k8s.io/klog" +) + +const ( + // DefaultK8sServiceAccount is the default name of the Kubernetes + // service account for csi controller. + DefaultK8sServiceAccount string = "vsphere-csi-controller" + // DefaultVCenterPort is the default port used to access vCenter. + DefaultVCenterPort string = "443" + // DefaultCloudConfigPath is the default path of csi config file + DefaultCloudConfigPath = "/etc/cloud/csi-vsphere.conf" + // EnvCloudConfig contains the path to the CSI vSphere Config + EnvCloudConfig = "X_CSI_VSPHERE_CLOUD_CONFIG" +) + +// Errors +var ( + // ErrUsernameMissing is returned when the provided username is empty. + ErrUsernameMissing = errors.New("Username is missing") + + // ErrPasswordMissing is returned when the provided password is empty. + ErrPasswordMissing = errors.New("Password is missing") + + // ErrInvalidVCenterIP is returned when the provided vCenter IP address is + // missing from the provided configuration. + ErrInvalidVCenterIP = errors.New("vsphere.conf does not have the VirtualCenter IP address specified") + + // ErrMissingVCenter is returned when the provided configuration does not + // define any vCenters. 
+ ErrMissingVCenter = errors.New("No Virtual Center hosts defined") +) + +func getEnvKeyValue(match string, partial bool) (string, string, error) { + for _, e := range os.Environ() { + pair := strings.Split(e, "=") + if len(pair) != 2 { + continue + } + + key := pair[0] + value := pair[1] + + if partial && strings.Contains(key, match) { + return key, value, nil + } + + if strings.Compare(key, match) == 0 { + return key, value, nil + } + } + + matchType := "match" + if partial { + matchType = "partial match" + } + + return "", "", fmt.Errorf("Failed to find %s with %s", matchType, match) +} + +// FromEnv initializes the provided configuratoin object with values +// obtained from environment variables. If an environment variable is set +// for a property that's already initialized, the environment variable's value +// takes precedence. +func FromEnv(cfg *Config) error { + if cfg == nil { + return fmt.Errorf("Config object cannot be nil") + } + //Init + if cfg.VirtualCenter == nil { + cfg.VirtualCenter = make(map[string]*VirtualCenterConfig) + } + + //Globals + if v := os.Getenv("VSPHERE_VCENTER"); v != "" { + cfg.Global.VCenterIP = v + } + if v := os.Getenv("VSPHERE_VCENTER_PORT"); v != "" { + cfg.Global.VCenterPort = v + } + if v := os.Getenv("VSPHERE_USER"); v != "" { + cfg.Global.User = v + } + if v := os.Getenv("VSPHERE_PASSWORD"); v != "" { + cfg.Global.Password = v + } + if v := os.Getenv("VSPHERE_DATACENTER"); v != "" { + cfg.Global.Datacenters = v + } + if v := os.Getenv("VSPHERE_INSECURE"); v != "" { + InsecureFlag, err := strconv.ParseBool(v) + if err != nil { + klog.Errorf("Failed to parse VSPHERE_INSECURE: %s", err) + } else { + cfg.Global.InsecureFlag = InsecureFlag + } + } + if v := os.Getenv("VSPHERE_LABEL_REGION"); v != "" { + cfg.Labels.Region = v + } + if v := os.Getenv("VSPHERE_LABEL_ZONE"); v != "" { + cfg.Labels.Zone = v + } + //Build VirtualCenter from ENVs + for _, e := range os.Environ() { + pair := strings.Split(e, "=") + + if len(pair) != 2 { + continue + } + + key := pair[0] + value := pair[1] + + if strings.HasPrefix(key, "VSPHERE_VCENTER_") && len(value) > 0 { + id := strings.TrimPrefix(key, "VSPHERE_VCENTER_") + vcenter := value + + _, username, errUsername := getEnvKeyValue("VCENTER_"+id+"_USERNAME", false) + if errUsername != nil { + username = cfg.Global.User + } + _, password, errPassword := getEnvKeyValue("VCENTER_"+id+"_PASSWORD", false) + if errPassword != nil { + password = cfg.Global.Password + } + _, port, errPort := getEnvKeyValue("VCENTER_"+id+"_PORT", false) + if errPort != nil { + port = cfg.Global.VCenterPort + } + insecureFlag := false + _, insecureTmp, errInsecure := getEnvKeyValue("VCENTER_"+id+"_INSECURE", false) + if errInsecure != nil { + insecureFlagTmp, errTmp := strconv.ParseBool(insecureTmp) + if errTmp == nil { + insecureFlag = insecureFlagTmp + } + } + _, datacenters, errDatacenters := getEnvKeyValue("VCENTER_"+id+"_DATACENTERS", false) + if errDatacenters != nil { + datacenters = cfg.Global.Datacenters + } + cfg.VirtualCenter[vcenter] = &VirtualCenterConfig{ + User: username, + Password: password, + VCenterPort: port, + InsecureFlag: insecureFlag, + Datacenters: datacenters, + } + } + } + if cfg.Global.VCenterIP != "" && cfg.VirtualCenter[cfg.Global.VCenterIP] == nil { + cfg.VirtualCenter[cfg.Global.VCenterIP] = &VirtualCenterConfig{ + User: cfg.Global.User, + Password: cfg.Global.Password, + VCenterPort: cfg.Global.VCenterPort, + InsecureFlag: cfg.Global.InsecureFlag, + Datacenters: cfg.Global.Datacenters, + } + } + err := 
validateConfig(cfg) + if err != nil { + return err + } + + return nil +} + +func validateConfig(cfg *Config) error { + //Fix default global values + if cfg.Global.VCenterPort == "" { + cfg.Global.VCenterPort = DefaultVCenterPort + } + // Must have at least one vCenter defined + if len(cfg.VirtualCenter) == 0 { + klog.Error(ErrMissingVCenter) + return ErrMissingVCenter + } + for vcServer, vcConfig := range cfg.VirtualCenter { + klog.V(4).Infof("Initializing vc server %s", vcServer) + if vcServer == "" { + klog.Error(ErrInvalidVCenterIP) + return ErrInvalidVCenterIP + } + + if vcConfig.User == "" { + vcConfig.User = cfg.Global.User + if vcConfig.User == "" { + klog.Errorf("vcConfig.User is empty for vc %s!", vcServer) + return ErrUsernameMissing + } + } + if vcConfig.Password == "" { + vcConfig.Password = cfg.Global.Password + if vcConfig.Password == "" { + klog.Errorf("vcConfig.Password is empty for vc %s!", vcServer) + return ErrPasswordMissing + } + } + if vcConfig.VCenterPort == "" { + vcConfig.VCenterPort = cfg.Global.VCenterPort + } + if vcConfig.Datacenters == "" { + if cfg.Global.Datacenters != "" { + vcConfig.Datacenters = cfg.Global.Datacenters + } + } + insecure := vcConfig.InsecureFlag + if !insecure { + insecure = cfg.Global.InsecureFlag + vcConfig.InsecureFlag = cfg.Global.InsecureFlag + } + } + return nil +} + +// ReadConfig parses vSphere cloud config file and stores it into VSphereConfig. +// Environment variables are also checked +func ReadConfig(config io.Reader) (*Config, error) { + if config == nil { + return nil, fmt.Errorf("no vSphere cloud provider config file given") + } + cfg := &Config{} + if err := gcfg.FatalOnly(gcfg.ReadInto(cfg, config)); err != nil { + return nil, err + } + // Env Vars should override config file entries if present + if err := FromEnv(cfg); err != nil { + return nil, err + } + return cfg, nil +} + +// GetCnsconfig returns Config from specified config file path +func GetCnsconfig(cfgPath string) (*Config, error) { + klog.V(4).Infof("GetCnsconfig called with cfgPath: %s", cfgPath) + var cfg *Config + //Read in the vsphere.conf if it exists + if _, err := os.Stat(cfgPath); os.IsNotExist(err) { + // config from Env var only + cfg = &Config{} + if err := FromEnv(cfg); err != nil { + klog.Errorf("Error reading vsphere.conf\n") + return cfg, err + } + } else { + config, err := os.Open(cfgPath) + if err != nil { + klog.Errorf("Failed to open %s. Err: %v", cfgPath, err) + return cfg, err + } + cfg, err = ReadConfig(config) + if err != nil { + klog.Errorf("Failed to parse config. Err: %v", err) + return cfg, err + } + } + return cfg, nil +} diff --git a/pkg/common/config/types.go b/pkg/common/config/types.go new file mode 100644 index 0000000000..4810363528 --- /dev/null +++ b/pkg/common/config/types.go @@ -0,0 +1,61 @@ +/* +Copyright 2019 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package config + +// Config is used to read and store information from the cloud configuration file +type Config struct { + Global struct { + //vCenter IP address or FQDN + VCenterIP string + // Kubernetes Cluster ID + ClusterID string `gcfg:"cluster-id"` + // vCenter username. + User string `gcfg:"user"` + // vCenter password in clear text. + Password string `gcfg:"password"` + // vCenter port. + VCenterPort string `gcfg:"port"` + // True if vCenter uses self-signed cert. + InsecureFlag bool `gcfg:"insecure-flag"` + // Datacenter in which Node VMs are located. + Datacenters string `gcfg:"datacenters"` + } + + // Virtual Center configurations + VirtualCenter map[string]*VirtualCenterConfig + + // Tag categories and tags which correspond to "built-in node labels: zones and region" + Labels struct { + Zone string `gcfg:"zone"` + Region string `gcfg:"region"` + } +} + +// VirtualCenterConfig contains information used to access a remote vCenter +// endpoint. +type VirtualCenterConfig struct { + // vCenter username. + User string `gcfg:"user"` + // vCenter password in clear text. + Password string `gcfg:"password"` + // vCenter port. + VCenterPort string `gcfg:"port"` + // True if vCenter uses self-signed cert. + InsecureFlag bool `gcfg:"insecure-flag"` + // Datacenter in which VMs are located. + Datacenters string `gcfg:"datacenters"` +} diff --git a/pkg/csi/service/common/common_controller_helper.go b/pkg/csi/service/common/common_controller_helper.go new file mode 100644 index 0000000000..86aad4c6fd --- /dev/null +++ b/pkg/csi/service/common/common_controller_helper.go @@ -0,0 +1,135 @@ +/* +Copyright 2019 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package common + +import ( + "fmt" + "strconv" + "strings" + + "github.com/container-storage-interface/spec/lib/go/csi" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" + "k8s.io/klog" +) + +// ValidateCreateVolumeRequest is the helper function to validate +// CreateVolumeRequest for all block controllers. +// Function returns error if validation fails otherwise returns nil. +func ValidateCreateVolumeRequest(req *csi.CreateVolumeRequest) error { + // Volume Name + volName := req.GetName() + if len(volName) == 0 { + msg := "Volume name is a required parameter." + klog.Error(msg) + return status.Error(codes.InvalidArgument, msg) + } + // Validate Volume Capabilities + volCaps := req.GetVolumeCapabilities() + if len(volCaps) == 0 { + return status.Error(codes.InvalidArgument, "Volume capabilities not provided") + } + if !IsValidVolumeCapabilities(volCaps) { + return status.Error(codes.InvalidArgument, "Volume capabilities not supported") + } + return nil +} + +// ValidateDeleteVolumeRequest is the helper function to validate +// DeleteVolumeRequest for all block controllers. +// Function returns error if validation fails otherwise returns nil. 
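+//
+// Expected call-site shape (illustrative sketch only):
+//
+//	if err := ValidateDeleteVolumeRequest(req); err != nil {
+//		return nil, err
+//	}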
+func ValidateDeleteVolumeRequest(req *csi.DeleteVolumeRequest) error { + //check for required parameters + if len(req.VolumeId) == 0 { + msg := "Volume ID is a required parameter." + klog.Error(msg) + return status.Errorf(codes.InvalidArgument, msg) + } + return nil +} + +// ValidateControllerPublishVolumeRequest is the helper function to validate +// ControllerPublishVolumeRequest for all block controllers. +// Function returns error if validation fails otherwise returns nil. +func ValidateControllerPublishVolumeRequest(req *csi.ControllerPublishVolumeRequest) error { + //check for required parameters + if len(req.VolumeId) == 0 { + msg := "Volume ID is a required parameter." + klog.Error(msg) + return status.Error(codes.InvalidArgument, msg) + } else if len(req.NodeId) == 0 { + msg := "Node ID is a required parameter." + klog.Error(msg) + return status.Error(codes.InvalidArgument, msg) + } + volCap := req.GetVolumeCapability() + if volCap == nil { + return status.Error(codes.InvalidArgument, "Volume capability not provided") + } + caps := []*csi.VolumeCapability{volCap} + if !IsValidVolumeCapabilities(caps) { + return status.Error(codes.InvalidArgument, "Volume capability not supported") + } + return nil +} + +// ValidateControllerUnpublishVolumeRequest is the helper function to validate +// ControllerUnpublishVolumeRequest for all block controllers. +// Function returns error if validation fails otherwise returns nil. +func ValidateControllerUnpublishVolumeRequest(req *csi.ControllerUnpublishVolumeRequest) error { + //check for required parameters + if len(req.VolumeId) == 0 { + msg := "Volume ID is a required parameter." + klog.Error(msg) + return status.Error(codes.InvalidArgument, msg) + } else if len(req.NodeId) == 0 { + msg := "Node ID is a required parameter." + klog.Error(msg) + return status.Error(codes.InvalidArgument, msg) + } + return nil +} + +// CheckAPI checks if specified version is 6.7.3 or higher +func CheckAPI(version string) error { + items := strings.Split(version, ".") + if len(items) < 2 || len(items) > 3 { + return fmt.Errorf("Invalid API Version format") + } + major, err := strconv.Atoi(items[0]) + if err != nil { + return fmt.Errorf("Invalid Major Version value") + } + minor, err := strconv.Atoi(items[1]) + if err != nil { + return fmt.Errorf("Invalid Minor Version value") + } + + if major < MinSupportedVCenterMajor || (major == MinSupportedVCenterMajor && minor < MinSupportedVCenterMinor) { + return fmt.Errorf("The minimum supported vCenter is 6.7.3") + } + + if major == MinSupportedVCenterMajor && minor == MinSupportedVCenterMinor { + if len(items) == 3 { + patch, err := strconv.Atoi(items[2]) + if err != nil || patch < MinSupportedVCenterPatch { + return fmt.Errorf("Invalid patch version value") + } + } + } + return nil +} diff --git a/pkg/csi/service/common/constants.go b/pkg/csi/service/common/constants.go new file mode 100644 index 0000000000..823f444440 --- /dev/null +++ b/pkg/csi/service/common/constants.go @@ -0,0 +1,72 @@ +/* +Copyright 2019 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package common
+
+const (
+	// MbInBytes is the number of bytes in one mebibyte.
+	MbInBytes = int64(1024 * 1024)
+
+	// GbInBytes is the number of bytes in one gibibyte.
+	GbInBytes = int64(1024 * 1024 * 1024)
+
+	// DiskTypeString is the value for the PersistentVolume's attribute "type"
+	DiskTypeString = "vSphere CNS Block Volume"
+
+	// AttributeDiskType is a PersistentVolume's attribute.
+	AttributeDiskType = "type"
+
+	// AttributeDatastoreURL represents the URL of the datastore in the StorageClass
+	// For Example: DatastoreURL: "ds:///vmfs/volumes/5c9bb20e-009c1e46-4b85-0200483b2a97/"
+	AttributeDatastoreURL = "datastoreurl"
+
+	// AttributeStoragePolicyName represents the name of the Storage Policy in the Storage Class
+	// For Example: StoragePolicy: "vSAN Default Storage Policy"
+	AttributeStoragePolicyName = "storagepolicyname"
+
+	// AttributeStoragePolicyID represents the Storage Policy ID in the Storage Class
+	// For Example: StoragePolicyId: "251bce41-cb24-41df-b46b-7c75aed3c4ee"
+	AttributeStoragePolicyID = "storagepolicyid"
+
+	// AttributeFsType represents the filesystem type in the Storage Class
+	// For Example: FsType: "ext4"
+	AttributeFsType = "fstype"
+
+	// DefaultFsType represents the default filesystem type which will be used to format the volume
+	// during mount if the user does not specify the filesystem type in the Storage Class
+	DefaultFsType = "ext4"
+
+	// ProviderPrefix is the prefix used for the ProviderID set on the node
+	// Example: vsphere://4201794a-f26b-8914-d95a-edeb7ecc4a8f
+	ProviderPrefix = "vsphere://"
+
+	// AttributeFirstClassDiskUUID is the SCSI Disk Identifier
+	AttributeFirstClassDiskUUID = "diskUUID"
+
+	// BlockVolumeType is the VolumeType for CNS Volume
+	BlockVolumeType = "BLOCK"
+
+	// MinSupportedVCenterMajor is the minimum major version of vCenter
+	// on which CNS is supported.
+	MinSupportedVCenterMajor int = 6
+
+	// MinSupportedVCenterMinor is the minimum minor version of vCenter
+	// on which CNS is supported.
+	MinSupportedVCenterMinor int = 7
+
+	// MinSupportedVCenterPatch is the minimum patch version supported together with MinSupportedVCenterMajor and MinSupportedVCenterMinor
+	MinSupportedVCenterPatch int = 3
+)
diff --git a/pkg/csi/service/common/types.go b/pkg/csi/service/common/types.go
new file mode 100644
index 0000000000..f2a25f5ae2
--- /dev/null
+++ b/pkg/csi/service/common/types.go
@@ -0,0 +1,53 @@
+/*
+Copyright 2019 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package common
+
+import (
+	"github.com/container-storage-interface/spec/lib/go/csi"
+
+	cnsvolume "sigs.k8s.io/vsphere-csi-driver/pkg/common/cns-lib/volume"
+	cnsvsphere "sigs.k8s.io/vsphere-csi-driver/pkg/common/cns-lib/vsphere"
+	"sigs.k8s.io/vsphere-csi-driver/pkg/common/config"
+)
+
+var (
+	// VolumeCaps represents how the volume could be accessed.
+ // It is SINGLE_NODE_WRITER since vSphere CNS Block volume could only be + // attached to a single node at any given time. + VolumeCaps = []csi.VolumeCapability_AccessMode{ + { + Mode: csi.VolumeCapability_AccessMode_SINGLE_NODE_WRITER, + }, + } +) + +// Manager type comprises VirtualCenterConfig, CnsConfig, VolumeManager and VirtualCenterManager +type Manager struct { + VcenterConfig *cnsvsphere.VirtualCenterConfig + CnsConfig *config.Config + VolumeManager cnsvolume.Manager + VcenterManager cnsvsphere.VirtualCenterManager +} + +// CreateVolumeSpec is the Volume Spec used by CSI driver +type CreateVolumeSpec struct { + Name string + StoragePolicyName string + StoragePolicyID string + DatastoreURL string + CapacityMB int64 +} diff --git a/pkg/csi/service/common/util.go b/pkg/csi/service/common/util.go new file mode 100644 index 0000000000..05bf4aec38 --- /dev/null +++ b/pkg/csi/service/common/util.go @@ -0,0 +1,95 @@ +/* +Copyright 2019 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package common + +import ( + "strings" + + "github.com/container-storage-interface/spec/lib/go/csi" + "github.com/vmware/govmomi/vim25/types" + "golang.org/x/net/context" + "k8s.io/klog" + cnsvsphere "sigs.k8s.io/vsphere-csi-driver/pkg/common/cns-lib/vsphere" +) + +// GetVCenter returns VirtualCenter object from specified Manager object. +// Before returning VirtualCenter object, vcenter connection is established if session doesn't exist. +func GetVCenter(ctx context.Context, manager *Manager) (*cnsvsphere.VirtualCenter, error) { + var err error + vcenter, err := manager.VcenterManager.GetVirtualCenter(manager.VcenterConfig.Host) + if err != nil { + klog.Errorf("Failed to get VirtualCenter instance for host: %q. err=%v", manager.VcenterConfig.Host, err) + return nil, err + } + err = vcenter.Connect(ctx) + if err != nil { + klog.Errorf("Failed to connect to VirtualCenter host: %q. err=%v", manager.VcenterConfig.Host, err) + return nil, err + } + return vcenter, nil +} + +// GetUUIDFromProviderID Returns VM UUID from Node's providerID +func GetUUIDFromProviderID(providerID string) string { + return strings.TrimPrefix(providerID, ProviderPrefix) +} + +// FormatDiskUUID removes any spaces and hyphens in UUID +// Example UUID input is 42375390-71f9-43a3-a770-56803bcd7baa and output after format is 4237539071f943a3a77056803bcd7baa +func FormatDiskUUID(uuid string) string { + uuidwithNoSpace := strings.Replace(uuid, " ", "", -1) + uuidWithNoHypens := strings.Replace(uuidwithNoSpace, "-", "", -1) + return strings.ToLower(uuidWithNoHypens) +} + +// RoundUpSize calculates how many allocation units are needed to accommodate +// a volume of given size. 
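+// For example, rounding a 1.5 GiB (1610612736 byte) request up with an
+// allocation unit of GbInBytes yields 2, i.e. the volume is provisioned as 2 GiB.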
+func RoundUpSize(volumeSizeBytes int64, allocationUnitBytes int64) int64 { + roundedUp := volumeSizeBytes / allocationUnitBytes + if volumeSizeBytes%allocationUnitBytes > 0 { + roundedUp++ + } + return roundedUp +} + +// GetLabelsMapFromKeyValue creates a map object from given parameter +func GetLabelsMapFromKeyValue(labels []types.KeyValue) map[string]string { + labelsMap := make(map[string]string) + for _, label := range labels { + labelsMap[label.Key] = label.Value + } + return labelsMap +} + +// IsValidVolumeCapabilities is the helper function to validate capabilities of volume. +func IsValidVolumeCapabilities(volCaps []*csi.VolumeCapability) bool { + hasSupport := func(cap *csi.VolumeCapability) bool { + for _, c := range VolumeCaps { + if c.GetMode() == cap.AccessMode.GetMode() { + return true + } + } + return false + } + foundAll := true + for _, c := range volCaps { + if !hasSupport(c) { + foundAll = false + } + } + return foundAll +} diff --git a/pkg/csi/service/common/vsphereutil.go b/pkg/csi/service/common/vsphereutil.go new file mode 100644 index 0000000000..37ec77507f --- /dev/null +++ b/pkg/csi/service/common/vsphereutil.go @@ -0,0 +1,172 @@ +/* +Copyright 2019 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package common + +import ( + "errors" + "fmt" + + "github.com/davecgh/go-spew/spew" + cnstypes "github.com/vmware/govmomi/cns/types" + vim25types "github.com/vmware/govmomi/vim25/types" + "golang.org/x/net/context" + "k8s.io/klog" + "sigs.k8s.io/vsphere-csi-driver/pkg/common/cns-lib/vsphere" +) + +// CreateVolumeUtil is the helper function to create CNS volume +func CreateVolumeUtil(ctx context.Context, manager *Manager, spec *CreateVolumeSpec, sharedDatastores []*vsphere.DatastoreInfo) (string, error) { + vc, err := GetVCenter(ctx, manager) + if err != nil { + klog.Errorf("Failed to get vCenter from Manager, err: %+v", err) + return "", err + } + if spec.StoragePolicyName != "" { + // Get Storage Policy ID from Storage Policy Name + err = vc.ConnectPbm(ctx) + if err != nil { + klog.Errorf("Error occurred while connecting to PBM, err: %+v", err) + return "", err + } + spec.StoragePolicyID, err = vc.GetStoragePolicyIDByName(ctx, spec.StoragePolicyName) + if err != nil { + klog.Errorf("Error occurred while getting Profile Id from Profile Name: %s, err: %+v", spec.StoragePolicyName, err) + return "", err + } + } + var datastores []vim25types.ManagedObjectReference + if spec.DatastoreURL == "" { + // If DatastoreURL is not specified in StorageClass, get all shared datastores + datastores = getDatastoreMoRefs(sharedDatastores) + } else { + // Check datastore specified in the StorageClass should be shared datastore across all nodes. + + // vc.GetDatacenters returns datacenters found on the VirtualCenter. + // If no datacenters are mentioned in the VirtualCenterConfig during registration, all + // Datacenters for the given VirtualCenter will be returned, else only the listed + // Datacenters are returned. 
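+		// The datastore resolved from this URL must also appear in the
+		// caller-supplied list of shared datastores; otherwise the request
+		// is rejected below as not accessible to all nodes.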
+ datacenters, err := vc.GetDatacenters(ctx) + if err != nil { + klog.Errorf("Failed to find datacenters from VC: %+v, Error: %+v", vc.Config.Host, err) + return "", err + } + isSharedDatastoreURL := false + var datastoreObj *vsphere.Datastore + for _, datacenter := range datacenters { + datastoreObj, err = datacenter.GetDatastoreByURL(ctx, spec.DatastoreURL) + if err != nil { + klog.Warningf("Failed to find datastore with URL %q in datacenter %q from VC %q, Error: %+v", spec.DatastoreURL, datacenter.InventoryPath, vc.Config.Host, err) + continue + } + for _, sharedDatastore := range sharedDatastores { + if sharedDatastore.Info.Url == spec.DatastoreURL { + isSharedDatastoreURL = true + break + } + } + if isSharedDatastoreURL { + break + } + } + if datastoreObj == nil { + errMsg := fmt.Sprintf("DatastoreURL: %s specified in the storage class is not found.", spec.DatastoreURL) + klog.Errorf(errMsg) + return "", errors.New(errMsg) + } + if isSharedDatastoreURL { + datastores = append(datastores, datastoreObj.Reference()) + } else { + errMsg := fmt.Sprintf("Datastore: %s specified in the storage class is not accessible to all nodes.", spec.DatastoreURL) + klog.Errorf(errMsg) + return "", errors.New(errMsg) + } + } + createSpec := &cnstypes.CnsVolumeCreateSpec{ + Name: spec.Name, + VolumeType: BlockVolumeType, + Datastores: datastores, + BackingObjectDetails: &cnstypes.CnsBackingObjectDetails{ + CapacityInMb: spec.CapacityMB, + }, + Metadata: cnstypes.CnsVolumeMetadata{ + ContainerCluster: vsphere.GetContainerCluster(manager.CnsConfig.Global.ClusterID, manager.CnsConfig.VirtualCenter[vc.Config.Host].User), + }, + } + if spec.StoragePolicyID != "" { + profileSpec := &vim25types.VirtualMachineDefinedProfileSpec{ + ProfileId: spec.StoragePolicyID, + } + createSpec.Profile = append(createSpec.Profile, profileSpec) + } + klog.V(4).Infof("vSphere CNS driver creating volume %s with create spec %+v", spec.Name, spew.Sdump(createSpec)) + volumeID, err := manager.VolumeManager.CreateVolume(createSpec) + if err != nil { + klog.Errorf("Failed to create disk %s with error %+v", spec.Name, err) + return "", err + } + return volumeID.Id, nil +} + +// AttachVolumeUtil is the helper function to attach CNS volume to specified vm +func AttachVolumeUtil(ctx context.Context, manager *Manager, + vm *vsphere.VirtualMachine, + volumeID string) (string, error) { + klog.V(4).Infof("vSphere CNS driver is attaching volume: %s to node vm: %s", volumeID, vm.InventoryPath) + diskUUID, err := manager.VolumeManager.AttachVolume(vm, volumeID) + if err != nil { + klog.Errorf("Failed to attach disk %s with err %+v", volumeID, err) + return "", err + } + klog.V(4).Infof("Successfully attached disk %s to VM %v. 
Disk UUID is %s", volumeID, vm, diskUUID) + return diskUUID, nil +} + +// DetachVolumeUtil is the helper function to detach CNS volume from specified vm +func DetachVolumeUtil(ctx context.Context, manager *Manager, + vm *vsphere.VirtualMachine, + volumeID string) error { + klog.V(4).Infof("vSphere CNS driver is detaching volume: %s from node vm: %s", volumeID, vm.InventoryPath) + err := manager.VolumeManager.DetachVolume(vm, volumeID) + if err != nil { + klog.Errorf("Failed to detach disk %s with err %+v", volumeID, err) + return err + } + klog.V(4).Infof("Successfully detached disk %s from VM %v.", volumeID, vm) + return nil +} + +// DeleteVolumeUtil is the helper function to delete CNS volume for given volumeId +func DeleteVolumeUtil(ctx context.Context, manager *Manager, volumeID string, deleteDisk bool) error { + var err error + klog.V(4).Infof("vSphere Cloud Provider deleting volume: %s", volumeID) + err = manager.VolumeManager.DeleteVolume(volumeID, deleteDisk) + if err != nil { + klog.Errorf("Failed to delete disk %s with error %+v", volumeID, err) + return err + } + klog.V(4).Infof("Successfully deleted disk for volumeid: %s", volumeID) + return nil +} + +// Helper function to get DatastoreMoRefs +func getDatastoreMoRefs(datastores []*vsphere.DatastoreInfo) []vim25types.ManagedObjectReference { + var datastoreMoRefs []vim25types.ManagedObjectReference + for _, datastore := range datastores { + datastoreMoRefs = append(datastoreMoRefs, datastore.Reference()) + } + return datastoreMoRefs +} diff --git a/pkg/kubernetes/informers.go b/pkg/kubernetes/informers.go new file mode 100644 index 0000000000..fad88559a5 --- /dev/null +++ b/pkg/kubernetes/informers.go @@ -0,0 +1,108 @@ +/* +Copyright 2019 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/
+
+package kubernetes
+
+import (
+	"time"
+
+	"k8s.io/client-go/informers"
+	clientset "k8s.io/client-go/kubernetes"
+	corelisters "k8s.io/client-go/listers/core/v1"
+	"k8s.io/client-go/tools/cache"
+	"k8s.io/sample-controller/pkg/signals"
+)
+
+func noResyncPeriodFunc() time.Duration {
+	return 0
+}
+
+// NewInformer creates a new InformerManager backed by the given Kubernetes client
+func NewInformer(client clientset.Interface) *InformerManager {
+	return &InformerManager{
+		client:          client,
+		stopCh:          signals.SetupSignalHandler(),
+		informerFactory: informers.NewSharedInformerFactory(client, noResyncPeriodFunc()),
+	}
+}
+
+// AddNodeListener hooks up add, update, delete callbacks
+func (im *InformerManager) AddNodeListener(add func(obj interface{}), update func(oldObj, newObj interface{}), remove func(obj interface{})) {
+	if im.nodeInformer == nil {
+		im.nodeInformer = im.informerFactory.Core().V1().Nodes().Informer()
+	}
+
+	im.nodeInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
+		AddFunc:    add,
+		UpdateFunc: update,
+		DeleteFunc: remove,
+	})
+}
+
+// AddPVCListener hooks up add, update, delete callbacks
+func (im *InformerManager) AddPVCListener(add func(obj interface{}), update func(oldObj, newObj interface{}), remove func(obj interface{})) {
+	if im.pvcInformer == nil {
+		im.pvcInformer = im.informerFactory.Core().V1().PersistentVolumeClaims().Informer()
+	}
+
+	im.pvcInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
+		AddFunc:    add,
+		UpdateFunc: update,
+		DeleteFunc: remove,
+	})
+}
+
+// AddPVListener hooks up add, update, delete callbacks
+func (im *InformerManager) AddPVListener(add func(obj interface{}), update func(oldObj, newObj interface{}), remove func(obj interface{})) {
+	if im.pvInformer == nil {
+		im.pvInformer = im.informerFactory.Core().V1().PersistentVolumes().Informer()
+	}
+
+	im.pvInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
+		AddFunc:    add,
+		UpdateFunc: update,
+		DeleteFunc: remove,
+	})
+}
+
+// AddPodListener hooks up add, update, delete callbacks
+func (im *InformerManager) AddPodListener(add func(obj interface{}), update func(oldObj, newObj interface{}), remove func(obj interface{})) {
+	if im.podInformer == nil {
+		im.podInformer = im.informerFactory.Core().V1().Pods().Informer()
+	}
+
+	im.podInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
+		AddFunc:    add,
+		UpdateFunc: update,
+		DeleteFunc: remove,
+	})
+}
+
+// GetPVLister returns the PersistentVolume lister for the calling informer manager
+func (im *InformerManager) GetPVLister() corelisters.PersistentVolumeLister {
+	return im.informerFactory.Core().V1().PersistentVolumes().Lister()
+}
+
+// GetPVCLister returns the PVC lister for the calling informer manager
+func (im *InformerManager) GetPVCLister() corelisters.PersistentVolumeClaimLister {
+	return im.informerFactory.Core().V1().PersistentVolumeClaims().Lister()
+}
+
+// Listen starts the informers and returns the channel used to stop them
+func (im *InformerManager) Listen() (stopCh <-chan struct{}) {
+	go im.informerFactory.Start(im.stopCh)
+	return im.stopCh
+}
diff --git a/pkg/kubernetes/kubernetes.go b/pkg/kubernetes/kubernetes.go
new file mode 100644
index 0000000000..f5e6416651
--- /dev/null
+++ b/pkg/kubernetes/kubernetes.go
@@ -0,0 +1,71 @@
+/*
+Copyright 2019 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package kubernetes
+
+import (
+	"k8s.io/klog"
+
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	clientset "k8s.io/client-go/kubernetes"
+	restclient "k8s.io/client-go/rest"
+	"k8s.io/client-go/tools/clientcmd"
+
+	"sigs.k8s.io/vsphere-csi-driver/pkg/csi/service/common"
+)
+
+// NewClient creates a new Kubernetes client using the in-cluster service account configuration
+func NewClient() (clientset.Interface, error) {
+
+	var config *restclient.Config
+	var err error
+	klog.V(2).Info("k8s client using in-cluster config")
+	config, err = restclient.InClusterConfig()
+	if err != nil {
+		klog.Errorf("InClusterConfig failed %q", err)
+		return nil, err
+	}
+
+	return clientset.NewForConfig(config)
+}
+
+// CreateKubernetesClientFromConfig creates a new Kubernetes client from the given kubeConfig file
+func CreateKubernetesClientFromConfig(kubeConfigPath string) (clientset.Interface, error) {
+
+	cfg, err := clientcmd.BuildConfigFromFlags("", kubeConfigPath)
+	if err != nil {
+		return nil, err
+	}
+
+	client, err := clientset.NewForConfig(cfg)
+	if err != nil {
+		return nil, err
+	}
+	return client, nil
+}
+
+// GetNodeVMUUID returns the vSphere VM UUID set by the CCM on the Kubernetes Node
+func GetNodeVMUUID(k8sclient clientset.Interface, nodeName string) (string, error) {
+	klog.V(2).Infof("GetNodeVMUUID called for the node: %q", nodeName)
+	node, err := k8sclient.CoreV1().Nodes().Get(nodeName, metav1.GetOptions{})
+	if err != nil {
+		klog.Errorf("Failed to get kubernetes node with the name: %q. Err: %v", nodeName, err)
+		return "", err
+	}
+	k8sNodeUUID := common.GetUUIDFromProviderID(node.Spec.ProviderID)
+	klog.V(2).Infof("Retrieved node UUID: %q for the node: %q", k8sNodeUUID, nodeName)
+	return k8sNodeUUID, nil
+}
diff --git a/pkg/kubernetes/types.go b/pkg/kubernetes/types.go
new file mode 100644
index 0000000000..8ab9ac606d
--- /dev/null
+++ b/pkg/kubernetes/types.go
@@ -0,0 +1,46 @@
+/*
+Copyright 2019 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package kubernetes
+
+import (
+	"k8s.io/client-go/informers"
+	clientset "k8s.io/client-go/kubernetes"
+	"k8s.io/client-go/tools/cache"
+)
+
+// InformerManager is a service that notifies subscribers about changes
+// to well-defined information in the Kubernetes API server.
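+// Informers for a resource are created lazily the first time the corresponding
+// Add*Listener method is called; Listen must then be invoked to start the
+// shared informer factory.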
+type InformerManager struct { + // k8s client + client clientset.Interface + // main shared informer factory + informerFactory informers.SharedInformerFactory + // main signal + stopCh (<-chan struct{}) + + // node informer + nodeInformer cache.SharedInformer + + // PV informer + pvInformer cache.SharedInformer + + // PVC informer + pvcInformer cache.SharedInformer + + // Pod informer + podInformer cache.SharedInformer +}
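
As a rough, illustrative sketch of how the helpers introduced by this patch are meant to be consumed (the buildCreateVolumeSpec helper and the surrounding wiring below are hypothetical and not part of this change), a CSI CreateVolume handler could validate the incoming request, round the requested capacity up to whole mebibytes, and map the storage-class parameters onto a CreateVolumeSpec:

package main

import (
	"fmt"

	"github.com/container-storage-interface/spec/lib/go/csi"

	"sigs.k8s.io/vsphere-csi-driver/pkg/csi/service/common"
)

// buildCreateVolumeSpec is a hypothetical helper, not part of this patch. It
// shows how the validation, rounding, and storage-class constants introduced
// above are intended to be combined by a CSI CreateVolume handler.
func buildCreateVolumeSpec(req *csi.CreateVolumeRequest) (*common.CreateVolumeSpec, error) {
	// Reject requests without a name or with unsupported access modes.
	if err := common.ValidateCreateVolumeRequest(req); err != nil {
		return nil, err
	}

	// CNS expects the capacity in MiB; round the requested bytes up so the
	// provisioned volume is never smaller than what was asked for.
	volSizeBytes := req.GetCapacityRange().GetRequiredBytes()
	capacityMB := common.RoundUpSize(volSizeBytes, common.MbInBytes)

	// Storage-class parameters arrive as a plain string map keyed by the
	// Attribute* constants defined in constants.go.
	params := req.GetParameters()
	return &common.CreateVolumeSpec{
		Name:              req.GetName(),
		CapacityMB:        capacityMB,
		DatastoreURL:      params[common.AttributeDatastoreURL],
		StoragePolicyName: params[common.AttributeStoragePolicyName],
		StoragePolicyID:   params[common.AttributeStoragePolicyID],
	}, nil
}

func main() {
	req := &csi.CreateVolumeRequest{
		Name:          "example-pvc",
		CapacityRange: &csi.CapacityRange{RequiredBytes: 5 * common.GbInBytes},
		VolumeCapabilities: []*csi.VolumeCapability{
			{AccessMode: &csi.VolumeCapability_AccessMode{Mode: csi.VolumeCapability_AccessMode_SINGLE_NODE_WRITER}},
		},
		Parameters: map[string]string{common.AttributeStoragePolicyName: "vSAN Default Storage Policy"},
	}
	spec, err := buildCreateVolumeSpec(req)
	if err != nil {
		fmt.Println("validation failed:", err)
		return
	}
	fmt.Printf("would create CNS volume %q with %d MiB\n", spec.Name, spec.CapacityMB)
}

The resulting spec is what CreateVolumeUtil expects, with the Manager and the list of shared datastores supplied by the controller service.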