Skip to content

Commit

Permalink
Perfmon metricset improvements (elastic#26886)
Browse files Browse the repository at this point in the history
* work on perfmon improvements

* changelog

* rever changes

* complete checks

* review action

* fmt

* clean up

* docs

* fix tests

* update

* fix tests

* fix changelog
  • Loading branch information
narph authored Aug 5, 2021
1 parent b8cbdee commit 12c8dae
Show file tree
Hide file tree
Showing 11 changed files with 115 additions and 49 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Fix index template loading when the new index format is selected. {issue}22482[22482] {pull}22682[22682]
- Periodic metrics in logs will now report `libbeat.output.events.active` and `beat.memstats.rss`
as gauges (rather than counters). {pull}22877[22877]
- Improve `perfmon` metricset performance. {pull}26886[26886]
- Preserve annotations in a kubernetes namespace metadata {pull}27045[27045]
- Allow conditional processing in `decode_xml` and `decode_xml_wineventlog`. {pull}27159[27159]

Expand Down
43 changes: 29 additions & 14 deletions metricbeat/helper/windows/pdh/pdh_query_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ import (
"syscall"
"unsafe"

"golang.org/x/sys/windows"

"github.com/pkg/errors"
)

Expand Down Expand Up @@ -172,6 +174,11 @@ func (q *Query) CollectData() error {
return PdhCollectQueryData(q.Handle)
}

// CollectData collects the value for all counters in the query.
func (q *Query) CollectDataEx(interval uint32, event windows.Handle) error {
return PdhCollectQueryDataEx(q.Handle, interval, event)
}

// GetFormattedCounterValues returns an array of formatted values for a query.
func (q *Query) GetFormattedCounterValues() (map[string][]CounterValue, error) {
if q.Counters == nil || len(q.Counters) == 0 {
Expand Down Expand Up @@ -209,23 +216,31 @@ func (q *Query) ExpandWildCardPath(wildCardPath string) ([]string, error) {

// PdhExpandWildCardPath will not return the counter paths for windows 32 bit systems but PdhExpandCounterPath will.
if runtime.GOARCH == "386" {
expdPaths, err = PdhExpandCounterPath(utfPath)
if expdPaths, err = PdhExpandCounterPath(utfPath); err != nil {
return nil, err
}
if expdPaths == nil {
return nil, errors.New("no counter paths found")
}
return UTF16ToStringArray(expdPaths), nil
} else {
expdPaths, err = PdhExpandWildCardPath(utfPath)
// rarely the PdhExpandWildCardPathW will not retrieve the expanded buffer size initially so the next call will encounter the PDH_MORE_DATA error since the specified size on the input is still less than
// the required size. If this is the case we will fallback on the PdhExpandCounterPathW api since it looks to act in a more stable manner. The PdhExpandCounterPathW api does come with some limitations but will
// satisfy most cases and return valid paths.
if err == PDH_MORE_DATA {
expdPaths, err = PdhExpandCounterPath(utfPath)
if expdPaths, err = PdhExpandWildCardPath(utfPath); err != nil {
return nil, err
}
paths := UTF16ToStringArray(expdPaths)
// in several cases ExpandWildCardPath win32 api seems to return initial wildcard without any errors, adding some waiting time between the 2 ExpandWildCardPath api calls seems to be succesfull but that will delay data retrieval
// A call is triggered again
if len(paths) == 1 && strings.Contains(paths[0], "*") && paths[0] == wildCardPath {
expdPaths, err = PdhExpandWildCardPath(utfPath)
if err == nil {
return paths, err
}
} else {
return paths, err
}
}
if err != nil {
return nil, err
}
if expdPaths == nil {
return nil, errors.New("no counter paths found")
}
return UTF16ToStringArray(expdPaths), nil

return nil, PdhErrno(syscall.ERROR_NOT_FOUND)
}

// Close closes the query and all of its counters.
Expand Down
15 changes: 12 additions & 3 deletions metricbeat/helper/windows/pdh/pdh_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
//sys _PdhAddCounter(query PdhQueryHandle, counterPath string, userData uintptr, counter *PdhCounterHandle) (errcode error) [failretval!=0] = pdh.PdhAddCounterW
//sys _PdhRemoveCounter(counter PdhCounterHandle) (errcode error) [failretval!=0] = pdh.PdhRemoveCounter
//sys _PdhCollectQueryData(query PdhQueryHandle) (errcode error) [failretval!=0] = pdh.PdhCollectQueryData
//sys _PdhCollectQueryDataEx(query PdhQueryHandle, interval uint32, event windows.Handle) (errcode error) [failretval!=0] = pdh.PdhCollectQueryDataEx
//sys _PdhGetFormattedCounterValueDouble(counter PdhCounterHandle, format PdhCounterFormat, counterType *uint32, value *PdhCounterValueDouble) (errcode error) [failretval!=0] = pdh.PdhGetFormattedCounterValue
//sys _PdhGetFormattedCounterValueLarge(counter PdhCounterHandle, format PdhCounterFormat, counterType *uint32, value *PdhCounterValueLarge) (errcode error) [failretval!=0] = pdh.PdhGetFormattedCounterValue
//sys _PdhGetFormattedCounterValueLong(counter PdhCounterHandle, format PdhCounterFormat, counterType *uint32, value *PdhCounterValueLong) (errcode error) [failretval!=0]= pdh.PdhGetFormattedCounterValue
Expand Down Expand Up @@ -156,6 +157,14 @@ func PdhCollectQueryData(query PdhQueryHandle) error {
return nil
}

// PdhCollectQueryDataEx collects the current raw data value for all counters in the specified query.
func PdhCollectQueryDataEx(query PdhQueryHandle, interval uint32, event windows.Handle) error {
if err := _PdhCollectQueryDataEx(query, interval, event); err != nil {
return PdhErrno(err.(syscall.Errno))
}
return nil
}

// PdhGetFormattedCounterValueDouble computes a displayable double value for the specified counter.
func PdhGetFormattedCounterValueDouble(counter PdhCounterHandle) (uint32, *PdhCounterValueDouble, error) {
var counterType uint32
Expand Down Expand Up @@ -197,12 +206,12 @@ func PdhExpandWildCardPath(utfPath *uint16) ([]uint16, error) {
return nil, PdhErrno(err.(syscall.Errno))
}
expandPaths := make([]uint16, bufferSize)
if err := _PdhExpandWildCardPath(nil, utfPath, &expandPaths[0], &bufferSize); err != nil {
if err = _PdhExpandWildCardPath(nil, utfPath, &expandPaths[0], &bufferSize); err != nil {
return nil, PdhErrno(err.(syscall.Errno))
}
return expandPaths, nil
return expandPaths, err
}
return nil, nil
return nil, PdhErrno(syscall.ERROR_NOT_FOUND)
}

// PdhExpandCounterPath returns counter paths that match the given counter path, for 32 bit windows.
Expand Down
11 changes: 9 additions & 2 deletions metricbeat/helper/windows/pdh/zpdh_windows.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions metricbeat/module/windows/perfmon/_meta/docs.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@ event. In the above example, this will cause the physical_disk.write.per_sec
and physical_disk.write.time.pct measurements to be sent as a single event.
The default behaviour is for all measurements to be sent as separate events.

*`refresh_wildcard_counters`*:: A boolean option to refresh the counter list at each fetch. By default, the counter list will be retrieved at the starting time, to refresh the list at each fetch, users will have to enable this setting.

*`counters`*:: Counters specifies a list of queries to perform. Each individual
counter requires three config options - `instance_label`, `measurement_label`,
and `query`.
Expand Down
14 changes: 9 additions & 5 deletions metricbeat/module/windows/perfmon/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
package perfmon

import (
"time"

"github.com/pkg/errors"

"github.com/elastic/beats/v7/libbeat/common/cfgwarn"
Expand All @@ -29,11 +31,13 @@ var allowedFormats = []string{"float", "large", "long"}

// Config for the windows perfmon metricset.
type Config struct {
IgnoreNECounters bool `config:"perfmon.ignore_non_existent_counters"`
GroupMeasurements bool `config:"perfmon.group_measurements_by_instance"`
Counters []Counter `config:"perfmon.counters"`
Queries []Query `config:"perfmon.queries"`
GroupAllCountersTo string `config:"perfmon.group_all_counter"`
Period time.Duration `config:"period" validate:"required"`
IgnoreNECounters bool `config:"perfmon.ignore_non_existent_counters"`
GroupMeasurements bool `config:"perfmon.group_measurements_by_instance"`
RefreshWildcardCounters bool `config:"perfmon.refresh_wildcard_counters"`
Counters []Counter `config:"perfmon.counters"`
Queries []Query `config:"perfmon.queries"`
GroupAllCountersTo string `config:"perfmon.group_all_counter"`
}

// Counter for the perfmon counters (old implementation deprecated).
Expand Down
10 changes: 0 additions & 10 deletions metricbeat/module/windows/perfmon/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,11 +47,6 @@ func (re *Reader) groupToEvents(counters map[string][]pdh.CounterValue) []mb.Eve
// Some counters, such as rate counters, require two counter values in order to compute a displayable value. In this case we must call PdhCollectQueryData twice before calling PdhGetFormattedCounterValue.
// For more information, see Collecting Performance Data (https://docs.microsoft.com/en-us/windows/desktop/PerfCtrs/collecting-performance-data).
if val.Err.Error != nil {
if !re.executed {
re.log.Debugw("Ignoring the first measurement because the data isn't ready",
"error", val.Err.Error, logp.Namespace("perfmon"), "query", counterPath)
continue
}
// The counter has a negative value or the counter was successfully found, but the data returned is not valid.
// This error can occur if the counter value is less than the previous value. (Because counter values always increment, the counter value rolls over to zero when it reaches its maximum value.)
// This is not an error that stops the application from running successfully and a positive counter value should be retrieved in the later calls.
Expand Down Expand Up @@ -118,11 +113,6 @@ func (re *Reader) groupToSingleEvent(counters map[string][]pdh.CounterValue) mb.
// Some counters, such as rate counters, require two counter values in order to compute a displayable value. In this case we must call PdhCollectQueryData twice before calling PdhGetFormattedCounterValue.
// For more information, see Collecting Performance Data (https://docs.microsoft.com/en-us/windows/desktop/PerfCtrs/collecting-performance-data).
if val.Err.Error != nil {
if !re.executed {
re.log.Debugw("Ignoring the first measurement because the data isn't ready",
"error", val.Err, logp.Namespace("perfmon"), "query", counterPath)
continue
}
if val.Err.Error == pdh.PDH_CALC_NEGATIVE_VALUE || val.Err.Error == pdh.PDH_INVALID_DATA {
re.log.Debugw("Counter value retrieval returned",
"error", val.Err.Error, "cstatus", pdh.PdhErrno(val.Err.CStatus), logp.Namespace("perfmon"), "query", counterPath)
Expand Down
10 changes: 4 additions & 6 deletions metricbeat/module/windows/perfmon/data_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,8 @@ func TestGroupToEvents(t *testing.T) {
config: Config{
GroupMeasurements: true,
},
query: pdh.Query{},
executed: true,
log: nil,
query: pdh.Query{},
log: nil,
counters: []PerfCounter{
{
QueryField: "datagrams_sent_per_sec",
Expand Down Expand Up @@ -149,9 +148,8 @@ func TestGroupToEvents(t *testing.T) {

func TestGroupToSingleEvent(t *testing.T) {
reader := Reader{
query: pdh.Query{},
executed: true,
log: nil,
query: pdh.Query{},
log: nil,
config: Config{
GroupAllCountersTo: "processor_count",
},
Expand Down
4 changes: 1 addition & 3 deletions metricbeat/module/windows/perfmon/perfmon.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,12 +63,10 @@ func (m *MetricSet) Fetch(report mb.ReporterV2) error {
if len(m.reader.query.Counters) == 0 {
m.log.Error("no counter paths were found")
}

// refresh performance counter list
// Some counters, such as rate counters, require two counter values in order to compute a displayable value. In this case we must call PdhCollectQueryData twice before calling PdhGetFormattedCounterValue.
// For more information, see Collecting Performance Data (https://docs.microsoft.com/en-us/windows/desktop/PerfCtrs/collecting-performance-data).
// A flag is set if the second call has been executed else refresh will fail (reader.executed)
if m.reader.executed {
if m.reader.config.RefreshWildcardCounters {
err := m.reader.RefreshCounterPaths()
if err != nil {
return errors.Wrap(err, "failed retrieving counters")
Expand Down
49 changes: 46 additions & 3 deletions metricbeat/module/windows/perfmon/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,17 @@ import (
"fmt"
"regexp"
"strings"
"time"
"unicode"

"github.com/elastic/beats/v7/metricbeat/helper/windows/pdh"

"github.com/pkg/errors"

"math/rand"

"golang.org/x/sys/windows"

"github.com/elastic/beats/v7/libbeat/logp"
"github.com/elastic/beats/v7/metricbeat/mb"
)
Expand All @@ -44,7 +49,6 @@ const (
// Reader will contain the config options
type Reader struct {
query pdh.Query // PDH Query
executed bool // Indicates if the query has been executed.
log *logp.Logger //
config Config // Metricset configuration
counters []PerfCounter
Expand Down Expand Up @@ -108,7 +112,7 @@ func (re *Reader) Read() ([]mb.Event, error) {
}

// Get the values.
values, err := re.query.GetFormattedCounterValues()
values, err := re.getValues()
if err != nil {
return nil, errors.Wrap(err, "failed formatting counter values")
}
Expand All @@ -120,10 +124,49 @@ func (re *Reader) Read() ([]mb.Event, error) {
} else {
events = re.groupToEvents(values)
}
re.executed = true
return events, nil
}

func (re *Reader) getValues() (map[string][]pdh.CounterValue, error) {
var val map[string][]pdh.CounterValue
rand.Seed(time.Now().UnixNano())
title := windows.StringToUTF16Ptr("metricbeat_perfmon" + randSeq(5))
event, err := windows.CreateEvent(nil, 0, 0, title)
if err != nil {
return nil, err
}
defer windows.CloseHandle(event)
err = re.query.CollectDataEx(uint32(re.config.Period.Seconds()), event)
if err != nil {
return nil, err
}
waitFor, err := windows.WaitForSingleObject(event, windows.INFINITE)
if err != nil {
return nil, err
}
switch waitFor {
case windows.WAIT_OBJECT_0:
val, err = re.query.GetFormattedCounterValues()
if err != nil {
return nil, err
}
case windows.WAIT_FAILED:
return nil, errors.New("WaitForSingleObject has failed")
default:
return nil, errors.New("WaitForSingleObject was abandoned or still waiting for completion")
}
return val, err
}

func randSeq(n int) string {
var letters = []rune("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ")
b := make([]rune, n)
for i := range b {
b[i] = letters[rand.Intn(len(letters))]
}
return string(b)
}

// Close will close the PDH query for now.
func (re *Reader) Close() error {
return re.query.Close()
Expand Down
5 changes: 2 additions & 3 deletions metricbeat/module/windows/perfmon/reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,8 @@ import (

func TestGetCounter(t *testing.T) {
reader := Reader{
query: pdh.Query{},
executed: true,
log: nil,
query: pdh.Query{},
log: nil,
counters: []PerfCounter{
{
QueryField: "datagrams_sent_per_sec",
Expand Down

0 comments on commit 12c8dae

Please sign in to comment.