Skip to content

Commit

Permalink
Merge pull request kubernetes#24202 from sttts/sttts-restore-dropped-…
Browse files Browse the repository at this point in the history
…upstreams-jsafrane

Bug 1776351: Restore UPSTREAMs dropped in kubernetes#24159
  • Loading branch information
openshift-merge-robot authored Nov 26, 2019
2 parents ddc8d37 + 6827d56 commit 0c0e943
Show file tree
Hide file tree
Showing 18 changed files with 382 additions and 141 deletions.
49 changes: 29 additions & 20 deletions pkg/kubelet/volumemanager/reconciler/reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"io/ioutil"
"os"
"path"
"path/filepath"
"time"

v1 "k8s.io/api/core/v1"
Expand Down Expand Up @@ -352,10 +353,10 @@ type reconstructedVolume struct {
volumeSpec *volumepkg.Spec
outerVolumeSpecName string
pod *v1.Pod
attachablePlugin volumepkg.AttachableVolumePlugin
volumeGidValue string
devicePath string
mounter volumepkg.Mounter
deviceMounter volumepkg.DeviceMounter
blockVolumeMapper volumepkg.BlockVolumeMapper
}

Expand Down Expand Up @@ -499,6 +500,7 @@ func (rc *reconciler) reconstructVolume(volume podVolume) (*reconstructedVolume,

var volumeMapper volumepkg.BlockVolumeMapper
var volumeMounter volumepkg.Mounter
var deviceMounter volumepkg.DeviceMounter
// Path to the mount or block device to check
var checkPath string

Expand All @@ -518,7 +520,8 @@ func (rc *reconciler) reconstructVolume(volume podVolume) (*reconstructedVolume,
pod.UID,
newMapperErr)
}
checkPath, _ = volumeMapper.GetPodDeviceMapPath()
mapDir, linkName := volumeMapper.GetPodDeviceMapPath()
checkPath = filepath.Join(mapDir, linkName)
} else {
var err error
volumeMounter, err = plugin.NewMounter(
Expand All @@ -535,6 +538,17 @@ func (rc *reconciler) reconstructVolume(volume podVolume) (*reconstructedVolume,
err)
}
checkPath = volumeMounter.GetPath()
if deviceMountablePlugin != nil {
deviceMounter, err = deviceMountablePlugin.NewDeviceMounter()
if err != nil {
return nil, fmt.Errorf("reconstructVolume.NewDeviceMounter failed for volume %q (spec.Name: %q) pod %q (UID: %q) with: %v",
uniqueVolumeName,
volumeSpec.Name(),
volume.podName,
pod.UID,
err)
}
}
}

