Skip to content

Commit

Permalink
feat store: added readiness and liveness prober
Browse files Browse the repository at this point in the history
Signed-off-by: Martin Chodur <m.chodur@seznam.cz>
  • Loading branch information
FUSAKLA committed Aug 29, 2019
1 parent 7b60bfb commit f15ba68
Show file tree
Hide file tree
Showing 4 changed files with 106 additions and 88 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,10 @@ We use *breaking* word for marking changes that are not backward compatible (rel

## Unreleased

### Added
- [#1460](https://github.com/thanos-io/thanos/pull/1460) Thanos Store: added `/-/ready` and `/-/healthy` endpoints.


## v0.7.0-rc.0 - 2019.08.28

Accepted into CNCF:
Expand Down
2 changes: 1 addition & 1 deletion cmd/thanos/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ func main() {

cmds := map[string]setupFunc{}
registerSidecar(cmds, app)
registerStore(cmds, app, "store")
registerStore(cmds, app)
registerQuery(cmds, app, "query")
registerRule(cmds, app, "rule")
registerCompact(cmds, app)
Expand Down
180 changes: 93 additions & 87 deletions cmd/thanos/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,13 @@ import (
"net"
"time"

"github.com/thanos-io/thanos/pkg/component"
"github.com/thanos-io/thanos/pkg/prober"

"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/oklog/run"
opentracing "github.com/opentracing/opentracing-go"
"github.com/opentracing/opentracing-go"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/thanos-io/thanos/pkg/model"
Expand All @@ -18,12 +21,12 @@ import (
storecache "github.com/thanos-io/thanos/pkg/store/cache"
"github.com/thanos-io/thanos/pkg/store/storepb"
"google.golang.org/grpc"
kingpin "gopkg.in/alecthomas/kingpin.v2"
"gopkg.in/alecthomas/kingpin.v2"
)

// registerStore registers a store command.
func registerStore(m map[string]setupFunc, app *kingpin.Application, name string) {
cmd := app.Command(name, "store node giving access to blocks in a bucket provider. Now supported GCS, S3, Azure, Swift and Tencent COS.")
func registerStore(m map[string]setupFunc, app *kingpin.Application) {
cmd := app.Command(component.Store.String(), "store node giving access to blocks in a bucket provider. Now supported GCS, S3, Azure, Swift and Tencent COS.")

grpcBindAddr, httpBindAddr, cert, key, clientCA := regCommonServerFlags(cmd)

Expand All @@ -40,7 +43,8 @@ func registerStore(m map[string]setupFunc, app *kingpin.Application, name string
"Maximum amount of samples returned via a single Series call. 0 means no limit. NOTE: for efficiency we take 120 as the number of samples in chunk (it cannot be bigger than that), so the actual number of samples might be lower, even though the maximum could be hit.").
Default("0").Uint()

maxConcurrent := cmd.Flag("store.grpc.series-max-concurrency", "Maximum number of concurrent Series calls.").Default("20").Int()
maxConcurrent := cmd.Flag("store.grpc.series-max-concurrency", "Maximum number of concurrent Series calls.").
Default("20").Int()

objStoreConfig := regCommonObjStoreFlags(cmd, "", true)

Expand All @@ -56,7 +60,7 @@ func registerStore(m map[string]setupFunc, app *kingpin.Application, name string
maxTime := model.TimeOrDuration(cmd.Flag("max-time", "End of time range limit to serve. Thanos Store serves only blocks, which happened eariler than this value. Option can be a constant time in RFC3339 format or time duration relative to current time, such as -1d or 2h45m. Valid duration units are ms, s, m, h, d, w, y.").
Default("9999-12-31T23:59:59Z"))

m[name] = func(g *run.Group, logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer, debugLogging bool) error {
m[component.Store.String()] = func(g *run.Group, logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer, debugLogging bool) error {
if minTime.PrometheusTimestamp() > maxTime.PrometheusTimestamp() {
return errors.Errorf("invalid argument: --min-time '%s' can't be greater than --max-time '%s'",
minTime, maxTime)
Expand All @@ -77,7 +81,7 @@ func registerStore(m map[string]setupFunc, app *kingpin.Application, name string
uint64(*chunkPoolSize),
uint64(*maxSampleCount),
int(*maxConcurrent),
name,
component.Store,
debugLogging,
*syncInterval,
*blockSyncConcurrency,
Expand Down Expand Up @@ -106,104 +110,106 @@ func runStore(
chunkPoolSizeBytes uint64,
maxSampleCount uint64,
maxConcurrent int,
component string,
component component.Component,
verbose bool,
syncInterval time.Duration,
blockSyncConcurrency int,
filterConf *store.FilterConfig,
) error {
{
confContentYaml, err := objStoreConfig.Content()
if err != nil {
return err
}
statusProber := prober.NewProber(component, logger, prometheus.WrapRegistererWithPrefix("thanos_", reg))

bkt, err := client.NewBucket(logger, confContentYaml, reg, component)
if err != nil {
return errors.Wrap(err, "create bucket client")
}

// Ensure we close up everything properly.
defer func() {
if err != nil {
runutil.CloseWithLogOnErr(logger, bkt, "bucket client")
}
}()
confContentYaml, err := objStoreConfig.Content()
if err != nil {
return err
}

// TODO(bwplotka): Add as a flag?
maxItemSizeBytes := indexCacheSizeBytes / 2
bkt, err := client.NewBucket(logger, confContentYaml, reg, component.String())
if err != nil {
return errors.Wrap(err, "create bucket client")
}

indexCache, err := storecache.NewIndexCache(logger, reg, storecache.Opts{
MaxSizeBytes: indexCacheSizeBytes,
MaxItemSizeBytes: maxItemSizeBytes,
})
// Ensure we close up everything properly.
defer func() {
if err != nil {
return errors.Wrap(err, "create index cache")
runutil.CloseWithLogOnErr(logger, bkt, "bucket client")
}
}()

bs, err := store.NewBucketStore(
logger,
reg,
bkt,
dataDir,
indexCache,
chunkPoolSizeBytes,
maxSampleCount,
maxConcurrent,
verbose,
blockSyncConcurrency,
filterConf,
)
if err != nil {
return errors.Wrap(err, "create object storage store")
}
// TODO(bwplotka): Add as a flag?
maxItemSizeBytes := indexCacheSizeBytes / 2

begin := time.Now()
level.Debug(logger).Log("msg", "initializing bucket store")
if err := bs.InitialSync(context.Background()); err != nil {
return errors.Wrap(err, "bucket store initial sync")
}
level.Debug(logger).Log("msg", "bucket store ready", "init_duration", time.Since(begin).String())

ctx, cancel := context.WithCancel(context.Background())
g.Add(func() error {
defer runutil.CloseWithLogOnErr(logger, bkt, "bucket client")

err := runutil.Repeat(syncInterval, ctx.Done(), func() error {
if err := bs.SyncBlocks(ctx); err != nil {
level.Warn(logger).Log("msg", "syncing blocks failed", "err", err)
}
return nil
})

runutil.CloseWithLogOnErr(logger, bs, "bucket store")
return err
}, func(error) {
cancel()
})
indexCache, err := storecache.NewIndexCache(logger, reg, storecache.Opts{
MaxSizeBytes: indexCacheSizeBytes,
MaxItemSizeBytes: maxItemSizeBytes,
})
if err != nil {
return errors.Wrap(err, "create index cache")
}

l, err := net.Listen("tcp", grpcBindAddr)
if err != nil {
return errors.Wrap(err, "listen API address")
}
bs, err := store.NewBucketStore(
logger,
reg,
bkt,
dataDir,
indexCache,
chunkPoolSizeBytes,
maxSampleCount,
maxConcurrent,
verbose,
blockSyncConcurrency,
filterConf,
)
if err != nil {
return errors.Wrap(err, "create object storage store")
}

opts, err := defaultGRPCServerOpts(logger, reg, tracer, cert, key, clientCA)
if err != nil {
return errors.Wrap(err, "grpc server options")
}
begin := time.Now()
level.Debug(logger).Log("msg", "initializing bucket store")
if err := bs.InitialSync(context.Background()); err != nil {
return errors.Wrap(err, "bucket store initial sync")
}
level.Debug(logger).Log("msg", "bucket store ready", "init_duration", time.Since(begin).String())

s := grpc.NewServer(opts...)
storepb.RegisterStoreServer(s, bs)
ctx, cancel := context.WithCancel(context.Background())
g.Add(func() error {
defer runutil.CloseWithLogOnErr(logger, bkt, "bucket client")

g.Add(func() error {
level.Info(logger).Log("msg", "Listening for StoreAPI gRPC", "address", grpcBindAddr)
return errors.Wrap(s.Serve(l), "serve gRPC")
}, func(error) {
s.Stop()
err := runutil.Repeat(syncInterval, ctx.Done(), func() error {
if err := bs.SyncBlocks(ctx); err != nil {
level.Warn(logger).Log("msg", "syncing blocks failed", "err", err)
}
return nil
})
}
if err := metricHTTPListenGroup(g, logger, reg, httpBindAddr); err != nil {

runutil.CloseWithLogOnErr(logger, bs, "bucket store")
return err
}, func(error) {
cancel()
})

l, err := net.Listen("tcp", grpcBindAddr)
if err != nil {
return errors.Wrap(err, "listen API address")
}

opts, err := defaultGRPCServerOpts(logger, reg, tracer, cert, key, clientCA)
if err != nil {
return errors.Wrap(err, "grpc server options")
}

s := grpc.NewServer(opts...)
storepb.RegisterStoreServer(s, bs)

g.Add(func() error {
level.Info(logger).Log("msg", "Listening for StoreAPI gRPC", "address", grpcBindAddr)
statusProber.SetReady()
return errors.Wrap(s.Serve(l), "serve gRPC")
}, func(error) {
s.Stop()
})

if err := defaultHTTPListener(g, logger, reg, httpBindAddr, statusProber); err != nil {
return errors.Wrap(err, "create readiness prober")
}

level.Info(logger).Log("msg", "starting store node")
Expand Down
8 changes: 8 additions & 0 deletions tutorials/kubernetes-demo/manifests/thanos-store-gateway.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,14 @@ spec:
containerPort: 10902
- name: grpc
containerPort: 10901
livenessProbe:
httpGet:
port: 10902
path: /-/healthy
readinessProbe:
httpGet:
port: 10902
path: /-/ready
resources:
limits:
cpu: "1"
Expand Down

0 comments on commit f15ba68

Please sign in to comment.