Skip to content

Commit

Permalink
winlogbeat/eventlog: add source_lag_count metric
Browse files Browse the repository at this point in the history
This only operates on Windows hosts and is best effort only.
  • Loading branch information
efd6 committed Dec 5, 2022
1 parent c4c211f commit 6a9e46f
Show file tree
Hide file tree
Showing 6 changed files with 189 additions and 10 deletions.
3 changes: 2 additions & 1 deletion winlogbeat/docs/configuring-howto.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,8 @@ These metrics are exposed under the `/dataset` path.
| `errors_total` | Total number of errors encountered by the input.
| `batch_read_period` | A histogram of intervals between non-empty event batch reads.
| `received_events_count` | A histogram of the number of events read in each batch.
| `source_lag_time` | The difference between the timestamp recorded in each event and the time when it was read.
| `source_lag_count` | A histogram of the difference between the consumer's log offset and the producer's log offset.
| `source_lag_time` | A histogram of the difference between the timestamp recorded in each event and the time when it was read.
|=======

include::{libbeat-dir}/shared-instrumentation.asciidoc[]
Expand Down
71 changes: 68 additions & 3 deletions winlogbeat/eventlog/eventlog.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,19 @@ package eventlog

import (
"expvar"
"io"
"runtime"
"strconv"
"syscall"
"time"

"github.com/rcrowley/go-metrics"

"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/common/atomic" // TODO: Replace with sync/atomic when go1.19 is supported.
"github.com/elastic/beats/v7/libbeat/monitoring/inputmon"
"github.com/elastic/beats/v7/winlogbeat/checkpoint"
"github.com/elastic/beats/v7/winlogbeat/sys"
"github.com/elastic/beats/v7/winlogbeat/sys/winevent"
"github.com/elastic/elastic-agent-libs/logp"
"github.com/elastic/elastic-agent-libs/mapstr"
Expand Down Expand Up @@ -144,30 +148,38 @@ func incrementMetric(v *expvar.Map, key interface{}) {
}
}

// defaultLagPolling is the default polling period for inputMetrics.sourceLagN.
const defaultLagPolling = time.Minute

// inputMetrics handles event log metric reporting.
type inputMetrics struct {
unregister func()
done chan struct{}

lastBatch time.Time
lastBatch time.Time
lastRecordID *atomic.Uint64

name *monitoring.String // name of the provider being read
events *monitoring.Uint // total number of events received
dropped *monitoring.Uint // total number of discarded events
errors *monitoring.Uint // total number of errors
batchSize metrics.Sample // histogram of the number of events in each non-zero batch
sourceLag metrics.Sample // histogram of the difference between timestamped event's creation and reading
sourceLagN metrics.Sample // histogram of difference between the consumer's log offset and the producer's log offset
batchPeriod metrics.Sample // histogram of the elapsed time between non-zero batch reads
}