// Check existence of mount point for filesystem volume or symbolic link for block volume
Expand All @@ -556,7 +570,7 @@ func (rc *reconciler) reconstructVolume(volume podVolume) (*reconstructedVolume,
// TODO: in case pod is added back before reconciler starts to unmount, we can update this field from desired state information
outerVolumeSpecName: volume.volumeSpecName,
pod: pod,
attachablePlugin: attachablePlugin,
deviceMounter: deviceMounter,
volumeGidValue: "",
// devicePath is updated during updateStates() by checking node status's VolumesAttached data.
// TODO: get device path directly from the volume mount path.
Expand All @@ -583,25 +597,19 @@ func (rc *reconciler) updateDevicePath(volumesNeedUpdate map[v1.UniqueVolumeName
}
}

// getDeviceMountPath returns device mount path for block volume which
// implements BlockVolumeMapper or filesystem volume which implements
// DeviceMounter
func getDeviceMountPath(volume *reconstructedVolume) (string, error) {
volumeAttacher, err := volume.attachablePlugin.NewAttacher()
if volumeAttacher == nil || err != nil {
return "", err
}
deviceMountPath, err :=
volumeAttacher.GetDeviceMountPath(volume.volumeSpec)
if err != nil {
return "", err
}

if volume.blockVolumeMapper != nil {
deviceMountPath, err =
volume.blockVolumeMapper.GetGlobalMapPath(volume.volumeSpec)
if err != nil {
return "", err
}
// for block volume, we return its global map path
return volume.blockVolumeMapper.GetGlobalMapPath(volume.volumeSpec)
} else if volume.deviceMounter != nil {
// for filesystem volume, we return its device mount path if the plugin implements DeviceMounter
return volume.deviceMounter.GetDeviceMountPath(volume.volumeSpec)
} else {
return "", fmt.Errorf("blockVolumeMapper or deviceMounter required")
}
return deviceMountPath, nil
}

func (rc *reconciler) updateStates(volumesNeedUpdate map[v1.UniqueVolumeName]*reconstructedVolume) error {
Expand Down Expand Up @@ -630,7 +638,8 @@ func (rc *reconciler) updateStates(volumesNeedUpdate map[v1.UniqueVolumeName]*re
continue
}
klog.V(4).Infof("Volume: %s (pod UID %s) is marked as mounted and added into the actual state", volume.volumeName, volume.podName)
if volume.attachablePlugin != nil {
// If the volume has device to mount, we mark its device as mounted.
if volume.deviceMounter != nil || volume.blockVolumeMapper != nil {
deviceMountPath, err := getDeviceMountPath(volume)
if err != nil {
klog.Errorf("Could not find device mount path for volume %s", volume.volumeName)
Expand Down
38 changes: 5 additions & 33 deletions pkg/volume/awsebs/aws_ebs.go
Original file line number Diff line number Diff line change
Expand Up @@ -263,41 +263,13 @@ func (plugin *awsElasticBlockStorePlugin) ConstructVolumeSpec(volName, mountPath
if err != nil {
return nil, err
}
// This is a workaround to fix the issue in converting aws volume id from globalPDPath
// There are three aws volume id formats and their volumeID from GetDeviceNameFromMount() are:
// aws:///vol-1234 (aws/vol-1234)
// aws://us-east-1/vol-1234 (aws/us-east-1/vol-1234)
// vol-1234 (vol-1234)
// This code is for converting volume id to aws style volume id for the first two cases.
sourceName := volumeID
if strings.HasPrefix(volumeID, "aws/") {
names := strings.Split(volumeID, "/")
length := len(names)
if length < 2 || length > 3 {
return nil, fmt.Errorf("Failed to get AWS volume id from mount path %q: invalid volume name format %q", mountPath, volumeID)
}
volName := names[length-1]
if !strings.HasPrefix(volName, "vol-") {
return nil, fmt.Errorf("Invalid volume name format for AWS volume (%q) retrieved from mount path %q", volName, mountPath)
}
if length == 2 {
sourceName = awsURLNamePrefix + "" + "/" + volName // empty zone label
}
if length == 3 {
sourceName = awsURLNamePrefix + names[1] + "/" + volName // names[1] is the zone label
}
klog.V(4).Infof("Convert aws volume name from %q to %q ", volumeID, sourceName)
volumeID, err = formatVolumeID(volumeID)
if err != nil {
return nil, fmt.Errorf("failed to get AWS volume id from mount path %q: %v", mountPath, err)
}

awsVolume := &v1.Volume{
Name: volName,
VolumeSource: v1.VolumeSource{
AWSElasticBlockStore: &v1.AWSElasticBlockStoreVolumeSource{
VolumeID: sourceName,
},
},
}
return volume.NewSpecFromVolume(awsVolume), nil
file := v1.PersistentVolumeFilesystem
return newAWSVolumeSpec(volName, volumeID, file), nil
}

func (plugin *awsElasticBlockStorePlugin) RequiresFSResize() bool {
Expand Down
31 changes: 12 additions & 19 deletions pkg/volume/awsebs/aws_ebs_block.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,34 +51,27 @@ func (plugin *awsElasticBlockStorePlugin) ConstructBlockVolumeSpec(podUID types.
return nil, fmt.Errorf("failed to get volume plugin information from globalMapPathUUID: %v", globalMapPathUUID)
}

return getVolumeSpecFromGlobalMapPath(globalMapPath)
return plugin.getVolumeSpecFromGlobalMapPath(volumeName, globalMapPath)
}

func getVolumeSpecFromGlobalMapPath(globalMapPath string) (*volume.Spec, error) {
func (plugin *awsElasticBlockStorePlugin) getVolumeSpecFromGlobalMapPath(volumeName string, globalMapPath string) (*volume.Spec, error) {
// Get volume spec information from globalMapPath
// globalMapPath example:
// plugins/kubernetes.io/{PluginName}/{DefaultKubeletVolumeDevicesDirName}/{volumeID}
// plugins/kubernetes.io/aws-ebs/volumeDevices/vol-XXXXXX
vID := filepath.Base(globalMapPath)
if len(vID) <= 1 {
return nil, fmt.Errorf("failed to get volumeID from global path=%s", globalMapPath)
}
if !strings.Contains(vID, "vol-") {
return nil, fmt.Errorf("failed to get volumeID from global path=%s, invalid volumeID format = %s", globalMapPath, vID)
pluginDir := plugin.host.GetVolumeDevicePluginDir(awsElasticBlockStorePluginName)
if !strings.HasPrefix(globalMapPath, pluginDir) {
return nil, fmt.Errorf("volume symlink %s is not in global plugin directory", globalMapPath)
}
block := v1.PersistentVolumeBlock
awsVolume := &v1.PersistentVolume{
Spec: v1.PersistentVolumeSpec{
PersistentVolumeSource: v1.PersistentVolumeSource{
AWSElasticBlockStore: &v1.AWSElasticBlockStoreVolumeSource{
VolumeID: vID,
},
},
VolumeMode: &block,
},
fullVolumeID := strings.TrimPrefix(globalMapPath, pluginDir) // /vol-XXXXXX
fullVolumeID = strings.TrimLeft(fullVolumeID, "/") // vol-XXXXXX
vID, err := formatVolumeID(fullVolumeID)
if err != nil {
return nil, fmt.Errorf("failed to get AWS volume id from map path %q: %v", globalMapPath, err)
}

return volume.NewSpecFromPersistentVolume(awsVolume, true), nil
block := v1.PersistentVolumeBlock
return newAWSVolumeSpec(volumeName, vID, block), nil
}

// NewBlockVolumeMapper creates a new volume.BlockVolumeMapper from an API specification.
Expand Down
15 changes: 13 additions & 2 deletions pkg/volume/awsebs/aws_ebs_block_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,17 +51,28 @@ func TestGetVolumeSpecFromGlobalMapPath(t *testing.T) {

expectedGlobalPath := filepath.Join(tmpVDir, testGlobalPath)

plugMgr := volume.VolumePluginMgr{}
plugMgr.InitPlugins(ProbeVolumePlugins(), nil /* prober */, volumetest.NewFakeVolumeHost(tmpVDir, nil, nil))
plug, err := plugMgr.FindMapperPluginByName(awsElasticBlockStorePluginName)
if err != nil {
os.RemoveAll(tmpVDir)
t.Fatalf("Can't find the plugin by name: %q", awsElasticBlockStorePluginName)
}

//Bad Path
badspec, err := getVolumeSpecFromGlobalMapPath("")
badspec, err := plug.(*awsElasticBlockStorePlugin).getVolumeSpecFromGlobalMapPath("", "")
if badspec != nil || err == nil {
t.Fatalf("Expected not to get spec from GlobalMapPath but did")
}

// Good Path
spec, err := getVolumeSpecFromGlobalMapPath(expectedGlobalPath)
spec, err := plug.(*awsElasticBlockStorePlugin).getVolumeSpecFromGlobalMapPath("myVolume", expectedGlobalPath)
if spec == nil || err != nil {
t.Fatalf("Failed to get spec from GlobalMapPath: %v", err)
}
if spec.PersistentVolume.Name != "myVolume" {
t.Errorf("Invalid PV name from GlobalMapPath spec: %s", spec.PersistentVolume.Name)
}
if spec.PersistentVolume.Spec.AWSElasticBlockStore.VolumeID != testVolName {
t.Errorf("Invalid volumeID from GlobalMapPath spec: %s", spec.PersistentVolume.Spec.AWSElasticBlockStore.VolumeID)
}
Expand Down
33 changes: 33 additions & 0 deletions pkg/volume/awsebs/aws_ebs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -414,3 +414,36 @@ func TestGetCandidateZone(t *testing.T) {
assert.Equal(t, test.expectedZones, zones)
}
}

func TestFormatVolumeID(t *testing.T) {
tests := []struct {
volumeIDFromPath string
expectedVolumeID string
}{
{
"aws/vol-1234",
"aws:///vol-1234",
},
{
"aws:/vol-1234",
"aws:///vol-1234",
},
{
"aws/us-east-1/vol-1234",
"aws://us-east-1/vol-1234",
},
{
"aws:/us-east-1/vol-1234",
"aws://us-east-1/vol-1234",
},
{
"vol-1234",
"vol-1234",
},
}
for _, test := range tests {
volumeID, err := formatVolumeID(test.volumeIDFromPath)
assert.Nil(t, err)
assert.Equal(t, test.expectedVolumeID, volumeID, test.volumeIDFromPath)
}
}
48 changes: 48 additions & 0 deletions pkg/volume/awsebs/aws_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (

v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/sets"
cloudprovider "k8s.io/cloud-provider"
volumehelpers "k8s.io/cloud-provider/volume/helpers"
Expand Down Expand Up @@ -287,3 +288,50 @@ func findNvmeVolume(findName string) (device string, err error) {

return resolved, nil
}

func formatVolumeID(volumeID string) (string, error) {
// This is a workaround to fix the issue in converting aws volume id from globalPDPath and globalMapPath
// There are three formats for AWSEBSVolumeSource.VolumeID and they are stored on disk in paths like so:
// VolumeID mountPath mapPath
// aws:///vol-1234 aws/vol-1234 aws:/vol-1234
// aws://us-east-1/vol-1234 aws/us-east-1/vol-1234 aws:/us-east-1/vol-1234
// vol-1234 vol-1234 vol-1234
// This code is for converting volume ids from paths back to AWS style VolumeIDs
sourceName := volumeID
if strings.HasPrefix(volumeID, "aws/") || strings.HasPrefix(volumeID, "aws:/") {
names := strings.Split(volumeID, "/")
length := len(names)
if length < 2 || length > 3 {
return "", fmt.Errorf("invalid volume name format %q", volumeID)
}
volName := names[length-1]
if !strings.HasPrefix(volName, "vol-") {
return "", fmt.Errorf("Invalid volume name format for AWS volume (%q)", volName)
}
if length == 2 {
sourceName = awsURLNamePrefix + "" + "/" + volName // empty zone label
}
if length == 3 {
sourceName = awsURLNamePrefix + names[1] + "/" + volName // names[1] is the zone label
}
klog.V(4).Infof("Convert aws volume name from %q to %q ", volumeID, sourceName)
}
return sourceName, nil
}

func newAWSVolumeSpec(volumeName, volumeID string, mode v1.PersistentVolumeMode) *volume.Spec {
awsVolume := &v1.PersistentVolume{
ObjectMeta: metav1.ObjectMeta{
Name: volumeName,
},
Spec: v1.PersistentVolumeSpec{
PersistentVolumeSource: v1.PersistentVolumeSource{
AWSElasticBlockStore: &v1.AWSElasticBlockStoreVolumeSource{
VolumeID: volumeID,
},
},
VolumeMode: &mode,
},
}
return volume.NewSpecFromPersistentVolume(awsVolume, false)
}
10 changes: 7 additions & 3 deletions pkg/volume/cinder/cinder_block.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@ import (
"fmt"
"path/filepath"

"k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/klog"
"k8s.io/kubernetes/pkg/util/mount"
Expand Down Expand Up @@ -53,10 +54,10 @@ func (plugin *cinderPlugin) ConstructBlockVolumeSpec(podUID types.UID, volumeNam
return nil, fmt.Errorf("failed to get volume plugin information from globalMapPathUUID: %v", globalMapPathUUID)
}

return getVolumeSpecFromGlobalMapPath(globalMapPath)
return getVolumeSpecFromGlobalMapPath(volumeName, globalMapPath)
}

func getVolumeSpecFromGlobalMapPath(globalMapPath string) (*volume.Spec, error) {
func getVolumeSpecFromGlobalMapPath(volumeName, globalMapPath string) (*volume.Spec, error) {
// Get volume spec information from globalMapPath
// globalMapPath example:
// plugins/kubernetes.io/{PluginName}/{DefaultKubeletVolumeDevicesDirName}/{volumeID}
Expand All @@ -67,6 +68,9 @@ func getVolumeSpecFromGlobalMapPath(globalMapPath string) (*volume.Spec, error)
}
block := v1.PersistentVolumeBlock
cinderVolume := &v1.PersistentVolume{
ObjectMeta: metav1.ObjectMeta{
Name: volumeName,
},
Spec: v1.PersistentVolumeSpec{
PersistentVolumeSource: v1.PersistentVolumeSource{
Cinder: &v1.CinderPersistentVolumeSource{
Expand Down
9 changes: 6 additions & 3 deletions pkg/volume/cinder/cinder_block_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import (
"path/filepath"
"testing"

"k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
utiltesting "k8s.io/client-go/util/testing"
Expand Down Expand Up @@ -52,16 +52,19 @@ func TestGetVolumeSpecFromGlobalMapPath(t *testing.T) {
expectedGlobalPath := filepath.Join(tmpVDir, testGlobalPath)

//Bad Path
badspec, err := getVolumeSpecFromGlobalMapPath("")
badspec, err := getVolumeSpecFromGlobalMapPath("", "")
if badspec != nil || err == nil {
t.Errorf("Expected not to get spec from GlobalMapPath but did")
}

// Good Path
spec, err := getVolumeSpecFromGlobalMapPath(expectedGlobalPath)
spec, err := getVolumeSpecFromGlobalMapPath("myVolume", expectedGlobalPath)
if spec == nil || err != nil {
t.Fatalf("Failed to get spec from GlobalMapPath: %v", err)
}
if spec.PersistentVolume.Name != "myVolume" {
t.Errorf("Invalid PV name from GlobalMapPath spec: %s", spec.PersistentVolume.Name)
}
if spec.PersistentVolume.Spec.Cinder.VolumeID != testVolName {
t.Errorf("Invalid volumeID from GlobalMapPath spec: %s", spec.PersistentVolume.Spec.Cinder.VolumeID)
}
Expand Down
Loading

0 comments on commit 0c0e943

Please sign in to comment.