Skip to content

Commit

Permalink
storage: create a protobuf to hold stores
Browse files Browse the repository at this point in the history
WIP: This commit will be split up eventually.
Epic: none

Release note: None
  • Loading branch information
andrewbaptist committed Jan 8, 2025
1 parent 3aa4b7f commit af8b555
Show file tree
Hide file tree
Showing 16 changed files with 558 additions and 85 deletions.
1 change: 1 addition & 0 deletions pkg/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -2303,6 +2303,7 @@ GO_TARGETS = [
"//pkg/sql/vtable:vtable",
"//pkg/sql:sql",
"//pkg/sql:sql_test",
"//pkg/storage/configpb:configpb",
"//pkg/storage/disk:disk",
"//pkg/storage/disk:disk_test",
"//pkg/storage/enginepb:enginepb",
Expand Down
7 changes: 7 additions & 0 deletions pkg/cli/cliflags/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -1099,6 +1099,13 @@ only present for backward compatibility.
`,
}

BootstrapMount = FlagInfo{
Name: "bootstrap",
Description: `
Root directory of one store's mount point. This is used to find the server store configuration and load the store.
`,
}

SecondaryCache = FlagInfo{
Name: "experimental-secondary-cache",
Description: `
Expand Down
1 change: 1 addition & 0 deletions pkg/cli/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -508,6 +508,7 @@ func init() {

cliflagcfg.VarFlag(f, &storeSpecs, cliflags.Store)
cliflagcfg.VarFlag(f, &serverCfg.StorageEngine, cliflags.StorageEngine)
cliflagcfg.StringFlag(f, &serverCfg.BootstrapMount, cliflags.BootstrapMount)
cliflagcfg.VarFlag(f, &serverCfg.WALFailover, cliflags.WALFailover)
cliflagcfg.StringFlag(f, &serverCfg.SharedStorage, cliflags.SharedStorage)
cliflagcfg.VarFlag(f, &serverCfg.SecondaryCache, cliflags.SecondaryCache)
Expand Down
1 change: 1 addition & 0 deletions pkg/gen/protobuf.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ PROTOBUF_SRCS = [
"//pkg/sql/types:types_go_proto",
"//pkg/sql/vecindex/quantize:quantize_go_proto",
"//pkg/sql/vecindex/vecstore:vecstore_go_proto",
"//pkg/storage/configpb:configpb_go_proto",
"//pkg/storage/enginepb:enginepb_go_proto",
"//pkg/testutils/grpcutils:grpcutils_go_proto",
"//pkg/ts/catalog:catalog_go_proto",
Expand Down
1 change: 1 addition & 0 deletions pkg/server/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -279,6 +279,7 @@ go_library(
"//pkg/sql/ttl/ttljob",
"//pkg/sql/ttl/ttlschedule",
"//pkg/storage",
"//pkg/storage/configpb",
"//pkg/storage/disk",
"//pkg/storage/enginepb",
"//pkg/storage/fs",
Expand Down
225 changes: 159 additions & 66 deletions pkg/server/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,13 @@ import (
"github.com/cockroachdb/cockroach/pkg/multitenant/tenantcapabilities"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/rpc"
"github.com/cockroachdb/cockroach/pkg/security/username"
"github.com/cockroachdb/cockroach/pkg/server/license"
"github.com/cockroachdb/cockroach/pkg/server/status"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/sql/sem/catconstants"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/storage/configpb"
"github.com/cockroachdb/cockroach/pkg/storage/disk"
"github.com/cockroachdb/cockroach/pkg/storage/enginepb"
"github.com/cockroachdb/cockroach/pkg/storage/fs"
Expand Down Expand Up @@ -225,6 +227,9 @@ type BaseConfig struct {
// Stores is specified to enable durable key-value storage.
Stores base.StoreSpecList

//Bootstrap store is the location of one store.
BootstrapMount string

// WALFailover enables and configures automatic WAL failover when latency to
// a store's primary WAL increases.
WALFailover base.WALFailoverConfig
Expand Down Expand Up @@ -700,31 +705,84 @@ func (cfg *Config) CreateEngines(ctx context.Context) (Engines, error) {
return Engines{}, errors.Errorf("engines already created")
}
cfg.enginesCreated = true
bootstrapConfig := configpb.Storage{}
if cfg.BootstrapMount != "" {
config, exists, err := storage.LoadNodeStoreConfig(ctx, cfg.BootstrapMount)
if err != nil {
return Engines{}, errors.Wrap(err, "problem retriving existing config")
}
if exists {
bootstrapConfig = config
}
}
// TODO: Validate the bootstrap config from cfg matches the one from disk.
bootstrapConfig = cfg.convertToStorage()
engines, err := createEnginesInternal(
ctx,
bootstrapConfig,
&cfg.TestingKnobs,
cfg.DiskWriteStats,
cfg.CacheSize,
cfg.EarlyBootExternalStorageAccessor,
cfg.DiskMonitorManager,
cfg.Settings,
)

// Validate all the existing configs before writing out the new ones.
for _, eng := range engines {
storeConfig, found, err := eng.(*storage.Pebble).LoadNodeStoreConfig(ctx)
if err != nil {
return Engines{}, err
}
if found && !bootstrapConfig.Equal(&storeConfig) {
return Engines{}, errors.Newf("config changed %v != %v", bootstrapConfig, storeConfig)
}
}

// Store the new config to each store if there wasn't a previous config.
if len(bootstrapConfig.StoreSpec) > 0 {
for _, eng := range engines {
err = eng.(*storage.Pebble).PersistNodeStoreConfig(ctx, bootstrapConfig)
if err != nil {
fmt.Printf("Different error %+v", err)
return Engines{}, err
}
}
}
return engines, nil
}

// createEnginesInternal creates Engines based on the specs in cfg.Stores.
func createEnginesInternal(
ctx context.Context,
config configpb.Storage,
testingKnobs *base.TestingKnobs,
diskWriteStats disk.WriteStatsManager,
cacheSize int64,
earlyBootAccessor *cloud.EarlyBootExternalStorageAccessor,
diskMonitorManager *disk.MonitorManager,
settings *cluster.Settings,
) (Engines, error) {
var engines Engines
defer engines.Close()

storeKnobs := &kvserver.StoreTestingKnobs{}
if s := testingKnobs.Store; s != nil {
storeKnobs = s.(*kvserver.StoreTestingKnobs)
}

var details []redact.RedactableString
detail := func(msg redact.RedactableString) {
details = append(details, msg)
}
detail(redact.Sprintf("Pebble cache size: %s", humanizeutil.IBytes(cfg.CacheSize)))
pebbleCache := pebble.NewCache(cfg.CacheSize)
detail(redact.Sprintf("Pebble cache size: %s", humanizeutil.IBytes(cacheSize)))
pebbleCache := pebble.NewCache(cacheSize)
defer pebbleCache.Unref()

var sharedStorage cloud.ExternalStorage
if cfg.SharedStorage != "" {
var err error
// Note that we don't pass an io interceptor here. Instead, we record shared
// storage metrics on a per-store basis; see storage.Metrics.
sharedStorage, err = cloud.ExternalStorageFromURI(ctx, cfg.SharedStorage,
base.ExternalIODirConfig{}, cfg.Settings, nil, cfg.User, nil,
nil, cloud.NilMetrics)
if err != nil {
return nil, err
}
}

// TODO: Shared storage
var physicalStores int
for _, spec := range cfg.Stores.Specs {
if !spec.InMemory {
for _, spec := range config.StoreSpec {
if spec.GetPhysical() != nil {
physicalStores++
}
}
Expand All @@ -743,106 +801,122 @@ func (cfg *Config) CreateEngines(ctx context.Context) (Engines, error) {
fileCache = pebble.NewFileCache(pebbleCache, runtime.GOMAXPROCS(0), totalFileLimit)
}

var storeKnobs kvserver.StoreTestingKnobs
var stickyRegistry fs.StickyRegistry
if s := cfg.TestingKnobs.Store; s != nil {
storeKnobs = *s.(*kvserver.StoreTestingKnobs)
}
if cfg.TestingKnobs.Server != nil {
serverKnobs := cfg.TestingKnobs.Server.(*TestingKnobs)
if testingKnobs != nil && testingKnobs.Server != nil {
serverKnobs := testingKnobs.Server.(*TestingKnobs)
stickyRegistry = serverKnobs.StickyVFSRegistry
}

storeEnvs, err := fs.InitEnvsFromStoreSpecs(ctx, cfg.Stores.Specs, fs.ReadWrite, stickyRegistry, cfg.DiskWriteStats)
if err != nil {
return Engines{}, err
}
storeEnvs := make(fs.Envs, len(config.StoreSpec))
defer storeEnvs.CloseAll()
for i, spec := range config.StoreSpec {
var err error
storeEnvs[i], err = fs.InitEnvFromStoreConfig(ctx, spec, fs.ReadWrite, stickyRegistry, diskWriteStats)
if err != nil {
return Engines{}, err
}
}

walFailoverConfig := storage.WALFailover(cfg.WALFailover, storeEnvs, vfs.Default, cfg.DiskWriteStats)
// FIXME
deprecatedConfig := base.WALFailoverConfig{
Mode: base.WALFailoverMode(0),
// Path: persistedConfig.WalFailover.Path,
// PrevPath: persistedConfig.WalFailover.PrevPath,
}
walFailoverConfig := storage.WALFailover(deprecatedConfig, storeEnvs, vfs.Default, diskWriteStats)

for i, spec := range cfg.Stores.Specs {
for i, spec := range config.StoreSpec {
log.Eventf(ctx, "initializing %+v", spec)

storageConfigOpts := []storage.ConfigOption{
walFailoverConfig,
storage.Attributes(spec.Attributes),
storage.If(storeKnobs.SmallEngineBlocks, storage.BlockSize(1)),
storage.BlockConcurrencyLimitDivisor(len(cfg.Stores.Specs)),
// TODO: Should this only include the physical stores?
storage.BlockConcurrencyLimitDivisor(len(config.StoreSpec)),
}
if len(storeKnobs.EngineKnobs) > 0 {
storageConfigOpts = append(storageConfigOpts, storeKnobs.EngineKnobs...)
}
addCfgOpt := func(opt storage.ConfigOption) {
storageConfigOpts = append(storageConfigOpts, opt)
}
addCfgOpt(storage.RemoteStorageFactory(earlyBootAccessor))

if spec.InMemory {
var sizeInBytes = spec.Size.InBytes
if spec.Size.Percent > 0 {
sysMem, err := status.GetTotalMemory(ctx)
if err != nil {
return Engines{}, errors.Errorf("could not retrieve system memory")
}
sizeInBytes = int64(float64(sysMem) * spec.Size.Percent / 100)
}
store := spec.Store
switch s := store.(type) {
case *configpb.Store_Memory:
var sizeInBytes = spec.Properties.Capacity
if sizeInBytes != 0 && !storeKnobs.SkipMinSizeCheck && sizeInBytes < base.MinimumStoreSize {
return Engines{}, errors.Errorf("%f%% of memory is only %s bytes, which is below the minimum requirement of %s",
spec.Size.Percent, humanizeutil.IBytes(sizeInBytes), humanizeutil.IBytes(base.MinimumStoreSize))
spec.Properties.Percent, humanizeutil.IBytes(sizeInBytes), humanizeutil.IBytes(base.MinimumStoreSize))
}
addCfgOpt(storage.MaxSizeBytes(sizeInBytes))
addCfgOpt(storage.CacheSize(cfg.CacheSize))
addCfgOpt(storage.RemoteStorageFactory(cfg.EarlyBootExternalStorageAccessor))
addCfgOpt(storage.CacheSize(cacheSize))

detail(redact.Sprintf("store %d: in-memory, size %s", i, humanizeutil.IBytes(sizeInBytes)))
} else {
case *configpb.Store_Physical:
path := s.Physical.Path.Path

// NB: We've already initialized an *fs.Env backed by the real
// physical filesystem. This initialization will create the
// data directory if it didn't already exist.
du, err := storeEnvs[i].UnencryptedFS.GetDiskUsage(spec.Path)
du, err := storeEnvs[i].UnencryptedFS.GetDiskUsage(path)
if err != nil {
return Engines{}, errors.Wrap(err, "retrieving disk usage")
}
var sizeInBytes = spec.Size.InBytes
if spec.Size.Percent > 0 {
sizeInBytes = int64(float64(du.TotalBytes) * spec.Size.Percent / 100)
sizeInBytes := spec.Properties.Capacity
if spec.Properties.Percent > 0 {
sizeInBytes = int64(float64(du.TotalBytes) * spec.Properties.Percent / 100)
}
if sizeInBytes != 0 && !storeKnobs.SkipMinSizeCheck && sizeInBytes < base.MinimumStoreSize {
return Engines{}, errors.Errorf("%f%% of %s's total free space is only %s bytes, which is below the minimum requirement of %s",
spec.Size.Percent, spec.Path, humanizeutil.IBytes(sizeInBytes), humanizeutil.IBytes(base.MinimumStoreSize))
spec.Properties.Percent, path, humanizeutil.IBytes(sizeInBytes), humanizeutil.IBytes(base.MinimumStoreSize))
}
monitor, err := cfg.DiskMonitorManager.Monitor(spec.Path)
monitor, err := diskMonitorManager.Monitor(path)
if err != nil {
return Engines{}, errors.Wrap(err, "creating disk monitor")
}

statsCollector, err := cfg.DiskWriteStats.GetOrCreateCollector(spec.Path)
statsCollector, err := diskWriteStats.GetOrCreateCollector(path)
if err != nil {
return Engines{}, errors.Wrap(err, "retrieving stats collector")
}
addCfgOpt(storage.DiskWriteStatsCollector(statsCollector))

if spec.Size.Percent > 0 {
detail(redact.Sprintf("store %d: max size %s (calculated from %.2f percent of total), max open file limit %d", i, humanizeutil.IBytes(sizeInBytes), spec.Size.Percent, openFileLimitPerStore))
addCfgOpt(storage.MaxSizePercent(spec.Size.Percent / 100))
} else {
detail(redact.Sprintf("store %d: max size %s, max open file limit %d", i, humanizeutil.IBytes(sizeInBytes), openFileLimitPerStore))
addCfgOpt(storage.MaxSizeBytes(sizeInBytes))
detail(redact.Sprintf("store %d: max size %s, max open file limit %d", i, humanizeutil.IBytes(sizeInBytes), openFileLimitPerStore))
addCfgOpt(storage.MaxSizeBytes(sizeInBytes))
if s.Physical.Ballast != nil {
addCfgOpt(storage.BallastSize(storage.BallastSizeBytes(s.Physical.Ballast.Capacity, s.Physical.Ballast.Percent, du)))
}
addCfgOpt(storage.BallastSize(storage.BallastSizeBytes(spec, du)))
addCfgOpt(storage.Caches(pebbleCache, fileCache))
// TODO(radu): move up all remaining settings below so they apply to in-memory stores as well.
addCfgOpt(storage.MaxOpenFiles(int(openFileLimitPerStore)))
addCfgOpt(storage.MaxWriterConcurrency(2))
addCfgOpt(storage.RemoteStorageFactory(cfg.EarlyBootExternalStorageAccessor))
if sharedStorage != nil {

var sharedStorage cloud.ExternalStorage
if spec.SharedStorage != nil {
var err error
// Note that we don't pass an io interceptor here. Instead, we record shared
// storage metrics on a per-store basis; see storage.Metrics.
user, err := username.MakeSQLUsernameFromUserInput(spec.SharedStorage.User, username.PurposeValidation)
if err != nil {
return nil, err
}
sharedStorage, err = cloud.ExternalStorageFromURI(ctx, spec.SharedStorage.Uri,
base.ExternalIODirConfig{}, settings, nil, user, nil,
nil, cloud.NilMetrics)
if err != nil {
return nil, err
}
addCfgOpt(storage.SharedStorage(sharedStorage))
addCfgOpt(storage.SecondaryCache(storage.SecondaryCacheBytes(spec.SharedStorage.Cache.Capacity, spec.SharedStorage.Cache.Percent, du)))
}
addCfgOpt(storage.SecondaryCache(storage.SecondaryCacheBytes(cfg.SecondaryCache, du)))

addCfgOpt(storage.DiskMonitor(monitor))
// If the spec contains Pebble options, set those too.
if spec.PebbleOptions != "" {
addCfgOpt(storage.PebbleOptions(spec.PebbleOptions, &pebble.ParseHooks{
if s.Physical.Options != "" {
addCfgOpt(storage.PebbleOptions(s.Physical.Options, &pebble.ParseHooks{
NewFilterPolicy: func(name string) (pebble.FilterPolicy, error) {
switch name {
case "none":
Expand All @@ -854,11 +928,8 @@ func (cfg *Config) CreateEngines(ctx context.Context) (Engines, error) {
},
}))
}
if len(spec.RocksDBOptions) > 0 {
return nil, errors.Errorf("store %d: using Pebble storage engine but StoreSpec provides RocksDB options", i)
}
}
eng, err := storage.Open(ctx, storeEnvs[i], cfg.Settings, storageConfigOpts...)
eng, err := storage.Open(ctx, storeEnvs[i], settings, storageConfigOpts...)
if err != nil {
return Engines{}, err
}
Expand Down Expand Up @@ -890,6 +961,28 @@ func (cfg *Config) CreateEngines(ctx context.Context) (Engines, error) {
return enginesCopy, nil
}

// TODO: Convert the rest of the spec.
func (cfg *Config) convertToStorage() configpb.Storage {
config := configpb.Storage{}
for _, spec := range cfg.Stores.Specs {
// TODO: Handle memory stores.
store :=
configpb.Store_Physical{
Physical: &configpb.PhysicalStore{
Path: &configpb.ExternalPath{
Path: spec.Path,
Encryption: &configpb.EncryptionOptions{},
},
Options: spec.PebbleOptions,
Ballast: &configpb.DiskProperties{Percent: 1},
},
}

config.StoreSpec = append(config.StoreSpec, configpb.Store{Store: &store})
}
return config
}

// InitSQLServer finalizes the configuration of a SQL-only node.
// It initializes additional configuration flags from the environment.
func (cfg *Config) InitSQLServer(ctx context.Context) error {
Expand Down
Loading

0 comments on commit af8b555

Please sign in to comment.