// newInputMetrics returns an input metric for windows event logs. If id is empty
// a nil inputMetric is returned.
func newInputMetrics(name, id string) *inputMetrics {
// a nil inputMetric is returned. The ID delta between OS events and read events
// will be polled each poll duration.
func newInputMetrics(name, id string, poll time.Duration) *inputMetrics {
if id == "" {
return nil
}
reg, unreg := inputmon.NewInputRegistry(name, id, nil)
out := &inputMetrics{
unregister: unreg,
done: make(chan struct{}),
name: monitoring.NewString(reg, "provider"),
events: monitoring.NewUint(reg, "received_events_total"),
dropped: monitoring.NewUint(reg, "discarded_events_total"),
Expand All @@ -184,6 +196,13 @@ func newInputMetrics(name, id string) *inputMetrics {
_ = adapter.NewGoMetrics(reg, "batch_read_period", adapter.Accept).
Register("histogram", metrics.NewHistogram(out.batchPeriod))

if poll > 0 && runtime.GOOS == "windows" {
out.lastRecordID = &atomic.Uint64{}
_ = adapter.NewGoMetrics(reg, "source_lag_count", adapter.Accept).
Register("histogram", metrics.NewHistogram(out.sourceLagN))
go out.poll(poll)
}

return out
}

Expand All @@ -201,6 +220,9 @@ func (m *inputMetrics) log(batch []Record) {
m.batchPeriod.Update(now.Sub(m.lastBatch).Nanoseconds())
}
m.lastBatch = now
if m.lastRecordID != nil {
m.lastRecordID.Store(batch[len(batch)-1].RecordID)
}

m.events.Add(uint64(len(batch)))
m.batchSize.Update(int64(len(batch)))
Expand Down Expand Up @@ -232,9 +254,52 @@ func (m *inputMetrics) logDropped(_ error) {
m.dropped.Inc()
}

// poll gets the oldest event held in the system event log each time.Duration
// and logs the difference between its record ID and the record ID of the oldest
// event that has been read by the input, logging the difference to the
// source_lag_count metric. Polling is best effort only and no metrics are logged
// for the operations required to get the event record.
func (m *inputMetrics) poll(each time.Duration) {
const renderBufferSize = 1 << 14
var (
work [renderBufferSize]byte
buf = sys.NewByteBuffer(renderBufferSize)
)
t := time.NewTicker(each)
for {
select {
case <-t.C:
oldest, err := oldestEvent(work[:], buf)
if err != nil {
if err == io.EOF {
// We are up-to-date.
m.sourceLagN.Update(0)
} else {
m.logError(err)
}
continue
}
delta := int64(oldest.RecordID - m.lastRecordID.Load())
if delta < 0 {
// We have lost a race with the reader goroutine
// so we are completely up-to-date.
delta = 0
}
m.sourceLagN.Update(delta)
case <-m.done:
t.Stop()
return
}
}
}

func (m *inputMetrics) close() {
if m == nil {
return
}
if m.lastRecordID != nil {
// Shut down poller and wait until done before unregistering metrics.
m.done <- struct{}{}
}
m.unregister()
}
31 changes: 31 additions & 0 deletions winlogbeat/eventlog/poll_other.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//go:build !windows
// +build !windows

package eventlog

import (
"github.com/elastic/beats/v7/winlogbeat/sys"
"github.com/elastic/beats/v7/winlogbeat/sys/winevent"
)

// oldestEvent is a no-op on non-Windows systems.
func oldestEvent(_ []byte, _ *sys.ByteBuffer) (winevent.Event, error) {
return winevent.Event{}, nil
}
72 changes: 72 additions & 0 deletions winlogbeat/eventlog/poll_windows.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//go:build windows
// +build windows

package eventlog

import (
"errors"
"io"

"golang.org/x/sys/windows"

"github.com/elastic/beats/v7/winlogbeat/sys"
"github.com/elastic/beats/v7/winlogbeat/sys/winevent"
win "github.com/elastic/beats/v7/winlogbeat/sys/wineventlog"
)

// oldestEvent returns the oldest event held in the system event log.
func oldestEvent(work []byte, buf *sys.ByteBuffer) (winevent.Event, error) {
event, err := windows.CreateEvent(nil, 0, 0, nil)
if err != nil {
return winevent.Event{}, err
}
defer windows.CloseHandle(event)

s, err := win.Subscribe(0, event, "", "", 0, win.EvtSubscribeStartAtOldestRecord)
if err != nil {
return winevent.Event{}, err
}

h, err := win.EventHandles(s, 1)
switch err { //nolint:errorlint // This is an errno or nil.
case nil:
case win.ERROR_NO_MORE_ITEMS:
// Shim to error that is not guarded by a go:build windows directive.
return winevent.Event{}, io.EOF
default:
return winevent.Event{}, err
}

buf.Reset()
err = win.RenderEventXML(h[0], work, buf)
var bufErr sys.InsufficientBufferError
if errors.As(err, &bufErr) {
// Don't retain work buffer that are over the 16kiB
// allocation; we are calling this infrequently, and
// mostly won't need to work above this value.
work = make([]byte, bufErr.RequiredSize)
buf.Reset()
err = win.RenderEventXML(h[0], work, buf)
}
if err != nil && buf.Len() == 0 {
return winevent.Event{}, err
}
return winevent.UnmarshalXML(buf.Bytes())
}
14 changes: 9 additions & 5 deletions winlogbeat/eventlog/wineventlog.go
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,12 @@ func newWinEventLog(options *conf.C) (EventLog, error) {
c.Name = filepath.Clean(c.Name)
}

forwarded := (c.Forwarded == nil && c.Name == "ForwardedEvents") || (c.Forwarded != nil && *c.Forwarded)

var poll time.Duration
if !forwarded {
poll = defaultLagPolling
}
l := &winEventLog{
id: id,
config: c,
Expand All @@ -253,19 +259,17 @@ func newWinEventLog(options *conf.C) (EventLog, error) {
cache: newMessageFilesCache(id, eventMetadataHandle, freeHandle),
winMetaCache: newWinMetaCache(metaTTL),
logPrefix: fmt.Sprintf("WinEventLog[%s]", id),
metrics: newInputMetrics(c.Name, id),
metrics: newInputMetrics(c.Name, id, poll),
}

// Forwarded events should be rendered using RenderEventXML. It is more
// efficient and does not attempt to use local message files for rendering
// the event's message.
switch {
case c.Forwarded == nil && c.Name == "ForwardedEvents",
c.Forwarded != nil && *c.Forwarded:
if forwarded {
l.render = func(event win.EvtHandle, out io.Writer) error {
return win.RenderEventXML(event, l.renderBuf, out)
}
default:
} else {
l.render = func(event win.EvtHandle, out io.Writer) error {
return win.RenderEvent(event, c.EventLanguage, l.renderBuf, l.cache.get, out)
}
Expand Down
8 changes: 7 additions & 1 deletion winlogbeat/eventlog/wineventlog_experimental.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"io"
"os"
"path/filepath"
"time"

"go.uber.org/multierr"
"golang.org/x/sys/windows"
Expand Down Expand Up @@ -121,6 +122,11 @@ func newWinEventLogExp(options *conf.C) (EventLog, error) {
return nil, err
}

var poll time.Duration
forwarded := (c.Forwarded == nil && c.Name == "ForwardedEvents") || (c.Forwarded != nil && *c.Forwarded)
if !forwarded {
poll = defaultLagPolling
}
l := &winEventLogExp{
config: c,
query: xmlQuery,
Expand All @@ -130,7 +136,7 @@ func newWinEventLogExp(options *conf.C) (EventLog, error) {
maxRead: c.BatchReadSize,
renderer: renderer,
log: log,
metrics: newInputMetrics(c.Name, id),
metrics: newInputMetrics(c.Name, id, poll),
}

return l, nil
Expand Down

0 comments on commit 6a9e46f

Please sign in to comment.