Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Replace util.SyncStore implementation to use singleflight and sync.Map. #5016

Merged
merged 10 commits into from
Nov 13, 2020
33 changes: 21 additions & 12 deletions pkg/skaffold/build/cache/hash.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,20 +66,10 @@ func newArtifactHasher(artifacts build.ArtifactGraph, lister DependencyLister, m
}

func (h *artifactHasherImpl) hash(ctx context.Context, a *latest.Artifact) (string, error) {
val := h.syncStore.Exec(a.ImageName,
func() interface{} {
hash, err := singleArtifactHash(ctx, h.lister, a, h.mode)
if err != nil {
return err
}
return hash
})

if err, ok := val.(error); ok {
hash, err := h.safeHash(ctx, a)
if err != nil {
return "", err
}
hash := val.(string)

hashes := []string{hash}
for _, dep := range sortedDependencies(a, h.artifacts) {
depHash, err := h.hash(ctx, dep)
Expand All @@ -95,6 +85,25 @@ func (h *artifactHasherImpl) hash(ctx context.Context, a *latest.Artifact) (stri
return encode(hashes)
}

func (h *artifactHasherImpl) safeHash(ctx context.Context, a *latest.Artifact) (string, error) {
val := h.syncStore.Exec(a.ImageName,
func() interface{} {
hash, err := singleArtifactHash(ctx, h.lister, a, h.mode)
if err != nil {
return err
}
return hash
})
switch t := val.(type) {
case error:
return "", t
case string:
return t, nil
default:
return "", fmt.Errorf("internal error when retrieving cache result of type %T", t)
}
}

// singleArtifactHash calculates the hash for a single artifact, and ignores its required artifacts.
func singleArtifactHash(ctx context.Context, depLister DependencyLister, a *latest.Artifact, mode config.RunMode) (string, error) {
var inputs []string
Expand Down
32 changes: 11 additions & 21 deletions pkg/skaffold/docker/dependencies.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,22 +22,15 @@ import (
"os"
"path/filepath"
"sort"
"sync"

"github.com/docker/docker/builder/dockerignore"
"github.com/golang/groupcache/singleflight"

"github.com/GoogleContainerTools/skaffold/pkg/skaffold/util"
"github.com/GoogleContainerTools/skaffold/pkg/skaffold/walk"
)

var (
// sfGetDependencies ensures `GetDependencies` is called only once at any time
// for a given dockerfile.
// sfGetDependencies along with sync.Map will ensure no two concurrent processes read or
// write dependencies for a given dockerfile.
sfGetDependencies = singleflight.Group{}

dependencyCache = sync.Map{}
dependencyCache = util.NewSyncStore()
)

// NormalizeDockerfilePath returns the absolute path to the dockerfile.
Expand All @@ -62,21 +55,18 @@ func GetDependencies(ctx context.Context, workspace string, dockerfilePath strin
return nil, fmt.Errorf("normalizing dockerfile path: %w", err)
}

deps, _ := sfGetDependencies.Do(absDockerfilePath, func() (interface{}, error) {
if dep, ok := dependencyCache.Load(absDockerfilePath); ok {
return dep, nil
}
dep := getDependencies(workspace, dockerfilePath, absDockerfilePath, buildArgs, cfg)
dependencyCache.Store(absDockerfilePath, dep)
return dep, nil
deps := dependencyCache.Exec(absDockerfilePath, func() interface{} {
return getDependencies(workspace, dockerfilePath, absDockerfilePath, buildArgs, cfg)
})

if paths, ok := deps.([]string); ok {
return paths, nil
} else if err, ok := deps.(error); ok {
return nil, err
switch t := deps.(type) {
case error:
return nil, t
case []string:
return t, nil
default:
return nil, fmt.Errorf("internal error when retrieving cache result of type %T", t)
}
return nil, fmt.Errorf("unexpected skaffold internal error encountered converting dependencies to []string")
}

func getDependencies(workspace string, dockerfilePath string, absDockerfilePath string, buildArgs map[string]*string, cfg Config) interface{} {
Expand Down
9 changes: 5 additions & 4 deletions pkg/skaffold/docker/dependencies_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
"fmt"
"os"
"path/filepath"
"sync"
"testing"

v1 "github.com/google/go-containerregistry/pkg/v1"
Expand Down Expand Up @@ -699,13 +698,15 @@ func TestGetDependenciesCached(t *testing.T) {
testutil.Run(t, test.description, func(t *testutil.T) {
t.Override(&RetrieveImage, test.retrieveImgMock)
t.Override(&util.OSEnviron, func() []string { return []string{} })
t.Override(&dependencyCache, util.NewSyncStore())

tmpDir := t.NewTempDir().Touch("server.go", "random.go")
tmpDir.Write("Dockerfile", copyServerGo)
// construct cache for abs dockerfile paths.
defer func() { dependencyCache = sync.Map{} }()

for k, v := range test.dependencyCache {
dependencyCache.Store(tmpDir.Path(k), v)
dependencyCache.Exec(tmpDir.Path(k), func() interface{} {
return v
})
}
deps, err := GetDependencies(context.Background(), tmpDir.Root(), "Dockerfile", map[string]*string{}, nil)
t.CheckErrorAndDeepEqual(test.shouldErr, err, test.expected, deps)
Expand Down
53 changes: 41 additions & 12 deletions pkg/skaffold/util/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,33 +17,62 @@ limitations under the License.
package util

import (
"fmt"
"sync"

"github.com/golang/groupcache/singleflight"
)

// SyncStore exports a single method `Exec` to ensure single execution of a function and share the result between all callers of the function.
// SyncStore exports a single method `Exec` to ensure single execution of a function
// and share the result between all callers of the function.
type SyncStore struct {
oncePerKey *sync.Map
results *sync.Map
sf singleflight.Group
tejal29 marked this conversation as resolved.
Show resolved Hide resolved
results sync.Map
}

// Exec executes the function f if and only if it's being called the first time for a specific key.
// If it's called multiple times for the same key only the first call will execute and store the result of f.
// All other calls will be blocked until the running instance of f returns and all of them receive the same result.
func (o *SyncStore) Exec(key interface{}, f func() interface{}) interface{} {
once, _ := o.oncePerKey.LoadOrStore(key, new(sync.Once))
once.(*sync.Once).Do(func() {
res := f()
o.results.Store(key, res)
func (o *SyncStore) Exec(key string, f func() interface{}) interface{} {
val, err := o.sf.Do(key, func() (_ interface{}, err error) {
// trap any runtime error due to synchronization issues.
defer func() {
if rErr := recover(); rErr != nil {
err = retrieveError(key, rErr)
}
}()
v, ok := o.results.Load(key)
if !ok {
v = f()
o.results.Store(key, v)
}
return v, nil
})

val, _ := o.results.Load(key)
if err != nil {
return err
}
return val
}

// NewSyncStore returns a new instance of `SyncStore`
func NewSyncStore() *SyncStore {
return &SyncStore{
oncePerKey: new(sync.Map),
results: new(sync.Map),
sf: singleflight.Group{},
results: sync.Map{},
}
}

// StoreError represent any error that when retrieving errors from the store.
type StoreError struct {
message string
}

func (e StoreError) Error() string {
return e.message
}

func retrieveError(key string, i interface{}) StoreError {
return StoreError{
message: fmt.Sprintf("internal error retrieving cached results for key %s: %v", key, i),
}
}
21 changes: 19 additions & 2 deletions pkg/skaffold/util/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ limitations under the License.
package util

import (
"fmt"
"strconv"
"sync"
"sync/atomic"
"testing"
Expand All @@ -25,7 +27,7 @@ import (
)

func TestSyncStore(t *testing.T) {
testutil.Run(t, "test util.once", func(t *testutil.T) {
testutil.Run(t, "test store", func(t *testutil.T) {
// This test runs a counter function twice for each key from [0, 5) and tests that the function only executes once for each key when called inside `once.Do` method.
counts := make([]int32, 5)
f := func(i int) int {
Expand All @@ -39,7 +41,8 @@ func TestSyncStore(t *testing.T) {
for i := 0; i < 5; i++ {
for j := 0; j < 2; j++ {
go func(i int) {
val := s.Exec(i, func() interface{} {
k := strconv.Itoa(i)
val := s.Exec(k, func() interface{} {
return f(i)
})
t.CheckDeepEqual(i, val)
Expand All @@ -54,4 +57,18 @@ func TestSyncStore(t *testing.T) {
}
}
})

testutil.Run(t, "test panic handled correctly", func(t *testutil.T) {
s := NewSyncStore()
val := s.Exec("panic", func() interface{} {
panic(fmt.Errorf("message"))
})
// make sure val is of type StoreError
switch tv := val.(type) {
case StoreError:
t.CheckDeepEqual("internal error retrieving cached results for key panic: message", tv.Error())
default:
t.Fatalf("expected to retrieve result of type StoreError but found %T", tv)
}
})
}