Convert Loki modules to services (#1804)
* Loki now uses module services to start and stop its work.

Signed-off-by: Peter Štibraný <peter.stibrany@grafana.com>

* Use services methods to initialize some components.

Signed-off-by: Peter Štibraný <peter.stibrany@grafana.com>

* Use Cortex's NewModuleService.

Signed-off-by: Peter Štibraný <peter.stibrany@grafana.com>

* Converted server to a service.

Signed-off-by: Peter Štibraný <peter.stibrany@grafana.com>

* Converted distributor to service.

Signed-off-by: Peter Štibraný <peter.stibrany@grafana.com>

* Use the table manager service.

Signed-off-by: Peter Štibraný <peter.stibrany@grafana.com>

* Converted querier to a service.

Signed-off-by: Peter Štibraný <peter.stibrany@grafana.com>

* Converted query-frontend to a service.

Signed-off-by: Peter Štibraný <peter.stibrany@grafana.com>

* Merged stopping method into shutdown

Signed-off-by: Peter Štibraný <peter.stibrany@grafana.com>

* Converted ingester to a service.

It now starts all background tasks in the Starting state.
Stopping still needs a little work, as does reacting to lifecycler errors.

Signed-off-by: Peter Štibraný <peter.stibrany@grafana.com>
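
For orientation: a Cortex-style service moves through New, Starting, Running, Stopping, and ends in Terminated (or Failed). Below is a minimal sketch of the pattern this bullet describes; the ingester type is illustrative only, not Loki's actual code.

package main

import (
	"context"
	"sync"

	"github.com/cortexproject/cortex/pkg/util/services"
)

// ingester is a toy stand-in for the real component.
type ingester struct {
	services.Service

	wg   sync.WaitGroup
	quit chan struct{}
}

func newIngester() *ingester {
	i := &ingester{quit: make(chan struct{})}
	i.Service = services.NewBasicService(i.starting, i.running, i.stopping)
	return i
}

// starting runs while the service is in the Starting state; this is
// where background tasks are now launched.
func (i *ingester) starting(_ context.Context) error {
	i.wg.Add(1)
	go i.flushLoop()
	return nil
}

// running blocks until the service is asked to stop.
func (i *ingester) running(ctx context.Context) error {
	<-ctx.Done()
	return nil
}

// stopping tears the background work down.
func (i *ingester) stopping(_ error) error {
	close(i.quit)
	i.wg.Wait()
	return nil
}

// flushLoop stands in for a real background loop (e.g. periodic flushing).
func (i *ingester) flushLoop() {
	defer i.wg.Done()
	<-i.quit
}

func main() {
	ing := newIngester()
	_ = services.StartAndAwaitRunning(context.Background(), ing)
	_ = services.StopAndAwaitTerminated(context.Background(), ing)
}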

* Loki

Signed-off-by: Peter Štibraný <peter.stibrany@grafana.com>

* If lifecycler fails, ingester fails too.

It now doesn't call os.Exit, but shuts down gracefully and enters the Failed state.
That triggers Loki to shut down completely.

Signed-off-by: Peter Štibraný <peter.stibrany@grafana.com>
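
The pattern, sketched with the vendored Cortex services API; the function below is illustrative, and the distributor diff in this commit uses the same construction:

package example

import (
	"context"

	"github.com/cortexproject/cortex/pkg/util/services"
	"github.com/pkg/errors"
)

// running converts a watched subservice failure (e.g. the lifecycler's)
// into a failure of the parent service, instead of calling os.Exit.
// The watcher is assumed to have been set up during construction with
// watcher.WatchService(lifecycler).
func running(ctx context.Context, watcher *services.FailureWatcher) error {
	select {
	case <-ctx.Done():
		return nil // normal shutdown
	case err := <-watcher.Chan():
		// Returning an error moves the parent service to Failed,
		// which in turn triggers a full Loki shutdown.
		return errors.Wrap(err, "subservice failed")
	}
}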

* Ignore ErrStopProcess errors from services

This is a signal that Loki should stop.

Signed-off-by: Peter Štibraný <peter.stibrany@grafana.com>
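
A sketch of the idea. The real sentinel lives in the vendored Cortex code and is not shown in this diff, so errStopProcess below is a local stand-in:

package example

import "errors"

// errStopProcess stands in for the real sentinel returned by a service
// whose "failure" actually means: stop the whole process.
var errStopProcess = errors.New("stop process")

// handleFailure distinguishes a deliberate shutdown request from a crash.
func handleFailure(err error) error {
	if errors.Is(err, errStopProcess) {
		return nil // expected: a module asked Loki to stop cleanly
	}
	return err // genuine failure: propagate it
}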

* Use single /ready handler

It checks the state of all services, and asks the ingester for its own readiness check as well.

Signed-off-by: Peter Štibraný <peter.stibrany@grafana.com>
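
Roughly, and with assumed names: sm.IsHealthy is the Cortex services.Manager check, while readyChecker and CheckReady stand in for the ingester-side check mentioned above.

package example

import (
	"net/http"

	"github.com/cortexproject/cortex/pkg/util/services"
)

// readyChecker is an assumed stand-in for the ingester's own check.
type readyChecker interface {
	CheckReady() error
}

// readyHandler reports ready only when every registered service is
// Running and the ingester's extra check passes.
func readyHandler(sm *services.Manager, ing readyChecker) http.HandlerFunc {
	return func(w http.ResponseWriter, _ *http.Request) {
		if !sm.IsHealthy() {
			http.Error(w, "Not ready: not all services are running", http.StatusInternalServerError)
			return
		}
		if err := ing.CheckReady(); err != nil {
			http.Error(w, "Ingester not ready: "+err.Error(), http.StatusInternalServerError)
			return
		}
		w.WriteHeader(http.StatusOK)
	}
}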

* Removed unused value.

Signed-off-by: Peter Štibraný <peter.stibrany@grafana.com>

* Lint

Signed-off-by: Peter Štibraný <peter.stibrany@grafana.com>

* Fix test.

Signed-off-by: Peter Štibraný <peter.stibrany@grafana.com>

* Go mod tidy, vendor

Signed-off-by: Peter Štibraný <peter.stibrany@grafana.com>

* Tailers, not trailers.

Signed-off-by: Peter Štibraný <peter.stibrany@grafana.com>

* Adds a return for the healthcheck in case of error.

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

Co-authored-by: Cyril Tovena <cyril.tovena@gmail.com>
pstibrany and cyriltovena authored Apr 23, 2020
1 parent 81b3676 commit e60164b
Showing 1,040 changed files with 228,786 additions and 331 deletions.
15 changes: 3 additions & 12 deletions cmd/loki/main.go
@@ -74,19 +74,10 @@ func main() {
 
 	// Start Loki
 	t, err := loki.New(config)
-	if err != nil {
-		level.Error(util.Logger).Log("msg", "error initialising loki", "err", err)
-		os.Exit(1)
-	}
+	util.CheckFatal("initialising loki", err)
 
 	level.Info(util.Logger).Log("msg", "Starting Loki", "version", version.Info())
 
-	if err := t.Run(); err != nil {
-		level.Error(util.Logger).Log("msg", "error running loki", "err", err)
-	}
-
-	if err := t.Stop(); err != nil {
-		level.Error(util.Logger).Log("msg", "error stopping loki", "err", err)
-		os.Exit(1)
-	}
+	err = t.Run()
+	util.CheckFatal("running loki", err)
 }
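
util.CheckFatal is Cortex's helper that logs the error and exits with a non-zero code, which is why main collapses to the two calls above. The body of t.Run() is not part of this diff; below is a sketch of the usual manager-driven shape, under that assumption.

package example

import (
	"context"

	"github.com/cortexproject/cortex/pkg/util/services"
)

// run starts every module service under one manager, then blocks
// until they all stop (or one of them fails).
func run(servs ...services.Service) error {
	sm, err := services.NewManager(servs...)
	if err != nil {
		return err
	}
	if err := sm.StartAsync(context.Background()); err != nil {
		return err
	}
	if err := sm.AwaitHealthy(context.Background()); err != nil {
		return err
	}
	// Blocks until every service reaches a terminal state.
	return sm.AwaitStopped(context.Background())
}
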
47 changes: 47 additions & 0 deletions go.sum

Large diffs are not rendered by default.

65 changes: 29 additions & 36 deletions pkg/distributor/distributor.go
@@ -4,7 +4,6 @@ import (
 	"context"
 	"flag"
 	"net/http"
-	"os"
 	"sync/atomic"
 	"time"
 
@@ -16,7 +15,6 @@ import (
 	"github.com/cortexproject/cortex/pkg/util/services"
 	"github.com/pkg/errors"
 
-	"github.com/go-kit/kit/log/level"
 	"github.com/opentracing/opentracing-go"
 	"github.com/prometheus/client_golang/prometheus"
 	"github.com/prometheus/client_golang/prometheus/promauto"
@@ -34,7 +32,6 @@
 	metricName = "logs"
 )
 
-var readinessProbeSuccess = []byte("Ready")
 var (
 	ingesterAppends = promauto.NewCounterVec(prometheus.CounterOpts{
 		Namespace: "loki",
@@ -75,6 +72,8 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
 
 // Distributor coordinates replicates and distribution of log streams.
 type Distributor struct {
+	services.Service
+
 	cfg           Config
 	clientCfg     client.Config
 	ingestersRing ring.ReadRing
@@ -85,6 +84,9 @@ type Distributor struct {
 	// the number of healthy instances.
 	distributorsRing *ring.Lifecycler
 
+	subservices        *services.Manager
+	subservicesWatcher *services.FailureWatcher
+
 	// Per-user rate limiter.
 	ingestionRateLimiter *limiter.RateLimiter
 }
@@ -107,25 +109,16 @@ func New(cfg Config, clientCfg client.Config, ingestersRing ring.ReadRing, overr
 	var ingestionRateStrategy limiter.RateLimiterStrategy
 	var distributorsRing *ring.Lifecycler
 
+	var servs []services.Service
+
 	if overrides.IngestionRateStrategy() == validation.GlobalIngestionRateStrategy {
 		var err error
 		distributorsRing, err = ring.NewLifecycler(cfg.DistributorRing.ToLifecyclerConfig(), nil, "distributor", ring.DistributorRingKey, false)
 		if err != nil {
 			return nil, err
 		}
 
-		distributorsRing.AddListener(services.NewListener(nil, nil, nil, nil, func(_ services.State, failure error) {
-			// lifecycler used to do os.Exit(1) on its own failure, but now it just goes into Failed state.
-			// for now we just simulate old behaviour here. When Distributor itself becomes a service, it will enter Failed state as well.
-			level.Error(cortex_util.Logger).Log("msg", "lifecycler failed", "err", err)
-			os.Exit(1)
-		}))
-
-		err = services.StartAndAwaitRunning(context.Background(), distributorsRing)
-		if err != nil {
-			return nil, err
-		}
-
+		servs = append(servs, distributorsRing)
 		ingestionRateStrategy = newGlobalIngestionRateStrategy(overrides, distributorsRing)
 	} else {
 		ingestionRateStrategy = newLocalIngestionRateStrategy(overrides)
@@ -141,18 +134,33 @@
 		ingestionRateLimiter: limiter.NewRateLimiter(ingestionRateStrategy, 10*time.Second),
 	}
 
-	if err := services.StartAndAwaitRunning(context.Background(), d.pool); err != nil {
-		return nil, errors.Wrap(err, "starting client pool")
+	servs = append(servs, d.pool)
+	d.subservices, err = services.NewManager(servs...)
+	if err != nil {
+		return nil, errors.Wrap(err, "services manager")
 	}
+	d.subservicesWatcher = services.NewFailureWatcher()
+	d.subservicesWatcher.WatchManager(d.subservices)
+	d.Service = services.NewBasicService(d.starting, d.running, d.stopping)
 
 	return &d, nil
 }
 
-func (d *Distributor) Stop() {
-	if d.distributorsRing != nil {
-		_ = services.StopAndAwaitTerminated(context.Background(), d.distributorsRing)
+func (d *Distributor) starting(ctx context.Context) error {
+	return services.StartManagerAndAwaitHealthy(ctx, d.subservices)
+}
+
+func (d *Distributor) running(ctx context.Context) error {
+	select {
+	case <-ctx.Done():
+		return nil
+	case err := <-d.subservicesWatcher.Chan():
+		return errors.Wrap(err, "distributor subservice failed")
 	}
-	_ = services.StopAndAwaitTerminated(context.Background(), d.pool)
 }
+
+func (d *Distributor) stopping(_ error) error {
+	return services.StopManagerAndAwaitStopped(context.Background(), d.subservices)
+}
 
 // TODO taken from Cortex, see if we can refactor out an usable interface.
@@ -172,21 +180,6 @@ type pushTracker struct {
 	err chan error
 }
 
-// ReadinessHandler is used to indicate to k8s when the distributor is ready.
-// Returns 200 when the distributor is ready, 500 otherwise.
-func (d *Distributor) ReadinessHandler(w http.ResponseWriter, r *http.Request) {
-	_, err := d.ingestersRing.GetAll()
-	if err != nil {
-		http.Error(w, "Not ready: "+err.Error(), http.StatusInternalServerError)
-		return
-	}
-
-	w.WriteHeader(http.StatusOK)
-	if _, err := w.Write(readinessProbeSuccess); err != nil {
-		level.Error(cortex_util.Logger).Log("msg", "error writing success message", "error", err)
-	}
-}
-
 // Push a set of streams.
 func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*logproto.PushResponse, error) {
 	userID, err := user.ExtractOrgID(ctx)
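
From the caller's side, the converted distributor is now driven through the generic service helpers instead of a bespoke Stop method, matching the calls the updated tests below use. A minimal sketch; the wrapper function is illustrative.

package example

import (
	"context"

	"github.com/cortexproject/cortex/pkg/util/services"
	"github.com/grafana/loki/pkg/distributor"
)

// use runs a distributor through its full service lifecycle:
// starting(), then Running, then stopping().
func use(d *distributor.Distributor) error {
	if err := services.StartAndAwaitRunning(context.Background(), d); err != nil {
		return err
	}
	defer services.StopAndAwaitTerminated(context.Background(), d) //nolint:errcheck

	// ... call d.Push(...) while the service is Running ...
	return nil
}
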
5 changes: 4 additions & 1 deletion pkg/distributor/distributor_test.go
@@ -15,6 +15,7 @@ import (
 	"github.com/cortexproject/cortex/pkg/ring/kv"
 	"github.com/cortexproject/cortex/pkg/ring/kv/consul"
 	"github.com/cortexproject/cortex/pkg/util/flagext"
+	"github.com/cortexproject/cortex/pkg/util/services"
 	"github.com/cortexproject/cortex/pkg/util/test"
 
 	"github.com/prometheus/client_golang/prometheus"
@@ -80,6 +81,7 @@ func TestDistributor(t *testing.T) {
 			limits.MaxLineSize = fe.ByteSize(tc.maxLineSize)
 
 			d := prepare(t, limits, nil)
+			defer services.StopAndAwaitTerminated(context.Background(), d) //nolint:errcheck
 
 			request := makeWriteRequest(tc.lines, 10)
 
@@ -163,7 +165,7 @@ func TestDistributor_PushIngestionRateLimiter(t *testing.T) {
 			distributors := make([]*Distributor, testData.distributors)
 			for i := 0; i < testData.distributors; i++ {
 				distributors[i] = prepare(t, limits, kvStore)
-				defer distributors[i].Stop()
+				defer services.StopAndAwaitTerminated(context.Background(), distributors[i]) //nolint:errcheck
 			}
 
 			// If the distributors ring is setup, wait until the first distributor
@@ -226,6 +228,7 @@ func prepare(t *testing.T, limits *validation.Limits, kvStore kv.Client) *Distri
 
 	d, err := New(distributorConfig, clientConfig, ingestersRing, overrides)
 	require.NoError(t, err)
+	require.NoError(t, services.StartAndAwaitRunning(context.Background(), d))
 
 	return d
 }
8 changes: 6 additions & 2 deletions pkg/ingester/flush_test.go
@@ -13,6 +13,7 @@ import (
 	"github.com/cortexproject/cortex/pkg/ring"
 	"github.com/cortexproject/cortex/pkg/ring/kv"
 	"github.com/cortexproject/cortex/pkg/util/flagext"
+	"github.com/cortexproject/cortex/pkg/util/services"
 
 	"github.com/grafana/loki/pkg/chunkenc"
 	"github.com/grafana/loki/pkg/ingester/client"
@@ -43,6 +44,7 @@ func TestChunkFlushingIdle(t *testing.T) {
 	cfg.RetainPeriod = 500 * time.Millisecond
 
 	store, ing := newTestStore(t, cfg)
+	defer services.StopAndAwaitTerminated(context.Background(), ing) //nolint:errcheck
 	testData := pushTestSamples(t, ing)
 
 	// wait beyond idle time so samples flush
@@ -53,7 +55,7 @@
 func TestChunkFlushingShutdown(t *testing.T) {
 	store, ing := newTestStore(t, defaultIngesterTestConfig(t))
 	testData := pushTestSamples(t, ing)
-	ing.Shutdown()
+	require.NoError(t, services.StopAndAwaitTerminated(context.Background(), ing))
 	store.checkData(t, testData)
 }
 
@@ -90,7 +92,7 @@ func TestFlushingCollidingLabels(t *testing.T) {
 	require.NoError(t, err)
 
 	// force flush
-	ing.Shutdown()
+	require.NoError(t, services.StopAndAwaitTerminated(context.Background(), ing))
 
 	// verify that we get all the data back
 	store.checkData(t, map[string][]*logproto.Stream{userID: req.Streams})
@@ -154,6 +156,7 @@ func TestFlushMaxAge(t *testing.T) {
 		},
 	})
 
+	require.NoError(t, services.StopAndAwaitTerminated(context.Background(), ing))
 }
 
 type testStore struct {
@@ -172,6 +175,7 @@ func newTestStore(t require.TestingT, cfg Config) (*testStore, *Ingester) {
 
 	ing, err := New(cfg, client.Config{}, store, limits)
 	require.NoError(t, err)
+	require.NoError(t, services.StartAndAwaitRunning(context.Background(), ing))
 
 	return store, ing
 }