Skip to content

Commit

Permalink
Replace util.SyncStore implementation to use singleflight and sync.Ma…
Browse files Browse the repository at this point in the history
…p. (#5016)

* warn and continue when ONBUILD image could not be retrieved

* wip

* Cache results of getDependencies

* fix race condition

* use go singleflight to ensure getDependency is called once and only once for every artifact

* use singleflight and sync.Map

* use singleflight and sync.Map

* fix tests

* use names return type

* remove prefix
  • Loading branch information
tejal29 authored Nov 13, 2020
1 parent be1c5e0 commit e2a4106
Show file tree
Hide file tree
Showing 5 changed files with 97 additions and 51 deletions.
33 changes: 21 additions & 12 deletions pkg/skaffold/build/cache/hash.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,20 +66,10 @@ func newArtifactHasher(artifacts build.ArtifactGraph, lister DependencyLister, m
}

func (h *artifactHasherImpl) hash(ctx context.Context, a *latest.Artifact) (string, error) {
val := h.syncStore.Exec(a.ImageName,
func() interface{} {
hash, err := singleArtifactHash(ctx, h.lister, a, h.mode)
if err != nil {
return err
}
return hash
})

if err, ok := val.(error); ok {
hash, err := h.safeHash(ctx, a)
if err != nil {
return "", err
}
hash := val.(string)

hashes := []string{hash}
for _, dep := range sortedDependencies(a, h.artifacts) {
depHash, err := h.hash(ctx, dep)
Expand All @@ -95,6 +85,25 @@ func (h *artifactHasherImpl) hash(ctx context.Context, a *latest.Artifact) (stri
return encode(hashes)
}

func (h *artifactHasherImpl) safeHash(ctx context.Context, a *latest.Artifact) (string, error) {
val := h.syncStore.Exec(a.ImageName,
func() interface{} {
hash, err := singleArtifactHash(ctx, h.lister, a, h.mode)
if err != nil {
return err
}
return hash
})
switch t := val.(type) {
case error:
return "", t
case string:
return t, nil
default:
return "", fmt.Errorf("internal error when retrieving cache result of type %T", t)
}
}

// singleArtifactHash calculates the hash for a single artifact, and ignores its required artifacts.
func singleArtifactHash(ctx context.Context, depLister DependencyLister, a *latest.Artifact, mode config.RunMode) (string, error) {
var inputs []string
Expand Down
32 changes: 11 additions & 21 deletions pkg/skaffold/docker/dependencies.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,22 +22,15 @@ import (
"os"
"path/filepath"
"sort"
"sync"

"github.com/docker/docker/builder/dockerignore"
"github.com/golang/groupcache/singleflight"

"github.com/GoogleContainerTools/skaffold/pkg/skaffold/util"
"github.com/GoogleContainerTools/skaffold/pkg/skaffold/walk"
)

var (
// sfGetDependencies ensures `GetDependencies` is called only once at any time
// for a given dockerfile.
// sfGetDependencies along with sync.Map will ensure no two concurrent processes read or
// write dependencies for a given dockerfile.
sfGetDependencies = singleflight.Group{}

dependencyCache = sync.Map{}
dependencyCache = util.NewSyncStore()
)

// NormalizeDockerfilePath returns the absolute path to the dockerfile.
Expand All @@ -62,21 +55,18 @@ func GetDependencies(ctx context.Context, workspace string, dockerfilePath strin
return nil, fmt.Errorf("normalizing dockerfile path: %w", err)
}

deps, _ := sfGetDependencies.Do(absDockerfilePath, func() (interface{}, error) {
if dep, ok := dependencyCache.Load(absDockerfilePath); ok {
return dep, nil
}
dep := getDependencies(workspace, dockerfilePath, absDockerfilePath, buildArgs, cfg)
dependencyCache.Store(absDockerfilePath, dep)
return dep, nil
deps := dependencyCache.Exec(absDockerfilePath, func() interface{} {
return getDependencies(workspace, dockerfilePath, absDockerfilePath, buildArgs, cfg)
})

if paths, ok := deps.([]string); ok {
return paths, nil
} else if err, ok := deps.(error); ok {
return nil, err
switch t := deps.(type) {
case error:
return nil, t
case []string:
return t, nil
default:
return nil, fmt.Errorf("internal error when retrieving cache result of type %T", t)
}
return nil, fmt.Errorf("unexpected skaffold internal error encountered converting dependencies to []string")
}

func getDependencies(workspace string, dockerfilePath string, absDockerfilePath string, buildArgs map[string]*string, cfg Config) interface{} {
Expand Down
9 changes: 5 additions & 4 deletions pkg/skaffold/docker/dependencies_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
"fmt"
"os"
"path/filepath"
"sync"
"testing"

v1 "github.com/google/go-containerregistry/pkg/v1"
Expand Down Expand Up @@ -699,13 +698,15 @@ func TestGetDependenciesCached(t *testing.T) {
testutil.Run(t, test.description, func(t *testutil.T) {
t.Override(&RetrieveImage, test.retrieveImgMock)
t.Override(&util.OSEnviron, func() []string { return []string{} })
t.Override(&dependencyCache, util.NewSyncStore())

tmpDir := t.NewTempDir().Touch("server.go", "random.go")
tmpDir.Write("Dockerfile", copyServerGo)
// construct cache for abs dockerfile paths.
defer func() { dependencyCache = sync.Map{} }()

for k, v := range test.dependencyCache {
dependencyCache.Store(tmpDir.Path(k), v)
dependencyCache.Exec(tmpDir.Path(k), func() interface{} {
return v
})
}
deps, err := GetDependencies(context.Background(), tmpDir.Root(), "Dockerfile", map[string]*string{}, nil)
t.CheckErrorAndDeepEqual(test.shouldErr, err, test.expected, deps)
Expand Down
53 changes: 41 additions & 12 deletions pkg/skaffold/util/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,33 +17,62 @@ limitations under the License.
package util

import (
"fmt"
"sync"

"github.com/golang/groupcache/singleflight"
)

// SyncStore exports a single method `Exec` to ensure single execution of a function and share the result between all callers of the function.
// SyncStore exports a single method `Exec` to ensure single execution of a function
// and share the result between all callers of the function.
type SyncStore struct {
oncePerKey *sync.Map
results *sync.Map
sf singleflight.Group
results sync.Map
}

// Exec executes the function f if and only if it's being called the first time for a specific key.
// If it's called multiple times for the same key only the first call will execute and store the result of f.
// All other calls will be blocked until the running instance of f returns and all of them receive the same result.
func (o *SyncStore) Exec(key interface{}, f func() interface{}) interface{} {
once, _ := o.oncePerKey.LoadOrStore(key, new(sync.Once))
once.(*sync.Once).Do(func() {
res := f()
o.results.Store(key, res)
func (o *SyncStore) Exec(key string, f func() interface{}) interface{} {
val, err := o.sf.Do(key, func() (_ interface{}, err error) {
// trap any runtime error due to synchronization issues.
defer func() {
if rErr := recover(); rErr != nil {
err = retrieveError(key, rErr)
}
}()
v, ok := o.results.Load(key)
if !ok {
v = f()
o.results.Store(key, v)
}
return v, nil
})

val, _ := o.results.Load(key)
if err != nil {
return err
}
return val
}

// NewSyncStore returns a new instance of `SyncStore`
func NewSyncStore() *SyncStore {
return &SyncStore{
oncePerKey: new(sync.Map),
results: new(sync.Map),
sf: singleflight.Group{},
results: sync.Map{},
}
}

// StoreError represent any error that when retrieving errors from the store.
type StoreError struct {
message string
}

func (e StoreError) Error() string {
return e.message
}

func retrieveError(key string, i interface{}) StoreError {
return StoreError{
message: fmt.Sprintf("internal error retrieving cached results for key %s: %v", key, i),
}
}
21 changes: 19 additions & 2 deletions pkg/skaffold/util/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ limitations under the License.
package util

import (
"fmt"
"strconv"
"sync"
"sync/atomic"
"testing"
Expand All @@ -25,7 +27,7 @@ import (
)

func TestSyncStore(t *testing.T) {
testutil.Run(t, "test util.once", func(t *testutil.T) {
testutil.Run(t, "test store", func(t *testutil.T) {
// This test runs a counter function twice for each key from [0, 5) and tests that the function only executes once for each key when called inside `once.Do` method.
counts := make([]int32, 5)
f := func(i int) int {
Expand All @@ -39,7 +41,8 @@ func TestSyncStore(t *testing.T) {
for i := 0; i < 5; i++ {
for j := 0; j < 2; j++ {
go func(i int) {
val := s.Exec(i, func() interface{} {
k := strconv.Itoa(i)
val := s.Exec(k, func() interface{} {
return f(i)
})
t.CheckDeepEqual(i, val)
Expand All @@ -54,4 +57,18 @@ func TestSyncStore(t *testing.T) {
}
}
})

testutil.Run(t, "test panic handled correctly", func(t *testutil.T) {
s := NewSyncStore()
val := s.Exec("panic", func() interface{} {
panic(fmt.Errorf("message"))
})
// make sure val is of type StoreError
switch tv := val.(type) {
case StoreError:
t.CheckDeepEqual("internal error retrieving cached results for key panic: message", tv.Error())
default:
t.Fatalf("expected to retrieve result of type StoreError but found %T", tv)
}
})
}

0 comments on commit e2a4106

Please sign in to comment.