feat store: added readiness and liveness prober #1460

Merged (1 commit) on Sep 23, 2019
CHANGELOG.md: 5 additions & 4 deletions
@@ -12,10 +12,11 @@ We use *breaking* word for marking changes that are not backward compatible (rel
## Unreleased

## Added
- [#1540](https://github.com/thanos-io/thanos/pull/1540) Added `/-/ready` and `/-/healthy` endpoints to Thanos Downsample.
- [#1538](https://github.com/thanos-io/thanos/pull/1538) Added `/-/ready` and `/-/healthy` endpoints to Thanos Rule.
- [#1537](https://github.com/thanos-io/thanos/pull/1537) Added `/-/ready` and `/-/healthy` endpoints to Thanos Receive.
- [#1534](https://github.com/thanos-io/thanos/pull/1534) Added `/-/ready` endpoint to Thanos Query.
- [#1540](https://github.com/thanos-io/thanos/pull/1540) Thanos Downsample added `/-/ready` and `/-/healthy` endpoints.
- [#1538](https://github.com/thanos-io/thanos/pull/1538) Thanos Rule added `/-/ready` and `/-/healthy` endpoints.
- [#1537](https://github.com/thanos-io/thanos/pull/1537) Thanos Receive added `/-/ready` and `/-/healthy` endpoints.
- [#1460](https://github.com/thanos-io/thanos/pull/1460) Thanos Store Added `/-/ready` and `/-/healthy` endpoints.
- [#1534](https://github.com/thanos-io/thanos/pull/1534) Thanos Query Added `/-/ready` and `/-/healthy` endpoints.
- [#1533](https://github.com/thanos-io/thanos/pull/1533) Thanos inspect now supports the timeout flag.

### Fixed
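For context on what these endpoints mean in practice: `/-/healthy` is intended for liveness checks (is the process up at all), while `/-/ready` is for readiness (has the component finished its initial work and can it serve traffic). The snippet below is a small, hypothetical Go helper that waits for a component to report ready; the assumption that `/-/ready` returns HTTP 200 once ready and a non-2xx code before that comes from the usual Prometheus-style convention, not from this diff.

```go
package main

import (
	"fmt"
	"net/http"
	"time"
)

// waitUntilReady polls the /-/ready endpoint until it returns HTTP 200
// or the deadline expires. The 200-when-ready behaviour is an assumption
// based on the common Prometheus-style probe convention.
func waitUntilReady(baseURL string, timeout time.Duration) error {
	client := &http.Client{Timeout: 2 * time.Second}
	deadline := time.Now().Add(timeout)
	for time.Now().Before(deadline) {
		resp, err := client.Get(baseURL + "/-/ready")
		if err == nil {
			resp.Body.Close()
			if resp.StatusCode == http.StatusOK {
				return nil
			}
		}
		time.Sleep(time.Second)
	}
	return fmt.Errorf("component at %s not ready within %s", baseURL, timeout)
}

func main() {
	// Hypothetical address of a Thanos Store HTTP port.
	if err := waitUntilReady("http://localhost:10902", time.Minute); err != nil {
		fmt.Println(err)
	}
}
```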
cmd/thanos/main.go: 1 addition & 22 deletions
@@ -74,7 +74,7 @@ func main() {

cmds := map[string]setupFunc{}
registerSidecar(cmds, app)
registerStore(cmds, app, "store")
registerStore(cmds, app)
registerQuery(cmds, app)
registerRule(cmds, app)
registerCompact(cmds, app)
@@ -333,27 +333,6 @@ func newStoreGRPCServer(logger log.Logger, reg *prometheus.Registry, tracer open
return s
}

// TODO Remove once all components are migrated to the new scheduleHTTPServer.
// metricHTTPListenGroup is a run.Group that servers HTTP endpoint with only Prometheus metrics.
func metricHTTPListenGroup(g *run.Group, logger log.Logger, reg *prometheus.Registry, httpBindAddr string) error {
mux := http.NewServeMux()
registerMetrics(mux, reg)
registerProfile(mux)

l, err := net.Listen("tcp", httpBindAddr)
if err != nil {
return errors.Wrap(err, "listen metrics address")
}

g.Add(func() error {
level.Info(logger).Log("msg", "Listening for metrics", "address", httpBindAddr)
return errors.Wrap(http.Serve(l, mux), "serve metrics")
}, func(error) {
runutil.CloseWithLogOnErr(logger, l, "metric listener")
})
return nil
}

// scheduleHTTPServer starts a run.Group that servers HTTP endpoint with default endpoints providing Prometheus metrics,
// profiling and liveness/readiness probes.
func scheduleHTTPServer(g *run.Group, logger log.Logger, reg *prometheus.Registry, readinessProber *prober.Prober, httpBindAddr string, handler http.Handler, comp component.Component) error {
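The removed metricHTTPListenGroup only exposed metrics and pprof; the scheduleHTTPServer it is replaced by also mounts the readiness and liveness handlers on the same listener. Below is a rough sketch of that consolidated pattern using oklog/run and client_golang; it is not the actual Thanos helper, and the handler parameters and wiring are simplified assumptions.

```go
package serverexample

import (
	"net"
	"net/http"
	"net/http/pprof"

	"github.com/oklog/run"
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promhttp"
)

// serveHTTP sketches a consolidated HTTP server: metrics, pprof and probe
// handlers share one mux and one listener inside a run.Group.
func serveHTTP(g *run.Group, reg *prometheus.Registry, addr string, ready, healthy http.HandlerFunc) error {
	mux := http.NewServeMux()
	mux.Handle("/metrics", promhttp.HandlerFor(reg, promhttp.HandlerOpts{}))
	mux.HandleFunc("/debug/pprof/", pprof.Index)
	mux.HandleFunc("/-/healthy", healthy)
	mux.HandleFunc("/-/ready", ready)

	l, err := net.Listen("tcp", addr)
	if err != nil {
		return err
	}
	g.Add(func() error {
		// Serve blocks until the listener is closed by the interrupt func.
		return http.Serve(l, mux)
	}, func(error) {
		l.Close()
	})
	return nil
}
```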
cmd/thanos/store.go: 88 additions & 84 deletions
@@ -8,20 +8,22 @@ import (
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/oklog/run"
opentracing "github.com/opentracing/opentracing-go"
"github.com/opentracing/opentracing-go"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/thanos-io/thanos/pkg/component"
"github.com/thanos-io/thanos/pkg/model"
"github.com/thanos-io/thanos/pkg/objstore/client"
"github.com/thanos-io/thanos/pkg/prober"
"github.com/thanos-io/thanos/pkg/runutil"
"github.com/thanos-io/thanos/pkg/store"
storecache "github.com/thanos-io/thanos/pkg/store/cache"
kingpin "gopkg.in/alecthomas/kingpin.v2"
"gopkg.in/alecthomas/kingpin.v2"
)

// registerStore registers a store command.
func registerStore(m map[string]setupFunc, app *kingpin.Application, name string) {
cmd := app.Command(name, "store node giving access to blocks in a bucket provider. Now supported GCS, S3, Azure, Swift and Tencent COS.")
func registerStore(m map[string]setupFunc, app *kingpin.Application) {
cmd := app.Command(component.Store.String(), "store node giving access to blocks in a bucket provider. Now supported GCS, S3, Azure, Swift and Tencent COS.")

grpcBindAddr, httpBindAddr, cert, key, clientCA := regCommonServerFlags(cmd)

@@ -54,7 +56,7 @@ func registerStore(m map[string]setupFunc, app *kingpin.Application, name string
maxTime := model.TimeOrDuration(cmd.Flag("max-time", "End of time range limit to serve. Thanos Store serves only blocks, which happened eariler than this value. Option can be a constant time in RFC3339 format or time duration relative to current time, such as -1d or 2h45m. Valid duration units are ms, s, m, h, d, w, y.").
Default("9999-12-31T23:59:59Z"))

m[name] = func(g *run.Group, logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer, debugLogging bool) error {
m[component.Store.String()] = func(g *run.Group, logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer, debugLogging bool) error {
if minTime.PrometheusTimestamp() > maxTime.PrometheusTimestamp() {
return errors.Errorf("invalid argument: --min-time '%s' can't be greater than --max-time '%s'",
minTime, maxTime)
@@ -75,7 +77,7 @@ func registerStore(m map[string]setupFunc, app *kingpin.Application, name string
uint64(*chunkPoolSize),
uint64(*maxSampleCount),
int(*maxConcurrent),
name,
component.Store,
debugLogging,
*syncInterval,
*blockSyncConcurrency,
@@ -104,102 +106,104 @@ func runStore(
chunkPoolSizeBytes uint64,
maxSampleCount uint64,
maxConcurrent int,
component string,
component component.Component,
verbose bool,
syncInterval time.Duration,
blockSyncConcurrency int,
filterConf *store.FilterConfig,
) error {
{
confContentYaml, err := objStoreConfig.Content()
if err != nil {
return err
}
statusProber := prober.NewProber(component, logger, prometheus.WrapRegistererWithPrefix("thanos_", reg))
FUSAKLA marked this conversation as resolved.

confContentYaml, err := objStoreConfig.Content()
if err != nil {
return err
}

bkt, err := client.NewBucket(logger, confContentYaml, reg, component.String())
if err != nil {
return errors.Wrap(err, "create bucket client")
}

bkt, err := client.NewBucket(logger, confContentYaml, reg, component)
// Ensure we close up everything properly.
defer func() {
if err != nil {
return errors.Wrap(err, "create bucket client")
runutil.CloseWithLogOnErr(logger, bkt, "bucket client")
Member: I think we should remove the `if err != nil`, as it is run in a defer; `err` will be overwritten by unrelated code which happens after this line.

Member (author): I'd say it was meant to close the client if runStore ended up with an error. Otherwise it should not close the client, since others are still using it. Maybe that's why the `if err != nil` is there, and the fact that `err` will be overwritten by the following code was the intention?

But I did not touch this code, I just removed the unneeded code block wrapping it.
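The concern about `err` being overwritten is a general Go behaviour: a deferred closure reads the value the captured variable holds when the surrounding function returns, not the value it had when the defer was registered, so later assignments to `err` change what the cleanup code sees. A standalone illustration, unrelated to the Thanos code itself:

```go
package main

import "fmt"

func demo() error {
	err := fmt.Errorf("open failed")
	defer func() {
		// Runs last: sees whatever err holds *now* (nil), even though
		// err was non-nil when the defer was registered.
		fmt.Println("deferred sees err =", err)
	}()
	err = nil // later, unrelated code overwrites err
	return err
}

func main() {
	demo() // prints: deferred sees err = <nil>
}
```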

}
}()

// Ensure we close up everything properly.
defer func() {
if err != nil {
runutil.CloseWithLogOnErr(logger, bkt, "bucket client")
}
}()
// TODO(bwplotka): Add as a flag?
maxItemSizeBytes := indexCacheSizeBytes / 2

// TODO(bwplotka): Add as a flag?
maxItemSizeBytes := indexCacheSizeBytes / 2
indexCache, err := storecache.NewIndexCache(logger, reg, storecache.Opts{
MaxSizeBytes: indexCacheSizeBytes,
MaxItemSizeBytes: maxItemSizeBytes,
})
if err != nil {
return errors.Wrap(err, "create index cache")
}

indexCache, err := storecache.NewIndexCache(logger, reg, storecache.Opts{
MaxSizeBytes: indexCacheSizeBytes,
MaxItemSizeBytes: maxItemSizeBytes,
})
if err != nil {
return errors.Wrap(err, "create index cache")
}
bs, err := store.NewBucketStore(
logger,
reg,
bkt,
dataDir,
indexCache,
chunkPoolSizeBytes,
maxSampleCount,
maxConcurrent,
verbose,
blockSyncConcurrency,
filterConf,
)
if err != nil {
return errors.Wrap(err, "create object storage store")
}

bs, err := store.NewBucketStore(
logger,
reg,
bkt,
dataDir,
indexCache,
chunkPoolSizeBytes,
maxSampleCount,
maxConcurrent,
verbose,
blockSyncConcurrency,
filterConf,
)
if err != nil {
return errors.Wrap(err, "create object storage store")
}
begin := time.Now()
level.Debug(logger).Log("msg", "initializing bucket store")
if err := bs.InitialSync(context.Background()); err != nil {
return errors.Wrap(err, "bucket store initial sync")
}
level.Debug(logger).Log("msg", "bucket store ready", "init_duration", time.Since(begin).String())

begin := time.Now()
level.Debug(logger).Log("msg", "initializing bucket store")
if err := bs.InitialSync(context.Background()); err != nil {
return errors.Wrap(err, "bucket store initial sync")
}
level.Debug(logger).Log("msg", "bucket store ready", "init_duration", time.Since(begin).String())

ctx, cancel := context.WithCancel(context.Background())
g.Add(func() error {
defer runutil.CloseWithLogOnErr(logger, bkt, "bucket client")

err := runutil.Repeat(syncInterval, ctx.Done(), func() error {
if err := bs.SyncBlocks(ctx); err != nil {
level.Warn(logger).Log("msg", "syncing blocks failed", "err", err)
}
return nil
})

runutil.CloseWithLogOnErr(logger, bs, "bucket store")
return err
}, func(error) {
cancel()
ctx, cancel := context.WithCancel(context.Background())
g.Add(func() error {
defer runutil.CloseWithLogOnErr(logger, bkt, "bucket client")
Member: This is a bit confusing; we were closing bkt earlier, why are we closing it here again?

Member (author): Commented here: #1460 (comment)


err := runutil.Repeat(syncInterval, ctx.Done(), func() error {
if err := bs.SyncBlocks(ctx); err != nil {
level.Warn(logger).Log("msg", "syncing blocks failed", "err", err)
}
return nil
})

l, err := net.Listen("tcp", grpcBindAddr)
if err != nil {
return errors.Wrap(err, "listen API address")
}
runutil.CloseWithLogOnErr(logger, bs, "bucket store")
return err
}, func(error) {
cancel()
})

opts, err := defaultGRPCServerOpts(logger, cert, key, clientCA)
if err != nil {
return errors.Wrap(err, "grpc server options")
}
s := newStoreGRPCServer(logger, reg, tracer, bs, opts)
l, err := net.Listen("tcp", grpcBindAddr)
if err != nil {
return errors.Wrap(err, "listen API address")
}

g.Add(func() error {
level.Info(logger).Log("msg", "Listening for StoreAPI gRPC", "address", grpcBindAddr)
return errors.Wrap(s.Serve(l), "serve gRPC")
}, func(error) {
s.Stop()
})
opts, err := defaultGRPCServerOpts(logger, cert, key, clientCA)
if err != nil {
return errors.Wrap(err, "grpc server options")
}
if err := metricHTTPListenGroup(g, logger, reg, httpBindAddr); err != nil {
return err
s := newStoreGRPCServer(logger, reg, tracer, bs, opts)

g.Add(func() error {
level.Info(logger).Log("msg", "Listening for StoreAPI gRPC", "address", grpcBindAddr)
statusProber.SetReady()
return errors.Wrap(s.Serve(l), "serve gRPC")
}, func(error) {
s.Stop()
})

if err := scheduleHTTPServer(g, logger, reg, statusProber, httpBindAddr, nil, component); err != nil {
return errors.Wrap(err, "schedule HTTP server")
}

level.Info(logger).Log("msg", "starting store node")
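For readers who have not seen the prober used above: conceptually it is shared readiness state plus HTTP handlers, and the store flips it with statusProber.SetReady() once InitialSync finishes. The sketch below shows that idea only; it is not the actual pkg/prober implementation, which also registers metrics and takes a component and logger.

```go
package proberexample

import (
	"net/http"
	"sync/atomic"
)

// Prober is a minimal readiness/liveness tracker, sketched for illustration.
type Prober struct {
	ready int32 // 1 when ready, accessed atomically
}

// SetReady marks the component ready; SetNotReady reverts it.
func (p *Prober) SetReady()    { atomic.StoreInt32(&p.ready, 1) }
func (p *Prober) SetNotReady() { atomic.StoreInt32(&p.ready, 0) }

// ReadyHandler serves /-/ready: 200 once ready, 503 before that.
func (p *Prober) ReadyHandler(w http.ResponseWriter, _ *http.Request) {
	if atomic.LoadInt32(&p.ready) == 1 {
		w.WriteHeader(http.StatusOK)
		return
	}
	http.Error(w, "not ready", http.StatusServiceUnavailable)
}

// HealthyHandler serves /-/healthy: 200 as long as the process is up.
func (p *Prober) HealthyHandler(w http.ResponseWriter, _ *http.Request) {
	w.WriteHeader(http.StatusOK)
}
```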
tutorials/kubernetes-demo/manifests/thanos-store-gateway.yaml: 8 additions & 0 deletions
@@ -40,6 +40,14 @@ spec:
containerPort: 10902
- name: grpc
containerPort: 10901
livenessProbe:
httpGet:
port: 10902
path: /-/healthy
readinessProbe:
httpGet:
port: 10902
path: /-/ready
resources:
limits:
cpu: "1"