Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add in-memory storage support for adaptive sampling #3335

Merged
merged 10 commits into from
Oct 25, 2021
2 changes: 1 addition & 1 deletion plugin/sampling/strategystore/adaptive/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ func (f *Factory) Initialize(metricsFactory metrics.Factory, ssFactory storage.S
if err != nil {
return err
}
f.store, err = ssFactory.CreateSamplingStore()
f.store, err = ssFactory.CreateSamplingStore(f.options.AggregationBuckets)
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion plugin/sampling/strategystore/adaptive/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ func (m *mockSamplingStoreFactory) CreateLock() (distributedlock.Lock, error) {

return mockLock, nil
}
func (m *mockSamplingStoreFactory) CreateSamplingStore() (samplingstore.Store, error) {
func (m *mockSamplingStoreFactory) CreateSamplingStore(maxBuckets int) (samplingstore.Store, error) {
if m.storeFailsWith != nil {
return nil, m.storeFailsWith
}
Expand Down
2 changes: 1 addition & 1 deletion plugin/sampling/strategystore/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,6 @@ type mockSamplingStoreFactory struct{}
func (m *mockSamplingStoreFactory) CreateLock() (distributedlock.Lock, error) {
return nil, nil
}
func (m *mockSamplingStoreFactory) CreateSamplingStore() (samplingstore.Store, error) {
func (m *mockSamplingStoreFactory) CreateSamplingStore(maxBuckets int) (samplingstore.Store, error) {
return nil, nil
}
2 changes: 1 addition & 1 deletion plugin/storage/cassandra/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ func (f *Factory) CreateLock() (distributedlock.Lock, error) {
}

// CreateSamplingStore implements storage.SamplingStoreFactory
func (f *Factory) CreateSamplingStore() (samplingstore.Store, error) {
func (f *Factory) CreateSamplingStore(maxBuckets int) (samplingstore.Store, error) {
return cSamplingStore.New(f.primarySession, f.primaryMetricsFactory, f.logger), nil
}

Expand Down
2 changes: 1 addition & 1 deletion plugin/storage/cassandra/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ func TestCassandraFactory(t *testing.T) {
_, err = f.CreateLock()
assert.NoError(t, err)

_, err = f.CreateSamplingStore()
_, err = f.CreateSamplingStore(0)
assert.NoError(t, err)

assert.NoError(t, f.Close())
Expand Down
12 changes: 12 additions & 0 deletions plugin/storage/memory/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@ import (
"github.com/uber/jaeger-lib/metrics"
"go.uber.org/zap"

"github.com/jaegertracing/jaeger/pkg/distributedlock"
"github.com/jaegertracing/jaeger/storage/dependencystore"
"github.com/jaegertracing/jaeger/storage/samplingstore"
"github.com/jaegertracing/jaeger/storage/spanstore"
)

Expand Down Expand Up @@ -79,6 +81,16 @@ func (f *Factory) CreateDependencyReader() (dependencystore.Reader, error) {
return f.store, nil
}

// CreateSamplingStore implements storage.SamplingStoreFactory
func (f *Factory) CreateSamplingStore(maxBuckets int) (samplingstore.Store, error) {
return NewSamplingStore(maxBuckets), nil
}

// CreateLock implements storage.SamplingStoreFactory
func (f *Factory) CreateLock() (distributedlock.Lock, error) {
return &lock{}, nil
}

func (f *Factory) publishOpts() {
internalFactory := f.metricsFactory.Namespace(metrics.NSOptions{Name: "internal"})
internalFactory.Gauge(metrics.Options{Name: limit}).
Expand Down
6 changes: 6 additions & 0 deletions plugin/storage/memory/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,12 @@ func TestMemoryStorageFactory(t *testing.T) {
depReader, err := f.CreateDependencyReader()
assert.NoError(t, err)
assert.Equal(t, f.store, depReader)
samplingStore, err := f.CreateSamplingStore(2)
assert.NoError(t, err)
assert.Equal(t, 2, samplingStore.(*SamplingStore).maxBuckets)
lock, err := f.CreateLock()
assert.NoError(t, err)
assert.NotNil(t, lock)
}

func TestWithConfiguration(t *testing.T) {
Expand Down
29 changes: 29 additions & 0 deletions plugin/storage/memory/lock.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
// Copyright (c) 2021 The Jaeger Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package memory

import "time"

type lock struct{}

// Acquire always returns true for memory storage because it's a single-node
func (l *lock) Acquire(resource string, ttl time.Duration) (bool, error) {
return true, nil
}

// Forfeit always returns true for memory storage
func (l *lock) Forfeit(resource string) (bool, error) {
return true, nil
}
36 changes: 36 additions & 0 deletions plugin/storage/memory/lock_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
// Copyright (c) 2021 The Jaeger Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package memory

import (
"testing"
"time"

"github.com/crossdock/crossdock-go/assert"
)

func TestAcquire(t *testing.T) {
l := &lock{}
ok, err := l.Acquire("resource", time.Duration(1))
assert.True(t, ok)
assert.NoError(t, err)
}

func TestForfeit(t *testing.T) {
l := &lock{}
ok, err := l.Forfeit("resource")
assert.True(t, ok)
assert.NoError(t, err)
}
98 changes: 98 additions & 0 deletions plugin/storage/memory/sampling.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
// Copyright (c) 2021 The Jaeger Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package memory

import (
"sync"
"time"

"github.com/jaegertracing/jaeger/cmd/collector/app/sampling/model"
)

// SamplingStroe is an in-memory store for sampling data
type SamplingStore struct {
sync.RWMutex
throughputs []*storedThroughput
probabilitiesAndQPS *storedServiceOperationProbabilitiesAndQPS
maxBuckets int
}

type storedThroughput struct {
throughput []*model.Throughput
time time.Time
}

type storedServiceOperationProbabilitiesAndQPS struct {
hostname string
probabilities model.ServiceOperationProbabilities
qps model.ServiceOperationQPS
time time.Time
}

// NewSamplingStore creates an in-memory sampling store.
func NewSamplingStore(maxBuckets int) *SamplingStore {
return &SamplingStore{maxBuckets: maxBuckets}
}

// InsertThroughput implements samplingstore.Store#InsertThroughput.
func (ss *SamplingStore) InsertThroughput(throughput []*model.Throughput) error {
ss.Lock()
defer ss.Unlock()
now := time.Now()
ss.preprendThroughput(&storedThroughput{throughput, now})
return nil
}

// GetThroughput implements samplingstore.Store#GetThroughput.
func (ss *SamplingStore) GetThroughput(start, end time.Time) ([]*model.Throughput, error) {
ss.Lock()
defer ss.Unlock()
var retSlice []*model.Throughput
for _, t := range ss.throughputs {
if t.time.After(start) && (t.time.Before(end) || t.time.Equal(end)) {
retSlice = append(retSlice, t.throughput...)
}
}
return retSlice, nil
}

// InsertProbabilitiesAndQPS implements samplingstore.Store#InsertProbabilitiesAndQPS.
func (ss *SamplingStore) InsertProbabilitiesAndQPS(
hostname string,
probabilities model.ServiceOperationProbabilities,
qps model.ServiceOperationQPS,
) error {
ss.Lock()
defer ss.Unlock()
ss.probabilitiesAndQPS = &storedServiceOperationProbabilitiesAndQPS{hostname, probabilities, qps, time.Now()}
return nil
}

// GetLatestProbabilities implements samplingstore.Store#GetLatestProbabilities.
func (ss *SamplingStore) GetLatestProbabilities() (model.ServiceOperationProbabilities, error) {
ss.Lock()
defer ss.Unlock()
if ss.probabilitiesAndQPS != nil {
return ss.probabilitiesAndQPS.probabilities, nil
}
return model.ServiceOperationProbabilities{}, nil
}

func (ss *SamplingStore) preprendThroughput(throughput *storedThroughput) {
ss.throughputs = append([]*storedThroughput{throughput}, ss.throughputs...)
if len(ss.throughputs) > ss.maxBuckets {
ss.throughputs = ss.throughputs[0:ss.maxBuckets]
}
}
107 changes: 107 additions & 0 deletions plugin/storage/memory/sampling_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
// Copyright (c) 2021 The Jaeger Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package memory

import (
"fmt"
"testing"
"time"

"github.com/crossdock/crossdock-go/assert"

"github.com/jaegertracing/jaeger/cmd/collector/app/sampling/model"
)

func withPopulatedSamplingStore(f func(samplingStore *SamplingStore)) {
now := time.Now()
millisAfter := now.Add(time.Millisecond * time.Duration(100))
secondsAfter := now.Add(time.Second * time.Duration(2))
throughputs := []*storedThroughput{
{[]*model.Throughput{{Service: "svc-1", Operation: "op-1", Count: 1}}, now},
{[]*model.Throughput{{Service: "svc-1", Operation: "op-2", Count: 1}}, millisAfter},
{[]*model.Throughput{{Service: "svc-2", Operation: "op-3", Count: 1}}, secondsAfter},
}
pQPS := &storedServiceOperationProbabilitiesAndQPS{
hostname: "guntur38ab8928", probabilities: model.ServiceOperationProbabilities{"svc-1": {"op-1": 0.01}}, qps: model.ServiceOperationQPS{"svc-1": {"op-1": 10.0}}, time: now}
samplingStore := &SamplingStore{throughputs: throughputs, probabilitiesAndQPS: pQPS}
f(samplingStore)
}

func withMemorySamplingStore(f func(samplingStore *SamplingStore)) {
f(NewSamplingStore(5))
}

func TestInsertThroughtput(t *testing.T) {
withMemorySamplingStore(func(samplingStore *SamplingStore) {
start := time.Now()
throughputs := []*model.Throughput{
{Service: "my-svc", Operation: "op"},
{Service: "our-svc", Operation: "op2"},
}
assert.NoError(t, samplingStore.InsertThroughput(throughputs))
ret, _ := samplingStore.GetThroughput(start, start.Add(time.Second*time.Duration(1)))
assert.Equal(t, 2, len(ret))

for i := 0; i < 10; i++ {
in := []*model.Throughput{
{Service: fmt.Sprint("svc-", i), Operation: fmt.Sprint("op-", i)},
}
samplingStore.InsertThroughput(in)
}
assert.Equal(t, 5, len(samplingStore.throughputs))
})
}

func TestGetThroughput(t *testing.T) {
withPopulatedSamplingStore(func(samplingStore *SamplingStore) {
start := time.Now()
ret, err := samplingStore.GetThroughput(start, start.Add(time.Second*time.Duration(1)))
assert.NoError(t, err)
assert.Equal(t, 1, len(ret))
ret1, _ := samplingStore.GetThroughput(start, start)
assert.Equal(t, 0, len(ret1))
ret2, _ := samplingStore.GetThroughput(start, start.Add(time.Hour*time.Duration(1)))
assert.Equal(t, 2, len(ret2))
})
}

func TestInsertProbabilitiesAndQPS(t *testing.T) {
withMemorySamplingStore(func(samplingStore *SamplingStore) {
assert.NoError(t, samplingStore.InsertProbabilitiesAndQPS("dell11eg843d", model.ServiceOperationProbabilities{"new-srv": {"op": 0.1}}, model.ServiceOperationQPS{"new-srv": {"op": 4}}))
assert.NotEmpty(t, 1, samplingStore.probabilitiesAndQPS)
// Only latest one is kept in memory
assert.NoError(t, samplingStore.InsertProbabilitiesAndQPS("lncol73", model.ServiceOperationProbabilities{"my-app": {"hello": 0.3}}, model.ServiceOperationQPS{"new-srv": {"op": 7}}))
assert.Equal(t, 0.3, samplingStore.probabilitiesAndQPS.probabilities["my-app"]["hello"])
})
}

func TestGetLatestProbability(t *testing.T) {
withMemorySamplingStore(func(samplingStore *SamplingStore) {
// No priod data
ret, err := samplingStore.GetLatestProbabilities()
assert.NoError(t, err)
assert.Empty(t, ret)
})

withPopulatedSamplingStore(func(samplingStore *SamplingStore) {
// With some pregenerated data
ret, err := samplingStore.GetLatestProbabilities()
assert.NoError(t, err)
assert.Equal(t, ret, model.ServiceOperationProbabilities{"svc-1": {"op-1": 0.01}})
assert.NoError(t, samplingStore.InsertProbabilitiesAndQPS("utfhyolf", model.ServiceOperationProbabilities{"another-service": {"hello": 0.009}}, model.ServiceOperationQPS{"new-srv": {"op": 5}}))
ret, _ = samplingStore.GetLatestProbabilities()
assert.NotEqual(t, ret, model.ServiceOperationProbabilities{"svc-1": {"op-1": 0.01}})
})
}
2 changes: 1 addition & 1 deletion storage/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ type SamplingStoreFactory interface {
// CreateLock creates a distributed lock.
CreateLock() (distributedlock.Lock, error)
// CreateSamplingStore creates a sampling store.
CreateSamplingStore() (samplingstore.Store, error)
CreateSamplingStore(maxBuckets int) (samplingstore.Store, error)
}

var (
Expand Down