Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(storage): AWS backend using thanos.io/objstore #11221

Open
wants to merge 20 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
562 changes: 562 additions & 0 deletions docs/sources/shared/configuration.md

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion pkg/bloombuild/builder/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ func New(
builderID := uuid.NewString()
logger = log.With(logger, "builder_id", builderID)

tsdbStore, err := common.NewTSDBStores(schemaCfg, storeCfg, storageMetrics, logger)
tsdbStore, err := common.NewTSDBStores("bloom-builder", schemaCfg, storeCfg, storageMetrics, logger, r)
if err != nil {
return nil, fmt.Errorf("error creating TSDB store: %w", err)
}
Expand Down
5 changes: 4 additions & 1 deletion pkg/bloombuild/common/tsdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"

Expand Down Expand Up @@ -175,10 +176,12 @@ type TSDBStores struct {
}

func NewTSDBStores(
component string,
schemaCfg config.SchemaConfig,
storeCfg baseStore.Config,
clientMetrics baseStore.ClientMetrics,
logger log.Logger,
reg prometheus.Registerer,
) (*TSDBStores, error) {
res := &TSDBStores{
schemaCfg: schemaCfg,
Expand All @@ -188,7 +191,7 @@ func NewTSDBStores(
for i, cfg := range schemaCfg.Configs {
if cfg.IndexType == types.TSDBType {

c, err := baseStore.NewObjectClient(cfg.ObjectType, storeCfg, clientMetrics)
c, err := baseStore.NewObjectClient(component, cfg.ObjectType, storeCfg, clientMetrics, reg)
if err != nil {
return nil, errors.Wrap(err, "failed to create object client")
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/bloombuild/planner/planner.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ func New(
) (*Planner, error) {
utillog.WarnExperimentalUse("Bloom Planner", logger)

tsdbStore, err := common.NewTSDBStores(schemaCfg, storeCfg, storageMetrics, logger)
tsdbStore, err := common.NewTSDBStores("bloom-planner", schemaCfg, storeCfg, storageMetrics, logger, r)
if err != nil {
return nil, fmt.Errorf("error creating TSDB store: %w", err)
}
Expand Down
3 changes: 2 additions & 1 deletion pkg/bloomcompactor/tsdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"

Expand Down Expand Up @@ -188,7 +189,7 @@ func NewTSDBStores(
for i, cfg := range schemaCfg.Configs {
if cfg.IndexType == types.TSDBType {

c, err := baseStore.NewObjectClient(cfg.ObjectType, storeCfg, clientMetrics)
c, err := baseStore.NewObjectClient("tsdb-store", cfg.ObjectType, storeCfg, clientMetrics, prometheus.DefaultRegisterer)
if err != nil {
return nil, errors.Wrap(err, "failed to create object client")
}
Expand Down
2 changes: 2 additions & 0 deletions pkg/logcli/query/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -539,9 +539,11 @@ func (q *Query) DoLocalQuery(out output.LogOutput, statistics bool, orgID string

func GetObjectClient(store string, conf loki.Config, cm storage.ClientMetrics) (chunk.ObjectClient, error) {
oc, err := storage.NewObjectClient(
"log-cli-query",
store,
conf.StorageConfig,
cm,
prometheus.DefaultRegisterer,
)
if err != nil {
return nil, err
Expand Down
7 changes: 7 additions & 0 deletions pkg/loki/common/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"github.com/grafana/dskit/flagext"
"github.com/grafana/dskit/netutil"

"github.com/grafana/loki/v3/pkg/storage/bucket"
"github.com/grafana/loki/v3/pkg/storage/chunk/client/alibaba"
"github.com/grafana/loki/v3/pkg/storage/chunk/client/aws"
"github.com/grafana/loki/v3/pkg/storage/chunk/client/azure"
Expand Down Expand Up @@ -58,6 +59,8 @@ func (c *Config) RegisterFlags(f *flag.FlagSet) {
c.Ring.RegisterFlagsWithPrefix("common.storage.", "collectors/", f)
c.Ring.RegisterFlagsWithPrefix("common.storage.", "collectors/", throwaway)

f.BoolVar(&c.Storage.ThanosObjStore, "common.thanos.enable", false, "Enable the thanos.io/objstore to be the backend for object storage")

// instance related flags.
c.InstanceInterfaceNames = netutil.PrivateNetworkInterfacesWithFallback([]string{"eth0", "en0"}, util_log.Logger)
throwaway.StringVar(&c.InstanceAddr, "common.instance-addr", "", "Default advertised address to be used by Loki components.")
Expand All @@ -78,6 +81,8 @@ type Storage struct {
Hedging hedging.Config `yaml:"hedging"`
COS ibmcloud.COSConfig `yaml:"cos"`
CongestionControl congestion.Config `yaml:"congestion_control,omitempty"`
ThanosObjStore bool `yaml:"thanos_objstore"`
ObjStoreConf bucket.Config `yaml:"objstore_config"`
}

func (s *Storage) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
Expand All @@ -91,6 +96,8 @@ func (s *Storage) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
s.Hedging.RegisterFlagsWithPrefix(prefix, f)
s.COS.RegisterFlagsWithPrefix(prefix, f)
s.CongestionControl.RegisterFlagsWithPrefix(prefix, f)

s.ObjStoreConf.RegisterFlagsWithPrefix(prefix+"thanos.", f)
}

type FilesystemConfig struct {
Expand Down
11 changes: 11 additions & 0 deletions pkg/loki/config_wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -572,6 +572,17 @@ func applyStorageConfig(cfg, defaults *ConfigWrapper) error {
}
}

if !reflect.DeepEqual(cfg.Common.Storage.ObjStoreConf, defaults.StorageConfig.ObjStoreConf) {
configsFound++

applyConfig = func(r *ConfigWrapper) {
//TODO (JoaoBraveCoding) Add Ruler config
r.StorageConfig.ThanosObjStore = r.Common.Storage.ThanosObjStore
r.StorageConfig.ObjStoreConf = r.Common.Storage.ObjStoreConf
r.StorageConfig.Hedging = r.Common.Storage.Hedging
}
}

if configsFound > 1 {
return ErrTooManyStorageConfigs
}
Expand Down
12 changes: 5 additions & 7 deletions pkg/loki/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -673,9 +673,7 @@ func (t *Loki) initTableManager() (services.Service, error) {
os.Exit(1)
}

reg := prometheus.WrapRegistererWith(prometheus.Labels{"component": "table-manager-store"}, prometheus.DefaultRegisterer)

tableClient, err := storage.NewTableClient(lastConfig.IndexType, *lastConfig, t.Cfg.StorageConfig, t.ClientMetrics, reg, util_log.Logger)
tableClient, err := storage.NewTableClient("table-manager-store", lastConfig.IndexType, *lastConfig, t.Cfg.StorageConfig, t.ClientMetrics, prometheus.DefaultRegisterer, util_log.Logger)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -1363,7 +1361,7 @@ func (t *Loki) initCompactor() (services.Service, error) {
continue
}

objectClient, err := storage.NewObjectClient(periodConfig.ObjectType, t.Cfg.StorageConfig, t.ClientMetrics)
objectClient, err := storage.NewObjectClient("compactor", periodConfig.ObjectType, t.Cfg.StorageConfig, t.ClientMetrics, prometheus.DefaultRegisterer)
if err != nil {
return nil, fmt.Errorf("failed to create object client: %w", err)
}
Expand All @@ -1374,7 +1372,7 @@ func (t *Loki) initCompactor() (services.Service, error) {
var deleteRequestStoreClient client.ObjectClient
if t.Cfg.CompactorConfig.RetentionEnabled {
if deleteStore := t.Cfg.CompactorConfig.DeleteRequestStore; deleteStore != "" {
if deleteRequestStoreClient, err = storage.NewObjectClient(deleteStore, t.Cfg.StorageConfig, t.ClientMetrics); err != nil {
if deleteRequestStoreClient, err = storage.NewObjectClient("compactor", deleteStore, t.Cfg.StorageConfig, t.ClientMetrics, prometheus.DefaultRegisterer); err != nil {
return nil, fmt.Errorf("failed to create delete request store object client: %w", err)
}
} else {
Expand Down Expand Up @@ -1441,7 +1439,7 @@ func (t *Loki) initIndexGateway() (services.Service, error) {
}
tableRange := period.GetIndexTableNumberRange(periodEndTime)

indexClient, err := storage.NewIndexClient(period, tableRange, t.Cfg.StorageConfig, t.Cfg.SchemaConfig, t.Overrides, t.ClientMetrics, shardingStrategy,
indexClient, err := storage.NewIndexClient("index-gateway", period, tableRange, t.Cfg.StorageConfig, t.Cfg.SchemaConfig, t.Overrides, t.ClientMetrics, shardingStrategy,
prometheus.DefaultRegisterer, log.With(util_log.Logger, "index-store", fmt.Sprintf("%s-%s", period.IndexType, period.From.String())), t.Cfg.MetricsNamespace,
)
if err != nil {
Expand Down Expand Up @@ -1711,7 +1709,7 @@ func (t *Loki) initAnalytics() (services.Service, error) {
return nil, err
}

objectClient, err := storage.NewObjectClient(period.ObjectType, t.Cfg.StorageConfig, t.ClientMetrics)
objectClient, err := storage.NewObjectClient("analytics", period.ObjectType, t.Cfg.StorageConfig, t.ClientMetrics, prometheus.DefaultRegisterer)
if err != nil {
level.Info(util_log.Logger).Log("msg", "failed to initialize usage report", "err", err)
return nil, nil
Expand Down
92 changes: 73 additions & 19 deletions pkg/storage/bucket/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,13 @@ import (
"errors"
"flag"
"fmt"
"regexp"
"strings"

"github.com/go-kit/log"
"github.com/prometheus/client_golang/prometheus"
"github.com/thanos-io/objstore"
opentracing "github.com/thanos-io/objstore/tracing/opentracing"
objstoretracing "github.com/thanos-io/objstore/tracing/opentracing"

"github.com/grafana/loki/v3/pkg/storage/bucket/azure"
"github.com/grafana/loki/v3/pkg/storage/bucket/filesystem"
Expand All @@ -35,54 +36,59 @@ const (

// Filesystem is the value for the filesystem storage backend.
Filesystem = "filesystem"

// validPrefixCharactersRegex allows only alphanumeric characters to prevent subtle bugs and simplify validation
validPrefixCharactersRegex = `^[\da-zA-Z]+$`
)

var (
SupportedBackends = []string{S3, GCS, Azure, Swift, Filesystem}

ErrUnsupportedStorageBackend = errors.New("unsupported storage backend")
ErrUnsupportedStorageBackend = errors.New("unsupported storage backend")
ErrInvalidCharactersInStoragePrefix = errors.New("storage prefix contains invalid characters, it may only contain digits and English alphabet letters")
)

// Config holds configuration for accessing long-term storage.
type Config struct {
// StorageBackendConfig holds configuration for accessing long-term storage.
type StorageBackendConfig struct {
Backend string `yaml:"backend"`

// Backends
S3 s3.Config `yaml:"s3"`
GCS gcs.Config `yaml:"gcs"`
Azure azure.Config `yaml:"azure"`
Swift swift.Config `yaml:"swift"`
Filesystem filesystem.Config `yaml:"filesystem"`

// Not used internally, meant to allow callers to wrap Buckets
// created using this config
Middlewares []func(objstore.Bucket) (objstore.Bucket, error) `yaml:"-"`

// Used to inject additional backends into the config. Allows for this config to
// be embedded in multiple contexts and support non-object storage based backends.
ExtraBackends []string `yaml:"-"`
}

// Returns the supportedBackends for the package and any custom backends injected into the config.
func (cfg *Config) supportedBackends() []string {
func (cfg *StorageBackendConfig) supportedBackends() []string {
return append(SupportedBackends, cfg.ExtraBackends...)
}

// RegisterFlags registers the backend storage config.
func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
func (cfg *StorageBackendConfig) RegisterFlags(f *flag.FlagSet) {
cfg.RegisterFlagsWithPrefix("", f)
}

func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
func (cfg *StorageBackendConfig) RegisterFlagsWithPrefixAndDefaultDirectory(prefix, dir string, f *flag.FlagSet) {
cfg.S3.RegisterFlagsWithPrefix(prefix, f)
cfg.GCS.RegisterFlagsWithPrefix(prefix, f)
cfg.Azure.RegisterFlagsWithPrefix(prefix, f)
cfg.Swift.RegisterFlagsWithPrefix(prefix, f)
cfg.Filesystem.RegisterFlagsWithPrefix(prefix, f)
cfg.Filesystem.RegisterFlagsWithPrefixAndDefaultDirectory(prefix, dir, f)

f.StringVar(&cfg.Backend, prefix+"backend", S3, fmt.Sprintf("Backend storage to use. Supported backends are: %s.", strings.Join(cfg.supportedBackends(), ", ")))
f.StringVar(&cfg.Backend, prefix+"backend", Filesystem, fmt.Sprintf("Backend storage to use. Supported backends are: %s.", strings.Join(cfg.supportedBackends(), ", ")))
}

func (cfg *Config) Validate() error {
func (cfg *StorageBackendConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
cfg.RegisterFlagsWithPrefixAndDefaultDirectory(prefix, "", f)
}

func (cfg *StorageBackendConfig) Validate() error {
if !util.StringsContain(cfg.supportedBackends(), cfg.Backend) {
return ErrUnsupportedStorageBackend
}
Expand All @@ -96,8 +102,49 @@ func (cfg *Config) Validate() error {
return nil
}

// Config holds configuration for accessing long-term storage.
type Config struct {
StorageBackendConfig `yaml:",inline"`

StoragePrefix string `yaml:"storage_prefix"`

// Not used internally, meant to allow callers to wrap Buckets
// created using this config
Middlewares []func(objstore.InstrumentedBucket) (objstore.InstrumentedBucket, error) `yaml:"-"`
}

// RegisterFlags registers the backend storage config.
func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
cfg.RegisterFlagsWithPrefix("", f)
}

func (cfg *Config) RegisterFlagsWithPrefixAndDefaultDirectory(prefix, dir string, f *flag.FlagSet) {
cfg.StorageBackendConfig.RegisterFlagsWithPrefixAndDefaultDirectory(prefix, dir, f)
f.StringVar(&cfg.StoragePrefix, prefix+"storage-prefix", "", "Prefix for all objects stored in the backend storage. For simplicity, it may only contain digits and English alphabet letters.")
}

func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
cfg.RegisterFlagsWithPrefixAndDefaultDirectory(prefix, "", f)
}

func (cfg *Config) Validate() error {
if cfg.StoragePrefix != "" {
acceptablePrefixCharacters := regexp.MustCompile(validPrefixCharactersRegex)
if !acceptablePrefixCharacters.MatchString(cfg.StoragePrefix) {
return ErrInvalidCharactersInStoragePrefix
}
}

return cfg.StorageBackendConfig.Validate()
}

// NewClient creates a new bucket client based on the configured backend
func NewClient(ctx context.Context, cfg Config, name string, logger log.Logger, reg prometheus.Registerer) (client objstore.Bucket, err error) {
func NewClient(ctx context.Context, cfg Config, name string, logger log.Logger, reg prometheus.Registerer) (objstore.InstrumentedBucket, error) {
var (
client objstore.Bucket
err error
)

switch cfg.Backend {
case S3:
client, err = s3.NewBucketClient(cfg.S3, name, logger)
Expand All @@ -117,26 +164,33 @@ func NewClient(ctx context.Context, cfg Config, name string, logger log.Logger,
return nil, err
}

client = opentracing.WrapWithTraces(bucketWithMetrics(client, name, reg))
if cfg.StoragePrefix != "" {
client = NewPrefixedBucketClient(client, cfg.StoragePrefix)
}

instrumentedClient := objstoretracing.WrapWithTraces(bucketWithMetrics(client, name, reg))

// Wrap the client with any provided middleware
for _, wrap := range cfg.Middlewares {
client, err = wrap(client)
instrumentedClient, err = wrap(instrumentedClient)
if err != nil {
return nil, err
}
}

return client, nil
return instrumentedClient, nil
}

func bucketWithMetrics(bucketClient objstore.Bucket, name string, reg prometheus.Registerer) objstore.Bucket {
if reg == nil {
return bucketClient
}

reg = prometheus.WrapRegistererWithPrefix("loki_", reg)
reg = prometheus.WrapRegistererWith(prometheus.Labels{"component": name}, reg)

return objstore.WrapWithMetrics(
bucketClient,
prometheus.WrapRegistererWith(prometheus.Labels{"component": name}, reg),
reg,
"")
}
8 changes: 7 additions & 1 deletion pkg/storage/bucket/filesystem/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,13 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
cfg.RegisterFlagsWithPrefix("", f)
}

// RegisterFlagsWithPrefixAndDefaultDirectory registers the flags for filesystem
// storage with the provided prefix and sets the default directory to dir.
func (cfg *Config) RegisterFlagsWithPrefixAndDefaultDirectory(prefix, dir string, f *flag.FlagSet) {
f.StringVar(&cfg.Directory, prefix+"filesystem.dir", dir, "Local filesystem storage directory.")
}

// RegisterFlagsWithPrefix registers the flags for filesystem storage with the provided prefix
func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
f.StringVar(&cfg.Directory, prefix+"filesystem.dir", "", "Local filesystem storage directory.")
cfg.RegisterFlagsWithPrefixAndDefaultDirectory(prefix, "", f)
}
8 changes: 5 additions & 3 deletions pkg/storage/bucket/http/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ type Config struct {
MaxIdleConns int `yaml:"max_idle_connections"`
MaxIdleConnsPerHost int `yaml:"max_idle_connections_per_host"`
MaxConnsPerHost int `yaml:"max_connections_per_host"`
CAFile string `yaml:"ca_file"`
}

// RegisterFlags registers the flags for the storage HTTP client.
Expand All @@ -24,12 +25,13 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {

// RegisterFlagsWithPrefix registers the flags for the storage HTTP client with the provided prefix.
func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
f.DurationVar(&cfg.IdleConnTimeout, prefix+"http.idle-conn-timeout", 90*time.Second, "The time an idle connection will remain idle before closing.")
f.DurationVar(&cfg.ResponseHeaderTimeout, prefix+"http.response-header-timeout", 2*time.Minute, "The amount of time the client will wait for a servers response headers.")
f.BoolVar(&cfg.InsecureSkipVerify, prefix+"http.insecure-skip-verify", false, "If the client connects via HTTPS and this option is enabled, the client will accept any certificate and hostname.")
f.DurationVar(&cfg.IdleConnTimeout, prefix+"idle-conn-timeout", 90*time.Second, "The time an idle connection will remain idle before closing.")
f.DurationVar(&cfg.ResponseHeaderTimeout, prefix+"response-header-timeout", 2*time.Minute, "The amount of time the client will wait for a servers response headers.")
f.BoolVar(&cfg.InsecureSkipVerify, prefix+"insecure-skip-verify", false, "If the client connects via HTTPS and this option is enabled, the client will accept any certificate and hostname.")
f.DurationVar(&cfg.TLSHandshakeTimeout, prefix+"tls-handshake-timeout", 10*time.Second, "Maximum time to wait for a TLS handshake. 0 means no limit.")
f.DurationVar(&cfg.ExpectContinueTimeout, prefix+"expect-continue-timeout", 1*time.Second, "The time to wait for a server's first response headers after fully writing the request headers if the request has an Expect header. 0 to send the request body immediately.")
f.IntVar(&cfg.MaxIdleConns, prefix+"max-idle-connections", 100, "Maximum number of idle (keep-alive) connections across all hosts. 0 means no limit.")
f.IntVar(&cfg.MaxIdleConnsPerHost, prefix+"max-idle-connections-per-host", 100, "Maximum number of idle (keep-alive) connections to keep per-host. If 0, a built-in default value is used.")
f.IntVar(&cfg.MaxConnsPerHost, prefix+"max-connections-per-host", 0, "Maximum number of connections per host. 0 means no limit.")
f.StringVar(&cfg.CAFile, prefix+"ca-file", "", "Path to the trusted CA file that signed the SSL certificate of the object storage endpoint.")
}
Loading
Loading