diff --git a/.golangci.yml b/.golangci.yml index ec9ad62820..0e35999d1a 100644 --- a/.golangci.yml +++ b/.golangci.yml @@ -82,6 +82,12 @@ issues: - forbidigo # this is the one file we want to allow exec.Cmd functions in path: private/pkg/command/runner.go + - linters: + - forbidigo + # we use os.Rename here to rename files in the same directory + # This is safe (we aren't traversing filesystem boundaries). + path: private/pkg/storage/storageos/bucket.go + text: "os.Rename" - linters: - stylecheck text: "ST1005:" diff --git a/private/buf/bufcli/bufcli.go b/private/buf/bufcli/bufcli.go index a6659ded08..9b931337db 100644 --- a/private/buf/bufcli/bufcli.go +++ b/private/buf/bufcli/bufcli.go @@ -121,6 +121,7 @@ var ( v1CacheModuleDataRelDirPath, v1CacheModuleLockRelDirPath, v1CacheModuleSumRelDirPath, + v2CacheModuleRelDirPath, } // ErrNotATTY is returned when an input io.Reader is not a TTY where it is expected. @@ -154,6 +155,12 @@ var ( // These digests are used to make sure that the data written is actually what we expect, and if it is not, // we clear an entry from the cache, i.e. delete the relevant data directory. v1CacheModuleSumRelDirPath = normalpath.Join("v1", "module", "sum") + // v2CacheModuleRelDirPath is the relative path to the cache directory for content addressable storage. + // + // Normalized. + // This directory replaces the use of v1CacheModuleDataRelDirPath, v1CacheModuleLockRelDirPath, and + // v1CacheModuleSumRelDirPath for modules which support tamper proofing. + v2CacheModuleRelDirPath = normalpath.Join("v2", "module") // allVisibiltyStrings are the possible options that a user can set the visibility flag with. 
allVisibiltyStrings = []string{ @@ -546,62 +553,81 @@ func newModuleReaderAndCreateCacheDirs( clientConfig *connectclient.Config, cacheModuleReaderOpts ...bufmodulecache.ModuleReaderOption, ) (bufmodule.ModuleReader, error) { - cacheModuleDataDirPath := normalpath.Join(container.CacheDirPath(), v1CacheModuleDataRelDirPath) - cacheModuleLockDirPath := normalpath.Join(container.CacheDirPath(), v1CacheModuleLockRelDirPath) - cacheModuleSumDirPath := normalpath.Join(container.CacheDirPath(), v1CacheModuleSumRelDirPath) - if err := checkExistingCacheDirs( - container.CacheDirPath(), - container.CacheDirPath(), - cacheModuleDataDirPath, - cacheModuleLockDirPath, - cacheModuleSumDirPath, - ); err != nil { - return nil, err - } - if err := createCacheDirs( - cacheModuleDataDirPath, - cacheModuleLockDirPath, - cacheModuleSumDirPath, - ); err != nil { - return nil, err - } - storageosProvider := storageos.NewProvider(storageos.ProviderWithSymlinks()) - // do NOT want to enable symlinks for our cache - dataReadWriteBucket, err := storageosProvider.NewReadWriteBucket(cacheModuleDataDirPath) + cacheModuleDataDirPathV1 := normalpath.Join(container.CacheDirPath(), v1CacheModuleDataRelDirPath) + cacheModuleLockDirPathV1 := normalpath.Join(container.CacheDirPath(), v1CacheModuleLockRelDirPath) + cacheModuleSumDirPathV1 := normalpath.Join(container.CacheDirPath(), v1CacheModuleSumRelDirPath) + cacheModuleDirPathV2 := normalpath.Join(container.CacheDirPath(), v2CacheModuleRelDirPath) + // Check if tamper proofing env var is enabled + tamperProofingEnabled, err := IsBetaTamperProofingEnabled(container) if err != nil { return nil, err } - // do NOT want to enable symlinks for our cache - sumReadWriteBucket, err := storageosProvider.NewReadWriteBucket(cacheModuleSumDirPath) - if err != nil { - return nil, err + var cacheDirsToCreate []string + if tamperProofingEnabled { + cacheDirsToCreate = append(cacheDirsToCreate, cacheModuleDirPathV2) + } else { + cacheDirsToCreate = append( + 
cacheDirsToCreate, + cacheModuleDataDirPathV1, + cacheModuleLockDirPathV1, + cacheModuleSumDirPathV1, + ) } - fileLocker, err := filelock.NewLocker(cacheModuleLockDirPath) - if err != nil { + if err := checkExistingCacheDirs(container.CacheDirPath(), cacheDirsToCreate...); err != nil { return nil, err } - var moduleReaderOpts []bufapimodule.ModuleReaderOption - // Check if tamper proofing env var is enabled - tamperProofingEnabled, err := IsBetaTamperProofingEnabled(container) - if err != nil { + if err := createCacheDirs(cacheDirsToCreate...); err != nil { return nil, err } + var moduleReaderOpts []bufapimodule.ModuleReaderOption if tamperProofingEnabled { moduleReaderOpts = append(moduleReaderOpts, bufapimodule.WithTamperProofing()) } - moduleReader := bufmodulecache.NewModuleReader( - container.Logger(), - container.VerbosePrinter(), - fileLocker, - dataReadWriteBucket, - sumReadWriteBucket, - bufapimodule.NewModuleReader( - bufapimodule.NewDownloadServiceClientFactory(clientConfig), - moduleReaderOpts..., - ), - bufmodulecache.NewRepositoryServiceClientFactory(clientConfig), - cacheModuleReaderOpts..., + delegateReader := bufapimodule.NewModuleReader( + bufapimodule.NewDownloadServiceClientFactory(clientConfig), + moduleReaderOpts..., ) + repositoryClientFactory := bufmodulecache.NewRepositoryServiceClientFactory(clientConfig) + storageosProvider := storageos.NewProvider(storageos.ProviderWithSymlinks()) + var moduleReader bufmodule.ModuleReader + if tamperProofingEnabled { + casModuleBucket, err := storageosProvider.NewReadWriteBucket(cacheModuleDirPathV2) + if err != nil { + return nil, err + } + moduleReader = bufmodulecache.NewCASModuleReader( + container.Logger(), + container.VerbosePrinter(), + casModuleBucket, + delegateReader, + repositoryClientFactory, + ) + } else { + // do NOT want to enable symlinks for our cache + dataReadWriteBucket, err := storageosProvider.NewReadWriteBucket(cacheModuleDataDirPathV1) + if err != nil { + return nil, err + } + // 
do NOT want to enable symlinks for our cache + sumReadWriteBucket, err := storageosProvider.NewReadWriteBucket(cacheModuleSumDirPathV1) + if err != nil { + return nil, err + } + fileLocker, err := filelock.NewLocker(cacheModuleLockDirPathV1) + if err != nil { + return nil, err + } + moduleReader = bufmodulecache.NewModuleReader( + container.Logger(), + container.VerbosePrinter(), + fileLocker, + dataReadWriteBucket, + sumReadWriteBucket, + delegateReader, + repositoryClientFactory, + cacheModuleReaderOpts..., + ) + } return moduleReader, nil } @@ -1040,7 +1066,11 @@ func newFetchImageReader( } func checkExistingCacheDirs(baseCacheDirPath string, dirPaths ...string) error { - for _, dirPath := range dirPaths { + dirPathsToCheck := make([]string, 0, len(dirPaths)+1) + // Check base cache directory in addition to subdirectories + dirPathsToCheck = append(dirPathsToCheck, baseCacheDirPath) + dirPathsToCheck = append(dirPathsToCheck, dirPaths...) + for _, dirPath := range dirPathsToCheck { dirPath = normalpath.Unnormalize(dirPath) // OK to use os.Stat instead of os.LStat here as this is CLI-only fileInfo, err := os.Stat(dirPath) diff --git a/private/bufpkg/bufapimodule/module_reader.go b/private/bufpkg/bufapimodule/module_reader.go index bc0cf6fc1a..b9b0dac42f 100644 --- a/private/bufpkg/bufapimodule/module_reader.go +++ b/private/bufpkg/bufapimodule/module_reader.go @@ -67,15 +67,15 @@ func (m *moduleReader) GetModule(ctx context.Context, modulePin bufmoduleref.Mod return nil, errors.New("expected non-nil manifest with tamper proofing enabled") } // use manifest and blobs - bucket, err := bufmanifest.NewBucketFromManifestBlobs( - ctx, - resp.Manifest, - resp.Blobs, - ) + moduleManifest, err := bufmanifest.NewManifestFromProto(ctx, resp.Manifest) if err != nil { return nil, err } - return bufmodule.NewModuleForBucket(ctx, bucket, identityAndCommitOpt) + blobSet, err := bufmanifest.NewBlobSetFromProto(ctx, resp.Blobs) + if err != nil { + return nil, err + } + return 
bufmodule.NewModuleForManifestAndBlobSet(ctx, moduleManifest, blobSet) } resp, err := m.download(ctx, modulePin) if err != nil { diff --git a/private/bufpkg/bufmanifest/bucket.go b/private/bufpkg/bufmanifest/bucket.go index 3fa75dd545..3bdef97518 100644 --- a/private/bufpkg/bufmanifest/bucket.go +++ b/private/bufpkg/bufmanifest/bucket.go @@ -15,7 +15,6 @@ package bufmanifest import ( - "bytes" "context" "fmt" @@ -34,26 +33,13 @@ func NewBucketFromManifestBlobs( manifestBlob *modulev1alpha1.Blob, blobs []*modulev1alpha1.Blob, ) (storage.ReadBucket, error) { - if _, err := NewBlobFromProto(manifestBlob); err != nil { - return nil, fmt.Errorf("invalid manifest: %w", err) - } - parsedManifest, err := manifest.NewFromReader( - bytes.NewReader(manifestBlob.Content), - ) + parsedManifest, err := NewManifestFromProto(ctx, manifestBlob) if err != nil { - return nil, fmt.Errorf("parse manifest content: %w", err) - } - var memBlobs []manifest.Blob - for i, modBlob := range blobs { - memBlob, err := NewBlobFromProto(modBlob) - if err != nil { - return nil, fmt.Errorf("invalid blob at index %d: %w", i, err) - } - memBlobs = append(memBlobs, memBlob) + return nil, err } - blobSet, err := manifest.NewBlobSet(ctx, memBlobs) + blobSet, err := NewBlobSetFromProto(ctx, blobs) if err != nil { - return nil, fmt.Errorf("invalid blobs: %w", err) + return nil, err } manifestBucket, err := manifest.NewBucket( *parsedManifest, diff --git a/private/bufpkg/bufmanifest/mapper.go b/private/bufpkg/bufmanifest/mapper.go index a8325231fd..f8d3685ec2 100644 --- a/private/bufpkg/bufmanifest/mapper.go +++ b/private/bufpkg/bufmanifest/mapper.go @@ -71,6 +71,37 @@ func AsProtoBlob(ctx context.Context, b manifest.Blob) (_ *modulev1alpha1.Blob, }, nil } +// NewManifestFromProto returns a Manifest from a proto module blob. It makes sure the +// digest and content matches. 
+func NewManifestFromProto(ctx context.Context, b *modulev1alpha1.Blob) (_ *manifest.Manifest, retErr error) { + blob, err := NewBlobFromProto(b) + if err != nil { + return nil, fmt.Errorf("invalid manifest: %w", err) + } + r, err := blob.Open(ctx) + if err != nil { + return nil, err + } + defer func() { + retErr = multierr.Append(retErr, r.Close()) + }() + return manifest.NewFromReader(r) +} + +// NewBlobSetFromProto returns a BlobSet from a slice of proto module blobs. +// It makes sure the digest and content matches for each blob. +func NewBlobSetFromProto(ctx context.Context, blobs []*modulev1alpha1.Blob) (*manifest.BlobSet, error) { + var memBlobs []manifest.Blob + for i, modBlob := range blobs { + memBlob, err := NewBlobFromProto(modBlob) + if err != nil { + return nil, fmt.Errorf("invalid blob at index %d: %w", i, err) + } + memBlobs = append(memBlobs, memBlob) + } + return manifest.NewBlobSet(ctx, memBlobs) +} + // NewBlobFromProto returns a Blob from a proto module blob. It makes sure the // digest and content matches. func NewBlobFromProto(b *modulev1alpha1.Blob) (manifest.Blob, error) { diff --git a/private/bufpkg/bufmodule/bufmodule.go b/private/bufpkg/bufmodule/bufmodule.go index 5c93d9c70f..0a35a60bbb 100644 --- a/private/bufpkg/bufmodule/bufmodule.go +++ b/private/bufpkg/bufmodule/bufmodule.go @@ -28,6 +28,7 @@ import ( breakingv1 "github.com/bufbuild/buf/private/gen/proto/go/buf/alpha/breaking/v1" lintv1 "github.com/bufbuild/buf/private/gen/proto/go/buf/alpha/lint/v1" modulev1alpha1 "github.com/bufbuild/buf/private/gen/proto/go/buf/alpha/module/v1alpha1" + "github.com/bufbuild/buf/private/pkg/manifest" "github.com/bufbuild/buf/private/pkg/storage" "go.uber.org/multierr" ) @@ -110,6 +111,21 @@ type Module interface { // // This may be nil, since older versions of the module would not have this stored. LintConfig() *buflintconfig.Config + // Manifest returns the manifest for the module (possibly nil). 
+ // A manifest's contents contain a lexicographically sorted list of path names along + // with each path's digest. The manifest also stores a digest of its own contents which + // allows verification of the entire Buf module. In addition to the .proto files in + // the module, it also lists the buf.yaml, LICENSE, buf.md, and buf.lock files (if + // present). + Manifest() *manifest.Manifest + // BlobSet returns the raw data for the module (possibly nil). + // Each blob in the blob set is indexed by the digest of the blob's contents. For + // example, the buf.yaml file will be listed in the Manifest with a given digest, + // whose contents can be retrieved by looking up the corresponding digest in the + // blob set. This allows API consumers to get access to the original file contents + // of every file in the module, which is useful for caching or recreating a module's + // original files. + BlobSet() *manifest.BlobSet getSourceReadBucket() storage.ReadBucket // Note this *can* be nil if we did not build from a named module. @@ -146,7 +162,7 @@ func ModuleWithModuleIdentityAndCommit(moduleIdentity bufmoduleref.ModuleIdentit } } -// NewModuleForBucket returns a new Module. It attempts reads dependencies +// NewModuleForBucket returns a new Module. It attempts to read dependencies // from a lock file in the read bucket. func NewModuleForBucket( ctx context.Context, @@ -165,6 +181,16 @@ func NewModuleForProto( return newModuleForProto(ctx, protoModule, options...) } +// NewModuleForManifestAndBlobSet returns a new Module given the manifest and blob set. +func NewModuleForManifestAndBlobSet( + ctx context.Context, + manifest *manifest.Manifest, + blobSet *manifest.BlobSet, + options ...ModuleOption, +) (Module, error) { + return newModuleForManifestAndBlobSet(ctx, manifest, blobSet, options...) +} + // ModuleWithTargetPaths returns a new Module that specifies specific file or directory paths to build. // // These paths must exist. 
@@ -237,7 +263,7 @@ func NewNopModuleResolver() ModuleResolver { type ModuleReader interface { // GetModule gets the Module for the ModulePin. // - // Returns an error that fufills storage.IsNotExist if the Module does not exist. + // Returns an error that fulfills storage.IsNotExist if the Module does not exist. GetModule(ctx context.Context, modulePin bufmoduleref.ModulePin) (Module, error) } diff --git a/private/bufpkg/bufmodule/bufmodulecache/bufmodulecache.go b/private/bufpkg/bufmodule/bufmodulecache/bufmodulecache.go index a28e367b95..a212b9d2b1 100644 --- a/private/bufpkg/bufmodule/bufmodulecache/bufmodulecache.go +++ b/private/bufpkg/bufmodule/bufmodulecache/bufmodulecache.go @@ -67,6 +67,24 @@ func ModuleReaderWithExternalPaths() ModuleReaderOption { } } +// NewCASModuleReader creates a new module reader using content addressable storage. +// This doesn't require file locking and enables support for tamper proofing. +func NewCASModuleReader( + logger *zap.Logger, + verbosePrinter verbose.Printer, + bucket storage.ReadWriteBucket, + delegate bufmodule.ModuleReader, + repositoryClientFactory RepositoryServiceClientFactory, +) bufmodule.ModuleReader { + return newCASModuleReader( + bucket, + delegate, + repositoryClientFactory, + logger, + verbosePrinter, + ) +} + type moduleReaderOptions struct { allowCacheExternalPaths bool } diff --git a/private/bufpkg/bufmodule/bufmodulecache/bufmodulecache_test.go b/private/bufpkg/bufmodule/bufmodulecache/bufmodulecache_test.go index 0e77a18e1f..8104d3d312 100644 --- a/private/bufpkg/bufmodule/bufmodulecache/bufmodulecache_test.go +++ b/private/bufpkg/bufmodule/bufmodulecache/bufmodulecache_test.go @@ -127,8 +127,8 @@ func TestReaderBasic(t *testing.T) { getModule, err = moduleReader.GetModule(ctx, modulePin) require.NoError(t, err) testFile1HasNoExternalPath(t, ctx, getModule) - require.Equal(t, 2, moduleReader.getCount()) - require.Equal(t, 1, moduleReader.getCacheHits()) + require.Equal(t, 2, 
moduleReader.stats.Count()) + require.Equal(t, 1, moduleReader.stats.Hits()) // put some data that will not match the sum and make sure that we have a cache miss require.NoError(t, storage.PutPath(ctx, mainDataReadWriteBucket, normalpath.Join(newCacheKey(modulePin), "1234.proto"), []byte("foo"))) @@ -142,13 +142,13 @@ func TestReaderBasic(t *testing.T) { diff, err = storage.DiffBytes(ctx, runner, readBucket, filteredReadBucket) require.NoError(t, err) require.Empty(t, string(diff)) - require.Equal(t, 3, moduleReader.getCount()) - require.Equal(t, 1, moduleReader.getCacheHits()) + require.Equal(t, 3, moduleReader.stats.Count()) + require.Equal(t, 1, moduleReader.stats.Hits()) _, err = moduleReader.GetModule(ctx, modulePin) require.NoError(t, err) - require.Equal(t, 4, moduleReader.getCount()) - require.Equal(t, 2, moduleReader.getCacheHits()) + require.Equal(t, 4, moduleReader.stats.Count()) + require.Equal(t, 2, moduleReader.stats.Hits()) // overwrite the sum file and make sure that we have a cache miss require.NoError(t, storage.PutPath(ctx, mainSumReadWriteBucket, newCacheKey(modulePin), []byte("foo"))) @@ -162,13 +162,13 @@ func TestReaderBasic(t *testing.T) { diff, err = storage.DiffBytes(ctx, runner, readBucket, filteredReadBucket) require.NoError(t, err) require.Empty(t, string(diff)) - require.Equal(t, 5, moduleReader.getCount()) - require.Equal(t, 2, moduleReader.getCacheHits()) + require.Equal(t, 5, moduleReader.stats.Count()) + require.Equal(t, 2, moduleReader.stats.Hits()) _, err = moduleReader.GetModule(ctx, modulePin) require.NoError(t, err) - require.Equal(t, 6, moduleReader.getCount()) - require.Equal(t, 3, moduleReader.getCacheHits()) + require.Equal(t, 6, moduleReader.stats.Count()) + require.Equal(t, 3, moduleReader.stats.Hits()) // delete the sum file and make sure that we have a cache miss require.NoError(t, mainSumReadWriteBucket.Delete(ctx, newCacheKey(modulePin))) @@ -182,8 +182,8 @@ func TestReaderBasic(t *testing.T) { diff, err = 
storage.DiffBytes(ctx, runner, readBucket, filteredReadBucket) require.NoError(t, err) require.Empty(t, string(diff)) - require.Equal(t, 7, moduleReader.getCount()) - require.Equal(t, 3, moduleReader.getCacheHits()) + require.Equal(t, 7, moduleReader.stats.Count()) + require.Equal(t, 3, moduleReader.stats.Hits()) require.Equal(t, 4, observedLogs.Filter(func(entry observer.LoggedEntry) bool { return strings.Contains(entry.Message, deprecationMessage) }).Len()) diff --git a/private/bufpkg/bufmodule/bufmodulecache/cache_stats.go b/private/bufpkg/bufmodule/bufmodulecache/cache_stats.go new file mode 100644 index 0000000000..82ba23e636 --- /dev/null +++ b/private/bufpkg/bufmodule/bufmodulecache/cache_stats.go @@ -0,0 +1,48 @@ +// Copyright 2020-2023 Buf Technologies, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package bufmodulecache + +import "sync" + +type cacheStats struct { + lock sync.RWMutex + count int + hits int +} + +func (s *cacheStats) MarkHit() { + s.lock.Lock() + defer s.lock.Unlock() + s.count++ + s.hits++ +} + +func (s *cacheStats) MarkMiss() { + s.lock.Lock() + defer s.lock.Unlock() + s.count++ +} + +func (s *cacheStats) Count() int { + s.lock.RLock() + defer s.lock.RUnlock() + return s.count +} + +func (s *cacheStats) Hits() int { + s.lock.RLock() + defer s.lock.RUnlock() + return s.hits +} diff --git a/private/bufpkg/bufmodule/bufmodulecache/cas_module_cacher.go b/private/bufpkg/bufmodule/bufmodulecache/cas_module_cacher.go new file mode 100644 index 0000000000..30e2cbb292 --- /dev/null +++ b/private/bufpkg/bufmodule/bufmodulecache/cas_module_cacher.go @@ -0,0 +1,275 @@ +// Copyright 2020-2023 Buf Technologies, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package bufmodulecache + +import ( + "context" + "errors" + "fmt" + "io" + "strings" + + "github.com/bufbuild/buf/private/bufpkg/bufmodule" + "github.com/bufbuild/buf/private/bufpkg/bufmodule/bufmoduleref" + "github.com/bufbuild/buf/private/pkg/manifest" + "github.com/bufbuild/buf/private/pkg/normalpath" + "github.com/bufbuild/buf/private/pkg/storage" + "go.uber.org/multierr" + "go.uber.org/zap" +) + +// subdirectories under ~/.cache/buf/v2/{remote}/{owner}/{repo} +const ( + blobsDir = "blobs" + commitsDir = "commits" +) + +type casModuleCacher struct { + logger *zap.Logger + bucket storage.ReadWriteBucket +} + +var _ moduleCache = (*casModuleCacher)(nil) + +func (c *casModuleCacher) GetModule( + ctx context.Context, + modulePin bufmoduleref.ModulePin, +) (_ bufmodule.Module, retErr error) { + moduleBasedir := normalpath.Join(modulePin.Remote(), modulePin.Owner(), modulePin.Repository()) + manifestDigestStr := modulePin.Digest() + if manifestDigestStr == "" { + // Attempt to look up manifest digest from commit + commitPath := normalpath.Join(moduleBasedir, commitsDir, modulePin.Commit()) + manifestDigestBytes, err := c.loadPath(ctx, commitPath) + if err != nil { + return nil, err + } + manifestDigestStr = string(manifestDigestBytes) + } + manifestDigest, err := manifest.NewDigestFromString(manifestDigestStr) + if err != nil { + return nil, err + } + manifestFromCache, err := c.readManifest(ctx, moduleBasedir, manifestDigest) + if err != nil { + return nil, err + } + var blobs []manifest.Blob + blobDigests := make(map[string]struct{}) + for _, path := range manifestFromCache.Paths() { + digest, found := manifestFromCache.DigestFor(path) + if !found { + return nil, fmt.Errorf("digest not found for path: %s", path) + } + if _, ok := blobDigests[digest.String()]; ok { + // We've already loaded this blob + continue + } + blob, err := c.readBlob(ctx, moduleBasedir, digest) + if err != nil { + return nil, err + } + blobs = append(blobs, blob) + } + blobSet, err := 
manifest.NewBlobSet(ctx, blobs) + if err != nil { + return nil, err + } + return bufmodule.NewModuleForManifestAndBlobSet(ctx, manifestFromCache, blobSet) +} + +func (c *casModuleCacher) PutModule( + ctx context.Context, + modulePin bufmoduleref.ModulePin, + module bufmodule.Module, +) (retErr error) { + moduleManifest := module.Manifest() + if moduleManifest == nil { + return fmt.Errorf("manifest must be non-nil") + } + manifestBlob, err := moduleManifest.Blob() + if err != nil { + return err + } + digest := manifestBlob.Digest() + if digest == nil { + return errors.New("empty manifest digest") + } + if modulePinDigestEncoded := modulePin.Digest(); modulePinDigestEncoded != "" { + modulePinDigest, err := manifest.NewDigestFromString(modulePinDigestEncoded) + if err != nil { + return fmt.Errorf("invalid digest %q: %w", modulePinDigestEncoded, err) + } + if !digest.Equal(*modulePinDigest) { + return fmt.Errorf("manifest digest mismatch: expected=%q, found=%q", modulePinDigest.String(), digest.String()) + } + } + moduleBasedir := normalpath.Join(modulePin.Remote(), modulePin.Owner(), modulePin.Repository()) + // Write blobs + writtenDigests := make(map[string]struct{}) + for _, path := range moduleManifest.Paths() { + blobDigest, found := moduleManifest.DigestFor(path) + if !found { + return fmt.Errorf("failed to find digest for path=%q", path) + } + blobDigestStr := blobDigest.String() + if _, ok := writtenDigests[blobDigestStr]; ok { + continue + } + blob, found := module.BlobSet().BlobFor(blobDigestStr) + if !found { + return fmt.Errorf("blob not found for path=%q, digest=%q", path, blobDigestStr) + } + if err := c.writeBlob(ctx, moduleBasedir, blob); err != nil { + return err + } + writtenDigests[blobDigestStr] = struct{}{} + } + // Write manifest + if err := c.writeBlob(ctx, moduleBasedir, manifestBlob); err != nil { + return err + } + // Write commit + commitPath := normalpath.Join(moduleBasedir, commitsDir, modulePin.Commit()) + if err := c.atomicWrite(ctx, 
strings.NewReader(manifestBlob.Digest().String()), commitPath); err != nil { + return err + } + return nil +} + +func (c *casModuleCacher) readBlob( + ctx context.Context, + moduleBasedir string, + digest *manifest.Digest, +) (_ manifest.Blob, retErr error) { + hexDigest := digest.Hex() + blobPath := normalpath.Join(moduleBasedir, blobsDir, hexDigest[:2], hexDigest[2:]) + contents, err := c.loadPath(ctx, blobPath) + if err != nil { + return nil, err + } + blob, err := manifest.NewMemoryBlob(*digest, contents, manifest.MemoryBlobWithDigestValidation()) + if err != nil { + return nil, fmt.Errorf("failed to create blob from path %s: %w", blobPath, err) + } + return blob, nil +} + +func (c *casModuleCacher) validateBlob( + ctx context.Context, + moduleBasedir string, + digest *manifest.Digest, +) (bool, error) { + hexDigest := digest.Hex() + blobPath := normalpath.Join(moduleBasedir, blobsDir, hexDigest[:2], hexDigest[2:]) + f, err := c.bucket.Get(ctx, blobPath) + if err != nil { + return false, err + } + defer func() { + if err := f.Close(); err != nil { + c.logger.Debug("err closing blob", zap.Error(err)) + } + }() + digester, err := manifest.NewDigester(digest.Type()) + if err != nil { + return false, err + } + cacheDigest, err := digester.Digest(f) + if err != nil { + return false, err + } + return digest.Equal(*cacheDigest), nil +} + +func (c *casModuleCacher) readManifest( + ctx context.Context, + moduleBasedir string, + manifestDigest *manifest.Digest, +) (_ *manifest.Manifest, retErr error) { + blob, err := c.readBlob(ctx, moduleBasedir, manifestDigest) + if err != nil { + return nil, err + } + f, err := blob.Open(ctx) + if err != nil { + return nil, err + } + defer func() { + retErr = multierr.Append(retErr, f.Close()) + }() + moduleManifest, err := manifest.NewFromReader(f) + if err != nil { + return nil, fmt.Errorf("failed to read manifest %s: %w", manifestDigest.String(), err) + } + return moduleManifest, nil +} + +func (c *casModuleCacher) writeBlob( + ctx 
context.Context, + moduleBasedir string, + blob manifest.Blob, +) (retErr error) { + // Avoid unnecessary write if the blob is already written to disk + valid, err := c.validateBlob(ctx, moduleBasedir, blob.Digest()) + if err == nil && valid { + return nil + } + if !storage.IsNotExist(err) { + c.logger.Debug( + "repairing cache entry", + zap.String("basedir", moduleBasedir), + zap.String("digest", blob.Digest().String()), + ) + } + contents, err := blob.Open(ctx) + if err != nil { + return err + } + defer func() { + retErr = multierr.Append(retErr, contents.Close()) + }() + hexDigest := blob.Digest().Hex() + blobPath := normalpath.Join(moduleBasedir, blobsDir, hexDigest[:2], hexDigest[2:]) + return c.atomicWrite(ctx, contents, blobPath) +} + +func (c *casModuleCacher) atomicWrite(ctx context.Context, contents io.Reader, path string) (retErr error) { + f, err := c.bucket.Put(ctx, path, storage.PutWithAtomic()) + if err != nil { + return err + } + defer func() { + retErr = multierr.Append(retErr, f.Close()) + }() + if _, err := io.Copy(f, contents); err != nil { + return err + } + return nil +} + +func (c *casModuleCacher) loadPath( + ctx context.Context, + path string, +) (_ []byte, retErr error) { + f, err := c.bucket.Get(ctx, path) + if err != nil { + return nil, err + } + defer func() { + retErr = multierr.Append(retErr, f.Close()) + }() + return io.ReadAll(f) +} diff --git a/private/bufpkg/bufmodule/bufmodulecache/cas_module_reader.go b/private/bufpkg/bufmodule/bufmodulecache/cas_module_reader.go new file mode 100644 index 0000000000..87fdc8c5d3 --- /dev/null +++ b/private/bufpkg/bufmodule/bufmodulecache/cas_module_reader.go @@ -0,0 +1,109 @@ +// Copyright 2020-2023 Buf Technologies, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package bufmodulecache + +import ( + "context" + "fmt" + + "github.com/bufbuild/buf/private/bufpkg/bufmodule" + "github.com/bufbuild/buf/private/bufpkg/bufmodule/bufmoduleref" + "github.com/bufbuild/buf/private/pkg/manifest" + "github.com/bufbuild/buf/private/pkg/storage" + "github.com/bufbuild/buf/private/pkg/verbose" + "go.uber.org/zap" +) + +type casModuleReader struct { + // required parameters + delegate bufmodule.ModuleReader + repositoryClientFactory RepositoryServiceClientFactory + logger *zap.Logger + verbosePrinter verbose.Printer + // initialized in newCASModuleReader + cache *casModuleCacher + stats *cacheStats +} + +var _ bufmodule.ModuleReader = (*casModuleReader)(nil) + +func newCASModuleReader( + bucket storage.ReadWriteBucket, + delegate bufmodule.ModuleReader, + repositoryClientFactory RepositoryServiceClientFactory, + logger *zap.Logger, + verbosePrinter verbose.Printer, +) *casModuleReader { + return &casModuleReader{ + delegate: delegate, + repositoryClientFactory: repositoryClientFactory, + logger: logger, + verbosePrinter: verbosePrinter, + cache: &casModuleCacher{ + logger: logger, + bucket: bucket, + }, + stats: &cacheStats{}, + } +} + +func (c *casModuleReader) GetModule( + ctx context.Context, + modulePin bufmoduleref.ModulePin, +) (bufmodule.Module, error) { + var modulePinDigest *manifest.Digest + if digest := modulePin.Digest(); digest != "" { + var err error + modulePinDigest, err = manifest.NewDigestFromString(digest) + // Fail fast if the buf.lock file contains a malformed digest + if err != nil { + return nil, 
fmt.Errorf("malformed module digest %q: %w", digest, err) + } + } + cachedModule, err := c.cache.GetModule(ctx, modulePin) + if err == nil { + c.stats.MarkHit() + return cachedModule, nil + } + c.logger.Debug("module cache miss", zap.Error(err)) + c.stats.MarkMiss() + remoteModule, err := c.delegate.GetModule(ctx, modulePin) + if err != nil { + return nil, err + } + // Manifest and BlobSet should always be set if tamper proofing is enabled. + // If not, the BSR doesn't support tamper proofing while the CLI feature is enabled. + if remoteModule.Manifest() == nil || remoteModule.BlobSet() == nil { + return nil, fmt.Errorf("required manifest/blobSet not set on module") + } + if modulePinDigest != nil { + manifestBlob, err := remoteModule.Manifest().Blob() + if err != nil { + return nil, err + } + manifestDigest := manifestBlob.Digest() + if !modulePinDigest.Equal(*manifestDigest) { + // buf.lock module digest and BSR module don't match - fail without overwriting cache + return nil, fmt.Errorf("module digest mismatch - expected: %q, found: %q", modulePinDigest, manifestDigest) + } + } + if err := c.cache.PutModule(ctx, modulePin, remoteModule); err != nil { + return nil, err + } + if err := warnIfDeprecated(ctx, c.repositoryClientFactory, modulePin, c.logger); err != nil { + return nil, err + } + return remoteModule, nil +} diff --git a/private/bufpkg/bufmodule/bufmodulecache/cas_module_reader_test.go b/private/bufpkg/bufmodule/bufmodulecache/cas_module_reader_test.go new file mode 100644 index 0000000000..9820d74f19 --- /dev/null +++ b/private/bufpkg/bufmodule/bufmodulecache/cas_module_reader_test.go @@ -0,0 +1,248 @@ +// Copyright 2020-2023 Buf Technologies, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package bufmodulecache + +import ( + "bytes" + "context" + "io" + "strings" + "testing" + "time" + + "github.com/bufbuild/buf/private/bufpkg/bufmodule" + "github.com/bufbuild/buf/private/bufpkg/bufmodule/bufmoduleref" + "github.com/bufbuild/buf/private/gen/proto/connect/buf/alpha/registry/v1alpha1/registryv1alpha1connect" + registryv1alpha1 "github.com/bufbuild/buf/private/gen/proto/go/buf/alpha/registry/v1alpha1" + "github.com/bufbuild/buf/private/pkg/manifest" + "github.com/bufbuild/buf/private/pkg/normalpath" + "github.com/bufbuild/buf/private/pkg/storage" + "github.com/bufbuild/buf/private/pkg/storage/storageos" + "github.com/bufbuild/buf/private/pkg/verbose" + "github.com/bufbuild/connect-go" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.uber.org/zap/zaptest" +) + +const pingProto = `syntax = "proto3"; + +package connect.ping.v1; + +message PingRequest { + int64 number = 1; + string text = 2; +} + +message PingResponse { + int64 number = 1; + string text = 2; +} + +service PingService { + rpc Ping(PingRequest) returns (PingResponse) {} +} +` + +func TestCASModuleReaderHappyPath(t *testing.T) { + t.Parallel() + moduleManifest, blobs := createSampleManifestAndBlobs(t) + moduleBlob, err := moduleManifest.Blob() + require.NoError(t, err) + testModule, err := bufmodule.NewModuleForManifestAndBlobSet(context.Background(), moduleManifest, blobs) + require.NoError(t, err) + storageProvider := storageos.NewProvider() + storageBucket, err := storageProvider.NewReadWriteBucket(t.TempDir()) + require.NoError(t, 
err) + + moduleReader := newCASModuleReader(storageBucket, &testModuleReader{module: testModule}, func(_ string) registryv1alpha1connect.RepositoryServiceClient { + return &testRepositoryServiceClient{} + }, zaptest.NewLogger(t), &testVerbosePrinter{t: t}) + pin, err := bufmoduleref.NewModulePin( + "buf.build", + "test", + "ping", + "", + "abcd", + moduleBlob.Digest().String(), + time.Now(), + ) + require.NoError(t, err) + _, err = moduleReader.GetModule(context.Background(), pin) + require.NoError(t, err) + assert.Equal(t, 1, moduleReader.stats.Count()) + assert.Equal(t, 0, moduleReader.stats.Hits()) + verifyCache(t, storageBucket, pin, moduleManifest, blobs) + + _, err = moduleReader.GetModule(context.Background(), pin) + require.NoError(t, err) + assert.Equal(t, 2, moduleReader.stats.Count()) + assert.Equal(t, 1, moduleReader.stats.Hits()) // We should have a cache hit the second time + verifyCache(t, storageBucket, pin, moduleManifest, blobs) +} + +func TestCASModuleReaderNoDigest(t *testing.T) { + t.Parallel() + moduleManifest, blobs := createSampleManifestAndBlobs(t) + testModule, err := bufmodule.NewModuleForManifestAndBlobSet(context.Background(), moduleManifest, blobs) + require.NoError(t, err) + storageProvider := storageos.NewProvider() + storageBucket, err := storageProvider.NewReadWriteBucket(t.TempDir()) + require.NoError(t, err) + moduleReader := newCASModuleReader(storageBucket, &testModuleReader{module: testModule}, func(_ string) registryv1alpha1connect.RepositoryServiceClient { + return &testRepositoryServiceClient{} + }, zaptest.NewLogger(t), &testVerbosePrinter{t: t}) + pin, err := bufmoduleref.NewModulePin( + "buf.build", + "test", + "ping", + "", + "abcd", + "", + time.Now(), + ) + require.NoError(t, err) + _, err = moduleReader.GetModule(context.Background(), pin) + require.NoError(t, err) + assert.Equal(t, 1, moduleReader.stats.Count()) + assert.Equal(t, 0, moduleReader.stats.Hits()) + verifyCache(t, storageBucket, pin, moduleManifest, 
blobs) +} + +func TestCASModuleReaderDigestMismatch(t *testing.T) { + t.Parallel() + moduleManifest, blobs := createSampleManifestAndBlobs(t) + testModule, err := bufmodule.NewModuleForManifestAndBlobSet(context.Background(), moduleManifest, blobs) + require.NoError(t, err) + storageProvider := storageos.NewProvider() + storageBucket, err := storageProvider.NewReadWriteBucket(t.TempDir()) + require.NoError(t, err) + moduleReader := newCASModuleReader(storageBucket, &testModuleReader{module: testModule}, func(_ string) registryv1alpha1connect.RepositoryServiceClient { + return &testRepositoryServiceClient{} + }, zaptest.NewLogger(t), &testVerbosePrinter{t: t}) + pin, err := bufmoduleref.NewModulePin( + "buf.build", + "test", + "ping", + "", + "abcd", + "shake256:"+strings.Repeat("00", 64), // Digest which doesn't match module's digest + time.Now(), + ) + require.NoError(t, err) + _, err = moduleReader.GetModule(context.Background(), pin) + require.Error(t, err) + numFiles := 0 + err = storageBucket.Walk(context.Background(), "", func(info storage.ObjectInfo) error { + numFiles++ + return nil + }) + require.NoError(t, err) + assert.Equal(t, 0, numFiles) // Verify nothing written to cache on digest mismatch +} + +func verifyCache( + t *testing.T, + bucket storage.ReadWriteBucket, + pin bufmoduleref.ModulePin, + moduleManifest *manifest.Manifest, + blobs *manifest.BlobSet, +) { + t.Helper() + ctx := context.Background() + moduleCacheDir := normalpath.Join(pin.Remote(), pin.Owner(), pin.Repository()) + // {remote}/{owner}/{repo}/manifests/{..}/{....} => should return manifest contents + moduleBlob, err := moduleManifest.Blob() + require.NoError(t, err) + verifyBlobContents(t, bucket, normalpath.Join(moduleCacheDir, blobsDir), moduleBlob) + for _, path := range moduleManifest.Paths() { + protoDigest, found := moduleManifest.DigestFor(path) + require.True(t, found) + protoBlob, found := blobs.BlobFor(protoDigest.String()) + require.True(t, found) + // 
{remote}/{owner}/{repo}/blobs/{..}/{....} => should return proto blob contents + verifyBlobContents(t, bucket, normalpath.Join(moduleCacheDir, blobsDir), protoBlob) + } + f, err := bucket.Get(ctx, normalpath.Join(moduleCacheDir, commitsDir, pin.Commit())) + require.NoError(t, err) + defer f.Close() + commitContents, err := io.ReadAll(f) + require.NoError(t, err) + // {remote}/{owner}/{repo}/commits/{commit} => should return digest string format + assert.Equal(t, []byte(moduleBlob.Digest().String()), commitContents) +} + +func createSampleManifestAndBlobs(t *testing.T) (*manifest.Manifest, *manifest.BlobSet) { + t.Helper() + blob, err := manifest.NewMemoryBlobFromReader(strings.NewReader(pingProto)) + require.NoError(t, err) + var moduleManifest manifest.Manifest + err = moduleManifest.AddEntry("connect/ping/v1/ping.proto", *blob.Digest()) + require.NoError(t, err) + blobSet, err := manifest.NewBlobSet(context.Background(), []manifest.Blob{blob}) + require.NoError(t, err) + return &moduleManifest, blobSet +} + +func verifyBlobContents(t *testing.T, bucket storage.ReadWriteBucket, basedir string, blob manifest.Blob) { + t.Helper() + moduleHexDigest := blob.Digest().Hex() + r, err := blob.Open(context.Background()) + require.NoError(t, err) + var bb bytes.Buffer + _, err = io.Copy(&bb, r) + require.NoError(t, err) + f, err := bucket.Get(context.Background(), normalpath.Join(basedir, moduleHexDigest[:2], moduleHexDigest[2:])) + require.NoError(t, err) + defer f.Close() + cachedModule, err := io.ReadAll(f) + require.NoError(t, err) + assert.Equal(t, bb.Bytes(), cachedModule) +} + +type testModuleReader struct { + module bufmodule.Module +} + +var _ bufmodule.ModuleReader = (*testModuleReader)(nil) + +func (t *testModuleReader) GetModule(_ context.Context, _ bufmoduleref.ModulePin) (bufmodule.Module, error) { + return t.module, nil +} + +type testRepositoryServiceClient struct { + registryv1alpha1connect.UnimplementedRepositoryServiceHandler +} + +var _ 
registryv1alpha1connect.RepositoryServiceClient = (*testRepositoryServiceClient)(nil) + +func (t *testRepositoryServiceClient) GetRepositoryByFullName( + _ context.Context, + _ *connect.Request[registryv1alpha1.GetRepositoryByFullNameRequest], +) (*connect.Response[registryv1alpha1.GetRepositoryByFullNameResponse], error) { + return connect.NewResponse(&registryv1alpha1.GetRepositoryByFullNameResponse{ + Repository: &registryv1alpha1.Repository{}, + }), nil +} + +type testVerbosePrinter struct { + t *testing.T +} + +var _ verbose.Printer = (*testVerbosePrinter)(nil) + +func (t testVerbosePrinter) Printf(format string, args ...interface{}) { + t.t.Logf(format, args...) +} diff --git a/private/bufpkg/bufmodule/bufmodulecache/module_cacher.go b/private/bufpkg/bufmodule/bufmodulecache/module_cacher.go index 14f808e2ba..59b841423c 100644 --- a/private/bufpkg/bufmodule/bufmodulecache/module_cacher.go +++ b/private/bufpkg/bufmodule/bufmodulecache/module_cacher.go @@ -25,6 +25,16 @@ import ( "go.uber.org/zap" ) +type moduleCache interface { + bufmodule.ModuleReader + // PutModule stores the module in the cache.
+ PutModule( + ctx context.Context, + modulePin bufmoduleref.ModulePin, + module bufmodule.Module, + ) error +} + type moduleCacher struct { logger *zap.Logger dataReadWriteBucket storage.ReadWriteBucket @@ -32,6 +42,8 @@ type moduleCacher struct { allowCacheExternalPaths bool } +var _ moduleCache = (*moduleCacher)(nil) + func newModuleCacher( logger *zap.Logger, dataReadWriteBucket storage.ReadWriteBucket, diff --git a/private/bufpkg/bufmodule/bufmodulecache/module_reader.go b/private/bufpkg/bufmodule/bufmodulecache/module_reader.go index c8d4b6ce9b..ac0bbdce18 100644 --- a/private/bufpkg/bufmodule/bufmodulecache/module_reader.go +++ b/private/bufpkg/bufmodule/bufmodulecache/module_reader.go @@ -16,16 +16,12 @@ package bufmodulecache import ( "context" - "fmt" - "sync" "github.com/bufbuild/buf/private/bufpkg/bufmodule" "github.com/bufbuild/buf/private/bufpkg/bufmodule/bufmoduleref" - registryv1alpha1 "github.com/bufbuild/buf/private/gen/proto/go/buf/alpha/registry/v1alpha1" "github.com/bufbuild/buf/private/pkg/filelock" "github.com/bufbuild/buf/private/pkg/storage" "github.com/bufbuild/buf/private/pkg/verbose" - "github.com/bufbuild/connect-go" "go.uber.org/multierr" "go.uber.org/zap" ) @@ -38,9 +34,7 @@ type moduleReader struct { delegate bufmodule.ModuleReader repositoryClientFactory RepositoryServiceClientFactory - count int - cacheHits int - lock sync.RWMutex + stats *cacheStats } func newModuleReader( @@ -69,6 +63,7 @@ func newModuleReader( ), delegate: delegate, repositoryClientFactory: repositoryClientFactory, + stats: &cacheStats{}, } } @@ -90,10 +85,7 @@ func (m *moduleReader) GetModule( "cache_hit", zap.String("module_pin", modulePin.String()), ) - m.lock.Lock() - m.count++ - m.cacheHits++ - m.lock.Unlock() + m.stats.MarkHit() return module, nil } if !storage.IsNotExist(err) { @@ -118,15 +110,13 @@ func (m *moduleReader) GetModule( "cache_hit", zap.String("module_pin", modulePin.String()), ) - m.lock.Lock() - m.count++ - m.cacheHits++ - m.lock.Unlock() + 
m.stats.MarkHit() + return module, nil } if !storage.IsNotExist(err) { return nil, err } + m.stats.MarkMiss() // We now had a IsNotExist error within a write lock, so go to the delegate and then put. m.logger.Debug( @@ -145,40 +135,8 @@ func (m *moduleReader) GetModule( ); err != nil { return nil, err } - - repositoryService := m.repositoryClientFactory(modulePin.Remote()) - resp, err := repositoryService.GetRepositoryByFullName( - ctx, - connect.NewRequest(&registryv1alpha1.GetRepositoryByFullNameRequest{ - FullName: fmt.Sprintf("%s/%s", modulePin.Owner(), modulePin.Repository()), - }), - ) - if err != nil { + if err := warnIfDeprecated(ctx, m.repositoryClientFactory, modulePin, m.logger); err != nil { return nil, err } - repository := resp.Msg.Repository - if repository.Deprecated { - warnMsg := fmt.Sprintf(`Repository "%s" is deprecated`, modulePin.IdentityString()) - if repository.DeprecationMessage != "" { - warnMsg = fmt.Sprintf("%s: %s", warnMsg, repository.DeprecationMessage) - } - m.logger.Sugar().Warn(warnMsg) - } - - m.lock.Lock() - m.count++ - m.lock.Unlock() return module, nil } - -func (m *moduleReader) getCount() int { - m.lock.RLock() - defer m.lock.RUnlock() - return m.count -} - -func (m *moduleReader) getCacheHits() int { - m.lock.RLock() - defer m.lock.RUnlock() - return m.cacheHits -} diff --git a/private/bufpkg/bufmodule/bufmodulecache/util.go b/private/bufpkg/bufmodule/bufmodulecache/util.go index 69245bfe69..585b27140d 100644 --- a/private/bufpkg/bufmodule/bufmodulecache/util.go +++ b/private/bufpkg/bufmodule/bufmodulecache/util.go @@ -15,8 +15,14 @@ package bufmodulecache import ( + "context" + "fmt" + "github.com/bufbuild/buf/private/bufpkg/bufmodule/bufmoduleref" + "github.com/bufbuild/buf/private/gen/proto/go/buf/alpha/registry/v1alpha1" "github.com/bufbuild/buf/private/pkg/normalpath" + "github.com/bufbuild/connect-go" + "go.uber.org/zap" ) // newCacheKey returns the key associated with the given module pin.
@@ -24,3 +30,32 @@ import ( func newCacheKey(modulePin bufmoduleref.ModulePin) string { return normalpath.Join(modulePin.Remote(), modulePin.Owner(), modulePin.Repository(), modulePin.Commit()) } + +// warnIfDeprecated emits a warning message to logger if the repository +// is deprecated on the BSR. +func warnIfDeprecated( + ctx context.Context, + clientFactory RepositoryServiceClientFactory, + modulePin bufmoduleref.ModulePin, + logger *zap.Logger, +) error { + repositoryService := clientFactory(modulePin.Remote()) + resp, err := repositoryService.GetRepositoryByFullName( + ctx, + connect.NewRequest(&registryv1alpha1.GetRepositoryByFullNameRequest{ + FullName: fmt.Sprintf("%s/%s", modulePin.Owner(), modulePin.Repository()), + }), + ) + if err != nil { + return err + } + repository := resp.Msg.Repository + if repository.Deprecated { + warnMsg := fmt.Sprintf(`Repository "%s" is deprecated`, modulePin.IdentityString()) + if repository.DeprecationMessage != "" { + warnMsg = fmt.Sprintf("%s: %s", warnMsg, repository.DeprecationMessage) + } + logger.Sugar().Warn(warnMsg) + } + return nil +} diff --git a/private/bufpkg/bufmodule/module.go b/private/bufpkg/bufmodule/module.go index 0be5af9be6..ccc5673666 100644 --- a/private/bufpkg/bufmodule/module.go +++ b/private/bufpkg/bufmodule/module.go @@ -25,6 +25,7 @@ import ( breakingv1 "github.com/bufbuild/buf/private/gen/proto/go/buf/alpha/breaking/v1" lintv1 "github.com/bufbuild/buf/private/gen/proto/go/buf/alpha/lint/v1" modulev1alpha1 "github.com/bufbuild/buf/private/gen/proto/go/buf/alpha/module/v1alpha1" + "github.com/bufbuild/buf/private/pkg/manifest" "github.com/bufbuild/buf/private/pkg/normalpath" "github.com/bufbuild/buf/private/pkg/storage" "github.com/bufbuild/buf/private/pkg/storagemem" @@ -39,6 +40,8 @@ type module struct { license string breakingConfig *bufbreakingconfig.Config lintConfig *buflintconfig.Config + manifest *manifest.Manifest + blobSet *manifest.BlobSet } func newModuleForProto( @@ -171,6
+174,30 @@ func newModuleForBucket( ) } +func newModuleForManifestAndBlobSet( + ctx context.Context, + moduleManifest *manifest.Manifest, + blobSet *manifest.BlobSet, + options ...ModuleOption, +) (*module, error) { + bucket, err := manifest.NewBucket( + *moduleManifest, + *blobSet, + manifest.BucketWithAllManifestBlobsValidation(), + manifest.BucketWithNoExtraBlobsValidation(), + ) + if err != nil { + return nil, err + } + module, err := newModuleForBucket(ctx, bucket, options...) + if err != nil { + return nil, err + } + module.manifest = moduleManifest + module.blobSet = blobSet + return module, nil +} + // this should only be called by other newModule constructors func newModule( ctx context.Context, @@ -277,6 +304,14 @@ func (m *module) LintConfig() *buflintconfig.Config { return m.lintConfig } +func (m *module) Manifest() *manifest.Manifest { + return m.manifest +} + +func (m *module) BlobSet() *manifest.BlobSet { + return m.blobSet +} + func (m *module) getModuleIdentity() bufmoduleref.ModuleIdentity { return m.moduleIdentity } diff --git a/private/pkg/manifest/manifest.go b/private/pkg/manifest/manifest.go index 1767201474..561ade0448 100644 --- a/private/pkg/manifest/manifest.go +++ b/private/pkg/manifest/manifest.go @@ -70,18 +70,10 @@ type Manifest struct { var _ encoding.TextMarshaler = (*Manifest)(nil) var _ encoding.TextUnmarshaler = (*Manifest)(nil) -// New creates an empty manifest. -func New() *Manifest { - return &Manifest{ - pathToDigest: make(map[string]Digest), - digestToPaths: make(map[string][]string), - } -} - // NewFromReader builds a manifest from an encoded manifest, like one produced // by [Manifest.MarshalText]. 
func NewFromReader(manifest io.Reader) (*Manifest, error) { - m := New() + var m Manifest scanner := bufio.NewScanner(manifest) scanner.Split(splitManifest) lineno := 0 @@ -105,7 +97,7 @@ func NewFromReader(manifest io.Reader) (*Manifest, error) { } return nil, err } - return m, nil + return &m, nil } // AddEntry adds an entry to the manifest with a path and its digest. It fails @@ -126,8 +118,14 @@ func (m *Manifest) AddEntry(path string, digest Digest) error { digest.String(), path, existingDigest.String(), ) } + if m.pathToDigest == nil { + m.pathToDigest = make(map[string]Digest) + } m.pathToDigest[path] = digest key := digest.String() + if m.digestToPaths == nil { + m.digestToPaths = make(map[string][]string) + } m.digestToPaths[key] = append(m.digestToPaths[key], path) return nil } @@ -198,6 +196,11 @@ func (m *Manifest) Blob() (Blob, error) { return NewMemoryBlobFromReader(bytes.NewReader(manifestText)) } +// Empty returns true if the manifest has no entries. +func (m *Manifest) Empty() bool { + return len(m.pathToDigest) == 0 && len(m.digestToPaths) == 0 +} + func splitManifest(data []byte, atEOF bool) (int, []byte, error) { // Return a line without LF. 
if i := bytes.IndexByte(data, '\n'); i >= 0 { diff --git a/private/pkg/manifest/manifest_test.go b/private/pkg/manifest/manifest_test.go index fa1ca77415..f84cd5c51f 100644 --- a/private/pkg/manifest/manifest_test.go +++ b/private/pkg/manifest/manifest_test.go @@ -73,14 +73,14 @@ func TestRoundTripManifest(t *testing.T) { func TestEmptyManifest(t *testing.T) { t.Parallel() - content, err := manifest.New().MarshalText() + content, err := new(manifest.Manifest).MarshalText() assert.NoError(t, err) assert.Equal(t, 0, len(content)) } func TestAddEntry(t *testing.T) { t.Parallel() - m := manifest.New() + var m manifest.Manifest fileDigest := mustDigestShake256(t, nil) const filePath = "my/path" require.NoError(t, m.AddEntry(filePath, *fileDigest)) @@ -150,7 +150,7 @@ func TestUnmarshalBrokenManifest(t *testing.T) { func TestDigestPaths(t *testing.T) { t.Parallel() - m := manifest.New() + var m manifest.Manifest sharedDigest := mustDigestShake256(t, nil) err := m.AddEntry("path/one", *sharedDigest) require.NoError(t, err) @@ -166,7 +166,7 @@ func TestDigestPaths(t *testing.T) { func TestPathDigest(t *testing.T) { t.Parallel() - m := manifest.New() + var m manifest.Manifest digest := mustDigestShake256(t, nil) err := m.AddEntry("my/path", *digest) require.NoError(t, err) @@ -193,7 +193,7 @@ func testInvalidManifest( func TestAllPaths(t *testing.T) { t.Parallel() - m := manifest.New() + var m manifest.Manifest var addedPaths []string for i := 0; i < 20; i++ { path := fmt.Sprintf("path/to/file%0d", i) @@ -204,3 +204,28 @@ func TestAllPaths(t *testing.T) { assert.Equal(t, len(addedPaths), len(retPaths)) assert.ElementsMatch(t, addedPaths, retPaths) } + +func TestManifestZeroValue(t *testing.T) { + t.Parallel() + var m manifest.Manifest + blob, err := m.Blob() + require.NoError(t, err) + assert.NotEmpty(t, blob.Digest().Hex()) + assert.True(t, m.Empty()) + assert.Empty(t, m.Paths()) + paths, ok := m.PathsFor("anything") + assert.Empty(t, paths) + assert.False(t, ok) + digest, 
ok := m.DigestFor("anything") + assert.Nil(t, digest) + assert.False(t, ok) + digester, err := manifest.NewDigester(manifest.DigestTypeShake256) + require.NoError(t, err) + emptyDigest, err := digester.Digest(bytes.NewReader(nil)) + require.NoError(t, err) + require.NoError(t, m.AddEntry("a", *emptyDigest)) + digest, ok = m.DigestFor("a") + assert.True(t, ok) + assert.Equal(t, emptyDigest.Hex(), digest.Hex()) + assert.False(t, m.Empty()) +} diff --git a/private/pkg/manifest/module.go b/private/pkg/manifest/module.go index 8a06100f69..5db006e032 100644 --- a/private/pkg/manifest/module.go +++ b/private/pkg/manifest/module.go @@ -19,6 +19,7 @@ import ( "context" "fmt" "io" + "os" "go.uber.org/multierr" ) @@ -78,10 +79,16 @@ func NewMemoryBlob(digest Digest, content []byte, opts ...MemoryBlobOption) (Blo } func (b *memoryBlob) Digest() *Digest { + if b == nil { + return nil + } return &b.digest } func (b *memoryBlob) Open(context.Context) (io.ReadCloser, error) { + if b == nil { + return nil, os.ErrNotExist + } return io.NopCloser(bytes.NewReader(b.content)), nil } diff --git a/private/pkg/manifest/storage.go b/private/pkg/manifest/storage.go index 7684590f73..7fc8067141 100644 --- a/private/pkg/manifest/storage.go +++ b/private/pkg/manifest/storage.go @@ -48,7 +48,7 @@ func NewFromBucket( ctx context.Context, bucket storage.ReadBucket, ) (*Manifest, *BlobSet, error) { - m := New() + var m Manifest digester, err := NewDigester(DigestTypeShake256) if err != nil { return nil, nil, err @@ -74,7 +74,7 @@ func NewFromBucket( if err != nil { return nil, nil, err } - return m, blobSet, nil + return &m, blobSet, nil } type bucketOptions struct { diff --git a/private/pkg/manifest/storage_test.go b/private/pkg/manifest/storage_test.go index db7978d140..beb9573bd2 100644 --- a/private/pkg/manifest/storage_test.go +++ b/private/pkg/manifest/storage_test.go @@ -70,7 +70,7 @@ func TestNewBucket(t *testing.T) { // same "mypkg" prefix for `Walk` test purposes 
"mypkglongername/v1/baz.proto": []byte("repeated proto content"), } - m := manifest.New() + var m manifest.Manifest var blobs []manifest.Blob digester, err := manifest.NewDigester(manifest.DigestTypeShake256) require.NoError(t, err) @@ -106,12 +106,12 @@ func TestNewBucket(t *testing.T) { require.NoError(t, err) _, err = manifest.NewBucket( - *m, *incompleteBlobSet, + m, *incompleteBlobSet, manifest.BucketWithAllManifestBlobsValidation(), ) assert.Error(t, err) - bucket, err := manifest.NewBucket(*m, *incompleteBlobSet) + bucket, err := manifest.NewBucket(m, *incompleteBlobSet) assert.NoError(t, err) assert.NotNil(t, bucket) var bucketFilesCount int @@ -134,7 +134,7 @@ func TestNewBucket(t *testing.T) { ) require.NoError(t, err) _, err = manifest.NewBucket( - *m, *tooLargeBlobSet, + m, *tooLargeBlobSet, manifest.BucketWithNoExtraBlobsValidation(), ) assert.Error(t, err) @@ -143,7 +143,7 @@ func TestNewBucket(t *testing.T) { t.Run("Valid", func(t *testing.T) { t.Parallel() bucket, err := manifest.NewBucket( - *m, *blobSet, + m, *blobSet, manifest.BucketWithAllManifestBlobsValidation(), manifest.BucketWithNoExtraBlobsValidation(), ) @@ -197,8 +197,8 @@ func TestNewBucket(t *testing.T) { func TestToBucketEmpty(t *testing.T) { t.Parallel() - m := manifest.New() - bucket, err := manifest.NewBucket(*m, manifest.BlobSet{}) + var m manifest.Manifest + bucket, err := manifest.NewBucket(m, manifest.BlobSet{}) require.NoError(t, err) // make sure there are no files in the bucket require.NoError(t, bucket.Walk(context.Background(), "", func(obj storage.ObjectInfo) error { diff --git a/private/pkg/storage/bucket.go b/private/pkg/storage/bucket.go index 96c8868d52..e2cb173818 100644 --- a/private/pkg/storage/bucket.go +++ b/private/pkg/storage/bucket.go @@ -56,6 +56,7 @@ type ReadBucket interface { type PutOptions struct { CustomChunkSize bool ChunkSize int64 // measured in bytes + Atomic bool } // PutOption are options passed when putting an object in a bucket. 
@@ -73,6 +74,20 @@ func PutWithChunkSize(sizeInBytes int64) PutOption { } } +// PutWithAtomic ensures that the Put fully writes the file before making it +// available to readers. This happens by default for some implementations, +// while others may need to perform a sequence of operations to ensure +// atomic writes. +// +// The Put operation is complete and the path will be readable once the +// returned WriteObjectCloser is written and closed (without an error). +// Any errors will cause the Put to be skipped (no path will be created). +func PutWithAtomic() PutOption { + return func(opts *PutOptions) { + opts.Atomic = true + } +} + // WriteBucket is a write-only bucket. type WriteBucket interface { // Put returns a WriteObjectCloser to write to the path. diff --git a/private/pkg/storage/storagemem/bucket.go b/private/pkg/storage/storagemem/bucket.go index 53c37bc267..8afeb0db5d 100644 --- a/private/pkg/storage/storagemem/bucket.go +++ b/private/pkg/storage/storagemem/bucket.go @@ -93,6 +93,7 @@ func (b *bucket) Put(ctx context.Context, path string, _ ...storage.PutOption) ( if err != nil { return nil, err } + // storagemem writes are already atomic - don't need special handling for PutWithAtomic. return newWriteObjectCloser(b, path), nil } diff --git a/private/pkg/storage/storageos/bucket.go b/private/pkg/storage/storageos/bucket.go index 212040dbba..c07d7446e8 100644 --- a/private/pkg/storage/storageos/bucket.go +++ b/private/pkg/storage/storageos/bucket.go @@ -25,6 +25,8 @@ import ( "github.com/bufbuild/buf/private/pkg/normalpath" "github.com/bufbuild/buf/private/pkg/storage" "github.com/bufbuild/buf/private/pkg/storage/storageutil" + "go.uber.org/atomic" + "go.uber.org/multierr" ) // errNotDir is the error returned if a path is not a directory. 
@@ -161,7 +163,11 @@ func (b *bucket) Walk( return nil } -func (b *bucket) Put(ctx context.Context, path string, _ ...storage.PutOption) (storage.WriteObjectCloser, error) { +func (b *bucket) Put(ctx context.Context, path string, opts ...storage.PutOption) (storage.WriteObjectCloser, error) { + var putOptions storage.PutOptions + for _, opt := range opts { + opt(&putOptions) + } externalPath, err := b.getExternalPath(path) if err != nil { return nil, err @@ -184,12 +190,20 @@ func (b *bucket) Put(ctx context.Context, path string, _ ...storage.PutOption) ( } else if !fileInfo.IsDir() { return nil, newErrNotDir(externalDir) } - file, err := os.Create(externalPath) + var file *os.File + var finalPath string + if putOptions.Atomic { + file, err = os.CreateTemp(externalDir, ".tmp"+filepath.Base(externalPath)+"*") + finalPath = externalPath + } else { + file, err = os.Create(externalPath) + } if err != nil { return nil, err } return newWriteObjectCloser( file, + finalPath, ), nil } @@ -343,18 +357,29 @@ func (r *readObjectCloser) Close() error { type writeObjectCloser struct { file *os.File + // path is set during atomic writes to the final path where the file should be created. + // If set, the file is a temp file that needs to be renamed to this path if Write/Close are successful. + path string + // writeErr contains the first non-nil error caught by a call to Write. + // This is returned in Close for atomic writes to prevent writing an incomplete file. 
+ writeErr atomic.Error } func newWriteObjectCloser( file *os.File, + path string, ) *writeObjectCloser { return &writeObjectCloser{ file: file, + path: path, } } func (w *writeObjectCloser) Write(p []byte) (int, error) { n, err := w.file.Write(p) + if err != nil { + w.writeErr.CompareAndSwap(nil, err) + } return n, toStorageError(err) } @@ -364,6 +389,17 @@ func (w *writeObjectCloser) SetExternalPath(string) error { func (w *writeObjectCloser) Close() error { err := toStorageError(w.file.Close()) + // This is an atomic write operation - we need to rename to the final path + if w.path != "" { + atomicWriteErr := multierr.Append(w.writeErr.Load(), err) + // Failed during Write or Close - remove temporary file without rename + if atomicWriteErr != nil { + return toStorageError(multierr.Append(atomicWriteErr, os.Remove(w.file.Name()))) + } + if err := os.Rename(w.file.Name(), w.path); err != nil { + return toStorageError(multierr.Append(err, os.Remove(w.file.Name()))) + } + } return err } @@ -373,7 +409,7 @@ func newErrNotDir(path string) *normalpath.Error { } func toStorageError(err error) error { - if err == os.ErrClosed { + if errors.Is(err, os.ErrClosed) { return storage.ErrClosed } return err