diff --git a/pkg/skaffold/build/cache/hash.go b/pkg/skaffold/build/cache/hash.go index 375c8ea94b1..0956d7a1ccc 100644 --- a/pkg/skaffold/build/cache/hash.go +++ b/pkg/skaffold/build/cache/hash.go @@ -66,20 +66,10 @@ func newArtifactHasher(artifacts build.ArtifactGraph, lister DependencyLister, m } func (h *artifactHasherImpl) hash(ctx context.Context, a *latest.Artifact) (string, error) { - val := h.syncStore.Exec(a.ImageName, - func() interface{} { - hash, err := singleArtifactHash(ctx, h.lister, a, h.mode) - if err != nil { - return err - } - return hash - }) - - if err, ok := val.(error); ok { + hash, err := h.safeHash(ctx, a) + if err != nil { return "", err } - hash := val.(string) - hashes := []string{hash} for _, dep := range sortedDependencies(a, h.artifacts) { depHash, err := h.hash(ctx, dep) @@ -95,6 +85,25 @@ func (h *artifactHasherImpl) hash(ctx context.Context, a *latest.Artifact) (stri return encode(hashes) } +func (h *artifactHasherImpl) safeHash(ctx context.Context, a *latest.Artifact) (string, error) { + val := h.syncStore.Exec(a.ImageName, + func() interface{} { + hash, err := singleArtifactHash(ctx, h.lister, a, h.mode) + if err != nil { + return err + } + return hash + }) + switch t := val.(type) { + case error: + return "", t + case string: + return t, nil + default: + return "", fmt.Errorf("internal error when retrieving cache result of type %T", t) + } +} + // singleArtifactHash calculates the hash for a single artifact, and ignores its required artifacts. func singleArtifactHash(ctx context.Context, depLister DependencyLister, a *latest.Artifact, mode config.RunMode) (string, error) { var inputs []string diff --git a/pkg/skaffold/docker/dependencies.go b/pkg/skaffold/docker/dependencies.go index 0ce36a28226..d6f7d322d9b 100644 --- a/pkg/skaffold/docker/dependencies.go +++ b/pkg/skaffold/docker/dependencies.go @@ -22,22 +22,15 @@ import ( "os" "path/filepath" "sort" - "sync" "github.com/docker/docker/builder/dockerignore" - "github.com/golang/groupcache/singleflight" + "github.com/GoogleContainerTools/skaffold/pkg/skaffold/util" "github.com/GoogleContainerTools/skaffold/pkg/skaffold/walk" ) var ( - // sfGetDependencies ensures `GetDependencies` is called only once at any time - // for a given dockerfile. - // sfGetDependencies along with sync.Map will ensure no two concurrent processes read or - // write dependencies for a given dockerfile. - sfGetDependencies = singleflight.Group{} - - dependencyCache = sync.Map{} + dependencyCache = util.NewSyncStore() ) // NormalizeDockerfilePath returns the absolute path to the dockerfile. @@ -62,21 +55,18 @@ func GetDependencies(ctx context.Context, workspace string, dockerfilePath strin return nil, fmt.Errorf("normalizing dockerfile path: %w", err) } - deps, _ := sfGetDependencies.Do(absDockerfilePath, func() (interface{}, error) { - if dep, ok := dependencyCache.Load(absDockerfilePath); ok { - return dep, nil - } - dep := getDependencies(workspace, dockerfilePath, absDockerfilePath, buildArgs, cfg) - dependencyCache.Store(absDockerfilePath, dep) - return dep, nil + deps := dependencyCache.Exec(absDockerfilePath, func() interface{} { + return getDependencies(workspace, dockerfilePath, absDockerfilePath, buildArgs, cfg) }) - if paths, ok := deps.([]string); ok { - return paths, nil - } else if err, ok := deps.(error); ok { - return nil, err + switch t := deps.(type) { + case error: + return nil, t + case []string: + return t, nil + default: + return nil, fmt.Errorf("internal error when retrieving cache result of type %T", t) } - return nil, fmt.Errorf("unexpected skaffold internal error encountered converting dependencies to []string") } func getDependencies(workspace string, dockerfilePath string, absDockerfilePath string, buildArgs map[string]*string, cfg Config) interface{} { diff --git a/pkg/skaffold/docker/dependencies_test.go b/pkg/skaffold/docker/dependencies_test.go index 7e7ba1aeb05..30769a8897b 100644 --- a/pkg/skaffold/docker/dependencies_test.go +++ b/pkg/skaffold/docker/dependencies_test.go @@ -21,7 +21,6 @@ import ( "fmt" "os" "path/filepath" - "sync" "testing" v1 "github.com/google/go-containerregistry/pkg/v1" @@ -699,13 +698,15 @@ func TestGetDependenciesCached(t *testing.T) { testutil.Run(t, test.description, func(t *testutil.T) { t.Override(&RetrieveImage, test.retrieveImgMock) t.Override(&util.OSEnviron, func() []string { return []string{} }) + t.Override(&dependencyCache, util.NewSyncStore()) tmpDir := t.NewTempDir().Touch("server.go", "random.go") tmpDir.Write("Dockerfile", copyServerGo) - // construct cache for abs dockerfile paths. - defer func() { dependencyCache = sync.Map{} }() + for k, v := range test.dependencyCache { - dependencyCache.Store(tmpDir.Path(k), v) + dependencyCache.Exec(tmpDir.Path(k), func() interface{} { + return v + }) } deps, err := GetDependencies(context.Background(), tmpDir.Root(), "Dockerfile", map[string]*string{}, nil) t.CheckErrorAndDeepEqual(test.shouldErr, err, test.expected, deps) diff --git a/pkg/skaffold/util/store.go b/pkg/skaffold/util/store.go index 2f4227d1596..f75275c192c 100644 --- a/pkg/skaffold/util/store.go +++ b/pkg/skaffold/util/store.go @@ -17,33 +17,62 @@ limitations under the License. package util import ( + "fmt" "sync" + + "github.com/golang/groupcache/singleflight" ) -// SyncStore exports a single method `Exec` to ensure single execution of a function and share the result between all callers of the function. +// SyncStore exports a single method `Exec` to ensure single execution of a function +// and share the result between all callers of the function. type SyncStore struct { - oncePerKey *sync.Map - results *sync.Map + sf singleflight.Group + results sync.Map } // Exec executes the function f if and only if it's being called the first time for a specific key. // If it's called multiple times for the same key only the first call will execute and store the result of f. // All other calls will be blocked until the running instance of f returns and all of them receive the same result. -func (o *SyncStore) Exec(key interface{}, f func() interface{}) interface{} { - once, _ := o.oncePerKey.LoadOrStore(key, new(sync.Once)) - once.(*sync.Once).Do(func() { - res := f() - o.results.Store(key, res) +func (o *SyncStore) Exec(key string, f func() interface{}) interface{} { + val, err := o.sf.Do(key, func() (_ interface{}, err error) { + // trap any runtime error due to synchronization issues. + defer func() { + if rErr := recover(); rErr != nil { + err = retrieveError(key, rErr) + } + }() + v, ok := o.results.Load(key) + if !ok { + v = f() + o.results.Store(key, v) + } + return v, nil }) - - val, _ := o.results.Load(key) + if err != nil { + return err + } return val } // NewSyncStore returns a new instance of `SyncStore` func NewSyncStore() *SyncStore { return &SyncStore{ - oncePerKey: new(sync.Map), - results: new(sync.Map), + sf: singleflight.Group{}, + results: sync.Map{}, + } +} + +// StoreError represent any error that when retrieving errors from the store. +type StoreError struct { + message string +} + +func (e StoreError) Error() string { + return e.message +} + +func retrieveError(key string, i interface{}) StoreError { + return StoreError{ + message: fmt.Sprintf("internal error retrieving cached results for key %s: %v", key, i), } } diff --git a/pkg/skaffold/util/store_test.go b/pkg/skaffold/util/store_test.go index c5254a88ff4..455c3b6d217 100644 --- a/pkg/skaffold/util/store_test.go +++ b/pkg/skaffold/util/store_test.go @@ -17,6 +17,8 @@ limitations under the License. package util import ( + "fmt" + "strconv" "sync" "sync/atomic" "testing" @@ -25,7 +27,7 @@ import ( ) func TestSyncStore(t *testing.T) { - testutil.Run(t, "test util.once", func(t *testutil.T) { + testutil.Run(t, "test store", func(t *testutil.T) { // This test runs a counter function twice for each key from [0, 5) and tests that the function only executes once for each key when called inside `once.Do` method. counts := make([]int32, 5) f := func(i int) int { @@ -39,7 +41,8 @@ func TestSyncStore(t *testing.T) { for i := 0; i < 5; i++ { for j := 0; j < 2; j++ { go func(i int) { - val := s.Exec(i, func() interface{} { + k := strconv.Itoa(i) + val := s.Exec(k, func() interface{} { return f(i) }) t.CheckDeepEqual(i, val) @@ -54,4 +57,18 @@ func TestSyncStore(t *testing.T) { } } }) + + testutil.Run(t, "test panic handled correctly", func(t *testutil.T) { + s := NewSyncStore() + val := s.Exec("panic", func() interface{} { + panic(fmt.Errorf("message")) + }) + // make sure val is of type StoreError + switch tv := val.(type) { + case StoreError: + t.CheckDeepEqual("internal error retrieving cached results for key panic: message", tv.Error()) + default: + t.Fatalf("expected to retrieve result of type StoreError but found %T", tv) + } + }) }