Skip to content

Commit

Permalink
agent config GA
Browse files Browse the repository at this point in the history
  • Loading branch information
jalvz committed Oct 3, 2019
1 parent bda629f commit 23dc36a
Show file tree
Hide file tree
Showing 15 changed files with 375 additions and 315 deletions.
51 changes: 19 additions & 32 deletions agentcfg/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ package agentcfg
import (
"time"

"github.com/elastic/apm-server/utility"

gocache "github.com/patrickmn/go-cache"

"github.com/elastic/beats/libbeat/logp"
Expand All @@ -31,53 +33,38 @@ const (

type cache struct {
logger *logp.Logger
exp time.Duration
gocache *gocache.Cache
}

func newCache(logger *logp.Logger, exp time.Duration) *cache {
if logger == nil {
logger = logp.NewLogger("agentcfg")
}
logger.Infof("Cache creation with default expiration %v.", exp)
return &cache{
logger: logger,
exp: exp,
gocache: gocache.New(exp, cleanupInterval)}
}

func (c *cache) fetchAndAdd(q Query, fn func(Query) (*Doc, error)) (doc *Doc, err error) {
id := q.ID()

func (c *cache) fetch(q Query, fetch func() (Result, error)) (Result, error) {
// return from cache if possible
doc, found := c.fetch(id)
if found {
return
value, found := c.gocache.Get(q.Etag)
if found && value != nil {
return value.(Result), nil
}

// call fn to retrieve resource from external source
doc, err = fn(q)
// retrieve resource from external source
result, err := fetch()
if err != nil {
return
return result, err
}

// add resource to cache
c.add(id, doc)
return
}

func (c *cache) add(id string, doc *Doc) {
c.gocache.SetDefault(id, doc)
if !c.logger.IsDebug() {
return
if !authorized(q.IsRum, result.Source.Agent) {
return ZeroResult(), nil
}
c.logger.Debugf("Cache size %v. Added ID %v.", c.gocache.ItemCount(), id)
c.gocache.SetDefault(result.Source.Etag, result)
if c.logger.IsDebug() {
c.logger.Debugf("Cache size %v. Added ID %v.", c.gocache.ItemCount(), result.Source.Etag)
}
return result, nil
}

func (c *cache) fetch(id string) (*Doc, bool) {
val, found := c.gocache.Get(id)
if !found || val == nil {
return nil, found
}
return val.(*Doc), found
func authorized(isRum bool, agent string) bool {
hasRumData := utility.Contains(agent, RumAgent) || agent == ""
return !isRum || hasRumData
}
72 changes: 41 additions & 31 deletions agentcfg/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,62 +21,64 @@ import (
"testing"
"time"

"github.com/elastic/beats/libbeat/logp"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/pkg/errors"
)

var (
defaultDoc = Doc{Settings: Settings{"a": "default"}}
externalDoc = Doc{Settings: Settings{"a": "b"}}
defaultResult = Result{Source{Settings: Settings{"a": "default"}, Etag: "123"}}
externalResult = Result{Source{Settings: Settings{"a": "b"}, Etag: "123"}}
)

type cacheSetup struct {
q Query
c *cache
doc *Doc
q Query
c *cache
result Result
}

func newCacheSetup(service string, exp time.Duration, init bool) cacheSetup {
setup := cacheSetup{
q: Query{Service: Service{Name: service}},
c: newCache(nil, exp),
doc: &defaultDoc,
q: Query{Service: Service{Name: service}, Etag: "123"},
c: newCache(logp.NewLogger(""), exp),
result: defaultResult,
}
if init {
setup.c.add(setup.q.ID(), setup.doc)
setup.c.gocache.SetDefault(setup.result.Source.Etag, setup.result)
}
return setup
}

func TestCache_fetchAndAdd(t *testing.T) {
exp := time.Second
for name, tc := range map[string]struct {
fn func(query Query) (*Doc, error)
fn func() (Result, error)
init bool

doc *Doc
doc Result
fail bool
}{
"DocFromCache": {fn: testFn, init: true, doc: &defaultDoc},
"DocFromCache": {fn: testFn, init: true, doc: defaultResult},
"DocFromFunctionFails": {fn: testFnErr, fail: true},
"DocFromFunction": {fn: testFn, doc: &externalDoc},
"EmptyDocFromFunction": {fn: testFnSettingsNil, doc: &Doc{Settings: Settings{}}},
"DocFromFunction": {fn: testFn, doc: externalResult},
"EmptyDocFromFunction": {fn: testFnSettingsNil, doc: Result{Source{Settings: Settings{}}}},
"NilDocFromFunction": {fn: testFnNil},
} {
t.Run(name, func(t *testing.T) {
setup := newCacheSetup(name, exp, tc.init)

doc, err := setup.c.fetchAndAdd(setup.q, tc.fn)
doc, err := setup.c.fetch(setup.q, tc.fn)
assert.Equal(t, tc.doc, doc)
if tc.fail {
require.Error(t, err)
} else {
assert.NoError(t, err)
//ensure value is cached afterwards
cachedDoc, found := setup.c.fetch(setup.q.ID())
assert.True(t, found)
cachedDoc, error := setup.c.fetch(setup.q, tc.fn)
require.NoError(t, error)
assert.Equal(t, doc, cachedDoc)
}
})
Expand All @@ -85,13 +87,13 @@ func TestCache_fetchAndAdd(t *testing.T) {
t.Run("CacheKeyExpires", func(t *testing.T) {
exp := 100 * time.Millisecond
setup := newCacheSetup(t.Name(), exp, false)
doc, err := setup.c.fetchAndAdd(setup.q, testFn)
doc, err := setup.c.fetch(setup.q, testFn)
require.NoError(t, err)
require.NotNil(t, doc)
time.Sleep(exp)
nilDoc, found := setup.c.fetch(setup.q.ID())
assert.False(t, found)
assert.Nil(t, nilDoc)
emptyDoc, error := setup.c.fetch(setup.q, testFnNil)
require.NoError(t, error)
assert.Equal(t, emptyDoc, Result{})
})
}

Expand All @@ -108,7 +110,7 @@ func BenchmarkFetchAndAdd(b *testing.B) {
exp := 5 * time.Minute
setup := newCacheSetup(b.Name(), exp, true)
for i := 0; i < b.N; i++ {
setup.c.fetchAndAdd(setup.q, testFn)
setup.c.fetch(setup.q, testFn)
}
})

Expand All @@ -120,23 +122,31 @@ func BenchmarkFetchAndAdd(b *testing.B) {
q := Query{Service: Service{}}
for i := 0; i < b.N; i++ {
q.Service.Name = string(b.N)
setup.c.fetchAndAdd(q, testFn)
setup.c.fetch(q, testFn)
}
})
}

func testFnErr(_ Query) (*Doc, error) {
return nil, errors.New("testFn fails")
func TestIsAuthorized(t *testing.T) {
assert.True(t, authorized(true, "rum-js"))
assert.True(t, authorized(true, "base-js"))
assert.True(t, authorized(true, ""))
assert.True(t, authorized(false, "python"))
assert.False(t, authorized(true, "python"))
}

func testFnErr() (Result, error) {
return Result{}, errors.New("testFn fails")
}

func testFnNil(_ Query) (*Doc, error) {
return nil, nil
func testFnNil() (Result, error) {
return Result{}, nil
}

func testFnSettingsNil(_ Query) (*Doc, error) {
return &Doc{Settings: Settings{}}, nil
func testFnSettingsNil() (Result, error) {
return Result{Source{Settings: Settings{}}}, nil
}

func testFn(_ Query) (*Doc, error) {
return &externalDoc, nil
func testFn() (Result, error) {
return externalResult, nil
}
32 changes: 8 additions & 24 deletions agentcfg/fetch.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,15 +34,14 @@ import (
// Error Messages used to signal fetching errors
const (
ErrMsgSendToKibanaFailed = "sending request to kibana failed"
ErrMsgMultipleChoices = "multiple configurations found"
ErrMsgReadKibanaResponse = "unable to read Kibana response body"
)
const endpoint = "/api/apm/settings/agent-configuration/search"

// Fetcher holds static information and information shared between requests.
// It implements the Fetch method to retrieve agent configuration information.
type Fetcher struct {
docCache *cache
*cache
logger *logp.Logger
kbClient kibana.Client
}
Expand All @@ -53,32 +52,19 @@ func NewFetcher(kbClient kibana.Client, cacheExp time.Duration) *Fetcher {
return &Fetcher{
kbClient: kbClient,
logger: logger,
docCache: newCache(logger, cacheExp),
cache: newCache(logger, cacheExp),
}
}

// Fetch retrieves agent configuration, fetched from Kibana or a local temporary cache.
func (f *Fetcher) Fetch(q Query, err error) (map[string]string, string, error) {
req := func(query Query) (*Doc, error) {
resultBytes, err := f.request(convert.ToReader(query), err)
if err != nil {
return nil, err
}
return NewDoc(resultBytes)
func (f *Fetcher) Fetch(query Query) (Result, error) {
req := func() (Result, error) {
return NewResult(f.request(convert.ToReader(query)))
}

doc, err := f.docCache.fetchAndAdd(q, req)
if err != nil {
return nil, "", err
}

return doc.Settings, doc.ID, nil
return f.fetch(query, req)
}

func (f *Fetcher) request(r io.Reader, err error) ([]byte, error) {
if err != nil {
return nil, err
}
func (f *Fetcher) request(r io.Reader) ([]byte, error) {

resp, err := f.kbClient.Send(http.MethodPost, endpoint, nil, nil, r)
if err != nil {
Expand All @@ -91,9 +77,7 @@ func (f *Fetcher) request(r io.Reader, err error) ([]byte, error) {
}

result, err := ioutil.ReadAll(resp.Body)
if resp.StatusCode == http.StatusMultipleChoices {
return nil, errors.Wrap(errors.New(string(result)), ErrMsgMultipleChoices)
} else if resp.StatusCode > http.StatusMultipleChoices {
if resp.StatusCode >= http.StatusBadRequest {
return nil, errors.New(string(result))
}
if err != nil {
Expand Down
Loading

0 comments on commit 23dc36a

Please sign in to comment.