Skip to content

Commit

Permalink
feat store: added readiness and liveness prober
Browse files Browse the repository at this point in the history
Signed-off-by: Martin Chodur <m.chodur@seznam.cz>
  • Loading branch information
FUSAKLA committed Aug 26, 2019
1 parent af53b38 commit 0027f18
Show file tree
Hide file tree
Showing 5 changed files with 102 additions and 87 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ We use *breaking* word for marking changes that are not backward compatible (rel

- [#1358](https://github.com/thanos-io/thanos/pull/1358) Added `part_size` configuration option for HTTP multipart requests minimum part size for S3 storage type
- [#1363](https://github.com/thanos-io/thanos/pull/1363) Thanos Receive now exposes `thanos_receive_hashring_nodes` and `thanos_receive_hashring_tenants` metrics to monitor status of hash-rings
- [#1297](https://github.com/thanos-io/thanos/pull/1297) Added `/-/ready` and `/-/healthy` endpoints to Thanos Store.


### Changed

Expand Down
2 changes: 1 addition & 1 deletion cmd/thanos/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ func main() {

cmds := map[string]setupFunc{}
registerSidecar(cmds, app, "sidecar")
registerStore(cmds, app, "store")
registerStore(cmds, app)
registerQuery(cmds, app, "query")
registerRule(cmds, app, "rule")
registerCompact(cmds, app)
Expand Down
175 changes: 90 additions & 85 deletions cmd/thanos/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,13 @@ import (
"net"
"time"

"github.com/thanos-io/thanos/pkg/component"
"github.com/thanos-io/thanos/pkg/prober"

"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/oklog/run"
opentracing "github.com/opentracing/opentracing-go"
"github.com/opentracing/opentracing-go"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/thanos-io/thanos/pkg/objstore/client"
Expand All @@ -17,12 +20,12 @@ import (
storecache "github.com/thanos-io/thanos/pkg/store/cache"
"github.com/thanos-io/thanos/pkg/store/storepb"
"google.golang.org/grpc"
kingpin "gopkg.in/alecthomas/kingpin.v2"
"gopkg.in/alecthomas/kingpin.v2"
)

// registerStore registers a store command.
func registerStore(m map[string]setupFunc, app *kingpin.Application, name string) {
cmd := app.Command(name, "store node giving access to blocks in a bucket provider. Now supported GCS, S3, Azure, Swift and Tencent COS.")
func registerStore(m map[string]setupFunc, app *kingpin.Application) {
cmd := app.Command(component.Store.String(), "store node giving access to blocks in a bucket provider. Now supported GCS, S3, Azure, Swift and Tencent COS.")

grpcBindAddr, httpBindAddr, cert, key, clientCA := regCommonServerFlags(cmd)

Expand All @@ -49,7 +52,7 @@ func registerStore(m map[string]setupFunc, app *kingpin.Application, name string
blockSyncConcurrency := cmd.Flag("block-sync-concurrency", "Number of goroutines to use when syncing blocks from object storage.").
Default("20").Int()

m[name] = func(g *run.Group, logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer, debugLogging bool) error {
m[component.Store.String()] = func(g *run.Group, logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer, debugLogging bool) error {
return runStore(g,
logger,
reg,
Expand All @@ -65,7 +68,7 @@ func registerStore(m map[string]setupFunc, app *kingpin.Application, name string
uint64(*chunkPoolSize),
uint64(*maxSampleCount),
int(*maxConcurrent),
name,
component.Store,
debugLogging,
*syncInterval,
*blockSyncConcurrency,
Expand All @@ -90,102 +93,104 @@ func runStore(
chunkPoolSizeBytes uint64,
maxSampleCount uint64,
maxConcurrent int,
component string,
component component.Component,
verbose bool,
syncInterval time.Duration,
blockSyncConcurrency int,
) error {
{
confContentYaml, err := objStoreConfig.Content()
if err != nil {
return err
}
statusProber := prober.NewProber(component, logger, prometheus.WrapRegistererWithPrefix("thanos_", reg))

bkt, err := client.NewBucket(logger, confContentYaml, reg, component)
if err != nil {
return errors.Wrap(err, "create bucket client")
}

// Ensure we close up everything properly.
defer func() {
if err != nil {
runutil.CloseWithLogOnErr(logger, bkt, "bucket client")
}
}()
confContentYaml, err := objStoreConfig.Content()
if err != nil {
return err
}

// TODO(bwplotka): Add as a flag?
maxItemSizeBytes := indexCacheSizeBytes / 2
bkt, err := client.NewBucket(logger, confContentYaml, reg, component.String())
if err != nil {
return errors.Wrap(err, "create bucket client")
}

indexCache, err := storecache.NewIndexCache(logger, reg, storecache.Opts{
MaxSizeBytes: indexCacheSizeBytes,
MaxItemSizeBytes: maxItemSizeBytes,
})
// Ensure we close up everything properly.
defer func() {
if err != nil {
return errors.Wrap(err, "create index cache")
runutil.CloseWithLogOnErr(logger, bkt, "bucket client")
}
}()

bs, err := store.NewBucketStore(
logger,
reg,
bkt,
dataDir,
indexCache,
chunkPoolSizeBytes,
maxSampleCount,
maxConcurrent,
verbose,
blockSyncConcurrency,
)
if err != nil {
return errors.Wrap(err, "create object storage store")
}
// TODO(bwplotka): Add as a flag?
maxItemSizeBytes := indexCacheSizeBytes / 2

begin := time.Now()
level.Debug(logger).Log("msg", "initializing bucket store")
if err := bs.InitialSync(context.Background()); err != nil {
return errors.Wrap(err, "bucket store initial sync")
}
level.Debug(logger).Log("msg", "bucket store ready", "init_duration", time.Since(begin).String())

ctx, cancel := context.WithCancel(context.Background())
g.Add(func() error {
defer runutil.CloseWithLogOnErr(logger, bkt, "bucket client")

err := runutil.Repeat(syncInterval, ctx.Done(), func() error {
if err := bs.SyncBlocks(ctx); err != nil {
level.Warn(logger).Log("msg", "syncing blocks failed", "err", err)
}
return nil
})

runutil.CloseWithLogOnErr(logger, bs, "bucket store")
return err
}, func(error) {
cancel()
})
indexCache, err := storecache.NewIndexCache(logger, reg, storecache.Opts{
MaxSizeBytes: indexCacheSizeBytes,
MaxItemSizeBytes: maxItemSizeBytes,
})
if err != nil {
return errors.Wrap(err, "create index cache")
}

l, err := net.Listen("tcp", grpcBindAddr)
if err != nil {
return errors.Wrap(err, "listen API address")
}
bs, err := store.NewBucketStore(
logger,
reg,
bkt,
dataDir,
indexCache,
chunkPoolSizeBytes,
maxSampleCount,
maxConcurrent,
verbose,
blockSyncConcurrency,
)
if err != nil {
return errors.Wrap(err, "create object storage store")
}

opts, err := defaultGRPCServerOpts(logger, reg, tracer, cert, key, clientCA)
if err != nil {
return errors.Wrap(err, "grpc server options")
}
begin := time.Now()
level.Debug(logger).Log("msg", "initializing bucket store")
if err := bs.InitialSync(context.Background()); err != nil {
return errors.Wrap(err, "bucket store initial sync")
}
level.Debug(logger).Log("msg", "bucket store ready", "init_duration", time.Since(begin).String())

s := grpc.NewServer(opts...)
storepb.RegisterStoreServer(s, bs)
ctx, cancel := context.WithCancel(context.Background())
g.Add(func() error {
defer runutil.CloseWithLogOnErr(logger, bkt, "bucket client")

g.Add(func() error {
level.Info(logger).Log("msg", "Listening for StoreAPI gRPC", "address", grpcBindAddr)
return errors.Wrap(s.Serve(l), "serve gRPC")
}, func(error) {
s.Stop()
err := runutil.Repeat(syncInterval, ctx.Done(), func() error {
if err := bs.SyncBlocks(ctx); err != nil {
level.Warn(logger).Log("msg", "syncing blocks failed", "err", err)
}
return nil
})
}
if err := metricHTTPListenGroup(g, logger, reg, httpBindAddr); err != nil {

runutil.CloseWithLogOnErr(logger, bs, "bucket store")
return err
}, func(error) {
cancel()
})

l, err := net.Listen("tcp", grpcBindAddr)
if err != nil {
return errors.Wrap(err, "listen API address")
}

opts, err := defaultGRPCServerOpts(logger, reg, tracer, cert, key, clientCA)
if err != nil {
return errors.Wrap(err, "grpc server options")
}

s := grpc.NewServer(opts...)
storepb.RegisterStoreServer(s, bs)

g.Add(func() error {
level.Info(logger).Log("msg", "Listening for StoreAPI gRPC", "address", grpcBindAddr)
statusProber.SetReady()
return errors.Wrap(s.Serve(l), "serve gRPC")
}, func(error) {
s.Stop()
})

if err := defaultHTTPListener(g, logger, reg, httpBindAddr, statusProber); err != nil {
return errors.Wrap(err, "create readiness prober")
}

level.Info(logger).Log("msg", "starting store node")
Expand Down
2 changes: 1 addition & 1 deletion pkg/prober/prober.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ func (p *Prober) writeResponse(w http.ResponseWriter, probeFn func() error, prob
http.Error(w, fmt.Sprintf("thanos %v is not %v. Reason: %v", p.component, probeType, err), probeErrorHTTPStatus)
return
}
if _, err := io.WriteString(w, fmt.Sprintf("thanos %v is %v", p.component, probeType)); err == nil {
if _, err := io.WriteString(w, fmt.Sprintf("thanos %v is %v", p.component, probeType)); err != nil {
level.Error(p.logger).Log("msg", "failed to write probe response", "probe type", probeType, "err", err)
}
}
Expand Down
8 changes: 8 additions & 0 deletions tutorials/kubernetes-demo/manifests/thanos-store-gateway.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,14 @@ spec:
containerPort: 10902
- name: grpc
containerPort: 10901
livenessProbe:
httpGet:
port: 10902
path: /-/healthy
readinessProbe:
httpGet:
port: 10902
path: /-/ready
resources:
limits:
cpu: "1"
Expand Down

0 comments on commit 0027f18

Please sign in to comment.