Skip to content

Commit

Permalink
Add url option for sampling strategies
Browse files Browse the repository at this point in the history
This change will let user to provide a
URL to download sampling strategies. Default
strategy is the fallback option if the URL is
temporarily unavailable.

Signed-off-by: Deepak <sah.sslpu@gmail.com>
  • Loading branch information
goku321 committed Oct 7, 2020
1 parent 873397e commit 270e96d
Show file tree
Hide file tree
Showing 3 changed files with 213 additions and 31 deletions.
2 changes: 1 addition & 1 deletion plugin/sampling/strategystore/static/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import (
)

const (
// SamplingStrategiesFile contains the name of CLI opions for config file.
// SamplingStrategiesFile contains the name of CLI option for config file.
SamplingStrategiesFile = "sampling.strategies-file"
samplingStrategiesReloadInterval = "sampling.strategies-reload-interval"
)
Expand Down
104 changes: 80 additions & 24 deletions plugin/sampling/strategystore/static/strategy_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@ import (
"encoding/json"
"fmt"
"io/ioutil"
"path/filepath"
"net/http"
"net/url"
"sync/atomic"
"time"

Expand All @@ -31,6 +32,9 @@ import (
"github.com/jaegertracing/jaeger/thrift-gen/sampling"
)

// null represents "null" JSON value.
const null = "null"

type strategyStore struct {
logger *zap.Logger

Expand All @@ -45,6 +49,8 @@ type storedStrategies struct {
serviceStrategies map[string]*sampling.SamplingStrategyResponse
}

type strategyLoader func() ([]byte, error)

// NewStrategyStore creates a strategy store that holds static sampling strategies.
func NewStrategyStore(options Options, logger *zap.Logger) (ss.StrategyStore, error) {
ctx, cancelFunc := context.WithCancel(context.Background())
Expand All @@ -55,14 +61,15 @@ func NewStrategyStore(options Options, logger *zap.Logger) (ss.StrategyStore, er
}
h.storedStrategies.Store(defaultStrategies())

strategies, err := loadStrategies(options.StrategiesFile)
loadFn := samplingStrategyLoader(options.StrategiesFile)
strategies, err := loadStrategies(loadFn)
if err != nil {
return nil, err
}
h.parseStrategies(strategies)

if options.ReloadInterval > 0 {
go h.autoUpdateStrategies(options.ReloadInterval, options.StrategiesFile)
go h.autoUpdateStrategies(options.ReloadInterval, loadFn)
}
return h, nil
}
Expand All @@ -83,35 +90,86 @@ func (h *strategyStore) Close() {
h.cancelFunc()
}

func (h *strategyStore) autoUpdateStrategies(interval time.Duration, filePath string) {
lastValue := ""
func downloadSamplingStrategies(url string) ([]byte, error) {
resp, err := http.Get(url)
if err != nil {
return nil, fmt.Errorf("failed to download sampling strategies: %w", err)
}

defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
if resp.StatusCode == http.StatusServiceUnavailable {
return []byte("null"), nil
}
return nil, fmt.Errorf(
"receiving %s while downloading strategies file",
resp.Status,
)
}

buf := new(bytes.Buffer)
if _, err = buf.ReadFrom(resp.Body); err != nil {
return nil, fmt.Errorf("failed to read sampling strategies from downloaded JSON: %w", err)
}
return buf.Bytes(), nil
}

func isURL(str string) bool {
u, err := url.Parse(str)
return err == nil && u.Scheme != "" && u.Host != ""
}

func samplingStrategyLoader(strategiesFile string) strategyLoader {
if strategiesFile == "" {
return func() ([]byte, error) {
// Using null so that it un-marshals to nil pointer.
return []byte(null), nil
}
}

if isURL(strategiesFile) {
return func() ([]byte, error) {
return downloadSamplingStrategies(strategiesFile)
}
}

return func() ([]byte, error) {
currBytes, err := ioutil.ReadFile(strategiesFile)
if err != nil {
return nil, fmt.Errorf("failed to open strategies file: %w", err)
}
return currBytes, nil
}
}

func (h *strategyStore) autoUpdateStrategies(interval time.Duration, loader strategyLoader) {
lastValue := null
ticker := time.NewTicker(interval)
defer ticker.Stop()
for {
select {
case <-ticker.C:
lastValue = h.reloadSamplingStrategyFile(filePath, lastValue)
lastValue = h.reloadSamplingStrategy(loader, lastValue)
case <-h.ctx.Done():
return
}
}
}

func (h *strategyStore) reloadSamplingStrategyFile(filePath string, lastValue string) string {
currBytes, err := ioutil.ReadFile(filepath.Clean(filePath))
func (h *strategyStore) reloadSamplingStrategy(loadFn strategyLoader, lastValue string) string {
newValue, err := loadFn()
if err != nil {
h.logger.Error("failed to load sampling strategies", zap.String("file", filePath), zap.Error(err))
h.logger.Error("failed to re-load sampling strategies", zap.Error(err))
return lastValue
}
newValue := string(currBytes)
if lastValue == newValue {
if lastValue == string(newValue) {
return lastValue
}
if err = h.updateSamplingStrategy(currBytes); err != nil {
h.logger.Error("failed to update sampling strategies from file", zap.Error(err))
if err := h.updateSamplingStrategy(newValue); err != nil {
h.logger.Error("failed to update sampling strategies", zap.Error(err))
return lastValue
}
return newValue
return string(newValue)
}

func (h *strategyStore) updateSamplingStrategy(bytes []byte) error {
Expand All @@ -125,24 +183,22 @@ func (h *strategyStore) updateSamplingStrategy(bytes []byte) error {
}

// TODO good candidate for a global util function
func loadStrategies(strategiesFile string) (*strategies, error) {
if strategiesFile == "" {
return nil, nil
}
data, err := ioutil.ReadFile(strategiesFile) /* nolint #nosec , this comes from an admin, not user */
func loadStrategies(loadFn strategyLoader) (*strategies, error) {
strategyBytes, err := loadFn()
if err != nil {
return nil, fmt.Errorf("failed to open strategies file: %w", err)
return nil, err
}
var strategies strategies
if err := json.Unmarshal(data, &strategies); err != nil {

var strategies *strategies
if err := json.Unmarshal(strategyBytes, &strategies); err != nil {
return nil, fmt.Errorf("failed to unmarshal strategies: %w", err)
}
return &strategies, nil
return strategies, nil
}

func (h *strategyStore) parseStrategies(strategies *strategies) {
if strategies == nil {
h.logger.Info("No sampling strategies provided, using defaults")
h.logger.Info("No sampling strategies provided or URL is unavailable, using defaults")
return
}
newStore := defaultStrategies()
Expand Down
138 changes: 132 additions & 6 deletions plugin/sampling/strategystore/static/strategy_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ package static
import (
"context"
"io/ioutil"
"net/http"
"net/http/httptest"
"os"
"strings"
"testing"
Expand All @@ -31,6 +33,37 @@ import (
"github.com/jaegertracing/jaeger/thrift-gen/sampling"
)

// Returns strategies in JSON format. Used for testing
// URL option for sampling strategies.
func mockStrategyServer() *httptest.Server {
f := func(w http.ResponseWriter, r *http.Request) {
switch r.URL.Path {
case "/bad-content":
w.Write([]byte("bad-content"))
return

case "/bad-status":
w.WriteHeader(404)
return

case "/service-unavailable":
w.WriteHeader(503)
return

default:
data, err := ioutil.ReadFile("fixtures/strategies.json")
if err != nil {
w.WriteHeader(500)
return
}
w.WriteHeader(200)
w.Header().Set("Content-Type", "application/json")
w.Write(data)
}
}
return httptest.NewServer(http.HandlerFunc(f))
}

func TestStrategyStore(t *testing.T) {
_, err := NewStrategyStore(Options{StrategiesFile: "fileNotFound.json"}, zap.NewNop())
assert.EqualError(t, err, "failed to open strategies file: open fileNotFound.json: no such file or directory")
Expand All @@ -43,7 +76,7 @@ func TestStrategyStore(t *testing.T) {
logger, buf := testutils.NewLogger()
store, err := NewStrategyStore(Options{}, logger)
require.NoError(t, err)
assert.Contains(t, buf.String(), "No sampling strategies provided, using defaults")
assert.Contains(t, buf.String(), "No sampling strategies provided or URL is unavailable, using defaults")
s, err := store.GetSamplingStrategy(context.Background(), "foo")
require.NoError(t, err)
assert.EqualValues(t, makeResponse(sampling.SamplingStrategyType_PROBABILISTIC, 0.001), *s)
Expand All @@ -62,6 +95,26 @@ func TestStrategyStore(t *testing.T) {
s, err = store.GetSamplingStrategy(context.Background(), "default")
require.NoError(t, err)
assert.EqualValues(t, makeResponse(sampling.SamplingStrategyType_PROBABILISTIC, 0.5), *s)

// Test default strategy when URL is temporarily unavailable.
mockServer := mockStrategyServer()
store, err = NewStrategyStore(Options{StrategiesFile: mockServer.URL+"/service-unavailable"}, logger)
assert.Contains(t, buf.String(), "No sampling strategies provided or URL is unavailable, using defaults")
s, err = store.GetSamplingStrategy(context.Background(), "foo")
require.NoError(t, err)
assert.EqualValues(t, makeResponse(sampling.SamplingStrategyType_PROBABILISTIC, 0.001), *s)

// Test downloading strategies from a URL.
store, err = NewStrategyStore(Options{StrategiesFile: mockServer.URL}, logger)
require.NoError(t, err)

s, err = store.GetSamplingStrategy(context.Background(), "foo")
require.NoError(t, err)
assert.EqualValues(t, makeResponse(sampling.SamplingStrategyType_PROBABILISTIC, 0.8), *s)

s, err = store.GetSamplingStrategy(context.Background(), "bar")
require.NoError(t, err)
assert.EqualValues(t, makeResponse(sampling.SamplingStrategyType_RATE_LIMITING, 5), *s)
}

func TestPerOperationSamplingStrategies(t *testing.T) {
Expand Down Expand Up @@ -276,7 +329,7 @@ func TestAutoUpdateStrategy(t *testing.T) {
assert.EqualValues(t, makeResponse(sampling.SamplingStrategyType_PROBABILISTIC, 0.8), *s)

// verify that reloading in no-op
value := store.reloadSamplingStrategyFile(dstFile, string(srcBytes))
value := store.reloadSamplingStrategy(samplingStrategyLoader(dstFile), string(srcBytes))
assert.Equal(t, string(srcBytes), value)

// update file with new probability of 0.9
Expand All @@ -293,6 +346,49 @@ func TestAutoUpdateStrategy(t *testing.T) {
time.Sleep(1 * time.Millisecond)
}
assert.EqualValues(t, makeResponse(sampling.SamplingStrategyType_PROBABILISTIC, 0.9), *s)

// Test auto update strategy with URL option.
mockServer := mockStrategyServer()
ss, err = NewStrategyStore(Options{
StrategiesFile: mockServer.URL,
ReloadInterval: 10 * time.Millisecond,
}, zap.NewNop())
require.NoError(t, err)
store = ss.(*strategyStore)
defer store.Close()

// copy existing fixture content to restore it later.
srcBytes, err = ioutil.ReadFile(srcFile)
require.NoError(t, err)
originalBytes := srcBytes

// confirm baseline value
s, err = store.GetSamplingStrategy(context.Background(), "foo")
require.NoError(t, err)
assert.EqualValues(t, makeResponse(sampling.SamplingStrategyType_PROBABILISTIC, 0.8), *s)

// verify that reloading in no-op
value = store.reloadSamplingStrategy(samplingStrategyLoader(mockServer.URL), string(srcBytes))
assert.Equal(t, string(srcBytes), value)

// update original strategies file with new probability of 0.9
newStr = strings.Replace(string(srcBytes), "0.8", "0.9", 1)
require.NoError(t, ioutil.WriteFile(srcFile, []byte(newStr), 0644))
defer func() {
// replace original strategies file with old content.
require.NoError(t, ioutil.WriteFile(srcFile, originalBytes, 0644), "failed to restore original file content")
}()

// wait for reload timer
for i := 0; i < 1000; i++ { // wait up to 1sec
s, err = store.GetSamplingStrategy(context.Background(), "foo")
require.NoError(t, err)
if s.ProbabilisticSampling != nil && s.ProbabilisticSampling.SamplingRate == 0.9 {
break
}
time.Sleep(1 * time.Millisecond)
}
assert.EqualValues(t, makeResponse(sampling.SamplingStrategyType_PROBABILISTIC, 0.9), *s)
}

func TestAutoUpdateStrategyErrors(t *testing.T) {
Expand All @@ -314,13 +410,25 @@ func TestAutoUpdateStrategyErrors(t *testing.T) {
defer store.Close()

// check invalid file path or read failure
assert.Equal(t, "blah", store.reloadSamplingStrategyFile(tempFile.Name()+"bad-path", "blah"))
assert.Len(t, logs.FilterMessage("failed to load sampling strategies").All(), 1)
assert.Equal(t, "blah", store.reloadSamplingStrategy(samplingStrategyLoader(tempFile.Name()+"bad-path"), "blah"))
assert.Len(t, logs.FilterMessage("failed to re-load sampling strategies").All(), 1)

// check bad file content
require.NoError(t, ioutil.WriteFile(tempFile.Name(), []byte("bad value"), 0644))
assert.Equal(t, "blah", store.reloadSamplingStrategyFile(tempFile.Name(), "blah"))
assert.Len(t, logs.FilterMessage("failed to update sampling strategies from file").All(), 1)
assert.Equal(t, "blah", store.reloadSamplingStrategy(samplingStrategyLoader(tempFile.Name()), "blah"))
assert.Len(t, logs.FilterMessage("failed to update sampling strategies").All(), 1)

// check invalid url
assert.Equal(t, "duh", store.reloadSamplingStrategy(samplingStrategyLoader("bad-url"), "duh"))
assert.Len(t, logs.FilterMessage("failed to re-load sampling strategies").All(), 2)

// check status code other than 200
assert.Equal(t, "duh", store.reloadSamplingStrategy(samplingStrategyLoader(mockStrategyServer().URL+"/bad-status"), "duh"))
assert.Len(t, logs.FilterMessage("failed to re-load sampling strategies").All(), 3)

// check bad content from url
assert.Equal(t, "duh", store.reloadSamplingStrategy(samplingStrategyLoader(mockStrategyServer().URL+"/bad-content"), "duh"))
assert.Len(t, logs.FilterMessage("failed to update sampling strategies").All(), 2)
}

func TestServiceNoPerOperationStrategies(t *testing.T) {
Expand All @@ -337,3 +445,21 @@ func TestServiceNoPerOperationStrategies(t *testing.T) {
expected := makeResponse(sampling.SamplingStrategyType_RATE_LIMITING, 3)
assert.Equal(t, *expected.RateLimitingSampling, *s.RateLimitingSampling)
}

func TestSamplingStrategyLoader(t *testing.T) {
// invalid file path
loader := samplingStrategyLoader("not-exists")
_, err := loader()
assert.Contains(t, err.Error(), "failed to open strategies file")

// status code other than 200
mockServer := mockStrategyServer()
loader = samplingStrategyLoader(mockServer.URL + "/bad-status")
_, err = loader()
assert.Contains(t, err.Error(), "receiving 404 Not Found while downloading strategies file")

// should download content from URL
loader = samplingStrategyLoader(mockServer.URL + "/bad-content")
content, err := loader()
assert.Equal(t, "bad-content", string(content))
}

0 comments on commit 270e96d

Please sign in to comment.