From e16a7e3778bc0b5a756f308bd5e93a92bbffd129 Mon Sep 17 00:00:00 2001 From: "Philip K. Warren" Date: Mon, 27 Feb 2023 16:15:55 -0600 Subject: [PATCH] Update cache to use tamper proofing (#1847) Update the buf CLI's cache to take advantage of tamper proofing features (manifest, blobs, and digests) to store files as content addressable storage. When enabled, the new cache directory is stored as: ``` ~/.cache/buf/v2/module/{remote}/{owner}/{repo} blobs/ {digest[:2]}/ {digest[2:]} => The contents of the module's blob/manifest. commits/ {commit} => The manifest digest ``` --- .golangci.yml | 6 + private/buf/bufcli/bufcli.go | 120 +++++--- private/bufpkg/bufapimodule/module_reader.go | 12 +- private/bufpkg/bufmanifest/bucket.go | 22 +- private/bufpkg/bufmanifest/mapper.go | 31 ++ private/bufpkg/bufmodule/bufmodule.go | 30 +- .../bufmodulecache/bufmodulecache.go | 18 ++ .../bufmodulecache/bufmodulecache_test.go | 24 +- .../bufmodule/bufmodulecache/cache_stats.go | 48 +++ .../bufmodulecache/cas_module_cacher.go | 275 ++++++++++++++++++ .../bufmodulecache/cas_module_reader.go | 109 +++++++ .../bufmodulecache/cas_module_reader_test.go | 248 ++++++++++++++++ .../bufmodule/bufmodulecache/module_cacher.go | 12 + .../bufmodule/bufmodulecache/module_reader.go | 54 +--- .../bufpkg/bufmodule/bufmodulecache/util.go | 35 +++ private/bufpkg/bufmodule/module.go | 35 +++ private/pkg/manifest/manifest.go | 23 +- private/pkg/manifest/manifest_test.go | 35 ++- private/pkg/manifest/module.go | 7 + private/pkg/manifest/storage.go | 4 +- private/pkg/manifest/storage_test.go | 14 +- private/pkg/storage/bucket.go | 15 + private/pkg/storage/storagemem/bucket.go | 1 + private/pkg/storage/storageos/bucket.go | 42 ++- 24 files changed, 1062 insertions(+), 158 deletions(-) create mode 100644 private/bufpkg/bufmodule/bufmodulecache/cache_stats.go create mode 100644 private/bufpkg/bufmodule/bufmodulecache/cas_module_cacher.go create mode 100644 private/bufpkg/bufmodule/bufmodulecache/cas_module_reader.go create mode 100644 private/bufpkg/bufmodule/bufmodulecache/cas_module_reader_test.go diff --git a/.golangci.yml b/.golangci.yml index ec9ad62820..0e35999d1a 100644 --- a/.golangci.yml +++ b/.golangci.yml @@ -82,6 +82,12 @@ issues: - forbidigo # this is the one file we want to allow exec.Cmd functions in path: private/pkg/command/runner.go + - linters: + - forbidigo + # we use os.Rename here to rename files in the same directory + # This is safe (we aren't traversing filesystem boundaries). + path: private/pkg/storage/storageos/bucket.go + text: "os.Rename" - linters: - stylecheck text: "ST1005:" diff --git a/private/buf/bufcli/bufcli.go b/private/buf/bufcli/bufcli.go index a6659ded08..9b931337db 100644 --- a/private/buf/bufcli/bufcli.go +++ b/private/buf/bufcli/bufcli.go @@ -121,6 +121,7 @@ var ( v1CacheModuleDataRelDirPath, v1CacheModuleLockRelDirPath, v1CacheModuleSumRelDirPath, + v2CacheModuleRelDirPath, } // ErrNotATTY is returned when an input io.Reader is not a TTY where it is expected. @@ -154,6 +155,12 @@ var ( // These digests are used to make sure that the data written is actually what we expect, and if it is not, // we clear an entry from the cache, i.e. delete the relevant data directory. v1CacheModuleSumRelDirPath = normalpath.Join("v1", "module", "sum") + // v2CacheModuleRelDirPath is the relative path to the cache directory for content addressable storage. + // + // Normalized. 
+ // This directory replaces the use of v1CacheModuleDataRelDirPath, v1CacheModuleLockRelDirPath, and + // v1CacheModuleSumRelDirPath for modules which support tamper proofing. + v2CacheModuleRelDirPath = normalpath.Join("v2", "module") // allVisibiltyStrings are the possible options that a user can set the visibility flag with. allVisibiltyStrings = []string{ @@ -546,62 +553,81 @@ func newModuleReaderAndCreateCacheDirs( clientConfig *connectclient.Config, cacheModuleReaderOpts ...bufmodulecache.ModuleReaderOption, ) (bufmodule.ModuleReader, error) { - cacheModuleDataDirPath := normalpath.Join(container.CacheDirPath(), v1CacheModuleDataRelDirPath) - cacheModuleLockDirPath := normalpath.Join(container.CacheDirPath(), v1CacheModuleLockRelDirPath) - cacheModuleSumDirPath := normalpath.Join(container.CacheDirPath(), v1CacheModuleSumRelDirPath) - if err := checkExistingCacheDirs( - container.CacheDirPath(), - container.CacheDirPath(), - cacheModuleDataDirPath, - cacheModuleLockDirPath, - cacheModuleSumDirPath, - ); err != nil { - return nil, err - } - if err := createCacheDirs( - cacheModuleDataDirPath, - cacheModuleLockDirPath, - cacheModuleSumDirPath, - ); err != nil { - return nil, err - } - storageosProvider := storageos.NewProvider(storageos.ProviderWithSymlinks()) - // do NOT want to enable symlinks for our cache - dataReadWriteBucket, err := storageosProvider.NewReadWriteBucket(cacheModuleDataDirPath) + cacheModuleDataDirPathV1 := normalpath.Join(container.CacheDirPath(), v1CacheModuleDataRelDirPath) + cacheModuleLockDirPathV1 := normalpath.Join(container.CacheDirPath(), v1CacheModuleLockRelDirPath) + cacheModuleSumDirPathV1 := normalpath.Join(container.CacheDirPath(), v1CacheModuleSumRelDirPath) + cacheModuleDirPathV2 := normalpath.Join(container.CacheDirPath(), v2CacheModuleRelDirPath) + // Check if tamper proofing env var is enabled + tamperProofingEnabled, err := IsBetaTamperProofingEnabled(container) if err != nil { return nil, err } - // do NOT want to enable symlinks for our cache - sumReadWriteBucket, err := storageosProvider.NewReadWriteBucket(cacheModuleSumDirPath) - if err != nil { - return nil, err + var cacheDirsToCreate []string + if tamperProofingEnabled { + cacheDirsToCreate = append(cacheDirsToCreate, cacheModuleDirPathV2) + } else { + cacheDirsToCreate = append( + cacheDirsToCreate, + cacheModuleDataDirPathV1, + cacheModuleLockDirPathV1, + cacheModuleSumDirPathV1, + ) } - fileLocker, err := filelock.NewLocker(cacheModuleLockDirPath) - if err != nil { + if err := checkExistingCacheDirs(container.CacheDirPath(), cacheDirsToCreate...); err != nil { return nil, err } - var moduleReaderOpts []bufapimodule.ModuleReaderOption - // Check if tamper proofing env var is enabled - tamperProofingEnabled, err := IsBetaTamperProofingEnabled(container) - if err != nil { + if err := createCacheDirs(cacheDirsToCreate...); err != nil { return nil, err } + var moduleReaderOpts []bufapimodule.ModuleReaderOption if tamperProofingEnabled { moduleReaderOpts = append(moduleReaderOpts, bufapimodule.WithTamperProofing()) } - moduleReader := bufmodulecache.NewModuleReader( - container.Logger(), - container.VerbosePrinter(), - fileLocker, - dataReadWriteBucket, - sumReadWriteBucket, - bufapimodule.NewModuleReader( - bufapimodule.NewDownloadServiceClientFactory(clientConfig), - moduleReaderOpts..., - ), - bufmodulecache.NewRepositoryServiceClientFactory(clientConfig), - cacheModuleReaderOpts..., + delegateReader := bufapimodule.NewModuleReader( + 
bufapimodule.NewDownloadServiceClientFactory(clientConfig), + moduleReaderOpts..., ) + repositoryClientFactory := bufmodulecache.NewRepositoryServiceClientFactory(clientConfig) + storageosProvider := storageos.NewProvider(storageos.ProviderWithSymlinks()) + var moduleReader bufmodule.ModuleReader + if tamperProofingEnabled { + casModuleBucket, err := storageosProvider.NewReadWriteBucket(cacheModuleDirPathV2) + if err != nil { + return nil, err + } + moduleReader = bufmodulecache.NewCASModuleReader( + container.Logger(), + container.VerbosePrinter(), + casModuleBucket, + delegateReader, + repositoryClientFactory, + ) + } else { + // do NOT want to enable symlinks for our cache + dataReadWriteBucket, err := storageosProvider.NewReadWriteBucket(cacheModuleDataDirPathV1) + if err != nil { + return nil, err + } + // do NOT want to enable symlinks for our cache + sumReadWriteBucket, err := storageosProvider.NewReadWriteBucket(cacheModuleSumDirPathV1) + if err != nil { + return nil, err + } + fileLocker, err := filelock.NewLocker(cacheModuleLockDirPathV1) + if err != nil { + return nil, err + } + moduleReader = bufmodulecache.NewModuleReader( + container.Logger(), + container.VerbosePrinter(), + fileLocker, + dataReadWriteBucket, + sumReadWriteBucket, + delegateReader, + repositoryClientFactory, + cacheModuleReaderOpts..., + ) + } return moduleReader, nil } @@ -1040,7 +1066,11 @@ func newFetchImageReader( } func checkExistingCacheDirs(baseCacheDirPath string, dirPaths ...string) error { - for _, dirPath := range dirPaths { + dirPathsToCheck := make([]string, 0, len(dirPaths)+1) + // Check base cache directory in addition to subdirectories + dirPathsToCheck = append(dirPathsToCheck, baseCacheDirPath) + dirPathsToCheck = append(dirPathsToCheck, dirPaths...) + for _, dirPath := range dirPathsToCheck { dirPath = normalpath.Unnormalize(dirPath) // OK to use os.Stat instead of os.LStat here as this is CLI-only fileInfo, err := os.Stat(dirPath) diff --git a/private/bufpkg/bufapimodule/module_reader.go b/private/bufpkg/bufapimodule/module_reader.go index bc0cf6fc1a..b9b0dac42f 100644 --- a/private/bufpkg/bufapimodule/module_reader.go +++ b/private/bufpkg/bufapimodule/module_reader.go @@ -67,15 +67,15 @@ func (m *moduleReader) GetModule(ctx context.Context, modulePin bufmoduleref.Mod return nil, errors.New("expected non-nil manifest with tamper proofing enabled") } // use manifest and blobs - bucket, err := bufmanifest.NewBucketFromManifestBlobs( - ctx, - resp.Manifest, - resp.Blobs, - ) + moduleManifest, err := bufmanifest.NewManifestFromProto(ctx, resp.Manifest) if err != nil { return nil, err } - return bufmodule.NewModuleForBucket(ctx, bucket, identityAndCommitOpt) + blobSet, err := bufmanifest.NewBlobSetFromProto(ctx, resp.Blobs) + if err != nil { + return nil, err + } + return bufmodule.NewModuleForManifestAndBlobSet(ctx, moduleManifest, blobSet) } resp, err := m.download(ctx, modulePin) if err != nil { diff --git a/private/bufpkg/bufmanifest/bucket.go b/private/bufpkg/bufmanifest/bucket.go index 3fa75dd545..3bdef97518 100644 --- a/private/bufpkg/bufmanifest/bucket.go +++ b/private/bufpkg/bufmanifest/bucket.go @@ -15,7 +15,6 @@ package bufmanifest import ( - "bytes" "context" "fmt" @@ -34,26 +33,13 @@ func NewBucketFromManifestBlobs( manifestBlob *modulev1alpha1.Blob, blobs []*modulev1alpha1.Blob, ) (storage.ReadBucket, error) { - if _, err := NewBlobFromProto(manifestBlob); err != nil { - return nil, fmt.Errorf("invalid manifest: %w", err) - } - parsedManifest, err := manifest.NewFromReader( - 
bytes.NewReader(manifestBlob.Content), - ) + parsedManifest, err := NewManifestFromProto(ctx, manifestBlob) if err != nil { - return nil, fmt.Errorf("parse manifest content: %w", err) - } - var memBlobs []manifest.Blob - for i, modBlob := range blobs { - memBlob, err := NewBlobFromProto(modBlob) - if err != nil { - return nil, fmt.Errorf("invalid blob at index %d: %w", i, err) - } - memBlobs = append(memBlobs, memBlob) + return nil, err } - blobSet, err := manifest.NewBlobSet(ctx, memBlobs) + blobSet, err := NewBlobSetFromProto(ctx, blobs) if err != nil { - return nil, fmt.Errorf("invalid blobs: %w", err) + return nil, err } manifestBucket, err := manifest.NewBucket( *parsedManifest, diff --git a/private/bufpkg/bufmanifest/mapper.go b/private/bufpkg/bufmanifest/mapper.go index a8325231fd..f8d3685ec2 100644 --- a/private/bufpkg/bufmanifest/mapper.go +++ b/private/bufpkg/bufmanifest/mapper.go @@ -71,6 +71,37 @@ func AsProtoBlob(ctx context.Context, b manifest.Blob) (_ *modulev1alpha1.Blob, }, nil } +// NewManifestFromProto returns a Manifest from a proto module blob. It makes sure the +// digest and content matches. +func NewManifestFromProto(ctx context.Context, b *modulev1alpha1.Blob) (_ *manifest.Manifest, retErr error) { + blob, err := NewBlobFromProto(b) + if err != nil { + return nil, fmt.Errorf("invalid manifest: %w", err) + } + r, err := blob.Open(ctx) + if err != nil { + return nil, err + } + defer func() { + retErr = multierr.Append(retErr, r.Close()) + }() + return manifest.NewFromReader(r) +} + +// NewBlobSetFromProto returns a BlobSet from a slice of proto module blobs. +// It makes sure the digest and content matches for each blob. +func NewBlobSetFromProto(ctx context.Context, blobs []*modulev1alpha1.Blob) (*manifest.BlobSet, error) { + var memBlobs []manifest.Blob + for i, modBlob := range blobs { + memBlob, err := NewBlobFromProto(modBlob) + if err != nil { + return nil, fmt.Errorf("invalid blob at index %d: %w", i, err) + } + memBlobs = append(memBlobs, memBlob) + } + return manifest.NewBlobSet(ctx, memBlobs) +} + // NewBlobFromProto returns a Blob from a proto module blob. It makes sure the // digest and content matches. func NewBlobFromProto(b *modulev1alpha1.Blob) (manifest.Blob, error) { diff --git a/private/bufpkg/bufmodule/bufmodule.go b/private/bufpkg/bufmodule/bufmodule.go index 5c93d9c70f..0a35a60bbb 100644 --- a/private/bufpkg/bufmodule/bufmodule.go +++ b/private/bufpkg/bufmodule/bufmodule.go @@ -28,6 +28,7 @@ import ( breakingv1 "github.com/bufbuild/buf/private/gen/proto/go/buf/alpha/breaking/v1" lintv1 "github.com/bufbuild/buf/private/gen/proto/go/buf/alpha/lint/v1" modulev1alpha1 "github.com/bufbuild/buf/private/gen/proto/go/buf/alpha/module/v1alpha1" + "github.com/bufbuild/buf/private/pkg/manifest" "github.com/bufbuild/buf/private/pkg/storage" "go.uber.org/multierr" ) @@ -110,6 +111,21 @@ type Module interface { // // This may be nil, since older versions of the module would not have this stored. LintConfig() *buflintconfig.Config + // Manifest returns the manifest for the module (possibly nil). + // A manifest's contents contain a lexicographically sorted list of path names along + // with each path's digest. The manifest also stores a digest of its own contents which + // allows verification of the entire Buf module. In addition to the .proto files in + // the module, it also lists the buf.yaml, LICENSE, buf.md, and buf.lock files (if + // present). + Manifest() *manifest.Manifest + // BlobSet returns the raw data for the module (possibly nil). 
+ // Each blob in the blob set is indexed by the digest of the blob's contents. For + // example, the buf.yaml file will be listed in the Manifest with a given digest, + // whose contents can be retrieved by looking up the corresponding digest in the + // blob set. This allows API consumers to get access to the original file contents + // of every file in the module, which is useful for caching or recreating a module's + // original files. + BlobSet() *manifest.BlobSet getSourceReadBucket() storage.ReadBucket // Note this *can* be nil if we did not build from a named module. @@ -146,7 +162,7 @@ func ModuleWithModuleIdentityAndCommit(moduleIdentity bufmoduleref.ModuleIdentit } } -// NewModuleForBucket returns a new Module. It attempts reads dependencies +// NewModuleForBucket returns a new Module. It attempts to read dependencies // from a lock file in the read bucket. func NewModuleForBucket( ctx context.Context, @@ -165,6 +181,16 @@ func NewModuleForProto( return newModuleForProto(ctx, protoModule, options...) } +// NewModuleForManifestAndBlobSet returns a new Module given the manifest and blob set. +func NewModuleForManifestAndBlobSet( + ctx context.Context, + manifest *manifest.Manifest, + blobSet *manifest.BlobSet, + options ...ModuleOption, +) (Module, error) { + return newModuleForManifestAndBlobSet(ctx, manifest, blobSet, options...) +} + // ModuleWithTargetPaths returns a new Module that specifies specific file or directory paths to build. // // These paths must exist. @@ -237,7 +263,7 @@ func NewNopModuleResolver() ModuleResolver { type ModuleReader interface { // GetModule gets the Module for the ModulePin. // - // Returns an error that fufills storage.IsNotExist if the Module does not exist. + // Returns an error that fulfills storage.IsNotExist if the Module does not exist. GetModule(ctx context.Context, modulePin bufmoduleref.ModulePin) (Module, error) } diff --git a/private/bufpkg/bufmodule/bufmodulecache/bufmodulecache.go b/private/bufpkg/bufmodule/bufmodulecache/bufmodulecache.go index a28e367b95..a212b9d2b1 100644 --- a/private/bufpkg/bufmodule/bufmodulecache/bufmodulecache.go +++ b/private/bufpkg/bufmodule/bufmodulecache/bufmodulecache.go @@ -67,6 +67,24 @@ func ModuleReaderWithExternalPaths() ModuleReaderOption { } } +// NewCASModuleReader creates a new module reader using content addressable storage. +// This doesn't require file locking and enables support for tamper proofing. 
+func NewCASModuleReader( + logger *zap.Logger, + verbosePrinter verbose.Printer, + bucket storage.ReadWriteBucket, + delegate bufmodule.ModuleReader, + repositoryClientFactory RepositoryServiceClientFactory, +) bufmodule.ModuleReader { + return newCASModuleReader( + bucket, + delegate, + repositoryClientFactory, + logger, + verbosePrinter, + ) +} + type moduleReaderOptions struct { allowCacheExternalPaths bool } diff --git a/private/bufpkg/bufmodule/bufmodulecache/bufmodulecache_test.go b/private/bufpkg/bufmodule/bufmodulecache/bufmodulecache_test.go index 0e77a18e1f..8104d3d312 100644 --- a/private/bufpkg/bufmodule/bufmodulecache/bufmodulecache_test.go +++ b/private/bufpkg/bufmodule/bufmodulecache/bufmodulecache_test.go @@ -127,8 +127,8 @@ func TestReaderBasic(t *testing.T) { getModule, err = moduleReader.GetModule(ctx, modulePin) require.NoError(t, err) testFile1HasNoExternalPath(t, ctx, getModule) - require.Equal(t, 2, moduleReader.getCount()) - require.Equal(t, 1, moduleReader.getCacheHits()) + require.Equal(t, 2, moduleReader.stats.Count()) + require.Equal(t, 1, moduleReader.stats.Hits()) // put some data that will not match the sum and make sure that we have a cache miss require.NoError(t, storage.PutPath(ctx, mainDataReadWriteBucket, normalpath.Join(newCacheKey(modulePin), "1234.proto"), []byte("foo"))) @@ -142,13 +142,13 @@ func TestReaderBasic(t *testing.T) { diff, err = storage.DiffBytes(ctx, runner, readBucket, filteredReadBucket) require.NoError(t, err) require.Empty(t, string(diff)) - require.Equal(t, 3, moduleReader.getCount()) - require.Equal(t, 1, moduleReader.getCacheHits()) + require.Equal(t, 3, moduleReader.stats.Count()) + require.Equal(t, 1, moduleReader.stats.Hits()) _, err = moduleReader.GetModule(ctx, modulePin) require.NoError(t, err) - require.Equal(t, 4, moduleReader.getCount()) - require.Equal(t, 2, moduleReader.getCacheHits()) + require.Equal(t, 4, moduleReader.stats.Count()) + require.Equal(t, 2, moduleReader.stats.Hits()) // overwrite the sum file and make sure that we have a cache miss require.NoError(t, storage.PutPath(ctx, mainSumReadWriteBucket, newCacheKey(modulePin), []byte("foo"))) @@ -162,13 +162,13 @@ func TestReaderBasic(t *testing.T) { diff, err = storage.DiffBytes(ctx, runner, readBucket, filteredReadBucket) require.NoError(t, err) require.Empty(t, string(diff)) - require.Equal(t, 5, moduleReader.getCount()) - require.Equal(t, 2, moduleReader.getCacheHits()) + require.Equal(t, 5, moduleReader.stats.Count()) + require.Equal(t, 2, moduleReader.stats.Hits()) _, err = moduleReader.GetModule(ctx, modulePin) require.NoError(t, err) - require.Equal(t, 6, moduleReader.getCount()) - require.Equal(t, 3, moduleReader.getCacheHits()) + require.Equal(t, 6, moduleReader.stats.Count()) + require.Equal(t, 3, moduleReader.stats.Hits()) // delete the sum file and make sure that we have a cache miss require.NoError(t, mainSumReadWriteBucket.Delete(ctx, newCacheKey(modulePin))) @@ -182,8 +182,8 @@ func TestReaderBasic(t *testing.T) { diff, err = storage.DiffBytes(ctx, runner, readBucket, filteredReadBucket) require.NoError(t, err) require.Empty(t, string(diff)) - require.Equal(t, 7, moduleReader.getCount()) - require.Equal(t, 3, moduleReader.getCacheHits()) + require.Equal(t, 7, moduleReader.stats.Count()) + require.Equal(t, 3, moduleReader.stats.Hits()) require.Equal(t, 4, observedLogs.Filter(func(entry observer.LoggedEntry) bool { return strings.Contains(entry.Message, deprecationMessage) }).Len()) diff --git 
a/private/bufpkg/bufmodule/bufmodulecache/cache_stats.go b/private/bufpkg/bufmodule/bufmodulecache/cache_stats.go new file mode 100644 index 0000000000..82ba23e636 --- /dev/null +++ b/private/bufpkg/bufmodule/bufmodulecache/cache_stats.go @@ -0,0 +1,48 @@ +// Copyright 2020-2023 Buf Technologies, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package bufmodulecache + +import "sync" + +type cacheStats struct { + lock sync.RWMutex + count int + hits int +} + +func (s *cacheStats) MarkHit() { + s.lock.Lock() + defer s.lock.Unlock() + s.count++ + s.hits++ +} + +func (s *cacheStats) MarkMiss() { + s.lock.Lock() + defer s.lock.Unlock() + s.count++ +} + +func (s *cacheStats) Count() int { + s.lock.RLock() + defer s.lock.RUnlock() + return s.count +} + +func (s *cacheStats) Hits() int { + s.lock.RLock() + defer s.lock.RUnlock() + return s.hits +} diff --git a/private/bufpkg/bufmodule/bufmodulecache/cas_module_cacher.go b/private/bufpkg/bufmodule/bufmodulecache/cas_module_cacher.go new file mode 100644 index 0000000000..30e2cbb292 --- /dev/null +++ b/private/bufpkg/bufmodule/bufmodulecache/cas_module_cacher.go @@ -0,0 +1,275 @@ +// Copyright 2020-2023 Buf Technologies, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package bufmodulecache + +import ( + "context" + "errors" + "fmt" + "io" + "strings" + + "github.com/bufbuild/buf/private/bufpkg/bufmodule" + "github.com/bufbuild/buf/private/bufpkg/bufmodule/bufmoduleref" + "github.com/bufbuild/buf/private/pkg/manifest" + "github.com/bufbuild/buf/private/pkg/normalpath" + "github.com/bufbuild/buf/private/pkg/storage" + "go.uber.org/multierr" + "go.uber.org/zap" +) + +// subdirectories under ~/.cache/buf/v2/{remote}/{owner}/{repo} +const ( + blobsDir = "blobs" + commitsDir = "commits" +) + +type casModuleCacher struct { + logger *zap.Logger + bucket storage.ReadWriteBucket +} + +var _ moduleCache = (*casModuleCacher)(nil) + +func (c *casModuleCacher) GetModule( + ctx context.Context, + modulePin bufmoduleref.ModulePin, +) (_ bufmodule.Module, retErr error) { + moduleBasedir := normalpath.Join(modulePin.Remote(), modulePin.Owner(), modulePin.Repository()) + manifestDigestStr := modulePin.Digest() + if manifestDigestStr == "" { + // Attempt to look up manifest digest from commit + commitPath := normalpath.Join(moduleBasedir, commitsDir, modulePin.Commit()) + manifestDigestBytes, err := c.loadPath(ctx, commitPath) + if err != nil { + return nil, err + } + manifestDigestStr = string(manifestDigestBytes) + } + manifestDigest, err := manifest.NewDigestFromString(manifestDigestStr) + if err != nil { + return nil, err + } + manifestFromCache, err := c.readManifest(ctx, moduleBasedir, manifestDigest) + if err != nil { + return nil, err + } + var blobs []manifest.Blob + blobDigests := make(map[string]struct{}) + for _, path := range manifestFromCache.Paths() { + digest, found := manifestFromCache.DigestFor(path) + if !found { + return nil, fmt.Errorf("digest not found for path: %s", path) + } + if _, ok := blobDigests[digest.String()]; ok { + // We've already loaded this blob + continue + } + blob, err := c.readBlob(ctx, moduleBasedir, digest) + if err != nil { + return nil, err + } + blobs = append(blobs, blob) + } + blobSet, err := manifest.NewBlobSet(ctx, blobs) + if err != nil { + return nil, err + } + return bufmodule.NewModuleForManifestAndBlobSet(ctx, manifestFromCache, blobSet) +} + +func (c *casModuleCacher) PutModule( + ctx context.Context, + modulePin bufmoduleref.ModulePin, + module bufmodule.Module, +) (retErr error) { + moduleManifest := module.Manifest() + if moduleManifest == nil { + return fmt.Errorf("manifest must be non-nil") + } + manifestBlob, err := moduleManifest.Blob() + if err != nil { + return err + } + digest := manifestBlob.Digest() + if digest == nil { + return errors.New("empty manifest digest") + } + if modulePinDigestEncoded := modulePin.Digest(); modulePinDigestEncoded != "" { + modulePinDigest, err := manifest.NewDigestFromString(modulePinDigestEncoded) + if err != nil { + return fmt.Errorf("invalid digest %q: %w", modulePinDigestEncoded, err) + } + if !digest.Equal(*modulePinDigest) { + return fmt.Errorf("manifest digest mismatch: expected=%q, found=%q", modulePinDigest.String(), digest.String()) + } + } + moduleBasedir := normalpath.Join(modulePin.Remote(), modulePin.Owner(), modulePin.Repository()) + // Write blobs + writtenDigests := make(map[string]struct{}) + for _, path := range moduleManifest.Paths() { + blobDigest, found := moduleManifest.DigestFor(path) + if !found { + return fmt.Errorf("failed to find digest for path=%q", path) + } + blobDigestStr := blobDigest.String() + if _, ok := writtenDigests[blobDigestStr]; ok { + continue + } + blob, found := module.BlobSet().BlobFor(blobDigestStr) + if !found { + return 
fmt.Errorf("blob not found for path=%q, digest=%q", path, blobDigestStr) + } + if err := c.writeBlob(ctx, moduleBasedir, blob); err != nil { + return err + } + writtenDigests[blobDigestStr] = struct{}{} + } + // Write manifest + if err := c.writeBlob(ctx, moduleBasedir, manifestBlob); err != nil { + return err + } + // Write commit + commitPath := normalpath.Join(moduleBasedir, commitsDir, modulePin.Commit()) + if err := c.atomicWrite(ctx, strings.NewReader(manifestBlob.Digest().String()), commitPath); err != nil { + return err + } + return nil +} + +func (c *casModuleCacher) readBlob( + ctx context.Context, + moduleBasedir string, + digest *manifest.Digest, +) (_ manifest.Blob, retErr error) { + hexDigest := digest.Hex() + blobPath := normalpath.Join(moduleBasedir, blobsDir, hexDigest[:2], hexDigest[2:]) + contents, err := c.loadPath(ctx, blobPath) + if err != nil { + return nil, err + } + blob, err := manifest.NewMemoryBlob(*digest, contents, manifest.MemoryBlobWithDigestValidation()) + if err != nil { + return nil, fmt.Errorf("failed to create blob from path %s: %w", blobPath, err) + } + return blob, nil +} + +func (c *casModuleCacher) validateBlob( + ctx context.Context, + moduleBasedir string, + digest *manifest.Digest, +) (bool, error) { + hexDigest := digest.Hex() + blobPath := normalpath.Join(moduleBasedir, blobsDir, hexDigest[:2], hexDigest[2:]) + f, err := c.bucket.Get(ctx, blobPath) + if err != nil { + return false, err + } + defer func() { + if err := f.Close(); err != nil { + c.logger.Debug("err closing blob", zap.Error(err)) + } + }() + digester, err := manifest.NewDigester(digest.Type()) + if err != nil { + return false, err + } + cacheDigest, err := digester.Digest(f) + if err != nil { + return false, err + } + return digest.Equal(*cacheDigest), nil +} + +func (c *casModuleCacher) readManifest( + ctx context.Context, + moduleBasedir string, + manifestDigest *manifest.Digest, +) (_ *manifest.Manifest, retErr error) { + blob, err := c.readBlob(ctx, moduleBasedir, manifestDigest) + if err != nil { + return nil, err + } + f, err := blob.Open(ctx) + if err != nil { + return nil, err + } + defer func() { + retErr = multierr.Append(retErr, f.Close()) + }() + moduleManifest, err := manifest.NewFromReader(f) + if err != nil { + return nil, fmt.Errorf("failed to read manifest %s: %w", manifestDigest.String(), err) + } + return moduleManifest, nil +} + +func (c *casModuleCacher) writeBlob( + ctx context.Context, + moduleBasedir string, + blob manifest.Blob, +) (retErr error) { + // Avoid unnecessary write if the blob is already written to disk + valid, err := c.validateBlob(ctx, moduleBasedir, blob.Digest()) + if err == nil && valid { + return nil + } + if !storage.IsNotExist(err) { + c.logger.Debug( + "repairing cache entry", + zap.String("basedir", moduleBasedir), + zap.String("digest", blob.Digest().String()), + ) + } + contents, err := blob.Open(ctx) + if err != nil { + return err + } + defer func() { + retErr = multierr.Append(retErr, contents.Close()) + }() + hexDigest := blob.Digest().Hex() + blobPath := normalpath.Join(moduleBasedir, blobsDir, hexDigest[:2], hexDigest[2:]) + return c.atomicWrite(ctx, contents, blobPath) +} + +func (c *casModuleCacher) atomicWrite(ctx context.Context, contents io.Reader, path string) (retErr error) { + f, err := c.bucket.Put(ctx, path, storage.PutWithAtomic()) + if err != nil { + return err + } + defer func() { + retErr = multierr.Append(retErr, f.Close()) + }() + if _, err := io.Copy(f, contents); err != nil { + return err + } + return nil +} 
+ +func (c *casModuleCacher) loadPath( + ctx context.Context, + path string, +) (_ []byte, retErr error) { + f, err := c.bucket.Get(ctx, path) + if err != nil { + return nil, err + } + defer func() { + retErr = multierr.Append(retErr, f.Close()) + }() + return io.ReadAll(f) +} diff --git a/private/bufpkg/bufmodule/bufmodulecache/cas_module_reader.go b/private/bufpkg/bufmodule/bufmodulecache/cas_module_reader.go new file mode 100644 index 0000000000..87fdc8c5d3 --- /dev/null +++ b/private/bufpkg/bufmodule/bufmodulecache/cas_module_reader.go @@ -0,0 +1,109 @@ +// Copyright 2020-2023 Buf Technologies, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package bufmodulecache + +import ( + "context" + "fmt" + + "github.com/bufbuild/buf/private/bufpkg/bufmodule" + "github.com/bufbuild/buf/private/bufpkg/bufmodule/bufmoduleref" + "github.com/bufbuild/buf/private/pkg/manifest" + "github.com/bufbuild/buf/private/pkg/storage" + "github.com/bufbuild/buf/private/pkg/verbose" + "go.uber.org/zap" +) + +type casModuleReader struct { + // required parameters + delegate bufmodule.ModuleReader + repositoryClientFactory RepositoryServiceClientFactory + logger *zap.Logger + verbosePrinter verbose.Printer + // initialized in newCASModuleReader + cache *casModuleCacher + stats *cacheStats +} + +var _ bufmodule.ModuleReader = (*casModuleReader)(nil) + +func newCASModuleReader( + bucket storage.ReadWriteBucket, + delegate bufmodule.ModuleReader, + repositoryClientFactory RepositoryServiceClientFactory, + logger *zap.Logger, + verbosePrinter verbose.Printer, +) *casModuleReader { + return &casModuleReader{ + delegate: delegate, + repositoryClientFactory: repositoryClientFactory, + logger: logger, + verbosePrinter: verbosePrinter, + cache: &casModuleCacher{ + logger: logger, + bucket: bucket, + }, + stats: &cacheStats{}, + } +} + +func (c *casModuleReader) GetModule( + ctx context.Context, + modulePin bufmoduleref.ModulePin, +) (bufmodule.Module, error) { + var modulePinDigest *manifest.Digest + if digest := modulePin.Digest(); digest != "" { + var err error + modulePinDigest, err = manifest.NewDigestFromString(digest) + // Fail fast if the buf.lock file contains a malformed digest + if err != nil { + return nil, fmt.Errorf("malformed module digest %q: %w", digest, err) + } + } + cachedModule, err := c.cache.GetModule(ctx, modulePin) + if err == nil { + c.stats.MarkHit() + return cachedModule, nil + } + c.logger.Debug("module cache miss", zap.Error(err)) + c.stats.MarkMiss() + remoteModule, err := c.delegate.GetModule(ctx, modulePin) + if err != nil { + return nil, err + } + // Manifest and BlobSet should always be set if tamper proofing is enabled. + // If not, the BSR doesn't support tamper proofing while the CLI feature is enabled. 
+ if remoteModule.Manifest() == nil || remoteModule.BlobSet() == nil { + return nil, fmt.Errorf("required manifest/blobSet not set on module") + } + if modulePinDigest != nil { + manifestBlob, err := remoteModule.Manifest().Blob() + if err != nil { + return nil, err + } + manifestDigest := manifestBlob.Digest() + if !modulePinDigest.Equal(*manifestDigest) { + // buf.lock module digest and BSR module don't match - fail without overwriting cache + return nil, fmt.Errorf("module digest mismatch - expected: %q, found: %q", modulePinDigest, manifestDigest) + } + } + if err := c.cache.PutModule(ctx, modulePin, remoteModule); err != nil { + return nil, err + } + if err := warnIfDeprecated(ctx, c.repositoryClientFactory, modulePin, c.logger); err != nil { + return nil, err + } + return remoteModule, nil +} diff --git a/private/bufpkg/bufmodule/bufmodulecache/cas_module_reader_test.go b/private/bufpkg/bufmodule/bufmodulecache/cas_module_reader_test.go new file mode 100644 index 0000000000..9820d74f19 --- /dev/null +++ b/private/bufpkg/bufmodule/bufmodulecache/cas_module_reader_test.go @@ -0,0 +1,248 @@ +// Copyright 2020-2023 Buf Technologies, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package bufmodulecache + +import ( + "bytes" + "context" + "io" + "strings" + "testing" + "time" + + "github.com/bufbuild/buf/private/bufpkg/bufmodule" + "github.com/bufbuild/buf/private/bufpkg/bufmodule/bufmoduleref" + "github.com/bufbuild/buf/private/gen/proto/connect/buf/alpha/registry/v1alpha1/registryv1alpha1connect" + registryv1alpha1 "github.com/bufbuild/buf/private/gen/proto/go/buf/alpha/registry/v1alpha1" + "github.com/bufbuild/buf/private/pkg/manifest" + "github.com/bufbuild/buf/private/pkg/normalpath" + "github.com/bufbuild/buf/private/pkg/storage" + "github.com/bufbuild/buf/private/pkg/storage/storageos" + "github.com/bufbuild/buf/private/pkg/verbose" + "github.com/bufbuild/connect-go" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.uber.org/zap/zaptest" +) + +const pingProto = `syntax = "proto3"; + +package connect.ping.v1; + +message PingRequest { + int64 number = 1; + string text = 2; +} + +message PingResponse { + int64 number = 1; + string text = 2; +} + +service PingService { + rpc Ping(PingRequest) returns (PingResponse) {} +} +` + +func TestCASModuleReaderHappyPath(t *testing.T) { + t.Parallel() + moduleManifest, blobs := createSampleManifestAndBlobs(t) + moduleBlob, err := moduleManifest.Blob() + require.NoError(t, err) + testModule, err := bufmodule.NewModuleForManifestAndBlobSet(context.Background(), moduleManifest, blobs) + require.NoError(t, err) + storageProvider := storageos.NewProvider() + storageBucket, err := storageProvider.NewReadWriteBucket(t.TempDir()) + require.NoError(t, err) + + moduleReader := newCASModuleReader(storageBucket, &testModuleReader{module: testModule}, func(_ string) registryv1alpha1connect.RepositoryServiceClient { + return &testRepositoryServiceClient{} + }, zaptest.NewLogger(t), &testVerbosePrinter{t: t}) + 
pin, err := bufmoduleref.NewModulePin( + "buf.build", + "test", + "ping", + "", + "abcd", + moduleBlob.Digest().String(), + time.Now(), + ) + require.NoError(t, err) + _, err = moduleReader.GetModule(context.Background(), pin) + require.NoError(t, err) + assert.Equal(t, 1, moduleReader.stats.Count()) + assert.Equal(t, 0, moduleReader.stats.Hits()) + verifyCache(t, storageBucket, pin, moduleManifest, blobs) + + _, err = moduleReader.GetModule(context.Background(), pin) + require.NoError(t, err) + assert.Equal(t, 2, moduleReader.stats.Count()) + assert.Equal(t, 1, moduleReader.stats.Hits()) // We should have a cache hit the second time + verifyCache(t, storageBucket, pin, moduleManifest, blobs) +} + +func TestCASModuleReaderNoDigest(t *testing.T) { + t.Parallel() + moduleManifest, blobs := createSampleManifestAndBlobs(t) + testModule, err := bufmodule.NewModuleForManifestAndBlobSet(context.Background(), moduleManifest, blobs) + require.NoError(t, err) + storageProvider := storageos.NewProvider() + storageBucket, err := storageProvider.NewReadWriteBucket(t.TempDir()) + require.NoError(t, err) + moduleReader := newCASModuleReader(storageBucket, &testModuleReader{module: testModule}, func(_ string) registryv1alpha1connect.RepositoryServiceClient { + return &testRepositoryServiceClient{} + }, zaptest.NewLogger(t), &testVerbosePrinter{t: t}) + pin, err := bufmoduleref.NewModulePin( + "buf.build", + "test", + "ping", + "", + "abcd", + "", + time.Now(), + ) + require.NoError(t, err) + _, err = moduleReader.GetModule(context.Background(), pin) + require.NoError(t, err) + assert.Equal(t, 1, moduleReader.stats.Count()) + assert.Equal(t, 0, moduleReader.stats.Hits()) + verifyCache(t, storageBucket, pin, moduleManifest, blobs) +} + +func TestCASModuleReaderDigestMismatch(t *testing.T) { + t.Parallel() + moduleManifest, blobs := createSampleManifestAndBlobs(t) + testModule, err := bufmodule.NewModuleForManifestAndBlobSet(context.Background(), moduleManifest, blobs) + require.NoError(t, err) + storageProvider := storageos.NewProvider() + storageBucket, err := storageProvider.NewReadWriteBucket(t.TempDir()) + require.NoError(t, err) + moduleReader := newCASModuleReader(storageBucket, &testModuleReader{module: testModule}, func(_ string) registryv1alpha1connect.RepositoryServiceClient { + return &testRepositoryServiceClient{} + }, zaptest.NewLogger(t), &testVerbosePrinter{t: t}) + pin, err := bufmoduleref.NewModulePin( + "buf.build", + "test", + "ping", + "", + "abcd", + "shake256:"+strings.Repeat("00", 64), // Digest which doesn't match module's digest + time.Now(), + ) + require.NoError(t, err) + _, err = moduleReader.GetModule(context.Background(), pin) + require.Error(t, err) + numFiles := 0 + err = storageBucket.Walk(context.Background(), "", func(info storage.ObjectInfo) error { + numFiles++ + return nil + }) + require.NoError(t, err) + assert.Equal(t, 0, numFiles) // Verify nothing written to cache on digest mismatch +} + +func verifyCache( + t *testing.T, + bucket storage.ReadWriteBucket, + pin bufmoduleref.ModulePin, + moduleManifest *manifest.Manifest, + blobs *manifest.BlobSet, +) { + t.Helper() + ctx := context.Background() + moduleCacheDir := normalpath.Join(pin.Remote(), pin.Owner(), pin.Repository()) + // {remote}/{owner}/{repo}/manifests/{..}/{....} => should return manifest contents + moduleBlob, err := moduleManifest.Blob() + require.NoError(t, err) + verifyBlobContents(t, bucket, normalpath.Join(moduleCacheDir, blobsDir), moduleBlob) + for _, path := range moduleManifest.Paths() { + 
protoDigest, found := moduleManifest.DigestFor(path) + require.True(t, found) + protoBlob, found := blobs.BlobFor(protoDigest.String()) + require.True(t, found) + // {remote}/{owner}/{repo}/blobs/{..}/{....} => should return proto blob contents + verifyBlobContents(t, bucket, normalpath.Join(moduleCacheDir, blobsDir), protoBlob) + } + f, err := bucket.Get(ctx, normalpath.Join(moduleCacheDir, commitsDir, pin.Commit())) + require.NoError(t, err) + defer f.Close() + commitContents, err := io.ReadAll(f) + require.NoError(t, err) + // {remote}/{owner}/{repo}/commits/{commit} => should return digest string format + assert.Equal(t, []byte(moduleBlob.Digest().String()), commitContents) +} + +func createSampleManifestAndBlobs(t *testing.T) (*manifest.Manifest, *manifest.BlobSet) { + t.Helper() + blob, err := manifest.NewMemoryBlobFromReader(strings.NewReader(pingProto)) + require.NoError(t, err) + var moduleManifest manifest.Manifest + err = moduleManifest.AddEntry("connect/ping/v1/ping.proto", *blob.Digest()) + require.NoError(t, err) + blobSet, err := manifest.NewBlobSet(context.Background(), []manifest.Blob{blob}) + require.NoError(t, err) + return &moduleManifest, blobSet +} + +func verifyBlobContents(t *testing.T, bucket storage.ReadWriteBucket, basedir string, blob manifest.Blob) { + t.Helper() + moduleHexDigest := blob.Digest().Hex() + r, err := blob.Open(context.Background()) + require.NoError(t, err) + var bb bytes.Buffer + _, err = io.Copy(&bb, r) + require.NoError(t, err) + f, err := bucket.Get(context.Background(), normalpath.Join(basedir, moduleHexDigest[:2], moduleHexDigest[2:])) + require.NoError(t, err) + defer f.Close() + cachedModule, err := io.ReadAll(f) + require.NoError(t, err) + assert.Equal(t, bb.Bytes(), cachedModule) +} + +type testModuleReader struct { + module bufmodule.Module +} + +var _ bufmodule.ModuleReader = (*testModuleReader)(nil) + +func (t *testModuleReader) GetModule(_ context.Context, _ bufmoduleref.ModulePin) (bufmodule.Module, error) { + return t.module, nil +} + +type testRepositoryServiceClient struct { + registryv1alpha1connect.UnimplementedRepositoryServiceHandler +} + +var _ registryv1alpha1connect.RepositoryServiceClient = (*testRepositoryServiceClient)(nil) + +func (t *testRepositoryServiceClient) GetRepositoryByFullName( + _ context.Context, + _ *connect.Request[registryv1alpha1.GetRepositoryByFullNameRequest], +) (*connect.Response[registryv1alpha1.GetRepositoryByFullNameResponse], error) { + return connect.NewResponse(®istryv1alpha1.GetRepositoryByFullNameResponse{ + Repository: ®istryv1alpha1.Repository{}, + }), nil +} + +type testVerbosePrinter struct { + t *testing.T +} + +var _ verbose.Printer = (*testVerbosePrinter)(nil) + +func (t testVerbosePrinter) Printf(format string, args ...interface{}) { + t.t.Logf(format, args...) +} diff --git a/private/bufpkg/bufmodule/bufmodulecache/module_cacher.go b/private/bufpkg/bufmodule/bufmodulecache/module_cacher.go index 14f808e2ba..59b841423c 100644 --- a/private/bufpkg/bufmodule/bufmodulecache/module_cacher.go +++ b/private/bufpkg/bufmodule/bufmodulecache/module_cacher.go @@ -25,6 +25,16 @@ import ( "go.uber.org/zap" ) +type moduleCache interface { + bufmodule.ModuleReader + // PutModule stores the module in the cache. 
+ PutModule( + ctx context.Context, + modulePin bufmoduleref.ModulePin, + module bufmodule.Module, + ) error +} + type moduleCacher struct { logger *zap.Logger dataReadWriteBucket storage.ReadWriteBucket @@ -32,6 +42,8 @@ type moduleCacher struct { allowCacheExternalPaths bool } +var _ moduleCache = (*moduleCacher)(nil) + func newModuleCacher( logger *zap.Logger, dataReadWriteBucket storage.ReadWriteBucket, diff --git a/private/bufpkg/bufmodule/bufmodulecache/module_reader.go b/private/bufpkg/bufmodule/bufmodulecache/module_reader.go index c8d4b6ce9b..ac0bbdce18 100644 --- a/private/bufpkg/bufmodule/bufmodulecache/module_reader.go +++ b/private/bufpkg/bufmodule/bufmodulecache/module_reader.go @@ -16,16 +16,12 @@ package bufmodulecache import ( "context" - "fmt" - "sync" "github.com/bufbuild/buf/private/bufpkg/bufmodule" "github.com/bufbuild/buf/private/bufpkg/bufmodule/bufmoduleref" - registryv1alpha1 "github.com/bufbuild/buf/private/gen/proto/go/buf/alpha/registry/v1alpha1" "github.com/bufbuild/buf/private/pkg/filelock" "github.com/bufbuild/buf/private/pkg/storage" "github.com/bufbuild/buf/private/pkg/verbose" - "github.com/bufbuild/connect-go" "go.uber.org/multierr" "go.uber.org/zap" ) @@ -38,9 +34,7 @@ type moduleReader struct { delegate bufmodule.ModuleReader repositoryClientFactory RepositoryServiceClientFactory - count int - cacheHits int - lock sync.RWMutex + stats *cacheStats } func newModuleReader( @@ -69,6 +63,7 @@ func newModuleReader( ), delegate: delegate, repositoryClientFactory: repositoryClientFactory, + stats: &cacheStats{}, } } @@ -90,10 +85,7 @@ func (m *moduleReader) GetModule( "cache_hit", zap.String("module_pin", modulePin.String()), ) - m.lock.Lock() - m.count++ - m.cacheHits++ - m.lock.Unlock() + m.stats.MarkHit() return module, nil } if !storage.IsNotExist(err) { @@ -118,15 +110,13 @@ func (m *moduleReader) GetModule( "cache_hit", zap.String("module_pin", modulePin.String()), ) - m.lock.Lock() - m.count++ - m.cacheHits++ - m.lock.Unlock() + m.stats.MarkHit() return module, nil } if !storage.IsNotExist(err) { return nil, err } + m.stats.MarkMiss() // We now had a IsNotExist error within a write lock, so go to the delegate and then put. 
m.logger.Debug( @@ -145,40 +135,8 @@ func (m *moduleReader) GetModule( ); err != nil { return nil, err } - - repositoryService := m.repositoryClientFactory(modulePin.Remote()) - resp, err := repositoryService.GetRepositoryByFullName( - ctx, - connect.NewRequest(®istryv1alpha1.GetRepositoryByFullNameRequest{ - FullName: fmt.Sprintf("%s/%s", modulePin.Owner(), modulePin.Repository()), - }), - ) - if err != nil { + if err := warnIfDeprecated(ctx, m.repositoryClientFactory, modulePin, m.logger); err != nil { return nil, err } - repository := resp.Msg.Repository - if repository.Deprecated { - warnMsg := fmt.Sprintf(`Repository "%s" is deprecated`, modulePin.IdentityString()) - if repository.DeprecationMessage != "" { - warnMsg = fmt.Sprintf("%s: %s", warnMsg, repository.DeprecationMessage) - } - m.logger.Sugar().Warn(warnMsg) - } - - m.lock.Lock() - m.count++ - m.lock.Unlock() return module, nil } - -func (m *moduleReader) getCount() int { - m.lock.RLock() - defer m.lock.RUnlock() - return m.count -} - -func (m *moduleReader) getCacheHits() int { - m.lock.RLock() - defer m.lock.RUnlock() - return m.cacheHits -} diff --git a/private/bufpkg/bufmodule/bufmodulecache/util.go b/private/bufpkg/bufmodule/bufmodulecache/util.go index 69245bfe69..585b27140d 100644 --- a/private/bufpkg/bufmodule/bufmodulecache/util.go +++ b/private/bufpkg/bufmodule/bufmodulecache/util.go @@ -15,8 +15,14 @@ package bufmodulecache import ( + "context" + "fmt" + "github.com/bufbuild/buf/private/bufpkg/bufmodule/bufmoduleref" + "github.com/bufbuild/buf/private/gen/proto/go/buf/alpha/registry/v1alpha1" "github.com/bufbuild/buf/private/pkg/normalpath" + "github.com/bufbuild/connect-go" + "go.uber.org/zap" ) // newCacheKey returns the key associated with the given module pin. @@ -24,3 +30,32 @@ import ( func newCacheKey(modulePin bufmoduleref.ModulePin) string { return normalpath.Join(modulePin.Remote(), modulePin.Owner(), modulePin.Repository(), modulePin.Commit()) } + +// warnIfDeprecated emits a warning message to logger if the repository +// is deprecated on the BSR. 
+func warnIfDeprecated( + ctx context.Context, + clientFactory RepositoryServiceClientFactory, + modulePin bufmoduleref.ModulePin, + logger *zap.Logger, +) error { + repositoryService := clientFactory(modulePin.Remote()) + resp, err := repositoryService.GetRepositoryByFullName( + ctx, + connect.NewRequest(®istryv1alpha1.GetRepositoryByFullNameRequest{ + FullName: fmt.Sprintf("%s/%s", modulePin.Owner(), modulePin.Repository()), + }), + ) + if err != nil { + return err + } + repository := resp.Msg.Repository + if repository.Deprecated { + warnMsg := fmt.Sprintf(`Repository "%s" is deprecated`, modulePin.IdentityString()) + if repository.DeprecationMessage != "" { + warnMsg = fmt.Sprintf("%s: %s", warnMsg, repository.DeprecationMessage) + } + logger.Sugar().Warn(warnMsg) + } + return nil +} diff --git a/private/bufpkg/bufmodule/module.go b/private/bufpkg/bufmodule/module.go index 0be5af9be6..ccc5673666 100644 --- a/private/bufpkg/bufmodule/module.go +++ b/private/bufpkg/bufmodule/module.go @@ -25,6 +25,7 @@ import ( breakingv1 "github.com/bufbuild/buf/private/gen/proto/go/buf/alpha/breaking/v1" lintv1 "github.com/bufbuild/buf/private/gen/proto/go/buf/alpha/lint/v1" modulev1alpha1 "github.com/bufbuild/buf/private/gen/proto/go/buf/alpha/module/v1alpha1" + "github.com/bufbuild/buf/private/pkg/manifest" "github.com/bufbuild/buf/private/pkg/normalpath" "github.com/bufbuild/buf/private/pkg/storage" "github.com/bufbuild/buf/private/pkg/storage/storagemem" @@ -39,6 +40,8 @@ type module struct { license string breakingConfig *bufbreakingconfig.Config lintConfig *buflintconfig.Config + manifest *manifest.Manifest + blobSet *manifest.BlobSet } func newModuleForProto( @@ -171,6 +174,30 @@ func newModuleForBucket( ) } +func newModuleForManifestAndBlobSet( + ctx context.Context, + moduleManifest *manifest.Manifest, + blobSet *manifest.BlobSet, + options ...ModuleOption, +) (*module, error) { + bucket, err := manifest.NewBucket( + *moduleManifest, + *blobSet, + manifest.BucketWithAllManifestBlobsValidation(), + manifest.BucketWithNoExtraBlobsValidation(), + ) + if err != nil { + return nil, err + } + module, err := newModuleForBucket(ctx, bucket, options...) + if err != nil { + return nil, err + } + module.manifest = moduleManifest + module.blobSet = blobSet + return module, nil +} + // this should only be called by other newModule constructors func newModule( ctx context.Context, @@ -277,6 +304,14 @@ func (m *module) LintConfig() *buflintconfig.Config { return m.lintConfig } +func (m *module) Manifest() *manifest.Manifest { + return m.manifest +} + +func (m *module) BlobSet() *manifest.BlobSet { + return m.blobSet +} + func (m *module) getModuleIdentity() bufmoduleref.ModuleIdentity { return m.moduleIdentity } diff --git a/private/pkg/manifest/manifest.go b/private/pkg/manifest/manifest.go index 1767201474..561ade0448 100644 --- a/private/pkg/manifest/manifest.go +++ b/private/pkg/manifest/manifest.go @@ -70,18 +70,10 @@ type Manifest struct { var _ encoding.TextMarshaler = (*Manifest)(nil) var _ encoding.TextUnmarshaler = (*Manifest)(nil) -// New creates an empty manifest. -func New() *Manifest { - return &Manifest{ - pathToDigest: make(map[string]Digest), - digestToPaths: make(map[string][]string), - } -} - // NewFromReader builds a manifest from an encoded manifest, like one produced // by [Manifest.MarshalText]. 
func NewFromReader(manifest io.Reader) (*Manifest, error) { - m := New() + var m Manifest scanner := bufio.NewScanner(manifest) scanner.Split(splitManifest) lineno := 0 @@ -105,7 +97,7 @@ func NewFromReader(manifest io.Reader) (*Manifest, error) { } return nil, err } - return m, nil + return &m, nil } // AddEntry adds an entry to the manifest with a path and its digest. It fails @@ -126,8 +118,14 @@ func (m *Manifest) AddEntry(path string, digest Digest) error { digest.String(), path, existingDigest.String(), ) } + if m.pathToDigest == nil { + m.pathToDigest = make(map[string]Digest) + } m.pathToDigest[path] = digest key := digest.String() + if m.digestToPaths == nil { + m.digestToPaths = make(map[string][]string) + } m.digestToPaths[key] = append(m.digestToPaths[key], path) return nil } @@ -198,6 +196,11 @@ func (m *Manifest) Blob() (Blob, error) { return NewMemoryBlobFromReader(bytes.NewReader(manifestText)) } +// Empty returns true if the manifest has no entries. +func (m *Manifest) Empty() bool { + return len(m.pathToDigest) == 0 && len(m.digestToPaths) == 0 +} + func splitManifest(data []byte, atEOF bool) (int, []byte, error) { // Return a line without LF. if i := bytes.IndexByte(data, '\n'); i >= 0 { diff --git a/private/pkg/manifest/manifest_test.go b/private/pkg/manifest/manifest_test.go index fa1ca77415..f84cd5c51f 100644 --- a/private/pkg/manifest/manifest_test.go +++ b/private/pkg/manifest/manifest_test.go @@ -73,14 +73,14 @@ func TestRoundTripManifest(t *testing.T) { func TestEmptyManifest(t *testing.T) { t.Parallel() - content, err := manifest.New().MarshalText() + content, err := new(manifest.Manifest).MarshalText() assert.NoError(t, err) assert.Equal(t, 0, len(content)) } func TestAddEntry(t *testing.T) { t.Parallel() - m := manifest.New() + var m manifest.Manifest fileDigest := mustDigestShake256(t, nil) const filePath = "my/path" require.NoError(t, m.AddEntry(filePath, *fileDigest)) @@ -150,7 +150,7 @@ func TestUnmarshalBrokenManifest(t *testing.T) { func TestDigestPaths(t *testing.T) { t.Parallel() - m := manifest.New() + var m manifest.Manifest sharedDigest := mustDigestShake256(t, nil) err := m.AddEntry("path/one", *sharedDigest) require.NoError(t, err) @@ -166,7 +166,7 @@ func TestDigestPaths(t *testing.T) { func TestPathDigest(t *testing.T) { t.Parallel() - m := manifest.New() + var m manifest.Manifest digest := mustDigestShake256(t, nil) err := m.AddEntry("my/path", *digest) require.NoError(t, err) @@ -193,7 +193,7 @@ func testInvalidManifest( func TestAllPaths(t *testing.T) { t.Parallel() - m := manifest.New() + var m manifest.Manifest var addedPaths []string for i := 0; i < 20; i++ { path := fmt.Sprintf("path/to/file%0d", i) @@ -204,3 +204,28 @@ func TestAllPaths(t *testing.T) { assert.Equal(t, len(addedPaths), len(retPaths)) assert.ElementsMatch(t, addedPaths, retPaths) } + +func TestManifestZeroValue(t *testing.T) { + t.Parallel() + var m manifest.Manifest + blob, err := m.Blob() + require.NoError(t, err) + assert.NotEmpty(t, blob.Digest().Hex()) + assert.True(t, m.Empty()) + assert.Empty(t, m.Paths()) + paths, ok := m.PathsFor("anything") + assert.Empty(t, paths) + assert.False(t, ok) + digest, ok := m.DigestFor("anything") + assert.Nil(t, digest) + assert.False(t, ok) + digester, err := manifest.NewDigester(manifest.DigestTypeShake256) + require.NoError(t, err) + emptyDigest, err := digester.Digest(bytes.NewReader(nil)) + require.NoError(t, err) + require.NoError(t, m.AddEntry("a", *emptyDigest)) + digest, ok = m.DigestFor("a") + assert.True(t, ok) + 
assert.Equal(t, emptyDigest.Hex(), digest.Hex()) + assert.False(t, m.Empty()) +} diff --git a/private/pkg/manifest/module.go b/private/pkg/manifest/module.go index 8a06100f69..5db006e032 100644 --- a/private/pkg/manifest/module.go +++ b/private/pkg/manifest/module.go @@ -19,6 +19,7 @@ import ( "context" "fmt" "io" + "os" "go.uber.org/multierr" ) @@ -78,10 +79,16 @@ func NewMemoryBlob(digest Digest, content []byte, opts ...MemoryBlobOption) (Blo } func (b *memoryBlob) Digest() *Digest { + if b == nil { + return nil + } return &b.digest } func (b *memoryBlob) Open(context.Context) (io.ReadCloser, error) { + if b == nil { + return nil, os.ErrNotExist + } return io.NopCloser(bytes.NewReader(b.content)), nil } diff --git a/private/pkg/manifest/storage.go b/private/pkg/manifest/storage.go index 7684590f73..7fc8067141 100644 --- a/private/pkg/manifest/storage.go +++ b/private/pkg/manifest/storage.go @@ -48,7 +48,7 @@ func NewFromBucket( ctx context.Context, bucket storage.ReadBucket, ) (*Manifest, *BlobSet, error) { - m := New() + var m Manifest digester, err := NewDigester(DigestTypeShake256) if err != nil { return nil, nil, err @@ -74,7 +74,7 @@ func NewFromBucket( if err != nil { return nil, nil, err } - return m, blobSet, nil + return &m, blobSet, nil } type bucketOptions struct { diff --git a/private/pkg/manifest/storage_test.go b/private/pkg/manifest/storage_test.go index db7978d140..beb9573bd2 100644 --- a/private/pkg/manifest/storage_test.go +++ b/private/pkg/manifest/storage_test.go @@ -70,7 +70,7 @@ func TestNewBucket(t *testing.T) { // same "mypkg" prefix for `Walk` test purposes "mypkglongername/v1/baz.proto": []byte("repeated proto content"), } - m := manifest.New() + var m manifest.Manifest var blobs []manifest.Blob digester, err := manifest.NewDigester(manifest.DigestTypeShake256) require.NoError(t, err) @@ -106,12 +106,12 @@ func TestNewBucket(t *testing.T) { require.NoError(t, err) _, err = manifest.NewBucket( - *m, *incompleteBlobSet, + m, *incompleteBlobSet, manifest.BucketWithAllManifestBlobsValidation(), ) assert.Error(t, err) - bucket, err := manifest.NewBucket(*m, *incompleteBlobSet) + bucket, err := manifest.NewBucket(m, *incompleteBlobSet) assert.NoError(t, err) assert.NotNil(t, bucket) var bucketFilesCount int @@ -134,7 +134,7 @@ func TestNewBucket(t *testing.T) { ) require.NoError(t, err) _, err = manifest.NewBucket( - *m, *tooLargeBlobSet, + m, *tooLargeBlobSet, manifest.BucketWithNoExtraBlobsValidation(), ) assert.Error(t, err) @@ -143,7 +143,7 @@ func TestNewBucket(t *testing.T) { t.Run("Valid", func(t *testing.T) { t.Parallel() bucket, err := manifest.NewBucket( - *m, *blobSet, + m, *blobSet, manifest.BucketWithAllManifestBlobsValidation(), manifest.BucketWithNoExtraBlobsValidation(), ) @@ -197,8 +197,8 @@ func TestNewBucket(t *testing.T) { func TestToBucketEmpty(t *testing.T) { t.Parallel() - m := manifest.New() - bucket, err := manifest.NewBucket(*m, manifest.BlobSet{}) + var m manifest.Manifest + bucket, err := manifest.NewBucket(m, manifest.BlobSet{}) require.NoError(t, err) // make sure there are no files in the bucket require.NoError(t, bucket.Walk(context.Background(), "", func(obj storage.ObjectInfo) error { diff --git a/private/pkg/storage/bucket.go b/private/pkg/storage/bucket.go index 96c8868d52..e2cb173818 100644 --- a/private/pkg/storage/bucket.go +++ b/private/pkg/storage/bucket.go @@ -56,6 +56,7 @@ type ReadBucket interface { type PutOptions struct { CustomChunkSize bool ChunkSize int64 // measured in bytes + Atomic bool } // PutOption are options 
passed when putting an object in a bucket. @@ -73,6 +74,20 @@ func PutWithChunkSize(sizeInBytes int64) PutOption { } } +// PutWithAtomic ensures that the Put fully writes the file before making it +// available to readers. This happens by default for some implementations, +// while others may need to perform a sequence of operations to ensure +// atomic writes. +// +// The Put operation is complete and the path will be readable once the +// returned WriteObjectCloser is written and closed (without an error). +// Any errors will cause the Put to be skipped (no path will be created). +func PutWithAtomic() PutOption { + return func(opts *PutOptions) { + opts.Atomic = true + } +} + // WriteBucket is a write-only bucket. type WriteBucket interface { // Put returns a WriteObjectCloser to write to the path. diff --git a/private/pkg/storage/storagemem/bucket.go b/private/pkg/storage/storagemem/bucket.go index 53c37bc267..8afeb0db5d 100644 --- a/private/pkg/storage/storagemem/bucket.go +++ b/private/pkg/storage/storagemem/bucket.go @@ -93,6 +93,7 @@ func (b *bucket) Put(ctx context.Context, path string, _ ...storage.PutOption) ( if err != nil { return nil, err } + // storagemem writes are already atomic - don't need special handling for PutWithAtomic. return newWriteObjectCloser(b, path), nil } diff --git a/private/pkg/storage/storageos/bucket.go b/private/pkg/storage/storageos/bucket.go index 212040dbba..c07d7446e8 100644 --- a/private/pkg/storage/storageos/bucket.go +++ b/private/pkg/storage/storageos/bucket.go @@ -25,6 +25,8 @@ import ( "github.com/bufbuild/buf/private/pkg/normalpath" "github.com/bufbuild/buf/private/pkg/storage" "github.com/bufbuild/buf/private/pkg/storage/storageutil" + "go.uber.org/atomic" + "go.uber.org/multierr" ) // errNotDir is the error returned if a path is not a directory. @@ -161,7 +163,11 @@ func (b *bucket) Walk( return nil } -func (b *bucket) Put(ctx context.Context, path string, _ ...storage.PutOption) (storage.WriteObjectCloser, error) { +func (b *bucket) Put(ctx context.Context, path string, opts ...storage.PutOption) (storage.WriteObjectCloser, error) { + var putOptions storage.PutOptions + for _, opt := range opts { + opt(&putOptions) + } externalPath, err := b.getExternalPath(path) if err != nil { return nil, err @@ -184,12 +190,20 @@ func (b *bucket) Put(ctx context.Context, path string, _ ...storage.PutOption) ( } else if !fileInfo.IsDir() { return nil, newErrNotDir(externalDir) } - file, err := os.Create(externalPath) + var file *os.File + var finalPath string + if putOptions.Atomic { + file, err = os.CreateTemp(externalDir, ".tmp"+filepath.Base(externalPath)+"*") + finalPath = externalPath + } else { + file, err = os.Create(externalPath) + } if err != nil { return nil, err } return newWriteObjectCloser( file, + finalPath, ), nil } @@ -343,18 +357,29 @@ func (r *readObjectCloser) Close() error { type writeObjectCloser struct { file *os.File + // path is set during atomic writes to the final path where the file should be created. + // If set, the file is a temp file that needs to be renamed to this path if Write/Close are successful. + path string + // writeErr contains the first non-nil error caught by a call to Write. + // This is returned in Close for atomic writes to prevent writing an incomplete file. 
+ writeErr atomic.Error } func newWriteObjectCloser( file *os.File, + path string, ) *writeObjectCloser { return &writeObjectCloser{ file: file, + path: path, } } func (w *writeObjectCloser) Write(p []byte) (int, error) { n, err := w.file.Write(p) + if err != nil { + w.writeErr.CompareAndSwap(nil, err) + } return n, toStorageError(err) } @@ -364,6 +389,17 @@ func (w *writeObjectCloser) SetExternalPath(string) error { func (w *writeObjectCloser) Close() error { err := toStorageError(w.file.Close()) + // This is an atomic write operation - we need to rename to the final path + if w.path != "" { + atomicWriteErr := multierr.Append(w.writeErr.Load(), err) + // Failed during Write or Close - remove temporary file without rename + if atomicWriteErr != nil { + return toStorageError(multierr.Append(atomicWriteErr, os.Remove(w.file.Name()))) + } + if err := os.Rename(w.file.Name(), w.path); err != nil { + return toStorageError(multierr.Append(err, os.Remove(w.file.Name()))) + } + } return err } @@ -373,7 +409,7 @@ func newErrNotDir(path string) *normalpath.Error { } func toStorageError(err error) error { - if err == os.ErrClosed { + if errors.Is(err, os.ErrClosed) { return storage.ErrClosed } return err
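
For reference, the v2 cache layout described in the commit message can be read back with plain path arithmetic. The sketch below is illustrative only: the remote/owner/repository coordinates, the "abcd" commit, and the blobPath helper are assumptions for the example, not code from this patch; the real lookup lives in casModuleCacher above.

```
package main

import (
	"fmt"
	"os"
	"path/filepath"
	"strings"
)

// blobPath maps a digest string such as "shake256:<hex>" to
// blobs/{hex[:2]}/{hex[2:]} under the module directory, mirroring the
// layout from the commit message and casModuleCacher.readBlob.
func blobPath(moduleDir, digest string) string {
	hex := digest[strings.Index(digest, ":")+1:]
	return filepath.Join(moduleDir, "blobs", hex[:2], hex[2:])
}

func main() {
	cacheDir, err := os.UserCacheDir()
	if err != nil {
		fmt.Println(err)
		return
	}
	// ~/.cache/buf/v2/module/{remote}/{owner}/{repo} (example coordinates)
	moduleDir := filepath.Join(cacheDir, "buf", "v2", "module", "buf.build", "acme", "petapis")
	// commits/{commit} holds the manifest digest for a pinned commit ("abcd" is hypothetical).
	manifestDigest, err := os.ReadFile(filepath.Join(moduleDir, "commits", "abcd"))
	if err != nil {
		fmt.Println("cache miss:", err)
		return
	}
	// The manifest blob, and every file blob it lists, lives under blobs/.
	fmt.Println("manifest blob at:", blobPath(moduleDir, strings.TrimSpace(string(manifestDigest))))
}
```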
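
The PutWithAtomic option added to the storage package is implemented in storageos with the temp-file-then-rename pattern, which is also why the os.Rename exception is added to .golangci.yml. A minimal standalone sketch of that pattern, assuming only the standard library (the function name here is illustrative, not the bucket implementation above):

```
package main

import (
	"fmt"
	"os"
	"path/filepath"
)

// atomicWriteFile writes data so readers never observe a partial file:
// the bytes go to a temp file in the destination directory and are
// renamed into place only after a successful write and close.
// os.Rename stays within one directory here, matching the rationale in
// the .golangci.yml exception.
func atomicWriteFile(path string, data []byte) error {
	tmp, err := os.CreateTemp(filepath.Dir(path), ".tmp"+filepath.Base(path)+"*")
	if err != nil {
		return err
	}
	if _, err := tmp.Write(data); err != nil {
		tmp.Close()
		os.Remove(tmp.Name())
		return err
	}
	if err := tmp.Close(); err != nil {
		os.Remove(tmp.Name())
		return err
	}
	// Only a fully written temp file is made visible at the final path.
	return os.Rename(tmp.Name(), path)
}

func main() {
	if err := atomicWriteFile(filepath.Join(os.TempDir(), "example.txt"), []byte("hello")); err != nil {
		fmt.Println("write failed:", err)
	}
}
```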