Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make operation_strategies part also be part of default_strategy #1749

Merged
merged 12 commits into from
Dec 29, 2019
2 changes: 1 addition & 1 deletion plugin/sampling/strategystore/static/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ const (
var (
// defaultStrategy is the default sampling strategy the Strategy Store will return
// if none is provided.
defaultStrategy = sampling.SamplingStrategyResponse{
defaultStrategyResponse = sampling.SamplingStrategyResponse{
rutgerbrf marked this conversation as resolved.
Show resolved Hide resolved
StrategyType: sampling.SamplingStrategyType_PROBABILISTIC,
ProbabilisticSampling: &sampling.ProbabilisticSamplingStrategy{
SamplingRate: defaultSamplingProbability,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,14 +1,41 @@
{
"default_strategy": {
"type": "probabilistic",
"param": 0.5
"param": 0.5,
"operation_strategies": [
{
"operation": "op0",
"type": "probabilistic",
"param": 0.2
},
{
"operation": "op6",
"type": "probabilistic",
"param": 0
},
{
"operation": "spam",
"type": "ratelimiting",
"param": 1
},
{
"operation": "op7",
"type": "probabilistic",
"param": 1
}
]
},
"service_strategies": [
{
"service": "foo",
"type": "probabilistic",
"param": 0.8,
"operation_strategies": [
{
"operation": "op6",
"type": "probabilistic",
"param": 0.5
},
{
"operation": "op1",
"type": "probabilistic",
Expand Down
8 changes: 7 additions & 1 deletion plugin/sampling/strategystore/static/strategy.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,14 @@ type serviceStrategy struct {
strategy
}

// defaultStrategy defines the default strategy.
type defaultStrategy struct {
rutgerbrf marked this conversation as resolved.
Show resolved Hide resolved
OperationStrategies []*operationStrategy `json:"operation_strategies"`
strategy
}

// strategies holds a default sampling strategy and service specific sampling strategies.
type strategies struct {
DefaultStrategy *strategy `json:"default_strategy"`
DefaultStrategy *defaultStrategy `json:"default_strategy"`
ServiceStrategies []*serviceStrategy `json:"service_strategies"`
}
118 changes: 106 additions & 12 deletions plugin/sampling/strategystore/static/strategy_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"encoding/json"
"fmt"
"io/ioutil"
"sort"

"github.com/pkg/errors"
"go.uber.org/zap"
Expand Down Expand Up @@ -74,17 +75,67 @@ func loadStrategies(strategiesFile string) (*strategies, error) {
}

func (h *strategyStore) parseStrategies(strategies *strategies) {
h.defaultStrategy = &defaultStrategy
h.defaultStrategy = &defaultStrategyResponse
if strategies == nil {
h.logger.Info("No sampling strategies provided, using defaults")
return
}
if strategies.DefaultStrategy != nil {
h.defaultStrategy = h.parseStrategy(strategies.DefaultStrategy)
h.defaultStrategy = h.parseDefaultStrategy(strategies.DefaultStrategy)
}

for _, s := range strategies.ServiceStrategies {
h.serviceStrategies[s.Service] = h.parseServiceStrategies(s)

// Merge with the default operation strategies, because only merging with
// the default strategy has no effect on service strategies (the default strategy
// is not merged with and only used as a fallback).
opS := h.serviceStrategies[s.Service].OperationSampling
if opS == nil {
// It has no use to merge, just reference the default settings.
rutgerbrf marked this conversation as resolved.
Show resolved Hide resolved
h.serviceStrategies[s.Service].OperationSampling = h.defaultStrategy.OperationSampling
continue
}
if h.defaultStrategy.OperationSampling == nil ||
h.defaultStrategy.OperationSampling.PerOperationStrategies == nil {
continue
}
rutgerbrf marked this conversation as resolved.
Show resolved Hide resolved
opS.PerOperationStrategies = mergePerOperationSamplingStrategies(
opS.PerOperationStrategies,
h.defaultStrategy.OperationSampling.PerOperationStrategies)
}
}

// mergePerOperationStrategies merges two operation strategies a and b, where a takes precedence over b.
func mergePerOperationSamplingStrategies(
a, b []*sampling.OperationSamplingStrategy,
) []*sampling.OperationSamplingStrategy {
// Guess the size of the slice of the two merged.
merged := make([]*sampling.OperationSamplingStrategy, 0, (len(a)+len(b))/4*3)
rutgerbrf marked this conversation as resolved.
Show resolved Hide resolved

ossLess := func(s []*sampling.OperationSamplingStrategy, i, j int) bool {
return s[i].Operation < s[j].Operation
}
sort.Slice(a, func(i, j int) bool { return ossLess(a, i, j) })
sort.Slice(b, func(i, j int) bool { return ossLess(b, i, j) })

j := 0
for i := range a {
// Increment j till b[j] > a[i], such that in the loop after the
// loop over a no remaining element of b with the same operation
// as a[i] is added to the merged slice.
for ; j < len(b) && b[j].Operation <= a[i].Operation; j++ {
if b[j].Operation < a[i].Operation {
merged = append(merged, b[j])
}
}
merged = append(merged, a[i])
}
for ; j < len(b); j++ {
merged = append(merged, b[j])
}
rutgerbrf marked this conversation as resolved.
Show resolved Hide resolved

return merged
}

func (h *strategyStore) parseServiceStrategies(strategy *serviceStrategy) *sampling.SamplingStrategyResponse {
Expand All @@ -99,17 +150,11 @@ func (h *strategyStore) parseServiceStrategies(strategy *serviceStrategy) *sampl
opS.DefaultSamplingProbability = resp.ProbabilisticSampling.SamplingRate
}
for _, operationStrategy := range strategy.OperationStrategies {
s := h.parseStrategy(&operationStrategy.strategy)
if s.StrategyType == sampling.SamplingStrategyType_RATE_LIMITING {
// TODO OperationSamplingStrategy only supports probabilistic sampling
h.logger.Warn(
fmt.Sprintf(
"Operation strategies only supports probabilistic sampling at the moment,"+
"'%s' defaulting to probabilistic sampling with probability %f",
operationStrategy.Operation, opS.DefaultSamplingProbability),
zap.Any("strategy", operationStrategy))
s, ok := h.parseOperationStrategy(operationStrategy, opS)
if !ok {
continue
}

opS.PerOperationStrategies = append(opS.PerOperationStrategies,
&sampling.OperationSamplingStrategy{
Operation: operationStrategy.Operation,
Expand All @@ -120,6 +165,55 @@ func (h *strategyStore) parseServiceStrategies(strategy *serviceStrategy) *sampl
return resp
}

func (h *strategyStore) parseDefaultStrategy(strategy *defaultStrategy) *sampling.SamplingStrategyResponse {
rutgerbrf marked this conversation as resolved.
Show resolved Hide resolved
resp := h.parseStrategy(&strategy.strategy)
if len(strategy.OperationStrategies) == 0 {
return resp
}

opS := resp.OperationSampling
if opS == nil {
rutgerbrf marked this conversation as resolved.
Show resolved Hide resolved
opS = &sampling.PerOperationSamplingStrategies{
DefaultSamplingProbability: defaultSamplingProbability,
}
resp.OperationSampling = opS
}

for _, s := range strategy.OperationStrategies {
strategy, ok := h.parseOperationStrategy(s, opS)
if !ok {
continue
}

opS.PerOperationStrategies = append(opS.PerOperationStrategies,
&sampling.OperationSamplingStrategy{
Operation: s.Operation,
ProbabilisticSampling: strategy.ProbabilisticSampling,
})
}

resp.OperationSampling = opS
return resp
}

func (h *strategyStore) parseOperationStrategy(
strategy *operationStrategy,
parent *sampling.PerOperationSamplingStrategies,
) (s *sampling.SamplingStrategyResponse, ok bool) {
s = h.parseStrategy(&strategy.strategy)
if s.StrategyType == sampling.SamplingStrategyType_RATE_LIMITING {
// TODO OperationSamplingStrategy only supports probabilistic sampling
h.logger.Warn(
fmt.Sprintf(
"Operation strategies only supports probabilistic sampling at the moment,"+
"'%s' defaulting to probabilistic sampling with probability %f",
strategy.Operation, parent.DefaultSamplingProbability),
zap.Any("strategy", strategy))
return nil, false
}
return s, true
}

func (h *strategyStore) parseStrategy(strategy *strategy) *sampling.SamplingStrategyResponse {
switch strategy.Type {
case samplerTypeProbabilistic:
Expand All @@ -138,7 +232,7 @@ func (h *strategyStore) parseStrategy(strategy *strategy) *sampling.SamplingStra
}
default:
h.logger.Warn("Failed to parse sampling strategy", zap.Any("strategy", strategy))
return deepCopy(&defaultStrategy)
return deepCopy(&defaultStrategyResponse)
}
}

Expand Down
54 changes: 46 additions & 8 deletions plugin/sampling/strategystore/static/strategy_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package static

import (
"fmt"
"testing"

"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -77,9 +78,16 @@ func TestPerOperationSamplingStrategies(t *testing.T) {
require.NotNil(t, s.OperationSampling)
os := s.OperationSampling
assert.EqualValues(t, os.DefaultSamplingProbability, 0.8)
require.Len(t, os.PerOperationStrategies, 1)
assert.Equal(t, "op1", os.PerOperationStrategies[0].Operation)
require.Len(t, os.PerOperationStrategies, 4)
fmt.Println(os)
assert.Equal(t, "op0", os.PerOperationStrategies[0].Operation)
assert.EqualValues(t, 0.2, os.PerOperationStrategies[0].ProbabilisticSampling.SamplingRate)
assert.Equal(t, "op1", os.PerOperationStrategies[1].Operation)
assert.EqualValues(t, 0.2, os.PerOperationStrategies[1].ProbabilisticSampling.SamplingRate)
assert.Equal(t, "op6", os.PerOperationStrategies[2].Operation)
assert.EqualValues(t, 0.5, os.PerOperationStrategies[2].ProbabilisticSampling.SamplingRate)
assert.Equal(t, "op7", os.PerOperationStrategies[3].Operation)
assert.EqualValues(t, 1, os.PerOperationStrategies[3].ProbabilisticSampling.SamplingRate)

expected = makeResponse(sampling.SamplingStrategyType_RATE_LIMITING, 5)

Expand All @@ -91,15 +99,45 @@ func TestPerOperationSamplingStrategies(t *testing.T) {
require.NotNil(t, s.OperationSampling)
os = s.OperationSampling
assert.EqualValues(t, os.DefaultSamplingProbability, 0.001)
require.Len(t, os.PerOperationStrategies, 2)
assert.Equal(t, "op3", os.PerOperationStrategies[0].Operation)
assert.EqualValues(t, 0.3, os.PerOperationStrategies[0].ProbabilisticSampling.SamplingRate)
assert.Equal(t, "op5", os.PerOperationStrategies[1].Operation)
assert.EqualValues(t, 0.4, os.PerOperationStrategies[1].ProbabilisticSampling.SamplingRate)
require.Len(t, os.PerOperationStrategies, 5)
assert.Equal(t, "op0", os.PerOperationStrategies[0].Operation)
assert.EqualValues(t, 0.2, os.PerOperationStrategies[0].ProbabilisticSampling.SamplingRate)
assert.Equal(t, "op3", os.PerOperationStrategies[1].Operation)
assert.EqualValues(t, 0.3, os.PerOperationStrategies[1].ProbabilisticSampling.SamplingRate)
assert.Equal(t, "op5", os.PerOperationStrategies[2].Operation)
assert.EqualValues(t, 0.4, os.PerOperationStrategies[2].ProbabilisticSampling.SamplingRate)
assert.Equal(t, "op6", os.PerOperationStrategies[3].Operation)
assert.EqualValues(t, 0, os.PerOperationStrategies[3].ProbabilisticSampling.SamplingRate)
assert.Equal(t, "op7", os.PerOperationStrategies[4].Operation)
assert.EqualValues(t, 1, os.PerOperationStrategies[4].ProbabilisticSampling.SamplingRate)

s, err = store.GetSamplingStrategy("default")
require.NoError(t, err)
assert.EqualValues(t, makeResponse(sampling.SamplingStrategyType_PROBABILISTIC, 0.5), *s)
expectedRsp := makeResponse(sampling.SamplingStrategyType_PROBABILISTIC, 0.5)
expectedRsp.OperationSampling = &sampling.PerOperationSamplingStrategies{
DefaultSamplingProbability: 0.001,
PerOperationStrategies: []*sampling.OperationSamplingStrategy{
{
Operation: "op0",
ProbabilisticSampling: &sampling.ProbabilisticSamplingStrategy{
SamplingRate: 0.2,
},
},
{
Operation: "op6",
ProbabilisticSampling: &sampling.ProbabilisticSamplingStrategy{
SamplingRate: 0,
},
},
{
Operation: "op7",
ProbabilisticSampling: &sampling.ProbabilisticSamplingStrategy{
SamplingRate: 1,
},
},
},
}
assert.EqualValues(t, expectedRsp, *s)
}

func TestMissingServiceSamplingStrategyTypes(t *testing.T) {
Expand Down