diff --git a/cmd/csi-attacher/main.go b/cmd/csi-attacher/main.go index 53987b689..eda1a0bb4 100644 --- a/cmd/csi-attacher/main.go +++ b/cmd/csi-attacher/main.go @@ -198,13 +198,31 @@ func main() { klog.Error(err.Error()) os.Exit(1) } + + supportsSingleNodeMultiWriter, err := supportsSingleNodeMultiWriter(ctx, csiConn) + if err != nil { + klog.Error(err.Error()) + os.Exit(1) + } + if supportsAttach { pvLister := factory.Core().V1().PersistentVolumes().Lister() vaLister := factory.Storage().V1().VolumeAttachments().Lister() csiNodeLister := factory.Storage().V1().CSINodes().Lister() volAttacher := attacher.NewAttacher(csiConn) CSIVolumeLister := attacher.NewVolumeLister(csiConn) - handler = controller.NewCSIHandler(clientset, csiAttacher, volAttacher, CSIVolumeLister, pvLister, csiNodeLister, vaLister, timeout, supportsReadOnly, csitrans.New()) + handler = controller.NewCSIHandler( + clientset, + csiAttacher, + volAttacher, + CSIVolumeLister, + pvLister, + csiNodeLister, + vaLister, + timeout, + supportsReadOnly, + supportsSingleNodeMultiWriter, + csitrans.New()) klog.V(2).Infof("CSI driver supports ControllerPublishUnpublish, using real CSI handler") } else { handler = controller.NewTrivialHandler(clientset) @@ -294,6 +312,15 @@ func supportsListVolumesPublishedNodes(ctx context.Context, csiConn *grpc.Client return caps[csi.ControllerServiceCapability_RPC_LIST_VOLUMES] && caps[csi.ControllerServiceCapability_RPC_LIST_VOLUMES_PUBLISHED_NODES], nil } +func supportsSingleNodeMultiWriter(ctx context.Context, csiConn *grpc.ClientConn) (bool, error) { + caps, err := rpc.GetControllerCapabilities(ctx, csiConn) + if err != nil { + return false, fmt.Errorf("failed to get controller capabilities: %v", err) + } + + return caps[csi.ControllerServiceCapability_RPC_SINGLE_NODE_MULTI_WRITER], nil +} + func supportsPluginControllerService(ctx context.Context, csiConn *grpc.ClientConn) (bool, error) { caps, err := rpc.GetPluginCapabilities(ctx, csiConn) if err != nil { diff --git a/go.mod b/go.mod index f27673366..6f73f5e50 100644 --- a/go.mod +++ b/go.mod @@ -22,17 +22,19 @@ require ( golang.org/x/term v0.0.0-20210317153231-de623e64d2a6 // indirect google.golang.org/appengine v1.6.7 // indirect google.golang.org/genproto v0.0.0-20210317182105-75c7a8546eb9 // indirect - google.golang.org/grpc v1.36.0 + google.golang.org/grpc v1.37.0 gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b // indirect - k8s.io/api v0.21.0 + k8s.io/api v0.21.1 k8s.io/apimachinery v0.21.0 - k8s.io/client-go v0.21.0 - k8s.io/component-base v0.21.0 // indirect + k8s.io/client-go v0.21.1 k8s.io/csi-translation-lib v0.21.0 k8s.io/klog/v2 v2.8.0 k8s.io/utils v0.0.0-20210305010621-2afb4311ab10 // indirect ) +// go get -d github.com/chrishenzie/csi-lib-utils@single-node-access-modes +replace github.com/kubernetes-csi/csi-lib-utils => github.com/chrishenzie/csi-lib-utils v0.9.2-0.20210603000358-d0686c7b81af + replace k8s.io/component-base => github.com/chrishenzie/kubernetes/staging/src/k8s.io/component-base v0.0.0-20210507180302-a29b4b67ec78 replace k8s.io/node-api => github.com/chrishenzie/kubernetes/staging/src/k8s.io/node-api v0.0.0-20210507180302-a29b4b67ec78 diff --git a/go.sum b/go.sum index 4bab83150..6172c8901 100644 --- a/go.sum +++ b/go.sum @@ -82,6 +82,8 @@ github.com/cespare/xxhash v1.1.0 h1:a6HrQnmkObjyL+Gs60czilIUGqrzKutQD6XZog3p+ko= github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghfAqPWnc= github.com/cespare/xxhash/v2 v2.1.1 h1:6MnRN8NT7+YBpUIWxHtefFZOKTAPgGjpQSxqLNn0+qY= github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= +github.com/chrishenzie/csi-lib-utils v0.9.2-0.20210603000358-d0686c7b81af h1:rNyFwAyxVYHRhVfJAOdTpyCdk9GAORCccWn0UwZpxug= +github.com/chrishenzie/csi-lib-utils v0.9.2-0.20210603000358-d0686c7b81af/go.mod h1:D0gR5OCNhxbA7T54s7rcwokSueAUE/G7JjrLsZ8jI0M= github.com/chrishenzie/kubernetes/staging/src/k8s.io/api v0.0.0-20210507180302-a29b4b67ec78 h1:vDfCcW8hb8tcfyBxS2bPKirFbvXII6uuVXgH8+JZLbo= github.com/chrishenzie/kubernetes/staging/src/k8s.io/api v0.0.0-20210507180302-a29b4b67ec78/go.mod h1:DKjoC7WTLvupppdmb5jEvRDPQENLZqz/stEUs19TOOc= github.com/chrishenzie/kubernetes/staging/src/k8s.io/apimachinery v0.0.0-20210507180302-a29b4b67ec78 h1:N+JEnOzxl1RhNwvsBee6C2Rrydng8sAW2GSaB4rAdoY= @@ -133,7 +135,7 @@ github.com/envoyproxy/go-control-plane v0.6.9/go.mod h1:SBwIajubJHhxtWwsL9s8ss4s github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1mIlRU8Am5FuJP05cCM98= -github.com/envoyproxy/go-control-plane v0.9.9-0.20201210154907-fd9021fe5dad/go.mod h1:cXg6YxExXjJnVBQHBLXeUAgxn2UodCpnH306RInaBQk= +github.com/envoyproxy/go-control-plane v0.9.9-0.20210217033140-668b12f5399d/go.mod h1:cXg6YxExXjJnVBQHBLXeUAgxn2UodCpnH306RInaBQk= github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c= github.com/evanphx/json-patch v4.9.0+incompatible h1:kLcOMZeuLAJvL2BPWLMIj5oaZQobrkAqrL+WFZwQses= github.com/evanphx/json-patch v4.9.0+incompatible/go.mod h1:50XU6AFN0ol/bzJsmQLiYLvXMP4fmwYFNcr97nuDLSk= @@ -153,7 +155,6 @@ github.com/go-logfmt/logfmt v0.3.0/go.mod h1:Qt1PoO58o5twSAckw1HlFXLmHsOX5/0LbT9 github.com/go-logfmt/logfmt v0.4.0/go.mod h1:3RMwSq7FuexP4Kalkev3ejPJsZTpXXBr9+V4qmtdjCk= github.com/go-logfmt/logfmt v0.5.0/go.mod h1:wCYkCAKZfumFQihp8CzCvQ3paCTfi41vtzG1KdI/P7A= github.com/go-logr/logr v0.1.0/go.mod h1:ixOQHD9gLJUVQQ2ZOR7zLEifBX6tGkNJF4QyIY7sIas= -github.com/go-logr/logr v0.2.0/go.mod h1:z6/tIYblkpsD+a4lm/fGIIU9mZ+XfAiaFtq7xTgseGU= github.com/go-logr/logr v0.4.0 h1:K7/B1jt6fIBQVd4Owv2MqGQClcgf0R266+7C/QjRcLc= github.com/go-logr/logr v0.4.0/go.mod h1:z6/tIYblkpsD+a4lm/fGIIU9mZ+XfAiaFtq7xTgseGU= github.com/go-openapi/jsonpointer v0.19.3/go.mod h1:Pl9vOtqEWErmShwVjC8pYs9cog34VGT37dQOVbmoatg= @@ -310,8 +311,6 @@ github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= -github.com/kubernetes-csi/csi-lib-utils v0.9.1 h1:sGq6ifVujfMSkfTsMZip44Ttv8SDXvsBlFk9GdYl/b8= -github.com/kubernetes-csi/csi-lib-utils v0.9.1/go.mod h1:8E2jVUX9j3QgspwHXa6LwyN7IHQDjW9jX3kwoWnSC+M= github.com/kubernetes-csi/csi-test/v4 v4.0.2 h1:MNj94SFHOGK6lOy+yDgxI+zlFWaPcgByqBH3JZZGyZI= github.com/kubernetes-csi/csi-test/v4 v4.0.2/go.mod h1:z3FYigjLFAuzmFzKdHQr8gUPm5Xr4Du2twKcxfys0eI= github.com/lightstep/lightstep-tracer-common/golang/gogo v0.0.0-20190605223551-bc2310a04743/go.mod h1:qklhhLq1aX+mtWk9cPHPzaBjWImj5ULL6C7HFJtXQMM= @@ -800,12 +799,11 @@ google.golang.org/grpc v1.26.0/go.mod h1:qbnxyOmOxrQa7FizSgH+ReBfzJrCY1pSN7KXBS8 google.golang.org/grpc v1.27.0/go.mod h1:qbnxyOmOxrQa7FizSgH+ReBfzJrCY1pSN7KXBS8abTk= google.golang.org/grpc v1.27.1/go.mod h1:qbnxyOmOxrQa7FizSgH+ReBfzJrCY1pSN7KXBS8abTk= google.golang.org/grpc v1.28.0/go.mod h1:rpkK4SK4GF4Ach/+MFLZUBavHOvF2JJB5uozKKal+60= -google.golang.org/grpc v1.29.0/go.mod h1:itym6AZVZYACWQqET3MqgPpjcuV5QH3BxFS3IjizoKk= google.golang.org/grpc v1.29.1/go.mod h1:itym6AZVZYACWQqET3MqgPpjcuV5QH3BxFS3IjizoKk= google.golang.org/grpc v1.30.0/go.mod h1:N36X2cJ7JwdamYAgDz+s+rVMFjt3numwzf/HckM8pak= google.golang.org/grpc v1.31.0/go.mod h1:N36X2cJ7JwdamYAgDz+s+rVMFjt3numwzf/HckM8pak= -google.golang.org/grpc v1.36.0 h1:o1bcQ6imQMIOpdrO3SWf2z5RV72WbDwdXuK0MDlc8As= -google.golang.org/grpc v1.36.0/go.mod h1:qjiiYl8FncCW8feJPdyg3v6XW24KsRHe+dy9BAGRRjU= +google.golang.org/grpc v1.37.0 h1:uSZWeQJX5j11bIQ4AJoj+McDBo29cY1MCoC1wO3ts+c= +google.golang.org/grpc v1.37.0/go.mod h1:NREThFqKR1f3iQ6oBuvc5LadQuXVGo9rkm5ZGrQdJfM= google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8= google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0= google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM= @@ -865,7 +863,6 @@ k8s.io/gengo v0.0.0-20200413195148-3a45101e95ac/go.mod h1:ezvh/TsK7cY6rbqRK0oQQ8 k8s.io/klog v1.0.0 h1:Pt+yjF5aB1xDSVbau4VsWe+dQNzA0qv1LlXdC2dF6Q8= k8s.io/klog v1.0.0/go.mod h1:4Bi6QPql/J/LkTDqv7R/cd3hPo4k2DG6Ptcz060Ez5I= k8s.io/klog/v2 v2.0.0/go.mod h1:PBfzABfn139FHAV07az/IF9Wp1bkk3vpT2XSJ76fSDE= -k8s.io/klog/v2 v2.2.0/go.mod h1:Od+F08eJP+W3HUb4pSrPpgp9DGU4GzlpG/TmITuYh/Y= k8s.io/klog/v2 v2.8.0 h1:Q3gmuM9hKEjefWFFYF0Mat+YyFJvsUyYuwyNNJ5C9Ts= k8s.io/klog/v2 v2.8.0/go.mod h1:hy9LJ/NvuK+iVyP4Ehqva4HxZG/oXyIS3n3Jmire4Ec= k8s.io/kube-openapi v0.0.0-20210421082810-95288971da7e h1:KLHHjkdQFomZy8+06csTWZ0m1343QqxZhR2LJ1OxCYM= diff --git a/pkg/attacher/attacher.go b/pkg/attacher/attacher.go index 8a92dabbc..8af577b31 100644 --- a/pkg/attacher/attacher.go +++ b/pkg/attacher/attacher.go @@ -42,8 +42,7 @@ type Attacher interface { } type attacher struct { - conn *grpc.ClientConn - capabilities []csi.ControllerServiceCapability + conn *grpc.ClientConn } var ( diff --git a/pkg/controller/csi_handler.go b/pkg/controller/csi_handler.go index df372f3f1..a522b8459 100644 --- a/pkg/controller/csi_handler.go +++ b/pkg/controller/csi_handler.go @@ -58,19 +58,20 @@ var _ VolumeLister = &attacher.CSIVolumeLister{} // It adds finalizer to VolumeAttachment instance to make sure they're detached // before deletion. type csiHandler struct { - client kubernetes.Interface - attacherName string - attacher attacher.Attacher - CSIVolumeLister VolumeLister - pvLister corelisters.PersistentVolumeLister - csiNodeLister storagelisters.CSINodeLister - vaLister storagelisters.VolumeAttachmentLister - vaQueue, pvQueue workqueue.RateLimitingInterface - forceSync map[string]bool - forceSyncMux sync.Mutex - timeout time.Duration - supportsPublishReadOnly bool - translator AttacherCSITranslator + client kubernetes.Interface + attacherName string + attacher attacher.Attacher + CSIVolumeLister VolumeLister + pvLister corelisters.PersistentVolumeLister + csiNodeLister storagelisters.CSINodeLister + vaLister storagelisters.VolumeAttachmentLister + vaQueue, pvQueue workqueue.RateLimitingInterface + forceSync map[string]bool + forceSyncMux sync.Mutex + timeout time.Duration + supportsPublishReadOnly bool + supportsSingleNodeMultiWriter bool + translator AttacherCSITranslator } var _ Handler = &csiHandler{} @@ -86,21 +87,23 @@ func NewCSIHandler( vaLister storagelisters.VolumeAttachmentLister, timeout *time.Duration, supportsPublishReadOnly bool, + supportsSingleNodeMultiWriter bool, translator AttacherCSITranslator) Handler { return &csiHandler{ - client: client, - attacherName: attacherName, - attacher: attacher, - CSIVolumeLister: CSIVolumeLister, - pvLister: pvLister, - csiNodeLister: csiNodeLister, - vaLister: vaLister, - timeout: *timeout, - supportsPublishReadOnly: supportsPublishReadOnly, - translator: translator, - forceSync: map[string]bool{}, - forceSyncMux: sync.Mutex{}, + client: client, + attacherName: attacherName, + attacher: attacher, + CSIVolumeLister: CSIVolumeLister, + pvLister: pvLister, + csiNodeLister: csiNodeLister, + vaLister: vaLister, + timeout: *timeout, + supportsPublishReadOnly: supportsPublishReadOnly, + supportsSingleNodeMultiWriter: supportsSingleNodeMultiWriter, + translator: translator, + forceSync: map[string]bool{}, + forceSyncMux: sync.Mutex{}, } } @@ -488,10 +491,11 @@ func (h *csiHandler) csiAttach(va *storage.VolumeAttachment) (*storage.VolumeAtt readOnly = false } - volumeCapabilities, err := GetVolumeCapabilities(pvSpec) + volumeCapabilities, err := GetVolumeCapabilities(pvSpec, h.supportsSingleNodeMultiWriter) if err != nil { return va, nil, err } + secrets, err := h.getCredentialsFromPV(csiSource) if err != nil { return va, nil, err diff --git a/pkg/controller/csi_handler_test.go b/pkg/controller/csi_handler_test.go index 72ce6a0d7..0a8526bb3 100644 --- a/pkg/controller/csi_handler_test.go +++ b/pkg/controller/csi_handler_test.go @@ -63,7 +63,8 @@ func csiHandlerFactory(client kubernetes.Interface, informerFactory informers.Sh informerFactory.Storage().V1().CSINodes().Lister(), informerFactory.Storage().V1().VolumeAttachments().Lister(), &timeout, - true, /* supports PUBLISH_READONLY */ + true, /* supports PUBLISH_READONLY */ + false, /* does not support SINGLE_NODE_MULTI_WRITER access mode */ csitranslator.New(), ) } @@ -79,6 +80,7 @@ func csiHandlerFactoryNoReadOnly(client kubernetes.Interface, informerFactory in informerFactory.Storage().V1().VolumeAttachments().Lister(), &timeout, false, /* does not support PUBLISH_READONLY */ + false, /* does not support SINGLE_NODE_MULTI_WRITER access mode */ csitranslator.New(), ) } diff --git a/pkg/controller/util.go b/pkg/controller/util.go index 8d3bd4e08..e7bd3477e 100644 --- a/pkg/controller/util.go +++ b/pkg/controller/util.go @@ -25,6 +25,7 @@ import ( "github.com/container-storage-interface/spec/lib/go/csi" jsonpatch "github.com/evanphx/json-patch" + "github.com/kubernetes-csi/csi-lib-utils/accessmodes" v1 "k8s.io/api/core/v1" storage "k8s.io/api/storage/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -138,13 +139,9 @@ func GetNodeIDFromCSINode(driver string, csiNode *storage.CSINode) (string, bool return "", false } -// GetVolumeCapabilities returns volumecapability from PV spec -func GetVolumeCapabilities(pvSpec *v1.PersistentVolumeSpec) (*csi.VolumeCapability, error) { - m := map[v1.PersistentVolumeAccessMode]bool{} - for _, mode := range pvSpec.AccessModes { - m[mode] = true - } - +// GetVolumeCapabilities returns a VolumeCapability from the PV spec. Which access mode will be set depends if the driver supports the +// SINGLE_NODE_MULTI_WRITER capability. +func GetVolumeCapabilities(pvSpec *v1.PersistentVolumeSpec, singleNodeMultiWriterCapable bool) (*csi.VolumeCapability, error) { if pvSpec.CSI == nil { return nil, errors.New("CSI volume source was nil") } @@ -175,27 +172,17 @@ func GetVolumeCapabilities(pvSpec *v1.PersistentVolumeSpec) (*csi.VolumeCapabili } } - // Translate array of modes into single VolumeCapability - switch { - case m[v1.ReadWriteMany]: - // ReadWriteMany trumps everything, regardless what other modes are set - cap.AccessMode.Mode = csi.VolumeCapability_AccessMode_MULTI_NODE_MULTI_WRITER - - case m[v1.ReadOnlyMany] && m[v1.ReadWriteOnce]: - // This is no way how to translate this to CSI... - return nil, fmt.Errorf("CSI does not support ReadOnlyMany and ReadWriteOnce on the same PersistentVolume") - - case m[v1.ReadOnlyMany]: - // There is only ReadOnlyMany set - cap.AccessMode.Mode = csi.VolumeCapability_AccessMode_MULTI_NODE_READER_ONLY - - case m[v1.ReadWriteOnce]: - // There is only ReadWriteOnce set - cap.AccessMode.Mode = csi.VolumeCapability_AccessMode_SINGLE_NODE_WRITER - - default: - return nil, fmt.Errorf("unsupported AccessMode combination: %+v", pvSpec.AccessModes) + var am csi.VolumeCapability_AccessMode_Mode + var err error + if singleNodeMultiWriterCapable { + am, err = accessmodes.ToSingleNodeMultiWriterCapableCSIAccessMode(pvSpec.AccessModes) + } else { + am, err = accessmodes.ToCSIAccessMode(pvSpec.AccessModes) + } + if err != nil { + return nil, err } + cap.AccessMode.Mode = am return cap, nil } diff --git a/pkg/controller/util_test.go b/pkg/controller/util_test.go index 66b1de474..c3a336d12 100644 --- a/pkg/controller/util_test.go +++ b/pkg/controller/util_test.go @@ -57,13 +57,14 @@ func TestGetVolumeCapabilities(t *testing.T) { filesystemVolumeMode := v1.PersistentVolumeMode(v1.PersistentVolumeFilesystem) tests := []struct { - name string - volumeMode *v1.PersistentVolumeMode - fsType string - modes []v1.PersistentVolumeAccessMode - mountOptions []string - expectedCapability *csi.VolumeCapability - expectError bool + name string + volumeMode *v1.PersistentVolumeMode + fsType string + modes []v1.PersistentVolumeAccessMode + mountOptions []string + supportsSingleNodeMultiWriter bool + expectedCapability *csi.VolumeCapability + expectError bool }{ { name: "RWX", @@ -130,6 +131,69 @@ func TestGetVolumeCapabilities(t *testing.T) { expectedCapability: nil, expectError: true, }, + { + name: "RWX with SINGLE_NODE_MULTI_WRITER capable driver", + volumeMode: &filesystemVolumeMode, + modes: []v1.PersistentVolumeAccessMode{v1.ReadWriteMany}, + supportsSingleNodeMultiWriter: true, + expectedCapability: createMountCapability(defaultFSType, csi.VolumeCapability_AccessMode_MULTI_NODE_MULTI_WRITER, nil), + expectError: false, + }, + { + name: "ROX + RWO with SINGLE_NODE_MULTI_WRITER capable driver", + volumeMode: &filesystemVolumeMode, + modes: []v1.PersistentVolumeAccessMode{v1.ReadOnlyMany, v1.ReadWriteOnce}, + supportsSingleNodeMultiWriter: true, + expectedCapability: nil, + expectError: true, + }, + { + name: "ROX + RWOP with SINGLE_NODE_MULTI_WRITER capable driver", + volumeMode: &filesystemVolumeMode, + modes: []v1.PersistentVolumeAccessMode{v1.ReadOnlyMany, v1.ReadWriteOncePod}, + supportsSingleNodeMultiWriter: true, + expectedCapability: nil, + expectError: true, + }, + { + name: "ROX with SINGLE_NODE_MULTI_WRITER capable driver", + volumeMode: &filesystemVolumeMode, + modes: []v1.PersistentVolumeAccessMode{v1.ReadOnlyMany}, + supportsSingleNodeMultiWriter: true, + expectedCapability: createMountCapability(defaultFSType, csi.VolumeCapability_AccessMode_MULTI_NODE_READER_ONLY, nil), + expectError: false, + }, + { + name: "RWO with SINGLE_NODE_MULTI_WRITER capable driver", + volumeMode: &filesystemVolumeMode, + modes: []v1.PersistentVolumeAccessMode{v1.ReadWriteOnce}, + supportsSingleNodeMultiWriter: true, + expectedCapability: createMountCapability(defaultFSType, csi.VolumeCapability_AccessMode_SINGLE_NODE_MULTI_WRITER, nil), + expectError: false, + }, + { + name: "RWOP with SINGLE_NODE_MULTI_WRITER capable driver", + volumeMode: &filesystemVolumeMode, + modes: []v1.PersistentVolumeAccessMode{v1.ReadWriteOncePod}, + supportsSingleNodeMultiWriter: true, + expectedCapability: createMountCapability(defaultFSType, csi.VolumeCapability_AccessMode_SINGLE_NODE_SINGLE_WRITER, nil), + expectError: false, + }, + { + name: "RWO + RWOP with SINGLE_NODE_MULTI_WRITER capable driver", + volumeMode: &filesystemVolumeMode, + modes: []v1.PersistentVolumeAccessMode{v1.ReadWriteOnce, v1.ReadWriteOncePod}, + supportsSingleNodeMultiWriter: true, + expectedCapability: nil, + expectError: true, + }, + { + name: "nothing with SINGLE_NODE_MULTI_WRITER capable driver", + modes: []v1.PersistentVolumeAccessMode{}, + supportsSingleNodeMultiWriter: true, + expectedCapability: nil, + expectError: true, + }, } for _, test := range tests { @@ -145,7 +209,7 @@ func TestGetVolumeCapabilities(t *testing.T) { }, }, } - cap, err := GetVolumeCapabilities(&pv.Spec) + cap, err := GetVolumeCapabilities(&pv.Spec, test.supportsSingleNodeMultiWriter) if err == nil && test.expectError { t.Errorf("test %s: expected error, got none", test.name) diff --git a/vendor/github.com/kubernetes-csi/csi-lib-utils/accessmodes/access_modes.go b/vendor/github.com/kubernetes-csi/csi-lib-utils/accessmodes/access_modes.go new file mode 100644 index 000000000..c1aa060b8 --- /dev/null +++ b/vendor/github.com/kubernetes-csi/csi-lib-utils/accessmodes/access_modes.go @@ -0,0 +1,110 @@ +/* +Copyright 2021 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package accessmodes + +import ( + "fmt" + + "github.com/container-storage-interface/spec/lib/go/csi" + v1 "k8s.io/api/core/v1" +) + +// ToCSIAccessMode maps PersistentVolume access modes in Kubernetes to CSI +// access modes. +func ToCSIAccessMode(pvAccessModes []v1.PersistentVolumeAccessMode) (csi.VolumeCapability_AccessMode_Mode, error) { + m := uniqueAccessModes(pvAccessModes) + + switch { + case m[v1.ReadWriteMany]: + // ReadWriteMany takes precedence, regardless of what other + // modes are set. + return csi.VolumeCapability_AccessMode_MULTI_NODE_MULTI_WRITER, nil + + case m[v1.ReadOnlyMany] && m[v1.ReadWriteOnce]: + // This is not possible in the CSI spec. + return csi.VolumeCapability_AccessMode_UNKNOWN, fmt.Errorf("CSI does not support ReadOnlyMany and ReadWriteOnce on the same PersistentVolume") + + case m[v1.ReadOnlyMany] && m[v1.ReadWriteOncePod]: + // This is not possible in the CSI spec. + return csi.VolumeCapability_AccessMode_UNKNOWN, fmt.Errorf("CSI does not support ReadOnlyMany and ReadWriteOncePod on the same PersistentVolume") + + case m[v1.ReadWriteOnce] && m[v1.ReadWriteOncePod]: + // This is not possible in the CSI spec. + return csi.VolumeCapability_AccessMode_UNKNOWN, fmt.Errorf("CSI does not support ReadWriteOnce and ReadWriteOncePod on the same PersistentVolume") + + case m[v1.ReadOnlyMany]: + return csi.VolumeCapability_AccessMode_MULTI_NODE_READER_ONLY, nil + + case m[v1.ReadWriteOnce]: + return csi.VolumeCapability_AccessMode_SINGLE_NODE_WRITER, nil + + // This mapping exists to enable CSI drivers that lack the + // SINGLE_NODE_MULTI_WRITER capability to work with the + // ReadWriteOncePod access mode. + case m[v1.ReadWriteOncePod]: + return csi.VolumeCapability_AccessMode_SINGLE_NODE_WRITER, nil + + default: + return csi.VolumeCapability_AccessMode_UNKNOWN, fmt.Errorf("unsupported AccessMode combination: %+v", pvAccessModes) + } +} + +// ToSingleNodeMultiWriterCapableCSIAccessMode maps PersistentVolume access +// modes in Kubernetes to CSI access modes for drivers that support the +// SINGLE_NODE_MULTI_WRITER capability. +func ToSingleNodeMultiWriterCapableCSIAccessMode(pvAccessModes []v1.PersistentVolumeAccessMode) (csi.VolumeCapability_AccessMode_Mode, error) { + m := uniqueAccessModes(pvAccessModes) + + switch { + case m[v1.ReadWriteMany]: + // ReadWriteMany trumps everything, regardless of what other + // modes are set. + return csi.VolumeCapability_AccessMode_MULTI_NODE_MULTI_WRITER, nil + + case m[v1.ReadOnlyMany] && m[v1.ReadWriteOnce]: + // This is not possible in the CSI spec. + return csi.VolumeCapability_AccessMode_UNKNOWN, fmt.Errorf("CSI does not support ReadOnlyMany and ReadWriteOnce on the same PersistentVolume") + + case m[v1.ReadOnlyMany] && m[v1.ReadWriteOncePod]: + // This is not possible in the CSI spec. + return csi.VolumeCapability_AccessMode_UNKNOWN, fmt.Errorf("CSI does not support ReadOnlyMany and ReadWriteOncePod on the same PersistentVolume") + + case m[v1.ReadWriteOnce] && m[v1.ReadWriteOncePod]: + // This is not possible in the CSI spec. + return csi.VolumeCapability_AccessMode_UNKNOWN, fmt.Errorf("CSI does not support ReadWriteOnce and ReadWriteOncePod on the same PersistentVolume") + + case m[v1.ReadOnlyMany]: + return csi.VolumeCapability_AccessMode_MULTI_NODE_READER_ONLY, nil + + case m[v1.ReadWriteOnce]: + return csi.VolumeCapability_AccessMode_SINGLE_NODE_MULTI_WRITER, nil + + case m[v1.ReadWriteOncePod]: + return csi.VolumeCapability_AccessMode_SINGLE_NODE_SINGLE_WRITER, nil + + default: + return csi.VolumeCapability_AccessMode_UNKNOWN, fmt.Errorf("unsupported AccessMode combination: %+v", pvAccessModes) + } +} + +func uniqueAccessModes(pvAccessModes []v1.PersistentVolumeAccessMode) map[v1.PersistentVolumeAccessMode]bool { + m := map[v1.PersistentVolumeAccessMode]bool{} + for _, mode := range pvAccessModes { + m[mode] = true + } + return m +} diff --git a/vendor/github.com/kubernetes-csi/csi-lib-utils/connection/connection.go b/vendor/github.com/kubernetes-csi/csi-lib-utils/connection/connection.go index fbd8d37b5..ad37321e8 100644 --- a/vendor/github.com/kubernetes-csi/csi-lib-utils/connection/connection.go +++ b/vendor/github.com/kubernetes-csi/csi-lib-utils/connection/connection.go @@ -84,7 +84,8 @@ func ExitOnConnectionLoss() func() bool { if err := ioutil.WriteFile(terminationLogPath, []byte(terminationMsg), 0644); err != nil { klog.Errorf("%s: %s", terminationLogPath, err) } - klog.Fatalf(terminationMsg) + klog.Exit(terminationMsg) + // Not reached. return false } } diff --git a/vendor/google.golang.org/grpc/balancer/balancer.go b/vendor/google.golang.org/grpc/balancer/balancer.go index 788759bde..ab531f4c0 100644 --- a/vendor/google.golang.org/grpc/balancer/balancer.go +++ b/vendor/google.golang.org/grpc/balancer/balancer.go @@ -101,6 +101,9 @@ type SubConn interface { // a new connection will be created. // // This will trigger a state transition for the SubConn. + // + // Deprecated: This method is now part of the ClientConn interface and will + // eventually be removed from here. UpdateAddresses([]resolver.Address) // Connect starts the connecting for this SubConn. Connect() @@ -143,6 +146,13 @@ type ClientConn interface { // RemoveSubConn removes the SubConn from ClientConn. // The SubConn will be shutdown. RemoveSubConn(SubConn) + // UpdateAddresses updates the addresses used in the passed in SubConn. + // gRPC checks if the currently connected address is still in the new list. + // If so, the connection will be kept. Else, the connection will be + // gracefully closed, and a new connection will be created. + // + // This will trigger a state transition for the SubConn. + UpdateAddresses(SubConn, []resolver.Address) // UpdateState notifies gRPC that the balancer's internal state has // changed. diff --git a/vendor/google.golang.org/grpc/balancer/base/balancer.go b/vendor/google.golang.org/grpc/balancer/base/balancer.go index e0d34288c..c883efa0b 100644 --- a/vendor/google.golang.org/grpc/balancer/base/balancer.go +++ b/vendor/google.golang.org/grpc/balancer/base/balancer.go @@ -22,6 +22,7 @@ import ( "errors" "fmt" + "google.golang.org/grpc/attributes" "google.golang.org/grpc/balancer" "google.golang.org/grpc/connectivity" "google.golang.org/grpc/grpclog" @@ -41,7 +42,7 @@ func (bb *baseBuilder) Build(cc balancer.ClientConn, opt balancer.BuildOptions) cc: cc, pickerBuilder: bb.pickerBuilder, - subConns: make(map[resolver.Address]balancer.SubConn), + subConns: make(map[resolver.Address]subConnInfo), scStates: make(map[balancer.SubConn]connectivity.State), csEvltr: &balancer.ConnectivityStateEvaluator{}, config: bb.config, @@ -57,6 +58,11 @@ func (bb *baseBuilder) Name() string { return bb.name } +type subConnInfo struct { + subConn balancer.SubConn + attrs *attributes.Attributes +} + type baseBalancer struct { cc balancer.ClientConn pickerBuilder PickerBuilder @@ -64,7 +70,7 @@ type baseBalancer struct { csEvltr *balancer.ConnectivityStateEvaluator state connectivity.State - subConns map[resolver.Address]balancer.SubConn // `attributes` is stripped from the keys of this map (the addresses) + subConns map[resolver.Address]subConnInfo // `attributes` is stripped from the keys of this map (the addresses) scStates map[balancer.SubConn]connectivity.State picker balancer.Picker config Config @@ -114,7 +120,7 @@ func (b *baseBalancer) UpdateClientConnState(s balancer.ClientConnState) error { aNoAttrs := a aNoAttrs.Attributes = nil addrsSet[aNoAttrs] = struct{}{} - if sc, ok := b.subConns[aNoAttrs]; !ok { + if scInfo, ok := b.subConns[aNoAttrs]; !ok { // a is a new address (not existing in b.subConns). // // When creating SubConn, the original address with attributes is @@ -125,7 +131,7 @@ func (b *baseBalancer) UpdateClientConnState(s balancer.ClientConnState) error { logger.Warningf("base.baseBalancer: failed to create new SubConn: %v", err) continue } - b.subConns[aNoAttrs] = sc + b.subConns[aNoAttrs] = subConnInfo{subConn: sc, attrs: a.Attributes} b.scStates[sc] = connectivity.Idle sc.Connect() } else { @@ -135,13 +141,15 @@ func (b *baseBalancer) UpdateClientConnState(s balancer.ClientConnState) error { // The SubConn does a reflect.DeepEqual of the new and old // addresses. So this is a noop if the current address is the same // as the old one (including attributes). - sc.UpdateAddresses([]resolver.Address{a}) + scInfo.attrs = a.Attributes + b.subConns[aNoAttrs] = scInfo + b.cc.UpdateAddresses(scInfo.subConn, []resolver.Address{a}) } } - for a, sc := range b.subConns { + for a, scInfo := range b.subConns { // a was removed by resolver. if _, ok := addrsSet[a]; !ok { - b.cc.RemoveSubConn(sc) + b.cc.RemoveSubConn(scInfo.subConn) delete(b.subConns, a) // Keep the state of this sc in b.scStates until sc's state becomes Shutdown. // The entry will be deleted in UpdateSubConnState. @@ -184,9 +192,10 @@ func (b *baseBalancer) regeneratePicker() { readySCs := make(map[balancer.SubConn]SubConnInfo) // Filter out all ready SCs from full subConn map. - for addr, sc := range b.subConns { - if st, ok := b.scStates[sc]; ok && st == connectivity.Ready { - readySCs[sc] = SubConnInfo{Address: addr} + for addr, scInfo := range b.subConns { + if st, ok := b.scStates[scInfo.subConn]; ok && st == connectivity.Ready { + addr.Attributes = scInfo.attrs + readySCs[scInfo.subConn] = SubConnInfo{Address: addr} } } b.picker = b.pickerBuilder.Build(PickerBuildInfo{ReadySCs: readySCs}) diff --git a/vendor/google.golang.org/grpc/balancer_conn_wrappers.go b/vendor/google.golang.org/grpc/balancer_conn_wrappers.go index 11e592aab..41061d6d3 100644 --- a/vendor/google.golang.org/grpc/balancer_conn_wrappers.go +++ b/vendor/google.golang.org/grpc/balancer_conn_wrappers.go @@ -163,6 +163,14 @@ func (ccb *ccBalancerWrapper) RemoveSubConn(sc balancer.SubConn) { ccb.cc.removeAddrConn(acbw.getAddrConn(), errConnDrain) } +func (ccb *ccBalancerWrapper) UpdateAddresses(sc balancer.SubConn, addrs []resolver.Address) { + acbw, ok := sc.(*acBalancerWrapper) + if !ok { + return + } + acbw.UpdateAddresses(addrs) +} + func (ccb *ccBalancerWrapper) UpdateState(s balancer.State) { ccb.mu.Lock() defer ccb.mu.Unlock() diff --git a/vendor/google.golang.org/grpc/go.mod b/vendor/google.golang.org/grpc/go.mod index cab74e557..b177cfa66 100644 --- a/vendor/google.golang.org/grpc/go.mod +++ b/vendor/google.golang.org/grpc/go.mod @@ -4,7 +4,7 @@ go 1.11 require ( github.com/cncf/udpa/go v0.0.0-20201120205902-5459f2c99403 - github.com/envoyproxy/go-control-plane v0.9.9-0.20201210154907-fd9021fe5dad + github.com/envoyproxy/go-control-plane v0.9.9-0.20210217033140-668b12f5399d github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b github.com/golang/protobuf v1.4.2 github.com/google/go-cmp v0.5.0 diff --git a/vendor/google.golang.org/grpc/go.sum b/vendor/google.golang.org/grpc/go.sum index 77ee70b44..bb25cd491 100644 --- a/vendor/google.golang.org/grpc/go.sum +++ b/vendor/google.golang.org/grpc/go.sum @@ -12,8 +12,8 @@ github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8 github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= -github.com/envoyproxy/go-control-plane v0.9.9-0.20201210154907-fd9021fe5dad h1:EmNYJhPYy0pOFjCx2PrgtaBXmee0iUX9hLlxE1xHOJE= -github.com/envoyproxy/go-control-plane v0.9.9-0.20201210154907-fd9021fe5dad/go.mod h1:cXg6YxExXjJnVBQHBLXeUAgxn2UodCpnH306RInaBQk= +github.com/envoyproxy/go-control-plane v0.9.9-0.20210217033140-668b12f5399d h1:QyzYnTnPE15SQyUeqU6qLbWxMkwyAyu+vGksa0b7j00= +github.com/envoyproxy/go-control-plane v0.9.9-0.20210217033140-668b12f5399d/go.mod h1:cXg6YxExXjJnVBQHBLXeUAgxn2UodCpnH306RInaBQk= github.com/envoyproxy/protoc-gen-validate v0.1.0 h1:EQciDnbrYxy13PgWoY8AqoxGiPrpgBZ1R8UNe3ddc+A= github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c= github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b h1:VKtxabqXZkF25pY9ekfRL6a582T4P37/31XEstQ5p58= diff --git a/vendor/google.golang.org/grpc/internal/resolver/config_selector.go b/vendor/google.golang.org/grpc/internal/resolver/config_selector.go index e69900400..5e7f36703 100644 --- a/vendor/google.golang.org/grpc/internal/resolver/config_selector.go +++ b/vendor/google.golang.org/grpc/internal/resolver/config_selector.go @@ -24,6 +24,7 @@ import ( "sync" "google.golang.org/grpc/internal/serviceconfig" + "google.golang.org/grpc/metadata" "google.golang.org/grpc/resolver" ) @@ -51,6 +52,74 @@ type RPCConfig struct { Context context.Context MethodConfig serviceconfig.MethodConfig // configuration to use for this RPC OnCommitted func() // Called when the RPC has been committed (retries no longer possible) + Interceptor ClientInterceptor +} + +// ClientStream is the same as grpc.ClientStream, but defined here for circular +// dependency reasons. +type ClientStream interface { + // Header returns the header metadata received from the server if there + // is any. It blocks if the metadata is not ready to read. + Header() (metadata.MD, error) + // Trailer returns the trailer metadata from the server, if there is any. + // It must only be called after stream.CloseAndRecv has returned, or + // stream.Recv has returned a non-nil error (including io.EOF). + Trailer() metadata.MD + // CloseSend closes the send direction of the stream. It closes the stream + // when non-nil error is met. It is also not safe to call CloseSend + // concurrently with SendMsg. + CloseSend() error + // Context returns the context for this stream. + // + // It should not be called until after Header or RecvMsg has returned. Once + // called, subsequent client-side retries are disabled. + Context() context.Context + // SendMsg is generally called by generated code. On error, SendMsg aborts + // the stream. If the error was generated by the client, the status is + // returned directly; otherwise, io.EOF is returned and the status of + // the stream may be discovered using RecvMsg. + // + // SendMsg blocks until: + // - There is sufficient flow control to schedule m with the transport, or + // - The stream is done, or + // - The stream breaks. + // + // SendMsg does not wait until the message is received by the server. An + // untimely stream closure may result in lost messages. To ensure delivery, + // users should ensure the RPC completed successfully using RecvMsg. + // + // It is safe to have a goroutine calling SendMsg and another goroutine + // calling RecvMsg on the same stream at the same time, but it is not safe + // to call SendMsg on the same stream in different goroutines. It is also + // not safe to call CloseSend concurrently with SendMsg. + SendMsg(m interface{}) error + // RecvMsg blocks until it receives a message into m or the stream is + // done. It returns io.EOF when the stream completes successfully. On + // any other error, the stream is aborted and the error contains the RPC + // status. + // + // It is safe to have a goroutine calling SendMsg and another goroutine + // calling RecvMsg on the same stream at the same time, but it is not + // safe to call RecvMsg on the same stream in different goroutines. + RecvMsg(m interface{}) error +} + +// ClientInterceptor is an interceptor for gRPC client streams. +type ClientInterceptor interface { + // NewStream produces a ClientStream for an RPC which may optionally use + // the provided function to produce a stream for delegation. Note: + // RPCInfo.Context should not be used (will be nil). + // + // done is invoked when the RPC is finished using its connection, or could + // not be assigned a connection. RPC operations may still occur on + // ClientStream after done is called, since the interceptor is invoked by + // application-layer operations. done must never be nil when called. + NewStream(ctx context.Context, ri RPCInfo, done func(), newStream func(ctx context.Context, done func()) (ClientStream, error)) (ClientStream, error) +} + +// ServerInterceptor is unimplementable; do not use. +type ServerInterceptor interface { + notDefined() } type csKeyType string diff --git a/vendor/google.golang.org/grpc/internal/transport/http2_client.go b/vendor/google.golang.org/grpc/internal/transport/http2_client.go index 8902b7f90..d5bbe720d 100644 --- a/vendor/google.golang.org/grpc/internal/transport/http2_client.go +++ b/vendor/google.golang.org/grpc/internal/transport/http2_client.go @@ -414,6 +414,7 @@ func (t *http2Client) newStream(ctx context.Context, callHdr *CallHdr) *Stream { buf: newRecvBuffer(), headerChan: make(chan struct{}), contentSubtype: callHdr.ContentSubtype, + doneFunc: callHdr.DoneFunc, } s.wq = newWriteQuota(defaultWriteQuota, s.done) s.requestRead = func(n int) { @@ -832,6 +833,9 @@ func (t *http2Client) closeStream(s *Stream, err error, rst bool, rstCode http2. t.controlBuf.executeAndPut(addBackStreamQuota, cleanup) // This will unblock write. close(s.done) + if s.doneFunc != nil { + s.doneFunc() + } } // Close kicks off the shutdown process of the transport. This should be called diff --git a/vendor/google.golang.org/grpc/internal/transport/http2_server.go b/vendor/google.golang.org/grpc/internal/transport/http2_server.go index 0cf1cc320..7c6c89d4f 100644 --- a/vendor/google.golang.org/grpc/internal/transport/http2_server.go +++ b/vendor/google.golang.org/grpc/internal/transport/http2_server.go @@ -26,6 +26,7 @@ import ( "io" "math" "net" + "net/http" "strconv" "sync" "sync/atomic" @@ -402,6 +403,20 @@ func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func( return true } t.maxStreamID = streamID + if state.data.httpMethod != http.MethodPost { + t.mu.Unlock() + if logger.V(logLevel) { + logger.Warningf("transport: http2Server.operateHeaders parsed a :method field: %v which should be POST", state.data.httpMethod) + } + t.controlBuf.put(&cleanupStream{ + streamID: streamID, + rst: true, + rstCode: http2.ErrCodeProtocol, + onWrite: func() {}, + }) + s.cancel() + return false + } t.activeStreams[streamID] = s if len(t.activeStreams) == 1 { t.idle = time.Time{} diff --git a/vendor/google.golang.org/grpc/internal/transport/http_util.go b/vendor/google.golang.org/grpc/internal/transport/http_util.go index 7e41d1183..c7dee140c 100644 --- a/vendor/google.golang.org/grpc/internal/transport/http_util.go +++ b/vendor/google.golang.org/grpc/internal/transport/http_util.go @@ -111,6 +111,7 @@ type parsedHeaderData struct { timeoutSet bool timeout time.Duration method string + httpMethod string // key-value metadata map from the peer. mdata map[string][]string statsTags []byte @@ -363,6 +364,8 @@ func (d *decodeState) processHeaderField(f hpack.HeaderField) { } d.data.statsTrace = v d.addMetadata(f.Name, string(v)) + case ":method": + d.data.httpMethod = f.Value default: if isReservedHeader(f.Name) && !isWhitelistedHeader(f.Name) { break diff --git a/vendor/google.golang.org/grpc/internal/transport/transport.go b/vendor/google.golang.org/grpc/internal/transport/transport.go index 9c8f79cb4..5cf7c5f80 100644 --- a/vendor/google.golang.org/grpc/internal/transport/transport.go +++ b/vendor/google.golang.org/grpc/internal/transport/transport.go @@ -241,6 +241,7 @@ type Stream struct { ctx context.Context // the associated context of the stream cancel context.CancelFunc // always nil for client side Stream done chan struct{} // closed at the end of stream to unblock writers. On the client side. + doneFunc func() // invoked at the end of stream on client side. ctxDone <-chan struct{} // same as done chan but for server side. Cache of ctx.Done() (for performance) method string // the associated RPC method of the stream recvCompress string @@ -611,6 +612,8 @@ type CallHdr struct { ContentSubtype string PreviousAttempts int // value of grpc-previous-rpc-attempts header to set + + DoneFunc func() // called when the stream is finished } // ClientTransport is the common interface for all gRPC client-side transport diff --git a/vendor/google.golang.org/grpc/pickfirst.go b/vendor/google.golang.org/grpc/pickfirst.go index 56e33f6c7..b858c2a5e 100644 --- a/vendor/google.golang.org/grpc/pickfirst.go +++ b/vendor/google.golang.org/grpc/pickfirst.go @@ -84,7 +84,7 @@ func (b *pickfirstBalancer) UpdateClientConnState(cs balancer.ClientConnState) e b.cc.UpdateState(balancer.State{ConnectivityState: connectivity.Idle, Picker: &picker{result: balancer.PickResult{SubConn: b.sc}}}) b.sc.Connect() } else { - b.sc.UpdateAddresses(cs.ResolverState.Addresses) + b.cc.UpdateAddresses(b.sc, cs.ResolverState.Addresses) b.sc.Connect() } return nil diff --git a/vendor/google.golang.org/grpc/stream.go b/vendor/google.golang.org/grpc/stream.go index eda1248d6..77d25742c 100644 --- a/vendor/google.golang.org/grpc/stream.go +++ b/vendor/google.golang.org/grpc/stream.go @@ -166,7 +166,6 @@ func newClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, meth } }() } - c := defaultCallInfo() // Provide an opportunity for the first RPC to see the first service config // provided by the resolver. if err := cc.waitForResolvedAddrs(ctx); err != nil { @@ -175,18 +174,40 @@ func newClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, meth var mc serviceconfig.MethodConfig var onCommit func() - rpcConfig, err := cc.safeConfigSelector.SelectConfig(iresolver.RPCInfo{Context: ctx, Method: method}) + var newStream = func(ctx context.Context, done func()) (iresolver.ClientStream, error) { + return newClientStreamWithParams(ctx, desc, cc, method, mc, onCommit, done, opts...) + } + + rpcInfo := iresolver.RPCInfo{Context: ctx, Method: method} + rpcConfig, err := cc.safeConfigSelector.SelectConfig(rpcInfo) if err != nil { - return nil, status.Convert(err).Err() + return nil, toRPCErr(err) } + if rpcConfig != nil { if rpcConfig.Context != nil { ctx = rpcConfig.Context } mc = rpcConfig.MethodConfig onCommit = rpcConfig.OnCommitted + if rpcConfig.Interceptor != nil { + rpcInfo.Context = nil + ns := newStream + newStream = func(ctx context.Context, done func()) (iresolver.ClientStream, error) { + cs, err := rpcConfig.Interceptor.NewStream(ctx, rpcInfo, done, ns) + if err != nil { + return nil, toRPCErr(err) + } + return cs, nil + } + } } + return newStream(ctx, func() {}) +} + +func newClientStreamWithParams(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, mc serviceconfig.MethodConfig, onCommit, doneFunc func(), opts ...CallOption) (_ iresolver.ClientStream, err error) { + c := defaultCallInfo() if mc.WaitForReady != nil { c.failFast = !*mc.WaitForReady } @@ -223,6 +244,7 @@ func newClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, meth Host: cc.authority, Method: method, ContentSubtype: c.contentSubtype, + DoneFunc: doneFunc, } // Set our outgoing compression according to the UseCompressor CallOption, if diff --git a/vendor/google.golang.org/grpc/version.go b/vendor/google.golang.org/grpc/version.go index 51024d6b3..c3b87eb5a 100644 --- a/vendor/google.golang.org/grpc/version.go +++ b/vendor/google.golang.org/grpc/version.go @@ -19,4 +19,4 @@ package grpc // Version is the current grpc version. -const Version = "1.36.0" +const Version = "1.37.0" diff --git a/vendor/google.golang.org/grpc/vet.sh b/vendor/google.golang.org/grpc/vet.sh index b41df6dc8..dcd939bb3 100644 --- a/vendor/google.golang.org/grpc/vet.sh +++ b/vendor/google.golang.org/grpc/vet.sh @@ -28,7 +28,8 @@ cleanup() { } trap cleanup EXIT -PATH="${GOPATH}/bin:${GOROOT}/bin:${PATH}" +PATH="${HOME}/go/bin:${GOROOT}/bin:${PATH}" +go version if [[ "$1" = "-install" ]]; then # Check for module support @@ -107,7 +108,7 @@ go list -f {{.Dir}} ./... | xargs go run test/go_vet/vet.go # - gofmt, goimports, golint (with exceptions for generated code), go vet. gofmt -s -d -l . 2>&1 | fail_on_output goimports -l . 2>&1 | not grep -vE "\.pb\.go" -golint ./... 2>&1 | not grep -vE "\.pb\.go:" +golint ./... 2>&1 | not grep -vE "/testv3\.pb\.go:" go vet -all ./... misspell -error . @@ -141,8 +142,11 @@ not grep -Fv '.CredsBundle .NewAddress .NewServiceConfig .Type is deprecated: use Attributes +BuildVersion is deprecated balancer.ErrTransientFailure balancer.Picker +extDesc.Filename is deprecated +github.com/golang/protobuf/jsonpb is deprecated grpc.CallCustomCodec grpc.Code grpc.Compressor @@ -164,13 +168,7 @@ grpc.WithServiceConfig grpc.WithTimeout http.CloseNotifier info.SecurityVersion -resolver.Backend -resolver.GRPCLB -extDesc.Filename is deprecated -BuildVersion is deprecated -github.com/golang/protobuf/jsonpb is deprecated proto is deprecated -xxx_messageInfo_ proto.InternalMessageInfo is deprecated proto.EnumName is deprecated proto.ErrInternalBadWireType is deprecated @@ -184,7 +182,12 @@ proto.RegisterExtension is deprecated proto.RegisteredExtension is deprecated proto.RegisteredExtensions is deprecated proto.RegisterMapType is deprecated -proto.Unmarshaler is deprecated' "${SC_OUT}" +proto.Unmarshaler is deprecated +resolver.Backend +resolver.GRPCLB +Target is deprecated: Use the Target field in the BuildOptions instead. +xxx_messageInfo_ +' "${SC_OUT}" # - special golint on package comments. lint_package_comment_per_package() { diff --git a/vendor/modules.txt b/vendor/modules.txt index 64c82143e..013c23857 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -58,8 +58,9 @@ github.com/hashicorp/golang-lru/simplelru github.com/imdario/mergo # github.com/json-iterator/go v1.1.10 github.com/json-iterator/go -# github.com/kubernetes-csi/csi-lib-utils v0.9.1 +# github.com/kubernetes-csi/csi-lib-utils v0.9.1 => github.com/chrishenzie/csi-lib-utils v0.9.2-0.20210603000358-d0686c7b81af ## explicit +github.com/kubernetes-csi/csi-lib-utils/accessmodes github.com/kubernetes-csi/csi-lib-utils/connection github.com/kubernetes-csi/csi-lib-utils/leaderelection github.com/kubernetes-csi/csi-lib-utils/metrics @@ -137,7 +138,7 @@ google.golang.org/appengine/urlfetch # google.golang.org/genproto v0.0.0-20210317182105-75c7a8546eb9 ## explicit google.golang.org/genproto/googleapis/rpc/status -# google.golang.org/grpc v1.36.0 +# google.golang.org/grpc v1.37.0 ## explicit google.golang.org/grpc google.golang.org/grpc/attributes @@ -224,7 +225,7 @@ gopkg.in/yaml.v2 # gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b ## explicit gopkg.in/yaml.v3 -# k8s.io/api v0.21.0 => github.com/chrishenzie/kubernetes/staging/src/k8s.io/api v0.0.0-20210507180302-a29b4b67ec78 +# k8s.io/api v0.21.1 => github.com/chrishenzie/kubernetes/staging/src/k8s.io/api v0.0.0-20210507180302-a29b4b67ec78 ## explicit k8s.io/api/admissionregistration/v1 k8s.io/api/admissionregistration/v1beta1 @@ -315,7 +316,7 @@ k8s.io/apimachinery/pkg/version k8s.io/apimachinery/pkg/watch k8s.io/apimachinery/third_party/forked/golang/json k8s.io/apimachinery/third_party/forked/golang/reflect -# k8s.io/client-go v0.21.0 => github.com/chrishenzie/kubernetes/staging/src/k8s.io/client-go v0.0.0-20210507180302-a29b4b67ec78 +# k8s.io/client-go v0.21.1 => github.com/chrishenzie/kubernetes/staging/src/k8s.io/client-go v0.0.0-20210507180302-a29b4b67ec78 ## explicit k8s.io/client-go/applyconfigurations/admissionregistration/v1 k8s.io/client-go/applyconfigurations/admissionregistration/v1beta1 @@ -576,8 +577,7 @@ k8s.io/client-go/util/flowcontrol k8s.io/client-go/util/homedir k8s.io/client-go/util/keyutil k8s.io/client-go/util/workqueue -# k8s.io/component-base v0.21.0 => github.com/chrishenzie/kubernetes/staging/src/k8s.io/component-base v0.0.0-20210507180302-a29b4b67ec78 -## explicit +# k8s.io/component-base v0.21.1 => github.com/chrishenzie/kubernetes/staging/src/k8s.io/component-base v0.0.0-20210507180302-a29b4b67ec78 k8s.io/component-base/metrics k8s.io/component-base/version # k8s.io/csi-translation-lib v0.21.0 => github.com/chrishenzie/kubernetes/staging/src/k8s.io/csi-translation-lib v0.0.0-20210507180302-a29b4b67ec78 @@ -603,6 +603,7 @@ sigs.k8s.io/structured-merge-diff/v4/typed sigs.k8s.io/structured-merge-diff/v4/value # sigs.k8s.io/yaml v1.2.0 sigs.k8s.io/yaml +# github.com/kubernetes-csi/csi-lib-utils => github.com/chrishenzie/csi-lib-utils v0.9.2-0.20210603000358-d0686c7b81af # k8s.io/component-base => github.com/chrishenzie/kubernetes/staging/src/k8s.io/component-base v0.0.0-20210507180302-a29b4b67ec78 # k8s.io/node-api => github.com/chrishenzie/kubernetes/staging/src/k8s.io/node-api v0.0.0-20210507180302-a29b4b67ec78 # k8s.io/api => github.com/chrishenzie/kubernetes/staging/src/k8s.io/api v0.0.0-20210507180302-a29b4b67ec78