Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add URL option for sampling strategies file #2519

Merged
merged 9 commits into from
Oct 13, 2020
2 changes: 1 addition & 1 deletion cmd/query/app/static_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,7 @@ func loadUIConfig(uiConfig string) (map[string]interface{}, error) {
return nil, nil
}
ext := filepath.Ext(uiConfig)
bytes, err := ioutil.ReadFile(uiConfig) /* nolint #nosec , this comes from an admin, not user */
bytes, err := ioutil.ReadFile(filepath.Clean(uiConfig))
if err != nil {
return nil, fmt.Errorf("cannot read UI config file %v: %w", uiConfig, err)
}
Expand Down
2 changes: 1 addition & 1 deletion plugin/sampling/strategystore/static/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import (
)

const (
// SamplingStrategiesFile contains the name of CLI opions for config file.
// SamplingStrategiesFile contains the name of CLI option for config file.
SamplingStrategiesFile = "sampling.strategies-file"
samplingStrategiesReloadInterval = "sampling.strategies-reload-interval"
)
Expand Down
104 changes: 81 additions & 23 deletions plugin/sampling/strategystore/static/strategy_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ import (
"encoding/json"
"fmt"
"io/ioutil"
"net/http"
"net/url"
"path/filepath"
"sync/atomic"
"time"
Expand All @@ -31,6 +33,10 @@ import (
"github.com/jaegertracing/jaeger/thrift-gen/sampling"
)

// null represents "null" JSON value and
// it un-marshals to nil pointer.
var nullJSON = []byte("null")

type strategyStore struct {
logger *zap.Logger

Expand All @@ -45,6 +51,8 @@ type storedStrategies struct {
serviceStrategies map[string]*sampling.SamplingStrategyResponse
}

type strategyLoader func() ([]byte, error)

// NewStrategyStore creates a strategy store that holds static sampling strategies.
func NewStrategyStore(options Options, logger *zap.Logger) (ss.StrategyStore, error) {
ctx, cancelFunc := context.WithCancel(context.Background())
Expand All @@ -55,14 +63,20 @@ func NewStrategyStore(options Options, logger *zap.Logger) (ss.StrategyStore, er
}
h.storedStrategies.Store(defaultStrategies())

strategies, err := loadStrategies(options.StrategiesFile)
if options.StrategiesFile == "" {
h.parseStrategies(nil)
return h, nil
}

loadFn := samplingStrategyLoader(options.StrategiesFile)
strategies, err := loadStrategies(loadFn)
if err != nil {
return nil, err
}
h.parseStrategies(strategies)

if options.ReloadInterval > 0 {
go h.autoUpdateStrategies(options.ReloadInterval, options.StrategiesFile)
go h.autoUpdateStrategies(options.ReloadInterval, loadFn)
}
return h, nil
}
Expand All @@ -83,35 +97,81 @@ func (h *strategyStore) Close() {
h.cancelFunc()
}

func (h *strategyStore) autoUpdateStrategies(interval time.Duration, filePath string) {
lastValue := ""
func downloadSamplingStrategies(url string) ([]byte, error) {
resp, err := http.Get(url)
if err != nil {
return nil, fmt.Errorf("failed to download sampling strategies: %w", err)
}

defer resp.Body.Close()
buf := new(bytes.Buffer)
if _, err = buf.ReadFrom(resp.Body); err != nil {
return nil, fmt.Errorf("failed to read sampling strategies HTTP response body: %w", err)
}

if resp.StatusCode == http.StatusServiceUnavailable {
return nullJSON, nil
}
if resp.StatusCode != http.StatusOK {
return nil, fmt.Errorf(
yurishkuro marked this conversation as resolved.
Show resolved Hide resolved
"receiving %s while downloading strategies file: %s",
resp.Status,
buf.String(),
)
}

return buf.Bytes(), nil
}

func isURL(str string) bool {
u, err := url.Parse(str)
return err == nil && u.Scheme != "" && u.Host != ""
}

func samplingStrategyLoader(strategiesFile string) strategyLoader {
if isURL(strategiesFile) {
return func() ([]byte, error) {
return downloadSamplingStrategies(strategiesFile)
}
}

return func() ([]byte, error) {
currBytes, err := ioutil.ReadFile(filepath.Clean(strategiesFile))
if err != nil {
return nil, fmt.Errorf("failed to read strategies file %s: %w", strategiesFile, err)
}
return currBytes, nil
}
}

func (h *strategyStore) autoUpdateStrategies(interval time.Duration, loader strategyLoader) {
lastValue := string(nullJSON)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Technically, we could have already loaded something before calling this function, so lastValue would be different. But since it's a periodic check anyway, I think it's fine.

ticker := time.NewTicker(interval)
defer ticker.Stop()
for {
select {
case <-ticker.C:
lastValue = h.reloadSamplingStrategyFile(filePath, lastValue)
lastValue = h.reloadSamplingStrategy(loader, lastValue)
case <-h.ctx.Done():
return
}
}
}

func (h *strategyStore) reloadSamplingStrategyFile(filePath string, lastValue string) string {
currBytes, err := ioutil.ReadFile(filepath.Clean(filePath))
func (h *strategyStore) reloadSamplingStrategy(loadFn strategyLoader, lastValue string) string {
newValue, err := loadFn()
if err != nil {
h.logger.Error("failed to load sampling strategies", zap.String("file", filePath), zap.Error(err))
h.logger.Error("failed to re-load sampling strategies", zap.Error(err))
return lastValue
}
newValue := string(currBytes)
if lastValue == newValue {
if lastValue == string(newValue) {
return lastValue
}
if err = h.updateSamplingStrategy(currBytes); err != nil {
h.logger.Error("failed to update sampling strategies from file", zap.Error(err))
if err := h.updateSamplingStrategy(newValue); err != nil {
h.logger.Error("failed to update sampling strategies", zap.Error(err))
return lastValue
}
return newValue
return string(newValue)
}

func (h *strategyStore) updateSamplingStrategy(bytes []byte) error {
Expand All @@ -125,24 +185,22 @@ func (h *strategyStore) updateSamplingStrategy(bytes []byte) error {
}

// TODO good candidate for a global util function
func loadStrategies(strategiesFile string) (*strategies, error) {
if strategiesFile == "" {
return nil, nil
}
data, err := ioutil.ReadFile(strategiesFile) /* nolint #nosec , this comes from an admin, not user */
func loadStrategies(loadFn strategyLoader) (*strategies, error) {
strategyBytes, err := loadFn()
if err != nil {
return nil, fmt.Errorf("failed to open strategies file: %w", err)
return nil, err
}
var strategies strategies
if err := json.Unmarshal(data, &strategies); err != nil {

var strategies *strategies
if err := json.Unmarshal(strategyBytes, &strategies); err != nil {
return nil, fmt.Errorf("failed to unmarshal strategies: %w", err)
}
return &strategies, nil
return strategies, nil
}

func (h *strategyStore) parseStrategies(strategies *strategies) {
if strategies == nil {
h.logger.Info("No sampling strategies provided, using defaults")
h.logger.Info("No sampling strategies provided or URL is unavailable, using defaults")
return
}
newStore := defaultStrategies()
Expand Down
Loading