Skip to content

Commit

Permalink
Merge pull request #6258 from tstromberg/unthread-start
Browse files Browse the repository at this point in the history
Replace integration-test hacks with mutex locks around shared resources
  • Loading branch information
tstromberg authored Jan 10, 2020
2 parents 75082b6 + 0c07e80 commit 0a9e8b0
Show file tree
Hide file tree
Showing 13 changed files with 82 additions and 60 deletions.
29 changes: 29 additions & 0 deletions pkg/minikube/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"math"
"net"
"os/exec"
"path/filepath"
"regexp"
"strconv"
"strings"
Expand All @@ -38,6 +39,7 @@ import (
"github.com/docker/machine/libmachine/ssh"
"github.com/docker/machine/libmachine/state"
"github.com/golang/glog"
"github.com/juju/mutex"
"github.com/pkg/errors"
"github.com/shirou/gopsutil/cpu"
"github.com/shirou/gopsutil/disk"
Expand All @@ -52,6 +54,7 @@ import (
"k8s.io/minikube/pkg/minikube/localpath"
"k8s.io/minikube/pkg/minikube/out"
"k8s.io/minikube/pkg/minikube/registry"
"k8s.io/minikube/pkg/util/lock"
"k8s.io/minikube/pkg/util/retry"
)

Expand Down Expand Up @@ -88,6 +91,17 @@ func CacheISO(config cfg.MachineConfig) error {

// StartHost starts a host VM.
func StartHost(api libmachine.API, config cfg.MachineConfig) (*host.Host, error) {
// Prevent machine-driver boot races, as well as our own certificate race
releaser, err := acquireMachinesLock(config.Name)
if err != nil {
return nil, errors.Wrap(err, "boot lock")
}
start := time.Now()
defer func() {
glog.Infof("releasing machines lock for %q, held for %s", config.Name, time.Since(start))
releaser.Release()
}()

exists, err := api.Exists(config.Name)
if err != nil {
return nil, errors.Wrapf(err, "exists: %s", config.Name)
Expand Down Expand Up @@ -138,6 +152,21 @@ func StartHost(api libmachine.API, config cfg.MachineConfig) (*host.Host, error)
return h, nil
}

// acquireMachinesLock takes the path mutex built from the "machines" directory
// under the minikube home, guarding code that is not parallel-safe
// (libmachine, cert setup). The name argument is only used for log messages.
// It returns whatever mutex.Acquire returns: the releaser and any error.
func acquireMachinesLock(name string) (mutex.Releaser, error) {
	machinesDir := filepath.Join(localpath.MiniPath(), "machines")
	spec := lock.PathMutexSpec(machinesDir)
	// NOTE: Provisioning generally completes within 60 seconds, so the
	// 10 minute timeout leaves generous headroom for queued callers.
	spec.Timeout = 10 * time.Minute

	glog.Infof("acquiring machines lock for %s: %+v", name, spec)
	began := time.Now()
	releaser, err := mutex.Acquire(spec)
	if err != nil {
		// Hand back exactly what Acquire produced; the caller adds context.
		return releaser, err
	}

	glog.Infof("acquired machines lock for %q in %s", name, time.Since(began))
	return releaser, nil
}

// configureHost handles any post-powerup configuration required
func configureHost(h *host.Host, e *engine.Options) error {
start := time.Now()
Expand Down
14 changes: 14 additions & 0 deletions pkg/minikube/driver/install.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,17 @@ import (
"path/filepath"
"regexp"
"strings"
"time"

"github.com/blang/semver"
"github.com/golang/glog"
"github.com/hashicorp/go-getter"
"github.com/juju/mutex"
"github.com/pkg/errors"

"k8s.io/minikube/pkg/minikube/out"
"k8s.io/minikube/pkg/util"
"k8s.io/minikube/pkg/util/lock"
)

// InstallOrUpdate downloads driver if it is not present, or updates it if there's a newer version
Expand All @@ -40,6 +43,17 @@ func InstallOrUpdate(name string, directory string, v semver.Version, interactiv
}

executable := fmt.Sprintf("docker-machine-driver-%s", name)

// Lock before we check for existence to avoid thundering herd issues
spec := lock.PathMutexSpec(executable)
spec.Timeout = 10 * time.Minute
glog.Infof("acquiring lock: %+v", spec)
releaser, err := mutex.Acquire(spec)
if err != nil {
return errors.Wrapf(err, "unable to acquire lock for %+v", spec)
}
defer releaser.Release()

exists := driverExists(executable)
path, err := validateDriver(executable, v)
if !exists || (err != nil && autoUpdate) {
Expand Down
34 changes: 23 additions & 11 deletions pkg/minikube/image/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,12 @@ import (
"github.com/golang/glog"
"github.com/google/go-containerregistry/pkg/name"
"github.com/google/go-containerregistry/pkg/v1/tarball"
"github.com/juju/mutex"
"github.com/pkg/errors"
"golang.org/x/sync/errgroup"
"k8s.io/minikube/pkg/minikube/constants"
"k8s.io/minikube/pkg/minikube/localpath"
"k8s.io/minikube/pkg/util/lock"
)

// DeleteFromCacheDir deletes tar files stored in cache dir
Expand Down Expand Up @@ -72,23 +74,33 @@ func SaveToDir(images []string, cacheDir string) error {
}

// saveToTarFile caches an image
func saveToTarFile(image, dst string) error {
func saveToTarFile(image, rawDest string) error {
start := time.Now()
defer func() {
glog.Infof("cache image %q -> %s to local destination -> %q", image, dst, time.Since(start))
glog.Infof("cache image %q -> %q took %s", image, rawDest, time.Since(start))
}()

if _, err := os.Stat(dst); err == nil {
glog.Infof("%s exists", dst)
return nil
// OS-specific mangling of destination path
dst, err := localpath.DstPath(rawDest)
if err != nil {
return errors.Wrap(err, "getting destination path")
}

dstPath, err := localpath.DstPath(dst)
spec := lock.PathMutexSpec(dst)
spec.Timeout = 10 * time.Minute
glog.Infof("acquiring lock: %+v", spec)
releaser, err := mutex.Acquire(spec)
if err != nil {
return errors.Wrap(err, "getting destination path")
return errors.Wrapf(err, "unable to acquire lock for %+v", spec)
}
defer releaser.Release()

if _, err := os.Stat(dst); err == nil {
glog.Infof("%s exists", dst)
return nil
}

if err := os.MkdirAll(filepath.Dir(dstPath), 0777); err != nil {
if err := os.MkdirAll(filepath.Dir(dst), 0777); err != nil {
return errors.Wrapf(err, "making cache image directory: %s", dst)
}

Expand All @@ -101,8 +113,8 @@ func saveToTarFile(image, dst string) error {
if err != nil {
glog.Warningf("unable to retrieve image: %v", err)
}
glog.Infoln("opening: ", dstPath)
f, err := ioutil.TempFile(filepath.Dir(dstPath), filepath.Base(dstPath)+".*.tmp")
glog.Infoln("opening: ", dst)
f, err := ioutil.TempFile(filepath.Dir(dst), filepath.Base(dst)+".*.tmp")
if err != nil {
return err
}
Expand All @@ -128,7 +140,7 @@ func saveToTarFile(image, dst string) error {
if err != nil {
return errors.Wrap(err, "close")
}
err = os.Rename(f.Name(), dstPath)
err = os.Rename(f.Name(), dst)
if err != nil {
return errors.Wrap(err, "rename")
}
Expand Down
16 changes: 15 additions & 1 deletion pkg/util/downloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,16 @@ import (
"net/url"
"os"
"path/filepath"
"time"

"github.com/golang/glog"
"github.com/hashicorp/go-getter"
"github.com/juju/mutex"
"github.com/pkg/errors"
"k8s.io/minikube/pkg/minikube/constants"
"k8s.io/minikube/pkg/minikube/localpath"
"k8s.io/minikube/pkg/minikube/out"
"k8s.io/minikube/pkg/util/lock"
)

const fileScheme = "file"
Expand Down Expand Up @@ -56,6 +59,18 @@ func (f DefaultDownloader) GetISOFileURI(isoURL string) string {

// CacheMinikubeISOFromURL downloads the ISO, if it doesn't exist in cache
func (f DefaultDownloader) CacheMinikubeISOFromURL(url string) error {
dst := f.GetISOCacheFilepath(url)

// Lock before we check for existence to avoid thundering herd issues
spec := lock.PathMutexSpec(dst)
spec.Timeout = 10 * time.Minute
glog.Infof("acquiring lock: %+v", spec)
releaser, err := mutex.Acquire(spec)
if err != nil {
return errors.Wrapf(err, "unable to acquire lock for %+v", spec)
}
defer releaser.Release()

if !f.ShouldCacheMinikubeISO(url) {
glog.Infof("Not caching ISO, using %s", url)
return nil
Expand All @@ -66,7 +81,6 @@ func (f DefaultDownloader) CacheMinikubeISOFromURL(url string) error {
urlWithChecksum = url + "?checksum=file:" + constants.DefaultISOSHAURL
}

dst := f.GetISOCacheFilepath(url)
// Predictable temp destination so that resume can function
tmpDst := dst + ".download"

Expand Down
1 change: 0 additions & 1 deletion test/integration/aab_offline_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ func TestOffline(t *testing.T) {
runtime := runtime
t.Run(runtime, func(t *testing.T) {
MaybeParallel(t)
WaitForStartSlot(t)

if runtime != "docker" && NoneDriver() {
t.Skipf("skipping %s - incompatible with none driver", t.Name())
Expand Down
1 change: 0 additions & 1 deletion test/integration/addons_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ import (

// TestAddons tests addons that require no special environment -- in parallel
func TestAddons(t *testing.T) {
WaitForStartSlot(t)
profile := UniqueProfileName("addons")
ctx, cancel := context.WithTimeout(context.Background(), 40*time.Minute)
defer CleanupWithLogs(t, profile, cancel)
Expand Down
1 change: 0 additions & 1 deletion test/integration/docker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ func TestDockerFlags(t *testing.T) {
t.Skip("skipping: none driver does not support ssh or bundle docker")
}
MaybeParallel(t)
WaitForStartSlot(t)

profile := UniqueProfileName("docker-flags")
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Minute)
Expand Down
2 changes: 1 addition & 1 deletion test/integration/guest_env_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import (

func TestGuestEnvironment(t *testing.T) {
MaybeParallel(t)
WaitForStartSlot(t)


profile := UniqueProfileName("guest")
ctx, cancel := context.WithTimeout(context.Background(), 15*time.Minute)
Expand Down
1 change: 0 additions & 1 deletion test/integration/gvisor_addon_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ func TestGvisorAddon(t *testing.T) {
}

MaybeParallel(t)
WaitForStartSlot(t)
profile := UniqueProfileName("gvisor")
ctx, cancel := context.WithTimeout(context.Background(), 60*time.Minute)
defer func() {
Expand Down
36 changes: 0 additions & 36 deletions test/integration/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ import (
"io/ioutil"
"os/exec"
"strings"
"sync"
"testing"
"time"

Expand All @@ -41,13 +40,6 @@ import (
"k8s.io/minikube/pkg/kapi"
)

var (
// startTimes is a list of startup times, to guarantee --start-offset
startTimes = []time.Time{}
// startTimesMutex is a lock to update startTimes without a race condition
startTimesMutex = &sync.Mutex{}
)

// RunResult stores the result of a cmd.Run call
type RunResult struct {
Stdout *bytes.Buffer
Expand Down Expand Up @@ -349,34 +341,6 @@ func MaybeParallel(t *testing.T) {
t.Parallel()
}

// WaitForStartSlot enforces --start-offset to avoid startup race conditions.
// Each caller reserves a slot at least *startOffset after the most recently
// reserved one, then sleeps until that slot arrives. With the none driver it
// returns immediately.
func WaitForStartSlot(t *testing.T) {
	// Not parallel
	if NoneDriver() {
		return
	}

	slot := time.Now()
	startTimesMutex.Lock()
	if n := len(startTimes); n > 0 {
		candidate := startTimes[n-1].Add(*startOffset)
		// A candidate in the past is ignored so the offset is still
		// guaranteed for the next caller, measured from "now".
		if time.Now().Before(candidate) {
			slot = candidate
		}
	}
	startTimes = append(startTimes, slot)
	startTimesMutex.Unlock()

	if !time.Now().Before(slot) {
		t.Logf("No need to wait for start slot, it is already %s", time.Now())
		return
	}

	pause := time.Until(slot)
	t.Logf("Waiting for start slot at %s (sleeping %s) ...", slot, pause)
	time.Sleep(pause)
}

// killProcessFamily kills a pid and all of its children
func killProcessFamily(t *testing.T, pid int) {
parent, err := process.NewProcess(int32(pid))
Expand Down
3 changes: 0 additions & 3 deletions test/integration/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,6 @@ var defaultDriver = flag.String("expected-default-driver", "", "Expected default
var forceProfile = flag.String("profile", "", "force tests to run against a particular profile")
var cleanup = flag.Bool("cleanup", true, "cleanup failed test run")
var enableGvisor = flag.Bool("gvisor", false, "run gvisor integration test (slow)")

// Ensure that multiple calls to startMachine do not collide with one another
var startOffset = flag.Duration("start-offset", 50*time.Second, "how much time to offset between cluster starts")
var postMortemLogs = flag.Bool("postmortem-logs", true, "show logs after a failed test run")

// Paths to files - normally set for CI
Expand Down
2 changes: 0 additions & 2 deletions test/integration/start_stop_delete_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,6 @@ func TestStartStop(t *testing.T) {
tc := tc
t.Run(tc.name, func(t *testing.T) {
MaybeParallel(t)
WaitForStartSlot(t)

if !strings.Contains(tc.name, "docker") && NoneDriver() {
t.Skipf("skipping %s - incompatible with none driver", t.Name())
Expand Down Expand Up @@ -141,7 +140,6 @@ func TestStartStop(t *testing.T) {
t.Errorf("status = %q; want = %q", got, state.Stopped)
}

WaitForStartSlot(t)
rr, err = Run(t, exec.CommandContext(ctx, Target(), startArgs...))
if err != nil {
// Explicit fatal so that failures don't move directly to deletion
Expand Down
2 changes: 0 additions & 2 deletions test/integration/version_upgrade_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@ import (
// and it tries to upgrade from the oldest supported k8s to the newest supported k8s
func TestVersionUpgrade(t *testing.T) {
MaybeParallel(t)
WaitForStartSlot(t)
profile := UniqueProfileName("vupgrade")
ctx, cancel := context.WithTimeout(context.Background(), 55*time.Minute)

Expand Down Expand Up @@ -92,7 +91,6 @@ func TestVersionUpgrade(t *testing.T) {
t.Errorf("status = %q; want = %q", got, state.Stopped.String())
}

WaitForStartSlot(t)
args = append([]string{"start", "-p", profile, fmt.Sprintf("--kubernetes-version=%s", constants.NewestKubernetesVersion), "--alsologtostderr", "-v=1"}, StartArgs()...)
rr, err = Run(t, exec.CommandContext(ctx, Target(), args...))
if err != nil {
Expand Down

0 comments on commit 0a9e8b0

Please sign in to comment.