Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cache label strings in ingester to improve memory usage. #2926

Merged
merged 3 commits into from
Nov 13, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 10 additions & 1 deletion pkg/distributor/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"time"

cortex_distributor "github.com/cortexproject/cortex/pkg/distributor"
cortex_client "github.com/cortexproject/cortex/pkg/ingester/client"
"github.com/cortexproject/cortex/pkg/ring"
ring_client "github.com/cortexproject/cortex/pkg/ring/client"
cortex_util "github.com/cortexproject/cortex/pkg/util"
Expand Down Expand Up @@ -205,7 +206,15 @@ func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*log
validatedSamplesCount := 0

for _, stream := range req.Streams {
if err := d.validator.ValidateLabels(userID, stream); err != nil {
ls, err := util.ToClientLabels(stream.Labels)
if err != nil {
validationErr = httpgrpc.Errorf(http.StatusBadRequest, "error parsing labels: %v", err)
continue
}
// ensure labels are correctly sorted.
// todo(ctovena) we should lru cache this
stream.Labels = cortex_client.FromLabelAdaptersToLabels(ls).String()
if err := d.validator.ValidateLabels(userID, ls, stream); err != nil {
validationErr = err
continue
}
Expand Down
31 changes: 26 additions & 5 deletions pkg/distributor/distributor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ func TestDistributor(t *testing.T) {
limits.IngestionBurstSizeMB = ingestionRateLimit
limits.MaxLineSize = fe.ByteSize(tc.maxLineSize)

d := prepare(t, limits, nil)
d := prepare(t, limits, nil, nil)
defer services.StopAndAwaitTerminated(context.Background(), d) //nolint:errcheck

request := makeWriteRequest(tc.lines, 10)
Expand All @@ -97,6 +97,21 @@ func TestDistributor(t *testing.T) {
}
}

func Test_SortLabelsOnPush(t *testing.T) {
limits := &validation.Limits{}
flagext.DefaultValues(limits)
limits.EnforceMetricName = false
ingester := &mockIngester{}
d := prepare(t, limits, nil, func(addr string) (ring_client.PoolClient, error) { return ingester, nil })
defer services.StopAndAwaitTerminated(context.Background(), d) //nolint:errcheck

request := makeWriteRequest(10, 10)
request.Streams[0].Labels = `{buzz="f", a="b"}`
_, err := d.Push(ctx, request)
require.NoError(t, err)
require.Equal(t, `{a="b", buzz="f"}`, ingester.pushed[0].Streams[0].Labels)
}

func TestDistributor_PushIngestionRateLimiter(t *testing.T) {
type testPush struct {
bytes int
Expand Down Expand Up @@ -165,7 +180,7 @@ func TestDistributor_PushIngestionRateLimiter(t *testing.T) {
// Start all expected distributors
distributors := make([]*Distributor, testData.distributors)
for i := 0; i < testData.distributors; i++ {
distributors[i] = prepare(t, limits, kvStore)
distributors[i] = prepare(t, limits, kvStore, nil)
defer services.StopAndAwaitTerminated(context.Background(), distributors[i]) //nolint:errcheck
}

Expand Down Expand Up @@ -211,7 +226,7 @@ func loopbackInterfaceName() (string, error) {
return "", fmt.Errorf("can't retrieve loopback interface name")
}

func prepare(t *testing.T, limits *validation.Limits, kvStore kv.Client) *Distributor {
func prepare(t *testing.T, limits *validation.Limits, kvStore kv.Client, factory func(addr string) (ring_client.PoolClient, error)) *Distributor {
var (
distributorConfig Config
clientConfig client.Config
Expand Down Expand Up @@ -243,8 +258,11 @@ func prepare(t *testing.T, limits *validation.Limits, kvStore kv.Client) *Distri
distributorConfig.DistributorRing.InstanceID = strconv.Itoa(rand.Int())
distributorConfig.DistributorRing.KVStore.Mock = kvStore
distributorConfig.DistributorRing.InstanceInterfaceNames = []string{loopbackName}
distributorConfig.factory = func(addr string) (ring_client.PoolClient, error) {
return ingesters[addr], nil
distributorConfig.factory = factory
if factory == nil {
distributorConfig.factory = func(addr string) (ring_client.PoolClient, error) {
return ingesters[addr], nil
}
}

d, err := New(distributorConfig, clientConfig, ingestersRing, overrides, nil)
Expand Down Expand Up @@ -279,9 +297,12 @@ func makeWriteRequest(lines int, size int) *logproto.PushRequest {
type mockIngester struct {
grpc_health_v1.HealthClient
logproto.PusherClient

pushed []*logproto.PushRequest
}

func (i *mockIngester) Push(ctx context.Context, in *logproto.PushRequest, opts ...grpc.CallOption) (*logproto.PushResponse, error) {
i.pushed = append(i.pushed, in)
return nil, nil
}

Expand Down
12 changes: 1 addition & 11 deletions pkg/distributor/validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (
"github.com/weaveworks/common/httpgrpc"

"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/util"
"github.com/grafana/loki/pkg/util/validation"
)

Expand Down Expand Up @@ -53,16 +52,7 @@ func (v Validator) ValidateEntry(userID string, labels string, entry logproto.En
}

// Validate labels returns an error if the labels are invalid
func (v Validator) ValidateLabels(userID string, stream logproto.Stream) error {
ls, err := util.ToClientLabels(stream.Labels)
if err != nil {
// I wish we didn't return httpgrpc errors here as it seems
// an orthogonal concept (we need not use ValidateLabels in this context)
// but the upstream cortex_validation pkg uses it, so we keep this
// for parity.
return httpgrpc.Errorf(http.StatusBadRequest, "error parsing labels: %v", err)
}

func (v Validator) ValidateLabels(userID string, ls []cortex_client.LabelAdapter, stream logproto.Stream) error {
numLabelNames := len(ls)
if numLabelNames > v.MaxLabelNamesPerSeries(userID) {
validation.DiscardedSamples.WithLabelValues(validation.MaxLabelNamesPerSeries, userID).Inc()
Expand Down
12 changes: 11 additions & 1 deletion pkg/distributor/validator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,13 @@ import (
"testing"
"time"

"github.com/cortexproject/cortex/pkg/ingester/client"
"github.com/cortexproject/cortex/pkg/util/flagext"
"github.com/stretchr/testify/assert"
"github.com/weaveworks/common/httpgrpc"

"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/util"
"github.com/grafana/loki/pkg/util/validation"
)

Expand Down Expand Up @@ -149,8 +151,16 @@ func TestValidator_ValidateLabels(t *testing.T) {
v, err := NewValidator(o)
assert.NoError(t, err)

err = v.ValidateLabels(tt.userID, logproto.Stream{Labels: tt.labels})
err = v.ValidateLabels(tt.userID, mustParseLabels(tt.labels), logproto.Stream{Labels: tt.labels})
assert.Equal(t, tt.expected, err)
})
}
}

func mustParseLabels(s string) []client.LabelAdapter {
labels, err := util.ToClientLabels(s)
if err != nil {
panic(err)
}
return labels
}
5 changes: 3 additions & 2 deletions pkg/ingester/flush.go
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,7 @@ func (i *Ingester) collectChunksToFlush(instance *instance, fp model.Fingerprint
instance.streamsMtx.Lock()
defer instance.streamsMtx.Unlock()

stream, ok := instance.streams[fp]
stream, ok := instance.streamsByFP[fp]
if !ok {
return nil, nil
}
Expand Down Expand Up @@ -300,7 +300,8 @@ func (i *Ingester) removeFlushedChunks(instance *instance, stream *stream) {
memoryChunks.Sub(float64(prevNumChunks - len(stream.chunks)))

if len(stream.chunks) == 0 {
delete(instance.streams, stream.fp)
delete(instance.streamsByFP, stream.fp)
delete(instance.streams, stream.labelsString)
instance.index.Delete(stream.labels, stream.fp)
instance.streamsRemovedTotal.Inc()
memoryStreams.WithLabelValues(instance.instanceID).Dec()
Expand Down
59 changes: 36 additions & 23 deletions pkg/ingester/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,9 +59,13 @@ var (
type instance struct {
cfg *Config
streamsMtx sync.RWMutex
streams map[model.Fingerprint]*stream // we use 'mapped' fingerprints here.
index *index.InvertedIndex
mapper *fpMapper // using of mapper needs streamsMtx because it calls back

buf []byte // buffer used to compute fps.
streams map[string]*stream
streamsByFP map[model.Fingerprint]*stream

index *index.InvertedIndex
mapper *fpMapper // using of mapper needs streamsMtx because it calls back

instanceID string

Expand All @@ -81,10 +85,12 @@ type instance struct {

func newInstance(cfg *Config, instanceID string, factory func() chunkenc.Chunk, limiter *Limiter, syncPeriod time.Duration, syncMinUtil float64) *instance {
i := &instance{
cfg: cfg,
streams: map[model.Fingerprint]*stream{},
index: index.New(),
instanceID: instanceID,
cfg: cfg,
streams: map[string]*stream{},
streamsByFP: map[model.Fingerprint]*stream{},
buf: make([]byte, 0, 1024),
index: index.New(),
instanceID: instanceID,

streamsCreatedTotal: streamsCreatedTotal.WithLabelValues(instanceID),
streamsRemovedTotal: streamsRemovedTotal.WithLabelValues(instanceID),
Expand All @@ -106,14 +112,14 @@ func (i *instance) consumeChunk(ctx context.Context, labels []client.LabelAdapte
i.streamsMtx.Lock()
defer i.streamsMtx.Unlock()

rawFp := client.FastFingerprint(labels)
fp := i.mapper.mapFP(rawFp, labels)
fp := i.getHashForLabels(labels)

stream, ok := i.streams[fp]
stream, ok := i.streamsByFP[fp]
if !ok {
sortedLabels := i.index.Add(labels, fp)
stream = newStream(i.cfg, fp, sortedLabels, i.factory)
i.streams[fp] = stream
i.streamsByFP[fp] = stream
i.streams[stream.labelsString] = stream
i.streamsCreatedTotal.Inc()
memoryStreams.WithLabelValues(i.instanceID).Inc()
i.addTailersToNewStream(stream)
Expand Down Expand Up @@ -153,19 +159,12 @@ func (i *instance) Push(ctx context.Context, req *logproto.PushRequest) error {
}

func (i *instance) getOrCreateStream(pushReqStream logproto.Stream) (*stream, error) {
labels, err := util.ToClientLabels(pushReqStream.Labels)
if err != nil {
return nil, httpgrpc.Errorf(http.StatusBadRequest, err.Error())
}
rawFp := client.FastFingerprint(labels)
fp := i.mapper.mapFP(rawFp, labels)

stream, ok := i.streams[fp]
stream, ok := i.streams[pushReqStream.Labels]
if ok {
return stream, nil
}

err = i.limiter.AssertMaxStreamsPerUser(i.instanceID, len(i.streams))
err := i.limiter.AssertMaxStreamsPerUser(i.instanceID, len(i.streams))
if err != nil {
validation.DiscardedSamples.WithLabelValues(validation.StreamLimit, i.instanceID).Add(float64(len(pushReqStream.Entries)))
bytes := 0
Expand All @@ -176,19 +175,33 @@ func (i *instance) getOrCreateStream(pushReqStream logproto.Stream) (*stream, er
return nil, httpgrpc.Errorf(http.StatusTooManyRequests, validation.StreamLimitErrorMsg())
}

labels, err := util.ToClientLabels(pushReqStream.Labels)
if err != nil {
return nil, httpgrpc.Errorf(http.StatusBadRequest, err.Error())
}
fp := i.getHashForLabels(labels)
sortedLabels := i.index.Add(labels, fp)
stream = newStream(i.cfg, fp, sortedLabels, i.factory)
i.streams[fp] = stream
i.streams[pushReqStream.Labels] = stream
i.streamsByFP[fp] = stream

memoryStreams.WithLabelValues(i.instanceID).Inc()
i.streamsCreatedTotal.Inc()
i.addTailersToNewStream(stream)

return stream, nil
}

func (i *instance) getHashForLabels(labels []client.LabelAdapter) model.Fingerprint {
var fp uint64
lbsModel := client.FromLabelAdaptersToLabels(labels)
fp, i.buf = lbsModel.HashWithoutLabels(i.buf, []string(nil)...)
return i.mapper.mapFP(model.Fingerprint(fp), labels)
sandeepsukhani marked this conversation as resolved.
Show resolved Hide resolved
}

// Return labels associated with given fingerprint. Used by fingerprint mapper. Must hold streamsMtx.
func (i *instance) getLabelsFromFingerprint(fp model.Fingerprint) labels.Labels {
s := i.streams[fp]
s := i.streamsByFP[fp]
if s == nil {
return nil
}
Expand Down Expand Up @@ -361,7 +374,7 @@ func (i *instance) forMatchingStreams(

outer:
for _, streamID := range ids {
stream, ok := i.streams[streamID]
stream, ok := i.streamsByFP[streamID]
if !ok {
return ErrStreamMissing
}
Expand Down
40 changes: 40 additions & 0 deletions pkg/ingester/instance_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -257,3 +257,43 @@ func makeRandomLabels() string {
}
return ls.Labels().String()
}

func Benchmark_PushInstance(b *testing.B) {
limits, err := validation.NewOverrides(validation.Limits{MaxLocalStreamsPerUser: 1000}, nil)
require.NoError(b, err)
limiter := NewLimiter(limits, &ringCountMock{count: 1}, 1)

i := newInstance(&Config{}, "test", defaultFactory, limiter, 0, 0)
ctx := context.Background()

for n := 0; n < b.N; n++ {
_ = i.Push(ctx, &logproto.PushRequest{
Streams: []logproto.Stream{
{
Labels: `{cpu="10",endpoint="https",instance="10.253.57.87:9100",job="node-exporter",mode="idle",namespace="observability",pod="node-exporter-l454v",service="node-exporter"}`,
Entries: []logproto.Entry{
{Timestamp: time.Now(), Line: "1"},
{Timestamp: time.Now(), Line: "2"},
{Timestamp: time.Now(), Line: "3"},
},
},
{
Labels: `{cpu="35",endpoint="https",instance="10.253.57.87:9100",job="node-exporter",mode="idle",namespace="observability",pod="node-exporter-l454v",service="node-exporter"}`,
Entries: []logproto.Entry{
{Timestamp: time.Now(), Line: "1"},
{Timestamp: time.Now(), Line: "2"},
{Timestamp: time.Now(), Line: "3"},
},
},
{
Labels: `{cpu="89",endpoint="https",instance="10.253.57.87:9100",job="node-exporter",mode="idle",namespace="observability",pod="node-exporter-l454v",service="node-exporter"}`,
Entries: []logproto.Entry{
{Timestamp: time.Now(), Line: "1"},
{Timestamp: time.Now(), Line: "2"},
{Timestamp: time.Now(), Line: "3"},
},
},
},
})
}
}