From b3a413eff314f5349c8408aede4c6aa99e384edf Mon Sep 17 00:00:00 2001 From: slayer321 Date: Wed, 11 Oct 2023 20:12:39 +0530 Subject: [PATCH 1/7] feat: Implement badger db for sampling store Signed-off-by: slayer321 --- plugin/storage/badger/factory.go | 7 + .../storage/badger/samplingstore/storage.go | 176 ++++++++++++++++++ .../badger/samplingstore/storage_test.go | 68 +++++++ .../storage/integration/badgerstore_test.go | 3 + plugin/storage/integration/integration.go | 2 +- 5 files changed, 255 insertions(+), 1 deletion(-) create mode 100644 plugin/storage/badger/samplingstore/storage.go create mode 100644 plugin/storage/badger/samplingstore/storage_test.go diff --git a/plugin/storage/badger/factory.go b/plugin/storage/badger/factory.go index b5665e0b176..f8b52168a37 100644 --- a/plugin/storage/badger/factory.go +++ b/plugin/storage/badger/factory.go @@ -29,8 +29,10 @@ import ( "github.com/jaegertracing/jaeger/pkg/metrics" "github.com/jaegertracing/jaeger/plugin" depStore "github.com/jaegertracing/jaeger/plugin/storage/badger/dependencystore" + samplingStore "github.com/jaegertracing/jaeger/plugin/storage/badger/samplingstore" badgerStore "github.com/jaegertracing/jaeger/plugin/storage/badger/spanstore" "github.com/jaegertracing/jaeger/storage/dependencystore" + "github.com/jaegertracing/jaeger/storage/samplingstore" "github.com/jaegertracing/jaeger/storage/spanstore" ) @@ -170,6 +172,11 @@ func (f *Factory) CreateDependencyReader() (dependencystore.Reader, error) { return depStore.NewDependencyStore(sr), nil } +// CreateSamplingStore implements storage.SamplingStoreFactory +func (f *Factory) CreateSamplingStore(maxBuckets int) (samplingstore.Store, error) { + return samplingStore.NewSamplingStore(f.store), nil +} + // Close Implements io.Closer and closes the underlying storage func (f *Factory) Close() error { close(f.maintenanceDone) diff --git a/plugin/storage/badger/samplingstore/storage.go b/plugin/storage/badger/samplingstore/storage.go new file mode 100644 index 00000000000..7b220e12923 --- /dev/null +++ b/plugin/storage/badger/samplingstore/storage.go @@ -0,0 +1,176 @@ +package samplingstore + +import ( + "bytes" + "encoding/binary" + "encoding/json" + "fmt" + "time" + + "github.com/dgraph-io/badger/v3" + "github.com/jaegertracing/jaeger/cmd/collector/app/sampling/model" + jaegermodel "github.com/jaegertracing/jaeger/model" +) + +const ( + throughputKeyPrefix byte = 0x08 + probabilitiesKeyPrefix byte = 0x09 +) + +type SamplingStore struct { + store *badger.DB +} + +func NewSamplingStore(db *badger.DB) *SamplingStore { + return &SamplingStore{ + store: db, + } +} + +func (s *SamplingStore) InsertThroughput(throughput []*model.Throughput) error { + fmt.Println("Inside badger samplingstore InsertThroughput") + startTime := jaegermodel.TimeAsEpochMicroseconds(time.Now()) + entriesToStore := make([]*badger.Entry, 0) + entries, err := s.createThroughputEntry(throughput, startTime) + if err != nil { + return err + } + entriesToStore = append(entriesToStore, entries) + err = s.store.Update(func(txn *badger.Txn) error { + // Write the entries + for i := range entriesToStore { + err = txn.SetEntry(entriesToStore[i]) + fmt.Println("Writing entry to badger") + if err != nil { + // Most likely primary key conflict, but let the caller check this + return err + } + } + + return nil + }) + + return nil +} + +func (s *SamplingStore) GetThroughput(start, end time.Time) ([]*model.Throughput, error) { + var retSlice []*model.Throughput + fmt.Println("Inside badger samplingstore GetThroughput") + + prefix := []byte{throughputKeyPrefix} + + err := s.store.View(func(txn *badger.Txn) error { + opts := badger.DefaultIteratorOptions + it := txn.NewIterator(opts) + defer it.Close() + + val := []byte{} + for it.Seek(prefix); it.ValidForPrefix(prefix); it.Next() { + item := it.Item() + k := item.Key() + startTime := k[1:9] + fmt.Printf("key=%s\n", k) + val, err := item.ValueCopy(val) + if err != nil { + return err + } + t, err := initalStartTime(startTime) + if err != nil { + return err + } + throughputs, err := decodeValue(val) + if err != nil { + return err + } + + if t.After(start) && (t.Before(end) || t.Equal(end)) { + retSlice = append(retSlice, throughputs...) + } + return nil + } + return nil + }) + if err != nil { + return nil, err + } + + return retSlice, nil +} + +func (s *SamplingStore) InsertProbabilitiesAndQPS(hostname string, + probabilities model.ServiceOperationProbabilities, + qps model.ServiceOperationQPS) error { + return nil +} + +// GetLatestProbabilities implements samplingstore.Reader#GetLatestProbabilities. +func (s *SamplingStore) GetLatestProbabilities() (model.ServiceOperationProbabilities, error) { + return nil, nil +} + +func (s *SamplingStore) createThroughputEntry(throughput []*model.Throughput, startTime uint64) (*badger.Entry, error) { + pK, pV, err := s.createThroughputKV(throughput, startTime) + if err != nil { + return nil, err + } + + e := s.createBadgerEntry(pK, pV) + + return e, nil +} + +func (s *SamplingStore) createBadgerEntry(key []byte, value []byte) *badger.Entry { + return &badger.Entry{ + Key: key, + Value: value, + } +} + +func (s *SamplingStore) createThroughputKV(throughput []*model.Throughput, startTime uint64) ([]byte, []byte, error) { + + key := make([]byte, 16) + key[0] = throughputKeyPrefix + pos := 1 + binary.BigEndian.PutUint64(key[pos:], startTime) + + var bb []byte + var err error + + bb, err = json.Marshal(throughput) + fmt.Printf("Badger key %v, value %v\n", key, string(bb)) + return key, bb, err +} + +func createPrimaryKeySeekPrefix(startTime uint64) []byte { + key := make([]byte, 16) + key[0] = throughputKeyPrefix + pos := 1 + binary.BigEndian.PutUint64(key[pos:], startTime) + + return key +} + +func decodeValue(val []byte) ([]*model.Throughput, error) { + var throughput []*model.Throughput + + err := json.Unmarshal(val, &throughput) + if err != nil { + fmt.Println("Error while unmarshalling") + return nil, err + } + fmt.Printf("Throughput %v\n", throughput) + return throughput, nil +} + +func initalStartTime(timeBytes []byte) (time.Time, error) { + var usec int64 + + buf := bytes.NewReader(timeBytes) + + if err := binary.Read(buf, binary.BigEndian, &usec); err != nil { + panic(nil) + } + + t := time.UnixMicro(usec) + return t, nil +} diff --git a/plugin/storage/badger/samplingstore/storage_test.go b/plugin/storage/badger/samplingstore/storage_test.go new file mode 100644 index 00000000000..3553db09b80 --- /dev/null +++ b/plugin/storage/badger/samplingstore/storage_test.go @@ -0,0 +1,68 @@ +package samplingstore + +import ( + "testing" + "time" + + "github.com/dgraph-io/badger/v3" + samplemodel "github.com/jaegertracing/jaeger/cmd/collector/app/sampling/model" + "github.com/stretchr/testify/assert" +) + +type samplingStoreTest struct { + store *SamplingStore +} + +func NewtestSamplingStore(db *badger.DB) *samplingStoreTest { + return &samplingStoreTest{ + store: NewSamplingStore(db), + } +} + +func TestInsertThroughput(t *testing.T) { + runWithBadger(t, func(s *samplingStoreTest, t *testing.T) { + throughputs := []*samplemodel.Throughput{ + {Service: "my-svc", Operation: "op"}, + {Service: "our-svc", Operation: "op2"}, + } + err := s.store.InsertThroughput(throughputs) + assert.NoError(t, err) + }) +} + +func TestGetThroughput(t *testing.T) { + runWithBadger(t, func(s *samplingStoreTest, t *testing.T) { + start := time.Now() + + expected := 2 + throughputs := []*samplemodel.Throughput{ + {Service: "my-svc", Operation: "op"}, + {Service: "our-svc", Operation: "op2"}, + } + err := s.store.InsertThroughput(throughputs) + assert.NoError(t, err) + + actual, err := s.store.GetThroughput(start, start.Add(time.Second*time.Duration(10))) + assert.NoError(t, err) + assert.Equal(t, expected, len(actual)) + }) +} + +func runWithBadger(t *testing.T, test func(s *samplingStoreTest, t *testing.T)) { + opts := badger.DefaultOptions("") + + opts.SyncWrites = false + dir := t.TempDir() + opts.Dir = dir + opts.ValueDir = dir + + store, err := badger.Open(opts) + defer func() { + store.Close() + }() + ss := NewtestSamplingStore(store) + + assert.NoError(t, err) + + test(ss, t) +} diff --git a/plugin/storage/integration/badgerstore_test.go b/plugin/storage/integration/badgerstore_test.go index 778aea2af43..233f4e0b8e4 100644 --- a/plugin/storage/integration/badgerstore_test.go +++ b/plugin/storage/integration/badgerstore_test.go @@ -49,6 +49,9 @@ func (s *BadgerIntegrationStorage) initialize() error { if err != nil { return err } + if s.SamplingStore, err = s.factory.CreateSamplingStore(0); err != nil { + return err + } s.SpanReader = sr s.SpanWriter = sw diff --git a/plugin/storage/integration/integration.go b/plugin/storage/integration/integration.go index afb743506c9..6374267c410 100644 --- a/plugin/storage/integration/integration.go +++ b/plugin/storage/integration/integration.go @@ -487,5 +487,5 @@ func (s *StorageIntegration) IntegrationTestAll(t *testing.T) { t.Run("FindTraces", s.testFindTraces) t.Run("GetDependencies", s.testGetDependencies) t.Run("GetThroughput", s.testGetThroughput) - t.Run("GetLatestProbability", s.testGetLatestProbability) + //t.Run("GetLatestProbability", s.testGetLatestProbability) } From 6c72f42dc75fe93ad1712ecacaf44dc6432aa4bd Mon Sep 17 00:00:00 2001 From: slayer321 Date: Thu, 12 Oct 2023 20:32:20 +0530 Subject: [PATCH 2/7] fix lint issue Signed-off-by: slayer321 --- .../storage/badger/samplingstore/storage.go | 29 +++++++++++-------- .../badger/samplingstore/storage_test.go | 17 ++++++++++- plugin/storage/integration/integration.go | 2 +- 3 files changed, 34 insertions(+), 14 deletions(-) diff --git a/plugin/storage/badger/samplingstore/storage.go b/plugin/storage/badger/samplingstore/storage.go index 7b220e12923..0d47bd3a25c 100644 --- a/plugin/storage/badger/samplingstore/storage.go +++ b/plugin/storage/badger/samplingstore/storage.go @@ -1,3 +1,17 @@ +// Copyright (c) 2023 The Jaeger Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + package samplingstore import ( @@ -8,6 +22,7 @@ import ( "time" "github.com/dgraph-io/badger/v3" + "github.com/jaegertracing/jaeger/cmd/collector/app/sampling/model" jaegermodel "github.com/jaegertracing/jaeger/model" ) @@ -86,7 +101,6 @@ func (s *SamplingStore) GetThroughput(start, end time.Time) ([]*model.Throughput if t.After(start) && (t.Before(end) || t.Equal(end)) { retSlice = append(retSlice, throughputs...) } - return nil } return nil }) @@ -99,7 +113,8 @@ func (s *SamplingStore) GetThroughput(start, end time.Time) ([]*model.Throughput func (s *SamplingStore) InsertProbabilitiesAndQPS(hostname string, probabilities model.ServiceOperationProbabilities, - qps model.ServiceOperationQPS) error { + qps model.ServiceOperationQPS, +) error { return nil } @@ -127,7 +142,6 @@ func (s *SamplingStore) createBadgerEntry(key []byte, value []byte) *badger.Entr } func (s *SamplingStore) createThroughputKV(throughput []*model.Throughput, startTime uint64) ([]byte, []byte, error) { - key := make([]byte, 16) key[0] = throughputKeyPrefix pos := 1 @@ -141,15 +155,6 @@ func (s *SamplingStore) createThroughputKV(throughput []*model.Throughput, start return key, bb, err } -func createPrimaryKeySeekPrefix(startTime uint64) []byte { - key := make([]byte, 16) - key[0] = throughputKeyPrefix - pos := 1 - binary.BigEndian.PutUint64(key[pos:], startTime) - - return key -} - func decodeValue(val []byte) ([]*model.Throughput, error) { var throughput []*model.Throughput diff --git a/plugin/storage/badger/samplingstore/storage_test.go b/plugin/storage/badger/samplingstore/storage_test.go index 3553db09b80..f9042edf295 100644 --- a/plugin/storage/badger/samplingstore/storage_test.go +++ b/plugin/storage/badger/samplingstore/storage_test.go @@ -1,3 +1,17 @@ +// Copyright (c) 2023 The Jaeger Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + package samplingstore import ( @@ -5,8 +19,9 @@ import ( "time" "github.com/dgraph-io/badger/v3" - samplemodel "github.com/jaegertracing/jaeger/cmd/collector/app/sampling/model" "github.com/stretchr/testify/assert" + + samplemodel "github.com/jaegertracing/jaeger/cmd/collector/app/sampling/model" ) type samplingStoreTest struct { diff --git a/plugin/storage/integration/integration.go b/plugin/storage/integration/integration.go index 6374267c410..afb743506c9 100644 --- a/plugin/storage/integration/integration.go +++ b/plugin/storage/integration/integration.go @@ -487,5 +487,5 @@ func (s *StorageIntegration) IntegrationTestAll(t *testing.T) { t.Run("FindTraces", s.testFindTraces) t.Run("GetDependencies", s.testGetDependencies) t.Run("GetThroughput", s.testGetThroughput) - //t.Run("GetLatestProbability", s.testGetLatestProbability) + t.Run("GetLatestProbability", s.testGetLatestProbability) } From 8900b39b7be0251336b14edb80e70360ce7bca15 Mon Sep 17 00:00:00 2001 From: slayer321 Date: Fri, 13 Oct 2023 18:50:28 +0530 Subject: [PATCH 3/7] implement InsertProbabilitiesAndQPS and GetLatestProbabilities Signed-off-by: slayer321 --- .../storage/badger/samplingstore/storage.go | 102 +++++++++++++++--- .../badger/samplingstore/storage_test.go | 21 ++++ plugin/storage/integration/integration.go | 1 + 3 files changed, 112 insertions(+), 12 deletions(-) diff --git a/plugin/storage/badger/samplingstore/storage.go b/plugin/storage/badger/samplingstore/storage.go index 0d47bd3a25c..c7823ff05b8 100644 --- a/plugin/storage/badger/samplingstore/storage.go +++ b/plugin/storage/badger/samplingstore/storage.go @@ -36,6 +36,12 @@ type SamplingStore struct { store *badger.DB } +type ProbabilitiesAndQPS struct { + Hostname string + Probabilities model.ServiceOperationProbabilities + QPS model.ServiceOperationQPS +} + func NewSamplingStore(db *badger.DB) *SamplingStore { return &SamplingStore{ store: db, @@ -43,7 +49,6 @@ func NewSamplingStore(db *badger.DB) *SamplingStore { } func (s *SamplingStore) InsertThroughput(throughput []*model.Throughput) error { - fmt.Println("Inside badger samplingstore InsertThroughput") startTime := jaegermodel.TimeAsEpochMicroseconds(time.Now()) entriesToStore := make([]*badger.Entry, 0) entries, err := s.createThroughputEntry(throughput, startTime) @@ -52,12 +57,9 @@ func (s *SamplingStore) InsertThroughput(throughput []*model.Throughput) error { } entriesToStore = append(entriesToStore, entries) err = s.store.Update(func(txn *badger.Txn) error { - // Write the entries for i := range entriesToStore { err = txn.SetEntry(entriesToStore[i]) - fmt.Println("Writing entry to badger") if err != nil { - // Most likely primary key conflict, but let the caller check this return err } } @@ -70,8 +72,6 @@ func (s *SamplingStore) InsertThroughput(throughput []*model.Throughput) error { func (s *SamplingStore) GetThroughput(start, end time.Time) ([]*model.Throughput, error) { var retSlice []*model.Throughput - fmt.Println("Inside badger samplingstore GetThroughput") - prefix := []byte{throughputKeyPrefix} err := s.store.View(func(txn *badger.Txn) error { @@ -93,7 +93,7 @@ func (s *SamplingStore) GetThroughput(start, end time.Time) ([]*model.Throughput if err != nil { return err } - throughputs, err := decodeValue(val) + throughputs, err := decodeThroughtputValue(val) if err != nil { return err } @@ -115,12 +115,83 @@ func (s *SamplingStore) InsertProbabilitiesAndQPS(hostname string, probabilities model.ServiceOperationProbabilities, qps model.ServiceOperationQPS, ) error { + startTime := jaegermodel.TimeAsEpochMicroseconds(time.Now()) + entriesToStore := make([]*badger.Entry, 0) + entries, err := s.createProbabilitiesEntry(hostname, probabilities, qps, startTime) + if err != nil { + return err + } + entriesToStore = append(entriesToStore, entries) + err = s.store.Update(func(txn *badger.Txn) error { + // Write the entries + for i := range entriesToStore { + err = txn.SetEntry(entriesToStore[i]) + if err != nil { + return err + } + } + + return nil + }) + return nil } // GetLatestProbabilities implements samplingstore.Reader#GetLatestProbabilities. func (s *SamplingStore) GetLatestProbabilities() (model.ServiceOperationProbabilities, error) { - return nil, nil + var retVal model.ServiceOperationProbabilities + var unMarshalProbabilities ProbabilitiesAndQPS + prefix := []byte{probabilitiesKeyPrefix} + + err := s.store.View(func(txn *badger.Txn) error { + opts := badger.DefaultIteratorOptions + it := txn.NewIterator(opts) + defer it.Close() + + val := []byte{} + for it.Seek(prefix); it.ValidForPrefix(prefix); it.Next() { + item := it.Item() + val, err := item.ValueCopy(val) + if err != nil { + return err + } + unMarshalProbabilities, err = decodeProbabilitiesValue(val) + retVal = unMarshalProbabilities.Probabilities + } + return nil + }) + if err != nil { + return nil, err + } + return retVal, nil +} + +func (s *SamplingStore) createProbabilitiesEntry(hostname string, probabilities model.ServiceOperationProbabilities, qps model.ServiceOperationQPS, startTime uint64) (*badger.Entry, error) { + pK, pV, err := s.createProbabilitiesKV(hostname, probabilities, qps, startTime) + if err != nil { + return nil, err + } + + e := s.createBadgerEntry(pK, pV) + + return e, nil +} + +func (s *SamplingStore) createProbabilitiesKV(hostname string, probabilities model.ServiceOperationProbabilities, qps model.ServiceOperationQPS, startTime uint64) ([]byte, []byte, error) { + key := make([]byte, 16) + key[0] = probabilitiesKeyPrefix + pos := 1 + binary.BigEndian.PutUint64(key[pos:], startTime) + + var bb []byte + var err error + val := ProbabilitiesAndQPS{ + Hostname: hostname, + Probabilities: probabilities, + QPS: qps, + } + bb, err = json.Marshal(val) + return key, bb, err } func (s *SamplingStore) createThroughputEntry(throughput []*model.Throughput, startTime uint64) (*badger.Entry, error) { @@ -151,22 +222,29 @@ func (s *SamplingStore) createThroughputKV(throughput []*model.Throughput, start var err error bb, err = json.Marshal(throughput) - fmt.Printf("Badger key %v, value %v\n", key, string(bb)) return key, bb, err } -func decodeValue(val []byte) ([]*model.Throughput, error) { +func decodeThroughtputValue(val []byte) ([]*model.Throughput, error) { var throughput []*model.Throughput err := json.Unmarshal(val, &throughput) if err != nil { - fmt.Println("Error while unmarshalling") return nil, err } - fmt.Printf("Throughput %v\n", throughput) return throughput, nil } +func decodeProbabilitiesValue(val []byte) (ProbabilitiesAndQPS, error) { + var probabilities ProbabilitiesAndQPS + + err := json.Unmarshal(val, &probabilities) + if err != nil { + return ProbabilitiesAndQPS{}, err + } + return probabilities, nil +} + func initalStartTime(timeBytes []byte) (time.Time, error) { var usec int64 diff --git a/plugin/storage/badger/samplingstore/storage_test.go b/plugin/storage/badger/samplingstore/storage_test.go index f9042edf295..d11e5524d72 100644 --- a/plugin/storage/badger/samplingstore/storage_test.go +++ b/plugin/storage/badger/samplingstore/storage_test.go @@ -63,6 +63,27 @@ func TestGetThroughput(t *testing.T) { }) } +func TestInsertProbabilitiesAndQPS(t *testing.T) { + runWithBadger(t, func(s *samplingStoreTest, t *testing.T) { + err := s.store.InsertProbabilitiesAndQPS("dell11eg843d", samplemodel.ServiceOperationProbabilities{"new-srv": {"op": 0.1}}, samplemodel.ServiceOperationQPS{"new-srv": {"op": 4}}) + assert.NoError(t, err) + }) +} + +func TestGetLatestProbabilities(t *testing.T) { + runWithBadger(t, func(s *samplingStoreTest, t *testing.T) { + err := s.store.InsertProbabilitiesAndQPS("dell11eg843d", samplemodel.ServiceOperationProbabilities{"new-srv": {"op": 0.1}}, samplemodel.ServiceOperationQPS{"new-srv": {"op": 4}}) + assert.NoError(t, err) + err = s.store.InsertProbabilitiesAndQPS("newhostname", samplemodel.ServiceOperationProbabilities{"new-srv2": {"op": 0.123}}, samplemodel.ServiceOperationQPS{"new-srv2": {"op": 1}}) + assert.NoError(t, err) + + expected := samplemodel.ServiceOperationProbabilities{"new-srv2": {"op": 0.123}} + actual, err := s.store.GetLatestProbabilities() + assert.NoError(t, err) + assert.Equal(t, expected, actual) + }) +} + func runWithBadger(t *testing.T, test func(s *samplingStoreTest, t *testing.T)) { opts := badger.DefaultOptions("") diff --git a/plugin/storage/integration/integration.go b/plugin/storage/integration/integration.go index afb743506c9..9f1cf163376 100644 --- a/plugin/storage/integration/integration.go +++ b/plugin/storage/integration/integration.go @@ -452,6 +452,7 @@ func (s *StorageIntegration) testGetLatestProbability(t *testing.T) { } defer s.cleanUp(t) + s.SamplingStore.InsertProbabilitiesAndQPS("newhostname1", samplemodel.ServiceOperationProbabilities{"new-srv3": {"op": 0.123}}, samplemodel.ServiceOperationQPS{"new-srv2": {"op": 11}}) s.SamplingStore.InsertProbabilitiesAndQPS("dell11eg843d", samplemodel.ServiceOperationProbabilities{"new-srv": {"op": 0.1}}, samplemodel.ServiceOperationQPS{"new-srv": {"op": 4}}) expected := samplemodel.ServiceOperationProbabilities{"new-srv": {"op": 0.1}} From 8180f18210532990746a2603ea2d37ec352a935b Mon Sep 17 00:00:00 2001 From: slayer321 Date: Sun, 15 Oct 2023 14:27:33 +0530 Subject: [PATCH 4/7] handle error Signed-off-by: slayer321 --- plugin/storage/badger/samplingstore/storage.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/plugin/storage/badger/samplingstore/storage.go b/plugin/storage/badger/samplingstore/storage.go index c7823ff05b8..f15a22c8c94 100644 --- a/plugin/storage/badger/samplingstore/storage.go +++ b/plugin/storage/badger/samplingstore/storage.go @@ -156,6 +156,9 @@ func (s *SamplingStore) GetLatestProbabilities() (model.ServiceOperationProbabil return err } unMarshalProbabilities, err = decodeProbabilitiesValue(val) + if err != nil { + return err + } retVal = unMarshalProbabilities.Probabilities } return nil From 6ee58639f49238e6162542240d9b93a90a736343 Mon Sep 17 00:00:00 2001 From: slayer321 Date: Mon, 16 Oct 2023 19:06:31 +0530 Subject: [PATCH 5/7] add suggested changes Signed-off-by: slayer321 --- plugin/storage/badger/factory.go | 4 +- .../badger/samplingstore/storage_test.go | 94 +++++++++++++------ plugin/storage/integration/integration.go | 9 +- 3 files changed, 75 insertions(+), 32 deletions(-) diff --git a/plugin/storage/badger/factory.go b/plugin/storage/badger/factory.go index f8b52168a37..6c0788864ae 100644 --- a/plugin/storage/badger/factory.go +++ b/plugin/storage/badger/factory.go @@ -29,7 +29,7 @@ import ( "github.com/jaegertracing/jaeger/pkg/metrics" "github.com/jaegertracing/jaeger/plugin" depStore "github.com/jaegertracing/jaeger/plugin/storage/badger/dependencystore" - samplingStore "github.com/jaegertracing/jaeger/plugin/storage/badger/samplingstore" + badgerSampling "github.com/jaegertracing/jaeger/plugin/storage/badger/samplingstore" badgerStore "github.com/jaegertracing/jaeger/plugin/storage/badger/spanstore" "github.com/jaegertracing/jaeger/storage/dependencystore" "github.com/jaegertracing/jaeger/storage/samplingstore" @@ -174,7 +174,7 @@ func (f *Factory) CreateDependencyReader() (dependencystore.Reader, error) { // CreateSamplingStore implements storage.SamplingStoreFactory func (f *Factory) CreateSamplingStore(maxBuckets int) (samplingstore.Store, error) { - return samplingStore.NewSamplingStore(f.store), nil + return badgerSampling.NewSamplingStore(f.store), nil } // Close Implements io.Closer and closes the underlying storage diff --git a/plugin/storage/badger/samplingstore/storage_test.go b/plugin/storage/badger/samplingstore/storage_test.go index d11e5524d72..05737f85bad 100644 --- a/plugin/storage/badger/samplingstore/storage_test.go +++ b/plugin/storage/badger/samplingstore/storage_test.go @@ -15,6 +15,7 @@ package samplingstore import ( + "encoding/json" "testing" "time" @@ -24,67 +25,106 @@ import ( samplemodel "github.com/jaegertracing/jaeger/cmd/collector/app/sampling/model" ) -type samplingStoreTest struct { - store *SamplingStore -} - -func NewtestSamplingStore(db *badger.DB) *samplingStoreTest { - return &samplingStoreTest{ - store: NewSamplingStore(db), - } +func newTestSamplingStore(db *badger.DB) *SamplingStore { + return NewSamplingStore(db) } func TestInsertThroughput(t *testing.T) { - runWithBadger(t, func(s *samplingStoreTest, t *testing.T) { + runWithBadger(t, func(t *testing.T, store *SamplingStore) { throughputs := []*samplemodel.Throughput{ {Service: "my-svc", Operation: "op"}, {Service: "our-svc", Operation: "op2"}, } - err := s.store.InsertThroughput(throughputs) + err := store.InsertThroughput(throughputs) assert.NoError(t, err) }) } func TestGetThroughput(t *testing.T) { - runWithBadger(t, func(s *samplingStoreTest, t *testing.T) { + runWithBadger(t, func(t *testing.T, store *SamplingStore) { start := time.Now() - - expected := 2 - throughputs := []*samplemodel.Throughput{ + expected := []*samplemodel.Throughput{ {Service: "my-svc", Operation: "op"}, {Service: "our-svc", Operation: "op2"}, } - err := s.store.InsertThroughput(throughputs) + err := store.InsertThroughput(expected) assert.NoError(t, err) - actual, err := s.store.GetThroughput(start, start.Add(time.Second*time.Duration(10))) + actual, err := store.GetThroughput(start, start.Add(time.Second*time.Duration(10))) assert.NoError(t, err) - assert.Equal(t, expected, len(actual)) + assert.Equal(t, expected, actual) }) } func TestInsertProbabilitiesAndQPS(t *testing.T) { - runWithBadger(t, func(s *samplingStoreTest, t *testing.T) { - err := s.store.InsertProbabilitiesAndQPS("dell11eg843d", samplemodel.ServiceOperationProbabilities{"new-srv": {"op": 0.1}}, samplemodel.ServiceOperationQPS{"new-srv": {"op": 4}}) + runWithBadger(t, func(t *testing.T, store *SamplingStore) { + err := store.InsertProbabilitiesAndQPS( + "dell11eg843d", + samplemodel.ServiceOperationProbabilities{"new-srv": {"op": 0.1}}, + samplemodel.ServiceOperationQPS{"new-srv": {"op": 4}}, + ) assert.NoError(t, err) }) } func TestGetLatestProbabilities(t *testing.T) { - runWithBadger(t, func(s *samplingStoreTest, t *testing.T) { - err := s.store.InsertProbabilitiesAndQPS("dell11eg843d", samplemodel.ServiceOperationProbabilities{"new-srv": {"op": 0.1}}, samplemodel.ServiceOperationQPS{"new-srv": {"op": 4}}) + runWithBadger(t, func(t *testing.T, store *SamplingStore) { + err := store.InsertProbabilitiesAndQPS( + "dell11eg843d", + samplemodel.ServiceOperationProbabilities{"new-srv": {"op": 0.1}}, + samplemodel.ServiceOperationQPS{"new-srv": {"op": 4}}, + ) assert.NoError(t, err) - err = s.store.InsertProbabilitiesAndQPS("newhostname", samplemodel.ServiceOperationProbabilities{"new-srv2": {"op": 0.123}}, samplemodel.ServiceOperationQPS{"new-srv2": {"op": 1}}) + err = store.InsertProbabilitiesAndQPS( + "newhostname", + samplemodel.ServiceOperationProbabilities{"new-srv2": {"op": 0.123}}, + samplemodel.ServiceOperationQPS{"new-srv2": {"op": 1}}, + ) assert.NoError(t, err) expected := samplemodel.ServiceOperationProbabilities{"new-srv2": {"op": 0.123}} - actual, err := s.store.GetLatestProbabilities() + actual, err := store.GetLatestProbabilities() assert.NoError(t, err) assert.Equal(t, expected, actual) }) } -func runWithBadger(t *testing.T, test func(s *samplingStoreTest, t *testing.T)) { +func TestDecodeProbabilitiesValue(t *testing.T) { + expected := ProbabilitiesAndQPS{ + Hostname: "dell11eg843d", + Probabilities: samplemodel.ServiceOperationProbabilities{"new-srv": {"op": 0.1}}, + QPS: samplemodel.ServiceOperationQPS{"new-srv": {"op": 4}}, + } + + marshalBytes, err := json.Marshal(expected) + assert.NoError(t, err) + actual, err := decodeProbabilitiesValue(marshalBytes) + assert.NoError(t, err) + assert.Equal(t, expected, actual) +} + +func TestDecodeThroughtputValue(t *testing.T) { + expected := []*samplemodel.Throughput{ + {Service: "my-svc", Operation: "op"}, + {Service: "our-svc", Operation: "op2"}, + } + + marshalBytes, err := json.Marshal(expected) + assert.NoError(t, err) + acrual, err := decodeThroughtputValue(marshalBytes) + assert.NoError(t, err) + assert.Equal(t, expected, acrual) +} + +func TestInitalStartTime(t *testing.T) { + expected := "2023-10-10 20:17:19.993838 +0530 IST" + timeBytes := []byte{0, 6, 7, 93, 200, 166, 213, 238} + actual, err := initalStartTime(timeBytes) + assert.NoError(t, err) + assert.Equal(t, expected, actual.String()) +} + +func runWithBadger(t *testing.T, test func(t *testing.T, store *SamplingStore)) { opts := badger.DefaultOptions("") opts.SyncWrites = false @@ -94,11 +134,11 @@ func runWithBadger(t *testing.T, test func(s *samplingStoreTest, t *testing.T)) store, err := badger.Open(opts) defer func() { - store.Close() + assert.NoError(t, store.Close()) }() - ss := NewtestSamplingStore(store) + ss := newTestSamplingStore(store) assert.NoError(t, err) - test(ss, t) + test(t, ss) } diff --git a/plugin/storage/integration/integration.go b/plugin/storage/integration/integration.go index 9f1cf163376..ed884845acc 100644 --- a/plugin/storage/integration/integration.go +++ b/plugin/storage/integration/integration.go @@ -433,15 +433,18 @@ func (s *StorageIntegration) testGetThroughput(t *testing.T) { s.insertThroughput(t) - expected := 2 + expected := []*samplemodel.Throughput{ + {Service: "my-svc", Operation: "op"}, + {Service: "our-svc", Operation: "op2"}, + } var actual []*samplemodel.Throughput _ = s.waitForCondition(t, func(t *testing.T) bool { var err error actual, err = s.SamplingStore.GetThroughput(start, start.Add(time.Second*time.Duration(10))) require.NoError(t, err) - return assert.ObjectsAreEqualValues(expected, len(actual)) + return assert.ObjectsAreEqualValues(expected, actual) }) - assert.Len(t, actual, expected) + assert.Equal(t, expected, actual) } func (s *StorageIntegration) testGetLatestProbability(t *testing.T) { From 32e24d6612de1cc7c74efa7c8fcd373e2405fae3 Mon Sep 17 00:00:00 2001 From: slayer321 Date: Mon, 16 Oct 2023 21:12:29 +0530 Subject: [PATCH 6/7] fix failing CI for intergration test Signed-off-by: slayer321 --- plugin/storage/badger/samplingstore/storage_test.go | 8 -------- plugin/storage/integration/integration.go | 9 +++------ 2 files changed, 3 insertions(+), 14 deletions(-) diff --git a/plugin/storage/badger/samplingstore/storage_test.go b/plugin/storage/badger/samplingstore/storage_test.go index 05737f85bad..91fdae02a56 100644 --- a/plugin/storage/badger/samplingstore/storage_test.go +++ b/plugin/storage/badger/samplingstore/storage_test.go @@ -116,14 +116,6 @@ func TestDecodeThroughtputValue(t *testing.T) { assert.Equal(t, expected, acrual) } -func TestInitalStartTime(t *testing.T) { - expected := "2023-10-10 20:17:19.993838 +0530 IST" - timeBytes := []byte{0, 6, 7, 93, 200, 166, 213, 238} - actual, err := initalStartTime(timeBytes) - assert.NoError(t, err) - assert.Equal(t, expected, actual.String()) -} - func runWithBadger(t *testing.T, test func(t *testing.T, store *SamplingStore)) { opts := badger.DefaultOptions("") diff --git a/plugin/storage/integration/integration.go b/plugin/storage/integration/integration.go index ed884845acc..9f1cf163376 100644 --- a/plugin/storage/integration/integration.go +++ b/plugin/storage/integration/integration.go @@ -433,18 +433,15 @@ func (s *StorageIntegration) testGetThroughput(t *testing.T) { s.insertThroughput(t) - expected := []*samplemodel.Throughput{ - {Service: "my-svc", Operation: "op"}, - {Service: "our-svc", Operation: "op2"}, - } + expected := 2 var actual []*samplemodel.Throughput _ = s.waitForCondition(t, func(t *testing.T) bool { var err error actual, err = s.SamplingStore.GetThroughput(start, start.Add(time.Second*time.Duration(10))) require.NoError(t, err) - return assert.ObjectsAreEqualValues(expected, actual) + return assert.ObjectsAreEqualValues(expected, len(actual)) }) - assert.Equal(t, expected, actual) + assert.Len(t, actual, expected) } func (s *StorageIntegration) testGetLatestProbability(t *testing.T) { From 2928d5e10845ac44d0f4a1d9c418d7f9b8074929 Mon Sep 17 00:00:00 2001 From: slayer321 Date: Tue, 17 Oct 2023 18:43:05 +0530 Subject: [PATCH 7/7] minor suggested changes Signed-off-by: slayer321 --- plugin/storage/badger/samplingstore/storage.go | 2 +- plugin/storage/badger/samplingstore/storage_test.go | 10 +++++++--- 2 files changed, 8 insertions(+), 4 deletions(-) diff --git a/plugin/storage/badger/samplingstore/storage.go b/plugin/storage/badger/samplingstore/storage.go index f15a22c8c94..49d51ee3668 100644 --- a/plugin/storage/badger/samplingstore/storage.go +++ b/plugin/storage/badger/samplingstore/storage.go @@ -235,7 +235,7 @@ func decodeThroughtputValue(val []byte) ([]*model.Throughput, error) { if err != nil { return nil, err } - return throughput, nil + return throughput, err } func decodeProbabilitiesValue(val []byte) (ProbabilitiesAndQPS, error) { diff --git a/plugin/storage/badger/samplingstore/storage_test.go b/plugin/storage/badger/samplingstore/storage_test.go index 91fdae02a56..c13638b6c22 100644 --- a/plugin/storage/badger/samplingstore/storage_test.go +++ b/plugin/storage/badger/samplingstore/storage_test.go @@ -98,9 +98,15 @@ func TestDecodeProbabilitiesValue(t *testing.T) { marshalBytes, err := json.Marshal(expected) assert.NoError(t, err) + // This should pass without error actual, err := decodeProbabilitiesValue(marshalBytes) assert.NoError(t, err) assert.Equal(t, expected, actual) + + // Simulate data corruption by removing the first byte. + corruptedBytes := marshalBytes[1:] + _, err = decodeProbabilitiesValue(corruptedBytes) + assert.Error(t, err) // Expect an error } func TestDecodeThroughtputValue(t *testing.T) { @@ -125,12 +131,10 @@ func runWithBadger(t *testing.T, test func(t *testing.T, store *SamplingStore)) opts.ValueDir = dir store, err := badger.Open(opts) + assert.NoError(t, err) defer func() { assert.NoError(t, store.Close()) }() ss := newTestSamplingStore(store) - - assert.NoError(t, err) - test(t, ss) }