Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Winlogbeat Fix an issue with caching #1100

Merged
merged 1 commit into from
Mar 3, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ https://github.com/elastic/beats/compare/v1.1.0...master[Check the HEAD diff]

*Winlogbeat*
- Add caching of event metadata handles and the system render context for the wineventlog API {pull}888[888]
- Improve config validation by checking for unknown top-level YAML keys. {pull}1100[1100]

==== Deprecated

Expand Down
2 changes: 1 addition & 1 deletion libbeat/common/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ func (c *Cache) CleanUp() int {
return count
}

// Entries returns a copy of the non-expired elements in the cache.
// Entries returns a shallow copy of the non-expired elements in the cache.
func (c *Cache) Entries() map[Key]Value {
c.RLock()
defer c.RUnlock()
Expand Down
2 changes: 1 addition & 1 deletion winlogbeat/beater/winlogbeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ func (eb *Winlogbeat) Config(b *beat.Beat) error {
}

// Validate configuration.
err = eb.config.Winlogbeat.Validate()
err = eb.config.Validate()
if err != nil {
return fmt.Errorf("Error validating configuration file. %v", err)
}
Expand Down
29 changes: 28 additions & 1 deletion winlogbeat/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ package config
import (
"fmt"
"net"
"sort"
"strconv"
"strings"
"time"
Expand All @@ -27,7 +28,33 @@ type Validator interface {

// Settings is the root of the Winlogbeat configuration data hierarchy.
type Settings struct {
Winlogbeat WinlogbeatConfig
Winlogbeat WinlogbeatConfig `config:"winlogbeat"`
All map[string]interface{} `config:",inline"`
}

// Validate validates the Settings data and returns an error describing
// all problems or nil if there are none.
func (s Settings) Validate() error {
validKeys := []string{"winlogbeat", "output", "shipper", "logging"}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That looks like something we should move to libbeat, as these 4 keys must exist for each beat.

sort.Strings(validKeys)

// Check for invalid top-level keys.
var errs multierror.Errors
for k, _ := range s.All {
k = strings.ToLower(k)
i := sort.SearchStrings(validKeys, k)
if i >= len(validKeys) || validKeys[i] != k {
errs = append(errs, fmt.Errorf("Invalid top-level key '%s' "+
"found. Valid keys are %s", k, strings.Join(validKeys, ", ")))
}
}

err := s.Winlogbeat.Validate()
if err != nil {
errs = append(errs, err)
}

return errs.Err()
}

// WinlogbeatConfig contains all of Winlogbeat configuration data.
Expand Down
12 changes: 12 additions & 0 deletions winlogbeat/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,18 @@ func TestConfigValidate(t *testing.T) {
},
"", // No Error
},
{
Settings{
WinlogbeatConfig{
EventLogs: []EventLogConfig{
{Name: "App"},
},
},
map[string]interface{}{"other": "value"},
},
"1 error: Invalid top-level key 'other' found. Valid keys are " +
"logging, output, shipper, winlogbeat",
},
{
WinlogbeatConfig{},
"1 error: At least one event log must be configured as part of " +
Expand Down
47 changes: 40 additions & 7 deletions winlogbeat/eventlog/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,19 @@ package eventlog
// to event message files.

import (
"expvar"
"time"

"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/logp"
"github.com/elastic/beats/winlogbeat/sys/eventlogging"
)

// Stats for the message file caches.
var (
cacheStats = expvar.NewMap("msgFileCacheStats")
)

// Constants that control the cache behavior.
const (
expirationTimeout time.Duration = 2 * time.Minute
Expand All @@ -31,6 +37,11 @@ type messageFilesCache struct {
loader messageFileLoaderFunc
freer freeHandleFunc
eventLogName string

// Cache metrics.
hit func() // Increments number of cache hits.
miss func() // Increments number of cache misses.
size func() // Sets the current cache size.
}

// newHandleCache creates and returns a new handleCache that has been
Expand All @@ -39,14 +50,24 @@ type messageFilesCache struct {
func newMessageFilesCache(eventLogName string, loader messageFileLoaderFunc,
freer freeHandleFunc) *messageFilesCache {

size := &expvar.Int{}
cacheStats.Set(eventLogName+"Size", size)

hc := &messageFilesCache{
loader: loader,
freer: freer,
eventLogName: eventLogName,
hit: func() { cacheStats.Add(eventLogName+"Hits", 1) },
miss: func() { cacheStats.Add(eventLogName+"Misses", 1) },
}
hc.cache = common.NewCacheWithRemovalListener(expirationTimeout,
initialSize, hc.evictionHandler)
hc.cache.StartJanitor(janitorInterval)
hc.size = func() {
s := hc.cache.Size()
size.Set(int64(s))
debugf("messageFilesCache[%s] size=%d", hc.eventLogName, s)
}
return hc
}

Expand All @@ -57,6 +78,8 @@ func newMessageFilesCache(eventLogName string, loader messageFileLoaderFunc,
func (hc *messageFilesCache) get(sourceName string) eventlogging.MessageFiles {
v := hc.cache.Get(sourceName)
if v == nil {
hc.miss()

// Handle to event message file for sourceName is not cached. Attempt
// to load the Handles into the cache.
v = hc.loader(hc.eventLogName, sourceName)
Expand All @@ -65,11 +88,17 @@ func (hc *messageFilesCache) get(sourceName string) eventlogging.MessageFiles {
// check if a value was already loaded.
existing := hc.cache.PutIfAbsent(sourceName, v)
if existing != nil {
// A value was already loaded, so free the handles we created.
existingMessageFiles, _ := existing.(eventlogging.MessageFiles)
hc.freeHandles(existingMessageFiles)
return existingMessageFiles
// A value was already loaded, so free the handles we just created.
messageFiles, _ := v.(eventlogging.MessageFiles)
hc.freeHandles(messageFiles)

// Return the existing cached value.
messageFiles, _ = existing.(eventlogging.MessageFiles)
return messageFiles
}
hc.size()
} else {
hc.hit()
}

messageFiles, _ := v.(eventlogging.MessageFiles)
Expand All @@ -79,13 +108,16 @@ func (hc *messageFilesCache) get(sourceName string) eventlogging.MessageFiles {
// evictionHandler is the callback handler that receives notifications when
// a key-value pair is evicted from the messageFilesCache.
func (hc *messageFilesCache) evictionHandler(k common.Key, v common.Value) {
// Update the size on a different goroutine after the callback completes.
defer func() { go hc.size() }()

messageFiles, ok := v.(eventlogging.MessageFiles)
if !ok {
return
}

logp.Debug("eventlog", "Evicting messageFiles %+v for sourceName %v.",
messageFiles, k)
debugf("messageFilesCache[%s] Evicting messageFiles %+v for sourceName %v.",
hc.eventLogName, messageFiles, k)
hc.freeHandles(messageFiles)
}

Expand All @@ -95,7 +127,8 @@ func (hc *messageFilesCache) freeHandles(mf eventlogging.MessageFiles) {
for _, fh := range mf.Handles {
err := hc.freer(fh.Handle)
if err != nil {
logp.Warn("FreeLibrary error for handle %v", fh.Handle)
logp.Warn("messageFilesCache[%s] FreeLibrary error for handle %v",
hc.eventLogName, fh.Handle)
}
}
}
2 changes: 1 addition & 1 deletion winlogbeat/eventlog/wineventlog.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ func (l *winEventLog) Close() error {
func newWinEventLog(c Config) (EventLog, error) {
eventMetadataHandle := func(providerName, sourceName string) eventlogging.MessageFiles {
mf := eventlogging.MessageFiles{SourceName: sourceName}
h, err := sys.OpenPublisherMetadata(0, providerName, 0)
h, err := sys.OpenPublisherMetadata(0, sourceName, 0)
if err != nil {
mf.Err = err
return mf
Expand Down