From e5fd7a6a811e01de48b0900d63431aceeb4c5e1e Mon Sep 17 00:00:00 2001 From: Thomas Stromberg Date: Thu, 9 Jan 2020 12:58:10 -0800 Subject: [PATCH 1/4] Replace integration-test specific hacks with actual protection against racey events --- pkg/minikube/cluster/cluster.go | 24 +++++++++++++++ pkg/minikube/driver/install.go | 14 +++++++++ pkg/minikube/image/cache.go | 34 +++++++++++++------- pkg/util/downloader.go | 16 +++++++++- test/integration/aab_offline_test.go | 1 - test/integration/addons_test.go | 1 - test/integration/docker_test.go | 1 - test/integration/guest_env_test.go | 2 +- test/integration/gvisor_addon_test.go | 1 - test/integration/helpers.go | 36 ---------------------- test/integration/start_stop_delete_test.go | 2 -- test/integration/version_upgrade_test.go | 2 -- 12 files changed, 77 insertions(+), 57 deletions(-) diff --git a/pkg/minikube/cluster/cluster.go b/pkg/minikube/cluster/cluster.go index e295fe170c84..6c550068cc76 100644 --- a/pkg/minikube/cluster/cluster.go +++ b/pkg/minikube/cluster/cluster.go @@ -24,6 +24,7 @@ import ( "math" "net" "os/exec" + "path/filepath" "regexp" "strconv" "strings" @@ -38,6 +39,7 @@ import ( "github.com/docker/machine/libmachine/ssh" "github.com/docker/machine/libmachine/state" "github.com/golang/glog" + "github.com/juju/mutex" "github.com/pkg/errors" "github.com/shirou/gopsutil/cpu" "github.com/shirou/gopsutil/disk" @@ -53,6 +55,7 @@ import ( "k8s.io/minikube/pkg/minikube/out" "k8s.io/minikube/pkg/minikube/registry" pkgutil "k8s.io/minikube/pkg/util" + "k8s.io/minikube/pkg/util/lock" "k8s.io/minikube/pkg/util/retry" ) @@ -89,6 +92,13 @@ func CacheISO(config cfg.MachineConfig) error { // StartHost starts a host VM. func StartHost(api libmachine.API, config cfg.MachineConfig) (*host.Host, error) { + // Prevent machine-driver boot races, as well as our own certificate race + releaser, err := acquireMachinesLock() + if err != nil { + return nil, errors.Wrap(err, "boot lock") + } + defer releaser.Release() + exists, err := api.Exists(config.Name) if err != nil { return nil, errors.Wrapf(err, "exists: %s", config.Name) @@ -139,6 +149,20 @@ func StartHost(api libmachine.API, config cfg.MachineConfig) (*host.Host, error) return h, nil } +// acquireMachinesLock protects against code that is not parallel-safe (libmachine, cert setup) +func acquireMachinesLock() (mutex.Releaser, error) { + // NOTE: Provisioning generally completes within 60 seconds + spec := lock.PathMutexSpec(filepath.Join(localpath.MiniPath(), "machines")) + spec.Timeout = 5 * time.Minute + + glog.Infof("acquiring machines lock: %+v", spec) + start := time.Now() + defer func() { + glog.Infof("acquired machines lock within %s", time.Since(start)) + }() + return mutex.Acquire(spec) +} + // configureHost handles any post-powerup configuration required func configureHost(h *host.Host, e *engine.Options) error { start := time.Now() diff --git a/pkg/minikube/driver/install.go b/pkg/minikube/driver/install.go index 1d0b856e3987..2732fe6f36f0 100644 --- a/pkg/minikube/driver/install.go +++ b/pkg/minikube/driver/install.go @@ -23,14 +23,17 @@ import ( "path/filepath" "regexp" "strings" + "time" "github.com/blang/semver" "github.com/golang/glog" "github.com/hashicorp/go-getter" + "github.com/juju/mutex" "github.com/pkg/errors" "k8s.io/minikube/pkg/minikube/out" "k8s.io/minikube/pkg/util" + "k8s.io/minikube/pkg/util/lock" ) // InstallOrUpdate downloads driver if it is not present, or updates it if there's a newer version @@ -40,6 +43,17 @@ func InstallOrUpdate(name string, directory string, v semver.Version, interactiv } executable := fmt.Sprintf("docker-machine-driver-%s", name) + + // Lock before we check for existence to avoid thundering herd issues + spec := lock.PathMutexSpec(executable) + spec.Timeout = 10 * time.Minute + glog.Infof("acquiring lock: %+v", spec) + releaser, err := mutex.Acquire(spec) + if err != nil { + return errors.Wrapf(err, "unable to acquire lock for %+v", spec) + } + defer releaser.Release() + exists := driverExists(executable) path, err := validateDriver(executable, v) if !exists || (err != nil && autoUpdate) { diff --git a/pkg/minikube/image/cache.go b/pkg/minikube/image/cache.go index 93ac59e59e01..3a29e779b5f1 100644 --- a/pkg/minikube/image/cache.go +++ b/pkg/minikube/image/cache.go @@ -25,10 +25,12 @@ import ( "github.com/golang/glog" "github.com/google/go-containerregistry/pkg/name" "github.com/google/go-containerregistry/pkg/v1/tarball" + "github.com/juju/mutex" "github.com/pkg/errors" "golang.org/x/sync/errgroup" "k8s.io/minikube/pkg/minikube/constants" "k8s.io/minikube/pkg/minikube/localpath" + "k8s.io/minikube/pkg/util/lock" ) // DeleteFromCacheDir deletes tar files stored in cache dir @@ -72,23 +74,33 @@ func SaveToDir(images []string, cacheDir string) error { } // saveToTarFile caches an image -func saveToTarFile(image, dst string) error { +func saveToTarFile(image, rawDest string) error { start := time.Now() defer func() { - glog.Infof("cache image %q -> %s to local destination -> %q", image, dst, time.Since(start)) + glog.Infof("cache image %q -> %q took %s", image, rawDest, time.Since(start)) }() - if _, err := os.Stat(dst); err == nil { - glog.Infof("%s exists", dst) - return nil + // OS-specific mangling of destination path + dst, err := localpath.DstPath(rawDest) + if err != nil { + return errors.Wrap(err, "getting destination path") } - dstPath, err := localpath.DstPath(dst) + spec := lock.PathMutexSpec(dst) + spec.Timeout = 10 * time.Minute + glog.Infof("acquiring lock: %+v", spec) + releaser, err := mutex.Acquire(spec) if err != nil { - return errors.Wrap(err, "getting destination path") + return errors.Wrapf(err, "unable to acquire lock for %+v", spec) + } + defer releaser.Release() + + if _, err := os.Stat(dst); err == nil { + glog.Infof("%s exists", dst) + return nil } - if err := os.MkdirAll(filepath.Dir(dstPath), 0777); err != nil { + if err := os.MkdirAll(filepath.Dir(dst), 0777); err != nil { return errors.Wrapf(err, "making cache image directory: %s", dst) } @@ -101,8 +113,8 @@ func saveToTarFile(image, dst string) error { if err != nil { glog.Warningf("unable to retrieve image: %v", err) } - glog.Infoln("opening: ", dstPath) - f, err := ioutil.TempFile(filepath.Dir(dstPath), filepath.Base(dstPath)+".*.tmp") + glog.Infoln("opening: ", dst) + f, err := ioutil.TempFile(filepath.Dir(dst), filepath.Base(dst)+".*.tmp") if err != nil { return err } @@ -128,7 +140,7 @@ func saveToTarFile(image, dst string) error { if err != nil { return errors.Wrap(err, "close") } - err = os.Rename(f.Name(), dstPath) + err = os.Rename(f.Name(), dst) if err != nil { return errors.Wrap(err, "rename") } diff --git a/pkg/util/downloader.go b/pkg/util/downloader.go index 20ae0550ba53..72b478256589 100644 --- a/pkg/util/downloader.go +++ b/pkg/util/downloader.go @@ -20,13 +20,16 @@ import ( "net/url" "os" "path/filepath" + "time" "github.com/golang/glog" "github.com/hashicorp/go-getter" + "github.com/juju/mutex" "github.com/pkg/errors" "k8s.io/minikube/pkg/minikube/constants" "k8s.io/minikube/pkg/minikube/localpath" "k8s.io/minikube/pkg/minikube/out" + "k8s.io/minikube/pkg/util/lock" ) const fileScheme = "file" @@ -56,6 +59,18 @@ func (f DefaultDownloader) GetISOFileURI(isoURL string) string { // CacheMinikubeISOFromURL downloads the ISO, if it doesn't exist in cache func (f DefaultDownloader) CacheMinikubeISOFromURL(url string) error { + dst := f.GetISOCacheFilepath(url) + + // Lock before we check for existence to avoid thundering herd issues + spec := lock.PathMutexSpec(dst) + spec.Timeout = 10 * time.Minute + glog.Infof("acquiring lock: %+v", spec) + releaser, err := mutex.Acquire(spec) + if err != nil { + return errors.Wrapf(err, "unable to acquire lock for %+v", spec) + } + defer releaser.Release() + if !f.ShouldCacheMinikubeISO(url) { glog.Infof("Not caching ISO, using %s", url) return nil @@ -66,7 +81,6 @@ func (f DefaultDownloader) CacheMinikubeISOFromURL(url string) error { urlWithChecksum = url + "?checksum=file:" + constants.DefaultISOSHAURL } - dst := f.GetISOCacheFilepath(url) // Predictable temp destination so that resume can function tmpDst := dst + ".download" diff --git a/test/integration/aab_offline_test.go b/test/integration/aab_offline_test.go index 4224ecda8dc1..523850624b2b 100644 --- a/test/integration/aab_offline_test.go +++ b/test/integration/aab_offline_test.go @@ -32,7 +32,6 @@ func TestOffline(t *testing.T) { for _, runtime := range []string{"docker", "crio", "containerd"} { t.Run(runtime, func(t *testing.T) { MaybeParallel(t) - WaitForStartSlot(t) if runtime != "docker" && NoneDriver() { t.Skipf("skipping %s - incompatible with none driver", t.Name()) diff --git a/test/integration/addons_test.go b/test/integration/addons_test.go index 7b0ffeee7c62..86e49d716b43 100644 --- a/test/integration/addons_test.go +++ b/test/integration/addons_test.go @@ -36,7 +36,6 @@ import ( // TestAddons tests addons that require no special environment -- in parallel func TestAddons(t *testing.T) { - WaitForStartSlot(t) profile := UniqueProfileName("addons") ctx, cancel := context.WithTimeout(context.Background(), 40*time.Minute) defer CleanupWithLogs(t, profile, cancel) diff --git a/test/integration/docker_test.go b/test/integration/docker_test.go index 6ed9f76202ee..24b19fe5d263 100644 --- a/test/integration/docker_test.go +++ b/test/integration/docker_test.go @@ -31,7 +31,6 @@ func TestDockerFlags(t *testing.T) { t.Skip("skipping: none driver does not support ssh or bundle docker") } MaybeParallel(t) - WaitForStartSlot(t) profile := UniqueProfileName("docker-flags") ctx, cancel := context.WithTimeout(context.Background(), 20*time.Minute) diff --git a/test/integration/guest_env_test.go b/test/integration/guest_env_test.go index 0e7dfb66347d..f93239a1a8c4 100644 --- a/test/integration/guest_env_test.go +++ b/test/integration/guest_env_test.go @@ -28,7 +28,7 @@ import ( func TestGuestEnvironment(t *testing.T) { MaybeParallel(t) - WaitForStartSlot(t) + profile := UniqueProfileName("guest") ctx, cancel := context.WithTimeout(context.Background(), 15*time.Minute) diff --git a/test/integration/gvisor_addon_test.go b/test/integration/gvisor_addon_test.go index ada8765562fd..cd8051e4ef3b 100644 --- a/test/integration/gvisor_addon_test.go +++ b/test/integration/gvisor_addon_test.go @@ -35,7 +35,6 @@ func TestGvisorAddon(t *testing.T) { } MaybeParallel(t) - WaitForStartSlot(t) profile := UniqueProfileName("gvisor") ctx, cancel := context.WithTimeout(context.Background(), 60*time.Minute) defer func() { diff --git a/test/integration/helpers.go b/test/integration/helpers.go index 1539170f7720..8d145da091bf 100644 --- a/test/integration/helpers.go +++ b/test/integration/helpers.go @@ -30,7 +30,6 @@ import ( "io/ioutil" "os/exec" "strings" - "sync" "testing" "time" @@ -41,13 +40,6 @@ import ( "k8s.io/minikube/pkg/kapi" ) -var ( - // startTimes is a list of startup times, to guarantee --start-offset - startTimes = []time.Time{} - // startTimesMutex is a lock to update startTimes without a race condition - startTimesMutex = &sync.Mutex{} -) - // RunResult stores the result of an cmd.Run call type RunResult struct { Stdout *bytes.Buffer @@ -349,34 +341,6 @@ func MaybeParallel(t *testing.T) { t.Parallel() } -// WaitForStartSlot enforces --start-offset to avoid startup race conditions -func WaitForStartSlot(t *testing.T) { - // Not parallel - if NoneDriver() { - return - } - - wakeup := time.Now() - startTimesMutex.Lock() - if len(startTimes) > 0 { - nextStart := startTimes[len(startTimes)-1].Add(*startOffset) - // Ignore nextStart if it is in the past - to guarantee offset for next caller - if time.Now().Before(nextStart) { - wakeup = nextStart - } - } - startTimes = append(startTimes, wakeup) - startTimesMutex.Unlock() - - if time.Now().Before(wakeup) { - d := time.Until(wakeup) - t.Logf("Waiting for start slot at %s (sleeping %s) ...", wakeup, d) - time.Sleep(d) - } else { - t.Logf("No need to wait for start slot, it is already %s", time.Now()) - } -} - // killProcessFamily kills a pid and all of its children func killProcessFamily(t *testing.T, pid int) { parent, err := process.NewProcess(int32(pid)) diff --git a/test/integration/start_stop_delete_test.go b/test/integration/start_stop_delete_test.go index 5b43864614cc..6ecf70e72c01 100644 --- a/test/integration/start_stop_delete_test.go +++ b/test/integration/start_stop_delete_test.go @@ -78,7 +78,6 @@ func TestStartStop(t *testing.T) { tc := tc t.Run(tc.name, func(t *testing.T) { MaybeParallel(t) - WaitForStartSlot(t) if !strings.Contains(tc.name, "docker") && NoneDriver() { t.Skipf("skipping %s - incompatible with none driver", t.Name()) @@ -140,7 +139,6 @@ func TestStartStop(t *testing.T) { t.Errorf("status = %q; want = %q", got, state.Stopped) } - WaitForStartSlot(t) rr, err = Run(t, exec.CommandContext(ctx, Target(), startArgs...)) if err != nil { // Explicit fatal so that failures don't move directly to deletion diff --git a/test/integration/version_upgrade_test.go b/test/integration/version_upgrade_test.go index cec42dcb9d37..f4caae450a82 100644 --- a/test/integration/version_upgrade_test.go +++ b/test/integration/version_upgrade_test.go @@ -42,7 +42,6 @@ import ( // and it tries to upgrade from the older supported k8s to news supported k8s func TestVersionUpgrade(t *testing.T) { MaybeParallel(t) - WaitForStartSlot(t) profile := UniqueProfileName("vupgrade") ctx, cancel := context.WithTimeout(context.Background(), 55*time.Minute) @@ -92,7 +91,6 @@ func TestVersionUpgrade(t *testing.T) { t.Errorf("status = %q; want = %q", got, state.Stopped.String()) } - WaitForStartSlot(t) args = append([]string{"start", "-p", profile, fmt.Sprintf("--kubernetes-version=%s", constants.NewestKubernetesVersion), "--alsologtostderr", "-v=1"}, StartArgs()...) rr, err = Run(t, exec.CommandContext(ctx, Target(), args...)) if err != nil { From 4091486679e577210d365f0144fe457b77eb7931 Mon Sep 17 00:00:00 2001 From: Thomas Stromberg Date: Thu, 9 Jan 2020 13:41:52 -0800 Subject: [PATCH 2/4] Remove --start-offset flag --- test/integration/main.go | 1 - 1 file changed, 1 deletion(-) diff --git a/test/integration/main.go b/test/integration/main.go index fe351e070d46..bffb12f07966 100644 --- a/test/integration/main.go +++ b/test/integration/main.go @@ -33,7 +33,6 @@ var defaultDriver = flag.String("expected-default-driver", "", "Expected default var forceProfile = flag.String("profile", "", "force tests to run against a particular profile") var cleanup = flag.Bool("cleanup", true, "cleanup failed test run") var enableGvisor = flag.Bool("gvisor", false, "run gvisor integration test (slow)") -var startOffset = flag.Duration("start-offset", 30*time.Second, "how much time to offset between cluster starts") var postMortemLogs = flag.Bool("postmortem-logs", true, "show logs after a failed test run") // Paths to files - normally set for CI From 418082e3687a96acc0b277337ba34cdae94d3215 Mon Sep 17 00:00:00 2001 From: Thomas Stromberg Date: Thu, 9 Jan 2020 14:46:56 -0800 Subject: [PATCH 3/4] Log machine name and how long we held the lock for --- pkg/minikube/cluster/cluster.go | 25 +++++++++++++++---------- 1 file changed, 15 insertions(+), 10 deletions(-) diff --git a/pkg/minikube/cluster/cluster.go b/pkg/minikube/cluster/cluster.go index 6c550068cc76..028566e99570 100644 --- a/pkg/minikube/cluster/cluster.go +++ b/pkg/minikube/cluster/cluster.go @@ -93,11 +93,15 @@ func CacheISO(config cfg.MachineConfig) error { // StartHost starts a host VM. func StartHost(api libmachine.API, config cfg.MachineConfig) (*host.Host, error) { // Prevent machine-driver boot races, as well as our own certificate race - releaser, err := acquireMachinesLock() + releaser, err := acquireMachinesLock(config.Name) if err != nil { return nil, errors.Wrap(err, "boot lock") } - defer releaser.Release() + start := time.Now() + defer func() { + glog.Infof("releasing machines lock for %q, held for %s", config.Name, time.Since(start)) + releaser.Release() + }() exists, err := api.Exists(config.Name) if err != nil { @@ -150,17 +154,18 @@ func StartHost(api libmachine.API, config cfg.MachineConfig) (*host.Host, error) } // acquireMachinesLock protects against code that is not parallel-safe (libmachine, cert setup) -func acquireMachinesLock() (mutex.Releaser, error) { - // NOTE: Provisioning generally completes within 60 seconds +func acquireMachinesLock(name string) (mutex.Releaser, error) { spec := lock.PathMutexSpec(filepath.Join(localpath.MiniPath(), "machines")) - spec.Timeout = 5 * time.Minute + // NOTE: Provisioning generally completes within 60 seconds + spec.Timeout = 10 * time.Minute - glog.Infof("acquiring machines lock: %+v", spec) + glog.Infof("acquiring machines lock for %s: %+v", name, spec) start := time.Now() - defer func() { - glog.Infof("acquired machines lock within %s", time.Since(start)) - }() - return mutex.Acquire(spec) + r, err := mutex.Acquire(spec) + if err == nil { + glog.Infof("acquired machines lock for %q in %s", name, time.Since(start)) + } + return r, err } // configureHost handles any post-powerup configuration required From 0c07e808219403a7241ee5a0fc6a85a897594339 Mon Sep 17 00:00:00 2001 From: tstromberg Date: Fri, 10 Jan 2020 09:55:50 -0800 Subject: [PATCH 4/4] Remove unused import from merge --- pkg/minikube/cluster/cluster.go | 1 - 1 file changed, 1 deletion(-) diff --git a/pkg/minikube/cluster/cluster.go b/pkg/minikube/cluster/cluster.go index c5a98c375b68..97492a33cf53 100644 --- a/pkg/minikube/cluster/cluster.go +++ b/pkg/minikube/cluster/cluster.go @@ -54,7 +54,6 @@ import ( "k8s.io/minikube/pkg/minikube/localpath" "k8s.io/minikube/pkg/minikube/out" "k8s.io/minikube/pkg/minikube/registry" - pkgutil "k8s.io/minikube/pkg/util" "k8s.io/minikube/pkg/util/lock" "k8s.io/minikube/pkg/util/retry" )